随手记点-mapreduce3
2016-03-13 12:10
363 查看
1.在map处理的过程中有几个线程?各个线程又分别完成了什么任务?
答:mainThread:主要任务是获取(k,v)数据,map处理数据,paritition分区,seralize序列化,结果写入缓冲区。spillThread:当mainThread发现内存缓冲区的占用百分比已经达到某个临界值(默认为0.8)时就会唤醒spillThead线程。spillThead线程把内存缓冲区里的数据sortAndSpill到硬盘上,每次spill都会溢写一个文件(如果有combine,则先进行combine操作,再写入硬盘中),这样会产生多个溢写文件到硬盘上。
2.mapper运行大致的流程如何描述?
答:setup方法做一些配置,默认为空;nextKeyValue()函数循环读取下一个(k,v)对,并交给map函数进行处理,默认map方法什么也不做,即输入和输出完全一样;map函数中调用的方法getCurrentKey()和getCurrentValue()是对RecordReader的getCurrentKey()和getCurrentValue()的封装;write()是对输出对象的write方法的封装。public void run(Context context) throws IOException, InterruptedException { setup(context); while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } cleanup(context); } // 默认的map方法 protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); }
3.map只能是处理一个键值对,那么在mapreduce的框架中是如何不断的使用map来处理多个键值对的?
答:在启动run mapper后会通过一个循环,持续获取context中的nextKeyValue(),即会不断地使用map来处理多个(k,v)对。详见上段程序。4.Mapper中的Inputformat和RecordReader的区别?
答:InputFormat关注于文件如何分割,所以内部封装了isSplitable和getSplit的方法,而RecordReader关注于将一个文件中的内容转换为键值对。InputFormat是将一个文件分为split,生成一个输入对象。调用了getInputFormatClass来获取自己设置的输入类,默认输入方法为TextInputFormat。
//runNewMapper()部分代码 // make the input format org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat = (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>) ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
在RecordReader中的NewTrackingRecordReader(INKEY, INVALUE)函数其实是对获得(K,V)的方法又进行了封装,同时增加了一些记录。这个输入对象才能够读取文本,封装了键值对,提供nextKeyValue()、getCurrentkey()和getCurrentValue()等方法。
//runNewMapper()中初始化RecordReader org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =new NewTrackingRecordReader<INKEY,INVALUE> (split, inputFormat, reporter, job, taskContext);
5.map处理后的结果放在哪?
答:map处理后的结果放在输出缓冲区。最终是使用的MapOutputBuffer中的collect(),直接写入了缓存中。收集的数据就是(key,value,partition)三个数据作为一个逻辑单元。//runNewMapper()中初始化RecordWriter // get an output object if (job.getNumReduceTasks() == 0) { output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter); } else { output = new NewOutputCollector(taskContext, job, umbilical, reporter); }
NewOutputCollector存在write()的方法,可以从map的运行中看出,调用的输出对象context的write方法,其实就是out的write方法。
public void write(K key, V value) throws IOException, InterruptedException { collector.collect(key, value,partitioner.getPartition(key, value,partitions)); }
其中
MapOutputCollector<K,V> collector; collector = new MapOutputBuffer<K,V>(umbilical, job,reporter);
6.partitioner的目的是什么?partitioner如何进行实例化?
答:存入的数据是(key,value,partition)三个数据,其中partition实现的是给数据贴标签,目的是指示map的输出给哪个reduce去处理。partitioner的实例化如下,默认使用 hashpartition。partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job); JobContextpublic Class<? extends Partitioner<?,?>>getPartitionerClass() throws ClassNotFoundException { return (Class<? extends Partitioner<?,?>>) conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class); }
相关文章推荐
- Hadoop_2.1.0 MapReduce序列图
- MongoDB中的MapReduce简介
- MongoDB学习笔记之MapReduce使用示例
- MongoDB中MapReduce编程模型使用实例
- MapReduce中ArrayWritable 使用指南
- Java函数式编程(七):MapReduce
- java连接hdfs ha和调用mapreduce jar示例
- 用PHP和Shell写Hadoop的MapReduce程序
- JavaScript mapreduce工作原理简析
- mongodb mapredReduce 多个条件分组(group by)
- HBase基本原理
- HDFS DatanodeProtocol——sendHeartbeat
- HDFS DatanodeProtocol——register
- Hadoop集群提交作业问题总结
- Hadoop源码分析 HDFS ClientProtocol——addBlock
- Hadoop源码分析HDFS ClientProtocol——create
- Hadoop源码分析FSNamesystem几个重要的成员变量
- Hadoop源码分析HDFS ClientProtocol——getBlockLocations
- Hadoop源码分析HDFS Client向HDFS写入数据的过程解析
- ZooKeeper基本理解