您的位置:首页 > 运维架构

Hadoop例子之Sort

2015-08-19 15:24 197 查看
Sort

对hadoop例子Sort进行代码分析学习。



注:本文仅为学习笔记,中间会包含从网络或其他出处获取的资料,文后会标注出处,若有遗漏,麻烦提醒以便修订,敬请原谅

作用

使用mapreduce框架来进行输入的排序

主类

/**
* This is the trivial map/reduce program that does absolutely nothing
* other than use the framework to fragment and sort the input values.
*
* To run: bin/hadoop jar build/hadoop-examples.jar sort
* [-r
<i>reduces</i>]
* [-inFormat
<i>input format class</i>]

* [-outFormat
<i>output format class</i>]

* [-outKey
<i>output key class</i>]

* [-outValue
<i>output value class</i>]

* [-totalOrder
<i>pcnt</i><i>num samples</i><i>max
splits</i>]
*
<i>in-dir</i><i>out-dir</i>
*/
publicclass Sort<K,V>
extends Configured
implements Tool {//1
publicstaticfinal String
REDUCES_PER_HOST =

"mapreduce.sort.reducesperhost";
private Job
job =
null;

staticint printUsage() {
System.out.println("sort [-r <reduces>] " +
"[-inFormat <input format class>] " +
"[-outFormat <output format class>] " +

"[-outKey <output key class>] " +
"[-outValue <output value class>] " +
"[-totalOrder <pcnt> <num samples> <max splits>] " +
"<input> <output>");
ToolRunner.printGenericCommandUsage(System.out);//2
return 2;
}

/**
* The main driver for sort program.
* Invoke this method to submit the map/reduce job.
* @throws IOException When there is communication problems with the

* job tracker.
*/
publicint run(String[]
args)
throws Exception {

Configuration
conf = getConf();
JobClient
client = new JobClient(conf);//3
ClusterStatus
cluster = client.getClusterStatus();//4
intnum_reduces = (int)
(cluster.getMaxReduceTasks() * 0.9);
String
sort_reduces = conf.get(REDUCES_PER_HOST);//5
if (sort_reduces !=
null) {
num_reduces =
cluster.getTaskTrackers() *

Integer.parseInt(sort_reduces); //6
}
Class<? extends
InputFormat> inputFormatClass =

SequenceFileInputFormat.class; //7
Class<? extends
OutputFormat> outputFormatClass =

SequenceFileOutputFormat.class;//8
Class<? extends
WritableComparable> outputKeyClass = BytesWritable.class;//9
Class<? extends Writable>
outputValueClass = BytesWritable.class;//10
List<String>
otherArgs = new ArrayList<String>();
InputSampler.Sampler<K,V>
sampler = null; //11
for(inti=0;
i < args.length; ++i)
{
try {
if ("-r".equals(args[i]))
{
num_reduces = Integer.parseInt(args[++i]);
} elseif ("-inFormat".equals(args[i]))
{
inputFormatClass =

Class.forName(args[++i]).asSubclass(InputFormat.class);
} elseif ("-outFormat".equals(args[i]))
{
outputFormatClass =

Class.forName(args[++i]).asSubclass(OutputFormat.class);
} elseif ("-outKey".equals(args[i]))
{
outputKeyClass =

Class.forName(args[++i]).asSubclass(WritableComparable.class);
} elseif ("-outValue".equals(args[i]))
{
outputValueClass =

Class.forName(args[++i]).asSubclass(Writable.class);
} elseif ("-totalOrder".equals(args[i]))
{
doublepcnt = Double.parseDouble(args[++i]);
intnumSamples = Integer.parseInt(args[++i]);
intmaxSplits = Integer.parseInt(args[++i]);
if (0 >=
maxSplits)
maxSplits = Integer.MAX_VALUE;
sampler =
new InputSampler.RandomSampler<K,V>(pcnt,
numSamples,
maxSplits);//12
} else {
otherArgs.add(args[i]);
}
} catch (NumberFormatException
except) {
System.out.println("ERROR: Integer expected instead of "
+ args[i]);
return
printUsage();
} catch (ArrayIndexOutOfBoundsException
except) {
System.out.println("ERROR: Required parameter missing from "
+
args[i-1]);
return
printUsage(); // exits
}
}
// Set user-supplied (possibly default) job
configs
job = Job.getInstance(conf);
job.setJobName("sorter");
job.setJarByClass(Sort.class);

job.setMapperClass(Mapper.class);

job.setReducerClass(Reducer.class);

job.setNumReduceTasks(num_reduces); //13

job.setInputFormatClass(inputFormatClass);
job.setOutputFormatClass(outputFormatClass);

job.setOutputKeyClass(outputKeyClass);
job.setOutputValueClass(outputValueClass);

// Make sure there are exactly 2 parameters left.
if (otherArgs.size() != 2) {
System.out.println("ERROR: Wrong number of parameters: "
+
otherArgs.size() +
" instead of 2.");
return
printUsage();
}
FileInputFormat.setInputPaths(job,
otherArgs.get(0));
FileOutputFormat.setOutputPath(job,
new Path(otherArgs.get(1)));

if (sampler !=
null) {
System.out.println("Sampling input to effect total-order sort...");
job.setPartitionerClass(TotalOrderPartitioner.class); //14
Path
inputDir = FileInputFormat.getInputPaths(job)[0];
inputDir =
inputDir.makeQualified(inputDir.getFileSystem(conf));//15
Path
partitionFile = new Path(inputDir,
"_sortPartitioning");
TotalOrderPartitioner.setPartitionFile(conf,
partitionFile);//16
InputSampler.<K,V>writePartitionFile(job,
sampler);//17
URI
partitionUri = new URI(partitionFile.toString() +

"#" + "_sortPartitioning");
DistributedCache.addCacheFile(partitionUri,
conf)
;//18
}

System.out.println("Running on " +
cluster.getTaskTrackers() +
" nodes to sort from " +

FileInputFormat.getInputPaths(job)[0] +
" into " +
FileOutputFormat.getOutputPath(job) +
" with " +
num_reduces +
" reduces.");
Date
startTime = new Date();
System.out.println("Job started: " +
startTime);
intret =
job.waitForCompletion(true) ? 0 : 1;
Date
end_time = new Date();
System.out.println("Job ended: " +
end_time);
System.out.println("The job took " +

(end_time.getTime() -
startTime.getTime()) /1000 +
" seconds.");
returnret;
}

publicstaticvoid main(String[]
args)
throws Exception {
intres =
ToolRunner.run(new Configuration(),
new
Sort(), args);
System.exit(res);
}

/**
* Get the last job that was run using this instance.
* @return the results of the last job that was run
*/
public Job getResult() {
returnjob;
}
}

