
An Analysis of JobTracker Job Scheduling

2015-12-14 00:59
Reposted from: http://blog.csdn.net/Androidlushangderen/article/details/41408517

JobTracker's job scheduling strikes me as an operation at the macro level. Understanding only how MapReduce works internally is not enough; studying how it behaves at this macro level is very helpful as well. We pick up where the previous analysis of job submission left off: once a Job has been submitted to the JobTracker over RPC, the following method is triggered:

/**
 * Initialize the job.
 */
public void initJob(JobInProgress job) {
  if (null == job) {
    LOG.info("Init on null job is not valid");
    return;
  }
  try {
    JobStatus prevStatus = (JobStatus)job.getStatus().clone();
    LOG.info("Initializing " + job.getJobID());
    // Initialize the job's tasks
    job.initTasks();
    ......

Next, initTasks runs — not on JobTracker, but as a method of the JobInProgress class:

/**
 * Construct the splits, etc. This is invoked from an async
 * thread so that split-computation doesn't block anyone.
 */
public synchronized void initTasks()
    throws IOException, KillInterruptedException, UnknownHostException {
  if (tasksInited || isComplete()) {
    return;
  }
  ......
  jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
  jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
  this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks);
  this.queueMetrics.addWaitingReduces(getJobID(), numReduceTasks);

  // Create one map TaskInProgress per input split, numMapTasks in total
  maps = new TaskInProgress[numMapTasks];
  for (int i = 0; i < numMapTasks; ++i) {
    inputLength += splits[i].getInputDataLength();
    maps[i] = new TaskInProgress(jobId, jobFile,
                                 splits[i],
                                 jobtracker, conf, this, i, numSlotsPerMap);
  }
  ......
  //
  // Create reduce tasks
  // Create numReduceTasks reduce TaskInProgress instances
  //
  this.reduces = new TaskInProgress[numReduceTasks];
  for (int i = 0; i < numReduceTasks; i++) {
    reduces[i] = new TaskInProgress(jobId, jobFile,
                                    numMapTasks, i,
                                    jobtracker, conf, this, numSlotsPerReduce);
    nonRunningReduces.add(reduces[i]);
  }
  ......
  // create two cleanup tips, one map and one reduce
  // (i.e., one map cleanup task and one reduce cleanup task)
  cleanup = new TaskInProgress[2];

  // cleanup map tip. This map doesn't use any splits. Just assign an empty
  // split.
  TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
  cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
                                  jobtracker, conf, this, numMapTasks, 1);
  cleanup[0].setJobCleanupTask();

  // cleanup reduce tip.
  cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                                  numReduceTasks, jobtracker, conf, this, 1);
  cleanup[1].setJobCleanupTask();

  // create two setup tips, one map and one reduce; same idea as the cleanup tasks
  setup = new TaskInProgress[2];

  // setup map tip. This map doesn't use any split. Just assign an empty
  // split.
  setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
                                jobtracker, conf, this, numMapTasks + 1, 1);
  setup[0].setJobSetupTask();

  // setup reduce tip.
  setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                                numReduceTasks + 1, jobtracker, conf, this, 1);
  setup[1].setJobSetupTask();
  ......

As you can see, this is where a JobInProgress is first broken down into many small tasks, each represented by a TaskInProgress (TIP). MapReduce decomposes one job as follows: numMapTasks map tasks, numReduceTasks reduce tasks, 2 cleanup tasks, and 2 setup tasks (one map and one reduce of each). With that, we can roughly sketch the execution flow of a single JobInProgress.
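To make the breakdown concrete, here is a toy sketch (plain standalone Java, not Hadoop's code; the task counts are invented for illustration) of the partition-id layout implied by the constructor calls above: map TIPs occupy ids 0 through numMapTasks - 1, the map cleanup TIP takes numMapTasks, and the map setup TIP takes numMapTasks + 1, with the reduce side following the same pattern:

public class TipLayoutSketch {
  public static void main(String[] args) {
    int numMapTasks = 4;       // one map TIP per input split
    int numReduceTasks = 2;    // from the job configuration
    for (int i = 0; i < numMapTasks; i++) {
      System.out.println("map TIP, partition " + i);
    }
    System.out.println("map cleanup TIP, partition " + numMapTasks);
    System.out.println("map setup TIP, partition " + (numMapTasks + 1));
    for (int i = 0; i < numReduceTasks; i++) {
      System.out.println("reduce TIP, partition " + i);
    }
    System.out.println("reduce cleanup TIP, partition " + numReduceTasks);
    System.out.println("reduce setup TIP, partition " + (numReduceTasks + 1));
    // Total TIPs for one job: numMapTasks + numReduceTasks + 4
  }
}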



OK, initTasks is done, meaning all of the upfront initialization has finished; what remains is for the JobTracker to hand work out to the TaskTrackers. MapReduce uses heartbeats for this. The heartbeat serves three main purposes here:

1. Determining whether a TaskTracker is still alive

2. Collecting each TaskTracker's resource usage and task progress

3. Assigning tasks to a TaskTracker

It is the third purpose that matters here. The heartbeat rides on Hadoop's own RPC implementation. The JobTracker does not assign work to a TaskTracker directly; in between sits a scheduler, the TaskScheduler, which users can replace with their own implementation to satisfy different design requirements. Hadoop ships a default implementation, so the rough model flow is: the TaskTracker reports in via heartbeat, the JobTracker consults the TaskScheduler, and the heartbeat response carries the assigned tasks back.
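Because the scheduler is pluggable, a custom policy is just a subclass of TaskScheduler, selected through the mapred.jobtracker.taskScheduler configuration property. Below is a minimal do-nothing skeleton — a sketch under the assumption of the Hadoop 1.x API, not a real policy; the reference implementation is the default JobQueueTaskScheduler:

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Must live in the org.apache.hadoop.mapred package, since TaskScheduler and
// its collaborators are package-private there (FairScheduler and
// CapacityTaskScheduler sit in that package for the same reason).
class DoNothingTaskScheduler extends TaskScheduler {

  @Override
  public List<Task> assignTasks(TaskTracker taskTracker) throws IOException {
    // Invoked from JobTracker.heartbeat() when a tracker can accept work.
    // A real policy would consult taskTrackerManager.getClusterStatus()
    // and a job queue, as JobQueueTaskScheduler does; this stub assigns nothing.
    return Collections.emptyList();
  }

  @Override
  public Collection<JobInProgress> getJobs(String queueName) {
    // Report the jobs this scheduler manages for the given queue.
    return Collections.emptyList();
  }
}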



So the JobTracker first receives a stream of heartbeats from TaskTrackers and checks whether a given TaskTracker is idle; if it is, the TaskScheduler is immediately asked to assign it tasks:

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
                                                boolean restarted,
                                                boolean initialContact,
                                                boolean acceptNewTasks,
                                                short responseId)
    throws IOException {
  ....
  // The heartbeat response carries the JobTracker's commands back to the tracker
  // Initialize the response to be sent for the heartbeat
  HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
  List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
  boolean isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
  // Check for new tasks to be executed on the tasktracker
  if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
    TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);
    if (taskTrackerStatus == null) {
      LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
    } else {
      List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
      // A null result means no setup/cleanup work is pending,
      // so this TaskTracker is free for new tasks
      if (tasks == null) {
        // Let the task scheduler assign tasks to this TaskTracker
        tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
      }
  ......

Next comes the TaskScheduler's method, though we must find a concrete implementation first, since TaskScheduler is only a base class. Hadoop's default is the FIFO JobQueueTaskScheduler:

public synchronized List<Task> assignTasks(TaskTracker taskTracker)
    throws IOException {
  TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  final int numTaskTrackers = clusterStatus.getTaskTrackers();
  final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
  final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();
  // Fetch the job queue
  Collection<JobInProgress> jobQueue =
    jobQueueJobInProgressListener.getJobQueue();
  .....
  for (JobInProgress job : jobQueue) {
    if (job.getStatus().getRunState() != JobStatus.RUNNING ||
        job.numReduceTasks == 0) {
      continue;
    }
    // A new reduce task is handed out here
    Task t =
      job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers,
                              taskTrackerManager.getNumberOfUniqueHosts()
                              );
  .....

The scheduler first fetches the job queue, picks a job out of it, and then picks, say, one reduce task from that job for this TaskTracker to execute. As we saw earlier, every task is held inside a JobInProgress in TaskInProgress form, so the call leads back into JobInProgress.
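Stripped of the capacity and load-factor bookkeeping hidden behind the elisions above, the FIFO policy reduces to the following loop. This is a paraphrase of the excerpt for clarity, assuming the surrounding Hadoop 1.x types; it is not the real method body:

// Walk the queue in submission order and return the first reduce task that
// some runnable job can hand out; null means nothing to assign this heartbeat.
private Task pickFirstReduceTask(Collection<JobInProgress> jobQueue,
                                 TaskTrackerStatus tts,
                                 int numTaskTrackers,
                                 int numUniqueHosts) throws IOException {
  for (JobInProgress job : jobQueue) {
    // Skip jobs that are not running or have no reduce phase at all
    if (job.getStatus().getRunState() != JobStatus.RUNNING ||
        job.numReduceTasks == 0) {
      continue;
    }
    Task t = job.obtainNewReduceTask(tts, numTaskTrackers, numUniqueHosts);
    if (t != null) {
      return t;   // first come, first served: the earliest job wins
    }
  }
  return null;
}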

/**
 * Return a ReduceTask, if appropriate, to run on the given tasktracker.
 * We don't have cache-sensitivity for reduce tasks, as they
 * work on temporary MapRed files.
 */
public synchronized Task obtainNewReduceTask(TaskTrackerStatus tts,
                                             int clusterSize,
                                             int numUniqueHosts
                                             ) throws IOException {
  .....
  int target = findNewReduceTask(tts, clusterSize, numUniqueHosts,
                                 status.reduceProgress());
  if (target == -1) {
    return null;
  }
  // Delegate to the chosen TIP to obtain the concrete task to run
  Task result = reduces[target].getTaskToRun(tts.getTrackerName());
  if (result != null) {
    addRunningTaskToTIP(reduces[target], result.getTaskID(), tts, true);
  }
  return result;
}

Execution has now moved into a TIP, that is, a TaskInProgress: the JIP-to-TIP transition. Following the call further, we arrive in the TaskInProgress class:

public Task getTaskToRun(String taskTracker) throws IOException {
  if (0 == execStartTime) {
    // assume task starts running now
    execStartTime = jobtracker.getClock().getTime();
  }

  // Create the 'taskid'; do not count the 'killed' tasks against the job!
  TaskAttemptID taskid = null;
  if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts + numKilledTasks)) {
    // Make sure that the attempts are unique across restarts
    int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART + nextTaskId;
    // Launch one task attempt (TA)
    taskid = new TaskAttemptID(id, attemptId);
    ++nextTaskId;
  } else {
    LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts) +
             " (plus " + numKilledTasks + " killed)" +
             " attempts for the tip '" + getTIPId() + "'");
    return null;
  }

  // Record the attempt in the relevant data structures
  return addRunningTask(taskid, taskTracker);
}

Here we clearly see a so-called TA being launched, i.e. a task attempt: one try at executing the task, since there is no guarantee this attempt will succeed. After the attempt's ID is recorded in the bookkeeping variables, we reach addRunningTask, the very end of this call chain:
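The attempt ID minted here encodes the entire job -> task -> attempt hierarchy. A small sketch using the Hadoop 1.x ID classes; the timestamp-like identifier and the numeric ids below are invented for illustration:

import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.TaskAttemptID;

public class AttemptIdSketch {
  public static void main(String[] args) {
    JobID jobId = new JobID("201512140059", 1);
    System.out.println(jobId);    // job_201512140059_0001
    // 1.x-style constructor: (jtIdentifier, jobId, isMap, taskId, attemptId)
    TaskAttemptID attempt = new TaskAttemptID("201512140059", 1, false, 3, 0);
    System.out.println(attempt);  // attempt_201512140059_0001_r_000003_0
  }
}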

/**
 * Adds a previously running task to this tip. This is used in case of
 * jobtracker restarts.
 */
public Task addRunningTask(TaskAttemptID taskid,
                           String taskTracker,
                           boolean taskCleanup) {
  .....
  // Record the mapping between the attempt and its TaskTracker
  activeTasks.put(taskid, taskTracker);
  tasks.add(taskid);

  // Ask JobTracker to note that the task exists
  // (registers an attempt-to-tracker entry on the JobTracker side)
  jobtracker.createTaskEntry(taskid, taskTracker, this);

  // check and set the first attempt
  if (firstTaskId == null) {
    firstTaskId = taskid;
  }
  return t;
}

This is where the variables tying the task to its TaskTracker's run-time state are recorded. From here on, the TaskTracker simply picks the task out of the heartbeat response and runs it; the step above completes the TIP-to-TA transition. The overall structure can be described as a three-level multi-way tree: the JobInProgress at the root, TaskInProgress nodes in the middle, and task attempts at the leaves.
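A toy model of that three-level tree, with class names invented to mirror the JIP/TIP/TA terminology (illustration only, not Hadoop's data structures):

import java.util.ArrayList;
import java.util.List;

class AttemptNode {                 // leaf: one task attempt (TA)
  final String attemptId;
  AttemptNode(String attemptId) { this.attemptId = attemptId; }
}

class TipNode {                     // middle layer: TaskInProgress (TIP)
  final String tipId;
  final List<AttemptNode> attempts = new ArrayList<AttemptNode>();
  TipNode(String tipId) { this.tipId = tipId; }

  AttemptNode newAttempt() {        // a TIP may be retried several times
    AttemptNode ta = new AttemptNode(tipId + "_attempt_" + attempts.size());
    attempts.add(ta);
    return ta;
  }
}

class JobNode {                     // root: JobInProgress (JIP)
  final List<TipNode> tips = new ArrayList<TipNode>();  // maps, reduces, setup, cleanup
}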



The sequence diagram for the entire job-scheduling flow is as follows:

[Figure: sequence diagram of the job-scheduling flow]
