mapreduce框架设计思想,wordcount程序原理与实现
2017-10-08 22:12
1081 查看
1、mapreduce框架设计思想
mapreduce结构
一个完整的mapreduce程序在分布式运行时有三类实例进程:
1、MRAppMaster:负责整个程序的过程调度及状态协调
2、mapTask:负责map阶段的整个数据处理流程
3、ReduceTask:负责reduce阶段的整个数据处理流程
运行流程:以wordcount(单词统计)为例
分析:
假如要统计三个文件中每个单词出现的次数
将文件上传到hdfs后,以图为例假设每个文件被切分为了4个block存在了4台机器上,mapreduce application master假设在第四台机器上,它来进行map task和reduce task的调度,如图示,每台机器上运行一个map task程序,每个map task分别计算自己所在机器上的三个block中每个单词出现的次数,最后每个map task将读到的a-h开头的单词与次数提交到第一台机器的reduce task,h-p开头的提交到第二台机器的reduce task,p-z开头的提交到第三天机器的reduce task,最后输出到三个hdfs文件。而对于调度问题都是由mapreduce application master来解决。
计算机制可以做如下思考。
1、每个map task 按行读取block中的数据,每行按照空格分开,然后存入到一个hashmap中,key为单词,value为数量,每读到相同的单词,value+1,读到以前没读过的单词,则新增key,value。
2、等到自己这台机器上的三个block都读完后,再创建三个hashmap将a-h的单词放到第一个hashmap,h-p开头的放到第二个hashmap,p-z开头的放到第三个hashmap。最后提交到对应的三个reduce task。
注:这样说只是为了更简单易懂,实际上存在一些差异的,比如是不是a-h就一定交给第一个reduce task,所有a开头的都会交给第一个reduce task?答案是否定的,实际上提交给哪个是根据每个单词的hashcode以及reduce task的数量决定的(参考 HashPartitioner类)。比如application单词以及出现次数就会交给第一个reduce task,而angry就可能交给第二个reduce task了。
为了方便理解下面就写个wordcount程序。
2、wordcount程序实现
1、编写map方法
2、编写reduce方法
解释:
map()方法中我们设置的时候单词做为key,1作为value写出,比如
4、进行单词统计测试
将工程打成jar包上传到linux(hadoop集群启动起来)
到这里程序就跑完了,可以查看/wordcount/output目录结构
说明只跑了一个reduce task,因为默认就是这样,可以自己设置数量。
再查看单词统计结果
3、wordcount原理
wordcount运行过程分析图
1、程序启动后,根据客户端提交的job能得到待处理的数据的信息,比如要切片的数量(默认就是blocksize,每个切片会分配一个 maptask实例去运行),程序jar包,配置参数等。mapreduce application master会根据这些信息获得map task实例数量。然后向集群中申请响应数量的map task进程。
2、每个map task会利用InputFormat去读取分给自己的切片的数据信息,读取到的内容以(key,value)(没读一行调用一次map()方法)形式传到我们自己重写的map()方法里进行处理。
3、将map()方法中的输出结果传到输出收集器(缓存?OutputCollector)中。
4、当map task执行完后输出收集器会将信息写入到一个输出文件中,这个文件是分区(有几个reduce task就有几个区)的。
5、mapreduce application master监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据范围(数据分区)。
6、reduce task进程启动之后,根据mapreduce application master告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储。
mapreduce结构
一个完整的mapreduce程序在分布式运行时有三类实例进程:
1、MRAppMaster:负责整个程序的过程调度及状态协调
2、mapTask:负责map阶段的整个数据处理流程
3、ReduceTask:负责reduce阶段的整个数据处理流程
运行流程:以wordcount(单词统计)为例
分析:
假如要统计三个文件中每个单词出现的次数
将文件上传到hdfs后,以图为例假设每个文件被切分为了4个block存在了4台机器上,mapreduce application master假设在第四台机器上,它来进行map task和reduce task的调度,如图示,每台机器上运行一个map task程序,每个map task分别计算自己所在机器上的三个block中每个单词出现的次数,最后每个map task将读到的a-h开头的单词与次数提交到第一台机器的reduce task,h-p开头的提交到第二台机器的reduce task,p-z开头的提交到第三天机器的reduce task,最后输出到三个hdfs文件。而对于调度问题都是由mapreduce application master来解决。
计算机制可以做如下思考。
1、每个map task 按行读取block中的数据,每行按照空格分开,然后存入到一个hashmap中,key为单词,value为数量,每读到相同的单词,value+1,读到以前没读过的单词,则新增key,value。
2、等到自己这台机器上的三个block都读完后,再创建三个hashmap将a-h的单词放到第一个hashmap,h-p开头的放到第二个hashmap,p-z开头的放到第三个hashmap。最后提交到对应的三个reduce task。
注:这样说只是为了更简单易懂,实际上存在一些差异的,比如是不是a-h就一定交给第一个reduce task,所有a开头的都会交给第一个reduce task?答案是否定的,实际上提交给哪个是根据每个单词的hashcode以及reduce task的数量决定的(参考 HashPartitioner类)。比如application单词以及出现次数就会交给第一个reduce task,而angry就可能交给第二个reduce task了。
为了方便理解下面就写个wordcount程序。
2、wordcount程序实现
1、编写map方法
/** * KEYIN:默认情况下,是mr框架所读到的一行文本的起始偏移量,Long,但是在hadoop中有自己的 * 更精简的序列化接口(Seria会将类结构都序列化,而实际我们只需要序列化数据),所以不直接用Long,而用LongWritable * VALUEIN:默认情况下,是mr框架所读到的一行文本的内容,String,同上,用Text * KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,String,这里是单词 * VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词次数,Integer * @author 12706 * */ public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ /* * map阶段的业务逻辑就写在自定义的map()方法中 * maptask会对每一行输入数据调用一次我们自定义的map()方法 */ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //将maptask传给我们的文本内容先转换成String String line = value.toString(); //根据空格将这一行切分成单词 String[] words = line.split(" "); //将单词输出为<单词,1> for (String word : words) { //将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发, //以便于相同单词会到相同的reduce task context.write(new Text(word), new IntWritable(1)); } } }
2、编写reduce方法
/** * KEYIN VALUEIN对应mapper输出的KEYOUT KEYOUT类型对应 * KEYOUT,VALUEOUT:是自定义reduce逻辑处理结果的输出数据类型 * KEYOUT:是单词 * VALUEOUT:是总次数 * @author 12706 * */ public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ /* * <angry,1><angry,1><angry,1><angry,1><angry,1><angry,1> * <banana,1><banana,1><banana,1><banana,1><banana,1> * ... * <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1> * 入参key,是一组相同单词kv的key * 每个单词统计调一次,换一组再调用一次 */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; // Iterator<IntWritable> iterator = values.iterator(); // while(iterator.hasNext()){ // IntWritable next = iterator.next(); // } for (IntWritable value : values) { count += value.get(); } context.write(key, new IntWritable(count)); } }
解释:
map()方法中我们设置的时候单词做为key,1作为value写出,比如
/** * 相当于一个yarn集群的客户端 * 需要在此封装mr程序的相关运行参数,指定jar包 * 最后提交给yarn * @author 12706 * */ public class WordcountDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //指定本程序的jar包所在的本地路径 job.setJarByClass(WordcountDriver.class); //指定本业务job要使用的mapper,reducer业务类 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); //虽然指定了泛型,以防框架使用第三方的类型 //指定mapper输出数据的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定最终输出的数据的kv类型(这里就是指reducer输出类型) job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定job的输入原始文件所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); //指定输出结果目录 FileOutputFormat.setOutputPath(job, new Path(args[1])); //将job中配置的相关参数以及job所用的java类所在的jar包,提交给yarn去运行 // job.submit(); boolean b = job.waitForCompletion(true); //这个方法是用来结束当前正在运行中的java虚拟机。如果status是非零参数,那么表示是非正常退出。 System.exit(b?0:1); } }
4、进行单词统计测试
将工程打成jar包上传到linux(hadoop集群启动起来)
[root@mini2 hadoop-2.6.4]# hadoop fs -mkdir /wordcount/input 注:/wordcount/output目录不能创建 [root@mini2 hadoop-2.6.4]# hadoop fs -put LICENSE.txt NOTICE.txt README.txt /wordcount/input [root@mini2 ~]# hadoop jar wordcount.jar com.itheima.hadoop.mapreduce.WordcountDriver /wordcount/input /wordcount/output
到这里程序就跑完了,可以查看/wordcount/output目录结构
[root@mini2 ~]# hadoop fs -ls /wordcount/output -rw-r--r-- 2 root supergroup 0 2017-10-09 05:06 /wordcount/output/_SUCCESS -rw-r--r-- 2 root supergroup 8989 2017-10-09 05:06 /wordcount/output/part-r-00000
说明只跑了一个reduce task,因为默认就是这样,可以自己设置数量。
再查看单词统计结果
[root@mini2 ~]# hadoop fs -cat /wordcount/output/part-r-00000 ... within 8 without 6 work 5 work, 2 work. 1 works 1 worldwide, 2 writing 1 writing, 3 written 3 you 3 your 5
3、wordcount原理
wordcount运行过程分析图
1、程序启动后,根据客户端提交的job能得到待处理的数据的信息,比如要切片的数量(默认就是blocksize,每个切片会分配一个 maptask实例去运行),程序jar包,配置参数等。mapreduce application master会根据这些信息获得map task实例数量。然后向集群中申请响应数量的map task进程。
2、每个map task会利用InputFormat去读取分给自己的切片的数据信息,读取到的内容以(key,value)(没读一行调用一次map()方法)形式传到我们自己重写的map()方法里进行处理。
3、将map()方法中的输出结果传到输出收集器(缓存?OutputCollector)中。
4、当map task执行完后输出收集器会将信息写入到一个输出文件中,这个文件是分区(有几个reduce task就有几个区)的。
5、mapreduce application master监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据范围(数据分区)。
6、reduce task进程启动之后,根据mapreduce application master告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储。
相关文章推荐
- MapReduce编写wordcount程序代码实现
- [读书笔记]深入解析MapReduce架构设计与实现原理——CH4 Hadoop RPC基本框架
- Hadoop之Mapreduce------>入门级程序WordCount原理
- 笔记:深入解析MapReduce架构设计与实现原理 第4章 RPC框架解析
- 通过简单的Word Count讲解MapReduce原理以及Java实现
- 框架设计的一些思想(应用框架的设计和实现上摘)
- WordCount,第一个MapReduce程序
- wordCount程序中MapReduce工作过程分析
- mapreduce的countWord程序
- Eclipse下运行hadoop自带的mapreduce程序--wordcount
- [读书笔记]深入解析MapReduce架构设计与实现原理——CH4 Java反射机制与动态代理
- Hadoop MapReduce示例程序WordCount.java手动编译运行解析
- MapReduce原理与设计思想
- MapReduce原理与设计思想
- Spark on Yarn上实现WordCount程序
- HADOOP 分布式集群环境下第一个mapReduce程序—WordCount
- hadoop hdfs搭建 mapreduce环境搭建 wordcount程序简单注释
- 在eclipse上运行MapReduce的wordcount程序所遇到的问题
- Hadoop技术内幕:深入解析MapReduce架构设计与实现原理
- 程序框架的设计与实现(庆祝郑州园成立)