An issue with how TaskTracker sends heartbeats to JobTracker
2013-07-09 20:49
HeartbeatResponse transmitHeartBeat(long now) throws IOException {
    // Send Counters in the status once every COUNTER_UPDATE_INTERVAL
    boolean sendCounters;
    if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
        sendCounters = true;
        previousUpdate = now;
    } else {
        sendCounters = false;
    }
    //
    // Check if the last heartbeat got through...
    // if so then build the heartbeat information for the JobTracker;
    // else resend the previous status information.
    //
    if (status == null) {
        synchronized (this) {
            status = new TaskTrackerStatus(taskTrackerName, localHostname,
                                           httpPort,
                                           cloneAndResetRunningTaskStatuses(sendCounters),
                                           failures,
                                           maxMapSlots,
                                           maxReduceSlots);
        }
    } else {
        LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
                 "' with responseId '" + heartbeatResponseId + "'");
    }
    ......
    //
    // Xmit the heartbeat
    //
    HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
                                                              justStarted,
                                                              justInited,
                                                              askForNewTask,
                                                              heartbeatResponseId);
    // This shows that once jobClient.heartbeat() returns, the heartbeat has
    // "got through successfully", and status is reset to null further down.
    // So why does the code above check status == null at all, i.e. how can
    // status ever be non-null at that point?
    // Because jobClient.heartbeat() can throw an IOException. If it throws
    // (meaning something went wrong during the heartbeat exchange), none of
    // the code below runs, status stays non-null, and transmitHeartBeat()
    // exits by propagating the IOException. Its caller, offerService(),
    // catches that exception in its "catch (Exception except)" clause and,
    // after handling it, does not return; it moves on to the next iteration
    // of while (running && !shuttingDown) {...}.
    // As a result, the next time transmitHeartBeat() is entered, status is
    // not null. In other words, if status is non-null at the
    // if (status == null) check above, the previous heartbeat must have
    // failed with an exception, and the previous status is sent again.
    //
    // The heartbeat got through successfully!
    //
    heartbeatResponseId = heartbeatResponse.getResponseId();
    synchronized (this) {
        for (TaskStatus taskStatus : status.getTaskReports()) {
            if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
                taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
                taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
                !taskStatus.inTaskCleanupPhase()) {
                if (taskStatus.getIsMap()) {
                    mapTotal--;
                } else {
                    reduceTotal--;
                }
                myInstrumentation.completeTask(taskStatus.getTaskID());
                runningTasks.remove(taskStatus.getTaskID());
            }
        }
        // Clear transient status information which should only
        // be sent once to the JobTracker
        for (TaskInProgress tip : runningTasks.values()) {
            tip.getStatus().clearStatus();
        }
    }
    // Force a rebuild of 'status' on the next iteration
    status = null;
    return heartbeatResponse;
}
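The resend behaviour described in the comments above can be reproduced outside Hadoop with a small simulation. The following is a minimal sketch, not Hadoop code. HeartbeatDemo, Status and FlakyJobTracker are hypothetical stand-ins for TaskTracker, TaskTrackerStatus and the JobTracker RPC proxy (jobClient), and the failure schedule is invented for illustration. The offerService() loop is reduced to a fixed number of iterations, with none of the real method's sleeping or other handling.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of the TaskTracker heartbeat retry pattern (not Hadoop classes).
public class HeartbeatDemo {

    // Stand-in for TaskTrackerStatus: only records which build produced it.
    static final class Status {
        final int buildNumber;
        Status(int buildNumber) { this.buildNumber = buildNumber; }
    }

    // Stand-in for the JobTracker RPC: throws on the call numbers listed in failOn.
    static final class FlakyJobTracker {
        private final List<Integer> failOn;
        private int calls = 0;
        final List<Integer> received = new ArrayList<>();

        FlakyJobTracker(List<Integer> failOn) { this.failOn = failOn; }

        int heartbeat(Status status) throws IOException {
            calls++;
            // Record the status before failing: this models a request that arrived
            // but whose response was lost (a real failure could also occur earlier).
            received.add(status.buildNumber);
            if (failOn.contains(calls)) {
                throw new IOException("simulated RPC failure on call " + calls);
            }
            return calls; // plays the role of the new responseId
        }
    }

    private final FlakyJobTracker jobClient;
    private Status status = null; // null means "the last heartbeat got through"
    private int builds = 0;

    HeartbeatDemo(FlakyJobTracker jobClient) { this.jobClient = jobClient; }

    // Same shape as transmitHeartBeat(): build a new status only if the last one was delivered.
    int transmitHeartBeat() throws IOException {
        if (status == null) {
            status = new Status(++builds);
        } else {
            System.out.println("Resending status #" + status.buildNumber);
        }
        int responseId = jobClient.heartbeat(status); // may throw; status is then kept
        status = null;                                 // reached only on success
        return responseId;
    }

    // Same shape as offerService(): catch the exception and continue with the next iteration.
    void offerService(int iterations) {
        for (int i = 0; i < iterations; i++) {
            try {
                transmitHeartBeat();
            } catch (Exception e) {
                System.out.println("Caught: " + e.getMessage() + "; retrying next iteration");
            }
        }
    }

    public static void main(String[] args) {
        FlakyJobTracker jt = new FlakyJobTracker(Arrays.asList(2)); // second call fails
        HeartbeatDemo tracker = new HeartbeatDemo(jt);
        tracker.offerService(4);
        System.out.println("Status builds sent per call: " + jt.received);
    }
}
```

With the second call failing, main prints `Status builds sent per call: [1, 2, 2, 3]`. Build 2 is sent twice because status was never reset to null after the exception, and build 3 is created only after a heartbeat gets through. The sketch does not model heartbeatResponseId or what the JobTracker does with a resent status.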