您的位置:首页 > 其它

TaskTracker向JobTracker发送心跳时的问题

2013-07-09 20:49 393 查看
HeartbeatResponse transmitHeartBeat(long now) throws IOException {

    // Send Counters in the status once every COUNTER_UPDATE_INTERVAL

    boolean sendCounters;

    if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {

      sendCounters = true;

      previousUpdate = now;

    }

    else {

      sendCounters = false;

    }

    // 

    // Check if the last heartbeat got through... 

    // if so then build the heartbeat information for the JobTracker;

    // else resend the previous status information.

    //
    if (status == null) {

      synchronized (this) {

        status = new TaskTrackerStatus(taskTrackerName, localHostname, 

                                       httpPort, 

                                       cloneAndResetRunningTaskStatuses(

                                         sendCounters), 

                                       failures, 

                                       maxMapSlots,

                                       maxReduceSlots); 

      }

    } else {

      LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +

               "' with reponseId '" + heartbeatResponseId);

    }

    ......

    //

    // Xmit the heartbeat

    //

    HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, 

                                                              justStarted,

                                                              justInited,

                                                              askForNewTask, 

                                                              heartbeatResponseId);

      

    // 从这里可以看出,jobClient.heartbeat()函数只要返回了,就表示该heartbeat

    // "got through successfully"了,并且后面将status重新赋值为null了。

    // 那为什么前面还存在检测status==null的情形(即status可能不为null)呢?

    // 这是因为jobClient.heartbeat()函数是可以抛出IOException异常的,如果

    // jobClient.heartbeat()抛出了异常(说明心跳响应过程出现了问题),那么

    // 接下来的代码也不会被执行,此时status不为null,transmitHeartBeat()函数

    // 就以抛出IOException异常方式返回了。而在transmitHeartBeat()的上层调用

    // 函数offerService()中,会捕获transmitHeartBeat()函数抛出的异常,即“

    // catch (Exception except)”该子句,在处理完异常后,并没有return返回,

    // 而是进入while (running && !shuttingDown){...}的下一个循环继续执行。

    // 这样,当再次进入到这里的transmitHeartBeat()函数时,status变量就不

    // 是null了。换言之,如果前面在if (status == null)时,变量status不为

    // null,就说明一定是上一次心跳响应过程出现了异常。

    // The heartbeat got through successfully!

    //

    heartbeatResponseId = heartbeatResponse.getResponseId();

      

    synchronized (this) {

      for (TaskStatus taskStatus : status.getTaskReports()) {

        if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&

            taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&

            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&

            !taskStatus.inTaskCleanupPhase()) {

          if (taskStatus.getIsMap()) {

            mapTotal--;

          } else {

            reduceTotal--;

          }

          myInstrumentation.completeTask(taskStatus.getTaskID());

          runningTasks.remove(taskStatus.getTaskID());

        }

      }

      

      // Clear transient status information which should only

      // be sent once to the JobTracker

      for (TaskInProgress tip: runningTasks.values()) {

        tip.getStatus().clearStatus();

      }

    }

    // Force a rebuild of 'status' on the next iteration

    status = null;                                
    return heartbeatResponse;

}

  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: