
How Hadoop's MapReduce (mapred) works --- a source code analysis

2011-08-24 14:18
I have just started with Hadoop and am reading Chapter 6 of the Definitive Guide -- how Hadoop's MapReduce works. Below, let's walk through it together with the actual code.

First, the client calls JobClient.runJob(myJob) to run the job. The code is as follows:

  public static RunningJob runJob(JobConf job) throws IOException {
    JobClient jc = new JobClient(job);          // (1) initialize a JobClient
    RunningJob rj = jc.submitJob(job);          // (2) submit the job
    try {
      if (!jc.monitorAndPrintJob(job, rj)) {    // (3) poll until the job completes
        throw new IOException("Job failed!");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
    return rj;
  }
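For context, this is roughly how a client program drives that API: build a JobConf, configure it, and hand it to JobClient.runJob(). The driver below is my own minimal sketch using the old mapred API and the built-in identity mapper/reducer; the class name MyDriver and the command-line paths are placeholders, not part of the source being analyzed.

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.FileInputFormat;
  import org.apache.hadoop.mapred.FileOutputFormat;
  import org.apache.hadoop.mapred.JobClient;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.lib.IdentityMapper;
  import org.apache.hadoop.mapred.lib.IdentityReducer;

  public class MyDriver {
    public static void main(String[] args) throws Exception {
      JobConf myJob = new JobConf(MyDriver.class);
      myJob.setJobName("pass-through");
      // identity mapper/reducer simply copy input records to the output
      myJob.setMapperClass(IdentityMapper.class);
      myJob.setReducerClass(IdentityReducer.class);
      // with the default TextInputFormat, keys are byte offsets and values are lines
      myJob.setOutputKeyClass(LongWritable.class);
      myJob.setOutputValueClass(Text.class);
      FileInputFormat.setInputPaths(myJob, new Path(args[0]));
      FileOutputFormat.setOutputPath(myJob, new Path(args[1]));
      JobClient.runJob(myJob);   // the entry point analyzed above; blocks until the job finishes
    }
  }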


Let's first look at (1), initializing a JobClient.

=================================================

  public JobClient(JobConf conf) throws IOException {
    setConf(conf);
    init(conf);
  }

==================================================

  public void init(JobConf conf) throws IOException {
    String tracker = conf.get("mapred.job.tracker", "local");
    if ("local".equals(tracker)) {
      this.jobSubmitClient = new LocalJobRunner(conf);
    } else {
      this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
    }
  }

In init(), the configuration decides which jobSubmitClient to use: if mapred.job.tracker is "local", a LocalJobRunner is created; if it points at a remote JobTracker, Hadoop's RPC mechanism returns a proxy object for it (a dynamic proxy).
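As an aside, Hadoop's RPC layer builds that proxy with java.lang.reflect.Proxy. The stand-alone sketch below is my own illustration of the mechanism, not Hadoop code: every call on the proxied interface is intercepted by an InvocationHandler, which in Hadoop's case would serialize the method name and arguments and ship them to the JobTracker over the network.

  import java.lang.reflect.InvocationHandler;
  import java.lang.reflect.Method;
  import java.lang.reflect.Proxy;

  public class ProxyDemo {
    // stand-in for an RPC interface such as JobSubmissionProtocol
    interface Greeter {
      String greet(String name);
    }

    public static void main(String[] args) {
      InvocationHandler handler = new InvocationHandler() {
        public Object invoke(Object proxy, Method method, Object[] methodArgs) {
          // Hadoop's RPC invoker would serialize the method name and arguments here
          // and send them to the remote server; this demo just answers locally.
          System.out.println("intercepted call to " + method.getName());
          return "hello, " + methodArgs[0];
        }
      };

      Greeter remote = (Greeter) Proxy.newProxyInstance(
          Greeter.class.getClassLoader(), new Class<?>[] { Greeter.class }, handler);

      System.out.println(remote.greet("jobtracker"));  // routed through the handler
    }
  }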

====================================================

Now let's look at (2), jc.submitJob(job). submitJob() in turn calls submitJobInternal():

  RunningJob submitJobInternal(JobConf job
                               ) throws FileNotFoundException,
                                        ClassNotFoundException,
                                        InterruptedException,
                                        IOException {
    /*
     * configure the command line options correctly on the submitting dfs
     */

    JobID jobId = jobSubmitClient.getNewJobId();
    Path submitJobDir = new Path(getSystemDir(), jobId.toString());  // per-job dir under the system directory, e.g. tmp/hadoop/mapred/system/<jobid>
    Path submitJarFile = new Path(submitJobDir, "job.jar");
    Path submitSplitFile = new Path(submitJobDir, "job.split");
    configureCommandLineOptions(job, submitJobDir, submitJarFile);
    Path submitJobFile = new Path(submitJobDir, "job.xml");
    int reduces = job.getNumReduceTasks();                           // defaults to 1
    JobContext context = new JobContext(job, jobId);

    // Check the output specification
    if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
      org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
        ReflectionUtils.newInstance(context.getOutputFormatClass(), job);
      output.checkOutputSpecs(context);
    } else {
      job.getOutputFormat().checkOutputSpecs(fs, job);
    }

    // Create the splits for the job
    LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
    int maps;
    if (job.getUseNewMapper()) {
      maps = writeNewSplits(context, submitSplitFile);
    } else {
      maps = writeOldSplits(job, submitSplitFile);
    }
    job.set("mapred.job.split.file", submitSplitFile.toString());
    job.setNumMapTasks(maps);

    // Write job file to JobTracker's fs
    FSDataOutputStream out =
      FileSystem.create(fs, submitJobFile,
                        new FsPermission(JOB_FILE_PERMISSION));
    try {
      job.writeXml(out);
    } finally {
      out.close();
    }

    //
    // Now, actually submit the job (using the submit name)
    //
    JobStatus status = jobSubmitClient.submitJob(jobId);
    if (status != null) {
      return new NetworkedJob(status);
    } else {
      throw new IOException("Could not launch job");
    }
  }

submitJobInternal() mainly does three things: obtain a new ID for the job from the JobTracker; copy the job's resources (job.jar, job.split, job.xml) to the JobTracker's filesystem; and call jobSubmitClient.submitJob(jobId) to perform the actual submission.
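To make that concrete, here is what the per-job submission directory ends up containing, based on the three Path constructions above. The job ID below is a made-up example; the directory root comes from the system directory noted in the code comment:

  tmp/hadoop/mapred/system/job_201108240001_0001/job.jar    <- the job's jar, staged by configureCommandLineOptions
  tmp/hadoop/mapred/system/job_201108240001_0001/job.split  <- serialized input splits from writeNewSplits/writeOldSplits
  tmp/hadoop/mapred/system/job_201108240001_0001/job.xml    <- the JobConf serialized by job.writeXml(out)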

=======================================================
