您的位置:首页 > 运维架构

Hadoop 之 Shuffle and Sort

2016-05-31 17:20 441 查看
Mapreduce 确保每个reducer的input都是按照key 排序的。系统将 map ouputs 变成 reduce inputs输入的过程被称为 shuffle。shuffle是Mapreduce的 心脏,是奇迹发生的地方。

Map side

当 map函数开始产生输出时,并不简单的将它写到磁盘。它利用buffer的方式写到内存,并除以效率的考虑,进行预排序。



每个map任务都有一个环形的内存缓冲区用于存储任务输出。缓冲区默认大小为100M(可以通过mapreduce.task.io.sort.mb 设置)。一旦缓冲的内容达到阈值(mapre

duce.map.sort.spill.percent,默认未0.80),会开启一个后台线程,将内容 spill(溢出)到磁盘。map的输出继续写到缓冲区,但如果在此期间缓冲区被填满,map会被阻塞直到写磁盘过程完成。溢出写过程按轮询方式将缓冲区内容写到 mapreduce.cluster.local.dir 属性指定的目录中。

在写入磁盘前,线程首先根据最终要输出的reducer将数据分区。在每个分区中,后台线程会在内存中按key 执行排序,如果有一个combiner,它就在排序后的输出上运行。运行combine函数会使map输出更加紧凑,减少写到磁盘的数据和传递给reducer的数据。

每次 buffer达到阈值后,都会新建一个spill文件。因此在map写完其最后一个输出记录后,会有几个溢出文件。在task完成之前,spil文件被合并成一个已分区且已排序的输出文件。apreduce.task.io.sort.factor属性控制着一次最多能合并多少流,默认值为0.

如果至少存在3个spill文件(mapreduce.map.combine.minspills 可以配置),这output写入到磁盘之前会再次运行combiner。反复调用combiner不会对最终结果产生影响。如果只有一个或者两个spill 文件,那么就不值得再对map输出调用combiner,所以不会再运行combiner。

将map输出结果压缩后写到磁盘上是个比较好的注意,这样写入磁盘会更快,节约磁盘空间,并且减少传给reducer的数据量。在默认情况下,输出是不压缩的,需要将mapreduce.map.output.compress设置未true。使用的压缩库由mapreduce.map.output.compress.codec属性指定。

reducer 通过HTTP方式得到输出文件的分区,文件分区的工作线程的数量是由属性mapreduce.shuffle.max.threads控制的。此设置是针对每一个NodeManager,不是针对每个map任务。默认情况下这个值是0到处理器数量的2倍。

reduce side

reduce任务需要集群上若干个map任务的输出作为其特殊的分区文件。每个map任务的完成时间可能不同,因此只要有一个任务完成,reduce任务旧开始复制其输出。reduce任务有少量的copier线程,因此你呢狗狗并行的取得map的输出。默认为5个线程,并且可以通过mapreduce.reduce.shuffle.parallelcopies 属性设置。

reduce如何知道要从哪台机器取得map输出的呢?

map任务成功完成后,它会通过 heartbeat机制通知Application Master。因此,对于一个作业,Application Master知道 map output和hosts 之间的映射。reducer会有一个线程轮询master获取map输出的位置,直至获取所有输出的位置。

由于第一个reducer可能失败,所以master并不会在第一reducer检索到map outputs时就从磁盘上删除它们。在job完成后,等待Application Master告知其可以删除时,才得以删除。

如果map任务的输出足够小,会被拷贝到reducer的JVM内存中(mapreduce.reduce.shuffle.input.buffer.percent 属性可以设置其大小,指定用于此用途的堆空间的百分比),否则,将会被复制到磁盘。一旦达到缓冲区大小的阈值时(mapreduce.reduce.shuffle.merge.percent )或者达到map输出的阈值(mapreduce.reduce.merge.inmem.threshold),则合并后spill到磁盘。如果指定combiner,则在合并期间运行它,降低写入硬盘的数据量。

注:为了合并,压缩的文件在内存中都会被解压缩,然后合并。

复制完所有的map输出后,reduce任务进入排序阶段(准确的说是合并阶段,排序是在map端发生的),这个阶段维持map输出的顺序,合并 map outputs。举个例子:

如果有50个map outputs,merge factor是10(mapreduce.task.io.sort.factor 可以设置大小),合并将进行5次,每10个文件合并成一个文件,最后有5个中间文件。

最后将这个5个文件合并未一个文件是调用的reduce函数。



合并的文件数实际上并不会像上面例子中所展示的那样。比如上图展示了40个文件合并的次数。

配置调优

总的原则是给shuffle过程尽量多的提供内存空间。然而,有一个权衡,需要确保map和reduce能得到足够的内存去执行任务。这个也是编写map函数和reduce函数时尽量少用内存的原因,或者不应该大量或者无限制的使用内存。

mapred.child.java.opts 可以设置map和reduce任务JVM的大小。任务节点上的内存应该尽量大。

在map端,应该尽量避免多次spill 数据到磁盘来获取性能的提升,一次是最佳的情况。如果可以估算map输出的大小,可以合理的设置mapreduce.task.io.sort.* 属性尽可能减少spill的次数。Mapreduce 计数器计算在作业运行整个阶段中spill到磁盘的record数量,这有利于性能调优。

在reduce端,中间数据全部存储在内存中时,就能获取最佳性能。默认情况下,是不可能发生的。如果reduce函数需要的内存不大,可以把mapreduce.reduce.merge.inmem.threshold设置未0,mapreduce.reduce.input.buffer.percent设置未1.0 就可以提升性能。

Hadoop默认使用的缓冲区大小为 4KB,可以通过属性io.file.buffer.size设置。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: