您的位置:首页 > 运维架构

Hadoop Wordcount 程序 详解

2015-05-29 11:06 543 查看
package org.apache.hadoop.examples;

import java.io.IOException;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
//输入key值类型、输入value值类型、输出key值类型和输出value值类型。
//输入key值是每个数据的记录在数据分片中字节偏移量,数据类型是LongWritable;  value值是每行的内容,数据类型是Text。
//在本例中需要输出<word,1>,因此输出的key值类型是Text,输出的value值类型是IntWritable。
      private final static IntWritable one = new IntWritable(1);

      private Text word = new Text();

      public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//这里有三个参数,前面两个Object key, Text value就是输入的key和value,第三个参数Context contextcontext应该是用来传递数据以及其他运行状态信息,
//map中的key、value写入context,让它传递给Reducer进行reduce,

        StringTokenizer itr = new StringTokenizer(value.toString());   
//StringTokenizer类(import java.util.StringTokenizer;)将每一行拆分成为一个个的单词,并将<word,1>作为map方法的结果输出

        while (itr.hasMoreTokens()) {

        word.set(itr.nextToken());  //将某一个单词写入word变量。

        context.write(word, one);   //同时将这个word变量和常量1 组合成一个context对。  这个context 对会自动传递给reduce类。

      }

    }

  }

//将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key,value>对 <0,Hello World>,<12,Bye World>。。。。。。
//将分割好的<key,value>对交给用户定义的map方法进行处理,生成新的<key,value>对,   
// <0,Hello World>,<12,Bye World> -----><Hello,1>,<World,1>,<Bye,1>,<World,1>,
//得到map方法输出的<key,value>对后,Mapper会自动将它们按照key值进行排序,
//   然后<span style="font-family: Arial, Helvetica, sans-serif;">job.setCombinerClass(IntSumReducer.class);</span><span style="font-family: Arial, Helvetica, sans-serif;">执行Combine过程,将key相同value值累加,得到Mapper的最终输出结果。输出  <Bye,1>,<Hello,1>,<World,2></span>
//比如第一个split通过map输出<Bye,1>,<Hello,1>,<World,2>,第二个split通过map输出<Bye,1>,<Hadoop,2>,<Hello,1>.  
//Reducer端会自动排序, 生成 <Bye,list(1,1)>,<Hadoop,list(2)>,<Hello,list(1,1,)> ,<World,list(1,1,)>,
//然后将这样的结果交给 <span style="font-family: Arial, Helvetica, sans-serif;">  public void reduce 函数处理</span>

  public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
//Reduce类以map的输出作为输入,因此Reduce的输入类型是<Text,Intwritable>。 而Reduce的输出是单词和它的数目,因此,它的输出类型是<Text,IntWritable>。
      private IntWritable result = new IntWritable();

      public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {

        int sum = 0;

        for (IntWritable val : values) {

           sum += val.get();

        }

      result.set(sum);

      context.write(key, result);

    }

  }

  public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    if (otherArgs.length != 2) {

      System.err.println("Usage: wordcount <in> <out>");

      System.exit(2);

    }

    Job job = new Job(conf, "word count");

    job.setJarByClass(WordCount.class);

    job.setMapperClass(TokenizerMapper.class);

    job.setCombinerClass(IntSumReducer.class);  //中间结果合并

    job.setReducerClass(IntSumReducer.class);  //合并.这里用Reduce类来进行Map产生的中间结果合并,避免给网络数据传输产生压力。

    job.setOutputKeyClass(Text.class);  //因为结果是<单词,个数>,所以key设置为"Text"类型

    job.setOutputValueClass(IntWritable.class);  //Value设置为"IntWritable",相当于Java中的int类型。

    FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //设置输入路径

    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); //设置输出路径

    System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: