Hadoop中reduce端shuffle过程及源码解析
2016-05-22 21:20
337 查看
一、概要描述
在Child的main函数中通过TaskUmbilicalProtocol协议,从TaskTracker获得需要执行的Task,并调用Task的run方法来执行。在ReduceTask而Task的run方法会通过java反射机制构造Reducer,Reducer.Context,然后调用构造的Reducer的run方法执行reduce操作。不同于map任务,在执行reduce任务前,需要把map的输出从map运行的tasktracker上拷贝到reducer运行的tasktracker上。
Reduce需要集群上若干个map任务的输出作为其特殊的分区文件。每个map任务完成的时间可能不同,因此只要有一个任务完成,reduce任务就开始复制其输出。这就是reduce任务的复制阶段。其实是启动若干个MapOutputCopier线程来复制完所有map输出。在复制完成后reduce任务进入排序阶段。这个阶段将由LocalFSMerger或InMemFSMergeThread合并map输出,维持其顺序排序。【即对有序的几个文件进行归并,采用归并排序】在reduce阶段,对已排序输出的每个键都要调用reduce函数,此阶段的输出直接写到文件系统,一般为HDFS上。(如果采用HDFS,由于tasktracker节点也是DataNoe,所以第一个块副本将被写到本地磁盘。 即数据本地化)
二、 流程描述
1.在ReduceTak中 构建ReduceCopier对象,调用其fetchOutputs方法。
2. 在ReduceCopier的fetchOutputs方法中分别构造几个独立的线程。相互配合,并分别独立的完成任务。
2.1 GetMapEventsThread线程通过RPC询问TaskTracker,对每个完成的Event,获取maptask所在的服务器地址,即MapTask输出的地址,构造URL,加入到mapLocations,供copier线程获取。
2.2构造并启动若干个MapOutputCopier线程,通过http协议,把map的输出从远端服务器拷贝的本地,如果可以放在内存中,则存储在内存中调用,否则保存在本地文件。
2.3LocalFSMerger对磁盘上的map 输出进行归并。
2.4nMemFSMergeThread对内存中的map输出进行归并。
3.根据拷贝到的map输出构造一个raw keyvalue的迭代器,作为reduce的输入。
4. 调用runNewReducer方法中根据配置的Reducer类构造一个Reducer实例和运行的上下文。并调用reducer的run方法来执行到用户定义的reduce操作。
5.在Reducer的run方法中从上下文中取出一个key和该key对应的Value集合(Iterable类型),调用reducer的reduce方法进行处理。
6. Recuer的reduce方法是用户定义的处理数据的方法,也是用户唯一需要定义的方法。
三、代码详细
ReduceTask的run方法。
ReduceTask的runNewReducer方法。根据配置构造reducer以及其运行的上下文,调用reducer的reduce方法。
抽象类Reducer的run方法。从上下文中取出一个key和该key对应的Value集合(Iterable类型),调用reducer的reduce方法进行处理。
Reducer类的reduce,是用户一般会覆盖来执行reduce处理逻辑的方法。
参考下文:
http://www.idouba.net/hadoop_mapreduce_tasktracker_child_reduce/
在Child的main函数中通过TaskUmbilicalProtocol协议,从TaskTracker获得需要执行的Task,并调用Task的run方法来执行。在ReduceTask而Task的run方法会通过java反射机制构造Reducer,Reducer.Context,然后调用构造的Reducer的run方法执行reduce操作。不同于map任务,在执行reduce任务前,需要把map的输出从map运行的tasktracker上拷贝到reducer运行的tasktracker上。
Reduce需要集群上若干个map任务的输出作为其特殊的分区文件。每个map任务完成的时间可能不同,因此只要有一个任务完成,reduce任务就开始复制其输出。这就是reduce任务的复制阶段。其实是启动若干个MapOutputCopier线程来复制完所有map输出。在复制完成后reduce任务进入排序阶段。这个阶段将由LocalFSMerger或InMemFSMergeThread合并map输出,维持其顺序排序。【即对有序的几个文件进行归并,采用归并排序】在reduce阶段,对已排序输出的每个键都要调用reduce函数,此阶段的输出直接写到文件系统,一般为HDFS上。(如果采用HDFS,由于tasktracker节点也是DataNoe,所以第一个块副本将被写到本地磁盘。 即数据本地化)
二、 流程描述
1.在ReduceTak中 构建ReduceCopier对象,调用其fetchOutputs方法。
2. 在ReduceCopier的fetchOutputs方法中分别构造几个独立的线程。相互配合,并分别独立的完成任务。
2.1 GetMapEventsThread线程通过RPC询问TaskTracker,对每个完成的Event,获取maptask所在的服务器地址,即MapTask输出的地址,构造URL,加入到mapLocations,供copier线程获取。
2.2构造并启动若干个MapOutputCopier线程,通过http协议,把map的输出从远端服务器拷贝的本地,如果可以放在内存中,则存储在内存中调用,否则保存在本地文件。
2.3LocalFSMerger对磁盘上的map 输出进行归并。
2.4nMemFSMergeThread对内存中的map输出进行归并。
3.根据拷贝到的map输出构造一个raw keyvalue的迭代器,作为reduce的输入。
4. 调用runNewReducer方法中根据配置的Reducer类构造一个Reducer实例和运行的上下文。并调用reducer的run方法来执行到用户定义的reduce操作。
5.在Reducer的run方法中从上下文中取出一个key和该key对应的Value集合(Iterable类型),调用reducer的reduce方法进行处理。
6. Recuer的reduce方法是用户定义的处理数据的方法,也是用户唯一需要定义的方法。
三、代码详细
ReduceTask的run方法。
@SuppressWarnings("unchecked") public void run(JobConf job, final TaskUmbilicalProtocol umbilical) throws IOException, InterruptedException, ClassNotFoundException { job.setBoolean("mapred.skip.on", isSkipping()); if (isMapOrReduce()) { copyPhase = getProgress().addPhase("copy"); sortPhase = getProgress().addPhase("sort"); reducePhase = getProgress().addPhase("reduce"); } // start thread that will handle communication with parent TaskReporter reporter = new TaskReporter(getProgress(), umbilical); reporter.startCommunicationThread(); boolean useNewApi = job.getUseNewReducer(); initialize(job, getJobID(), reporter, useNewApi); // check if it is a cleanupJobTask if (jobCleanup) { runJobCleanupTask(umbilical, reporter); return; } if (jobSetup) { runJobSetupTask(umbilical, reporter); return; } if (taskCleanup) { runTaskCleanupTask(umbilical, reporter); return; } // Initialize the codec codec = initCodec(); boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local")); //如果不是一个本地执行额模式(就是配置中不是分布式的),则要启动一个ReduceCopier来拷贝Map的输出,即Reduce的输入。 if (!isLocal) { reduceCopier = new ReduceCopier(umbilical, job, reporter); if (!reduceCopier.fetchOutputs()) { if(reduceCopier.mergeThrowable instanceof FSError) { LOG.error("Task: " + getTaskID() + " - FSError: " + StringUtils.stringifyException(reduceCopier.mergeThrowable)); umbilical.fsError(getTaskID(), reduceCopier.mergeThrowable.getMessage()); } throw new IOException("Task: " + getTaskID() + " - The reduce copier failed", reduceCopier.mergeThrowable); } } copyPhase.complete(); //拷贝完成后,进入sort阶段。 setPhase(TaskStatus.Phase.SORT); statusUpdate(umbilical); final FileSystem rfs = FileSystem.getLocal(job).getRaw(); RawKeyValueIterator rIter = isLocal ? Merger.merge(job, rfs, job.getMapOutputKeyClass(), job.getMapOutputValueClass(), codec, getMapFiles(rfs, true), !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100), new Path(getTaskID().toString()), job.getOutputKeyComparator(), reporter, spilledRecordsCounter, null) : reduceCopier.createKVIterator(job, rfs, reporter); // free up the data structures mapOutputFilesOnDisk.clear(); sortPhase.complete(); // sort is complete setPhase(TaskStatus.Phase.REDUCE); statusUpdate(umbilical); Class keyClass = job.getMapOutputKeyClass(); Class valueClass = job.getMapOutputValueClass(); RawComparator comparator = job.getOutputValueGroupingComparator(); if (useNewApi) { runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass); } else { runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass); } done(umbilical, reporter); }
ReduceTask的runNewReducer方法。根据配置构造reducer以及其运行的上下文,调用reducer的reduce方法。
Java @SuppressWarnings("unchecked") private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewReducer(JobConf job, final TaskUmbilicalProtocol umbilical, final TaskReporter reporter, RawKeyValueIterator rIter, RawComparator<INKEY> comparator, Class<INKEY> keyClass, Class<INVALUE> valueClass ) throws IOException,InterruptedException, ClassNotFoundException { //1. 构造TaskContext org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID()); //2. 根据配置的Reducer类构造一个Reducer实例 org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer = (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>) ReflectionUtils.newInstance(taskContext.getReducerClass(), job); //3. 构造RecordWriter org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output = (org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>) outputFormat.getRecordWriter(taskContext); job.setBoolean("mapred.skip.on", isSkipping()); //4. 构造Context,是Reducer运行的上下文 org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job, getTaskID(), rIter, reduceInputValueCounter, output, committer, reporter, comparator, keyClass, valueClass); reducer.run(reducerContext); output.close(reducerContext); }
抽象类Reducer的run方法。从上下文中取出一个key和该key对应的Value集合(Iterable类型),调用reducer的reduce方法进行处理。
3 4 5 6 7 public void run(Context context) throws IOException, InterruptedException { setup(context); while (context.nextKey()) { reduce(context.getCurrentKey(), context.getValues(), context); } cleanup(context); }
Reducer类的reduce,是用户一般会覆盖来执行reduce处理逻辑的方法。
@SuppressWarnings("unchecked") protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context ) throws IOException, InterruptedException { for(VALUEIN value: values) { context.write((KEYOUT) key, (VALUEOUT) value); }
参考下文:
http://www.idouba.net/hadoop_mapreduce_tasktracker_child_reduce/
相关文章推荐
- nginx rewrite uri地址重写
- PS 的小应用
- uBuntu make xconfig Linux内核配置 问题
- Linux学习之环境变量
- 第104讲: Spark Streaming电商广告点击综合案例需求分析和技术架构
- 短信猫短信收发平台
- photoshop实现图片更换背景
- 深入理解Docker Volume(二)
- 升级centos6.5内核至3.10及安装docker
- 最新Linux版本 jira6.3.6安装汉化破解以及数据迁移
- CentOS7安装mysql-5.7(glibc版)
- 小知识-为什么Linux不需要磁盘碎片整理
- Apache Flink数据流的Fault Tolerance机制
- Centos中vim的配置
- CodeForces 581C Developing Skills
- Linux内核module_param的使用
- Glut处理鼠标事件
- (三)洞悉linux下的Netfilter&iptables:内核中的rule,match和target
- 配置 Tomcat 环境变量
- 强悍的 Linux —— 强悍的 vim