(大数据之MapReduce) Hadoop作业提交分析(四)
2014-10-22 18:19
489 查看
前面我们所分析的部分其实只是Hadoop作业提交的前奏曲,真正的作业提交代码是在MR程序的main里,RunJar在最后会动态调用这个main,在(二)里有说明。我们下面要做的就是要比RunJar更进一步,让作业提交能在编码时就可实现,就像Hadoop
Eclipse Plugin那样可以对包含Mapper和Reducer的MR类直接Run on Hadoop。
一般来说,每个MR程序都会有这么一段类似的作业提交代码,这里拿WordCount的举例:
首先要做的是构建一个Configuration对象,并进行参数解析。接着构建提交作业用的Job对象,并设置作业Jar包、对应Mapper和Reducer类、输入输出的Key和Value的类及作业的输入和输出路径,最后就是提交作业并等待作业结束。这些只是比较基本的设置参数,实际还支持更多的设置参数,这里就不一一介绍,详细的可参考API文档。
一般分析代码都从开始一步步分析,但我们的重点是分析提交过程中发生的事,这里我们先不理前面的设置对后面作业的影响,我们直接跳到作业提交那一步进行分析,当碰到问题需要分析前面的代码时我会再分析。
当调用job.waitForCompletion时,其内部调用的是submit方法来提交,如果传入参数为ture则及时打印作业运作信息,否则只是等待作业结束。submit方法进去后,还有一层,里面用到了job对象内部的jobClient对象的submitJobInternal来提交作业,从这个方法才开始做正事。进去第一件事就是获取jobId,用到了jobSubmitClient对象,jobSubmitClient对应的类是JobSubmissionProtocol的实现之一(目前有两个实现,JobTracker和LocalJobRunner),由此可判断出jobSubmitClient对应的类要么是JobTracker,要么是LocalJobRunner。呃,这下有点想法了,作业提交是上到JobTracker去,还是在本地执行?可能就是看这个jobSunmitClient初始化时得到的是哪个类的实例了,我们可以稍稍的先往后看看,你会发现submitJobInternal最后用了jobSubmitClient.submitJob(jobId)来提交作业,再稍稍看看JobTracker和LocalJobRunner的submitJob实现,看来确实是这么回事。好,那我们就先跳回去看看这个jobSubmitClient是如何初始化的。在JobClient的init中我们可以发现jobSubmitClient的初始化语句:
哈,跟conf中的mapred.job.tracker属性有关,如果你没设置,那默认得到的值就是local,jobSubmitClient也就会被赋予LocalJobRunner的实例。平时,我们开发时一般都只是引用lib里面的库,不引用conf文件夹里的配置文件,这里就能解释为什么我们直接Run as Java Application时,作业被提交到Local去运行了,而不是Hadoop Cluster中。那我们把conf文件夹添加到classpath,就能Run on Hadoop了么?目前下结论尚早,我们继续分析(你添加了conf文件夹后,可以提交试一试,会爆出一个很明显的让你知道还差什么的错误,这里我就卖卖官子,先不说)。
jobId获取到后,在SystemDir基础上加jobId构建了提交作业的目录submitJobDir,SystemDir由JobClient的getSystemDir方法得出,这个SystemDir在构建fs对象时很重要,确定了返回的fs的类型。下去的configureCommandLineOptions方法主要是把作业依赖的第三方库或文件上传到fs中,并做classpath映射或Symlink,以及一些参数设置,都是些细微活,这里不仔细分析。我们主要关心里面的两个地方,一个是:
看上去很简单,一句话,就是获取FileSystem的实例,但其实里面绕来绕去,有点头晕。因为Hadoop对文件系统进行了抽象,所以这里获得fs实例的类型决定了你是在hdfs上操作还是在local fs上操作。好了,我们冲进去看看。
看见了吧,fs是由sysDir的getFileSystem返回的。我们再冲,由于篇幅,下面就只列出主要涉及的语句。
又是跟conf有关,看来conf是得实时跟住的。这里用到了Java的反射技术,用来动态生成相应的类实例。其中的class获取与uri.getScheme有密切关系,而uri就是在刚才的sysDir基础上构成,sysDir的值又最终是由jobSubmitClient的实例决定的。如果jobSubmitClient是JobTracker的实例,那Scheme就是hdfs。如果是LocalJobRunner的实例,那就是file。从core-default.xml你可以找到如下的信息:
所以在前面的作业提交代码中,在初始化Job实例时,很多事已经决定了,由conf文件夹中的配置文件决定。Configuration是通过当前线程上下文的类加载器来加载类和资源文件的,所以要想Run on Hadoop,第一步必须要让Conf文件夹进入Configuration的类加载器的搜索路径中,也就是当前线程上下文的类加载器。
第二个要注意的地方是:
因为client在提交作业到Hadoop时需要把作业打包成jar,然后copy到fs的submitJarFile路径中。如果我们想Run on Hadoop,那就必须自己把作业的class文件打个jar包,然后再提交。在Eclipse中,这就比较容易了。这里假设你启用了自动编译功能。我们可以在代码的开始阶段加入一段代码用来打包bin文件夹里的class文件为一个jar包,然后再执行后面的常规操作。
在configureCommandLineOptions方法之后,submitJobInternal会检查输出文件夹是否已存在,如果存在则抛出异常。之后,就开始划分作业数据,并根据split数得到map tasks的数量。最后,就是把作业配置文件写入submitJobFile,并调用jobSubmitClient.submitJob(jobId)最终提交作业。
至此,对Hadoop的作业提交分析也差不多了,有些地方讲的比较啰嗦,有些又讲得点到而止,但大体的过程以及一些较重要的东西还是说清楚了,其实就是那么回事。下去的文章我们会在前面的jobUtil基础上增加一些功能来支持Run on Hadoop,其实主要就是增加一个打包Jar的方法。
To be continued...
Eclipse Plugin那样可以对包含Mapper和Reducer的MR类直接Run on Hadoop。
一般来说,每个MR程序都会有这么一段类似的作业提交代码,这里拿WordCount的举例:
Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1);
首先要做的是构建一个Configuration对象,并进行参数解析。接着构建提交作业用的Job对象,并设置作业Jar包、对应Mapper和Reducer类、输入输出的Key和Value的类及作业的输入和输出路径,最后就是提交作业并等待作业结束。这些只是比较基本的设置参数,实际还支持更多的设置参数,这里就不一一介绍,详细的可参考API文档。
一般分析代码都从开始一步步分析,但我们的重点是分析提交过程中发生的事,这里我们先不理前面的设置对后面作业的影响,我们直接跳到作业提交那一步进行分析,当碰到问题需要分析前面的代码时我会再分析。
当调用job.waitForCompletion时,其内部调用的是submit方法来提交,如果传入参数为ture则及时打印作业运作信息,否则只是等待作业结束。submit方法进去后,还有一层,里面用到了job对象内部的jobClient对象的submitJobInternal来提交作业,从这个方法才开始做正事。进去第一件事就是获取jobId,用到了jobSubmitClient对象,jobSubmitClient对应的类是JobSubmissionProtocol的实现之一(目前有两个实现,JobTracker和LocalJobRunner),由此可判断出jobSubmitClient对应的类要么是JobTracker,要么是LocalJobRunner。呃,这下有点想法了,作业提交是上到JobTracker去,还是在本地执行?可能就是看这个jobSunmitClient初始化时得到的是哪个类的实例了,我们可以稍稍的先往后看看,你会发现submitJobInternal最后用了jobSubmitClient.submitJob(jobId)来提交作业,再稍稍看看JobTracker和LocalJobRunner的submitJob实现,看来确实是这么回事。好,那我们就先跳回去看看这个jobSubmitClient是如何初始化的。在JobClient的init中我们可以发现jobSubmitClient的初始化语句:
String tracker = conf.get("mapred.job.tracker", "local"); if ("local".equals(tracker)) { this.jobSubmitClient = new LocalJobRunner(conf); } else { this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf); }
哈,跟conf中的mapred.job.tracker属性有关,如果你没设置,那默认得到的值就是local,jobSubmitClient也就会被赋予LocalJobRunner的实例。平时,我们开发时一般都只是引用lib里面的库,不引用conf文件夹里的配置文件,这里就能解释为什么我们直接Run as Java Application时,作业被提交到Local去运行了,而不是Hadoop Cluster中。那我们把conf文件夹添加到classpath,就能Run on Hadoop了么?目前下结论尚早,我们继续分析(你添加了conf文件夹后,可以提交试一试,会爆出一个很明显的让你知道还差什么的错误,这里我就卖卖官子,先不说)。
jobId获取到后,在SystemDir基础上加jobId构建了提交作业的目录submitJobDir,SystemDir由JobClient的getSystemDir方法得出,这个SystemDir在构建fs对象时很重要,确定了返回的fs的类型。下去的configureCommandLineOptions方法主要是把作业依赖的第三方库或文件上传到fs中,并做classpath映射或Symlink,以及一些参数设置,都是些细微活,这里不仔细分析。我们主要关心里面的两个地方,一个是:
FileSystem fs = getFs();
看上去很简单,一句话,就是获取FileSystem的实例,但其实里面绕来绕去,有点头晕。因为Hadoop对文件系统进行了抽象,所以这里获得fs实例的类型决定了你是在hdfs上操作还是在local fs上操作。好了,我们冲进去看看。
public synchronized FileSystem getFs() throws IOException { if (this.fs == null) { Path sysDir = getSystemDir(); this.fs = sysDir.getFileSystem(getConf()); } return fs; }
看见了吧,fs是由sysDir的getFileSystem返回的。我们再冲,由于篇幅,下面就只列出主要涉及的语句。
FileSystem.get(this.toUri(), conf); ↓ CACHE.get(uri, conf); ↓ fs = createFileSystem(uri, conf); ↓ Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null); if (clazz == null) { throw new IOException("No FileSystem for scheme: " + uri.getScheme()); } FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf); fs.initialize(uri, conf); return fs;
又是跟conf有关,看来conf是得实时跟住的。这里用到了Java的反射技术,用来动态生成相应的类实例。其中的class获取与uri.getScheme有密切关系,而uri就是在刚才的sysDir基础上构成,sysDir的值又最终是由jobSubmitClient的实例决定的。如果jobSubmitClient是JobTracker的实例,那Scheme就是hdfs。如果是LocalJobRunner的实例,那就是file。从core-default.xml你可以找到如下的信息:
<property> <name>fs.file.impl</name> <value>org.apache.hadoop.fs.LocalFileSystem</value> <description>The FileSystem for file: uris.</description> </property> <property> <name>fs.hdfs.impl</name> <value>org.apache.hadoop.hdfs.DistributedFileSystem</value> <description>The FileSystem for hdfs: uris.</description> </property>
所以在前面的作业提交代码中,在初始化Job实例时,很多事已经决定了,由conf文件夹中的配置文件决定。Configuration是通过当前线程上下文的类加载器来加载类和资源文件的,所以要想Run on Hadoop,第一步必须要让Conf文件夹进入Configuration的类加载器的搜索路径中,也就是当前线程上下文的类加载器。
第二个要注意的地方是:
String originalJarPath = job.getJar(); if (originalJarPath != null) { // copy jar to JobTracker's fs // use jar name if job is not named. if ("".equals(job.getJobName())){ job.setJobName(new Path(originalJarPath).getName()); } job.setJar(submitJarFile.toString()); fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile); fs.setReplication(submitJarFile, replication); fs.setPermission(submitJarFile, new FsPermission(JOB_FILE_PERMISSION)); } else { LOG.warn("No job jar file set. User classes may not be found. "+ "See JobConf(Class) or JobConf#setJar(String)."); }
因为client在提交作业到Hadoop时需要把作业打包成jar,然后copy到fs的submitJarFile路径中。如果我们想Run on Hadoop,那就必须自己把作业的class文件打个jar包,然后再提交。在Eclipse中,这就比较容易了。这里假设你启用了自动编译功能。我们可以在代码的开始阶段加入一段代码用来打包bin文件夹里的class文件为一个jar包,然后再执行后面的常规操作。
在configureCommandLineOptions方法之后,submitJobInternal会检查输出文件夹是否已存在,如果存在则抛出异常。之后,就开始划分作业数据,并根据split数得到map tasks的数量。最后,就是把作业配置文件写入submitJobFile,并调用jobSubmitClient.submitJob(jobId)最终提交作业。
至此,对Hadoop的作业提交分析也差不多了,有些地方讲的比较啰嗦,有些又讲得点到而止,但大体的过程以及一些较重要的东西还是说清楚了,其实就是那么回事。下去的文章我们会在前面的jobUtil基础上增加一些功能来支持Run on Hadoop,其实主要就是增加一个打包Jar的方法。
To be continued...
相关文章推荐
- Hadoop-2.7.3源码分析:MapReduce作业提交源码跟踪
- Hadoop源码分析--MapReduce作业(job)提交源码跟踪
- Hadoop MapReduce作业提交与初始化过程分析
- hadoop作业提交和MapReduce基础分析
- Hadoop作业提交分析(一)
- MapReduce作业提交源码分析
- Hadoop作业提交分析(四)
- Hadoop作业提交分析(五)
- Hadoop作业提交分析(二)
- hadoop mapreduce作业提交流程
- Hadoop作业提交分析(二)
- (转)Hadoop作业提交分析
- Hadoop作业提交分析(三)
- Hadoop0.21.0源码流程分析(1)-客户端提交作业
- Hadoop0.21.0源码流程分析(1)-客户端提交作业
- mapreduce源码分析作业提交、初始化、分配、计算过程之初始化篇
- Hadoop作业提交分析(二)
- Hadoop作业提交分析(一)
- Hadoop作业提交分析(三)
- Hadoop MapReduce之作业提交(服务端)