Hadoop作业提交之客户端作业提交
2017-04-04 22:19
323 查看
一、概要描述
仅仅描述向Hadoop提交作业的第一步,即调用Jobclient的submitJob方法,向Hadoop提交作业。
二、 流程描述
Jobclient使用内置的JobSubmissionProtocol 实例jobSubmitClient 和JobTracker交互,最主要是提交作业、获取作业执行信息等。
在JobClient中作业提交的主要过程如下:
1)通过调用JobTracker的getNewJobId()向jobtracker请求一个新的作业ID
2)获取job的jar、输入分片、作业描述等几个路径信息,以jobId命名。
3)其中getSystemDir()是返回jobtracker的系统目录,来放置job相关的文件。包括:mapreduce的jar文件submitJarFile、分片文件submitSplitFile、作业描述文件submitJobFile
4)检查作业的输出说明,如果没有指定输出目录或输出目录以及存在,则作业不提交。参照org.apache.hadoop.mapreduce.lib.output.FileOutputFormat的checkOutputSpecs方法。如果没有指定,则抛出InvalidJobConfException,文件已经存在则抛出FileAlreadyExistsException
5)计算作业的输入分片。通过InputFormat的getSplits(job)方法获得作业的split并将split序列化封装为RawSplit。返回split数目,也即代表有多个分片有多少个map。详细参见InputFormat获取Split的方法。
6)writeNewSplits 方法把输入分片写到JobTracker的job目录下。
7)将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中一个以作业ID命名的目录下。
8)使用句柄JobSubmissionProtocol通过RPC远程调用的submitJob()方法,向JobTracker提交作业。JobTracker作业放入到内存队列中,由作业调度器进行调度。并初始化作业实例。JobTracker创建job成功后会给JobClient传回一个JobStatus对象用于记录job的状态信息,如执行时间、Map和Reduce任务完成的比例等。JobClient会根据这个JobStatus对象创建一个 NetworkedJob的RunningJob对象,用于定时从JobTracker获得执行过程的统计数据来监控并打印到用户的控制台。
mapreduce 作业提交和执行
引用下Hadoop:
The Definitive Guide, Second Edition中的一张经典图。这里仅仅描述上图中的左上角第一个框部分内容,即本步骤的最终输出仅仅是将作业提交到JobTracker。其他后续文章会继续描述。
三、代码详细
Jobclient:JobClient是向JobTracker提交作业的接口,可以理解为Hadoop的Mapreduce作业框架向用户开放的作业提交入口。可以提交作业,监视作业状态等
JobSubmissionProtocol(为什么0.20.1的javadoc中找不到这个接口,虽然0.20.1 0.20.2代码中都是相同的用法,知道2.2.0貌似重命名为被ClientProtocol替换):JobClient和JobTracker进行通信的一个协议。JobClient实际上是用这个句柄来提交锁业并且监视作业的执行状况。
这个接口有两个实现:LocalJobRunner(conf)当mapred-site.xml中的mapred.job.tracker值为local是为此对象。表示在单机上执行;如果为一个地址的话则是JobTracker的对象,表示分布式执行。
详细可参照JobClient中 的初始化代码:
InputFormat重要,但暂不展开(此处会有链接)Split重要,但暂不展开(此处会有链接)
RowSplit要,但暂不展开(此处会有链接)
通过代码来了解流程,了解如何调用JobClient向Hadoop集群提交作业。
实际方法的执行是submitJobInternal方法。着重看下这个方法的内部执行。主要的逻辑部分比较详细的进行了注释。(有些想继续展开,感觉太细了,后面的文章中部分重要的会有涉及,不想深度遍历了,到时会回过头来互相链接)
完。
仅仅描述向Hadoop提交作业的第一步,即调用Jobclient的submitJob方法,向Hadoop提交作业。
二、 流程描述
Jobclient使用内置的JobSubmissionProtocol 实例jobSubmitClient 和JobTracker交互,最主要是提交作业、获取作业执行信息等。
在JobClient中作业提交的主要过程如下:
1)通过调用JobTracker的getNewJobId()向jobtracker请求一个新的作业ID
2)获取job的jar、输入分片、作业描述等几个路径信息,以jobId命名。
3)其中getSystemDir()是返回jobtracker的系统目录,来放置job相关的文件。包括:mapreduce的jar文件submitJarFile、分片文件submitSplitFile、作业描述文件submitJobFile
4)检查作业的输出说明,如果没有指定输出目录或输出目录以及存在,则作业不提交。参照org.apache.hadoop.mapreduce.lib.output.FileOutputFormat的checkOutputSpecs方法。如果没有指定,则抛出InvalidJobConfException,文件已经存在则抛出FileAlreadyExistsException
5)计算作业的输入分片。通过InputFormat的getSplits(job)方法获得作业的split并将split序列化封装为RawSplit。返回split数目,也即代表有多个分片有多少个map。详细参见InputFormat获取Split的方法。
6)writeNewSplits 方法把输入分片写到JobTracker的job目录下。
7)将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中一个以作业ID命名的目录下。
8)使用句柄JobSubmissionProtocol通过RPC远程调用的submitJob()方法,向JobTracker提交作业。JobTracker作业放入到内存队列中,由作业调度器进行调度。并初始化作业实例。JobTracker创建job成功后会给JobClient传回一个JobStatus对象用于记录job的状态信息,如执行时间、Map和Reduce任务完成的比例等。JobClient会根据这个JobStatus对象创建一个 NetworkedJob的RunningJob对象,用于定时从JobTracker获得执行过程的统计数据来监控并打印到用户的控制台。
mapreduce 作业提交和执行
引用下Hadoop:
The Definitive Guide, Second Edition中的一张经典图。这里仅仅描述上图中的左上角第一个框部分内容,即本步骤的最终输出仅仅是将作业提交到JobTracker。其他后续文章会继续描述。
三、代码详细
Jobclient:JobClient是向JobTracker提交作业的接口,可以理解为Hadoop的Mapreduce作业框架向用户开放的作业提交入口。可以提交作业,监视作业状态等
JobSubmissionProtocol(为什么0.20.1的javadoc中找不到这个接口,虽然0.20.1 0.20.2代码中都是相同的用法,知道2.2.0貌似重命名为被ClientProtocol替换):JobClient和JobTracker进行通信的一个协议。JobClient实际上是用这个句柄来提交锁业并且监视作业的执行状况。
这个接口有两个实现:LocalJobRunner(conf)当mapred-site.xml中的mapred.job.tracker值为local是为此对象。表示在单机上执行;如果为一个地址的话则是JobTracker的对象,表示分布式执行。
详细可参照JobClient中 的初始化代码:
1< 20000 div class="crayon-num crayon-striped-num" data-line="crayon-572168432fd7d341078793-2" style="margin: 0px; padding: 0px 5px; border: 0px; outline: 0px; vertical-align: baseline; font-family: inherit; text-align: center; height: 15px; font-size: inherit !important; line-height: inherit !important; font-weight: inherit !important; background: 0px 50%;">23456789101112131415161718192021 | /** *如果是非local的就会 连接到指定的JobTracker */ public void init(JobConf conf) throws IOException { String tracker = conf.get("mapred.job.tracker", "local"); if ("local".equals(tracker)) { this.jobSubmitClient = new LocalJobRunner(conf); } else { this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf); } } /* * RPC不是本次主题重点,可参照后续发表的专题内容 */ private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr, Configuration conf) throws IOException { return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, JobSubmissionProtocol.versionID, addr, getUGI(conf), conf, NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class)); } |
RowSplit要,但暂不展开(此处会有链接)
通过代码来了解流程,了解如何调用JobClient向Hadoop集群提交作业。
1 2 3 4 5 6 7 8 9 10 | publicRunningJobsubmitJob(JobConfjob)throwsFileNotFoundException, IOException{ try{ returnsubmitJobInternal(job); }catch(InterruptedExceptionie){ thrownewIOException("interrupted",ie); }catch(ClassNotFoundExceptioncnfe){ thrownewIOException("class not found",cnfe); } } |
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 | 1 public RunningJob submitJobInternal(JobConf job) 2 throws FileNotFoundException, ClassNotFoundException, 3 InterruptedException, IOException { 4 5 // 1)通过调用JobTracker的getNewJobId()向jobtracker请求一个新的作业ID 6 JobID jobId = jobSubmitClient.getNewJobId(); 7 // 2)获取job的jar、输入分片、作业描述等几个路径信息,以jobId命名。 8 // 3)其中getSystemDir()是返回jobtracker的系统目录,来放置job相关的文件。包括:mapreduce的jar文件submitJarFile、分片文件submitSplitFile、作业描述文件submitJobFile 9 10 Path submitJobDir = new Path(getSystemDir(), jobId.toString());11 Path submitJarFile = new Path(submitJobDir, "job.jar");12 Path submitSplitFile = new Path(submitJobDir, "job.split");13 configureCommandLineOptions(job, submitJobDir, submitJarFile);14 Path submitJobFile = new Path(submitJobDir, "job.xml");15 int reduces = job.getNumReduceTasks();16 JobContext context = new JobContext(job, jobId);17 18 // Check the output specification19 // 4)检查作业的输出说明,如果没有指定输出目录或输出目录以及存在,则作业不提交。参照org.apache.hadoop.mapreduce.lib.output.FileOutputFormat的checkOutputSpecs方法。如果没有指定,则抛出InvalidJobConfException,文件已经存在则抛出FileAlreadyExistsException20 21 if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {22 org.apache.hadoop.mapreduce.OutputFormat< > output = ReflectionUtils23 .newInstance(context.getOutputFormatClass(), job);24 output.checkOutputSpecs(context);25 } else {26 job.getOutputFormat().checkOutputSpecs(fs, job);27 }28 29 // 5)计算作业的输入分片。详细参见FormatInputFormat获取Split的方法。30 // 6)writeNewSplits 方法把输入分片写到JobTracker的job目录下,名称是submitSplitFile31 // job.split名称。32 // 7)将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中一个以作业ID命名的目录下。33 34 // Create the splits for the job35 LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));36 int maps;37 if (job.getUseNewMapper()) {38 maps = writeNewSplits(context, submitSplitFile);39 } else {40 maps = writeOldSplits(job, submitSplitFile);41 }42 job.set("mapred.job.split.file", submitSplitFile.toString());43 job.setNumMapTasks(maps);44 45 // Write job file to JobTracker's fs46 FSDataOutputStream out = FileSystem.create(fs, submitJobFile,47 new FsPermission(JOB_FILE_PERMISSION));48 49 try {50 job.writeXml(out);51 } finally {52 out.close();53 }54 55 // 8)使用句柄JobSubmissionProtocol通过RPC远程调用的submitJob()方法,向JobTracker提交作业。JobTracker根据接收到的submitJob()方法调用后,把调用放入到内存队列中,由作业调度器进行调度。并初始化作业实例。56 57 JobStatus status = jobSubmitClient.submitJob(jobId);58 if (status != null) {59 return new NetworkedJob(status);60 } else {61 throw new IOException("Could not launch job");62 }63 } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | /** * JobTracker.submitJob() kicks off a new job. * * Create a 'JobInProgress' object, which contains both JobProfile * and JobStatus. Those two sub-objects are sometimes shipped outside * of the JobTracker. But JobInProgress adds info that's useful for * the JobTracker alone. */ publicsynchronizedJobStatussubmitJob(JobIDjobId)throwsIOException{ if(jobs.containsKey(jobId)){ //job already running, don't start twice returnjobs.get(jobId).getStatus(); } JobInProgressjob=newJobInProgress(jobId,this,this.conf); Stringqueue=job.getProfile().getQueueName(); if(!(queueManager.getQueues().contains(queue))){ newCleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId)); thrownewIOException("Queue \""+queue+"\" does not exist"); } // check for access try{ checkAccess(job,QueueManager.QueueOperation.SUBMIT_JOB); }catch(IOExceptionioe){ LOG.warn("Access denied for user "+job.getJobConf().getUser() +". Ignoring job "+jobId,ioe); newCleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId)); throwioe; } returnaddJob(jobId,job); } |
相关文章推荐
- Hadoop2.x Yarn作业提交(客户端)
- Hadoop源码解析之YARN客户端作业提交流程
- Hadoop MapReduce之作业提交(客户端)
- 【Hadoop代码笔记】Hadoop作业提交之客户端作业提交
- Hadoop0.21.0源码流程分析(1)-客户端提交作业
- hadoop客户端提交作业错误:java.lang.RuntimeException: java.lang.ClassNotFoundException: Job$Mappe
- Hadoop0.21.0源码流程分析(1)-客户端提交作业
- Hadoop2.x Yarn作业提交(客户端)
- Hadoop2.0 客户端提交作业流程
- Hadoop客户端提交作业时java.lang.ClassNotFoundException
- 【Hadoop代码笔记】Hadoop作业提交之客户端作业提交
- hadoop作业提交源码分析
- Hadoop2.x Yarn作业提交(服务端)
- Hadoop作业提交多种方案 hadoop插件编译
- 【Hadoop代码笔记】Hadoop作业提交之Child启动map任务
- Hadoop作业提交分析(一)
- Hadoop作业提交分析(四)
- Hadoop作业提交分析(一)
- Hadoop集群提交作业问题总结
- Hadoop源码解析之YARN服务端作业提交流程