您的位置:首页 > 其它

随手记点-mapreduce3

2016-03-13 12:10 363 查看

1.在map处理的过程中有几个线程?各个线程又分别完成了什么任务?

答:mainThread:主要任务是获取(k,v)数据,map处理数据,paritition分区,seralize序列化,结果写入缓冲区。

spillThread:当mainThread发现内存缓冲区的占用百分比已经达到某个临界值(默认为0.8)时就会唤醒spillThead线程。spillThead线程把内存缓冲区里的数据sortAndSpill到硬盘上,每次spill都会溢写一个文件(如果有combine,则先进行combine操作,再写入硬盘中),这样会产生多个溢写文件到硬盘上。

2.mapper运行大致的流程如何描述?

答:setup方法做一些配置,默认为空;nextKeyValue()函数循环读取下一个(k,v)对,并交给map函数进行处理,默认map方法什么也不做,即输入和输出完全一样;map函数中调用的方法getCurrentKey()和getCurrentValue()是对RecordReader的getCurrentKey()和getCurrentValue()的封装;write()是对输出对象的write方法的封装。

public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}

// 默认的map方法
protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}


3.map只能是处理一个键值对,那么在mapreduce的框架中是如何不断的使用map来处理多个键值对的?

答:在启动run mapper后会通过一个循环,持续获取context中的nextKeyValue(),即会不断地使用map来处理多个(k,v)对。详见上段程序。

4.Mapper中的Inputformat和RecordReader的区别?

答:InputFormat关注于文件如何分割,所以内部封装了isSplitable和getSplit的方法,而RecordReader关注于将一个文件中的内容转换为键值对。

InputFormat是将一个文件分为split,生成一个输入对象。调用了getInputFormatClass来获取自己设置的输入类,默认输入方法为TextInputFormat。

//runNewMapper()部分代码
// make the input format
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);


在RecordReader中的NewTrackingRecordReader(INKEY, INVALUE)函数其实是对获得(K,V)的方法又进行了封装,同时增加了一些记录。这个输入对象才能够读取文本,封装了键值对,提供nextKeyValue()、getCurrentkey()和getCurrentValue()等方法。

//runNewMapper()中初始化RecordReader
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =new NewTrackingRecordReader<INKEY,INVALUE>
(split, inputFormat, reporter, job, taskContext);


5.map处理后的结果放在哪?

答:map处理后的结果放在输出缓冲区。最终是使用的MapOutputBuffer中的collect(),直接写入了缓存中。收集的数据就是(key,value,partition)三个数据作为一个逻辑单元。

//runNewMapper()中初始化RecordWriter
// get an output object
if (job.getNumReduceTasks() == 0) {
output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}


NewOutputCollector存在write()的方法,可以从map的运行中看出,调用的输出对象context的write方法,其实就是out的write方法。

public void write(K key, V value) throws IOException, InterruptedException {
collector.collect(key, value,partitioner.getPartition(key, value,partitions));
}


其中

MapOutputCollector<K,V> collector;
collector = new MapOutputBuffer<K,V>(umbilical, job,reporter);


6.partitioner的目的是什么?partitioner如何进行实例化?

答:存入的数据是(key,value,partition)三个数据,其中partition实现的是给数据贴标签,目的是指示map的输出给哪个reduce去处理。partitioner的实例化如下,默认使用 hashpartition。

partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);

JobContextpublic Class<? extends Partitioner<?,?>>getPartitionerClass()
throws ClassNotFoundException {
return (Class<? extends Partitioner<?,?>>)
conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  mapreduce