Hadoop例子之Sort
2015-08-19 15:24
197 查看
Sort
对hadoop例子Sort进行代码分析学习。
注:本文仅为学习笔记,中间会包含从网络或其他出处获取的资料,文后会标注出处,若有遗漏,麻烦提醒以便修订,敬请原谅
1. extends Configured implementsTool : 该类继承org.apache.hadoop.conf.Configured类,该类保存Configuration对象作为属性,实现org.apache.hadoop.util.Tool接口,该接口定义一个run方法,可以通过ToolRunner帮助类来执行实现了Tool接口的任务。
2. ToolRunner.printGenericCommandUsage(System.out): ToolRunner类:Job任务的运行帮助类,该类可以在执行jar文件时解析hadoop命令行参数。这里打印了命令行参数的使用说明:
3. JobClient:org.apache.hadoop.mapred.JobClient,该类是用户和集群进行交互的主要接口类,提供过了包括:提交任务,跟踪处理进度,获取任务的报告或日记,获取集群的MapReduce状态信息等方法。这里通过Configuration对象来创建JobClient实例。
4. Org.apache.hadoop.mapred.ClusterStatus类: 该类表示当前集群的信息,这里通过cluster.getMaxReduceTasks() 获取集群支持的最大的reduce任务数量
5. 通过ToolRunner类运行后,将对命令行参数进行解析,并添加到configuration实例中,方便通过configuration获取定义的属性值。
6. 调用cluster.getTaskTrackers()获取集群任务跟踪器的数量。
7. Org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat<K,V>序列文件的输入格式
8. Org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat<K,V>序列文件的输出格式
9. Org.apache.hadoop.io.BytesWritablehadoop的byte类型
10. Org.apache.hadoop.io.BytesWritablehadoop的byte类型
11. InputSampler.Sampler<K,V>sample:
org.apache.hadoop.mapreduce.lib.partition.InputSampler 帮助进行数据分区的采样器。这里的分区指的是Map的结果按照某种规则进行分区,分发给不同的reduce.当数据量比较大,无法判断有效的分区规则是,可以通过采样器对数据进行采样分析后进行分区。
12. org.apache.hadoop.mapreduce.lib.partition.InputSampler-RandomSampler,随机采样。
13. job.setNumReduceTask() 可以设置reduce任务的个数。
14. job.setPartitionerClass(TotalOrderPartitioner.class)设置分区类。
Org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner,该类将mapper根据其key文件的定义划分到不同的reducer里面,这里的key指的是TotalOrderpartitioner使用了partitionfile,该文件里面key的数量=reduce数量-1,并且key之间是排序的,比如(2,4,6,8)这4个key。进行分区后形成5个分区(分区 2 分区4 分区 6 分区 8 分区) 分发的5个reduce,而reduce的输出是排序的,因此采用该分区类就实现了本例子的全输入数据的排序目的。
15. 返回合法路径,生成路径,用于保存TotalOrderPartitioner的key文件
16. TotalOrderPartitioner.setPartitionFile(conf,partitionFile)设置分区文件
17. InputSampler.<K,V>writePartitionFile(job,sampler)通过取样器取样,写入分区文件。
18. DistributedCache.adddCacheFile(partitionUri,conf)将文件添加的分布缓存中,hadoop会将该文件分布缓存到所有该任务工作的map节点中,目前推荐使用Job.addCacheFile(URI uri)方法替换。
总结
本例子主要引入了分区(加入取样器)和排序在hadoop处理流程中的概念,通过使用TotalOrderPartioner类实现了利用mapreduce框架的map-reduce处理流程实现了输入的全排序。
Map-Reduce流程图:
图1 MapReduce处理流程图
http://blog.oddfoo.net/2011/04/17/mapreduce-partition%E5%88%86%E6%9E%90-2/
对hadoop例子Sort进行代码分析学习。
注:本文仅为学习笔记,中间会包含从网络或其他出处获取的资料,文后会标注出处,若有遗漏,麻烦提醒以便修订,敬请原谅
作用
使用mapreduce框架来进行输入的排序主类
/** * This is the trivial map/reduce program that does absolutely nothing * other than use the framework to fragment and sort the input values. * * To run: bin/hadoop jar build/hadoop-examples.jar sort * [-r <i>reduces</i>] * [-inFormat <i>input format class</i>] * [-outFormat <i>output format class</i>] * [-outKey <i>output key class</i>] * [-outValue <i>output value class</i>] * [-totalOrder <i>pcnt</i><i>num samples</i><i>max splits</i>] * <i>in-dir</i><i>out-dir</i> */ publicclass Sort<K,V> extends Configured implements Tool {//1 publicstaticfinal String REDUCES_PER_HOST = "mapreduce.sort.reducesperhost"; private Job job = null; staticint printUsage() { System.out.println("sort [-r <reduces>] " + "[-inFormat <input format class>] " + "[-outFormat <output format class>] " + "[-outKey <output key class>] " + "[-outValue <output value class>] " + "[-totalOrder <pcnt> <num samples> <max splits>] " + "<input> <output>"); ToolRunner.printGenericCommandUsage(System.out);//2 return 2; } /** * The main driver for sort program. * Invoke this method to submit the map/reduce job. * @throws IOException When there is communication problems with the * job tracker. */ publicint run(String[] args) throws Exception { Configuration conf = getConf(); JobClient client = new JobClient(conf);//3 ClusterStatus cluster = client.getClusterStatus();//4 intnum_reduces = (int) (cluster.getMaxReduceTasks() * 0.9); String sort_reduces = conf.get(REDUCES_PER_HOST);//5 if (sort_reduces != null) { num_reduces = cluster.getTaskTrackers() * Integer.parseInt(sort_reduces); //6 } Class<? extends InputFormat> inputFormatClass = SequenceFileInputFormat.class; //7 Class<? extends OutputFormat> outputFormatClass = SequenceFileOutputFormat.class;//8 Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;//9 Class<? extends Writable> outputValueClass = BytesWritable.class;//10 List<String> otherArgs = new ArrayList<String>(); InputSampler.Sampler<K,V> sampler = null; //11 for(inti=0; i < args.length; ++i) { try { if ("-r".equals(args[i])) { num_reduces = Integer.parseInt(args[++i]); } elseif ("-inFormat".equals(args[i])) { inputFormatClass = Class.forName(args[++i]).asSubclass(InputFormat.class); } elseif ("-outFormat".equals(args[i])) { outputFormatClass = Class.forName(args[++i]).asSubclass(OutputFormat.class); } elseif ("-outKey".equals(args[i])) { outputKeyClass = Class.forName(args[++i]).asSubclass(WritableComparable.class); } elseif ("-outValue".equals(args[i])) { outputValueClass = Class.forName(args[++i]).asSubclass(Writable.class); } elseif ("-totalOrder".equals(args[i])) { doublepcnt = Double.parseDouble(args[++i]); intnumSamples = Integer.parseInt(args[++i]); intmaxSplits = Integer.parseInt(args[++i]); if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE; sampler = new InputSampler.RandomSampler<K,V>(pcnt, numSamples, maxSplits);//12 } else { otherArgs.add(args[i]); } } catch (NumberFormatException except) { System.out.println("ERROR: Integer expected instead of " + args[i]); return printUsage(); } catch (ArrayIndexOutOfBoundsException except) { System.out.println("ERROR: Required parameter missing from " + args[i-1]); return printUsage(); // exits } } // Set user-supplied (possibly default) job configs job = Job.getInstance(conf); job.setJobName("sorter"); job.setJarByClass(Sort.class); job.setMapperClass(Mapper.class); job.setReducerClass(Reducer.class); job.setNumReduceTasks(num_reduces); //13 job.setInputFormatClass(inputFormatClass); job.setOutputFormatClass(outputFormatClass); job.setOutputKeyClass(outputKeyClass); job.setOutputValueClass(outputValueClass); // Make sure there are exactly 2 parameters left. if (otherArgs.size() != 2) { System.out.println("ERROR: Wrong number of parameters: " + otherArgs.size() + " instead of 2."); return printUsage(); } FileInputFormat.setInputPaths(job, otherArgs.get(0)); FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1))); if (sampler != null) { System.out.println("Sampling input to effect total-order sort..."); job.setPartitionerClass(TotalOrderPartitioner.class); //14 Path inputDir = FileInputFormat.getInputPaths(job)[0]; inputDir = inputDir.makeQualified(inputDir.getFileSystem(conf));//15 Path partitionFile = new Path(inputDir, "_sortPartitioning"); TotalOrderPartitioner.setPartitionFile(conf, partitionFile);//16 InputSampler.<K,V>writePartitionFile(job, sampler);//17 URI partitionUri = new URI(partitionFile.toString() + "#" + "_sortPartitioning"); DistributedCache.addCacheFile(partitionUri, conf);//18 } System.out.println("Running on " + cluster.getTaskTrackers() + " nodes to sort from " + FileInputFormat.getInputPaths(job)[0] + " into " + FileOutputFormat.getOutputPath(job) + " with " + num_reduces + " reduces."); Date startTime = new Date(); System.out.println("Job started: " + startTime); intret = job.waitForCompletion(true) ? 0 : 1; Date end_time = new Date(); System.out.println("Job ended: " + end_time); System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds."); returnret; } publicstaticvoid main(String[] args) throws Exception { intres = ToolRunner.run(new Configuration(), new Sort(), args); System.exit(res); } /** * Get the last job that was run using this instance. * @return the results of the last job that was run */ public Job getResult() { returnjob; } } |
2. ToolRunner.printGenericCommandUsage(System.out): ToolRunner类:Job任务的运行帮助类,该类可以在执行jar文件时解析hadoop命令行参数。这里打印了命令行参数的使用说明:
publicstaticvoid printGenericCommandUsage(PrintStream out) { out.println("Generic options supported are"); out.println("-conf <configuration file> specify an application configuration file"); out.println("-D <property=value> use value for given property"); out.println("-fs <local|namenode:port> specify a namenode"); out.println("-jt <local|resourcemanager:port> specify a ResourceManager"); out.println("-files <comma separated list of files> " + "specify comma separated files to be copied to the map reduce cluster"); out.println("-libjars <comma separated list of jars> " + "specify comma separated jar files to include in the classpath."); out.println("-archives <comma separated list of archives> " + "specify comma separated archives to be unarchived" + " on the compute machines.\n"); out.println("The general command line syntax is"); out.println("bin/hadoop command [genericOptions] [commandOptions]\n"); } |
4. Org.apache.hadoop.mapred.ClusterStatus类: 该类表示当前集群的信息,这里通过cluster.getMaxReduceTasks() 获取集群支持的最大的reduce任务数量
5. 通过ToolRunner类运行后,将对命令行参数进行解析,并添加到configuration实例中,方便通过configuration获取定义的属性值。
6. 调用cluster.getTaskTrackers()获取集群任务跟踪器的数量。
7. Org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat<K,V>序列文件的输入格式
8. Org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat<K,V>序列文件的输出格式
9. Org.apache.hadoop.io.BytesWritablehadoop的byte类型
10. Org.apache.hadoop.io.BytesWritablehadoop的byte类型
11. InputSampler.Sampler<K,V>sample:
org.apache.hadoop.mapreduce.lib.partition.InputSampler 帮助进行数据分区的采样器。这里的分区指的是Map的结果按照某种规则进行分区,分发给不同的reduce.当数据量比较大,无法判断有效的分区规则是,可以通过采样器对数据进行采样分析后进行分区。
12. org.apache.hadoop.mapreduce.lib.partition.InputSampler-RandomSampler,随机采样。
13. job.setNumReduceTask() 可以设置reduce任务的个数。
14. job.setPartitionerClass(TotalOrderPartitioner.class)设置分区类。
Org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner,该类将mapper根据其key文件的定义划分到不同的reducer里面,这里的key指的是TotalOrderpartitioner使用了partitionfile,该文件里面key的数量=reduce数量-1,并且key之间是排序的,比如(2,4,6,8)这4个key。进行分区后形成5个分区(分区 2 分区4 分区 6 分区 8 分区) 分发的5个reduce,而reduce的输出是排序的,因此采用该分区类就实现了本例子的全输入数据的排序目的。
15. 返回合法路径,生成路径,用于保存TotalOrderPartitioner的key文件
16. TotalOrderPartitioner.setPartitionFile(conf,partitionFile)设置分区文件
17. InputSampler.<K,V>writePartitionFile(job,sampler)通过取样器取样,写入分区文件。
18. DistributedCache.adddCacheFile(partitionUri,conf)将文件添加的分布缓存中,hadoop会将该文件分布缓存到所有该任务工作的map节点中,目前推荐使用Job.addCacheFile(URI uri)方法替换。
总结
本例子主要引入了分区(加入取样器)和排序在hadoop处理流程中的概念,通过使用TotalOrderPartioner类实现了利用mapreduce框架的map-reduce处理流程实现了输入的全排序。
Map-Reduce流程图:
图1 MapReduce处理流程图
引用:
1. MapReduce处理流程图引用自博文:http://blog.oddfoo.net/2011/04/17/mapreduce-partition%E5%88%86%E6%9E%90-2/
相关文章推荐
- 怎样理解Linux的文件系统
- 平滑升级openssh版本方法
- 2-7-RHEL6.5搭建DNS服务器案例分析与总结(Red Hat Enterprise Linux Server6.5)@
- Linux中_ALIGN宏背后的原理——内存对齐
- Linux 静态IP地址设置
- 微软CTO和Docker CTO上周“在一起”,究竟谈了什么?
- 从西直门立交桥谈IT架构与重构(干货)
- 从西直门立交桥谈IT架构与重构(干货)
- centos6.6 64位下安装nfs文件共享系统
- linux相关笔记
- 我心中的核心组件(可插拔的AOP)~分布式文件上传组件~基于FastDFS
- HDU 2094 产生冠军(拓扑排序)
- BPM那些事儿——MBP技术架构
- ECSHOP模板制作教程,ECSHOP标签大全,ECSHOP模板标签
- linux下常用命令及功能
- 开发笔记-Linux-Apache-PHP-CI
- PopupWindow
- 使用opencv播放视频
- Linux Tomcat 6.0安装配置实践总结
- Centos 内存占满 释放内存