hadoop核心逻辑shuffle代码分析-reduce端
2013-04-01 23:34
357 查看
接上篇文章,下面是reduce端的过程分析。
大概介绍下reduce的实际作用。以mapreduce经常做的groupby为例,map是将输入按group by的key排序,reduce就是做各种类型的聚合,比如sum,max,mean等。因此,可想而知,reduce的输入必须是按照groupby排序的,所以自然,reduce的输入必须汇聚所有map的输入,这也是reduce框架最复杂的部分。
首先,还是看一张reduce过程整体时序图:
1.从多个mapper端fetch数据。(copy)
fetch指从mapper端拉取mapper的输出数据。在fetch阶段reducetask会启动多个fetch线程从所有的mapper端取这个reducer的数据,同时还有一个fetchEvent线程负责获取mapper完成的event通知Fetch线程。
这里顺便提下,mapreduce的调度系统有一个假设:数据就近原则,也就是yarn的控制器会根据数据的位置分配mapper的位置,因此一般来说mapper的输入数据都是本地文件,再加上配置的机架感知和本地读策略mapper的输入处理效率是很高的,但reduce就不一样了,每个reducer的输入都要来自所有的mapper,因此,一般没办法优化reducer的位置。
reducer不会等到所有的mapper执行完毕再去拉数据,而是在mappertask完成一定比例后就会开始fetch,下面先列出一些fetch阶段的重要参数:
mapreduce.job.reduce.slowstart.completedmaps:在maptask完成了一定百分比后将触发fetch,默认为0.05
mapreduce.reduce.shuffle.read.timeout:fetch读数据的timeout时间,默认为3分钟
mapreduce.reduce.shuffle.maxfetchfailures:fetch最大的失败次数,默认为10次
mapreduce.reduce.shuffle.connect.timeout:fetch建立连接的timeout时间,默认也是3分钟
mapreduce.reduce.shuffle.parallelcopies:同时创建的fetch线程个数
fetch的目的有2个:内存buf和磁盘。那么如何判刑是保存进内存还是硬盘纳:首先判断内存buf大小有没有超过maxSingleShuffleLimit,如果没有则存入内存,如果整个内存buf都存满了则copy过程需要暂停一会儿(stallShuffle),其他情况则写入磁盘。代码如下:
2.内存merge和磁盘merge(merge & sort)
第一阶段和第二阶段其实是密切相关的。fetch线程负责取数据,merge负责存数据,merge的输出就是用户的reducer的输入。分布式环境中每个mapper的输入只是整体的一小部分,因此关键的数据的汇总就是在merge阶段处理的。reducer的merge是最复杂的,包括内存merge,磁盘merge,finalmerge等阶段一步步的将零散的mapper的输出整合成一个整体有序的输入,下面会仔细介绍此过程。
在第一部中提到了内存buf和磁盘,我们知道写内存的速度肯定是比磁盘快的多的,但内存也不能太大,所以需要对内存的使用做一些限制。下面列一下merge过程中的相关参数:
mapreduce.reduce.shuffle.input.buffer.percent:copy阶段内存使用的最大值,默认为0.9
mapreduce.task.io.sort.factor:每次merge的文件个数,关系到需要merge次数的参数,默认为100
mapreduce.reduce.shuffle.memory.limit.percent:每个fetch取到的输出的大小能够占的内存比的大小。默认是0.25。因此实际每个fetcher的输出能放在内存的大小是reducer的java heap size*0.9*0.25。所以,如果我们想fetch不进磁盘的话,可以适当调大这个值。
mapreduce.reduce.merge.memtomem.enabled:是否enable内存merge,默认是false,如果设置为true,则在finalmerge时内存的数据不会存入磁盘再merge而是直接在内存中merge。不过,好像内存中的merge并没有想象的那么美好,因此默认是关闭的:https://issues.apache.org/jira/browse/MAPREDUCE-268
mapreduce.reduce.shuffle.merge.percent:触发merge的内存条件,默认是java heap size*0.9*0.9
下面看下InMemoryMerge处理的过程。InMemoryMerger不是上面提到的MemToMemMerge,InMemoryMerger主要负责将mapper的输出(内存buf)整合并输出到磁盘上。输出磁盘的过程类似mapper端的spill,而且也是等内存攒到一定数量后才会触发spill,但是不会出现类似mapper端的情况,因为如果超过内存的上线后mapper的输出会直接输出到磁盘上。InMemoryMerger的spill会在达到merge的threshold后触发spill。看下spill的代码:
最后将内存buf中的值merge到一个输出文件中。DiskMerge的merge过程类似,不过开始merge的条件是磁盘上的零散文件超过mapreduce.task.io.sort.factor的2倍-1。
在fetch结束后会进入finalmerge。所谓finalmerge就是将各个mapper的输入整合成真正reducer的输入。在fetch阶段我们已经一边copy数据一边merge,因此到了finalmerge时,mapper的数据部分存在于内存buf中部分存在于tmp目录的文件中。下面先看下finalmerge的代码:
这段代码处理了内存中的mapper数据,将内存的数据排序和merge(其实merge可以单纯看做排序)。
将磁盘上的文件信息读入内存进行最终merge前的准备。
最后开始build最终merge的list。可以看到是内存buf和磁盘文件的整合。这里Merger.merge方法比较复杂,大概介绍下:对于reducer的merge不会真的全做merge,而是根据输入segments的个数做判断,如果segment个数超过io.factor(就是在mapper端详解中介绍的同时merge的个数),Merger会构造内存heap用多路归并算法合并输入的segment,直到segment的数量不超过io.factor为止,注意,这里不需要merge为一个文件。比如:输入为45个文件,io.factor为10,第一次merge前10个文件,第二次merge10到20文件,merge4次后剩余7个小于10,这时,此次merge就结束了。这样可以省一次merge,而且这次merge的IO操作还是最大的,大大的提高了效率!这也是为什么有时候要适当调高io.factor参数的原因了。
这里还有一个疑问的,有的文章说可以让merge的输出也就是reducer的输入直接放在内存中,我实在没找到在哪儿,因为在第一步merge内存segment时会write成磁盘文件。因此内存segment始终是要落到磁盘才行的。
最后,有的人可能要问了,merge结果不是一个文件那读的时候怎么办,跟merge过程类似,读的时候会把merge结果的所有文件的数据放到内存queue中(单个文件已经有序),依次取出。
3.执行reduce过程(groupby and exec)
到了reducer的执行其实就比较明了了,就是从merge的结果构成的heap中取数据,一条记录是key和相同key对应的序列。
这边有个要注意的是,在0.20后的mapreduce多了一个比较的comparator:getOutputValueGroupingComparator(),在用户定义了此comparator后,reducer是用它来做grouping的。相关内容可以参考secendarySort:
/article/4808239.html
最后还是总结一下:
1.mapreduce的优化通常围绕shuffle的过程展开,包括如何增加并发、多用内存少用磁盘、减少shuffle的文件大小等,相关参数参考上文。
2.根据经验,shuffle时如果数据过大经常会发生OutOfMemory,这时可以适当调小相应的参数,比如减少每个fetch占内存的百分比让fetch的输出直接进入磁盘。
3.io.factor在mapper很多时比较影响系统的效率,不但mapper端的merge,reducer的merge也会受到影响,需要综合考虑。
大概介绍下reduce的实际作用。以mapreduce经常做的groupby为例,map是将输入按group by的key排序,reduce就是做各种类型的聚合,比如sum,max,mean等。因此,可想而知,reduce的输入必须是按照groupby排序的,所以自然,reduce的输入必须汇聚所有map的输入,这也是reduce框架最复杂的部分。
首先,还是看一张reduce过程整体时序图:
1.从多个mapper端fetch数据。(copy)
fetch指从mapper端拉取mapper的输出数据。在fetch阶段reducetask会启动多个fetch线程从所有的mapper端取这个reducer的数据,同时还有一个fetchEvent线程负责获取mapper完成的event通知Fetch线程。
这里顺便提下,mapreduce的调度系统有一个假设:数据就近原则,也就是yarn的控制器会根据数据的位置分配mapper的位置,因此一般来说mapper的输入数据都是本地文件,再加上配置的机架感知和本地读策略mapper的输入处理效率是很高的,但reduce就不一样了,每个reducer的输入都要来自所有的mapper,因此,一般没办法优化reducer的位置。
reducer不会等到所有的mapper执行完毕再去拉数据,而是在mappertask完成一定比例后就会开始fetch,下面先列出一些fetch阶段的重要参数:
mapreduce.job.reduce.slowstart.completedmaps:在maptask完成了一定百分比后将触发fetch,默认为0.05
mapreduce.reduce.shuffle.read.timeout:fetch读数据的timeout时间,默认为3分钟
mapreduce.reduce.shuffle.maxfetchfailures:fetch最大的失败次数,默认为10次
mapreduce.reduce.shuffle.connect.timeout:fetch建立连接的timeout时间,默认也是3分钟
mapreduce.reduce.shuffle.parallelcopies:同时创建的fetch线程个数
fetch的目的有2个:内存buf和磁盘。那么如何判刑是保存进内存还是硬盘纳:首先判断内存buf大小有没有超过maxSingleShuffleLimit,如果没有则存入内存,如果整个内存buf都存满了则copy过程需要暂停一会儿(stallShuffle),其他情况则写入磁盘。代码如下:
if (!canShuffleToMemory(requestedSize)) { LOG.info(mapId + ": Shuffling to disk since " + requestedSize + " is greater than maxSingleShuffleLimit (" + maxSingleShuffleLimit + ")"); return new MapOutput<K,V>(mapId, this, requestedSize, jobConf, localDirAllocator, fetcher, true, mapOutputFile); } if (usedMemory > memoryLimit) { LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory + ") is greater than memoryLimit (" + memoryLimit + ")." + " CommitMemory is (" + commitMemory + ")"); return stallShuffle; } // Allow the in-memory shuffle to progress LOG.debug(mapId + ": Proceeding with shuffle since usedMemory (" + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")." + "CommitMemory is (" + commitMemory + ")"); return unconditionalReserve(mapId, requestedSize, true);
2.内存merge和磁盘merge(merge & sort)
第一阶段和第二阶段其实是密切相关的。fetch线程负责取数据,merge负责存数据,merge的输出就是用户的reducer的输入。分布式环境中每个mapper的输入只是整体的一小部分,因此关键的数据的汇总就是在merge阶段处理的。reducer的merge是最复杂的,包括内存merge,磁盘merge,finalmerge等阶段一步步的将零散的mapper的输出整合成一个整体有序的输入,下面会仔细介绍此过程。
在第一部中提到了内存buf和磁盘,我们知道写内存的速度肯定是比磁盘快的多的,但内存也不能太大,所以需要对内存的使用做一些限制。下面列一下merge过程中的相关参数:
mapreduce.reduce.shuffle.input.buffer.percent:copy阶段内存使用的最大值,默认为0.9
mapreduce.task.io.sort.factor:每次merge的文件个数,关系到需要merge次数的参数,默认为100
mapreduce.reduce.shuffle.memory.limit.percent:每个fetch取到的输出的大小能够占的内存比的大小。默认是0.25。因此实际每个fetcher的输出能放在内存的大小是reducer的java heap size*0.9*0.25。所以,如果我们想fetch不进磁盘的话,可以适当调大这个值。
mapreduce.reduce.merge.memtomem.enabled:是否enable内存merge,默认是false,如果设置为true,则在finalmerge时内存的数据不会存入磁盘再merge而是直接在内存中merge。不过,好像内存中的merge并没有想象的那么美好,因此默认是关闭的:https://issues.apache.org/jira/browse/MAPREDUCE-268
mapreduce.reduce.shuffle.merge.percent:触发merge的内存条件,默认是java heap size*0.9*0.9
下面看下InMemoryMerge处理的过程。InMemoryMerger不是上面提到的MemToMemMerge,InMemoryMerger主要负责将mapper的输出(内存buf)整合并输出到磁盘上。输出磁盘的过程类似mapper端的spill,而且也是等内存攒到一定数量后才会触发spill,但是不会出现类似mapper端的情况,因为如果超过内存的上线后mapper的输出会直接输出到磁盘上。InMemoryMerger的spill会在达到merge的threshold后触发spill。看下spill的代码:
Writer<K,V> writer = new Writer<K,V>(jobConf, rfs, outputPath, (Class<K>) jobConf.getMapOutputKeyClass(), (Class<V>) jobConf.getMapOutputValueClass(), codec, null); RawKeyValueIterator rIter = null; try { LOG.info("Initiating in-memory merge with " + noInMemorySegments + " segments..."); rIter = Merger.merge(jobConf, rfs, (Class<K>)jobConf.getMapOutputKeyClass(), (Class<V>)jobConf.getMapOutputValueClass(), inMemorySegments, inMemorySegments.size(), new Path(reduceId.toString()), (RawComparator<K>)jobConf.getOutputKeyComparator(), reporter, spilledRecordsCounter, null, null); if (null == combinerClass) { Merger.writeFile(rIter, writer, reporter, jobConf); } else { combineCollector.setWriter(writer); combineAndSpill(rIter, reduceCombineInputCounter); }有2点需要注意:1,merge的Comparator是key的Comparator,相关原因在后面会讲到。2,merge过程也会使用combiner,原来reduce端也会combine!这个在所有的mapreduce教材中都没有讲到。
最后将内存buf中的值merge到一个输出文件中。DiskMerge的merge过程类似,不过开始merge的条件是磁盘上的零散文件超过mapreduce.task.io.sort.factor的2倍-1。
synchronized (onDiskMerger) { if (!onDiskMerger.isInProgress() && onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) { onDiskMerger.startMerge(onDiskMapOutputs); } }
在fetch结束后会进入finalmerge。所谓finalmerge就是将各个mapper的输入整合成真正reducer的输入。在fetch阶段我们已经一边copy数据一边merge,因此到了finalmerge时,mapper的数据部分存在于内存buf中部分存在于tmp目录的文件中。下面先看下finalmerge的代码:
if (numMemDiskSegments > 0 && ioSortFactor > onDiskMapOutputs.size()) { mergePhaseFinished = true; // must spill to disk, but can't retain in-mem for intermediate merge final Path outputPath = mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes).suffix( Task.MERGED_OUTPUT_PREFIX); final RawKeyValueIterator rIter = Merger.merge(job, fs, keyClass, valueClass, memDiskSegments, numMemDiskSegments, tmpDir, comparator, reporter, spilledRecordsCounter, null, mergePhase); final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath, keyClass, valueClass, codec, null); try { Merger.writeFile(rIter, writer, reporter, job); // add to list of final disk outputs. onDiskMapOutputs.add(outputPath);
这段代码处理了内存中的mapper数据,将内存的数据排序和merge(其实merge可以单纯看做排序)。
// segments on disk List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>(); long onDiskBytes = inMemToDiskBytes; Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]); for (Path file : onDisk) { onDiskBytes += fs.getFileStatus(file).getLen(); LOG.debug("Disk file: " + file + " Length is " + fs.getFileStatus(file).getLen()); diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs, (file.toString().endsWith( Task.MERGED_OUTPUT_PREFIX) ? null : mergedMapOutputsCounter) )); } LOG.info("Merging " + onDisk.length + " files, " + onDiskBytes + " bytes from disk"); Collections.sort(diskSegments, new Comparator<Segment<K,V>>() { public int compare(Segment<K, V> o1, Segment<K, V> o2) { if (o1.getLength() == o2.getLength()) { return 0; } return o1.getLength() < o2.getLength() ? -1 : 1; } });
将磁盘上的文件信息读入内存进行最终merge前的准备。
// build final list of segments from merged backed by disk + in-mem List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>(); long inMemBytes = createInMemorySegments(inMemoryMapOutputs, finalSegments, 0); LOG.info("Merging " + finalSegments.size() + " segments, " + inMemBytes + " bytes from memory into reduce"); if (0 != onDiskBytes) { final int numInMemSegments = memDiskSegments.size(); diskSegments.addAll(0, memDiskSegments); memDiskSegments.clear(); // Pass mergePhase only if there is a going to be intermediate // merges. See comment where mergePhaseFinished is being set Progress thisPhase = (mergePhaseFinished) ? null : mergePhase; RawKeyValueIterator diskMerge = Merger.merge( job, fs, keyClass, valueClass, diskSegments, ioSortFactor, numInMemSegments, tmpDir, comparator, reporter, false, spilledRecordsCounter, null, thisPhase); diskSegments.clear(); if (0 == finalSegments.size()) { return diskMerge; } finalSegments.add(new Segment<K,V>( new RawKVIteratorReader(diskMerge, onDiskBytes), true)); } return Merger.merge(job, fs, keyClass, valueClass, finalSegments, finalSegments.size(), tmpDir, comparator, reporter, spilledRecordsCounter, null, null);
最后开始build最终merge的list。可以看到是内存buf和磁盘文件的整合。这里Merger.merge方法比较复杂,大概介绍下:对于reducer的merge不会真的全做merge,而是根据输入segments的个数做判断,如果segment个数超过io.factor(就是在mapper端详解中介绍的同时merge的个数),Merger会构造内存heap用多路归并算法合并输入的segment,直到segment的数量不超过io.factor为止,注意,这里不需要merge为一个文件。比如:输入为45个文件,io.factor为10,第一次merge前10个文件,第二次merge10到20文件,merge4次后剩余7个小于10,这时,此次merge就结束了。这样可以省一次merge,而且这次merge的IO操作还是最大的,大大的提高了效率!这也是为什么有时候要适当调高io.factor参数的原因了。
这里还有一个疑问的,有的文章说可以让merge的输出也就是reducer的输入直接放在内存中,我实在没找到在哪儿,因为在第一步merge内存segment时会write成磁盘文件。因此内存segment始终是要落到磁盘才行的。
最后,有的人可能要问了,merge结果不是一个文件那读的时候怎么办,跟merge过程类似,读的时候会把merge结果的所有文件的数据放到内存queue中(单个文件已经有序),依次取出。
3.执行reduce过程(groupby and exec)
到了reducer的执行其实就比较明了了,就是从merge的结果构成的heap中取数据,一条记录是key和相同key对应的序列。
public void run(Context context) throws IOException, InterruptedException { setup(context); while (context.nextKey()) { reduce(context.getCurrentKey(), context.getValues(), context); // If a back up store is used, reset it Iterator<VALUEIN> iter = context.getValues().iterator(); if(iter instanceof ReduceContext.ValueIterator) { ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore(); } } cleanup(context); }
这边有个要注意的是,在0.20后的mapreduce多了一个比较的comparator:getOutputValueGroupingComparator(),在用户定义了此comparator后,reducer是用它来做grouping的。相关内容可以参考secendarySort:
/article/4808239.html
最后还是总结一下:
1.mapreduce的优化通常围绕shuffle的过程展开,包括如何增加并发、多用内存少用磁盘、减少shuffle的文件大小等,相关参数参考上文。
2.根据经验,shuffle时如果数据过大经常会发生OutOfMemory,这时可以适当调小相应的参数,比如减少每个fetch占内存的百分比让fetch的输出直接进入磁盘。
3.io.factor在mapper很多时比较影响系统的效率,不但mapper端的merge,reducer的merge也会受到影响,需要综合考虑。
相关文章推荐
- hadoop核心逻辑shuffle代码分析-reduce端
- hadoop核心逻辑shuffle代码分析-reduce端
- hadoop核心逻辑shuffle代码分析-map端
- hadoop核心逻辑shuffle代码分析-map端 (转)
- hadoop核心逻辑shuffle代码分析-map端
- hadoop核心逻辑shuffle代码分析-map端
- hadoop核心逻辑shuffle代码分析-map端
- hadoop核心逻辑shuffle代码分析-map端
- hadoop核心逻辑shuffle代码分析-map端
- hadoop核心逻辑shuffle代码分析-map端
- 《Spark商业案例与性能调优实战100课》第19课:商业案例之NBA篮球运动员大数据分析核心业务逻辑代码实战
- discuz论坛apache日志hadoop大数据分析项目:清洗数据核心功能解说及代码实现
- Hadoop笔记之map &&shuffle && reduce 工作流程图及其分析
- Hadoop : MapReduce中的核心Shuffle和Sort分析
- Java 线程池框架核心代码分析
- 基于visual c++之windows核心编程代码分析(24)IO控制、内核通信
- Spark-项目中分析日志的核心代码
- Hadoop之气象站分析演示代码
- 基于visual c++之windows核心编程代码分析(9)实现Windows服务并安装,控制
- TeamTalk Android代码分析(业务流程篇)---消息发送和接收的整体逻辑说明