您的位置:首页 > 其它

MapReduce的分区与 分组二次排序

2017-06-06 17:49 399 查看
原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。否则将追究法律责任。http://computerdragon.blog.51cto.com/6235984/1287721

问题描述:

输入文件格式如下:

name1 2

name3 4

name1 6

name1 1

name3 3

name1 0

要求输出的文件格式如下:

name1 0,1,2,6

name3 3,4

要求是按照第一列分组,name1与name3也是按照顺序排列的,组内升序排序。

思路:

常规的输出,无法排序key所对应的多个值的顺序。为了排序组内中的值,需要将key与value放在同一个组。Job中有两个方法setGroupingComparatorClass和setSortComparatorClass,可以利用这两个方法来实现组内排序。但是这些排序都是基于key的,则就要将key和value定义成组合键。

但是必须要保证第一列相同的全部都放在同一个分区中,则就需要自定义分区,分区的时候只考虑第一列的值。由于partitioner仅仅能保证每一个reducer接受同一个name的所有记录,但是reducer仍然是通过键进行分组的分区,也就说该分区中还是按照键来分成不同的组,还需要分组只参考name值

先按照name分组,再在name中内部进行排序。

解决方法:

运用自定义组合键的策略,将name和1定义为一个组合键。在分区的时候只参考name的值,即继承partitioner。

由于要按照name分组,则就需要定义分组策略,然后设置setGroupingComparatorClass。

setGroupingComparatorClass主要定义哪些key可以放置在一组,分组的时候会对组合键进行比较,由于这里只需要考虑组合键中的一个值,则定义实现一个WritableComparator,设置比较策略。

对于组内的排序,可以利用setSortComparatorClass来实现,

这个方法主要用于定义key如何进行排序在它们传递给reducer之前,

这里就可以来进行组内排序。

具体代码:

Hadoop版本号:hadoop1.1.2

自定义组合键

1 package whut;
2 import java.io.IOException;
3 import org.apache.hadoop.conf.Configuration;
4 import org.apache.hadoop.conf.Configured;
5 import org.apache.hadoop.fs.Path;
6 import org.apache.hadoop.io.IntWritable;
7 import org.apache.hadoop.io.Text;
8 import org.apache.hadoop.mapreduce.Job;
9 import org.apache.hadoop.mapreduce.Mapper;
10 import org.apache.hadoop.mapreduce.Reducer;
11 import org.apache.hadoop.mapreduce.Mapper.Context;
12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
13 import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
16 import org.apache.hadoop.util.Tool;
17 import org.apache.hadoop.util.ToolRunner;
18 //需要对数据进行分组以及组内排序的时候
19 public class SortMain extends Configured implements Tool{
20     //这里设置输入文格式为KeyValueTextInputFormat
21     //name1 5
22     //默认输入格式都是Text,Text
23     public static class GroupMapper extends
24        Mapper<Text, Text, TextInt, IntWritable>  {
25         public IntWritable second=new IntWritable();
26         public TextInt tx=new TextInt();
27         @Override
28         protected void map(Text key, Text value, Context context)
29                 throws IOException, InterruptedException {
30             String lineKey=key.toString();
31             String lineValue=value.toString();
32             int lineInt=Integer.parseInt(lineValue);
33             tx.setFirstKey(lineKey);
34             tx.setSecondKey(lineInt);
35             second.set(lineInt);
36             context.write(tx, second);
37         }
38     }
39     //设置reduce
40     public static class GroupReduce extends Reducer<TextInt, IntWritable, Text, Text>
41     {
42         @Override
43         protected void reduce(TextInt key, Iterable<IntWritable> values,
44                Context context)
45                 throws IOException, InterruptedException {
46             StringBuffer sb=new StringBuffer();
47             for(IntWritable val:values)
48             {
49                 sb.append(val+",");
50             }
51             if(sb.length()>0)
52             {
53                 sb.deleteCharAt(sb.length()-1);
54             }
55             context.write(new Text(key.getFirstKey()), new Text(sb.toString()));
56         }
57     }
58
59     @Override
60     public int run(String[] args) throws Exception {
61         // TODO Auto-generated method stub
62         Configuration conf=getConf();
63         Job job=new Job(conf,"SecondarySort");
64         job.setJarByClass(SortMain.class);
65         // 设置输入文件的路径,已经上传在HDFS
66         FileInputFormat.addInputPath(job, new Path(args[0]));
67         // 设置输出文件的路径,输出文件也存在HDFS中,但是输出目录不能已经存在
68         FileOutputFormat.setOutputPath(job, new Path(args[1]));
69
70         job.setMapperClass(GroupMapper.class);
71         job.setReducerClass(GroupReduce.class);
72         //设置分区方法
73         job.setPartitionerClass(KeyPartitioner.class);
74
75         //下面这两个都是针对map端的
76         //设置分组的策略,哪些key可以放置到一组中
77         job.setGroupingComparatorClass(TextComparator.class);
78         //设置key如何进行排序在传递给reducer之前.
79         //这里就可以设置对组内如何排序的方法
80         /*************关键点**********/
81         job.setSortComparatorClass(TextIntComparator.class);
82         //设置输入文件格式
83         job.setInputFormatClass(KeyValueTextInputFormat.class);
84         //使用默认的输出格式即TextInputFormat
85         //设置map的输出key和value类型
86         job.setMapOutputKeyClass(TextInt.class);
87         job.setMapOutputValueClass(IntWritable.class);
88         //设置reduce的输出key和value类型
89         //job.setOutputFormatClass(TextOutputFormat.class);
90         job.setOutputKeyClass(Text.class);
91         job.setOutputValueClass(Text.class);
92         job.waitForCompletion(true);
93         int exitCode=job.isSuccessful()?0:1;
94         return exitCode;
95     }
96
97     public static void main(String[] args)  throws Exception
98     {
99        int exitCode=ToolRunner.run(new SortMain(), args);
100        System.exit(exitCode);
101     }
102 }


View Code
注意事项

1,设置分组排序按照升序还是降序是在自定义WritableComparable中的compareTo()方法实现的,具体升序或者降序的设置在代码中已经注释说明

2,设置组内值进行升序还是降序的排序是在组内排序策略中的compare()方法注释说明的。

3,这里同时最重要的一点是,将第二列即放在组合键中,又作为value,这样对于组合键排序也就相当于对于value进行排序了。

4,在自定义组合键的时候,对于组合键中的数据的基本类型可以采用Java的基本类型也可以采用Hadoop的基本数据类型,对于Hadoop的基本数据类型一定要记得初始化new一个基本数据类型对象。对于组合键类,必须要有默认的构造方法。

本文出自 “在云端的追梦” 博客,请务必保留此出处http://computerdragon.blog.51cto.com/6235984/1287721
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: