
Tracing Hadoop Job Submission

2014-01-02 22:58

1 JobClient.runJob

After the job has been configured, we submit it with JobClient.runJob(conf). The trace starts here:

JobClient jc = new JobClient(job);
RunningJob rj = jc.submitJob(job);


This method first instantiates a JobClient and then calls submitJob(job); it then polls the returned RunningJob until the job completes.
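The submit-then-poll structure of runJob can be sketched as follows. This is a hypothetical, simplified analog (the interface and names below are placeholders, not Hadoop's actual API):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RunJobSketch {
    // Stand-in for Hadoop's RunningJob handle (placeholder interface).
    interface RunningJobHandle {
        boolean isComplete();
        boolean isSuccessful();
    }

    // Mirrors runJob's structure: after submitJob returns a handle,
    // the client loops, sleeping between status polls, until completion.
    static boolean waitForCompletion(RunningJobHandle rj) throws InterruptedException {
        while (!rj.isComplete()) {
            Thread.sleep(10); // the real client waits between status queries
        }
        return rj.isSuccessful();
    }

    public static void main(String[] args) throws InterruptedException {
        // Fake handle that reports completion on the third poll.
        AtomicInteger polls = new AtomicInteger();
        RunningJobHandle fake = new RunningJobHandle() {
            public boolean isComplete() { return polls.incrementAndGet() >= 3; }
            public boolean isSuccessful() { return true; }
        };
        System.out.println(waitForCompletion(fake)); // prints true
    }
}
```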

2 submitJob

submitJob delegates to JobClient's submitJobInternal method:
return submitJobInternal(job);


3 submitJobInternal

Key excerpts from this method:
int maps;
if (job.getUseNewMapper()) {
  maps = writeNewSplits(context, submitSplitFile);
} else {
  maps = writeOldSplits(job, submitSplitFile);
}
job.set("mapred.job.split.file", submitSplitFile.toString());
job.setNumMapTasks(maps);

The code above writes the job.split file (the serialized input splits) and uses the resulting split count as the number of map tasks.
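Conceptually, the split count comes from dividing each input file into chunks of roughly the split size; each chunk becomes one map task. A minimal stdlib-only illustration of that arithmetic (not Hadoop's actual InputFormat code):

```java
import java.util.ArrayList;
import java.util.List;

public class SplitCountSketch {
    // Divide a file of fileLength bytes into chunks of at most splitSize bytes.
    // Each {start, length} pair stands in for one input split.
    static List<long[]> computeSplits(long fileLength, long splitSize) {
        List<long[]> splits = new ArrayList<>();
        long offset = 0;
        while (offset < fileLength) {
            long len = Math.min(splitSize, fileLength - offset);
            splits.add(new long[]{offset, len});
            offset += len;
        }
        return splits;
    }

    public static void main(String[] args) {
        // A 200 MB file with a 64 MB split size yields 4 splits -> 4 map tasks
        // (three full 64 MB splits plus a final 8 MB split).
        long mb = 1024L * 1024L;
        List<long[]> splits = computeSplits(200 * mb, 64 * mb);
        System.out.println(splits.size()); // prints 4
    }
}
```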

FSDataOutputStream out =
  FileSystem.create(fs, submitJobFile,
                    new FsPermission(JOB_FILE_PERMISSION));

try {
  job.writeXml(out);
} finally {
  out.close();
}

The code above writes the job.xml file, i.e. the job's configuration, to the JobTracker's filesystem.
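The idea behind job.writeXml is that every configuration key/value pair is serialized as XML so the JobTracker can reconstruct the job's settings on its side. A rough analogy using java.util.Properties instead of Hadoop's Configuration class:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

public class JobXmlSketch {
    // Serialize all key/value pairs as XML, analogous in spirit to
    // JobConf.writeXml(out) producing job.xml.
    public static String toXml(Properties conf) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        conf.storeToXML(out, "job configuration");
        return out.toString("UTF-8");
    }

    public static void main(String[] args) throws IOException {
        Properties conf = new Properties();
        conf.setProperty("mapred.job.split.file", "/system/job_001/job.split");
        // The serialized form contains each configured key.
        System.out.println(toXml(conf).contains("mapred.job.split.file")); // prints true
    }
}
```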

configureCommandLineOptions(job, submitJobDir, submitJarFile);

This call copies the job.jar file into the submission directory.

All of the above is done by the JobClient. Finally, the JobTracker's methods are invoked to actually submit the job:

JobID jobId = jobSubmitClient.getNewJobId();

JobStatus status = jobSubmitClient.submitJob(jobId);


The full submitJobInternal method:
public RunningJob submitJobInternal(JobConf job) throws FileNotFoundException,
                                                        ClassNotFoundException,
                                                        InterruptedException,
                                                        IOException {
  /*
   * configure the command line options correctly on the submitting dfs
   */

  JobID jobId = jobSubmitClient.getNewJobId();
  Path submitJobDir = new Path(getSystemDir(), jobId.toString());
  Path submitJarFile = new Path(submitJobDir, "job.jar");
  Path submitSplitFile = new Path(submitJobDir, "job.split");
  configureCommandLineOptions(job, submitJobDir, submitJarFile);
  Path submitJobFile = new Path(submitJobDir, "job.xml");
  int reduces = job.getNumReduceTasks();
  JobContext context = new JobContext(job, jobId);

  // Check the output specification
  if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
    org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
      ReflectionUtils.newInstance(context.getOutputFormatClass(), job);
    output.checkOutputSpecs(context);
  } else {
    job.getOutputFormat().checkOutputSpecs(fs, job);
  }

  // Create the splits for the job
  LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
  int maps;
  if (job.getUseNewMapper()) {
    maps = writeNewSplits(context, submitSplitFile);
  } else {
    maps = writeOldSplits(job, submitSplitFile);
  }
  job.set("mapred.job.split.file", submitSplitFile.toString());
  job.setNumMapTasks(maps);

  // Write job file to JobTracker's fs
  FSDataOutputStream out =
    FileSystem.create(fs, submitJobFile,
                      new FsPermission(JOB_FILE_PERMISSION));

  try {
    job.writeXml(out);
  } finally {
    out.close();
  }

  //
  // Now, actually submit the job (using the submit name)
  //
  JobStatus status = jobSubmitClient.submitJob(jobId);
  if (status != null) {
    return new NetworkedJob(status);
  } else {
    throw new IOException("Could not launch job");
  }
}

Note:
On the relationship between JobClient and JobTracker: the JobClient holds a reference to the JobTracker through the JobSubmissionProtocol interface, which JobTracker implements:
private JobSubmissionProtocol jobSubmitClient;


This field is initialized in JobClient's init method:
public void init(JobConf conf) throws IOException {
  String tracker = conf.get("mapred.job.tracker", "local");
  if ("local".equals(tracker)) {
    this.jobSubmitClient = new LocalJobRunner(conf);
  } else {
    this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
  }
}
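The point of this dispatch is that the same client-side interface is backed either by an in-process runner (when mapred.job.tracker is "local") or by an RPC proxy to a remote JobTracker. A simplified, self-contained illustration of that pattern (the interface and class names below are placeholders, not Hadoop's):

```java
import java.util.Map;

public class SubmitClientSketch {
    // Stand-in for JobSubmissionProtocol: one interface, two backends.
    interface SubmitClient { String describe(); }

    // Analogous to LocalJobRunner: runs jobs in the client process.
    static class LocalRunner implements SubmitClient {
        public String describe() { return "local in-process runner"; }
    }

    // Analogous to the RPC proxy created for a remote JobTracker.
    static class RpcProxy implements SubmitClient {
        private final String address;
        RpcProxy(String address) { this.address = address; }
        public String describe() { return "RPC proxy to " + address; }
    }

    // Mirrors the branch in JobClient.init: pick the backend from config.
    static SubmitClient create(Map<String, String> conf) {
        String tracker = conf.getOrDefault("mapred.job.tracker", "local");
        return "local".equals(tracker) ? new LocalRunner() : new RpcProxy(tracker);
    }

    public static void main(String[] args) {
        System.out.println(create(Map.of()).describe());
        System.out.println(create(Map.of("mapred.job.tracker", "jt:8021")).describe());
    }
}
```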




4 JobTracker.submitJob

Its core code is:
JobInProgress job = new JobInProgress(jobId, this, this.conf);
return addJob(jobId, job);



5 addJob()

/**
 * Adds a job to the jobtracker. Make sure that the checks are inplace before
 * adding a job. This is the core job submission logic
 * @param jobId The id for the job submitted which needs to be added
 */
private synchronized JobStatus addJob(JobID jobId, JobInProgress job) {
  totalSubmissions++;

  synchronized (jobs) {
    synchronized (taskScheduler) {
      jobs.put(job.getProfile().getJobID(), job);
      for (JobInProgressListener listener : jobInProgressListeners) {
        try {
          listener.jobAdded(job);
        } catch (IOException ioe) {
          LOG.warn("Failed to add and so skipping the job : "
                   + job.getJobID() + ". Exception : " + ioe);
        }
      }
    }
  }
  myInstrumentation.submitJob(job.getJobConf(), jobId);
  return job.getStatus();
}

One of these listeners eventually triggers the JobTracker's initJob method, which initializes the job and creates one map task for each split recorded in job.split. The job is then placed in the job queue to wait for scheduling.
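The listener notification in addJob can be sketched with a minimal stdlib-only analog (names simplified; not Hadoop's actual classes): each registered listener is told about the new job, and a scheduler's listener responds by queueing it for initialization and scheduling.

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerSketch {
    // Stand-in for JobInProgressListener.
    interface JobListener { void jobAdded(String jobId); }

    // A scheduler-side listener that enqueues newly added jobs,
    // analogous to how a task scheduler's listener reacts in addJob.
    static class SchedulerListener implements JobListener {
        final List<String> queue = new ArrayList<>();
        public void jobAdded(String jobId) { queue.add(jobId); }
    }

    public static void main(String[] args) {
        List<JobListener> listeners = new ArrayList<>();
        SchedulerListener scheduler = new SchedulerListener();
        listeners.add(scheduler);

        // Analogous to addJob's loop notifying every registered listener.
        for (JobListener l : listeners) l.jobAdded("job_201401020001_0001");

        System.out.println(scheduler.queue); // prints [job_201401020001_0001]
    }
}
```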