1. extends Configured implementsTool : 该类继承org.apache.hadoop.conf.Configured类,该类保存Configuration对象作为属性,实现org.apache.hadoop.util.Tool接口,该接口定义一个run方法,可以通过ToolRunner帮助类来执行实现了Tool接口的任务。

2. ToolRunner.printGenericCommandUsage(System.out): ToolRunner类:Job任务的运行帮助类,该类可以在执行jar文件时解析hadoop命令行参数。这里打印了命令行参数的使用说明:

publicstaticvoid printGenericCommandUsage(PrintStream
out) {

out.println("Generic options supported are");
out.println("-conf <configuration file>
specify an application configuration file");
out.println("-D <property=value>
use value for given property");
out.println("-fs <local|namenode:port>
specify a namenode");
out.println("-jt <local|resourcemanager:port>
specify a ResourceManager");
out.println("-files <comma separated list of files>
" +
"specify comma separated files to be copied to the map reduce cluster");
out.println("-libjars <comma separated list of jars>
" +
"specify comma separated jar files to include in the classpath.");
out.println("-archives <comma separated list of archives>
" +
"specify comma separated archives to be unarchived" +
" on the compute machines.\n");
out.println("The general command line syntax is");
out.println("bin/hadoop command [genericOptions] [commandOptions]\n");
}

3. JobClient:org.apache.hadoop.mapred.JobClient,该类是用户和集群进行交互的主要接口类,提供过了包括:提交任务,跟踪处理进度,获取任务的报告或日记,获取集群的MapReduce状态信息等方法。这里通过Configuration对象来创建JobClient实例。

4. Org.apache.hadoop.mapred.ClusterStatus类: 该类表示当前集群的信息,这里通过cluster.getMaxReduceTasks() 获取集群支持的最大的reduce任务数量

5. 通过ToolRunner类运行后,将对命令行参数进行解析,并添加到configuration实例中,方便通过configuration获取定义的属性值。

6. 调用cluster.getTaskTrackers()获取集群任务跟踪器的数量。

7. Org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat<K,V>序列文件的输入格式

8. Org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat<K,V>序列文件的输出格式

9. Org.apache.hadoop.io.BytesWritablehadoop的byte类型

10. Org.apache.hadoop.io.BytesWritablehadoop的byte类型

11. InputSampler.Sampler<K,V>sample:

org.apache.hadoop.mapreduce.lib.partition.InputSampler 帮助进行数据分区的采样器。这里的分区指的是Map的结果按照某种规则进行分区,分发给不同的reduce.当数据量比较大,无法判断有效的分区规则是,可以通过采样器对数据进行采样分析后进行分区。

12. org.apache.hadoop.mapreduce.lib.partition.InputSampler-RandomSampler,随机采样。

13. job.setNumReduceTask() 可以设置reduce任务的个数。

14. job.setPartitionerClass(TotalOrderPartitioner.class)设置分区类。

Org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner,该类将mapper根据其key文件的定义划分到不同的reducer里面,这里的key指的是TotalOrderpartitioner使用了partitionfile,该文件里面key的数量=reduce数量-1,并且key之间是排序的,比如(2,4,6,8)这4个key。进行分区后形成5个分区(分区 2 分区4 分区 6 分区 8 分区) 分发的5个reduce,而reduce的输出是排序的,因此采用该分区类就实现了本例子的全输入数据的排序目的。

15. 返回合法路径,生成路径,用于保存TotalOrderPartitioner的key文件

16. TotalOrderPartitioner.setPartitionFile(conf,partitionFile)设置分区文件

17. InputSampler.<K,V>writePartitionFile(job,sampler)通过取样器取样,写入分区文件。

18. DistributedCache.adddCacheFile(partitionUri,conf)将文件添加的分布缓存中,hadoop会将该文件分布缓存到所有该任务工作的map节点中,目前推荐使用Job.addCacheFile(URI uri)方法替换。

总结

本例子主要引入了分区(加入取样器)和排序在hadoop处理流程中的概念,通过使用TotalOrderPartioner类实现了利用mapreduce框架的map-reduce处理流程实现了输入的全排序。

Map-Reduce流程图:

图1 MapReduce处理流程图

引用:

1. MapReduce处理流程图引用自博文:
http://blog.oddfoo.net/2011/04/17/mapreduce-partition%E5%88%86%E6%9E%90-2/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: