TaskTracker执行map或reduce任务的过程2
2013-08-29 15:17
351 查看
TaskTracker执行map或reduce任务的过程(二)
上次说到,当MapLauncher或ReduceLancher(用于执行任务的线程,它们扩展自TaskLauncher),从它们所维护的LinkedList也即队列中获取到TaskInProgress,并且TaskTracker有空闲的slot时,该线程就调用了TaskTracker的startNewTask(tip)方法,如下所示:public void run() { while (!Thread.interrupted()) { try { TaskInProgress tip; Task task; synchronized (tasksToLaunch) { while (tasksToLaunch.isEmpty()) { tasksToLaunch.wait();//当队列为空时呗阻塞,知道有新的tip到来才会被唤醒 } //get the TIP tip = tasksToLaunch.remove(0); task = tip.getTask(); ......//当有空闲的slot时执行启动一个任务 startNewTask(tip); ...... } }
接下了来就让我们看下startNewTask(tip)的神秘面纱吧,由于在其内部通过实习Runnable创建了一个线程,我们只需分析线程体的run方法即可,关键代码如下,为便于说明,给3个核心语句分别标识为**1,**2:
public void run() { try { RunningJob rjob = localizeJob(tip); //**1 tip.getTask().setJobFile(rjob.getLocalizedJobConf().toString()); // task本地化已经完成,此刻如果rjob.jobConf或者rjob.ugi为空的话,会抛出异常 launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob); //**2 ...... } }
**1的源码如下,
Task t = tip.getTask(); JobID jobId = t.getJobID(); RunningJob rjob = addTaskToJob(jobId, tip); InetSocketAddress ttAddr = getTaskTrackerReportAddress();
从中我们可以看出,首先创建了一个该任务所属的RunningJob,并把它放入到一个该TaskTracker所维护的TreeMap<jobId,RunningJob>中,同时在RunningJob中记录将要执行的task,也即把tip放入到RunningJob.tasks(一个HashSet<TaskInProgress>)中。由此,我们可以知道,每个TaskTracker都维护者一个TreeMap用以记录它正在执行的哪个作业的哪些任务(map、reduce任务)。
接下来localizeJob(tip)要做的就是调用initializeJob(t, rjob, ttAddr)初始化工作目录,并下载相应的job.xml以及job.jar(TaskController负责)文件,TaskController最后调用RunJar.unJar()将包解压到相应的工作目录,,至此初始化工作完成,调用launchTaskForJob开始执行Task。
**2的核心代码为:
protected void launchTaskForJob(TaskInProgress tip, JobConf jobConf,RunningJob rjob) throws IOException { synchronized (tip) { jobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY, localStorage.getDirsString()); tip.setJobConf(jobConf); tip.setUGI(rjob.ugi); tip.launchTask(rjob); } }
由此看出,它主要是调用TaskTracker.TaskInProgress的launchTask()方法,在该方法中它创建了一个TaskRunner线程,并启这个线程执行这个task,其run方法核心代码如下:
public final void run() { //设置工作目录 final File workDir = new File(new Path(localdirs[rand.nextInt(localdirs.length)], TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(), taskid.toString(), t.isTaskCleanupTask())).toString()); ...... // 设置环境变量 List<String> classPaths = getClassPaths(conf, workDir,taskDistributedCacheManager); ....... //启动Task子进程 launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir); } }
未完待续......
相关文章推荐
- TaskTracker执行map或reduce任务的过程(二)
- TaskTracker获取并执行map或reduce任务的过程(一)
- TaskTracker获取并执行map或reduce任务的过程1
- Map任务执行过程分析
- Hadoop MapReduce之ReduceTask任务执行(二):GetMapEventsThread线程
- map-reduce任务的执行流程
- Hadoop MapReduce执行过程中map和reduce执行过程
- taskTracker请求任务执行任务的过程
- map-reduce任务的执行流程
- Hadoop MapReduce执行过程详解及MR中job参数及设置map和reduce的个数(带hadoop例子)
- 实现每个Map或Reduce任务只执行一次map或reduce方法
- hadoop的map和reduce任务的执行步骤
- Hadoop 少量map/reduce任务执行慢问题
- MapReduce剖析笔记之五:Map与Reduce任务分配过程
- 【hadoop2.2(yarn)】基于yarn成功执行分布式map-reduce,记录问题解决过程。
- Hadoop MapReduce之ReduceTask任务执行(一):远程拷贝map输出
- hadoop执行mapreduce任务,能够map,不能reduce,Shuffle阶段报错
- 详谈JobInProgress中Map/Reduce任务分配
- 记一次centos7.2下用crontab执行定时任务的过程(初级)
- 远程提交Map/Reduce任务