Hadoop中Partition解析
2017-05-17 23:28
316 查看
原文:
http://www.yeolar.com/note/2013/11/19/hadoop-partition/
Map的结果,会通过partition分发到 Reducer 上, Reducer 做完Reduce操作后,通过 OutputFormat ,进行输出,下面我们就来分析参与这个过程的类。
Mapper 的结果,可能送到 Combiner 做合并, Combiner 在系统中并没有自己的基类,而是用 Reducer 作为 Combiner的基类,他们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已。 Mapper 最终处理的键值对<key,
value>,是需要送到 Reducer 去合并的,合并的时候,有相同key的键/值对会送到同一个 Reducer 那。哪个key到哪个 Reducer 的分配过程,是由 Partitioner 规定的。它只有一个方法,
输入是Map的结果对<key, value>和 Reducer 的数目,输出则是分配的 Reducer (整数编号)。就是指定 Mapper 输出的键值对到哪一个 Reducer 上去。系统缺省的 Partitioner 是 HashPartitioner ,它以key的Hash值对 Reducer 的数目取模,得到对应的 Reducer 。这样保证如果有相同的key值,肯定被分配到同一个 Reducer 上。如果有N个 Reducer,编号就为0,1,2,3……(N-1)。
Reducer 是所有用户定制 Reducer 类的基类,和 Mapper 类似,它也有 setup , reduce , cleanup 和 run 方法,其中 setup 和 cleanup 含义和 Mapper 相同, reduce 是真正合并 Mapper 结果的地方,它的输入是key和这个key对应的所有value的一个迭代器,同时还包括 Reducer 的上下文。系统中定义了两个非常简单的 Reducer , IntSumReducer 和 LongSumReducer ,分别用于对整形/长整型的value求和。
Reduce的结果,通过 Reducer.Context 的方法 collect 输出到文件中,和输入类似,Hadoop引入了 OutputFormat。 OutputFormat 依赖两个辅助接口: RecordWriter 和 OutputCommitter 来处理输出。 RecordWriter 提供了 write方法,用于输出<key,
value>,和 close 方法,用于关闭对应的输出。 OutputCommitter 提供了一系列方法,用户通过实现这些方法,可以定制 OutputFormat 生存期某些阶段需要的特殊操作。我们在 TaskInputOutputContext 中讨论过这些方法(明显, TaskInputOutputContext 是 OutputFormat 和 Reducer 间的桥梁)。 OutputFormat 和 RecordWriter 分别对应着 InputFormat 和 RecordReader ,系统提供了空输出 NullOutputFormat (什么结果都不输出, NullOutputFormat.RecordWriter 只是示例,系统中没有定义), LazyOutputFormat (没在类图中出现,不分析), FilterOutputFormat (不分析)和基于文件 FileOutputFormat 的 SequenceFileOutputFormat 和 TextOutputFormat 输出。
基于文件的输出 FileOutputFormat 利用了一些配置项配合工作,包括:
mapred.output.compress :是否压缩;
mapred.output.compression.codec :压缩方法;
mapred.output.dir :输出路径;
mapred.work.output.dir :输出工作路径。
FileOutputFormat 还依赖于 FileOutputCommitter ,通过 FileOutputCommitter 提供一些和 Job , Task 相关的临时文件管理功能。如 FileOutputCommitter 的 setupJob ,会在输出路径下创建一个名为_temporary的临时目录, cleanupJob 则会删除这个目录。
SequenceFileOutputFormat 输出和 TextOutputFormat 输出分别对应输入的 SequenceFileInputFormat 和 TextInputFormat 。
http://www.cnblogs.com/xwdreamer/archive/2011/10/27/2296943.html
http://www.yeolar.com/note/2013/11/19/hadoop-partition/
http://www.yeolar.com/note/2013/11/19/hadoop-partition/
Map的结果,会通过partition分发到 Reducer 上, Reducer 做完Reduce操作后,通过 OutputFormat ,进行输出,下面我们就来分析参与这个过程的类。
Mapper 的结果,可能送到 Combiner 做合并, Combiner 在系统中并没有自己的基类,而是用 Reducer 作为 Combiner的基类,他们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已。 Mapper 最终处理的键值对<key,
value>,是需要送到 Reducer 去合并的,合并的时候,有相同key的键/值对会送到同一个 Reducer 那。哪个key到哪个 Reducer 的分配过程,是由 Partitioner 规定的。它只有一个方法,
getPartition(Text key, Text value, int numPartitions)
输入是Map的结果对<key, value>和 Reducer 的数目,输出则是分配的 Reducer (整数编号)。就是指定 Mapper 输出的键值对到哪一个 Reducer 上去。系统缺省的 Partitioner 是 HashPartitioner ,它以key的Hash值对 Reducer 的数目取模,得到对应的 Reducer 。这样保证如果有相同的key值,肯定被分配到同一个 Reducer 上。如果有N个 Reducer,编号就为0,1,2,3……(N-1)。
Reducer 是所有用户定制 Reducer 类的基类,和 Mapper 类似,它也有 setup , reduce , cleanup 和 run 方法,其中 setup 和 cleanup 含义和 Mapper 相同, reduce 是真正合并 Mapper 结果的地方,它的输入是key和这个key对应的所有value的一个迭代器,同时还包括 Reducer 的上下文。系统中定义了两个非常简单的 Reducer , IntSumReducer 和 LongSumReducer ,分别用于对整形/长整型的value求和。
Reduce的结果,通过 Reducer.Context 的方法 collect 输出到文件中,和输入类似,Hadoop引入了 OutputFormat。 OutputFormat 依赖两个辅助接口: RecordWriter 和 OutputCommitter 来处理输出。 RecordWriter 提供了 write方法,用于输出<key,
value>,和 close 方法,用于关闭对应的输出。 OutputCommitter 提供了一系列方法,用户通过实现这些方法,可以定制 OutputFormat 生存期某些阶段需要的特殊操作。我们在 TaskInputOutputContext 中讨论过这些方法(明显, TaskInputOutputContext 是 OutputFormat 和 Reducer 间的桥梁)。 OutputFormat 和 RecordWriter 分别对应着 InputFormat 和 RecordReader ,系统提供了空输出 NullOutputFormat (什么结果都不输出, NullOutputFormat.RecordWriter 只是示例,系统中没有定义), LazyOutputFormat (没在类图中出现,不分析), FilterOutputFormat (不分析)和基于文件 FileOutputFormat 的 SequenceFileOutputFormat 和 TextOutputFormat 输出。
基于文件的输出 FileOutputFormat 利用了一些配置项配合工作,包括:
mapred.output.compress :是否压缩;
mapred.output.compression.codec :压缩方法;
mapred.output.dir :输出路径;
mapred.work.output.dir :输出工作路径。
FileOutputFormat 还依赖于 FileOutputCommitter ,通过 FileOutputCommitter 提供一些和 Job , Task 相关的临时文件管理功能。如 FileOutputCommitter 的 setupJob ,会在输出路径下创建一个名为_temporary的临时目录, cleanupJob 则会删除这个目录。
SequenceFileOutputFormat 输出和 TextOutputFormat 输出分别对应输入的 SequenceFileInputFormat 和 TextInputFormat 。
package org.apache.hadoop.examples; import java.io.IOException; import java.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; /** * 输入文本,以tab间隔 * kaka 1 28 * hua 0 26 * chao 1 * tao 1 22 * mao 0 29 22 * */ //Partitioner函数的使用 public class MyPartitioner { // Map函数 public static class MyMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> { public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { String[] arr_value = value.toString().split("\t"); //测试输出 // for(int i=0;i<arr_value.length;i++) // { // System.out.print(arr_value[i]+"\t"); // } // System.out.print(arr_value.length); // System.out.println(); Text word1 = new Text(); Text word2 = new Text(); if (arr_value.length > 3) { word1.set("long"); word2.set(value); } else if (arr_value.length < 3) { word1.set("short"); word2.set(value); } else { word1.set("right"); word2.set(value); } output.collect(word1, word2); } } public static class MyReduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { int sum = 0; System.out.println(key); while (values.hasNext()) { output.collect(key, new Text(values.next().getBytes())); } } } // 接口Partitioner继承JobConfigurable,所以这里有两个override方法 public static class MyPartitionerPar implements Partitioner<Text, Text> { /** * getPartition()方法的 * 输入参数:键/值对<key,value>与reducer数量numPartitions * 输出参数:分配的Reducer编号,这里是result * */ @Override public int getPartition(Text key, Text value, int numPartitions) { // TODO Auto-generated method stub int result = 0; System.out.println("numPartitions--" + numPartitions); if (key.toString().equals("long")) { result = 0 % numPartitions; } else if (key.toString().equals("short")) { result = 1 % numPartitions; } else if (key.toString().equals("right")) { result = 2 % numPartitions; } System.out.println("result--" + result); return result; } @Override public void configure(JobConf arg0) { // TODO Auto-generated method stub } } //输入参数:/home/hadoop/input/PartitionerExample /home/hadoop/output/Partitioner public static void main(String[] args) throws Exception { JobConf conf = new JobConf(MyPartitioner.class); conf.setJobName("MyPartitioner"); //控制reducer数量,因为要分3个区,所以这里设定了3个reducer conf.setNumReduceTasks(3); conf.setMapOutputKeyClass(Text.class); conf.setMapOutputValueClass(Text.class); //设定分区类 conf.setPartitionerClass(MyPartitionerPar.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(Text.class); //设定mapper和reducer类 conf.setMapperClass(MyMap.class); conf.setReducerClass(MyReduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
http://www.cnblogs.com/xwdreamer/archive/2011/10/27/2296943.html
http://www.yeolar.com/note/2013/11/19/hadoop-partition/
相关文章推荐
- Hadoop中Partition解析 和 storm的grouping
- Hadoop中的Partition解析
- Hadoop之Partition深度解析
- Hadoop中Partition解析
- Hadoop:解析Partition
- Hadoop中Partition解析
- Hadoop中Partition解析
- Hadoop中Partition深度解析
- Hadoop中Partition解析
- Hadoop:解析Partition
- Hadoop中Partition解析
- Hadoop学习总结:Map-Reduce的过程解析
- Hadoop内核解析
- 【备忘】Hadoop,Hbase,Hive源码解析与开发实战
- [Hadoop in China 2011] Hadoop之上 中国移动“大云”系统解析
- Hadoop源码解析 1 --- Hadoop工程包架构解析
- Hadoop启动脚本全解析,不能再全了![bed]
- Hadoop RPC源码解析——Server类(一)
- hadoop之 解析HDFS的写文件流程
- Hadoop中的shuffle、partition和combiner