
Output and Status: Hadoop Source Notes on TaskAttemptID, TaskTrackerAction, JobTracker, and FileOutputCommitter (by 小雨)

2013-04-17 13:00
First, a disclaimer: I am a beginner, and I take no responsibility for anything technically misleading in the article below.

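1. A TaskAttemptID identifies a task attempt. A task attempt is one execution (instance) of a map or reduce task; the same task can be attempted more than once, for example after a failure. Each TaskAttemptID consists of two parts: the TaskID plus an attempt sequence number.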

For example:

attempt_200707121733_0003_m_000005_0

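denotes attempt number 0 of map task number 000005 of job number 0003. The leading 200707121733 identifies the JobTracker: it is the time that JobTracker was started (17:33 on 12 July 2007).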
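To make the structure of the ID concrete, here is a minimal stand-alone sketch that splits such a string into its parts and rebuilds the TaskID portion. AttemptIdParser and Parts are hypothetical names for illustration, and only the map ("m") and reduce ("r") task types from this article are handled. Hadoop's own TaskAttemptID class provides a static forName(String) for real parsing.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-alone parser for IDs like attempt_200707121733_0003_m_000005_0.
final class AttemptIdParser {
    // attempt_<jtIdentifier>_<job number>_<m|r>_<task number>_<attempt number>
    private static final Pattern ATTEMPT =
        Pattern.compile("attempt_(\\d+)_(\\d+)_([mr])_(\\d+)_(\\d+)");

    static final class Parts {
        final String jtIdentifier;  // JobTracker start time, formatted yyyyMMddHHmm
        final int jobNumber;
        final boolean isMap;
        final int taskNumber;
        final int attemptNumber;

        Parts(String jtIdentifier, int jobNumber, boolean isMap,
              int taskNumber, int attemptNumber) {
            this.jtIdentifier = jtIdentifier;
            this.jobNumber = jobNumber;
            this.isMap = isMap;
            this.taskNumber = taskNumber;
            this.attemptNumber = attemptNumber;
        }

        /** The TaskID part: everything except the trailing attempt number. */
        String taskId() {
            // Job numbers are padded to 4 digits and task numbers to 6, as in the example.
            return String.format(Locale.ROOT, "task_%s_%04d_%s_%06d",
                jtIdentifier, jobNumber, isMap ? "m" : "r", taskNumber);
        }
    }

    private AttemptIdParser() {}

    static Parts parse(String id) {
        Matcher m = ATTEMPT.matcher(id);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a task attempt ID: " + id);
        }
        return new Parts(m.group(1), Integer.parseInt(m.group(2)),
            "m".equals(m.group(3)), Integer.parseInt(m.group(4)),
            Integer.parseInt(m.group(5)));
    }
}
```

For the example above, `AttemptIdParser.parse("attempt_200707121733_0003_m_000005_0").taskId()` gives `task_200707121733_0003_m_000005`, and the attempt number is 0; a retry of the same task would differ only in that last field.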

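2. The JobTracker tells a TaskTracker what to do by sending TaskTrackerActions back in its heartbeat response. The action types are LaunchTaskAction, KillTaskAction, KillJobAction, ReinitTrackerAction, and CommitTaskAction: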

/**
 * Enumeration of various 'actions' that the {@link JobTracker}
 * directs the {@link TaskTracker} to perform periodically.
 *
 */
public static enum ActionType {
  /** Launch a new task. */
  LAUNCH_TASK,

  /** Kill a task. */
  KILL_TASK,

  /** Kill any tasks of this job and cleanup. */
  KILL_JOB,

  /** Reinitialize the tasktracker. */
  REINIT_TRACKER,

  /** Ask a task to save its output. */
  COMMIT_TASK
};

/**
 * A factory-method to create objects of given {@link ActionType}.
 * @param actionType the {@link ActionType} of object to create.
 * @return an object of {@link ActionType}.
 */
public static TaskTrackerAction createAction(ActionType actionType) {
  TaskTrackerAction action = null;

  switch (actionType) {
    case LAUNCH_TASK:
      action = new LaunchTaskAction();
      break;
    case KILL_TASK:
      action = new KillTaskAction();
      break;
    case KILL_JOB:
      action = new KillJobAction();
      break;
    case REINIT_TRACKER:
      action = new ReinitTrackerAction();
      break;
    case COMMIT_TASK:
      action = new CommitTaskAction();
      break;
  }

  return action;
}
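As I understand it, this factory exists mainly for deserialization: when the TaskTracker reads a heartbeat response, it reads each action's type first, calls createAction() to get an object of the right subclass, and then lets that object read its own fields. The sketch below reproduces that pattern without Hadoop. TrackerAction, LaunchAction, KillAction, ActionCodec and the taskId payload are all hypothetical, and only two action types are modelled.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Hadoop-free mirror of TaskTrackerAction.
abstract class TrackerAction {
  enum Type { LAUNCH_TASK, KILL_TASK }

  abstract Type type();
  abstract void write(DataOutput out) throws IOException;
  abstract void readFields(DataInput in) throws IOException;

  // Factory method playing the role of TaskTrackerAction.createAction().
  static TrackerAction create(Type type) {
    switch (type) {
      case LAUNCH_TASK: return new LaunchAction();
      case KILL_TASK:   return new KillAction();
      default: throw new IllegalArgumentException("Unknown action " + type);
    }
  }
}

class LaunchAction extends TrackerAction {
  String taskId = "";  // illustrative payload
  @Override Type type() { return Type.LAUNCH_TASK; }
  @Override void write(DataOutput out) throws IOException { out.writeUTF(taskId); }
  @Override void readFields(DataInput in) throws IOException { taskId = in.readUTF(); }
}

class KillAction extends TrackerAction {
  String taskId = "";  // illustrative payload
  @Override Type type() { return Type.KILL_TASK; }
  @Override void write(DataOutput out) throws IOException { out.writeUTF(taskId); }
  @Override void readFields(DataInput in) throws IOException { taskId = in.readUTF(); }
}

final class ActionCodec {
  private ActionCodec() {}

  // Writes the count, then for each action its type tag followed by its payload.
  static byte[] encode(List<TrackerAction> actions) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(actions.size());
    for (TrackerAction a : actions) {
      out.writeUTF(a.type().name());
      a.write(out);
    }
    out.flush();
    return bytes.toByteArray();
  }

  // The tag is read first so the factory can pick the subclass; the new
  // object then fills in its own fields from the stream.
  static List<TrackerAction> decode(byte[] data) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
    int n = in.readInt();
    List<TrackerAction> result = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      TrackerAction a = TrackerAction.create(TrackerAction.Type.valueOf(in.readUTF()));
      a.readFields(in);
      result.add(a);
    }
    return result;
  }
}
```

The design choice is the same as in the original switch: the stream only carries a small type tag, and adding a new action type means adding one enum constant, one subclass, and one case in the factory.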

3. JobTracker.offerService() and what it starts:

offerService()

|--taskScheduler.start();

|--recoveryManager.recover();

Recovers jobs through RecoveryManager's recover() method.

|-- this.expireTrackersThread.start();

Detects TaskTracker nodes that have gone down (expired).

// Used to expire TaskTrackers that have gone down

|-- this.retireJobsThread.start();

Removes jobs that completed a long time ago but are still kept in the queue.

|--expireLaunchingTaskThread.start();

Times out tasks that were assigned to a TaskTracker but have not reported back within the timeout.

// A thread to timeout tasks that have been assigned to task trackers,
// but that haven't reported back yet.

|--completedJobsStoreThread.start();

|--this.interTrackerServer.start(); // start the inter-tracker server once the jt is ready
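The expire threads in this list follow a common pattern: record when each node or task last reported, and periodically drop whatever has been silent too long. The sketch below shows that pattern for TaskTrackers only. TrackerExpirer, heartbeat(), expireOnce() and the check interval are hypothetical names and choices for illustration, not Hadoop's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the idea behind expireTrackersThread: remember the
// last heartbeat time per tracker and periodically drop silent trackers.
class TrackerExpirer implements Runnable {
    private final long expiryMillis;
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    TrackerExpirer(long expiryMillis) {
        this.expiryMillis = expiryMillis;
    }

    /** Called whenever a tracker sends a heartbeat. */
    void heartbeat(String tracker, long nowMillis) {
        lastHeartbeat.put(tracker, nowMillis);
    }

    boolean isAlive(String tracker) {
        return lastHeartbeat.containsKey(tracker);
    }

    /** One scan: removes and returns trackers whose last heartbeat is too old. */
    List<String> expireOnce(long nowMillis) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastHeartbeat.entrySet()) {
            if (nowMillis - e.getValue() > expiryMillis) {
                expired.add(e.getKey());
            }
        }
        for (String tracker : expired) {
            // A real JobTracker must also deal with the lost tracker's tasks here.
            lastHeartbeat.remove(tracker);
        }
        return expired;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Thread.sleep(expiryMillis / 3);  // check interval chosen for the sketch
                expireOnce(System.currentTimeMillis());
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();  // exit quietly on shutdown
        }
    }
}
```

Keeping the scan in a separate expireOnce(now) method makes the expiry rule testable with fixed timestamps, while run() only adds the periodic loop that a dedicated thread such as expireTrackersThread would execute.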

/**
 * Run forever
 */
public void offerService() throws InterruptedException, IOException {
  // Prepare for recovery. This is done irrespective of the status of restart
  // flag.
  while (true) {
    try {
      recoveryManager.updateRestartCount();
      break;
    } catch (IOException ioe) {
      LOG.warn("Failed to initialize recovery manager. ", ioe);
      // wait for some time
      Thread.sleep(FS_ACCESS_RETRY_PERIOD);
      LOG.warn("Retrying...");
    }
  }

  taskScheduler.start();

  // Start the recovery after starting the scheduler
  try {
    recoveryManager.recover();
  } catch (Throwable t) {
    LOG.warn("Recovery manager crashed! Ignoring.", t);
  }

  this.expireTrackersThread = new Thread(this.expireTrackers,
                                         "expireTrackers");
  this.expireTrackersThread.start();
  this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
  this.retireJobsThread.start();
  expireLaunchingTaskThread.start();

  if (completedJobStatusStore.isActive()) {
    completedJobsStoreThread = new Thread(completedJobStatusStore,
                                          "completedjobsStore-housekeeper");
    completedJobsStoreThread.start();
  }

  // start the inter-tracker server once the jt is ready
  this.interTrackerServer.start();

  synchronized (this) {
    state = State.RUNNING;
  }
  // ... (rest of the method omitted)
}
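The loop at the top of offerService() retries recoveryManager.updateRestartCount() until it succeeds, sleeping FS_ACCESS_RETRY_PERIOD between attempts and never giving up. A generic version of that retry-forever loop might look like the sketch below; Retry, IoAction and untilSuccess() are hypothetical names, and logging goes to System.err instead of Hadoop's LOG.

```java
import java.io.IOException;

// Hypothetical helper mirroring the retry loop at the start of offerService():
// keep calling an I/O action until it succeeds, sleeping between attempts.
final class Retry {
    interface IoAction {
        void run() throws IOException;
    }

    private Retry() {}

    /** Returns how many attempts were needed. Like the original loop, it never gives up. */
    static int untilSuccess(IoAction action, long sleepMillis) throws InterruptedException {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                action.run();
                return attempts;
            } catch (IOException ioe) {
                System.err.println("Attempt " + attempts + " failed: "
                    + ioe.getMessage() + "; retrying...");
                Thread.sleep(sleepMillis);
            }
        }
    }
}
```

Retrying forever makes sense here because the JobTracker cannot do anything useful until the filesystem is reachable; only IOException is retried, so programming errors still propagate immediately.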

4. The five job states (constants in JobStatus):

public static final int RUNNING = 1;
public static final int SUCCEEDED = 2;
public static final int FAILED = 3;
public static final int PREP = 4;
public static final int KILLED = 5;
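Because the states are plain int constants, code that displays or checks them needs a small amount of mapping. The sketch below is a hypothetical helper (JobStates, name() and isComplete() are illustrative names) that turns a state code into a readable name and treats SUCCEEDED, FAILED and KILLED as the terminal states, while PREP and RUNNING mean the job is still in progress.

```java
// Hypothetical helper around the job state constants listed above.
final class JobStates {
    static final int RUNNING = 1;
    static final int SUCCEEDED = 2;
    static final int FAILED = 3;
    static final int PREP = 4;
    static final int KILLED = 5;

    // Index 0 is unused so that a state code can index the array directly.
    private static final String[] NAMES =
        {"", "RUNNING", "SUCCEEDED", "FAILED", "PREP", "KILLED"};

    private JobStates() {}

    /** Readable name for a state code; rejects unknown codes. */
    static String name(int state) {
        if (state < RUNNING || state > KILLED) {
            throw new IllegalArgumentException("Unknown job state " + state);
        }
        return NAMES[state];
    }

    /** A job is finished once it has succeeded, failed, or been killed. */
    static boolean isComplete(int state) {
        return state == SUCCEEDED || state == FAILED || state == KILLED;
    }
}
```

Note that the numeric order is not the lifecycle order: a job normally goes PREP (4) → RUNNING (1) → one of the terminal states, so comparisons such as `state > RUNNING` should not be used to test for completion.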

5. The information a JobStatus holds (field assignments from its constructor):

this.jobid = jobid;
this.setupProgress = setupProgress;
this.mapProgress = mapProgress;
this.reduceProgress = reduceProgress;
this.cleanupProgress = cleanupProgress;
this.runState = runState;
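The four progress fields are fractions between 0 and 1, one per phase of the job (setup, map, reduce, cleanup), and runState is one of the codes from section 4. The sketch below is a hypothetical, simplified stand-in (SimpleJobStatus and summary() are illustrative names, not Hadoop's API) that stores the same fields, clamps progress to that range, and prints it as percentages.

```java
import java.util.Locale;

// Hypothetical, simplified stand-in for JobStatus holding the fields assigned above.
final class SimpleJobStatus {
    final String jobid;
    final float setupProgress;    // each progress value is a fraction in [0, 1]
    final float mapProgress;
    final float reduceProgress;
    final float cleanupProgress;
    final int runState;           // one of the job state codes from section 4

    SimpleJobStatus(String jobid, float setupProgress, float mapProgress,
                    float reduceProgress, float cleanupProgress, int runState) {
        this.jobid = jobid;
        this.setupProgress = clamp(setupProgress);
        this.mapProgress = clamp(mapProgress);
        this.reduceProgress = clamp(reduceProgress);
        this.cleanupProgress = clamp(cleanupProgress);
        this.runState = runState;
    }

    private static float clamp(float p) {
        return Math.min(1.0f, Math.max(0.0f, p));
    }

    /** Progress of each phase as whole percentages. */
    String summary() {
        return String.format(Locale.ROOT,
            "%s: setup %.0f%%, map %.0f%%, reduce %.0f%%, cleanup %.0f%%",
            jobid, setupProgress * 100, mapProgress * 100,
            reduceProgress * 100, cleanupProgress * 100);
    }
}
```

For example, `new SimpleJobStatus("job_200707121733_0003", 1f, 1f, 0.5f, 0f, 1).summary()` describes a running job whose setup and map phases are finished and whose reduce phase is halfway done.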

6. FileOutputCommitter methods:

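setupJob: sets up the job's output when the job is initialized (for FileOutputCommitter, this creates the temporary directory under the output path);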

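commitJob: commits the job's output once the job has completed. It cleans up the job's temporary output (in Hadoop 1.x it also writes a _SUCCESS marker file by default), and it is called when the job's final status is SUCCEEDED;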

cleanupJob: cleans up the job's temporary output after the job finishes;

// do the clean up of temporary directory

abortJob: called when the job's final status is FAILED or KILLED, to abort (discard) the job's output;

setupTask: sets up the task's output;

// FileOutputCommitter's setupTask doesn't do anything. Because the
// temporary task directory is created on demand when the
// task is writing.

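needsTaskCommit: checks whether the task needs to commit, i.e. whether it produced output (for FileOutputCommitter, whether the task's temporary output directory exists);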

commitTask: moves the task's output into the job's output directory;

// Move the task outputs to their final place

// Delete the temporary task-specific output directory

abortTask: discards the task's output;
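Taken together, these methods implement a two-level commit: each task attempt writes into its own temporary directory, only committed attempts have their files moved into the final output directory, and the job commit removes the temporary area. The sketch below models that idea on the local filesystem with java.nio.file. LocalCommitter, taskWorkDir() and the directory layout (`<out>/_temporary/<attemptId>/`, flat files only) are simplified assumptions, not Hadoop's exact layout or API.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical local-filesystem model of the FileOutputCommitter idea.
class LocalCommitter {
    private final Path out;
    private final Path tmp;

    LocalCommitter(Path out) {
        this.out = out;
        this.tmp = out.resolve("_temporary");
    }

    void setupJob() throws IOException { Files.createDirectories(tmp); }

    /** Where an attempt writes; like setupTask above, nothing is created up front. */
    Path taskWorkDir(String attemptId) { return tmp.resolve(attemptId); }

    boolean needsTaskCommit(String attemptId) { return Files.exists(taskWorkDir(attemptId)); }

    /** Moves the attempt's files into the output directory, then deletes its work dir. */
    void commitTask(String attemptId) throws IOException {
        Path work = taskWorkDir(attemptId);
        if (!Files.exists(work)) return;  // nothing was written
        List<Path> files;
        try (Stream<Path> s = Files.list(work)) {
            files = s.collect(Collectors.toList());
        }
        for (Path f : files) {
            Files.move(f, out.resolve(f.getFileName()), StandardCopyOption.REPLACE_EXISTING);
        }
        deleteRecursively(work);
    }

    void abortTask(String attemptId) throws IOException { deleteRecursively(taskWorkDir(attemptId)); }

    void commitJob() throws IOException {
        deleteRecursively(tmp);
        Files.write(out.resolve("_SUCCESS"), new byte[0]);  // empty marker file
    }

    void abortJob() throws IOException { deleteRecursively(tmp); }

    private static void deleteRecursively(Path p) throws IOException {
        if (!Files.exists(p)) return;
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(p)) {
            // Reverse order visits children before their parent directories.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path q : paths) {
            Files.delete(q);
        }
    }
}
```

The point of the extra directory level is that a failed or duplicate attempt (for example attempt _1 of the same task) never touches the final output: its directory is simply deleted by abortTask, and readers of the output directory only ever see files from committed attempts.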
