Hadoop源代码分析(MapReduce概论)
2013-02-28 17:45
429 查看
2009-02-21
Hadoop源代码分析(MapReduce概论)
大家都熟悉文件系统,在对HDFS进行分析前,我们并没有花很多的时间去介绍HDFS的背景,毕竟大家对文件系统的还是有一定的理解的,而且也有很好的文档。在分析Hadoop的MapReduce部分前,我们还是先了解系统是如何工作的,然后再进入我们的分析部分。下面的图来自http://horicky.blogspot.com/2008/11/hadoop-mapreduce-implementation.html,是我看到的讲MapReduce最好的图。
以Hadoop带的wordcount为例子(下面是启动行):
hadoop jar hadoop-0.19.0-examples.jar wordcount /usr/input /usr/output
用户提交一个任务以后,该任务由JobTracker协调,先执行Map阶段(图中M1,M2和M3),然后执行Reduce阶段(图中R1和R2)。Map阶段和Reduce阶段动作都受TaskTracker监控,并运行在独立于TaskTracker的Java虚拟机中。
我们的输入和输出都是HDFS上的目录(如上图所示)。输入由InputFormat接口描述,它的实现如ASCII文件,JDBC数据库等,分别处理对于的数据源,并提供了数据的一些特征。通过InputFormat实现,可以获取InputSplit接口的实现,这个实现用于对数据进行划分(图中的splite1到splite5,就是划分以后的结果),同时从InputFormat也可以获取RecordReader接口的实现,并从输入中生成<k,v>对。有了<k,v>,就可以开始做map操作了。
map操作通过context.collect(最终通过OutputCollector. collect)将结果写到context中。当Mapper的输出被收集后,它们会被Partitioner类以指定的方式区分地写出到输出文件里。我们可以为Mapper提供Combiner,在Mapper输出它的<k,v>时,键值对不会被马上写到输出里,他们会被收集在list里(一个key值一个list),当写入一定数量的键值对时,这部分缓冲会被Combiner中进行合并,然后再输出到Partitioner中(图中M1的黄颜色部分对应着Combiner和Partitioner)。
Map的动作做完以后,进入Reduce阶段。这个阶段分3个步骤:混洗(Shuffle),排序(sort)和reduce。
混洗阶段,Hadoop的MapReduce框架会根据Map结果中的key,将相关的结果传输到某一个Reducer上(多个Mapper产生的同一个key的中间结果分布在不同的机器上,这一步结束后,他们传输都到了处理这个key的Reducer的机器上)。这个步骤中的文件传输使用了HTTP协议。
排序和混洗是一块进行的,这个阶段将来自不同Mapper具有相同key值的<key,value>对合并到一起。
Reduce阶段,上面通过Shuffle和sort后得到的<key, (list of values)>会送到Reducer. reduce方法中处理,输出的结果通过OutputFormat,输出到DFS中。
2009-02-25
Hadoop源代码分析(包org.apache.hadoop.mapreduce)
有了前一节的分析,我们来看一下具体的接口,它们都处于包org.apache.hadoop.mapreduce中。
上面的图中,类可以分为4种。右上角的是从Writeable继承的,和Counter(还有CounterGroup和Counters,也在这个包中,并没有出现在上面的图里)和ID相关的类,它们保持MapReduce过程中需要的一些计数器和标识;中间大部分是和Context相关的*Context类,它为Mapper和Reducer提供了相关的上下文;关于Map和Reduce,对应的类是Mapper,Reducer和描述他们的Job(在Hadoop 中一次计算任务称之为一个job,下面的分析中,中文为“作业”,相应的task我们称为“任务”);图中其他类是配合Mapper和Reduce工作的一些辅助类。
如果你熟悉HTTPServlet, 那就能很轻松地理解Hadoop采用的结构,把整个Hadoop看作是容器,那么Mapper和Reduce就是容器里的组件,*Context保存了组件的一些配置信息,同时也是和容器通信的机制。
和ID相关的类我们就不再讨论了。我们先看JobContext,它位于*Context继承树的最上方,为Job提供一些只读的信息,如Job的ID,名称等。下面的信息是MapReduce过程中一些较关键的定制信息:
(来自http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop2/index.html):
Job继承自JobContext,提供了一系列的set方法,用于设置Job的一些属性(Job更新属性,JobContext读属性),同时,Job还提供了一些对Job进行控制的方法,如下:
l mapProgress:map的进度(0—1.0);
l reduceProgress:reduce的进度(0—1.0);
l isComplete:作业是否已经完成;
l isSuccessful:作业是否成功;
l killJob:结束一个在运行中的作业;
l getTaskCompletionEvents:得到任务完成的应答(成功/失败);
l killTask:结束某一个任务;
2009-02-25
Hadoop源代码分析(包mapreduce.lib.input)
接下来我们按照MapReduce过程中数据流动的顺序,来分解org.apache.hadoop.mapreduce.lib.*的相关内容,并介绍对应的基类的功能。首先是input部分,它实现了MapReduce的数据输入部分。类图如下:
类图的右上角是InputFormat,它描述了一个MapReduce Job的输入,通过InputFormat,Hadoop可以:
l 检查MapReduce输入数据的正确性;
l 将输入数据切分为逻辑块InputSplit,这些块会分配给Mapper;
l 提供一个RecordReader实现,Mapper用该实现从InputSplit中读取输入的<K,V>对。
在org.apache.hadoop.mapreduce.lib.input中,Hadoop为所有基于文件的InputFormat提供了一个虚基类FileInputFormat。下面几个参数可以用于配置FileInputFormat:
l mapred.input.pathFilter.class:输入文件过滤器,通过过滤器的文件才会加入InputFormat;
l mapred.min.split.size:最小的划分大小;
l mapred.max.split.size:最大的划分大小;
l mapred.input.dir:输入路径,用逗号做分割。
类中比较重要的方法有:
protected List<FileStatus> listStatus(Configuration job)
递归获取输入数据目录中的所有文件(包括文件信息),输入的job是系统运行的配置Configuration,包含了上面我们提到的参数。
public List<InputSplit> getSplits(JobContext context)
将输入划分为InputSplit,包含两个循环,第一个循环处理所有的文件,对于每一个文件,根据输入的划分最大/最小值,循环得到文件上的划分。注意,划分不会跨越文件。
FileInputFormat没有实现InputFormat的createRecordReader方法。
FileInputFormat有两个子类,SequenceFileInputFormat是Hadoop定义的一种二进制形式存放的键/值文件(参考http://hadoop.apache.org/core/docs/current/api/org/apache/hadoop/io/SequenceFile.html),它有自己定义的文件布局。由于它有特殊的扩展名,所以SequenceFileInputFormat重载了listStatus,同时,它实现了createRecordReader,返回一个SequenceFileRecordReader对象。TextInputFormat处理的是文本文件,createRecordReader返回的是LineRecordReader的实例。这两个类都没有重载FileInputFormat的getSplits方法,那么,在他们对于的RecordReader中,必须考虑FileInputFormat对输入的划分方式。
FileInputFormat的getSplits,返回的是FileSplit。这是一个很简单的类,包含的属性(文件名,起始偏移量,划分的长度和可能的目标机器)已经足以说明这个类的功能。
RecordReader用于在划分中读取<Key,Value>对。RecordReader有五个虚方法,分别是:
l initialize:初始化,输入参数包括该Reader工作的数据划分InputSplit和Job的上下文context;
l nextKey:得到输入的下一个Key,如果数据划分已经没有新的记录,返回空;
l nextValue:得到Key对应的Value,必须在调用nextKey后调用;
l getProgress:得到现在的进度;
l close,来自java.io的Closeable接口,用于清理RecordReader。
我们以LineRecordReader为例,来分析RecordReader的构成。前面我们已经分析过FileInputFormat对文件的划分了,划分完的Split包括了文件名,起始偏移量,划分的长度。由于文件是文本文件,LineRecordReader的初始化方法initialize会创建一个基于行的读取对象LineReader(定义在org.apache.hadoop.util中,我们就不分析啦),然后跳过输入的最开始的部分(只在Split的起始偏移量不为0的情况下进行,这时最开始的部分可能是上一个Split的最后一行的一部分)。nextKey的处理很简单,它使用当前的偏移量作为Key,nextValue当然就是偏移量开始的那一行了(如果行很长,可能出现截断)。进度getProgress和close都很简单。
2009-02-25
Hadoop源代码分析(包mapreduce.lib.map)
Hadoop的MapReduce框架中,Map动作通过Mapper类来抽象。一般来说,我们会实现自己特殊的Mapper,并注册到系统中,执行时,我们的Mapper会被MapReduce框架调用。Mapper类很简单,包括一个内部类和四个方法,静态结构图如下:
内部类Context继承自MapContext,并没有引入任何新的方法。
Mapper的四个方法是setup,map,cleanup和run。其中,setup和cleanup用于管理Mapper生命周期中的资源,setup在完成Mapper构造,即将开始执行map动作前调用,cleanup则在所有的map动作完成后被调用。方法map用于对一次输入的key/value对进行map动作。run方法执行了上面描述的过程,它调用setup,让后迭代所有的key/value对,进行map,最后调用cleanup。
org.apache.hadoop.mapreduce.lib.map中实现了Mapper的三个子类,分别是InverseMapper(将输入<key, value> map为输出<value, key>),MultithreadedMapper(多线程执行map方法)和TokenCounterMapper(对输入的value分解为token并计数)。其中最复杂的是MultithreadedMapper,我们就以它为例,来分析Mapper的实现。
MultithreadedMapper会启动多个线程执行另一个Mapper的map方法,它会启动mapred.map.multithreadedrunner.threads(配置项)个线程执行Mapper:mapred.map.multithreadedrunner.class(配置项)。MultithreadedMapper重写了基类Mapper的run方法,启动N个线程(对应的类为MapRunner)执行mapred.map.multithreadedrunner.class(我们称为目标Mapper)的run方法(就是说,目标Mapper的setup和cleanup会被执行多次)。目标Mapper共享同一份InputSplit,这就意味着,对InputSplit的数据读必须线程安全。为此,MultithreadedMapper引入了内部类SubMapRecordReader,SubMapRecordWriter,SubMapStatusReporter,分别继承自RecordReader,RecordWriter和StatusReporter,它们通过互斥访问MultithreadedMapper的Mapper.Context,实现了对同一份InputSplit的线程安全访问,为Mapper提供所需的Context。这些类的实现方法都很简单。
2009-02-26
Hadoop源代码分析(mapreduce.lib.partition/reduce/output)
Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出,下面我们就来分析参与这个过程的类。
Mapper的结果,可能送到可能的Combiner做合并,Combiner在系统中并没有自己的基类,而是用Reducer作为Combiner的基类,他们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已。
Mapper最终处理的结果对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer那,哪个key到哪个Reducer的分配过程,是由Partitioner规定的,它只有一个方法,输入是Map的结果对<key, value>和Reducer的数目,输出则是分配的Reducer(整数编号)。系统缺省的Partitioner是HashPartitioner,它以key的Hash值对Reducer的数目取模,得到对应的Reducer。
Reducer是所有用户定制Reducer类的基类,和Mapper类似,它也有setup,reduce,cleanup和run方法,其中setup和cleanup含义和Mapper相同,reduce是真正合并Mapper结果的地方,它的输入是key和这个key对应的所有value的一个迭代器,同时还包括Reducer的上下文。系统中定义了两个非常简单的Reducer,IntSumReducer和LongSumReducer,分别用于对整形/长整型的value求和。
Reduce的结果,通过Reducer.Context的方法collect输出到文件中,和输入类似,Hadoop引入了OutputFormat。OutputFormat依赖两个辅助接口:RecordWriter和OutputCommitter,来处理输出。RecordWriter提供了write方法,用于输出<key, value>和close方法,用于关闭对应的输出。OutputCommitter提供了一系列方法,用户通过实现这些方法,可以定制OutputFormat生存期某些阶段需要的特殊操作。我们在TaskInputOutputContext中讨论过这些方法(明显,TaskInputOutputContext是OutputFormat和Reducer间的桥梁)。
OutputFormat和RecordWriter分别对应着InputFormat和RecordReader,系统提供了空输出NullOutputFormat(什么结果都不输出,NullOutputFormat.RecordWriter只是示例,系统中没有定义),LazyOutputFormat(没在类图中出现,不分析),FilterOutputFormat(不分析)和基于文件FileOutputFormat的SequenceFileOutputFormat和TextOutputFormat输出。
基于文件的输出FileOutputFormat利用了一些配置项配合工作,包括mapred.output.compress:是否压缩;mapred.output.compression.codec:压缩方法;mapred.output.dir:输出路径;mapred.work.output.dir:输出工作路径。FileOutputFormat还依赖于FileOutputCommitter,通过FileOutputCommitter提供一些和Job,Task相关的临时文件管理功能。如FileOutputCommitter的setupJob,会在输出路径下创建一个名为_temporary的临时目录,cleanupJob则会删除这个目录。
SequenceFileOutputFormat输出和TextOutputFormat输出分别对应输入的SequenceFileInputFormat和TextInputFormat,我们就不再详细分析啦。
2009-03-06
Hadoop源代码分析(包hadoop.mapred中的MapReduce接口)
前面已经完成了对org.apache.hadoop.mapreduce的分析,这个包提供了Hadoop MapReduce部分的应用API,用于用户实现自己的MapReduce应用。但这些接口是给未来的MapReduce应用的,目前MapReduce框架还是使用老系统(参考补丁HADOOP-1230)。下面我们来分析org.apache.hadoop.mapred,首先还是从mapred的MapReduce框架开始分析,下面的类图(灰色部分为标记为@Deprecated的类/接口):
我们把包mapreduce的类图附在下面,对比一下,我们就会发现,org.apache.hadoop.mapred中的MapReduce API相对来说很简单,主要是少了和Context相关的类,那么,好多在mapreduce中通过context来完成的工作,就需要通过参数来传递,如Map中的输出,老版本是:
output.collect(key, result); // output’s type is: OutputCollector
新版本是:
context.write(key, result); // output’s type is: Context
它们分别使用OutputCollector和Mapper.Context来输出map的结果,显然,原有OutputCollector的新API中就不再需要。总体来说,老版本的API比较简单,MapReduce过程中关键的对象都有,但可扩展性不是很强。同时,老版中提供的辅助类也很多,我们前面分析的FileOutputFormat,也有对应的实现,我们就不再讨论了。
2009-03-10
Hadoop源代码分析(*IDs类和*Context类)
我们开始来分析Hadoop MapReduce的内部的运行机制。用户向Hadoop提交Job(作业),作业在JobTracker对象的控制下执行。Job被分解成为Task(任务),分发到集群中,在TaskTracker的控制下运行。Task包括MapTask和ReduceTask,是MapReduce的Map操作和Reduce操作执行的地方。这中任务分布的方法比较类似于HDFS中NameNode和DataNode的分工,NameNode对应的是JobTracker,DataNode对应的是TaskTracker。JobTracker,TaskTracker和MapReduce的客户端通过RPC通信,具体可以参考HDFS部分的分析。
我们先来分析一些辅助类,首先是和ID有关的类,ID的继承树如下:
这张图可以看出现在Hadoop的org.apache.hadoop.mapred向org.apache.hadoop.mapreduce迁移带来的一些问题,其中灰色是标注为@Deprecated的。ID携带一个整型,实现了WritableComparable接口,这表明它可以比较,而且可以被Hadoop的io机制串行化/解串行化(必须实现compareTo/readFields/write方法)。JobID是系统分配给作业的唯一标识符,它的toString结果是job_<jobtrackerID>_<jobNumber>。例子:job_200707121733_0003表明这是jobtracker 200707121733(利用jobtracker的开始时间作为ID)的第3号作业。
作业分成任务执行,任务号TaskID包含了它所属的作业ID,同时也有任务ID,同时还保持了这是否是一个Map任务(成员变量isMap)。任务号的字符串表示为task_<jobtrackerID>_<jobNumber>_[m|r]_<taskNumber>,如task_200707121733_0003_m_000005表示作业200707121733_0003的000005号任务,改任务是一个Map任务。
一个任务有可能有多个执行(错误恢复/消除Stragglers等),所以必须区分任务的多个执行,这是通过类TaskAttemptID来完成,它在任务号的基础上添加了尝试号。一个任务尝试号的例子是attempt_200707121733_0003_m_000005_0,它是任务task_200707121733_0003_m_000005的第0号尝试。
JVMId用于管理任务执行过程中的Java虚拟机,我们后面再讨论。
为了使Job和Task工作,Hadoop提供了一系列的上下文,这些上下文保存了Job和Task工作的信息。
处于继承树的最上方是org.apache.hadoop.mapreduce.JobContext,前面我们已经介绍过了,它提供了Job的一些只读属性,两个成员变量,一个保存了JobID,另一个类型为JobConf,JobContext中除了JobID外,其它的信息都保持在JobConf中。它定义了如下配置项:
l mapreduce.inputformat.class:InputFormat的实现
l mapreduce.map.class:Mapper的实现
l mapreduce.combine.class: Reducer的实现
l mapreduce.reduce.class:Reducer的实现
l mapreduce.outputformat.class: OutputFormat的实现
l mapreduce.partitioner.class: Partitioner的实现
同时,它提供方法,使得通过类名,利用Java反射提供的Class.forName方法,获得类对应的Class。org.apache.hadoop.mapred的JobContext对象比org.apache.hadoop.mapreduce.JobContext多了成员变量progress,用于获取进度信息,它类型为JobConf成员job指向mapreduce.JobContext对应的成员,没有添加任何新功能。
JobConf继承自Configuration,保持了MapReduce执行需要的一些配置信息,它管理着46个配置参数,包括上面mapreduce配置项对应的老版本形式,如mapreduce.map.class 对应mapred.mapper.class。这些配置项我们在使用到它们的时候再介绍。
org.apache.hadoop.mapreduce.JobContext的子类Job前面也已经介绍了,后面在讨论系统的动态行为时,再回来看它。
TaskAttemptContext用于任务的执行,它引入了标识任务执行的TaskAttemptID和任务状态status,并提供新的访问接口。org.apache.hadoop.mapred的TaskAttemptContext继承自mapreduce的对应版本,只是增加了记录进度的progress。
TaskInputOutputContext和它的子类都在包org.apache.hadoop.mapreduce中,前面已经分析过了,我们就不再罗嗦。
2009-03-10
Hadoop源代码分析(类TaskStatus)
我们先分析Task,这是一个规模比较大的类,类图如下。Task是一个虚类,它有两个子类,MapTask和ReduceTask,分别是Map任务和Reduce任务的抽象。
在分析Task相关类之前,我们来分析和ID,JobID,TaskID相关的类。
我们从TaskStatus开始来分析Task相关的类,TaskStatus,一看类名就知道它保持了Task的状态。从前面介绍MapReduce的过程中,我们了解到,MapReduce的过程可以处于下面6个阶段,它们定义在枚举:TaskStatus.Phase中,包括如下状态:
l STARTING:开始
l MAP:Map阶段
l SHUFFLE:混洗阶段
l SORT:排序阶段
l REDUCE:Reduce阶段
l CLEANUP:清理阶段
除了阶段,TaskStatus还维护任务的状态,很明显,如果不考虑异常,一次任务应该包括准备,运行和清理三个主要阶段,其实TaskStatus的正常流程和这个非常类似,同时,考虑到任务可能异常结束或被JobTracker杀死,系统还引入配合这两种异常情况的状态,其状态如下:
图中引入了复合状态,只是表明这些状态中包含的状态(如绿色的COMMIT_PENDING和SUCCESSED)可以转移到外面的状态(FAILED)。
(注:这张图是通过人肉逆向工程画出来的,在以后的分析过程中,这张图会根据我们对系统的深入了解而修改)
接下来我们来看TaskStatus的其它成员,它的完整类图如下,基本上是一些信息,没有复杂的操作。
它包含的主要状态信息有:taskid(对应的任务号),progress(处理情况),runState(运行状态,注意和任务阶段做区分),diagnosticInfo(诊断信息),stateString(运行状态),taskTracker(对应的taskTracker),startTime(开始时间),finishTime(结束时间),outputSize(输出大小),phase(任务阶段,注意和运行状态做区分),counters(相关的计数器),includeCounters(是否包含成员变量counters)nextRecordRange(处理的记录范围)。
TaskStatus有两个子类,分别是MapTaskStatus(没有添加任何新的成员变量)和ReduceTaskStatus。ReduceTaskStatus是Reduce任务的状态,它包含了新信息shuffleFinishTime(shuffle结束时间)和sortFinishTime(sort结束时间)。同时,获取Map结果出错时,对应的Map的TaskAttemptID会保存在failedFetchTasks中,等待上报。
最后我们看一下辅助类Counters/Counters.Counter/Counters.Group,它们保存了MapReduce过程中的一些统计计数器,Counters.Counter记录了一个计数器的<名字,显示名,值>,Counters.Group将相关的Counters.Counter聚合成组,并引入组名,组显示名。
2009-05-24
Hadoop源代码分析(IFile)
Mapper的输出,在发送到Reducer前是存放在本地文件系统的,IFile提供了对Mapper输出的管理。我们已经知道,Mapper的输出是<Key,Value>对,IFile以记录<key-len, value-len, key, value>的形式存放了这些数据。为了保存键值对的边界,很自然IFile需要保存key-len和value-len。
和IFile相关的类图如下:
其中,文件流形式的输入和输出是由IFIleInputStream和IFIleOutputStream抽象。以记录形式的读/写操作由IFile.Reader/IFile.Writer提供,IFile.InMemoryReader用于读取存在于内存中的IFile文件格式数据。
我们以输出为例,来分析这部分的实现。首先是下图的和序列化反序列化相关的Serialization/Deserializer,这部分的code是在包org.apache.hadoop.io.serializer。序列化由Serializer抽象,通过Serializer的实现,用户可以利用serialize方法把对象序列化到通过open方法打开的输出流里。Deserializer提供的是相反的过程,对应的方法是deserialize。hadoop.io.serializer中还实现了配合工作的Serialization和对应的工厂SerializationFactory。两个具体的实现是WritableSerialization和JavaSerialization,分别对应了Writeble的序列化反序列化和Java本身带的序列化反序列化。
有了Serializer/Deserializer,我们来分析IFile.Writer。Writer的构造函数是:
public Writer(Configuration conf, FSDataOutputStream out,
Class<K> keyClass, Class<V> valueClass,
CompressionCodec codec, Counters.Counter writesCounter)
conf,配置参数,out是Writer的输出,keyClass 和valueClass 是输出的Kay,Value的class属性,codec是对输出进行压缩的方法,参数writesCounter用于对输出字节数进行统计的Counters.Counter。通过这些参数,我们可以构造我们使用的支持压缩功能的输出流(类成员out,类成员rawOut保存了构造函数传入的out),相关的计数器,还有就是Kay,Value的Serializer方法。
Writer最主要的方法是append方法(居然不是write方法,呵呵),有两种形式:
public void append(K key, V value) throws IOException {
public void append(DataInputBuffer key, DataInputBuffer value)
append(K key, V value)的主要过程是检查参数,然后将key和value序列化到DataOutputBuffer中,并获取序列化后的长度,最后把长度(2个)和DataOutputBuffer中的结果写到输出,并复位DataOutputBuffer和计数。append(DataInputBuffer key, DataInputBuffer value)处理过程也比较类似,就不再分析了。
close方法中需要注意的是,我们需要标记文件尾,或者是流结束。目前是通过写2个值为EOF_MARKER的长度来做标记。
IFileOutputStream是用于配合Writer的输出流,它会在IFiles的最后添加校验数据。当Writer调用IFileOutputStream的write操作时,IFileOutputStream计算并保持校验和,流被close的时候,校验结果会写到对应文件的文件尾。实际上存放在磁盘上的文件是一系列的<key-len, value-len, key, value>记录和校验结果。
Reader的相关过程,我们就不再分析了。
2009-05-24
Hadoop源代码分析(Task的内部类和辅助类)
从前面的图中,我们可以发现Task有很多内部类,并拥有大量类成员变量,这些类配合Task完成相关的工作,如下图。
MapOutputFile管理着Mapper的输出文件,它提供了一系列get方法,用于获取Mapper需要的各种文件,这些文件都存放在一个目录下面。
我们假设传入MapOutputFile的JobID为job_200707121733_0003,TaskID为task_200707121733_0003_m_000005。MapOutputFile的根为
{mapred.local.dir}/taskTracker/jobcache/{jobid}/{taskid}/output
在下面的讨论中,我们把上面的路径记为{MapOutputFileRoot}
以上面JogID和TaskID为例,我们有:
{mapred.local.dir}/taskTracker/jobcache/job_200707121733_0003/task_200707121733_0003_m_000005/output
需要注意的是,{mapred.local.dir}可以包含一系列的路径,那么,Hadoop会在这些根路径下找一个满足要求的目录,建立所需的文件。MapOutputFile的方法有两种,结尾带ForWrite和不带ForWrite,带ForWrite用于创建文件,它需要一个文件大小作为参数,用于检查磁盘空间。不带ForWrite用于获取以建立的文件。
getOutputFile:文件名为{MapOutputFileRoot}/file.out;
getOutputIndexFile:文件名为{MapOutputFileRoot}/file.out.index
getSpillFile:文件名为{MapOutputFileRoot}/spill{spillNumber}.out
getSpillIndexFile:文件名为{MapOutputFileRoot}/spill{spillNumber}.out.index
以上四个方法用于Task子类MapTask中;
getInputFile:文件名为{MapOutputFileRoot}/map_{mapId}.out
用于ReduceTask中。我们到使用到他们的地方再介绍相应的应用场景。
介绍完临时文件管理以后,我们来看Task.CombineOutputCollector,它继承自org.apache.hadoop.mapred.OutputCollector,很简单,只是一个OutputCollector到IFile.Writer的Adapter,活都让IFile.Writer干了。
ValuesIterator用于从RawKeyValueIterator(Key,Value都是DataInputBuffer,ValuesIterator要求该输入已经排序)中获取符合RawComparator<KEY> comparator的值的迭代器。它在Task中有一个简单子类,CombineValuesIterator。
Task.TaskReporter用于向JobTracker提交计数器报告和状态报告,它实现了计数器报告Reporter和状态报告StatusReporter。为了不影响主线程的工作,TaskReporter有一个独立的线程,该线程通过TaskUmbilicalProtocol接口,利用Hadoop的RPC机制,向JobTracker报告Task执行情况。
FileSystemStatisticUpdater用于记录对文件系统的对/写操作字节数,是个简单的工具类。
2009-05-25
Hadoop源代码分析(类Task)
有了前面的基础,我们可以来分析类Task了。Task是一个虚基类,它有两个子类:MapTask,ReduceTask,分别对应着Map和Reduce。先从成员变量开始:
首先是和作业任务相关的信息,包括jobFile,作业的配置文件;taskId,任务ID,从中可以获取作业ID;partition,Job内ID;taskStatus,任务状态。jobCleanup,jobSetup和taskCleanup是三个标志位。
接下来是一组和错误回复的变量。我们知道,如果在Task执行过程中出错,很有可能是因为输入有问题,一个常用的策略是在下一次回复性执行过程中,忽略这部分输入,skipRanges,skipping和writeSkipRecs就是用来控制这个行为的。
currentRecStartIndex和currentRecIndexIterator配合,可以得到当前的任务输入。
conf保存了当前任务的配置(JobConf形式),MapOutputFile上一部分已经介绍了,用于管理临时文件,跟它配合的是lDirAlloc,类型为LocalDirAllocator,是本地文件分配器。jobContext和taskContext保持了Job和Task的上下文。committer定制了和Task生命周期相关的一些特殊处理(也可以看出是上下文)。
最后一部分应该是输出outputFormat。
和统计/状态监视的成员变量分散在类的各处,如spilledRecordsCounter,taskProgress,counters等,我们就不再介绍了。
下面我们开始来分析Task的成员函数,首先是虚方法,Task包含了下面3个虚方法:
public abstract void run(JobConf job, TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException;
执行Task;
public abstract TaskRunner createRunner(TaskTracker tracker,
TaskTracker.TaskInProgress tip) throws IOException;
创建一个TaskRunner;
public abstract boolean isMapTask();
是否是一个Map任务。上面这3个方法自然是和MapTask,ReduceTask相关,也需要它们实现。
构造函数很简单,主要是初始化一些成员函数。initialize也用于初始化成员,它被Task的子类调用,用于子类传入一些子类中构造的对象。构造函数后面是一系列的setter和getter,还有实现Writable的write和readFields。
localizeConfiguration函数用于将一些和Task相关的信息存放到JobConf里,这也是Hadoop MapReduce中重要的参数传递方式。
接下来分析的是一系列和Task生命周期相关的函数。
public void done(TaskUmbilicalProtocol umbilical, TaskReporter reporter )
done被多个方法调用(下图),用于做结束任务的一些清理工作,步骤如下:
l 更新计数器updateCounters();
l 如果任务需要提交,设置Taks状态为COMMIT_PENDING,并利用TaskUmbilicalProtocol,汇报Task完成,等待提交;然后调用commit提交任务(下面分析)
l 设置任务结束标志位;结束Reporter通信线程;
l 发送最后一次统计报告(通过sendLastUpdate方法,很简单);
l 利用TaskUmbilicalProtocol报告结束状态(通过sendDone方法,很简单)。
commit方法被done方法调用,用于等待TaskTracker的可提交信号。通过这种机制,Task可以等待TaskTracker上需要的一些后续处理,比方说,把Task的结果取走,需要TaskTracker的协调和确认。commit还会调用org.apache.hadoop.mapreduce.OutputCommitter的commitTask方法,执行一些子类需要的commit事件处理。
runJobCleanupTask,runJobSetupTask和runTaskCleanupTask应用在Maptask和ReduceTask的run方法中,用于做一些准备和可能的清除任务。
runJobSetupTask:为建立Job做准备,执行状态设置,然后调用org.apache.hadoop.mapreduce.OutputCommitter的setupJob,最后通过done,通知TaskTracker任务完成。
runJobCleanupTask:清理Job,包括步骤状态设置,更新状态到TaskTracker,调用org.apache.hadoop.mapreduce.OutputCommitter的相关方法,通过done,通知TaskTracker任务完成。
runTaskCleanupTask:清理Task任务,和runJobCleanupTask类似。
应该说,这些方法只是提供了一个通用的框架,具体需要的执行,在于org.apache.hadoop.mapreduce.OutputCommitter的具体实现。
2009-05-29
Hadoop源代码分析(MapTask)
接下来我们来分析Task的两个子类,MapTask和ReduceTask。MapTask的相关类图如下:
MapTask其实不是很复杂,复杂的是支持MapTask工作的一些辅助类。MapTask的成员变量少,只有split和splitClass。我们知道,Map的输入是split,是原始数据的一个切分,这个切分由org.apache.hadoop.mapred.InputSplit的子类具体描述(前面我们是通过org.apache.hadoop.mapreduce.InputSplit介绍了InputSplit,它们对外的API是一样的)。splitClass是InputSplit子类的类名,通过它,我们可以利用Java的反射机制,创建出InputSplit子类。而split是一个BytesWritable,它是InputSplit子类串行化以后的结果,再通过InputSplit子类的readFields方法,我们可以回复出对应的InputSplit对象。
MapTask最重要的方法是run。run方法相当简单,配置完系统的TaskReporter后,就根据情况执行runJobCleanupTask,runJobSetupTask,runTaskCleanupTask或执行Mapper。由于MapReduce现在有两套API,MapTask需要支持这两套API,使得MapTask执行Mapper分为runNewMapper和runOldMapper,run*Mapper后,MapTask会调用父类的done方法。
接下来我们来分析runOldMapper,最开始部分是构造Mapper处理的InputSplit,更新Task的配置,然后就开始创建Mapper的RecordReader,rawIn是原始输入,然后分正常(使用TrackedRecordReader,后面讨论)和跳过部分记录(使用SkippingRecordReader,后面讨论)两种情况,构造对应的真正输入in。
跳过部分记录是Map的一种出错恢复策略,我们知道,MapReduce处理的数据集合非常大,而有些任务对一部分出错的数据不进行处理,对结果的影响很小(如大数据集合的一些统计量),那么,一小部分的数据出错导致已处理的大量结果无效,是得不偿失的,跳过这部分记录,成了Mapper的一种选择。
Mapper的输出,是通过MapOutputCollector进行的,也分两种情况,如果没有Reducer,那么,用DirectMapOutputCollector(后面讨论),否则,用MapOutputBuffer(后面讨论)。
构造完Mapper的输入输出,通过构造配置文件中配置的MapRunnable,就可以执行Mapper了。目前系统有两个MapRunnable:MapRunner和MultithreadedMapRunner,如下图。
原有API在这块的处理上和新API有很大的不一样。接口MapRunnable是原有API中Mapper的执行器,run方法就是用于执行用户的Mapper。MapRunner是单线程执行器,相当简单,首先,当MapTask调用:
MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
MapRunner的configure会在newInstance的最后被调用,configure执行的过程中,对应的Mapper会通过反射机制构造出来。
MapRunner的run方法,会先创建对应的key,value对象,然后,对InputSplit的每一对<key,value>,调用Mapper的map方法,循环结束后,Mapper对应的清理方法会被调用。我们需要注意,key,value对象在run方法中是被重复使用的,就是说,每次传入Mapper的map方法的key,value都是同一个对象,只不过是里面的内容变了,对象并没有变。如果你需要保留key,value的内容,需要实现clone机制,克隆出对象的一个新备份。
相对于新API的多线程执行器,老API的MultithreadedMapRunner就比较复杂了,总体来说,就是通过阻塞队列配合Java的多线程执行器,将<key,value>分发到多个线程中去处理。需要注意的是,在这个过程中,这些线程共享一个Mapper实例,如果Mapper有共享的资源,需要有一定的保护机制。
runNewMapper用于执行新版本的Mapper,比runOldMapper稍微复杂,我们就不再讨论了。
2009-06-03
Hadoop源代码分析(MapTask辅助类 I)
MapTask的辅助类主要针对Mapper的输入和输出。首先我们来看MapTask中用的的Mapper输入,在类图中,这部分位于右上角。
MapTask.TrackedRecordReader是一个Wrapper,在原有输入RecordReader的基础上,添加了收集上报统计数据的功能。
MapTask.SkippingRecordReader也是一个Wrapper,它在MapTask.TrackedRecordReader的基础上,添加了忽略部分输入的功能。在分析MapTask.SkippingRecordReader之前,我们先看一下类SortedRanges和它相关的类。
类SortedRanges.Ranges表示了一个范围,以开始位置和范围长度(这样的话就可以表示长度为0的范围)来表示一个范围,并提供了一系列的范围操作方法。注意,方法getEndIndex得到的右端点并不包含在范围内(应理解为开区间)。SortedRanges包含了一系列不重叠的范围,为了保证包含的范围不重叠,在add方法和remove方法上需要做一些处理,保证不重叠的约束。SkipRangeIterator是访问SortedRanges包含的Ranges的迭代器。
MapTask.SkippingRecordReader的实现很简单,因为要忽略的输入都保持在SortedRanges.Ranges,只需要在next方法中,判断目前范围时候落在SortedRanges.Ranges中,如果是,忽略,并将忽略的记录写文件(可配置)
NewTrackingRecordReader和NewOutputCollector被新API使用,我们不分析。
MapTask的输出辅助类都继承自MapOutputCollector,它只是在OutputCollector的基础上添加了close和flush方法。
DirectMapOutputCollector用在Reducer的数目为0,就是不需要Reduce阶段的时候。它是直接通过
out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
得到对应的RecordWriter,collect直接到RecordWriter上。
如果Mapper后续有reduce任务,系统会使用MapOutputBuffer做为输出,这是个比较复杂的类,有1k行左右的代码。
我们知道,Mapper是通过OutputCollector将Map的结果输出,输出的量很大,Hadoop的机制是通过一个circle buffer 收集Mapper的输出, 到了io.sort.mb * percent量的时候,就spill到disk,如下图。图中出现了两个数组和一个缓冲区,kvindices保持了记录所属的(Reduce)分区,key在缓冲区开始的位置和value在缓冲区开始的位置,通过kvindices,我们可以在缓冲区中找到对应的记录。kvoffets用于在缓冲区满的时候对kvindices的partition进行排序,排完序的结果将输出到输出到本地磁盘上,其中索引(kvindices)保持在spill{spill号}.out.index中,数据保存在spill{spill号}.out中。
当Mapper任务结束后,有可能会出现多个spill文件,这些文件会做一个归并排序,形成Mapper的一个输出(spill.out和spill.out.index),如下图:
这个输出是按partition排序的,这样的话,Mapper的输出被分段,Reducer要获取的就是spill.out中的一段。(注意,内存和硬盘上的索引结构不一样)
(感谢彭帅的Hadoop Map Stage流程分析 http://www.cnblogs.com/OnlyXP/archive/2009/05/25/1488811.html)
2009-06-04
Hadoop源代码分析(MapTask辅助类,II)
有了上面Mapper输出的内存存储结构和硬盘存储结构讨论,我们来仔细分析MapOutputBuffer的流程。
首先是成员变量。最先初始化的是作业配置job和统计功能reporter。通过配置,MapOutputBuffer可以获取本地文件系统(localFs和rfs),Reducer的数目和Partitioner。
SpillRecord是文件spill.out{spill号}.index在内存中的对应抽象(内存数据和文件数据就差最后的校验和),该文件保持了一系列的IndexRecord,如下图:
IndexRecord有3个字段,分别是startOffset:记录偏移量,rawLength:初始长度,partLength:实际长度(可能有压缩)。SpillRecord保持了一系列的IndexRecord,并提供方法用于添加记录(没有删除记录的操作,因为不需要),获取记录,写文件,读文件(通过构造函数)。
接下来是一些和输出缓存区kvbuffer,缓存区记录索引kvindices和缓存区记录索引排序工作数组kvoffsets相关的处理,下面的图有助于说明这段代码。
这部分依赖于3个配置参数,io.sort.spill.percent是kvbuffer,kvindices和kvoffsets的总大小(以M为单位,缺省是100,就是100M,这一部分是MapOutputBuffer中占用存储最多的)。io.sort.record.percent是kvindices和kvoffsets占用的空间比例(缺省是0.05)。前面的分析我们已经知道kvindices和kvoffsets,如果记录数是N的话,它占用的空间是4N*4bytes,根据这个关系和io.sort.record.percent的值,我们可以计算出kvindices和kvoffsets最多能有多少个记录,并分配相应的空间。参数io.sort.spill.percent指示当输出缓冲区或kvindices和kvoffsets记录数量到达对应的占用率的时候,会启动spill,将内存缓冲区的记录存放到硬盘上,softBufferLimit和softRecordLimit为对应的字节数。
值对<key, value>输出到缓冲区是通过Serializer串行化的,这部分的初始化跟在上面输出缓存后面。接下来是一些计数器和可能的数据压缩处理器的初始化,可能的Combiner和combiner工作的一些配置。
最后是启动spillThread,该Thread会检查内存中的输出缓存区,在满足一定条件的时候将缓冲区中的内容spill到硬盘上。这是一个标准的生产者-消费者模型,MapTask的collect方法是生产者,spillThread是消费者,它们之间同步是通过spillLock(ReentrantLock)和spillLock上的两个条件变量(spillDone和spillReady)完成的。
先看生产者,MapOutputBuffer.collect的主要流程是:
l 报告进度和参数检测(<K, V>符合Mapper的输出约定);
l spillLock.lock(),进入临界区;
l 如果达到spill条件,设置变量并通过spillReady.signal(),通知spillThread;并等待spill结束(通过spillDone.await()等待);
l spillLock.unlock();
l 输出key,value并更新kvindices和kvoffsets(注意,方法collect是synchronized,key和value各自输出,它们也会占用连续的输出缓冲区);
kvstart,kvend和kvindex三个变量在判断是否需要spill和spill是否结束的过程中很重要,kvstart是有效记录开始的下标,kvindex是下一个可做记录的位置,kvend的作用比较特殊,它在一般情况下kvstart==kvend,但开始spill的时候它会被赋值为kvindex的值,spill结束时,它的值会被赋给kvstart,这时候kvstart==kvend。这就是说,如果kvstart不等于kvend,系统正在spill,否则,kvstart==kvend,系统处于普通工作状态。其实在代码中,我们可以看到很多kvstart==kvend的判断。
下面我们分情况,讨论kvstart,kvend和kvindex的配合。初始化的时候,它们都被赋值0。
下图给出了一个没有spill的记录添加过程:
注意kvindex和kvnext的关系,取模实现了循环缓冲区
如果在添加记录的过程中,出现spill(多种条件),那么,主要的过程如下:
首先还是计算kvnext,主要,这个时候kvend==kvstart(图中没有画出来)。如果spill条件满足,那么,kvindex的值会赋给kvend(这是kvend不等于kvstart),从kvstart和kvend的大小关系,我们可以知道记录位于数组的那一部分(左边是kvstart<kvend的情况,右边是另外的情况)。Spill结束的时候,kvend值会被赋给kvstart, kvend==kvstart又重新满足,同时,我们可以发现kvindex在这个过程中没有变化,新的记录还是写在kvindex指向的位置,然后,kvindex=kvnect,kvindex移到下一个可用位置。
大家体会一下上面的过程,特别是kvstart,kvend和kvindex的配合,其实,<key,value>对输出使用的缓冲区,也有类似的过程。
Collect在处理<key,value>输出时,会处理一个MapBufferTooSmallException,这是value的串行化结果太大,不能一次放入缓冲区的指示,这种情况下我们需要调用spillSingleRecord,特殊处理。
2009-06-05
Hadoop源代码分析(MapTask辅助类,III)
接下来讨论的是key,value的输出,这部分比较复杂,不过有了前面kvstart,kvend和kvindex配合的分析,有利于我们理解这部分的代码。
输出缓冲区中,和kvstart,kvend和kvindex对应的是bufstart,bufend和bufmark。这部分还涉及到变量bufvoid,用于表明实际使用的缓冲区结尾(见后面BlockingBuffer.reset分析),和变量bufmark,用于标记记录的结尾。这部分代码需要bufmark,是因为key或value的输出是变长的,(前面元信息记录大小是常量,就不需要这样的变量)。
最好的情况是缓冲区没有翻转和value串行化结果很小,如下图:
先对key串行化,然后对value做串行化,临时变量keystart,valstart和valend分别记录了key结果的开始位置,value结果的开始位置和value结果的结束位置。
串行化过程中,往缓冲区写是最终调用了Buffer.write方法,我们后面再分析。
如果key串行化后出现bufindex < keystart,那么会调用BlockingBuffer的reset方法。原因是在spill的过程中需要对<key,value>排序,这种情况下,传递给RawComparator的必须是连续的二进制缓冲区,通过BlockingBuffer.reset方法,解决这个问题。下图解释了如何解决这个问题:
当发现key的串行化结果出现不连续的情况时,我们会把bufvoid设置为bufmark,见缓冲区开始部分往后挪,然后将原来位于bufmark到bufvoid出的结果,拷到缓冲区开始处,这样的话,key串行化的结果就连续存放在缓冲区的最开始处。
上面的调整有一个条件,就是bufstart前面的缓冲区能够放下整个key串行化的结果,如果不能,处理的方式是将bufindex置0,然后调用BlockingBuffer内部的out的write方法直接输出,这实际调用了Buffer.write方法,会启动spill过程,最终我们会成功写入key串行化的结果。
下面我们看write方法。key,value串行化过程中,往缓冲区写数据是最终调用了Buffer.write方法,又是一个复杂的方法。
l do-while循环,直到我们有足够的空间可以写数据(包括缓冲区和kvindices和kvoffsets)
u 首先我们计算缓冲区连续写是否写满标志buffull和缓冲区非连续情况下有足够写空间标志wrap(这个实在拗口),见下面的讨论;条件(buffull && !wrap)用于判断目前有没有足够的写空间;
u 在spill没启动的情况下(kvstart == kvend),分两种情况,如果数组中有记录(kvend != kvindex),那么,根据需要(目前输出空间不足或记录数达到spill条件)启动spill过程;否则,如果空间还是不够(buffull && !wrap),表明这个记录非常大,以至于我们的内存缓冲区不能容下这么大的数据量,抛MapBufferTooSmallException异常;
u 如果空间不足同时spill在运行,等待spillDone;
l 写数据,注意,如果buffull,则写数据会不连续,则写满剩余缓冲区,然后设置bufindex=0,并从bufindex处接着写。否则,就是从bufindex处开始写。
下图给出了缓冲区连续写是否写满标志buffull和缓冲区非连续情况下有足够写空间标志wrap计算的几种可能:
情况1和情况2中,buffull判断为从bufindex到bufvoid是否有足够的空间容纳写的内容,wrap是图中白颜色部分的空间是否比输入大,如果是,wrap为true;情况3和情况4中,buffull判断bufindex到bufstart的空间是否满足条件,而wrap肯定是false。明显,条件(buffull && !wrap)满足时,目前的空间不够一次写。
接下来我们来看spillSingleRecord,只是用于写放不进内存缓冲区的<key,value>对。过程很流水,首先是创建SpillRecord记录,输出文件和IndexRecord记录,然后循环,构造SpillRecord并在恰当的时候输出记录(如下图),最后输出spill{n}.index文件。
前面我们提过spillThread,在这个系统中它是消费者,这个消费者相当简单,需要spill时调用函数sortAndSpill,进行spill。sortAndSpill和spillSingleRecord类似,函数的开始也是创建SpillRecord记录,输出文件和IndexRecord记录,然后,需要在kvoffsets上做排序,排完序后顺序访问kvoffsets,也就是按partition顺序访问记录。
按partition循环处理排完序的数组,如果没有combiner,则直接输出记录,否则,调用combineAndSpill,先做combin然后输出。循环的最后记录IndexRecord到SpillRecord。
sortAndSpill最后是输出spill{n}.index文件。
combineAndSpill比价简单,我们就不分析了。
BlockingBuffer中最后要分析的方法是flush方法。调用flush方法,意味着Mapper的结果都已经collect了,需要对缓冲区做一些最后的清理,并合并spill{n}文件产生最后的输出。
缓冲区处理部分很简单,先等待可能的spill过程完成,然后判断缓冲区是否为空,如果不是,则调用sortAndSpill,做最后的spill,然后结束spill线程。
flush合并spill{n}文件是通过mergeParts方法。如果Mapper最后只有一个spill{n}文件,简单修改该文件的文件名就可以。如果Mapper没有任何输出,那么我们需要创建哑输出(dummy files)。如果spill{n}文件多于1个,那么按partition循环处理所有文件,将处于处理partition的记录输出。处理partition的过程中可能还会再次调用combineAndSpill,最记录再做一次combination,其中还涉及到工具类Merger,我们就不再深入研究了。
Hadoop源代码分析(MapReduce概论)
大家都熟悉文件系统,在对HDFS进行分析前,我们并没有花很多的时间去介绍HDFS的背景,毕竟大家对文件系统的还是有一定的理解的,而且也有很好的文档。在分析Hadoop的MapReduce部分前,我们还是先了解系统是如何工作的,然后再进入我们的分析部分。下面的图来自http://horicky.blogspot.com/2008/11/hadoop-mapreduce-implementation.html,是我看到的讲MapReduce最好的图。
以Hadoop带的wordcount为例子(下面是启动行):
hadoop jar hadoop-0.19.0-examples.jar wordcount /usr/input /usr/output
用户提交一个任务以后,该任务由JobTracker协调,先执行Map阶段(图中M1,M2和M3),然后执行Reduce阶段(图中R1和R2)。Map阶段和Reduce阶段动作都受TaskTracker监控,并运行在独立于TaskTracker的Java虚拟机中。
我们的输入和输出都是HDFS上的目录(如上图所示)。输入由InputFormat接口描述,它的实现如ASCII文件,JDBC数据库等,分别处理对于的数据源,并提供了数据的一些特征。通过InputFormat实现,可以获取InputSplit接口的实现,这个实现用于对数据进行划分(图中的splite1到splite5,就是划分以后的结果),同时从InputFormat也可以获取RecordReader接口的实现,并从输入中生成<k,v>对。有了<k,v>,就可以开始做map操作了。
map操作通过context.collect(最终通过OutputCollector. collect)将结果写到context中。当Mapper的输出被收集后,它们会被Partitioner类以指定的方式区分地写出到输出文件里。我们可以为Mapper提供Combiner,在Mapper输出它的<k,v>时,键值对不会被马上写到输出里,他们会被收集在list里(一个key值一个list),当写入一定数量的键值对时,这部分缓冲会被Combiner中进行合并,然后再输出到Partitioner中(图中M1的黄颜色部分对应着Combiner和Partitioner)。
Map的动作做完以后,进入Reduce阶段。这个阶段分3个步骤:混洗(Shuffle),排序(sort)和reduce。
混洗阶段,Hadoop的MapReduce框架会根据Map结果中的key,将相关的结果传输到某一个Reducer上(多个Mapper产生的同一个key的中间结果分布在不同的机器上,这一步结束后,他们传输都到了处理这个key的Reducer的机器上)。这个步骤中的文件传输使用了HTTP协议。
排序和混洗是一块进行的,这个阶段将来自不同Mapper具有相同key值的<key,value>对合并到一起。
Reduce阶段,上面通过Shuffle和sort后得到的<key, (list of values)>会送到Reducer. reduce方法中处理,输出的结果通过OutputFormat,输出到DFS中。
2009-02-25
Hadoop源代码分析(包org.apache.hadoop.mapreduce)
有了前一节的分析,我们来看一下具体的接口,它们都处于包org.apache.hadoop.mapreduce中。
上面的图中,类可以分为4种。右上角的是从Writeable继承的,和Counter(还有CounterGroup和Counters,也在这个包中,并没有出现在上面的图里)和ID相关的类,它们保持MapReduce过程中需要的一些计数器和标识;中间大部分是和Context相关的*Context类,它为Mapper和Reducer提供了相关的上下文;关于Map和Reduce,对应的类是Mapper,Reducer和描述他们的Job(在Hadoop 中一次计算任务称之为一个job,下面的分析中,中文为“作业”,相应的task我们称为“任务”);图中其他类是配合Mapper和Reduce工作的一些辅助类。
如果你熟悉HTTPServlet, 那就能很轻松地理解Hadoop采用的结构,把整个Hadoop看作是容器,那么Mapper和Reduce就是容器里的组件,*Context保存了组件的一些配置信息,同时也是和容器通信的机制。
和ID相关的类我们就不再讨论了。我们先看JobContext,它位于*Context继承树的最上方,为Job提供一些只读的信息,如Job的ID,名称等。下面的信息是MapReduce过程中一些较关键的定制信息:
(来自http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop2/index.html):
参数 | 作用 | 缺省值 | 其它实现 |
InputFormat | 将输入的数据集切割成小数据集 InputSplits, 每一个 InputSplit 将由一个 Mapper 负责处理。此外 InputFormat 中还提供一个 RecordReader 的实现, 将一个 InputSplit 解析成 <key,value> 对提供给 map 函数。 | TextInputFormat (针对文本文件,按行将文本文件切割成 InputSplits, 并用 LineRecordReader 将 InputSplit 解析成 <key,value> 对,key 是行在文件中的位置,value 是文件中的一行) | SequenceFileInputFormat |
OutputFormat | 提供一个 RecordWriter 的实现,负责输出最终结果 | TextOutputFormat (用 LineRecordWriter 将最终结果写成纯文件文件,每个 <key,value> 对一行,key 和 value 之间用 tab 分隔) | SequenceFileOutputFormat |
OutputKeyClass | 输出的最终结果中 key 的类型 | LongWritable | |
OutputValueClass | 输出的最终结果中 value 的类型 | Text | |
MapperClass | Mapper 类,实现 map 函数,完成输入的 <key,value> 到中间结果的映射 | IdentityMapper (将输入的 <key,value> 原封不动的输出为中间结果) | LongSumReducer, LogRegexMapper, InverseMapper |
CombinerClass | 实现 combine 函数,将中间结果中的重复 key 做合并 | null (不对中间结果中的重复 key 做合并) | |
ReducerClass | Reducer 类,实现 reduce 函数,对中间结果做合并,形成最终结果 | IdentityReducer (将中间结果直接输出为最终结果) | AccumulatingReducer, LongSumReducer |
InputPath | 设定 job 的输入目录, job 运行时会处理输入目录下的所有文件 | null | |
OutputPath | 设定 job 的输出目录,job 的最终结果会写入输出目录下 | null | |
MapOutputKeyClass | 设定 map 函数输出的中间结果中 key 的类型 | 如果用户没有设定的话,使用 OutputKeyClass | |
MapOutputValueClass | 设定 map 函数输出的中间结果中 value 的类型 | 如果用户没有设定的话,使用 OutputValuesClass | |
OutputKeyComparator | 对结果中的 key 进行排序时的使用的比较器 | WritableComparable | |
PartitionerClass | 对中间结果的 key 排序后,用此 Partition 函数将其划分为R份,每份由一个 Reducer 负责处理。 | HashPartitioner (使用 Hash 函数做 partition) | KeyFieldBasedPartitioner PipesPartitioner |
l mapProgress:map的进度(0—1.0);
l reduceProgress:reduce的进度(0—1.0);
l isComplete:作业是否已经完成;
l isSuccessful:作业是否成功;
l killJob:结束一个在运行中的作业;
l getTaskCompletionEvents:得到任务完成的应答(成功/失败);
l killTask:结束某一个任务;
2009-02-25
Hadoop源代码分析(包mapreduce.lib.input)
接下来我们按照MapReduce过程中数据流动的顺序,来分解org.apache.hadoop.mapreduce.lib.*的相关内容,并介绍对应的基类的功能。首先是input部分,它实现了MapReduce的数据输入部分。类图如下:
类图的右上角是InputFormat,它描述了一个MapReduce Job的输入,通过InputFormat,Hadoop可以:
l 检查MapReduce输入数据的正确性;
l 将输入数据切分为逻辑块InputSplit,这些块会分配给Mapper;
l 提供一个RecordReader实现,Mapper用该实现从InputSplit中读取输入的<K,V>对。
在org.apache.hadoop.mapreduce.lib.input中,Hadoop为所有基于文件的InputFormat提供了一个虚基类FileInputFormat。下面几个参数可以用于配置FileInputFormat:
l mapred.input.pathFilter.class:输入文件过滤器,通过过滤器的文件才会加入InputFormat;
l mapred.min.split.size:最小的划分大小;
l mapred.max.split.size:最大的划分大小;
l mapred.input.dir:输入路径,用逗号做分割。
类中比较重要的方法有:
protected List<FileStatus> listStatus(Configuration job)
递归获取输入数据目录中的所有文件(包括文件信息),输入的job是系统运行的配置Configuration,包含了上面我们提到的参数。
public List<InputSplit> getSplits(JobContext context)
将输入划分为InputSplit,包含两个循环,第一个循环处理所有的文件,对于每一个文件,根据输入的划分最大/最小值,循环得到文件上的划分。注意,划分不会跨越文件。
FileInputFormat没有实现InputFormat的createRecordReader方法。
FileInputFormat有两个子类,SequenceFileInputFormat是Hadoop定义的一种二进制形式存放的键/值文件(参考http://hadoop.apache.org/core/docs/current/api/org/apache/hadoop/io/SequenceFile.html),它有自己定义的文件布局。由于它有特殊的扩展名,所以SequenceFileInputFormat重载了listStatus,同时,它实现了createRecordReader,返回一个SequenceFileRecordReader对象。TextInputFormat处理的是文本文件,createRecordReader返回的是LineRecordReader的实例。这两个类都没有重载FileInputFormat的getSplits方法,那么,在他们对于的RecordReader中,必须考虑FileInputFormat对输入的划分方式。
FileInputFormat的getSplits,返回的是FileSplit。这是一个很简单的类,包含的属性(文件名,起始偏移量,划分的长度和可能的目标机器)已经足以说明这个类的功能。
RecordReader用于在划分中读取<Key,Value>对。RecordReader有五个虚方法,分别是:
l initialize:初始化,输入参数包括该Reader工作的数据划分InputSplit和Job的上下文context;
l nextKey:得到输入的下一个Key,如果数据划分已经没有新的记录,返回空;
l nextValue:得到Key对应的Value,必须在调用nextKey后调用;
l getProgress:得到现在的进度;
l close,来自java.io的Closeable接口,用于清理RecordReader。
我们以LineRecordReader为例,来分析RecordReader的构成。前面我们已经分析过FileInputFormat对文件的划分了,划分完的Split包括了文件名,起始偏移量,划分的长度。由于文件是文本文件,LineRecordReader的初始化方法initialize会创建一个基于行的读取对象LineReader(定义在org.apache.hadoop.util中,我们就不分析啦),然后跳过输入的最开始的部分(只在Split的起始偏移量不为0的情况下进行,这时最开始的部分可能是上一个Split的最后一行的一部分)。nextKey的处理很简单,它使用当前的偏移量作为Key,nextValue当然就是偏移量开始的那一行了(如果行很长,可能出现截断)。进度getProgress和close都很简单。
2009-02-25
Hadoop源代码分析(包mapreduce.lib.map)
Hadoop的MapReduce框架中,Map动作通过Mapper类来抽象。一般来说,我们会实现自己特殊的Mapper,并注册到系统中,执行时,我们的Mapper会被MapReduce框架调用。Mapper类很简单,包括一个内部类和四个方法,静态结构图如下:
内部类Context继承自MapContext,并没有引入任何新的方法。
Mapper的四个方法是setup,map,cleanup和run。其中,setup和cleanup用于管理Mapper生命周期中的资源,setup在完成Mapper构造,即将开始执行map动作前调用,cleanup则在所有的map动作完成后被调用。方法map用于对一次输入的key/value对进行map动作。run方法执行了上面描述的过程,它调用setup,让后迭代所有的key/value对,进行map,最后调用cleanup。
org.apache.hadoop.mapreduce.lib.map中实现了Mapper的三个子类,分别是InverseMapper(将输入<key, value> map为输出<value, key>),MultithreadedMapper(多线程执行map方法)和TokenCounterMapper(对输入的value分解为token并计数)。其中最复杂的是MultithreadedMapper,我们就以它为例,来分析Mapper的实现。
MultithreadedMapper会启动多个线程执行另一个Mapper的map方法,它会启动mapred.map.multithreadedrunner.threads(配置项)个线程执行Mapper:mapred.map.multithreadedrunner.class(配置项)。MultithreadedMapper重写了基类Mapper的run方法,启动N个线程(对应的类为MapRunner)执行mapred.map.multithreadedrunner.class(我们称为目标Mapper)的run方法(就是说,目标Mapper的setup和cleanup会被执行多次)。目标Mapper共享同一份InputSplit,这就意味着,对InputSplit的数据读必须线程安全。为此,MultithreadedMapper引入了内部类SubMapRecordReader,SubMapRecordWriter,SubMapStatusReporter,分别继承自RecordReader,RecordWriter和StatusReporter,它们通过互斥访问MultithreadedMapper的Mapper.Context,实现了对同一份InputSplit的线程安全访问,为Mapper提供所需的Context。这些类的实现方法都很简单。
2009-02-26
Hadoop源代码分析(mapreduce.lib.partition/reduce/output)
Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出,下面我们就来分析参与这个过程的类。
Mapper的结果,可能送到可能的Combiner做合并,Combiner在系统中并没有自己的基类,而是用Reducer作为Combiner的基类,他们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已。
Mapper最终处理的结果对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer那,哪个key到哪个Reducer的分配过程,是由Partitioner规定的,它只有一个方法,输入是Map的结果对<key, value>和Reducer的数目,输出则是分配的Reducer(整数编号)。系统缺省的Partitioner是HashPartitioner,它以key的Hash值对Reducer的数目取模,得到对应的Reducer。
Reducer是所有用户定制Reducer类的基类,和Mapper类似,它也有setup,reduce,cleanup和run方法,其中setup和cleanup含义和Mapper相同,reduce是真正合并Mapper结果的地方,它的输入是key和这个key对应的所有value的一个迭代器,同时还包括Reducer的上下文。系统中定义了两个非常简单的Reducer,IntSumReducer和LongSumReducer,分别用于对整形/长整型的value求和。
Reduce的结果,通过Reducer.Context的方法collect输出到文件中,和输入类似,Hadoop引入了OutputFormat。OutputFormat依赖两个辅助接口:RecordWriter和OutputCommitter,来处理输出。RecordWriter提供了write方法,用于输出<key, value>和close方法,用于关闭对应的输出。OutputCommitter提供了一系列方法,用户通过实现这些方法,可以定制OutputFormat生存期某些阶段需要的特殊操作。我们在TaskInputOutputContext中讨论过这些方法(明显,TaskInputOutputContext是OutputFormat和Reducer间的桥梁)。
OutputFormat和RecordWriter分别对应着InputFormat和RecordReader,系统提供了空输出NullOutputFormat(什么结果都不输出,NullOutputFormat.RecordWriter只是示例,系统中没有定义),LazyOutputFormat(没在类图中出现,不分析),FilterOutputFormat(不分析)和基于文件FileOutputFormat的SequenceFileOutputFormat和TextOutputFormat输出。
基于文件的输出FileOutputFormat利用了一些配置项配合工作,包括mapred.output.compress:是否压缩;mapred.output.compression.codec:压缩方法;mapred.output.dir:输出路径;mapred.work.output.dir:输出工作路径。FileOutputFormat还依赖于FileOutputCommitter,通过FileOutputCommitter提供一些和Job,Task相关的临时文件管理功能。如FileOutputCommitter的setupJob,会在输出路径下创建一个名为_temporary的临时目录,cleanupJob则会删除这个目录。
SequenceFileOutputFormat输出和TextOutputFormat输出分别对应输入的SequenceFileInputFormat和TextInputFormat,我们就不再详细分析啦。
2009-03-06
Hadoop源代码分析(包hadoop.mapred中的MapReduce接口)
前面已经完成了对org.apache.hadoop.mapreduce的分析,这个包提供了Hadoop MapReduce部分的应用API,用于用户实现自己的MapReduce应用。但这些接口是给未来的MapReduce应用的,目前MapReduce框架还是使用老系统(参考补丁HADOOP-1230)。下面我们来分析org.apache.hadoop.mapred,首先还是从mapred的MapReduce框架开始分析,下面的类图(灰色部分为标记为@Deprecated的类/接口):
我们把包mapreduce的类图附在下面,对比一下,我们就会发现,org.apache.hadoop.mapred中的MapReduce API相对来说很简单,主要是少了和Context相关的类,那么,好多在mapreduce中通过context来完成的工作,就需要通过参数来传递,如Map中的输出,老版本是:
output.collect(key, result); // output’s type is: OutputCollector
新版本是:
context.write(key, result); // output’s type is: Context
它们分别使用OutputCollector和Mapper.Context来输出map的结果,显然,原有OutputCollector的新API中就不再需要。总体来说,老版本的API比较简单,MapReduce过程中关键的对象都有,但可扩展性不是很强。同时,老版中提供的辅助类也很多,我们前面分析的FileOutputFormat,也有对应的实现,我们就不再讨论了。
2009-03-10
Hadoop源代码分析(*IDs类和*Context类)
我们开始来分析Hadoop MapReduce的内部的运行机制。用户向Hadoop提交Job(作业),作业在JobTracker对象的控制下执行。Job被分解成为Task(任务),分发到集群中,在TaskTracker的控制下运行。Task包括MapTask和ReduceTask,是MapReduce的Map操作和Reduce操作执行的地方。这中任务分布的方法比较类似于HDFS中NameNode和DataNode的分工,NameNode对应的是JobTracker,DataNode对应的是TaskTracker。JobTracker,TaskTracker和MapReduce的客户端通过RPC通信,具体可以参考HDFS部分的分析。
我们先来分析一些辅助类,首先是和ID有关的类,ID的继承树如下:
这张图可以看出现在Hadoop的org.apache.hadoop.mapred向org.apache.hadoop.mapreduce迁移带来的一些问题,其中灰色是标注为@Deprecated的。ID携带一个整型,实现了WritableComparable接口,这表明它可以比较,而且可以被Hadoop的io机制串行化/解串行化(必须实现compareTo/readFields/write方法)。JobID是系统分配给作业的唯一标识符,它的toString结果是job_<jobtrackerID>_<jobNumber>。例子:job_200707121733_0003表明这是jobtracker 200707121733(利用jobtracker的开始时间作为ID)的第3号作业。
作业分成任务执行,任务号TaskID包含了它所属的作业ID,同时也有任务ID,同时还保持了这是否是一个Map任务(成员变量isMap)。任务号的字符串表示为task_<jobtrackerID>_<jobNumber>_[m|r]_<taskNumber>,如task_200707121733_0003_m_000005表示作业200707121733_0003的000005号任务,改任务是一个Map任务。
一个任务有可能有多个执行(错误恢复/消除Stragglers等),所以必须区分任务的多个执行,这是通过类TaskAttemptID来完成,它在任务号的基础上添加了尝试号。一个任务尝试号的例子是attempt_200707121733_0003_m_000005_0,它是任务task_200707121733_0003_m_000005的第0号尝试。
JVMId用于管理任务执行过程中的Java虚拟机,我们后面再讨论。
为了使Job和Task工作,Hadoop提供了一系列的上下文,这些上下文保存了Job和Task工作的信息。
处于继承树的最上方是org.apache.hadoop.mapreduce.JobContext,前面我们已经介绍过了,它提供了Job的一些只读属性,两个成员变量,一个保存了JobID,另一个类型为JobConf,JobContext中除了JobID外,其它的信息都保持在JobConf中。它定义了如下配置项:
l mapreduce.inputformat.class:InputFormat的实现
l mapreduce.map.class:Mapper的实现
l mapreduce.combine.class: Reducer的实现
l mapreduce.reduce.class:Reducer的实现
l mapreduce.outputformat.class: OutputFormat的实现
l mapreduce.partitioner.class: Partitioner的实现
同时,它提供方法,使得通过类名,利用Java反射提供的Class.forName方法,获得类对应的Class。org.apache.hadoop.mapred的JobContext对象比org.apache.hadoop.mapreduce.JobContext多了成员变量progress,用于获取进度信息,它类型为JobConf成员job指向mapreduce.JobContext对应的成员,没有添加任何新功能。
JobConf继承自Configuration,保持了MapReduce执行需要的一些配置信息,它管理着46个配置参数,包括上面mapreduce配置项对应的老版本形式,如mapreduce.map.class 对应mapred.mapper.class。这些配置项我们在使用到它们的时候再介绍。
org.apache.hadoop.mapreduce.JobContext的子类Job前面也已经介绍了,后面在讨论系统的动态行为时,再回来看它。
TaskAttemptContext用于任务的执行,它引入了标识任务执行的TaskAttemptID和任务状态status,并提供新的访问接口。org.apache.hadoop.mapred的TaskAttemptContext继承自mapreduce的对应版本,只是增加了记录进度的progress。
TaskInputOutputContext和它的子类都在包org.apache.hadoop.mapreduce中,前面已经分析过了,我们就不再罗嗦。
2009-03-10
Hadoop源代码分析(类TaskStatus)
我们先分析Task,这是一个规模比较大的类,类图如下。Task是一个虚类,它有两个子类,MapTask和ReduceTask,分别是Map任务和Reduce任务的抽象。
在分析Task相关类之前,我们来分析和ID,JobID,TaskID相关的类。
我们从TaskStatus开始来分析Task相关的类,TaskStatus,一看类名就知道它保持了Task的状态。从前面介绍MapReduce的过程中,我们了解到,MapReduce的过程可以处于下面6个阶段,它们定义在枚举:TaskStatus.Phase中,包括如下状态:
l STARTING:开始
l MAP:Map阶段
l SHUFFLE:混洗阶段
l SORT:排序阶段
l REDUCE:Reduce阶段
l CLEANUP:清理阶段
除了阶段,TaskStatus还维护任务的状态,很明显,如果不考虑异常,一次任务应该包括准备,运行和清理三个主要阶段,其实TaskStatus的正常流程和这个非常类似,同时,考虑到任务可能异常结束或被JobTracker杀死,系统还引入配合这两种异常情况的状态,其状态如下:
图中引入了复合状态,只是表明这些状态中包含的状态(如绿色的COMMIT_PENDING和SUCCESSED)可以转移到外面的状态(FAILED)。
(注:这张图是通过人肉逆向工程画出来的,在以后的分析过程中,这张图会根据我们对系统的深入了解而修改)
接下来我们来看TaskStatus的其它成员,它的完整类图如下,基本上是一些信息,没有复杂的操作。
它包含的主要状态信息有:taskid(对应的任务号),progress(处理情况),runState(运行状态,注意和任务阶段做区分),diagnosticInfo(诊断信息),stateString(运行状态),taskTracker(对应的taskTracker),startTime(开始时间),finishTime(结束时间),outputSize(输出大小),phase(任务阶段,注意和运行状态做区分),counters(相关的计数器),includeCounters(是否包含成员变量counters)nextRecordRange(处理的记录范围)。
TaskStatus有两个子类,分别是MapTaskStatus(没有添加任何新的成员变量)和ReduceTaskStatus。ReduceTaskStatus是Reduce任务的状态,它包含了新信息shuffleFinishTime(shuffle结束时间)和sortFinishTime(sort结束时间)。同时,获取Map结果出错时,对应的Map的TaskAttemptID会保存在failedFetchTasks中,等待上报。
最后我们看一下辅助类Counters/Counters.Counter/Counters.Group,它们保存了MapReduce过程中的一些统计计数器,Counters.Counter记录了一个计数器的<名字,显示名,值>,Counters.Group将相关的Counters.Counter聚合成组,并引入组名,组显示名。
2009-05-24
Hadoop源代码分析(IFile)
Mapper的输出,在发送到Reducer前是存放在本地文件系统的,IFile提供了对Mapper输出的管理。我们已经知道,Mapper的输出是<Key,Value>对,IFile以记录<key-len, value-len, key, value>的形式存放了这些数据。为了保存键值对的边界,很自然IFile需要保存key-len和value-len。
和IFile相关的类图如下:
其中,文件流形式的输入和输出是由IFIleInputStream和IFIleOutputStream抽象。以记录形式的读/写操作由IFile.Reader/IFile.Writer提供,IFile.InMemoryReader用于读取存在于内存中的IFile文件格式数据。
我们以输出为例,来分析这部分的实现。首先是下图的和序列化反序列化相关的Serialization/Deserializer,这部分的code是在包org.apache.hadoop.io.serializer。序列化由Serializer抽象,通过Serializer的实现,用户可以利用serialize方法把对象序列化到通过open方法打开的输出流里。Deserializer提供的是相反的过程,对应的方法是deserialize。hadoop.io.serializer中还实现了配合工作的Serialization和对应的工厂SerializationFactory。两个具体的实现是WritableSerialization和JavaSerialization,分别对应了Writeble的序列化反序列化和Java本身带的序列化反序列化。
有了Serializer/Deserializer,我们来分析IFile.Writer。Writer的构造函数是:
public Writer(Configuration conf, FSDataOutputStream out,
Class<K> keyClass, Class<V> valueClass,
CompressionCodec codec, Counters.Counter writesCounter)
conf,配置参数,out是Writer的输出,keyClass 和valueClass 是输出的Kay,Value的class属性,codec是对输出进行压缩的方法,参数writesCounter用于对输出字节数进行统计的Counters.Counter。通过这些参数,我们可以构造我们使用的支持压缩功能的输出流(类成员out,类成员rawOut保存了构造函数传入的out),相关的计数器,还有就是Kay,Value的Serializer方法。
Writer最主要的方法是append方法(居然不是write方法,呵呵),有两种形式:
public void append(K key, V value) throws IOException {
public void append(DataInputBuffer key, DataInputBuffer value)
append(K key, V value)的主要过程是检查参数,然后将key和value序列化到DataOutputBuffer中,并获取序列化后的长度,最后把长度(2个)和DataOutputBuffer中的结果写到输出,并复位DataOutputBuffer和计数。append(DataInputBuffer key, DataInputBuffer value)处理过程也比较类似,就不再分析了。
close方法中需要注意的是,我们需要标记文件尾,或者是流结束。目前是通过写2个值为EOF_MARKER的长度来做标记。
IFileOutputStream是用于配合Writer的输出流,它会在IFiles的最后添加校验数据。当Writer调用IFileOutputStream的write操作时,IFileOutputStream计算并保持校验和,流被close的时候,校验结果会写到对应文件的文件尾。实际上存放在磁盘上的文件是一系列的<key-len, value-len, key, value>记录和校验结果。
Reader的相关过程,我们就不再分析了。
2009-05-24
Hadoop源代码分析(Task的内部类和辅助类)
从前面的图中,我们可以发现Task有很多内部类,并拥有大量类成员变量,这些类配合Task完成相关的工作,如下图。
MapOutputFile管理着Mapper的输出文件,它提供了一系列get方法,用于获取Mapper需要的各种文件,这些文件都存放在一个目录下面。
我们假设传入MapOutputFile的JobID为job_200707121733_0003,TaskID为task_200707121733_0003_m_000005。MapOutputFile的根为
{mapred.local.dir}/taskTracker/jobcache/{jobid}/{taskid}/output
在下面的讨论中,我们把上面的路径记为{MapOutputFileRoot}
以上面JogID和TaskID为例,我们有:
{mapred.local.dir}/taskTracker/jobcache/job_200707121733_0003/task_200707121733_0003_m_000005/output
需要注意的是,{mapred.local.dir}可以包含一系列的路径,那么,Hadoop会在这些根路径下找一个满足要求的目录,建立所需的文件。MapOutputFile的方法有两种,结尾带ForWrite和不带ForWrite,带ForWrite用于创建文件,它需要一个文件大小作为参数,用于检查磁盘空间。不带ForWrite用于获取以建立的文件。
getOutputFile:文件名为{MapOutputFileRoot}/file.out;
getOutputIndexFile:文件名为{MapOutputFileRoot}/file.out.index
getSpillFile:文件名为{MapOutputFileRoot}/spill{spillNumber}.out
getSpillIndexFile:文件名为{MapOutputFileRoot}/spill{spillNumber}.out.index
以上四个方法用于Task子类MapTask中;
getInputFile:文件名为{MapOutputFileRoot}/map_{mapId}.out
用于ReduceTask中。我们到使用到他们的地方再介绍相应的应用场景。
介绍完临时文件管理以后,我们来看Task.CombineOutputCollector,它继承自org.apache.hadoop.mapred.OutputCollector,很简单,只是一个OutputCollector到IFile.Writer的Adapter,活都让IFile.Writer干了。
ValuesIterator用于从RawKeyValueIterator(Key,Value都是DataInputBuffer,ValuesIterator要求该输入已经排序)中获取符合RawComparator<KEY> comparator的值的迭代器。它在Task中有一个简单子类,CombineValuesIterator。
Task.TaskReporter用于向JobTracker提交计数器报告和状态报告,它实现了计数器报告Reporter和状态报告StatusReporter。为了不影响主线程的工作,TaskReporter有一个独立的线程,该线程通过TaskUmbilicalProtocol接口,利用Hadoop的RPC机制,向JobTracker报告Task执行情况。
FileSystemStatisticUpdater用于记录对文件系统的对/写操作字节数,是个简单的工具类。
2009-05-25
Hadoop源代码分析(类Task)
有了前面的基础,我们可以来分析类Task了。Task是一个虚基类,它有两个子类:MapTask,ReduceTask,分别对应着Map和Reduce。先从成员变量开始:
首先是和作业任务相关的信息,包括jobFile,作业的配置文件;taskId,任务ID,从中可以获取作业ID;partition,Job内ID;taskStatus,任务状态。jobCleanup,jobSetup和taskCleanup是三个标志位。
接下来是一组和错误回复的变量。我们知道,如果在Task执行过程中出错,很有可能是因为输入有问题,一个常用的策略是在下一次回复性执行过程中,忽略这部分输入,skipRanges,skipping和writeSkipRecs就是用来控制这个行为的。
currentRecStartIndex和currentRecIndexIterator配合,可以得到当前的任务输入。
conf保存了当前任务的配置(JobConf形式),MapOutputFile上一部分已经介绍了,用于管理临时文件,跟它配合的是lDirAlloc,类型为LocalDirAllocator,是本地文件分配器。jobContext和taskContext保持了Job和Task的上下文。committer定制了和Task生命周期相关的一些特殊处理(也可以看出是上下文)。
最后一部分应该是输出outputFormat。
和统计/状态监视的成员变量分散在类的各处,如spilledRecordsCounter,taskProgress,counters等,我们就不再介绍了。
下面我们开始来分析Task的成员函数,首先是虚方法,Task包含了下面3个虚方法:
public abstract void run(JobConf job, TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException;
执行Task;
public abstract TaskRunner createRunner(TaskTracker tracker,
TaskTracker.TaskInProgress tip) throws IOException;
创建一个TaskRunner;
public abstract boolean isMapTask();
是否是一个Map任务。上面这3个方法自然是和MapTask,ReduceTask相关,也需要它们实现。
构造函数很简单,主要是初始化一些成员函数。initialize也用于初始化成员,它被Task的子类调用,用于子类传入一些子类中构造的对象。构造函数后面是一系列的setter和getter,还有实现Writable的write和readFields。
localizeConfiguration函数用于将一些和Task相关的信息存放到JobConf里,这也是Hadoop MapReduce中重要的参数传递方式。
接下来分析的是一系列和Task生命周期相关的函数。
public void done(TaskUmbilicalProtocol umbilical, TaskReporter reporter )
done被多个方法调用(下图),用于做结束任务的一些清理工作,步骤如下:
l 更新计数器updateCounters();
l 如果任务需要提交,设置Taks状态为COMMIT_PENDING,并利用TaskUmbilicalProtocol,汇报Task完成,等待提交;然后调用commit提交任务(下面分析)
l 设置任务结束标志位;结束Reporter通信线程;
l 发送最后一次统计报告(通过sendLastUpdate方法,很简单);
l 利用TaskUmbilicalProtocol报告结束状态(通过sendDone方法,很简单)。
commit方法被done方法调用,用于等待TaskTracker的可提交信号。通过这种机制,Task可以等待TaskTracker上需要的一些后续处理,比方说,把Task的结果取走,需要TaskTracker的协调和确认。commit还会调用org.apache.hadoop.mapreduce.OutputCommitter的commitTask方法,执行一些子类需要的commit事件处理。
runJobCleanupTask,runJobSetupTask和runTaskCleanupTask应用在Maptask和ReduceTask的run方法中,用于做一些准备和可能的清除任务。
runJobSetupTask:为建立Job做准备,执行状态设置,然后调用org.apache.hadoop.mapreduce.OutputCommitter的setupJob,最后通过done,通知TaskTracker任务完成。
runJobCleanupTask:清理Job,包括步骤状态设置,更新状态到TaskTracker,调用org.apache.hadoop.mapreduce.OutputCommitter的相关方法,通过done,通知TaskTracker任务完成。
runTaskCleanupTask:清理Task任务,和runJobCleanupTask类似。
应该说,这些方法只是提供了一个通用的框架,具体需要的执行,在于org.apache.hadoop.mapreduce.OutputCommitter的具体实现。
2009-05-29
Hadoop源代码分析(MapTask)
接下来我们来分析Task的两个子类,MapTask和ReduceTask。MapTask的相关类图如下:
MapTask其实不是很复杂,复杂的是支持MapTask工作的一些辅助类。MapTask的成员变量少,只有split和splitClass。我们知道,Map的输入是split,是原始数据的一个切分,这个切分由org.apache.hadoop.mapred.InputSplit的子类具体描述(前面我们是通过org.apache.hadoop.mapreduce.InputSplit介绍了InputSplit,它们对外的API是一样的)。splitClass是InputSplit子类的类名,通过它,我们可以利用Java的反射机制,创建出InputSplit子类。而split是一个BytesWritable,它是InputSplit子类串行化以后的结果,再通过InputSplit子类的readFields方法,我们可以回复出对应的InputSplit对象。
MapTask最重要的方法是run。run方法相当简单,配置完系统的TaskReporter后,就根据情况执行runJobCleanupTask,runJobSetupTask,runTaskCleanupTask或执行Mapper。由于MapReduce现在有两套API,MapTask需要支持这两套API,使得MapTask执行Mapper分为runNewMapper和runOldMapper,run*Mapper后,MapTask会调用父类的done方法。
接下来我们来分析runOldMapper,最开始部分是构造Mapper处理的InputSplit,更新Task的配置,然后就开始创建Mapper的RecordReader,rawIn是原始输入,然后分正常(使用TrackedRecordReader,后面讨论)和跳过部分记录(使用SkippingRecordReader,后面讨论)两种情况,构造对应的真正输入in。
跳过部分记录是Map的一种出错恢复策略,我们知道,MapReduce处理的数据集合非常大,而有些任务对一部分出错的数据不进行处理,对结果的影响很小(如大数据集合的一些统计量),那么,一小部分的数据出错导致已处理的大量结果无效,是得不偿失的,跳过这部分记录,成了Mapper的一种选择。
Mapper的输出,是通过MapOutputCollector进行的,也分两种情况,如果没有Reducer,那么,用DirectMapOutputCollector(后面讨论),否则,用MapOutputBuffer(后面讨论)。
构造完Mapper的输入输出,通过构造配置文件中配置的MapRunnable,就可以执行Mapper了。目前系统有两个MapRunnable:MapRunner和MultithreadedMapRunner,如下图。
原有API在这块的处理上和新API有很大的不一样。接口MapRunnable是原有API中Mapper的执行器,run方法就是用于执行用户的Mapper。MapRunner是单线程执行器,相当简单,首先,当MapTask调用:
MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
MapRunner的configure会在newInstance的最后被调用,configure执行的过程中,对应的Mapper会通过反射机制构造出来。
MapRunner的run方法,会先创建对应的key,value对象,然后,对InputSplit的每一对<key,value>,调用Mapper的map方法,循环结束后,Mapper对应的清理方法会被调用。我们需要注意,key,value对象在run方法中是被重复使用的,就是说,每次传入Mapper的map方法的key,value都是同一个对象,只不过是里面的内容变了,对象并没有变。如果你需要保留key,value的内容,需要实现clone机制,克隆出对象的一个新备份。
相对于新API的多线程执行器,老API的MultithreadedMapRunner就比较复杂了,总体来说,就是通过阻塞队列配合Java的多线程执行器,将<key,value>分发到多个线程中去处理。需要注意的是,在这个过程中,这些线程共享一个Mapper实例,如果Mapper有共享的资源,需要有一定的保护机制。
runNewMapper用于执行新版本的Mapper,比runOldMapper稍微复杂,我们就不再讨论了。
2009-06-03
Hadoop源代码分析(MapTask辅助类 I)
MapTask的辅助类主要针对Mapper的输入和输出。首先我们来看MapTask中用的的Mapper输入,在类图中,这部分位于右上角。
MapTask.TrackedRecordReader是一个Wrapper,在原有输入RecordReader的基础上,添加了收集上报统计数据的功能。
MapTask.SkippingRecordReader也是一个Wrapper,它在MapTask.TrackedRecordReader的基础上,添加了忽略部分输入的功能。在分析MapTask.SkippingRecordReader之前,我们先看一下类SortedRanges和它相关的类。
类SortedRanges.Ranges表示了一个范围,以开始位置和范围长度(这样的话就可以表示长度为0的范围)来表示一个范围,并提供了一系列的范围操作方法。注意,方法getEndIndex得到的右端点并不包含在范围内(应理解为开区间)。SortedRanges包含了一系列不重叠的范围,为了保证包含的范围不重叠,在add方法和remove方法上需要做一些处理,保证不重叠的约束。SkipRangeIterator是访问SortedRanges包含的Ranges的迭代器。
MapTask.SkippingRecordReader的实现很简单,因为要忽略的输入都保持在SortedRanges.Ranges,只需要在next方法中,判断目前范围时候落在SortedRanges.Ranges中,如果是,忽略,并将忽略的记录写文件(可配置)
NewTrackingRecordReader和NewOutputCollector被新API使用,我们不分析。
MapTask的输出辅助类都继承自MapOutputCollector,它只是在OutputCollector的基础上添加了close和flush方法。
DirectMapOutputCollector用在Reducer的数目为0,就是不需要Reduce阶段的时候。它是直接通过
out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
得到对应的RecordWriter,collect直接到RecordWriter上。
如果Mapper后续有reduce任务,系统会使用MapOutputBuffer做为输出,这是个比较复杂的类,有1k行左右的代码。
我们知道,Mapper是通过OutputCollector将Map的结果输出,输出的量很大,Hadoop的机制是通过一个circle buffer 收集Mapper的输出, 到了io.sort.mb * percent量的时候,就spill到disk,如下图。图中出现了两个数组和一个缓冲区,kvindices保持了记录所属的(Reduce)分区,key在缓冲区开始的位置和value在缓冲区开始的位置,通过kvindices,我们可以在缓冲区中找到对应的记录。kvoffets用于在缓冲区满的时候对kvindices的partition进行排序,排完序的结果将输出到输出到本地磁盘上,其中索引(kvindices)保持在spill{spill号}.out.index中,数据保存在spill{spill号}.out中。
当Mapper任务结束后,有可能会出现多个spill文件,这些文件会做一个归并排序,形成Mapper的一个输出(spill.out和spill.out.index),如下图:
这个输出是按partition排序的,这样的话,Mapper的输出被分段,Reducer要获取的就是spill.out中的一段。(注意,内存和硬盘上的索引结构不一样)
(感谢彭帅的Hadoop Map Stage流程分析 http://www.cnblogs.com/OnlyXP/archive/2009/05/25/1488811.html)
2009-06-04
Hadoop源代码分析(MapTask辅助类,II)
有了上面Mapper输出的内存存储结构和硬盘存储结构讨论,我们来仔细分析MapOutputBuffer的流程。
首先是成员变量。最先初始化的是作业配置job和统计功能reporter。通过配置,MapOutputBuffer可以获取本地文件系统(localFs和rfs),Reducer的数目和Partitioner。
SpillRecord是文件spill.out{spill号}.index在内存中的对应抽象(内存数据和文件数据就差最后的校验和),该文件保持了一系列的IndexRecord,如下图:
IndexRecord有3个字段,分别是startOffset:记录偏移量,rawLength:初始长度,partLength:实际长度(可能有压缩)。SpillRecord保持了一系列的IndexRecord,并提供方法用于添加记录(没有删除记录的操作,因为不需要),获取记录,写文件,读文件(通过构造函数)。
接下来是一些和输出缓存区kvbuffer,缓存区记录索引kvindices和缓存区记录索引排序工作数组kvoffsets相关的处理,下面的图有助于说明这段代码。
这部分依赖于3个配置参数,io.sort.spill.percent是kvbuffer,kvindices和kvoffsets的总大小(以M为单位,缺省是100,就是100M,这一部分是MapOutputBuffer中占用存储最多的)。io.sort.record.percent是kvindices和kvoffsets占用的空间比例(缺省是0.05)。前面的分析我们已经知道kvindices和kvoffsets,如果记录数是N的话,它占用的空间是4N*4bytes,根据这个关系和io.sort.record.percent的值,我们可以计算出kvindices和kvoffsets最多能有多少个记录,并分配相应的空间。参数io.sort.spill.percent指示当输出缓冲区或kvindices和kvoffsets记录数量到达对应的占用率的时候,会启动spill,将内存缓冲区的记录存放到硬盘上,softBufferLimit和softRecordLimit为对应的字节数。
值对<key, value>输出到缓冲区是通过Serializer串行化的,这部分的初始化跟在上面输出缓存后面。接下来是一些计数器和可能的数据压缩处理器的初始化,可能的Combiner和combiner工作的一些配置。
最后是启动spillThread,该Thread会检查内存中的输出缓存区,在满足一定条件的时候将缓冲区中的内容spill到硬盘上。这是一个标准的生产者-消费者模型,MapTask的collect方法是生产者,spillThread是消费者,它们之间同步是通过spillLock(ReentrantLock)和spillLock上的两个条件变量(spillDone和spillReady)完成的。
先看生产者,MapOutputBuffer.collect的主要流程是:
l 报告进度和参数检测(<K, V>符合Mapper的输出约定);
l spillLock.lock(),进入临界区;
l 如果达到spill条件,设置变量并通过spillReady.signal(),通知spillThread;并等待spill结束(通过spillDone.await()等待);
l spillLock.unlock();
l 输出key,value并更新kvindices和kvoffsets(注意,方法collect是synchronized,key和value各自输出,它们也会占用连续的输出缓冲区);
kvstart,kvend和kvindex三个变量在判断是否需要spill和spill是否结束的过程中很重要,kvstart是有效记录开始的下标,kvindex是下一个可做记录的位置,kvend的作用比较特殊,它在一般情况下kvstart==kvend,但开始spill的时候它会被赋值为kvindex的值,spill结束时,它的值会被赋给kvstart,这时候kvstart==kvend。这就是说,如果kvstart不等于kvend,系统正在spill,否则,kvstart==kvend,系统处于普通工作状态。其实在代码中,我们可以看到很多kvstart==kvend的判断。
下面我们分情况,讨论kvstart,kvend和kvindex的配合。初始化的时候,它们都被赋值0。
下图给出了一个没有spill的记录添加过程:
注意kvindex和kvnext的关系,取模实现了循环缓冲区
如果在添加记录的过程中,出现spill(多种条件),那么,主要的过程如下:
首先还是计算kvnext,主要,这个时候kvend==kvstart(图中没有画出来)。如果spill条件满足,那么,kvindex的值会赋给kvend(这是kvend不等于kvstart),从kvstart和kvend的大小关系,我们可以知道记录位于数组的那一部分(左边是kvstart<kvend的情况,右边是另外的情况)。Spill结束的时候,kvend值会被赋给kvstart, kvend==kvstart又重新满足,同时,我们可以发现kvindex在这个过程中没有变化,新的记录还是写在kvindex指向的位置,然后,kvindex=kvnect,kvindex移到下一个可用位置。
大家体会一下上面的过程,特别是kvstart,kvend和kvindex的配合,其实,<key,value>对输出使用的缓冲区,也有类似的过程。
Collect在处理<key,value>输出时,会处理一个MapBufferTooSmallException,这是value的串行化结果太大,不能一次放入缓冲区的指示,这种情况下我们需要调用spillSingleRecord,特殊处理。
2009-06-05
Hadoop源代码分析(MapTask辅助类,III)
接下来讨论的是key,value的输出,这部分比较复杂,不过有了前面kvstart,kvend和kvindex配合的分析,有利于我们理解这部分的代码。
输出缓冲区中,和kvstart,kvend和kvindex对应的是bufstart,bufend和bufmark。这部分还涉及到变量bufvoid,用于表明实际使用的缓冲区结尾(见后面BlockingBuffer.reset分析),和变量bufmark,用于标记记录的结尾。这部分代码需要bufmark,是因为key或value的输出是变长的,(前面元信息记录大小是常量,就不需要这样的变量)。
最好的情况是缓冲区没有翻转和value串行化结果很小,如下图:
先对key串行化,然后对value做串行化,临时变量keystart,valstart和valend分别记录了key结果的开始位置,value结果的开始位置和value结果的结束位置。
串行化过程中,往缓冲区写是最终调用了Buffer.write方法,我们后面再分析。
如果key串行化后出现bufindex < keystart,那么会调用BlockingBuffer的reset方法。原因是在spill的过程中需要对<key,value>排序,这种情况下,传递给RawComparator的必须是连续的二进制缓冲区,通过BlockingBuffer.reset方法,解决这个问题。下图解释了如何解决这个问题:
当发现key的串行化结果出现不连续的情况时,我们会把bufvoid设置为bufmark,见缓冲区开始部分往后挪,然后将原来位于bufmark到bufvoid出的结果,拷到缓冲区开始处,这样的话,key串行化的结果就连续存放在缓冲区的最开始处。
上面的调整有一个条件,就是bufstart前面的缓冲区能够放下整个key串行化的结果,如果不能,处理的方式是将bufindex置0,然后调用BlockingBuffer内部的out的write方法直接输出,这实际调用了Buffer.write方法,会启动spill过程,最终我们会成功写入key串行化的结果。
下面我们看write方法。key,value串行化过程中,往缓冲区写数据是最终调用了Buffer.write方法,又是一个复杂的方法。
l do-while循环,直到我们有足够的空间可以写数据(包括缓冲区和kvindices和kvoffsets)
u 首先我们计算缓冲区连续写是否写满标志buffull和缓冲区非连续情况下有足够写空间标志wrap(这个实在拗口),见下面的讨论;条件(buffull && !wrap)用于判断目前有没有足够的写空间;
u 在spill没启动的情况下(kvstart == kvend),分两种情况,如果数组中有记录(kvend != kvindex),那么,根据需要(目前输出空间不足或记录数达到spill条件)启动spill过程;否则,如果空间还是不够(buffull && !wrap),表明这个记录非常大,以至于我们的内存缓冲区不能容下这么大的数据量,抛MapBufferTooSmallException异常;
u 如果空间不足同时spill在运行,等待spillDone;
l 写数据,注意,如果buffull,则写数据会不连续,则写满剩余缓冲区,然后设置bufindex=0,并从bufindex处接着写。否则,就是从bufindex处开始写。
下图给出了缓冲区连续写是否写满标志buffull和缓冲区非连续情况下有足够写空间标志wrap计算的几种可能:
情况1和情况2中,buffull判断为从bufindex到bufvoid是否有足够的空间容纳写的内容,wrap是图中白颜色部分的空间是否比输入大,如果是,wrap为true;情况3和情况4中,buffull判断bufindex到bufstart的空间是否满足条件,而wrap肯定是false。明显,条件(buffull && !wrap)满足时,目前的空间不够一次写。
接下来我们来看spillSingleRecord,只是用于写放不进内存缓冲区的<key,value>对。过程很流水,首先是创建SpillRecord记录,输出文件和IndexRecord记录,然后循环,构造SpillRecord并在恰当的时候输出记录(如下图),最后输出spill{n}.index文件。
前面我们提过spillThread,在这个系统中它是消费者,这个消费者相当简单,需要spill时调用函数sortAndSpill,进行spill。sortAndSpill和spillSingleRecord类似,函数的开始也是创建SpillRecord记录,输出文件和IndexRecord记录,然后,需要在kvoffsets上做排序,排完序后顺序访问kvoffsets,也就是按partition顺序访问记录。
按partition循环处理排完序的数组,如果没有combiner,则直接输出记录,否则,调用combineAndSpill,先做combin然后输出。循环的最后记录IndexRecord到SpillRecord。
sortAndSpill最后是输出spill{n}.index文件。
combineAndSpill比价简单,我们就不分析了。
BlockingBuffer中最后要分析的方法是flush方法。调用flush方法,意味着Mapper的结果都已经collect了,需要对缓冲区做一些最后的清理,并合并spill{n}文件产生最后的输出。
缓冲区处理部分很简单,先等待可能的spill过程完成,然后判断缓冲区是否为空,如果不是,则调用sortAndSpill,做最后的spill,然后结束spill线程。
flush合并spill{n}文件是通过mergeParts方法。如果Mapper最后只有一个spill{n}文件,简单修改该文件的文件名就可以。如果Mapper没有任何输出,那么我们需要创建哑输出(dummy files)。如果spill{n}文件多于1个,那么按partition循环处理所有文件,将处于处理partition的记录输出。处理partition的过程中可能还会再次调用combineAndSpill,最记录再做一次combination,其中还涉及到工具类Merger,我们就不再深入研究了。
相关文章推荐
- Hadoop源代码分析(MapReduce概论)
- Hadoop源代码分析(MapReduce概论)
- Hadoop源代码分析(mapreduce.lib.partition/reduce/output)
- hadoop源代码分析(一)从wordCount开始,剖析mapreduce的运行机制
- Hadoop源代码分析(包hadoop.mapred中的MapReduce接口)
- hadoop源代码分析(二)从wordCount开始,剖析mapreduce的运行机制
- Hadoop源代码分析 - MapReduce(转载)
- Hadoop源代码分析(mapreduce.lib.partition/reduce/output)
- Hadoop源码分析(MapReduce概论)
- Hadoop 源代码分析(五)RPC 框架
- Hadoop源代码分析【IO专题-序列化机制】
- Java开发2.0:用Hadoop MapReduce进行大数据分析
- Hadoop-2.7.3源码分析:MapReduce作业提交源码跟踪
- MapReduce 运行过程源代码分析
- 配置 eclipse 编译、开发 Hadoop(MapReduce)源代码
- MapReduce过程、Spark和Hadoop以Shuffle为中心的对比分析
- 使用hadoop mapreduce分析mongodb数据:(1)
- org.apache.hadoop.mapreduce包分析
- Hadoop中 MapReduce中InputSplit的分析
- Hadoop MapReduce两种常见的容错场景分析