Tracing Hadoop Job Submission
2014-01-02 22:58
1 JobClient.runJob
After the job is configured, we submit it with JobClient.runJob(conf). The submission begins with:

    JobClient jc = new JobClient(job);
    RunningJob rj = jc.submitJob(job);

That is, runJob first instantiates a JobClient, then calls submitJob(job).
2 submitJob
submitJob delegates to JobClient's submitJobInternal method:

    return submitJobInternal(job);
3 submitJobInternal
An excerpt of the method:

    int maps;
    if (job.getUseNewMapper()) {
      maps = writeNewSplits(context, submitSplitFile);
    } else {
      maps = writeOldSplits(job, submitSplitFile);
    }
    job.set("mapred.job.split.file", submitSplitFile.toString());
    job.setNumMapTasks(maps);

The code above writes the job.split file and obtains the number of map tasks.
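The key point is that the map count equals the number of input splits. As a rough stand-in (this is not Hadoop code; the fixed 64 MB split size and the single-file assumption are simplifications — Hadoop's real logic also honors min/max split sizes and block locations), split counting for one file reduces to a ceiling division:

```java
// Toy illustration: number of map tasks == number of input splits.
// Assumes one file split at fixed boundaries.
public class SplitCount {
    static int countSplits(long fileLength, long splitSize) {
        if (fileLength == 0) return 0;
        // Ceiling division: a trailing partial chunk still gets its own split.
        return (int) ((fileLength + splitSize - 1) / splitSize);
    }

    public static void main(String[] args) {
        long splitSize = 64L * 1024 * 1024;  // 64 MB, the old HDFS default block size
        // A 200 MB file yields 4 splits, hence 4 map tasks.
        System.out.println(countSplits(200L * 1024 * 1024, splitSize)); // 4
    }
}
```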
    FSDataOutputStream out =
        FileSystem.create(fs, submitJobFile,
                          new FsPermission(JOB_FILE_PERMISSION));
    try {
      job.writeXml(out);
    } finally {
      out.close();
    }

The code above writes the job.xml file.
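job.writeXml serializes the JobConf as XML on the JobTracker's filesystem. As a JDK-only stand-in (Hadoop writes its own &lt;configuration&gt;&lt;property&gt; format; java.util.Properties is just the stdlib analogue used here for illustration), the write-then-close pattern looks like:

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Properties;

// Stand-in for job.writeXml: serialize key/value configuration to XML,
// closing the stream in a finally block as the real code does.
public class WriteJobXml {
    static void writeXml(Properties conf, File out) throws IOException {
        OutputStream os = new FileOutputStream(out);
        try {
            conf.storeToXML(os, "job configuration");
        } finally {
            os.close();   // mirrors the try/finally around job.writeXml(out)
        }
    }

    public static void main(String[] args) throws IOException {
        Properties conf = new Properties();
        conf.setProperty("mapred.job.split.file", "/sys/job_0001/job.split");
        File f = File.createTempFile("job", ".xml");
        writeXml(conf, f);
        System.out.println(f.length() > 0); // true
    }
}
```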
    configureCommandLineOptions(job, submitJobDir, submitJarFile);

This call copies the job.jar file to the submit directory.
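Conceptually, staging the jar just means copying a local file into a per-job directory under the system dir. A minimal sketch using plain java.nio (Hadoop actually does this on HDFS via FileSystem's copy methods; the paths and names below are made up for illustration):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch of the staging step: copy the job jar into a per-job submit
// directory, e.g. <systemDir>/<jobId>/job.jar.
public class StageJobJar {
    static Path stageJar(Path localJar, Path systemDir, String jobId) throws IOException {
        Path submitJobDir = systemDir.resolve(jobId);
        Files.createDirectories(submitJobDir);
        Path submitJarFile = submitJobDir.resolve("job.jar");
        Files.copy(localJar, submitJarFile, StandardCopyOption.REPLACE_EXISTING);
        return submitJarFile;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("stage-demo");
        Path localJar = Files.write(tmp.resolve("app.jar"), new byte[]{1, 2, 3});
        Path staged = stageJar(localJar, tmp.resolve("system"), "job_0001");
        System.out.println(Files.exists(staged)); // true
    }
}
```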
All of the work above is done by the JobClient. Finally, the JobClient calls the JobTracker's methods to submit the job:

    JobID jobId = jobSubmitClient.getNewJobId();
    JobStatus status = jobSubmitClient.submitJob(jobId);
The complete submitJobInternal method:
    public RunningJob submitJobInternal(JobConf job)
        throws FileNotFoundException, ClassNotFoundException,
               InterruptedException, IOException {
      /*
       * configure the command line options correctly on the submitting dfs
       */
      JobID jobId = jobSubmitClient.getNewJobId();
      Path submitJobDir = new Path(getSystemDir(), jobId.toString());
      Path submitJarFile = new Path(submitJobDir, "job.jar");
      Path submitSplitFile = new Path(submitJobDir, "job.split");
      configureCommandLineOptions(job, submitJobDir, submitJarFile);
      Path submitJobFile = new Path(submitJobDir, "job.xml");
      int reduces = job.getNumReduceTasks();
      JobContext context = new JobContext(job, jobId);

      // Check the output specification
      if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
        org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
          ReflectionUtils.newInstance(context.getOutputFormatClass(), job);
        output.checkOutputSpecs(context);
      } else {
        job.getOutputFormat().checkOutputSpecs(fs, job);
      }

      // Create the splits for the job
      LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
      int maps;
      if (job.getUseNewMapper()) {
        maps = writeNewSplits(context, submitSplitFile);
      } else {
        maps = writeOldSplits(job, submitSplitFile);
      }
      job.set("mapred.job.split.file", submitSplitFile.toString());
      job.setNumMapTasks(maps);

      // Write job file to JobTracker's fs
      FSDataOutputStream out =
        FileSystem.create(fs, submitJobFile,
                          new FsPermission(JOB_FILE_PERMISSION));
      try {
        job.writeXml(out);
      } finally {
        out.close();
      }

      //
      // Now, actually submit the job (using the submit name)
      //
      JobStatus status = jobSubmitClient.submitJob(jobId);
      if (status != null) {
        return new NetworkedJob(status);
      } else {
        throw new IOException("Could not launch job");
      }
    }
Note:
On the relationship between JobClient and JobTracker: JobClient holds a reference to the JobTracker through the JobSubmissionProtocol interface, which JobTracker implements:

    private JobSubmissionProtocol jobSubmitClient;

This field is initialized in JobClient's init method:

    public void init(JobConf conf) throws IOException {
      String tracker = conf.get("mapred.job.tracker", "local");
      if ("local".equals(tracker)) {
        this.jobSubmitClient = new LocalJobRunner(conf);
      } else {
        this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
      }
    }
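The point of this dispatch is that both the in-process LocalJobRunner and the RPC proxy to a remote JobTracker implement the same JobSubmissionProtocol, so the rest of JobClient never needs to know which one it holds. A toy model of the pattern (the interface and class names mirror Hadoop's, but the bodies are stand-ins):

```java
import java.util.Map;

// Toy model of JobClient.init: select a local runner or an RPC proxy
// behind one submission interface, based on "mapred.job.tracker".
public class TrackerSelect {
    interface JobSubmissionProtocol { String submitJob(String jobId); }

    static class LocalRunner implements JobSubmissionProtocol {
        public String submitJob(String jobId) { return "local:" + jobId; }
    }

    static class RpcProxy implements JobSubmissionProtocol {
        final String address;
        RpcProxy(String address) { this.address = address; }
        public String submitJob(String jobId) { return "rpc:" + address + ":" + jobId; }
    }

    static JobSubmissionProtocol init(Map<String, String> conf) {
        String tracker = conf.getOrDefault("mapred.job.tracker", "local");
        return "local".equals(tracker) ? new LocalRunner() : new RpcProxy(tracker);
    }

    public static void main(String[] args) {
        // Missing key defaults to "local", as in the real init method.
        System.out.println(init(Map.of()).submitJob("j1"));                                // local:j1
        System.out.println(init(Map.of("mapred.job.tracker", "nn:9001")).submitJob("j1")); // rpc:nn:9001:j1
    }
}
```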
4 JobTracker.submitJob
Its core code is:

    JobInProgress job = new JobInProgress(jobId, this, this.conf);
    return addJob(jobId, job);
5 addJob()
    /**
     * Adds a job to the jobtracker. Make sure that the checks are inplace before
     * adding a job. This is the core job submission logic
     * @param jobId The id for the job submitted which needs to be added
     */
    private synchronized JobStatus addJob(JobID jobId, JobInProgress job) {
      totalSubmissions++;
      synchronized (jobs) {
        synchronized (taskScheduler) {
          jobs.put(job.getProfile().getJobID(), job);
          for (JobInProgressListener listener : jobInProgressListeners) {
            try {
              listener.jobAdded(job);
            } catch (IOException ioe) {
              LOG.warn("Failed to add and so skipping the job : "
                       + job.getJobID() + ". Exception : " + ioe);
            }
          }
        }
      }
      myInstrumentation.submitJob(job.getJobConf(), jobId);
      return job.getStatus();
    }
One of these listeners calls JobTracker's initJob method to initialize the job, creating tasks corresponding to the number of splits recorded in job.split. The job is then added to the job queue to await scheduling.
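The heart of addJob is an observer pattern: every registered JobInProgressListener is notified of the new job, and a failing listener is logged and skipped rather than aborting submission. A minimal sketch of that loop (the names mirror Hadoop's; the scheduler listener queueing the job id is a simplification of what the real scheduler's listener does):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of addJob's listener loop: notify each listener of the new
// job; one of them (the scheduler's) queues it for initialization.
public class AddJobDemo {
    interface JobInProgressListener { void jobAdded(String jobId) throws Exception; }

    static class SchedulerListener implements JobInProgressListener {
        final List<String> queue = new ArrayList<>();
        public void jobAdded(String jobId) { queue.add(jobId); }
    }

    static void addJob(String jobId, List<JobInProgressListener> listeners) {
        for (JobInProgressListener l : listeners) {
            try {
                l.jobAdded(jobId);
            } catch (Exception e) {
                // The real code logs a warning and skips the job for that listener.
                System.err.println("Failed to add job " + jobId + ": " + e);
            }
        }
    }

    public static void main(String[] args) {
        SchedulerListener scheduler = new SchedulerListener();
        addJob("job_0001", List.of(scheduler));
        System.out.println(scheduler.queue); // [job_0001]
    }
}
```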