您的位置:首页 > 运维架构

Hadoop WordCount源码解读

2015-07-26 17:43 351 查看
MapReduce编程模型

MapReduce采用“分而治之”的思想。将HDFS上海量数据切分成为若干块,将每块的数据分给集群上的节点进行计算。然后通过整合各节点的中间结果,得到最终的结果。

HDFS上默认块的大小要比磁盘默认的大小大的多。其目的是为了最小化寻址开销。如果块设置得足够大,从磁盘传输数据的时间明显大于定位这个块开始位置所需时间。这样,传输一个由多个块组成的文件时间取决于磁盘传输速率。HDFS默认块的大小为64MB。随着磁盘驱动器的进一步发展块的默认大小可以设置的更大。

MapReduce的处理过程

一个复杂的MapReduce任务可以分为若干个Job。每个Job又可以分为Mapper和Reducer两个阶段。这两个阶段对应到代码内就是继承Mapper的内部类和继承Reducer的内部类。继承Mapper的内部类需要实现map函数,继承Reducer的内部类需要实现Reduce函数。Map函数接收一个<key,value>的键值对同时也会输出一个 <key,value> 的键值对。Reduce函数接收一个<key,list of values>(值为所有键为key的value集合,例如: map的输出为<1,1>,<1,2>,<1,3>,<1,4>则reduce的输入为<1,[1,2,3,4]>)同时经过处理后同样会输出<key,value>键值对。MapReduce运行过程的数据流



import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}


首先分析主函数Main方法

Configuration conf = new Configuration();


系统运行时会加载Hadoop默认的一些配置。

String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}


程序运行时会接收从命令行传来的一些参数。默认第一个参数为程序要处理的数据文件夹路径,即in文件夹路径。默认第二个参数为程序结果要输出到的文件夹路径,即out文件夹路径。如果命令行的参数少于2个时程序会停止运行。

Job job = new Job(conf, "word count");


初始化一个Job任务,这个Job任务需要加载hadoop的一些配置,并给这个Job 命名为“word count”

job.setJarByClass(WordCount.class);

这个方法使用了WordCount.class的类加载器来寻找包含该类的Jar包,然后设置该Jar包为作业所用的Jar包。

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);


setMapperClass 该方法设置了该Job任务所使用的Mapper类(拆分)。

setCombinerClass 该方法设置了该Job任务所使用的Combiner类(中间结果合并)。

setReducerClass该方法设置了该Job任务所使用的Reducer类(合并)。

可以发现Combiner处理类和Reducer处理类使用的是同一个类即IntSumReducer类。为什么这两个风马牛不相及的处理类可以使用同一个类呢?这仅仅是巧合吗?答案是否定的。

Combiner本质上是一个本地的Reducer方法。在集群环境中,带宽是这个集群最稀有的资源。可以先将本地map所处理的数据进行本地Reduce,然后再将结果传给集群其他节点进行处理。这样设计的好处是,可以节省集群的带宽使用率。但是,并不是所有的Combine处理过程均能使用Reducer的处理类,这需要在逻辑上考虑是否可移植!要想进行本地reduce(combine),一个必要的条件是,reduce的输入输出格式必须一样!

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);


设置Reducer的键输出的类型为Text类型,值输出的类型为IntWritable类型。例如本程序输出的单词和其出现的次数<单词,次数>。

Text类似于Java的String类型。

IntWritable类型类似于Java的int类型。

FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));


设置程序输入和输出的路径。本示例是从命令行中接收参数。第一个参数为输入路径。第二个参数为输出路径。

与其他语言一样。Hadoop会有自己的一些默认配置。本示例中使用默认的输入方式即TextInputFormat。

Job. setInputFormat(TextInputFormat.class );

TextInputFormat是Hadoop的默认输入方式。在TextInputFormat方式中,系统会自动将输入文件的每行数据形成一条记录,每条记录表示成<key,value>的形式,这条记录会作为map的输入数据。key值是每个数据的记录在数据分片中字节偏移量,数据类型为LongWritable。LongWritable类似于Java的Long类型。Value值是每行的内容,数据类型为Text。

比如,输入文件为

Hello world

Hello hadoop

则会形成

<0,Hello world>

<12,Hello hadoop>

Map类中map方法分析

public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>


用户自定义map类需要继承Mapper类,并实现map方法。此类是一种规范类型,它有四种形式的参数分别用来指定map的输入key值的类型、map的输入value值的类型、map的输出key值类型、map的输出value值类型。由于本示例中使用的是默认的TextInputFormat输入类型。所以map输入键的类型为LongWritable的父类型Object,map的输入值的类型为Text。因为map要输出的键值对类型为<单词,次数>,所以map的输出key值类型为Text,输出value值的类型为IntWritable。

public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}


Map方法为用户想要实现的特定的功能。在本示例中,map方法对输入的记录以空格为单位进行切分,然后使用context对象进行输出。Context包含运行时的上下文信息。

Reduce类中reduce方法分析

public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable>


同样的,用户自定义Reduce需要继承Reducer类,并实现reduce方法。此类也是一种规范类型,它同样有四种形式的参数用来指定Reduce的输入key值类型、输入value值类型、输出key类型、输出value类型。但,reduce的输入key值的类型必须和map输出key值类型相同。Reduce的输人value值的类型必须和map输出value值类型相同。


public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}


当数据输入到Reduce端时,key为具体的单词,而values是对应单词的计数值所组成的列表,Map的输出就是Reduce的输入,所以reduce方法只要遍历values并求和,即可得到某个单词的总次数。

WordCount处理过程

⒈MapReduce框架会自动将输入文件分片。由于测试文件较小,故将每个文件分为一个split,并将文件按行分割形成<key,value>对。



⒉将分割好的<key,value>对,交给用户定义好的map进行处理,并输出<key,value>对。



⒊map方法输出<key,value>对后,Mapper会自动将输出的<key,value>对排序,然后执行Combine过程即本地Reduce方法。



⒋Reducer会先将Combine结果排序,并将具有相同的key的value形成集合。最后通过用户自定义的reduce方法输出结果。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: