Hadoop WordCount源码解读
2015-07-26 17:43
351 查看
MapReduce编程模型
MapReduce采用“分而治之”的思想。将HDFS上海量数据切分成为若干块,将每块的数据分给集群上的节点进行计算。然后通过整合各节点的中间结果,得到最终的结果。
HDFS上默认块的大小要比磁盘默认的大小大的多。其目的是为了最小化寻址开销。如果块设置得足够大,从磁盘传输数据的时间明显大于定位这个块开始位置所需时间。这样,传输一个由多个块组成的文件时间取决于磁盘传输速率。HDFS默认块的大小为64MB。随着磁盘驱动器的进一步发展块的默认大小可以设置的更大。
MapReduce的处理过程
一个复杂的MapReduce任务可以分为若干个Job。每个Job又可以分为Mapper和Reducer两个阶段。这两个阶段对应到代码内就是继承Mapper的内部类和继承Reducer的内部类。继承Mapper的内部类需要实现map函数,继承Reducer的内部类需要实现Reduce函数。Map函数接收一个<key,value>的键值对同时也会输出一个 <key,value> 的键值对。Reduce函数接收一个<key,list of values>(值为所有键为key的value集合,例如: map的输出为<1,1>,<1,2>,<1,3>,<1,4>则reduce的输入为<1,[1,2,3,4]>)同时经过处理后同样会输出<key,value>键值对。MapReduce运行过程的数据流
首先分析主函数Main方法
系统运行时会加载Hadoop默认的一些配置。
程序运行时会接收从命令行传来的一些参数。默认第一个参数为程序要处理的数据文件夹路径,即in文件夹路径。默认第二个参数为程序结果要输出到的文件夹路径,即out文件夹路径。如果命令行的参数少于2个时程序会停止运行。
初始化一个Job任务,这个Job任务需要加载hadoop的一些配置,并给这个Job 命名为“word count”
job.setJarByClass(WordCount.class);
这个方法使用了WordCount.class的类加载器来寻找包含该类的Jar包,然后设置该Jar包为作业所用的Jar包。
setMapperClass 该方法设置了该Job任务所使用的Mapper类(拆分)。
setCombinerClass 该方法设置了该Job任务所使用的Combiner类(中间结果合并)。
setReducerClass该方法设置了该Job任务所使用的Reducer类(合并)。
可以发现Combiner处理类和Reducer处理类使用的是同一个类即IntSumReducer类。为什么这两个风马牛不相及的处理类可以使用同一个类呢?这仅仅是巧合吗?答案是否定的。
Combiner本质上是一个本地的Reducer方法。在集群环境中,带宽是这个集群最稀有的资源。可以先将本地map所处理的数据进行本地Reduce,然后再将结果传给集群其他节点进行处理。这样设计的好处是,可以节省集群的带宽使用率。但是,并不是所有的Combine处理过程均能使用Reducer的处理类,这需要在逻辑上考虑是否可移植!要想进行本地reduce(combine),一个必要的条件是,reduce的输入输出格式必须一样!
设置Reducer的键输出的类型为Text类型,值输出的类型为IntWritable类型。例如本程序输出的单词和其出现的次数<单词,次数>。
Text类似于Java的String类型。
IntWritable类型类似于Java的int类型。
设置程序输入和输出的路径。本示例是从命令行中接收参数。第一个参数为输入路径。第二个参数为输出路径。
与其他语言一样。Hadoop会有自己的一些默认配置。本示例中使用默认的输入方式即TextInputFormat。
Job. setInputFormat(TextInputFormat.class );
TextInputFormat是Hadoop的默认输入方式。在TextInputFormat方式中,系统会自动将输入文件的每行数据形成一条记录,每条记录表示成<key,value>的形式,这条记录会作为map的输入数据。key值是每个数据的记录在数据分片中字节偏移量,数据类型为LongWritable。LongWritable类似于Java的Long类型。Value值是每行的内容,数据类型为Text。
比如,输入文件为
Hello world
Hello hadoop
则会形成
<0,Hello world>
<12,Hello hadoop>
Map类中map方法分析
用户自定义map类需要继承Mapper类,并实现map方法。此类是一种规范类型,它有四种形式的参数分别用来指定map的输入key值的类型、map的输入value值的类型、map的输出key值类型、map的输出value值类型。由于本示例中使用的是默认的TextInputFormat输入类型。所以map输入键的类型为LongWritable的父类型Object,map的输入值的类型为Text。因为map要输出的键值对类型为<单词,次数>,所以map的输出key值类型为Text,输出value值的类型为IntWritable。
Map方法为用户想要实现的特定的功能。在本示例中,map方法对输入的记录以空格为单位进行切分,然后使用context对象进行输出。Context包含运行时的上下文信息。
Reduce类中reduce方法分析
当数据输入到Reduce端时,key为具体的单词,而values是对应单词的计数值所组成的列表,Map的输出就是Reduce的输入,所以reduce方法只要遍历values并求和,即可得到某个单词的总次数。
WordCount处理过程
⒈MapReduce框架会自动将输入文件分片。由于测试文件较小,故将每个文件分为一个split,并将文件按行分割形成<key,value>对。
⒉将分割好的<key,value>对,交给用户定义好的map进行处理,并输出<key,value>对。
⒊map方法输出<key,value>对后,Mapper会自动将输出的<key,value>对排序,然后执行Combine过程即本地Reduce方法。
⒋Reducer会先将Combine结果排序,并将具有相同的key的value形成集合。最后通过用户自定义的reduce方法输出结果。
MapReduce采用“分而治之”的思想。将HDFS上海量数据切分成为若干块,将每块的数据分给集群上的节点进行计算。然后通过整合各节点的中间结果,得到最终的结果。
HDFS上默认块的大小要比磁盘默认的大小大的多。其目的是为了最小化寻址开销。如果块设置得足够大,从磁盘传输数据的时间明显大于定位这个块开始位置所需时间。这样,传输一个由多个块组成的文件时间取决于磁盘传输速率。HDFS默认块的大小为64MB。随着磁盘驱动器的进一步发展块的默认大小可以设置的更大。
MapReduce的处理过程
一个复杂的MapReduce任务可以分为若干个Job。每个Job又可以分为Mapper和Reducer两个阶段。这两个阶段对应到代码内就是继承Mapper的内部类和继承Reducer的内部类。继承Mapper的内部类需要实现map函数,继承Reducer的内部类需要实现Reduce函数。Map函数接收一个<key,value>的键值对同时也会输出一个 <key,value> 的键值对。Reduce函数接收一个<key,list of values>(值为所有键为key的value集合,例如: map的输出为<1,1>,<1,2>,<1,3>,<1,4>则reduce的输入为<1,[1,2,3,4]>)同时经过处理后同样会输出<key,value>键值对。MapReduce运行过程的数据流
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
首先分析主函数Main方法
Configuration conf = new Configuration();
系统运行时会加载Hadoop默认的一些配置。
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); }
程序运行时会接收从命令行传来的一些参数。默认第一个参数为程序要处理的数据文件夹路径,即in文件夹路径。默认第二个参数为程序结果要输出到的文件夹路径,即out文件夹路径。如果命令行的参数少于2个时程序会停止运行。
Job job = new Job(conf, "word count");
初始化一个Job任务,这个Job任务需要加载hadoop的一些配置,并给这个Job 命名为“word count”
job.setJarByClass(WordCount.class);
这个方法使用了WordCount.class的类加载器来寻找包含该类的Jar包,然后设置该Jar包为作业所用的Jar包。
job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class);
setMapperClass 该方法设置了该Job任务所使用的Mapper类(拆分)。
setCombinerClass 该方法设置了该Job任务所使用的Combiner类(中间结果合并)。
setReducerClass该方法设置了该Job任务所使用的Reducer类(合并)。
可以发现Combiner处理类和Reducer处理类使用的是同一个类即IntSumReducer类。为什么这两个风马牛不相及的处理类可以使用同一个类呢?这仅仅是巧合吗?答案是否定的。
Combiner本质上是一个本地的Reducer方法。在集群环境中,带宽是这个集群最稀有的资源。可以先将本地map所处理的数据进行本地Reduce,然后再将结果传给集群其他节点进行处理。这样设计的好处是,可以节省集群的带宽使用率。但是,并不是所有的Combine处理过程均能使用Reducer的处理类,这需要在逻辑上考虑是否可移植!要想进行本地reduce(combine),一个必要的条件是,reduce的输入输出格式必须一样!
job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class);
设置Reducer的键输出的类型为Text类型,值输出的类型为IntWritable类型。例如本程序输出的单词和其出现的次数<单词,次数>。
Text类似于Java的String类型。
IntWritable类型类似于Java的int类型。
FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
设置程序输入和输出的路径。本示例是从命令行中接收参数。第一个参数为输入路径。第二个参数为输出路径。
与其他语言一样。Hadoop会有自己的一些默认配置。本示例中使用默认的输入方式即TextInputFormat。
Job. setInputFormat(TextInputFormat.class );
TextInputFormat是Hadoop的默认输入方式。在TextInputFormat方式中,系统会自动将输入文件的每行数据形成一条记录,每条记录表示成<key,value>的形式,这条记录会作为map的输入数据。key值是每个数据的记录在数据分片中字节偏移量,数据类型为LongWritable。LongWritable类似于Java的Long类型。Value值是每行的内容,数据类型为Text。
比如,输入文件为
Hello world
Hello hadoop
则会形成
<0,Hello world>
<12,Hello hadoop>
Map类中map方法分析
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
用户自定义map类需要继承Mapper类,并实现map方法。此类是一种规范类型,它有四种形式的参数分别用来指定map的输入key值的类型、map的输入value值的类型、map的输出key值类型、map的输出value值类型。由于本示例中使用的是默认的TextInputFormat输入类型。所以map输入键的类型为LongWritable的父类型Object,map的输入值的类型为Text。因为map要输出的键值对类型为<单词,次数>,所以map的输出key值类型为Text,输出value值的类型为IntWritable。
public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } }
Map方法为用户想要实现的特定的功能。在本示例中,map方法对输入的记录以空格为单位进行切分,然后使用context对象进行输出。Context包含运行时的上下文信息。
Reduce类中reduce方法分析
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>
同样的,用户自定义Reduce需要继承Reducer类,并实现reduce方法。此类也是一种规范类型,它同样有四种形式的参数用来指定Reduce的输入key值类型、输入value值类型、输出key类型、输出value类型。但,reduce的输入key值的类型必须和map输出key值类型相同。Reduce的输人value值的类型必须和map输出value值类型相同。
public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); }
当数据输入到Reduce端时,key为具体的单词,而values是对应单词的计数值所组成的列表,Map的输出就是Reduce的输入,所以reduce方法只要遍历values并求和,即可得到某个单词的总次数。
WordCount处理过程
⒈MapReduce框架会自动将输入文件分片。由于测试文件较小,故将每个文件分为一个split,并将文件按行分割形成<key,value>对。
⒉将分割好的<key,value>对,交给用户定义好的map进行处理,并输出<key,value>对。
⒊map方法输出<key,value>对后,Mapper会自动将输出的<key,value>对排序,然后执行Combine过程即本地Reduce方法。
⒋Reducer会先将Combine结果排序,并将具有相同的key的value形成集合。最后通过用户自定义的reduce方法输出结果。
相关文章推荐
- CentOS系统初始化
- Linux下编译、链接和装载
- UVa 10271 - Chopsticks(DP)
- linux shell脚本守护进程监控svn服务
- linux device tree源代码解析--转
- pxe-mof exiting intel pxe rom operating system not found
- 阿里云服务器 ECS Ubuntu系统下PHP,MYSQL,APACHE2的安装配置
- 【linux高级程序设计】(第八章)进程管理与程序开发 1
- nginx+tomcat集群配置
- NSOperation - 01
- 如何隐藏Linux内核版本及登录时显示的信息
- 9.1 libvirt与openstack
- shell 脚本统计文件梳理及获取磁盘占有率
- Linux中等待队列的实现
- 7.1 虚拟机直接IO原理与架构
- 自学opencv过程中轮廓学习时遇到的几个问题
- Tomcat\conf\server.xml文件解析
- ubuntu内核升级
- Linu命令与实例
- autoconf、automake详解