
Source Code Analysis of the MapReduce Execution Process

2012-03-09 22:28
1. The Mapper Class and Map-Phase Flow

(1) The InputFormat generates InputSplits and provides a RecordReader; an InputSplit is a logical representation of a unit of input work for a map task, and the RecordReader turns those logical work units into actual physical input records.

(2) The job's Mapper implementation is then run. Mapper contains a run() method, which calls setup() once, then map() for each input record, and finally cleanup(). All of these methods can be overridden.
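The run()/setup()/map()/cleanup() lifecycle above can be sketched as a standalone template method. This is a simplified illustration, not Hadoop's actual source; the real class is org.apache.hadoop.mapreduce.Mapper, whose run() receives a Context rather than a plain iterator:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Simplified, standalone sketch of the Mapper.run() template method.
abstract class SketchMapper<K, V> {
    protected void setup() { }                    // called once, before any record
    protected abstract void map(K key, V value);  // called once per input record
    protected void cleanup() { }                  // called once, after the last record

    // The template method: setup -> map per record -> cleanup.
    public void run(Iterator<Map.Entry<K, V>> records) {
        setup();
        try {
            while (records.hasNext()) {
                Map.Entry<K, V> e = records.next();
                map(e.getKey(), e.getValue());
            }
        } finally {
            cleanup();
        }
    }
}

// Example override (illustrative): records the length of each input value.
class LineLengthMapper extends SketchMapper<Long, String> {
    final List<Integer> results = new ArrayList<>();
    @Override protected void map(Long offset, String line) {
        results.add(line.length());
    }
}
```

Overriding map() is all a typical job needs; setup() and cleanup() default to no-ops.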

(3) The Context object lets the Map/Reduce instance interact with the rest of the Hadoop system. Configuration data is set on the job with Job.getConfiguration().set("key", "value") and read back with Context.getConfiguration().get("key", "default"). Such lookups are usually placed in setup().
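The set-in-driver / read-in-setup() pattern can be sketched without Hadoop on the classpath. MiniConf, MiniContext, and the "field.separator" key below are illustrative stand-ins, not Hadoop APIs:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for Hadoop's Configuration: a string-to-string map with defaults.
class MiniConf {
    private final Map<String, String> data = new HashMap<>();
    public void set(String key, String value) { data.put(key, value); }
    public String get(String key, String defaultValue) {
        return data.getOrDefault(key, defaultValue);
    }
}

// Stand-in for the Context handed to setup(); it exposes the job configuration.
class MiniContext {
    private final MiniConf conf;
    MiniContext(MiniConf conf) { this.conf = conf; }
    public MiniConf getConfiguration() { return conf; }
}

// A mapper that reads a job parameter once, in setup(), rather than per record.
class SeparatorMapper {
    String separator;
    public void setup(MiniContext context) {
        // "field.separator" is an illustrative key, not a standard Hadoop one.
        separator = context.getConfiguration().get("field.separator", "\t");
    }
    public String[] map(String line) {
        return line.split(separator);
    }
}
```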

(4) Mapper.run() calls map() for each input key/value pair in the InputSplit.

(5) The resulting key/value pairs are written to an in-memory buffer via context.write(); this is where the shuffle begins.

The Context class is also useful because the application can use it to report application-level status and to update Counters instances.

(6) First the partitioner hashes the key to obtain its partition number, and the key/value/partition record is written to the in-memory buffer. The buffer collects map output in batches, reducing disk I/O. Before anything is written to disk, the buffered records are divided by partition, and then sorted within each partition.
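The hash step can be illustrated with the logic Hadoop's default HashPartitioner uses: mask off the sign bit of hashCode(), then take the remainder modulo the number of reduce tasks (a standalone sketch, not the actual class):

```java
// Sketch of the default HashPartitioner logic: the bitmask keeps the
// hash non-negative, and the modulo spreads keys across reduce tasks.
class SketchHashPartitioner {
    public static int getPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```

Because the partition depends only on the key's hash, every record with a given key lands in the same partition, and hence at the same reducer.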

(7) The in-memory buffer defaults to 100 MB; when the data reaches the configured percentage threshold, a spill thread is started. If one spill contains many key/value pairs with the same partition number, they are stored together so that the partition information takes up less space.
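With the Hadoop-1.x-era parameter names, the buffer size and spill threshold described above are configured roughly as follows (later releases renamed these to mapreduce.task.io.sort.mb and mapreduce.map.sort.spill.percent; the values shown are the common defaults):

```xml
<!-- mapred-site.xml (Hadoop 1.x-era names) -->
<property>
  <name>io.sort.mb</name>
  <value>100</value>   <!-- size of the map-side sort buffer, in MB -->
</property>
<property>
  <name>io.sort.spill.percent</name>
  <value>0.80</value>  <!-- start spilling when the buffer is 80% full -->
</property>
```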

(8) If a combiner is set, it runs before the spill is written to disk to reduce the amount of data written, performing local aggregation. Each spill unit is sorted, so every spill file is internally sorted and carries partition information.
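What local aggregation buys can be sketched as a sum combiner applied to one spill's worth of word-count output. This illustrates the effect only, not Hadoop's combiner plumbing: a real combiner is a Reducer subclass (the WordCount example registers one with job.setCombinerClass(IntSumReducer.class)), and the framework may run it zero or more times:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SketchCombiner {
    // Collapse same-key records into partial sums, preserving first-seen
    // order, so far fewer bytes reach the disk.
    public static Map<String, Integer> combine(List<Map.Entry<String, Integer>> spill) {
        Map<String, Integer> out = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : spill) {
            out.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        return out;
    }
}
```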

(9) The spill thread generates one or more spill files as needed.

(10) After the map finishes, the multiple spill files are merged into a single large file.

This large file contains partition information (records are essentially stored grouped by partition) as well as the key/value pairs. When multiple files are merged, records with the same key form (key, [value1, value2, ...]) structures (which is also why same-key merging must happen again in the reduce phase). The large file is already partitioned and sorted.
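The merge-and-group step can be sketched as follows. A TreeMap yields the sorted, grouped (key, [values]) view directly, whereas the real merger performs a streaming k-way merge over the sorted spill files (standalone illustration, not Hadoop source):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SketchSpillMerger {
    // Merge several spill runs into one sorted sequence, folding records
    // with the same key into (key, [value1, value2, ...]) groups.
    public static Map<String, List<Integer>> merge(List<List<Map.Entry<String, Integer>>> spills) {
        Map<String, List<Integer>> merged = new TreeMap<>();  // keeps keys sorted
        for (List<Map.Entry<String, Integer>> spill : spills) {
            for (Map.Entry<String, Integer> e : spill) {
                merged.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
            }
        }
        return merged;
    }
}
```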

All intermediate values associated with a given output key are subsequently grouped by the framework, and passed to the Reducer(s) to determine the final output. Users can control the grouping by specifying a Comparator via JobConf.setOutputKeyComparatorClass(Class).
By default, all values with the same key are presented as an unordered Iterable in a single call to the Reducer. If the goal is simply to add the values together, a custom Comparator is unnecessary.

(11) If compression is enabled, it also kicks in here to reduce the amount of data written to disk. Without compression, the intermediate sorted output is stored in the default (key-len, key, value-len, value) format.

(12) After all records in the InputSplit have been processed, cleanup() is called.

(13) The map task notifies the TaskTracker, which in turn notifies the JobTracker, so that the completion status and output file information can be updated.

(14) The final output of each Mapper is grouped, sorted, and partitioned.

2. The Reduce Phase

(1) The reduce task starts multiple HTTP copy threads to fetch the output files from each mapper.

(2) The copied files are placed in an in-memory buffer, where sort and merge operations are performed; if there is too much data in memory, it too is spilled into multiple files.

(3) Background threads merge the temporary disk files into larger files that serve as the Reducer's input.

(4) The application can specify a Comparator for secondary sort.
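Secondary sort boils down to ordering composite (key, value) pairs by key and then by value, so that each reduce group receives its values already ordered. In real Hadoop this takes a composite key plus Job.setSortComparatorClass and Job.setGroupingComparatorClass; the sketch below shows only the comparator logic, using int[2] pairs for brevity:

```java
import java.util.Comparator;

class SecondarySortSketch {
    // Order (key, value) pairs primarily by key, then by value. With
    // this as the sort comparator and a key-only grouping comparator,
    // each reduce group would see its values in ascending order.
    static final Comparator<int[]> BY_KEY_THEN_VALUE =
        Comparator.<int[]>comparingInt(p -> p[0]).thenComparingInt(p -> p[1]);
}
```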

(5) The data is passed to the reducer, which completes the reduce operation.

Mark and reset are available while the application iterates through the values for a given key.

(6) The output of the reducer is not sorted.