您的位置:首页 > 运维架构

根据hadoop-1.2.1整理出的mapreduce过程

2014-02-28 16:57 204 查看
1.Job.waitForComplete() 代表一个由用户配置提交的作业

2.JobClient.submitJobInternal() 

    (1).检查客户端参数,用户权限,生成jobid

    (2).生成hadoop上下文JobContext,计算输入分片,将所有运行时信息写入job.xml

3.调用JobSubmitProtocol.submit方法提交,实际上是远程RPC调用JobTracker的submit方法

4.在JobTracker.submit

     (1).检查HDFS是否处于safe mode,然后创建一个代表和监控当前作业的JobProgress对象

     (2).调用addjob把作业添加到监控进度和初始化作业的队列当中

     (3).JobTracker运行在一个单独的Jvm进程上,启动时,在构造中创建一个TaskScheduler,在TaskScheduler.start方法中

           创建两个listener队列,一个监控进度,一个负责初始化作业

     (4).当JobProgress对象被添加到Job初始化listener队列中之后,就触发JobProgress.initTask()方法

5.JobInProgress.intiTask()

      (1). 初始化Job,计算分片数量,为处理每个分片的Map任务创建相应的TaskInprogress,加入到一个nonRunningMapCache中

           按照配置的reduce.task.number,为每个reduce创建相应的TaskInProgress对象,加入到一个nonRunningReduce中

           创建另外两个TaskInProgress,setup和cleanup

6.TaskTracker,作为一个单独的Jvm进程来运行 TaskTracker.offerService

      (1). 按照固定的时长向JobTracker发送heartBeat,发送自己的状态报告TaskTrackerStatus,

           JobTracker按照TaskTrackerStatus报告的资源信息调用JobQueenTaskScheduler.assignTask(默认)来

           为每个TaskTracker分配Task,并以HeartBeatResponse的方式返回

           TaskTracker获取HeartBeatResponse,检查Response中分给自己的map和reduce任务

      (2). 为每个任务建立LaunchTaskAction,在addToTaskQueen方法中,把LaunchTaskAction的信息组装成

            TaskInProgress加入到TaskLauncher的待处理任务队列

           TaskLauncher是一个生产者-消费者线程模型,当它的任务队列中有相应的任务时,就唤醒处理线程,

           在其run方法中调用startNewTask方法,在其中调用TaskInProgress.locallizeJob(),将任务本地化

           然后调用launchTaskforJob,在其中调用TaskInProgress.launchTask,在其中创建一个执行TaskRunner

           的线程,并启动,在其run方法中,配置child jvm的运行参数,并调用TaskRunner.launchJvmAndWait

           启动子进程的Jvm来真正执行开发者编写的map/reduce方法

7.Child    调用MapTask,ReduceTask来执行开发者编写的map/reduce 函数

      (1).在MapTask.run中调用MapRunner.run,在其中从分片中读取Record,调用开发者map方法,结果写入MapOutputBuffer,

            这是一个二级索引的环形缓冲区,经过partition之后写入,当达到io.sort.mb * io.sort.spill.percent之后

            排序,然后如果配置了combiner则先combiner再spill,最后调用mergeParts,把同一partition的不同spill文件

            合并成一个大文件

       (2).在ReduceTask.run中,创建mapred.reduce.parallel.copies个线程,从map端copy数据文件,

             每io.sort.factor个进行merge成<key,list<value>>输出,

             当文件数小于io.sort.factor个,就调用开发者的reduce函数 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop mapreduce