MapReduce的分区与 分组二次排序
2017-06-06 17:49
399 查看
原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。否则将追究法律责任。http://computerdragon.blog.51cto.com/6235984/1287721
问题描述:
输入文件格式如下:
name1 2
name3 4
name1 6
name1 1
name3 3
name1 0
要求输出的文件格式如下:
name1 0,1,2,6
name3 3,4
要求是按照第一列分组,name1与name3也是按照顺序排列的,组内升序排序。
思路:
常规的输出,无法排序key所对应的多个值的顺序。为了排序组内中的值,需要将key与value放在同一个组。Job中有两个方法setGroupingComparatorClass和setSortComparatorClass,可以利用这两个方法来实现组内排序。但是这些排序都是基于key的,则就要将key和value定义成组合键。
但是必须要保证第一列相同的全部都放在同一个分区中,则就需要自定义分区,分区的时候只考虑第一列的值。由于partitioner仅仅能保证每一个reducer接受同一个name的所有记录,但是reducer仍然是通过键进行分组的分区,也就说该分区中还是按照键来分成不同的组,还需要分组只参考name值
先按照name分组,再在name中内部进行排序。
解决方法:
运用自定义组合键的策略,将name和1定义为一个组合键。在分区的时候只参考name的值,即继承partitioner。
由于要按照name分组,则就需要定义分组策略,然后设置setGroupingComparatorClass。
setGroupingComparatorClass主要定义哪些key可以放置在一组,分组的时候会对组合键进行比较,由于这里只需要考虑组合键中的一个值,则定义实现一个WritableComparator,设置比较策略。
对于组内的排序,可以利用setSortComparatorClass来实现,
这个方法主要用于定义key如何进行排序在它们传递给reducer之前,
这里就可以来进行组内排序。
具体代码:
Hadoop版本号:hadoop1.1.2
自定义组合键
View Code
注意事项
1,设置分组排序按照升序还是降序是在自定义WritableComparable中的compareTo()方法实现的,具体升序或者降序的设置在代码中已经注释说明
2,设置组内值进行升序还是降序的排序是在组内排序策略中的compare()方法注释说明的。
3,这里同时最重要的一点是,将第二列即放在组合键中,又作为value,这样对于组合键排序也就相当于对于value进行排序了。
4,在自定义组合键的时候,对于组合键中的数据的基本类型可以采用Java的基本类型也可以采用Hadoop的基本数据类型,对于Hadoop的基本数据类型一定要记得初始化new一个基本数据类型对象。对于组合键类,必须要有默认的构造方法。
本文出自 “在云端的追梦” 博客,请务必保留此出处http://computerdragon.blog.51cto.com/6235984/1287721
问题描述:
输入文件格式如下:
name1 2
name3 4
name1 6
name1 1
name3 3
name1 0
要求输出的文件格式如下:
name1 0,1,2,6
name3 3,4
要求是按照第一列分组,name1与name3也是按照顺序排列的,组内升序排序。
思路:
常规的输出,无法排序key所对应的多个值的顺序。为了排序组内中的值,需要将key与value放在同一个组。Job中有两个方法setGroupingComparatorClass和setSortComparatorClass,可以利用这两个方法来实现组内排序。但是这些排序都是基于key的,则就要将key和value定义成组合键。
但是必须要保证第一列相同的全部都放在同一个分区中,则就需要自定义分区,分区的时候只考虑第一列的值。由于partitioner仅仅能保证每一个reducer接受同一个name的所有记录,但是reducer仍然是通过键进行分组的分区,也就说该分区中还是按照键来分成不同的组,还需要分组只参考name值
先按照name分组,再在name中内部进行排序。
解决方法:
运用自定义组合键的策略,将name和1定义为一个组合键。在分区的时候只参考name的值,即继承partitioner。
由于要按照name分组,则就需要定义分组策略,然后设置setGroupingComparatorClass。
setGroupingComparatorClass主要定义哪些key可以放置在一组,分组的时候会对组合键进行比较,由于这里只需要考虑组合键中的一个值,则定义实现一个WritableComparator,设置比较策略。
对于组内的排序,可以利用setSortComparatorClass来实现,
这个方法主要用于定义key如何进行排序在它们传递给reducer之前,
这里就可以来进行组内排序。
具体代码:
Hadoop版本号:hadoop1.1.2
自定义组合键
1 package whut; 2 import java.io.IOException; 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.conf.Configured; 5 import org.apache.hadoop.fs.Path; 6 import org.apache.hadoop.io.IntWritable; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.mapreduce.Job; 9 import org.apache.hadoop.mapreduce.Mapper; 10 import org.apache.hadoop.mapreduce.Reducer; 11 import org.apache.hadoop.mapreduce.Mapper.Context; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 16 import org.apache.hadoop.util.Tool; 17 import org.apache.hadoop.util.ToolRunner; 18 //需要对数据进行分组以及组内排序的时候 19 public class SortMain extends Configured implements Tool{ 20 //这里设置输入文格式为KeyValueTextInputFormat 21 //name1 5 22 //默认输入格式都是Text,Text 23 public static class GroupMapper extends 24 Mapper<Text, Text, TextInt, IntWritable> { 25 public IntWritable second=new IntWritable(); 26 public TextInt tx=new TextInt(); 27 @Override 28 protected void map(Text key, Text value, Context context) 29 throws IOException, InterruptedException { 30 String lineKey=key.toString(); 31 String lineValue=value.toString(); 32 int lineInt=Integer.parseInt(lineValue); 33 tx.setFirstKey(lineKey); 34 tx.setSecondKey(lineInt); 35 second.set(lineInt); 36 context.write(tx, second); 37 } 38 } 39 //设置reduce 40 public static class GroupReduce extends Reducer<TextInt, IntWritable, Text, Text> 41 { 42 @Override 43 protected void reduce(TextInt key, Iterable<IntWritable> values, 44 Context context) 45 throws IOException, InterruptedException { 46 StringBuffer sb=new StringBuffer(); 47 for(IntWritable val:values) 48 { 49 sb.append(val+","); 50 } 51 if(sb.length()>0) 52 { 53 sb.deleteCharAt(sb.length()-1); 54 } 55 context.write(new Text(key.getFirstKey()), new Text(sb.toString())); 56 } 57 } 58 59 @Override 60 public int run(String[] args) throws Exception { 61 // TODO Auto-generated method stub 62 Configuration conf=getConf(); 63 Job job=new Job(conf,"SecondarySort"); 64 job.setJarByClass(SortMain.class); 65 // 设置输入文件的路径,已经上传在HDFS 66 FileInputFormat.addInputPath(job, new Path(args[0])); 67 // 设置输出文件的路径,输出文件也存在HDFS中,但是输出目录不能已经存在 68 FileOutputFormat.setOutputPath(job, new Path(args[1])); 69 70 job.setMapperClass(GroupMapper.class); 71 job.setReducerClass(GroupReduce.class); 72 //设置分区方法 73 job.setPartitionerClass(KeyPartitioner.class); 74 75 //下面这两个都是针对map端的 76 //设置分组的策略,哪些key可以放置到一组中 77 job.setGroupingComparatorClass(TextComparator.class); 78 //设置key如何进行排序在传递给reducer之前. 79 //这里就可以设置对组内如何排序的方法 80 /*************关键点**********/ 81 job.setSortComparatorClass(TextIntComparator.class); 82 //设置输入文件格式 83 job.setInputFormatClass(KeyValueTextInputFormat.class); 84 //使用默认的输出格式即TextInputFormat 85 //设置map的输出key和value类型 86 job.setMapOutputKeyClass(TextInt.class); 87 job.setMapOutputValueClass(IntWritable.class); 88 //设置reduce的输出key和value类型 89 //job.setOutputFormatClass(TextOutputFormat.class); 90 job.setOutputKeyClass(Text.class); 91 job.setOutputValueClass(Text.class); 92 job.waitForCompletion(true); 93 int exitCode=job.isSuccessful()?0:1; 94 return exitCode; 95 } 96 97 public static void main(String[] args) throws Exception 98 { 99 int exitCode=ToolRunner.run(new SortMain(), args); 100 System.exit(exitCode); 101 } 102 }
View Code
注意事项
1,设置分组排序按照升序还是降序是在自定义WritableComparable中的compareTo()方法实现的,具体升序或者降序的设置在代码中已经注释说明
2,设置组内值进行升序还是降序的排序是在组内排序策略中的compare()方法注释说明的。
3,这里同时最重要的一点是,将第二列即放在组合键中,又作为value,这样对于组合键排序也就相当于对于value进行排序了。
4,在自定义组合键的时候,对于组合键中的数据的基本类型可以采用Java的基本类型也可以采用Hadoop的基本数据类型,对于Hadoop的基本数据类型一定要记得初始化new一个基本数据类型对象。对于组合键类,必须要有默认的构造方法。
本文出自 “在云端的追梦” 博客,请务必保留此出处http://computerdragon.blog.51cto.com/6235984/1287721
相关文章推荐
- Hadoop Mapreduce分区、分组、二次排序过程详解[转]
- MapReduce处理二次排序(分区-排序-分组)
- Hadoop Mapreduce分区、分组、二次排序过程详解[转]
- 十一、理解MapReduce的二次排序功能,包括自定义数据类型、分区、分组、排序
- Hadoop Mapreduce分区、分组、二次排序过程详解
- Hadoop Mapreduce分区、分组、二次排序过程详解
- Hadoop Mapreduce分区、分组、二次排序过程详解
- Hadoop Mapreduce分区、分组、连接以及辅助排序(也叫二次排序)过程详解
- MapReduce二次排序分区,分组优化
- mapreduce之分区,分组,排序,二次排序的综合应用
- MapReduce的自定义排序、分区和分组
- mapreduce,自定义排序,分区,分组实现按照年份升序排序,温度降序排序
- 「 Hadoop」mapreduce对温度数据进行自定义排序、分组、分区等
- mapreduce,自定义分区,分组,排序实现join
- Mapreduce中的 自定义类型、分组与二次排序
- HADOOP(2)__Mapreduce分区、排序、分组
- 结合手机上网流量业务来说明Hadoop中的二次排序机制,分区机制
- mapreduce的二次排序(字符型)
- Hadoop之MapReduce自定义二次排序流程实例详解
- 关系型MapReduce模式:选择、分组和组内排序