Hadoop实现Secondary Sort (二)
2013-10-18 18:11
260 查看
在hadoop中每个reduce的输入的key都是有序的,而value则是无序的。而且同一个job运行多次,由于map完成顺序不同,reduce收到的value顺序是不固定的。那如何才能实现reduce收到有序的value呢?这就需要Secondary Sort。
Secondary Sort要解决的问题:reduce收到的value有序。
这里举一个场景,来说明Secondary Sort是如何实现的。假设我们有若干公司若干部门的人数,数据样例如下:
公司名 部门的人数
Taobao 52
Taobao 31
Taobao 67
Alipay 10
Alipay 36
Alipay 29
B2B 120
B2B 72
Aliyun 13
Aliyun 32
Aliyun 3
我们想知道每个公司的最大部门(人数最多)的人数。即希望先按公司名group,然后对group内的人数降序排列,最后取每个group的第一个即可。
由于reduce收到的value是无序的,所以要对value进行排序,首先需要将value封装到key里面。即需要自定义key的类型,代码如下:
[java] view
plaincopy
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
public class MyKey implements WritableComparable<MyKey> {
public final Text first;
public final IntWritable second;
public MyKey() {
first = new Text();
second = new IntWritable();
}
public MyKey(Text first, IntWritable second) {
this.first = first;
this.second = second;
}
@Override
public void write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
@Override
public String toString() {
return first + "\t" + second;
}
@Override
public int compareTo(MyKey tp) {
int cmp = first.compareTo(tp.first);
if (cmp != 0) {
return cmp;
}
return -second.compareTo(tp.second);
}
}
这里新定义的类型为MyKey封装了一个Text和一个IntWritable,依次存放公司名和部门人数。Hadoop要求key的类型必须实现Writable和Comparable,前者为了支持序列化和反序列化,后者为了实现基于比较的排序。需要注意的是compareTo()方法中先按first即公司名升序排列,后按second即部门人数降序排列。另外toString()方法的实现是为了定义输出格式,即公司名+tab+最大部门人数。
定义key后还不能满足需求。因为默认的HashPartitioner会将相同的key分配给同一个reduce,而我们希望的是first相同的key分给同一个reduce处理,默认的Partitioner显然保证不了这一点。这就需要我们自定义Partitioner,实现first相同的key分配给同一个reduce。实现代码如下:
[java] view
plaincopy
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
public class MyPartitioner
implements Partitioner<MyKey, NullWritable> {
@Override
public void configure(JobConf job) {}
@Override
public int getPartition(MyKey key, NullWritable value, int numPartitions) {
return (key.first.hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
MyPartitioner的getPartition()方法中,只考虑first,不考虑second,这样就满足了我们的需求。
实现到这一步后,reduce会获取到按first升序且按second降序排列的key序列。而我们希望first相同的key中,只获取第一个的second即可,其他数据可以忽略。这就需要数据执行reduce前按照key的first字段进行归并,即grouping。first相同的key归为一个group,将第一个key和所有的value(value为NullWritable类型,无需处理)传给reduce()方法。然后reduce将key输出即可实现目的。为了实现这样的grouping操纵,需要自定义归并比较器(ValueGroupingComparator),代码如下:
[java] view
plaincopy
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class MyGroupComparator extends WritableComparator {
protected MyGroupComparator() {
super(MyKey.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
MyKey m1 = (MyKey) w1;
MyKey m2 = (MyKey) w2;
return m1.first.compareTo(m2.first);
}
}
从MyGroupComparator代码中可以看出,compare中只比较firest而忽略second。
以上模块自定义好后,map和reduce实现会相当容易。map只需要将公司名和部门人数构造成一个MyKey对象即可。而reduce中将收到的key输出就ok了。实现SecondarySort的作业代码如下:
[java] view
plaincopy
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MySecondarySort extends Configured implements Tool{
public static class MyMap extends MapReduceBase
implements Mapper<Text, Text, MyKey, NullWritable> {
private IntWritable num = new IntWritable();
@Override
public void map(Text key, Text value,
OutputCollector<MyKey, NullWritable> output,
Reporter reporter) throws IOException {
num.set(Integer.parseInt(value.toString()));
MyKey myKey = new MyKey(key, num);
output.collect(myKey, NullWritable.get());
}
}
public static class MyReduce extends MapReduceBase
implements Reducer<MyKey, NullWritable, MyKey, NullWritable> {
@Override
public void reduce(MyKey key, Iterator<NullWritable> values,
OutputCollector<MyKey, NullWritable> output,
Reporter reporter) throws IOException {
output.collect(key, NullWritable.get());
}
}
@Override
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(), MySecondarySort.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(MyKey.class);
conf.setOutputValueClass(NullWritable.class);
conf.setMapperClass(MyMap.class);
conf.setReducerClass(MyReduce.class);
conf.setPartitionerClass(MyPartitioner.class);
conf.setOutputValueGroupingComparator(MyGroupComparator.class);
conf.setInputFormat(KeyValueTextInputFormat.class);
conf.set("key.value.separator.in.input.line", " ");
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MySecondarySort(), args);
System.exit(res);
}
}
注意由于输入格式是key+空格+value,这里采用KeyValueTextInputFormat,避免了map中做分割字符串操作。
对于输入如下内容的文件:
$ bin/hadoopfs -cat /liangly/list
Taobao 52
Taobao 31
Taobao 67
Alipay 10
Alipay 36
Alipay 29
B2B 120
B2B 72
Aliyun 13
Aliyun 32
Aliyun 3
执行上面实现的Job:
$ bin/hadoopjar
job.jar MySecondarySort \
> -Dmapred.map.tasks=3 \
> -Dmapred.reduce.tasks=2 \
> /liangly/list \
> /liangly/out
作业结束后输出如下:
$ bin/hadoopfs -cat /liangly/out/*
Alipay 36
Aliyun 32
B2B 120
Taobao 67
由于数据量很小,很容易确定已经达到了预期目的。
Secondary Sort要解决的问题:reduce收到的value有序。
这里举一个场景,来说明Secondary Sort是如何实现的。假设我们有若干公司若干部门的人数,数据样例如下:
公司名 部门的人数
Taobao 52
Taobao 31
Taobao 67
Alipay 10
Alipay 36
Alipay 29
B2B 120
B2B 72
Aliyun 13
Aliyun 32
Aliyun 3
我们想知道每个公司的最大部门(人数最多)的人数。即希望先按公司名group,然后对group内的人数降序排列,最后取每个group的第一个即可。
由于reduce收到的value是无序的,所以要对value进行排序,首先需要将value封装到key里面。即需要自定义key的类型,代码如下:
[java] view
plaincopy
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
public class MyKey implements WritableComparable<MyKey> {
public final Text first;
public final IntWritable second;
public MyKey() {
first = new Text();
second = new IntWritable();
}
public MyKey(Text first, IntWritable second) {
this.first = first;
this.second = second;
}
@Override
public void write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
@Override
public String toString() {
return first + "\t" + second;
}
@Override
public int compareTo(MyKey tp) {
int cmp = first.compareTo(tp.first);
if (cmp != 0) {
return cmp;
}
return -second.compareTo(tp.second);
}
}
这里新定义的类型为MyKey封装了一个Text和一个IntWritable,依次存放公司名和部门人数。Hadoop要求key的类型必须实现Writable和Comparable,前者为了支持序列化和反序列化,后者为了实现基于比较的排序。需要注意的是compareTo()方法中先按first即公司名升序排列,后按second即部门人数降序排列。另外toString()方法的实现是为了定义输出格式,即公司名+tab+最大部门人数。
定义key后还不能满足需求。因为默认的HashPartitioner会将相同的key分配给同一个reduce,而我们希望的是first相同的key分给同一个reduce处理,默认的Partitioner显然保证不了这一点。这就需要我们自定义Partitioner,实现first相同的key分配给同一个reduce。实现代码如下:
[java] view
plaincopy
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
public class MyPartitioner
implements Partitioner<MyKey, NullWritable> {
@Override
public void configure(JobConf job) {}
@Override
public int getPartition(MyKey key, NullWritable value, int numPartitions) {
return (key.first.hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
MyPartitioner的getPartition()方法中,只考虑first,不考虑second,这样就满足了我们的需求。
实现到这一步后,reduce会获取到按first升序且按second降序排列的key序列。而我们希望first相同的key中,只获取第一个的second即可,其他数据可以忽略。这就需要数据执行reduce前按照key的first字段进行归并,即grouping。first相同的key归为一个group,将第一个key和所有的value(value为NullWritable类型,无需处理)传给reduce()方法。然后reduce将key输出即可实现目的。为了实现这样的grouping操纵,需要自定义归并比较器(ValueGroupingComparator),代码如下:
[java] view
plaincopy
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class MyGroupComparator extends WritableComparator {
protected MyGroupComparator() {
super(MyKey.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
MyKey m1 = (MyKey) w1;
MyKey m2 = (MyKey) w2;
return m1.first.compareTo(m2.first);
}
}
从MyGroupComparator代码中可以看出,compare中只比较firest而忽略second。
以上模块自定义好后,map和reduce实现会相当容易。map只需要将公司名和部门人数构造成一个MyKey对象即可。而reduce中将收到的key输出就ok了。实现SecondarySort的作业代码如下:
[java] view
plaincopy
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MySecondarySort extends Configured implements Tool{
public static class MyMap extends MapReduceBase
implements Mapper<Text, Text, MyKey, NullWritable> {
private IntWritable num = new IntWritable();
@Override
public void map(Text key, Text value,
OutputCollector<MyKey, NullWritable> output,
Reporter reporter) throws IOException {
num.set(Integer.parseInt(value.toString()));
MyKey myKey = new MyKey(key, num);
output.collect(myKey, NullWritable.get());
}
}
public static class MyReduce extends MapReduceBase
implements Reducer<MyKey, NullWritable, MyKey, NullWritable> {
@Override
public void reduce(MyKey key, Iterator<NullWritable> values,
OutputCollector<MyKey, NullWritable> output,
Reporter reporter) throws IOException {
output.collect(key, NullWritable.get());
}
}
@Override
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(), MySecondarySort.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(MyKey.class);
conf.setOutputValueClass(NullWritable.class);
conf.setMapperClass(MyMap.class);
conf.setReducerClass(MyReduce.class);
conf.setPartitionerClass(MyPartitioner.class);
conf.setOutputValueGroupingComparator(MyGroupComparator.class);
conf.setInputFormat(KeyValueTextInputFormat.class);
conf.set("key.value.separator.in.input.line", " ");
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MySecondarySort(), args);
System.exit(res);
}
}
注意由于输入格式是key+空格+value,这里采用KeyValueTextInputFormat,避免了map中做分割字符串操作。
对于输入如下内容的文件:
$ bin/hadoopfs -cat /liangly/list
Taobao 52
Taobao 31
Taobao 67
Alipay 10
Alipay 36
Alipay 29
B2B 120
B2B 72
Aliyun 13
Aliyun 32
Aliyun 3
执行上面实现的Job:
$ bin/hadoopjar
job.jar MySecondarySort \
> -Dmapred.map.tasks=3 \
> -Dmapred.reduce.tasks=2 \
> /liangly/list \
> /liangly/out
作业结束后输出如下:
$ bin/hadoopfs -cat /liangly/out/*
Alipay 36
Aliyun 32
B2B 120
Taobao 67
由于数据量很小,很容易确定已经达到了预期目的。
相关文章推荐
- Hadoop实现Secondary Sort(一)
- Hadoop实现Secondary Sort (三)
- Hadoop Map Reduce Secondary Sort
- 使用secondary sort实现数据关联 完整示例代码
- 用MPI实现Hadoop Map/Reduce的TeraSort
- hadoop自带例子SecondarySort源码分析MapReduce原理
- hadoop编程(8)-MapReduce案例:次排序(Secondary Sort)详解
- 【Data Algorithms_Recipes for Scaling up with Hadoop and Spark】Chapter1 Secondary Sort
- Hadoop 之 Secondary Sort介绍<转>
- Hadoop 之 Secondary Sort介绍
- JDK不同版本的Collections.Sort方法实现
- 实现用Collections.sort(arg1,arg2)排序
- 利用Hadoop实现超大矩阵相乘之我见(二)
- hadoop ha 高可用实现原理
- CHD4B1(hadoop-0.23)实现NameNode HA安装配置
- sort函数的用法(C++排序库函数的调用) 对数组进行排序,在c++中有库函数帮我们实现,这们就不需要我们自己来编程进行排序了。 (一)为什么要用c++标准库里的排序函数 Sort()函数是c+
- 希尔排序(Shell's Sort)的C语言实现
- Lucene-Hadoop, GFS中Map/Reduce的简单实现
- java 利用泛性和反射机制实现collections.sort排序模板
- 快速排序quick_sort(python的两种实现方式)