您的位置:首页 > 其它

MapReduce之二次排序

2017-11-18 10:18 267 查看
一 RawComparator介绍

Hadoop支持对序列化的二进制流直接进行比较。相比于对序列化二进制流进行反序列化再进行序列化,这种方式效率更高。

 

RawComparator接口就是用来进行序列化字节之间的比较的。该接口继承了Comparator接口,提供了一个compare方法:publicint compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);

b1:第一个对象所在字节数组

s1:该对象在b1中的起始位置

l1:该对象在b1中的长度

b2:第二个对象所在字节数组

s2:该对象在b2的起始位置

l2:该对象在b2中的长度

 

二 二次排序

有时候,一个文件有多个字段,但是我们希望首先根据第一个字段排序,然后key相同的情况下,在进行第二个字段的排序。

 

要点:

2.1自定义实现一个WritableComprable<T>的类,这个需要定义需要比较的字段。

因为我们只能对key比较,所以就应该在key上想办法。试想又想同时对2个字段排序,又只能对key排序,所以,我们就需要自定义一个复合类型由需要比较的两个字段组成,这个复合类型可以对多个字段进行比较。

publicstatic
class
PairWritableimplementsWritableComparable<PairWritable>
{
     
private int
first;
     
private int
second;
 
     
public
PairWritable() {
 
      }
 
     
public
PairWritable(intfirst,
intsecond) {
          this.first =first;
          this.second =second;
      }
 
     
public void
write(DataOutputout)
throws IOException {
          out.writeInt(first);
          out.writeInt(second);
      }
 
     
public void
readFields(DataInputin)
throws IOException {
          this.first =in.readInt();
          this.second =in.readInt();
      }
 
     
public void
set(intleft,
intright) {
          first =
left;
          second =
right;
      }
 
     
@Override
     
public int
hashCode() {
          final
intprime
=
31;
          int
result
=1;
          result =
prime *result +
first;
          result =
prime *result +
second;
          return
result;
      }
 
     
@Override
     
public boolean
equals(Object
obj) {
          if (this ==obj)
                return
true
;
          if (obj ==null)
                return
false
;
          if (getClass() !=obj.getClass())
                return
false
;
          PairWritable
other = (PairWritable)obj;
          if (first !=other.first)
                return
false
;
          if (second !=other.second)
                return
false
;
          return
true
;
      }
 
     
public int
getFirst() {
          return
first;
      }
 
     
public int
getSecond() {
          return
second;
      }
     

       /** A Comparator that compares serializedIntPair. */
     
public staticclass
Comparator
extends WritableComparator{
          public
Comparator() {
                super(PairWritable.class);
           }
 
          @Override
          public
int
compare(byte[]b1,
ints1,
intl1,
byte[]b2,
ints2,
intl2) {
                return
compareBytes(b1,s1,
l1, b2,
s2, l2);
           }
      }
 
     
static
{// register this comparator
           WritableComparator.define(PairWritable.class,new
Comparator());
      }
 
     
public int
compareTo(PairWritableo) {
          if (first !=o.first)
{
                return
first < o.first ? -1 :1;
           }else
if
(second !=o.second) {
                return
second < o.second ? -1 :1;
           }else {
                return
0;
           }
      }
}
2.2编写map函数 和 reduce函数

其中我们自定义的组合key类型作为OutputKey,而OutputValue还是和以前保持一致

2.3接着map就要开始shuffle操作, 首先要进行分区

以前的分区算法是基于key(也就是第一个字段),现在我们的key是复合类型,那势必可能和以前的分区不一样了,如果想保持以前的分区,我们需要自己实现。

publicstatic
class
SortPartitionerextends
Partitioner<PairWritable, IntWritable> {
 
     
@Override
     
public int
getPartition(PairWritable
key, IntWritable value,
intnumPartitions) {
          return (key.hashCode() & Integer.MAX_VALUE)
%numPartitions;
      }
}

2.4 分区之后,我们知道会进行排序操作,默认的排序规则就是先根据partition排序,然后如果partition相同,则根据key排序。

 

Key要排序:

# 我们指定了比较器

如果我们对于key没有指定具体比较器,那么,就会根据我们指定的比较器进行排序

# 我们没有指定比较器

就会根据MapOutputKey也就是我们自定义的复合key的compareTo方法进行排序,当然我们之前的代码里已经实现了。

 

2.5 然后就是Reduce阶段的排序

它的原理跟map阶段默认排序是一样的。发生在merge的时候。也是没有指定比较器,就需要根据key的compareTo方法进行排序。

 

2.6 开始分组

在reduce之前,会对结果进行一个归并或者分组操作。很显然,如果我们以自定义的类型去参与分组,结果是有问题的。所以我们需要保持和原始的key的类型分组一致。

也就是在分组的时候,我们只需要让(field1,field2)字段一参与分组。

如果我们指定了分组比较器,那么就使用我们自定义的 分组比较器。

否则还是按照OutputKey的compareTo方法进行比较,看是否是属于同一个key。

publicstatic
class
SortGroupComparatorimplements
RawComparator<PairWritable> {
     
public int
compare(PairWritable
o1, PairWritable o2) {
          int
l
=o1.getFirst();
          int
r
=o2.getFirst();
          return
l == r ?
0 : (l <r ? -1 :
1);
      }
 
     
public int
compare(byte[]b1,
ints1,
intl1,
byte[]b2,
ints2,
intl2) {
          return WritableComparator.compareBytes(b1,s1,
Integer.SIZE /8,
b2, s2, Integer.SIZE /8);
      }
}

2.6 编写mapper函数和reduce函数

publicstatic
class
SortMapperextendsMapper<LongWritable, Text, PairWritable, IntWritable> {
     
private
PairWritable
outputKey = new
PairWritable();
     
private
IntWritable
outputValue = new
IntWritable();
     
@Override
     
protected void
map(LongWritable
key, Textvalue, Mapper<LongWritable, Text, PairWritable,
                 IntWritable>.Contextcontext)
throws IOException, InterruptedException {
          if (value ==null) {
                return;
           }
           String[]array =
value.toString().split("");
          if (ArrayUtils.isEmpty(array)) {
                return;
           }
          int
field1
= Integer.parseInt(array[0]);
          int
field2
= Integer.parseInt(array[1]);
          outputKey.set(field1,field2);
          outputValue.set(field2);
          context.write(outputKey,outputValue);
      }
}
 
publicstatic
class
SortReducerextends Reducer<PairWritable, IntWritable, Text, IntWritable> {
     
private staticfinal
Text
SEPARATOR =new
Text("------------------------------------------------");
     
private final
Textfirst =
newText();
     
@Override
     
protected void
reduce(PairWritable
key, Iterable<IntWritable>
values,
                 Reducer<PairWritable,IntWritable, Text, IntWritable>.Contextcontext)
                throws IOException, InterruptedException {
          context.write(SEPARATOR,null);
           StringfVal = Integer.toString(key.first);
          first.set(fVal);
          for (IntWritable
value :values) {
                context.write(first,value);
           }
      }
}

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: