hadoop源码分析,map输出
2015-06-07 00:26
423 查看
Mapper 的输入官方文档如下
The Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.
mapper的输出是已经排序并且针对每个reducer划分开的,那么hadoop代码是如何划分的,这里将跟从代码分析。
还是根据官方示例WordCount的示例
第一次分析为了简化map的输出复杂情况,
只分析一个文档,并且其中只有10个'单词',分别为“J", .."c", "b", "a" ( 这里10个字母最好是乱序的,后面会看到其排序),
注释掉设置combine class的代码。
这里因为我们的output 只有10个Record 且每个大小都比较小,所以跳过了spill了处理以及combine处理,主要代码如下,
public synchronized void collect(K key, V value, final int partition ) throws IOException {
{
...
keySerializer.serialize(key);
...
valSerializer.serialize(value);
.... kvmeta.put(kvindex + PARTITION, partition); kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend)); ...
}
这里实际是将(K,V) 序列化到了byte数组org.apache.hadoop.mapred.MapTask.MapOutputBuffer.kvbuffer 中,
并将(K,V)在内存中的位置信息 以及 其partition(相同partition的record由同一个reducer处理) 消息 存在 kvmeta 中.
到此map的输出都存在了内存中
private void sortAndSpill() throws IOException, ClassNotFoundException, InterruptedException { ...
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
...
for (int i = 0; i < partitions; ++i) {
...
if (combinerRunner == null) {
// spill directly DataInputBuffer key = new DataInputBuffer();
while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
....
writer.append(key, value);
++spindex;
}
} ...
spillRec.putIndex(rec, i);
}
...
indexCacheList.add(spillRec);
...}
这里有三个操作,
1. Sorter.sort :是以partition 和key 来排序的,目的是聚合相同partition的record, 并以key的顺序排列。
2. writer.append : 将序列化的record 写入输出流,这里写入到文件spill0.out
3. indexCacheList.add : 每个spillRec记录某个spill out文件中包含的partition信息。
这里只是唯一的spillRec 写入到到文件中, file.out.index
将spill0.out 重命名为file.out, 可以vim打开这个文件看到里面存在顺序号的字符。
private void mergeParts() throws IOException, InterruptedException, ClassNotFoundException {
...
sameVolRename(filename[0],
mapOutputFile.getOutputFileForWriteInVolume(filename[0]));...
indexCacheList.get(0).writeToFile(
mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);
...}
总结如下:
1. map的输出首先序列化到内存中kvbuffer,kvmeta
2. sortAndSpill 会将内存中的record写入到文件中
3. merge将spill出的文件merge问一个文件file.out,并将每个文件中partition的信息写入file.out.index
还没分析的情况:
map 输出大量数据,出现多个spill 文件的复杂情况的细节(1. 异步spill, 2. merge 多个文件)
The Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.
mapper的输出是已经排序并且针对每个reducer划分开的,那么hadoop代码是如何划分的,这里将跟从代码分析。
还是根据官方示例WordCount的示例
第一次分析为了简化map的输出复杂情况,
只分析一个文档,并且其中只有10个'单词',分别为“J", .."c", "b", "a" ( 这里10个字母最好是乱序的,后面会看到其排序),
注释掉设置combine class的代码。
1. 单步跟踪map中的context.write(生产kvbuffer 和kvmeta)
可以追踪到最终实际是由org.apache.hadoop.mapred.MapTask.MapOutputBuffer.collect(K, V, int)这里因为我们的output 只有10个Record 且每个大小都比较小,所以跳过了spill了处理以及combine处理,主要代码如下,
public synchronized void collect(K key, V value, final int partition ) throws IOException {
{
...
keySerializer.serialize(key);
...
valSerializer.serialize(value);
.... kvmeta.put(kvindex + PARTITION, partition); kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend)); ...
}
这里实际是将(K,V) 序列化到了byte数组org.apache.hadoop.mapred.MapTask.MapOutputBuffer.kvbuffer 中,
并将(K,V)在内存中的位置信息 以及 其partition(相同partition的record由同一个reducer处理) 消息 存在 kvmeta 中.
到此map的输出都存在了内存中
2. 通过查找kvmeta的代码索引, 找到消费kvbuffer和kvmeta代码,生产spillRecv到indexCacheList
可以找到在 org.apache.hadoop.mapred.MapTask.MapOutputBuffer.sortAndSpill() 中找到有使用,设置断点,看到如下,private void sortAndSpill() throws IOException, ClassNotFoundException, InterruptedException { ...
sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
...
for (int i = 0; i < partitions; ++i) {
...
if (combinerRunner == null) {
// spill directly DataInputBuffer key = new DataInputBuffer();
while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
....
writer.append(key, value);
++spindex;
}
} ...
spillRec.putIndex(rec, i);
}
...
indexCacheList.add(spillRec);
...}
这里有三个操作,
1. Sorter.sort :是以partition 和key 来排序的,目的是聚合相同partition的record, 并以key的顺序排列。
2. writer.append : 将序列化的record 写入输出流,这里写入到文件spill0.out
3. indexCacheList.add : 每个spillRec记录某个spill out文件中包含的partition信息。
3. 查找消费indexCacheList的代码,org.apache.hadoop.mapred.MapTask.MapOutputBuffer.mergeParts()
在此设置断点,可以看到这里我们只有一个spill文件,不需要merge,这里只是唯一的spillRec 写入到到文件中, file.out.index
将spill0.out 重命名为file.out, 可以vim打开这个文件看到里面存在顺序号的字符。
private void mergeParts() throws IOException, InterruptedException, ClassNotFoundException {
...
sameVolRename(filename[0],
mapOutputFile.getOutputFileForWriteInVolume(filename[0]));...
indexCacheList.get(0).writeToFile(
mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);
...}
总结如下:
1. map的输出首先序列化到内存中kvbuffer,kvmeta
2. sortAndSpill 会将内存中的record写入到文件中
3. merge将spill出的文件merge问一个文件file.out,并将每个文件中partition的信息写入file.out.index
还没分析的情况:
map 输出大量数据,出现多个spill 文件的复杂情况的细节(1. 异步spill, 2. merge 多个文件)
相关文章推荐
- JQuery插件Style定制化方法的分析与比较
- CSS和JS标签style属性对照表(方便js开发的朋友)
- JavaScript原生对象之Number对象的属性和方法详解
- Prototype源码浅析 Number部分
- JS中不为人知的五种声明Number的方式简要概述
- JavaScript中的object转换成number或string规则介绍
- Javascript基础教程之数据类型 (数值 Number)
- JavaScript中number转换成string介绍
- ppk谈JavaScript style属性
- ERROR 1222 (21000): The used SELECT statements have a different number of columns
- javascript parseInt与Number函数的区别
- asp中去除html中style,javascript,css代码
- PHP number_format() 函数定义和用法
- javascript下用for( in )语句 获得所有style 内容的脚本代码
- JavaScript修改css样式style动态改变元素样式
- JavaScript修改css样式style
- Default style sheet for HTML 4
- js中将String转换为number以便比较
- gridview生成时如何去掉style属性中的border-collapse
- Android入门之Style与Theme用法实例解析