TaskTracker获取并执行map或reduce任务的过程(一)
2013-08-23 18:54
162 查看
我们知道TaskTracker在默认情况下,每个3秒就行JobTracker发送一个心跳包,也就是在这个心跳包中包含对任务的请求。JobTracker返回给TaskTracker的心跳包中包含有各种action(任务),如果有满足在此TaskTracker上执行的任务的话,该任务也就包含在心跳包的响应中。在TaskTracker端有线程专门等待map或reduce任务,并从队列中取出执行。
该函数具体代码为:
我们从中可以看出,TaskTracker首先创建一个TaskTrackerStatus对象,其中包含有TaskTracker的各种信息,比如,mapslot的数目,reducerslot槽的数目,TaskTracker所在的主机名等信息。然后,对TaskTracker的空闲的slot以及磁盘空间进行检查,如果满足相应的条件时,最终就会通过JobClient(为JobTracker的代理)将心跳信息发送给JobTracker,并得到JobTracker的响应HeartbeatResponse。如下所示,JobClient是InterTrackerProtocol的一个实例,而JobTracker实现了InterTrackerProtocol这个接口。
那么,TaskTracker怎样通过JobTracker的代理与JobTracker进行通信呢?它是通过RPC调用JobTracker的heartbeat(......)方法而实现的。
privatevoidaddToTaskQueue(LaunchTaskActionaction){
if(action.getTask().isMapTask()){
mapLauncher.addToTaskQueue(action);
}else{
reduceLauncher.addToTaskQueue(action);
}
}
TaskTracker启动的时候,创建了两个线程:mapLauncher和reduceLauncher,它们分别处理map任务和reduce任务,map任务有mapLauncher负责将其放入到LinkedList中,reduce任务有reducerLauncher负责将其放入到它维护的LinkedList中。
mapLauncher或者是reducerLauncher根据接收到的action,创建对应的TaskTracker.TaskInProgress对象,并放入到队列中,唤醒等待的线程进行处理。如下所示,该线程负责从taskToLaunch中获取task,当有空间的slot时,执行这个task。
这样,TaskTracker就得到了待处理的任务,具体如何执行请参考下一篇博客。
1.TaskTracker发送心跳包
TaskTracker是作为一个单独的JVM运行的,它启动以后一直处于offerService()函数中,每隔3秒就执行一次transmitHeartBeat函数,如下所示:HeartbeatResponseheartbeatResponse=transmitHeartBeat(now);
该函数具体代码为:
HeartbeatResponsetransmitHeartBeat(longnow)throwsIOException{ ...... if(status==null){ synchronized(this){ status=newTaskTrackerStatus(taskTrackerName,localHostname, httpPort, cloneAndResetRunningTaskStatuses( sendCounters), failures, maxMapSlots, maxReduceSlots); } }// //检查是否可以接受新的任务 // booleanaskForNewTask; longlocalMinSpaceStart; synchronized(this){ askForNewTask= ((status.countOccupiedMapSlots()<maxMapSlots|| status.countOccupiedReduceSlots()<maxReduceSlots)&& acceptNewTasks); localMinSpaceStart=minSpaceStart; } ...... HeartbeatResponseheartbeatResponse=jobClient.heartbeat(status, justStarted, justInited, askForNewTask, heartbeatResponseId); ...... returnheartbeatResponse; }
我们从中可以看出,TaskTracker首先创建一个TaskTrackerStatus对象,其中包含有TaskTracker的各种信息,比如,mapslot的数目,reducerslot槽的数目,TaskTracker所在的主机名等信息。然后,对TaskTracker的空闲的slot以及磁盘空间进行检查,如果满足相应的条件时,最终就会通过JobClient(为JobTracker的代理)将心跳信息发送给JobTracker,并得到JobTracker的响应HeartbeatResponse。如下所示,JobClient是InterTrackerProtocol的一个实例,而JobTracker实现了InterTrackerProtocol这个接口。
this.jobClient=(InterTrackerProtocol) UserGroupInformation.getLoginUser().doAs( newPrivilegedExceptionAction<Object>(){ publicObjectrun()throwsIOException{ returnRPC.waitForProxy(InterTrackerProtocol.class, InterTrackerProtocol.versionID, jobTrackAddr,fConf); } });
那么,TaskTracker怎样通过JobTracker的代理与JobTracker进行通信呢?它是通过RPC调用JobTracker的heartbeat(......)方法而实现的。
2.TaskTracker端获取任务
TaskTracker接收到任务后,会将它们放入到相应的LinkedList中,LinkedList实现了List和Queue接口,它是基于链表实现的FIFO的队列。heartbeatInterval=heartbeatResponse.getHeartbeatInterval();if(actions!=null){ for(TaskTrackerActionaction:actions){ if(actioninstanceofLaunchTaskAction){ addToTaskQueue((LaunchTaskAction)action); ...... } } ......
privatevoidaddToTaskQueue(LaunchTaskActionaction){
if(action.getTask().isMapTask()){
mapLauncher.addToTaskQueue(action);
}else{
reduceLauncher.addToTaskQueue(action);
}
}
TaskTracker启动的时候,创建了两个线程:mapLauncher和reduceLauncher,它们分别处理map任务和reduce任务,map任务有mapLauncher负责将其放入到LinkedList中,reduce任务有reducerLauncher负责将其放入到它维护的LinkedList中。
publicvoidaddToTaskQueue(LaunchTaskActionaction){
synchronized(tasksToLaunch){
TaskInProgresstip=registerTask(action,this);
tasksToLaunch.add(tip);
tasksToLaunch.notifyAll();
}
}
mapLauncher或者是reducerLauncher根据接收到的action,创建对应的TaskTracker.TaskInProgress对象,并放入到队列中,唤醒等待的线程进行处理。如下所示,该线程负责从taskToLaunch中获取task,当有空间的slot时,执行这个task。
synchronized(tasksToLaunch){
while(tasksToLaunch.isEmpty()){
tasksToLaunch.wait();
}
//gettheTIP
tip=tasksToLaunch.remove(0);
task=tip.getTask();
LOG.info("Tryingtolaunch:"+tip.getTask().getTaskID()+
"whichneeds"+task.getNumSlotsRequired()+"slots");
}
.....
//得到空闲的slot后,启动这个task
startNewTask(tip);
这样,TaskTracker就得到了待处理的任务,具体如何执行请参考下一篇博客。
相关文章推荐
- TaskTracker获取并执行map或reduce任务的过程1
- TaskTracker执行map或reduce任务的过程2
- TaskTracker执行map或reduce任务的过程(二)
- hadoop执行mapreduce任务,能够map,不能reduce,Shuffle阶段报错
- taskTracker请求任务执行任务的过程
- map-reduce任务的执行流程
- map-reduce任务的执行流程
- Map任务执行过程分析
- 【hadoop2.2(yarn)】基于yarn成功执行分布式map-reduce,记录问题解决过程。
- Hadoop 少量map/reduce任务执行慢问题
- MapReduce剖析笔记之五:Map与Reduce任务分配过程
- Hadoop MapReduce执行过程详解及MR中job参数及设置map和reduce的个数(带hadoop例子)
- Hadoop MapReduce之ReduceTask任务执行(二):GetMapEventsThread线程
- 实现每个Map或Reduce任务只执行一次map或reduce方法
- hadoop的map和reduce任务的执行步骤
- Hadoop MapReduce之ReduceTask任务执行(一):远程拷贝map输出
- Hadoop MapReduce执行过程中map和reduce执行过程
- 如何在Hadoop中控制Map&Reduce任务的数量
- 教你彻底弄明白Executor框架线程池任务执行的全过程
- 详谈JobInProgress中Map/Reduce任务分配