
HadoopSourceAnalyse --- Mapreduce ApplicationMaster Job FSM

2013-05-17 16:34

Overview



[Figure 1-1]

JOB_INIT Handle

When the Job receives the JOB_INIT event, it begins to initialize itself:



[Figure 2-1]
First, the Job sets up its basic configuration: the job submission ID, the staging directory used at submission time, the configuration file needed to run the job, and the security credentials:
String oldJobIDString = job.oldJobId.toString();
String user =
    UserGroupInformation.getCurrentUser().getShortUserName();
Path path = MRApps.getStagingAreaDir(job.conf, user);
if (LOG.isDebugEnabled()) {
  LOG.debug("startJobs: parent=" + path + " child=" + oldJobIDString);
}

job.remoteJobSubmitDir =
    FileSystem.get(job.conf).makeQualified(
        new Path(path, oldJobIDString));
job.remoteJobConfFile =
    new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);

// Prepare the TaskAttemptListener server for authentication of Containers
// TaskAttemptListener gets the information via jobTokenSecretManager.
JobTokenIdentifier identifier =
    new JobTokenIdentifier(new Text(oldJobIDString));
job.jobToken =
    new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager);
job.jobToken.setService(identifier.getJobId());
// Add it to the jobTokenSecretManager so that TaskAttemptListener server
// can authenticate containers(tasks)
job.jobTokenSecretManager.addTokenForJob(oldJobIDString, job.jobToken);
LOG.info("Adding job token for " + oldJobIDString
    + " to jobTokenSecretManager");

// If the job client did not setup the shuffle secret then reuse
// the job token secret for the shuffle.
if (TokenCache.getShuffleSecretKey(job.fsTokens) == null) {
  LOG.warn("Shuffle secret key missing from job credentials."
      + " Using job token secret as shuffle secret.");
  TokenCache.setShuffleSecretKey(job.jobToken.getPassword(),
      job.fsTokens);
}
Once setup succeeds, the Job obtains the file system it will run against and notifies the history service (via a JobSubmittedEvent) that the job has finished initializing:
job.fs = job.getFileSystem(job.conf);

//log to job history
JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
    job.conf.get(MRJobConfig.JOB_NAME, "test"),
    job.conf.get(MRJobConfig.USER_NAME, "mapred"),
    job.appSubmitTime,
    job.remoteJobConfFile.toString(),
    job.jobACLs, job.queueName,
    job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
    job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
    job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
    getWorkflowAdjacencies(job.conf),
    job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
//TODO JH Verify jobACLs, UserName via UGI?
After that, the Job initializes task-related information, starting with the input split metadata (the first block below is the call site; the second is the body of SplitMetaInfoReader.readSplitMetaInfo):
allTaskSplitMetaInfo = SplitMetaInfoReader.readSplitMetaInfo(
    job.oldJobId, job.fs,
    job.conf,
    job.remoteJobSubmitDir);

long maxMetaInfoSize = conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE,
    MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
Path metaSplitFile = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir);
String jobSplitFile = JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString();
FileStatus fStatus = fs.getFileStatus(metaSplitFile);
if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
  throw new IOException("Split metadata size exceeded " +
      maxMetaInfoSize + ". Aborting job " + jobId);
}
FSDataInputStream in = fs.open(metaSplitFile);
byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
in.readFully(header);
if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
  throw new IOException("Invalid header on split file");
}
int vers = WritableUtils.readVInt(in);
if (vers != JobSplit.META_SPLIT_VERSION) {
  in.close();
  throw new IOException("Unsupported split version " + vers);
}
int numSplits = WritableUtils.readVInt(in); //TODO: check for insane values
JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo =
    new JobSplit.TaskSplitMetaInfo[numSplits];
for (int i = 0; i < numSplits; i++) {
  JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
  splitMetaInfo.readFields(in);
  JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
      jobSplitFile,
      splitMetaInfo.getStartOffset());
  allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
      splitMetaInfo.getLocations(),
      splitMetaInfo.getInputDataLength());
}
in.close();


Next come the map and reduce task counts, the job's runtime context, and the job type information:
job.numMapTasks = taskSplitMetaInfo.length;
job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);

if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
  job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
} else if (job.numMapTasks == 0) {
  job.reduceWeight = 0.9f;
} else if (job.numReduceTasks == 0) {
  job.mapWeight = 0.9f;
} else {
  job.mapWeight = job.reduceWeight = 0.45f;
}

checkTaskLimits();

if (job.newApiCommitter) {
  job.jobContext = new JobContextImpl(job.conf,
      job.oldJobId);
} else {
  job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
      job.conf, job.oldJobId);
}

long inputLength = 0;
for (int i = 0; i < job.numMapTasks; ++i) {
  inputLength += taskSplitMetaInfo[i].getInputDataLength();
}

job.makeUberDecision(inputLength);

job.taskAttemptCompletionEvents =
    new ArrayList<TaskAttemptCompletionEvent>(
        job.numMapTasks + job.numReduceTasks + 10);
job.mapAttemptCompletionEvents =
    new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10);
job.taskCompletionIdxToMapCompletionIdx = new ArrayList<Integer>(
    job.numMapTasks + job.numReduceTasks + 10);

job.allowedMapFailuresPercent =
    job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
job.allowedReduceFailuresPercent =
    job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);
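The makeUberDecision(inputLength) call above decides whether the job is small enough to run "uber", i.e. entirely inside the ApplicationMaster's own JVM rather than in separate containers. A simplified sketch of the criteria, based on JobImpl.makeUberDecision (the real method additionally checks that the job's memory/CPU requests fit inside the AM container and that it is not a chain job):

// Simplified sketch of JobImpl.makeUberDecision(): run "uber" only when
// uber mode is enabled and the job is small on every axis.
boolean uberEnabled =
    conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
boolean smallNumMapTasks = (numMapTasks <=
    conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9));
boolean smallNumReduceTasks = (numReduceTasks <=
    conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1));
boolean smallInput = (dataInputLength <=
    conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
        fs.getDefaultBlockSize(this.remoteJobSubmitDir)));
// memory, CPU and chain-job checks omitted from this sketch
isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks && smallInput;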
Finally, the Job creates the map and reduce tasks.
Map tasks:
for (int i = 0; i < job.numMapTasks; ++i) {
  TaskImpl task =
      new MapTaskImpl(job.jobId, i,
          job.eventHandler,
          job.remoteJobConfFile,
          job.conf, splits[i],
          job.taskAttemptListener,
          job.jobToken, job.fsTokens,
          job.clock,
          job.applicationAttemptId.getAttemptId(),
          job.metrics, job.appContext);
  job.addTask(task);
}


Reduce tasks:
for (int i = 0; i < job.numReduceTasks; i++) {
  TaskImpl task =
      new ReduceTaskImpl(job.jobId, i,
          job.eventHandler,
          job.remoteJobConfFile,
          job.conf, job.numMapTasks,
          job.taskAttemptListener, job.jobToken,
          job.fsTokens, job.clock,
          job.applicationAttemptId.getAttemptId(),
          job.metrics, job.appContext);
  job.addTask(task);
}


At this point job initialization is complete, but none of the tasks have run yet. The Job enters the INITED state and waits for a JOB_START event.
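For orientation, the transitions walked through in this article are registered declaratively in JobImpl through YARN's StateMachineFactory. A trimmed excerpt showing how the two arcs covered so far are wired up (the real table registers many more transitions):

// Abridged from JobImpl's static transition table: each addTransition(...)
// wires one arc of the Job FSM; InitTransition is the JOB_INIT handler above.
protected static final
    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
    stateMachineFactory =
  new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
        (JobStateInternal.NEW)
      .addTransition(JobStateInternal.NEW,
          EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
          JobEventType.JOB_INIT,
          new InitTransition())
      .addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
          JobEventType.JOB_START,
          new StartTransition())
      // ... many more transitions ...
      .installTopology();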

JOB_START Handle

When the Job receives the JOB_START event, it first checks whether the current job was recovered from a previous one. If so, the start time is reset to the earlier job's start time; otherwise it is set to the current time:
if (jse.getRecoveredJobStartTime() != 0) {
  job.startTime = jse.getRecoveredJobStartTime();
} else {
  job.startTime = job.clock.getTime();
}

Then the Job notifies the history service that it has been initialized and that its status has changed (the start time is now set), and finally asks the committer to perform job setup:
job.eventHandler.handle(new CommitterJobSetupEvent(
    job.jobId, job.jobContext));

The Job enters the SETUP state and waits for a JOB_SETUP_COMPLETED event; the setup event itself flows into the committer's state machine.
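On the committer side, CommitterEventHandler runs the OutputCommitter's job setup and reports the outcome back to the Job FSM. Roughly (simplified from CommitterEventHandler.EventProcessor; error handling abridged):

// Simplified from CommitterEventHandler.EventProcessor: run
// OutputCommitter.setupJob() and report the result back to the Job FSM.
protected void handleJobSetup(CommitterJobSetupEvent event) {
  try {
    committer.setupJob(event.getJobContext());
    // success drives the JOB_SETUP_COMPLETED transition described below
    context.getEventHandler().handle(
        new JobSetupCompletedEvent(event.getJobID()));
  } catch (Exception e) {
    LOG.warn("Job setup failed", e);
    context.getEventHandler().handle(new JobSetupFailedEvent(
        event.getJobID(), StringUtils.stringifyException(e)));
  }
}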



JOB_SETUP_COMPLETED Handle

Receiving this event means that all job-level setup is finished, including runtime state and the output location. The Job now schedules all of its tasks:

job.setupProgress = 1.0f;
job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);
job.scheduleTasks(job.reduceTasks, true);

// If we have no tasks, just transition to job completed
if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
  job.eventHandler.handle(new JobEvent(job.jobId,
      JobEventType.JOB_COMPLETED));
}
The schedule events flow into the Task FSM. The Job enters the RUNNING state and waits for JOB_TASK_ATTEMPT_COMPLETED, JOB_TASK_COMPLETED, JOB_COMPLETED, and JOB_TASK_ATTEMPT_FETCH_FAILURE events.
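scheduleTasks itself is mostly a fan-out of T_SCHEDULE events; a sketch based on JobImpl.scheduleTasks (the recovery branch only fires when the task completed in a previous AM attempt):

// Sketch of JobImpl.scheduleTasks: recovered tasks replay their previous
// outcome; everything else is sent a T_SCHEDULE event.
protected void scheduleTasks(Set<TaskId> taskIDs, boolean recoverTaskOutput) {
  for (TaskId taskID : taskIDs) {
    TaskInfo taskInfo = completedTasksFromPreviousRun.remove(taskID);
    if (taskInfo != null) {
      eventHandler.handle(new TaskRecoverEvent(taskID, taskInfo,
          committer, recoverTaskOutput));
    } else {
      eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_SCHEDULE));
    }
  }
}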

JOB_TASK_ATTEMPT_COMPLETED Handle

On receiving this event, the Job first sets the event's ID to its index in the completion-event list and appends the event to that list. If it concerns a map task, the event is also added to the map completion-event list, and its position in that list is recorded:

// Add the TaskAttemptCompletionEvent
//eventId is equal to index in the arraylist
tce.setEventId(job.taskAttemptCompletionEvents.size());
job.taskAttemptCompletionEvents.add(tce);
int mapEventIdx = -1;
if (TaskType.MAP.equals(tce.getAttemptId().getTaskId().getTaskType())) {
  // we track map completions separately from task completions because
  // - getMapAttemptCompletionEvents uses index ranges specific to maps
  // - type converting the same events over and over is expensive
  mapEventIdx = job.mapAttemptCompletionEvents.size();
  job.mapAttemptCompletionEvents.add(TypeConverter.fromYarn(tce));
}
job.taskCompletionIdxToMapCompletionIdx.add(mapEventIdx);


Next, the Job checks whether a successful completion event already exists for the same task; if so, the earlier event is marked OBSOLETE:

TaskAttemptId attemptId = tce.getAttemptId();
TaskId taskId = attemptId.getTaskId();
//make the previous completion event as obsolete if it exists
Integer successEventNo =
    job.successAttemptCompletionEventNoMap.remove(taskId);
if (successEventNo != null) {
  TaskAttemptCompletionEvent successEvent =
      job.taskAttemptCompletionEvents.get(successEventNo);
  successEvent.setStatus(TaskAttemptCompletionEventStatus.OBSOLETE);
  int mapCompletionIdx =
      job.taskCompletionIdxToMapCompletionIdx.get(successEventNo);
  if (mapCompletionIdx >= 0) {
    // update the corresponding TaskCompletionEvent for the map
    TaskCompletionEvent mapEvent =
        job.mapAttemptCompletionEvents.get(mapCompletionIdx);
    job.mapAttemptCompletionEvents.set(mapCompletionIdx,
        new TaskCompletionEvent(mapEvent.getEventId(),
            mapEvent.getTaskAttemptId(), mapEvent.idWithinJob(),
            mapEvent.isMapTask(), TaskCompletionEvent.Status.OBSOLETE,
            mapEvent.getTaskTrackerHttp()));
  }
}


If the event reports a successful attempt, the event ID is saved, along with the NodeId the attempt ran on:

// if this attempt is not successful then why is the previous successful
// attempt being removed above - MAPREDUCE-4330
if (TaskAttemptCompletionEventStatus.SUCCEEDED.equals(tce.getStatus())) {
  job.successAttemptCompletionEventNoMap.put(taskId, tce.getEventId());

  // here we could have simply called Task.getSuccessfulAttempt() but
  // the event that triggers this code is sent before
  // Task.successfulAttempt is set and so there is no guarantee that it
  // will be available now
  Task task = job.tasks.get(taskId);
  TaskAttempt attempt = task.getAttempt(attemptId);
  NodeId nodeId = attempt.getNodeId();
  assert (nodeId != null); // node must exist for a successful event
  List<TaskAttemptId> taskAttemptIdList = job.nodesToSucceededTaskAttempts
      .get(nodeId);
  if (taskAttemptIdList == null) {
    taskAttemptIdList = new ArrayList<TaskAttemptId>();
    job.nodesToSucceededTaskAttempts.put(nodeId, taskAttemptIdList);
  }
  taskAttemptIdList.add(attempt.getID());
}


After this event is handled, the Job remains in the RUNNING state.
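The completion-event lists maintained here are what reduce tasks poll to learn which map outputs are ready to fetch. A sketch of the windowed read path, simplified from JobImpl.getTaskAttemptCompletionEvents:

// Simplified from JobImpl.getTaskAttemptCompletionEvents: callers page
// through the list in (fromEventId, maxEvents) windows under a read lock.
public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
    int fromEventId, int maxEvents) {
  TaskAttemptCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS;
  readLock.lock();
  try {
    if (taskAttemptCompletionEvents.size() > fromEventId) {
      int actualMax = Math.min(maxEvents,
          (taskAttemptCompletionEvents.size() - fromEventId));
      events = taskAttemptCompletionEvents.subList(fromEventId,
          actualMax + fromEventId).toArray(events);
    }
  } finally {
    readLock.unlock();
  }
  return events;
}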

JOB_TASK_COMPLETED Handle

On receiving this event, the Job records the completion and looks up the corresponding task object. If the task succeeded, the success counters are updated:

if (task.getType() == TaskType.MAP) {
  job.succeededMapTaskCount++;
} else {
  job.succeededReduceTaskCount++;
}
job.metrics.completedTask(task);
If the task failed, the failure counters are updated:

if (task.getType() == TaskType.MAP) {
  job.failedMapTaskCount++;
} else if (task.getType() == TaskType.REDUCE) {
  job.failedReduceTaskCount++;
}
job.addDiagnostic("Task failed " + task.getID());
job.metrics.failedTask(task);


If the task was killed, the kill counters are updated:

if (task.getType() == TaskType.MAP) {
  job.killedMapTaskCount++;
} else if (task.getType() == TaskType.REDUCE) {
  job.killedReduceTaskCount++;
}
job.metrics.killedTask(task);
The Job then checks whether the failed tasks exceed the configured tolerance. If they do, execution is stopped and the committer is told to abort the job:

//check for Job failure
if (job.failedMapTaskCount*100 >
    job.allowedMapFailuresPercent*job.numMapTasks ||
    job.failedReduceTaskCount*100 >
    job.allowedReduceFailuresPercent*job.numReduceTasks) {
  job.setFinishTime();

  String diagnosticMsg = "Job failed as tasks failed. " +
      "failedMaps:" + job.failedMapTaskCount +
      " failedReduces:" + job.failedReduceTaskCount;
  LOG.info(diagnosticMsg);
  job.addDiagnostic(diagnosticMsg);
  job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
      job.jobContext,
      org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
  return JobStateInternal.FAIL_ABORT;
}
The Job enters the FAIL_ABORT state.

If the limit is not exceeded, the Job checks whether any tasks remain incomplete. If some do, the Job stays in the RUNNING state; otherwise it enters the COMMITTING state and prepares to commit the results:

if (completedTaskCount == tasks.size()
    && currentState == JobStateInternal.RUNNING) {
  eventHandler.handle(new CommitterJobCommitEvent(jobId, getJobContext()));
  return JobStateInternal.COMMITTING;
}
// return the current state as job not ready to commit yet
return getInternalState();

JOB_COMPLETED Handle

On receiving this event (sent, for example, when the job has no tasks at all), the Job performs the same check: if tasks remain incomplete, it stays in the RUNNING state; otherwise it enters the COMMITTING state and prepares to commit the results.

JOB_TASK_ATTEMPT_FETCH_FAILURE Handle

On receiving this event, the Job counts how many reduce tasks are currently in the SHUFFLE phase, takes the failed map task's fetch-failure count from its bookkeeping, and computes the ratio of fetch failures to shuffling reducers. If that ratio exceeds the allowed fraction and the number of failure notifications exceeds its threshold, a failure event is sent to the map task attempt:
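The counting that precedes the excerpt below looks roughly like this (condensed from the same transition in JobImpl; the per-map counter fetchFailures is maintained in job.fetchFailuresMapping):

// Condensed from the JOB_TASK_ATTEMPT_FETCH_FAILURE transition in JobImpl:
// count reduce attempts currently in the SHUFFLE phase ...
int shufflingReduceTasks = 0;
for (TaskId taskId : job.reduceTasks) {
  Task task = job.tasks.get(taskId);
  if (TaskState.RUNNING.equals(task.getState())) {
    for (TaskAttempt attempt : task.getAttempts().values()) {
      if (attempt.getPhase() == Phase.SHUFFLE) {
        shufflingReduceTasks++;
        break;
      }
    }
  }
}

// ... then derive the rate consumed by the isMapFaulty check below.
float failureRate = shufflingReduceTasks == 0 ? 1.0f :
    (float) fetchFailures / shufflingReduceTasks;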

boolean isMapFaulty =
    (failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS && isMapFaulty) {
  LOG.info("Too many fetch-failures for output of task attempt: " +
      mapId + " ... raising fetch failure to map");
  job.eventHandler.handle(new TaskAttemptEvent(mapId,
      TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
  job.fetchFailuresMapping.remove(mapId);
}


Once all tasks have finished, the job enters the COMMITTING state and waits for the committer's commit-completed notification, after which the Job exits.
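When the committer reports success, the JOB_COMMIT_COMPLETED arc finishes the job; roughly (simplified from JobImpl's CommitSucceededTransition):

// Simplified from JobImpl's CommitSucceededTransition, run on the
// COMMITTING -> SUCCEEDED arc (JobEventType.JOB_COMMIT_COMPLETED).
private static class CommitSucceededTransition implements
    SingleArcTransition<JobImpl, JobEvent> {
  @Override
  public void transition(JobImpl job, JobEvent event) {
    job.logJobHistoryFinishedEvent(); // write JobFinishedEvent to job history
    job.finished(JobStateInternal.SUCCEEDED); // shut the job down as SUCCEEDED
  }
}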