MapReduce之二次排序
2017-11-18 10:18
267 查看
一 RawComparator介绍
Hadoop支持对序列化的二进制流直接进行比较。相比于对序列化二进制流进行反序列化再进行序列化,这种方式效率更高。
RawComparator接口就是用来进行序列化字节之间的比较的。该接口继承了Comparator接口,提供了一个compare方法:publicint compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
b1:第一个对象所在字节数组
s1:该对象在b1中的起始位置
l1:该对象在b1中的长度
b2:第二个对象所在字节数组
s2:该对象在b2的起始位置
l2:该对象在b2中的长度
二 二次排序
有时候,一个文件有多个字段,但是我们希望首先根据第一个字段排序,然后key相同的情况下,在进行第二个字段的排序。
要点:
2.1自定义实现一个WritableComprable<T>的类,这个需要定义需要比较的字段。
因为我们只能对key比较,所以就应该在key上想办法。试想又想同时对2个字段排序,又只能对key排序,所以,我们就需要自定义一个复合类型由需要比较的两个字段组成,这个复合类型可以对多个字段进行比较。
publicstatic
classPairWritableimplementsWritableComparable<PairWritable>
{
private intfirst;
private intsecond;
public PairWritable() {
}
public PairWritable(intfirst,
intsecond) {
this.first =first;
this.second =second;
}
public voidwrite(DataOutputout)
throws IOException {
out.writeInt(first);
out.writeInt(second);
}
public voidreadFields(DataInputin)
throws IOException {
this.first =in.readInt();
this.second =in.readInt();
}
public voidset(intleft,
intright) {
first =
left;
second =
right;
}
@Override
public inthashCode() {
final
intprime =
31;
int
result =1;
result =
prime *result +
first;
result =
prime *result +
second;
return
result;
}
@Override
public booleanequals(Object
obj) {
if (this ==obj)
return
true;
if (obj ==null)
return
false;
if (getClass() !=obj.getClass())
return
false;
PairWritable
other = (PairWritable)obj;
if (first !=other.first)
return
false;
if (second !=other.second)
return
false;
return
true;
}
public intgetFirst() {
return
first;
}
public intgetSecond() {
return
second;
}
/** A Comparator that compares serializedIntPair. */
public staticclass Comparator
extends WritableComparator{
public
Comparator() {
super(PairWritable.class);
}
@Override
public
intcompare(byte[]b1,
ints1,
intl1,
byte[]b2,
ints2,
intl2) {
return
compareBytes(b1,s1,
l1, b2,
s2, l2);
}
}
static {// register this comparator
WritableComparator.define(PairWritable.class,new
Comparator());
}
public intcompareTo(PairWritableo) {
if (first !=o.first)
{
return
first < o.first ? -1 :1;
}else
if (second !=o.second) {
return
second < o.second ? -1 :1;
}else {
return
0;
}
}
}
2.2编写map函数 和 reduce函数
其中我们自定义的组合key类型作为OutputKey,而OutputValue还是和以前保持一致
2.3接着map就要开始shuffle操作, 首先要进行分区
以前的分区算法是基于key(也就是第一个字段),现在我们的key是复合类型,那势必可能和以前的分区不一样了,如果想保持以前的分区,我们需要自己实现。
publicstatic
class SortPartitionerextends
Partitioner<PairWritable, IntWritable> {
@Override
public intgetPartition(PairWritable
key, IntWritable value,
intnumPartitions) {
return (key.hashCode() & Integer.MAX_VALUE)
%numPartitions;
}
}
2.4 分区之后,我们知道会进行排序操作,默认的排序规则就是先根据partition排序,然后如果partition相同,则根据key排序。
Key要排序:
# 我们指定了比较器
如果我们对于key没有指定具体比较器,那么,就会根据我们指定的比较器进行排序
# 我们没有指定比较器
就会根据MapOutputKey也就是我们自定义的复合key的compareTo方法进行排序,当然我们之前的代码里已经实现了。
2.5 然后就是Reduce阶段的排序
它的原理跟map阶段默认排序是一样的。发生在merge的时候。也是没有指定比较器,就需要根据key的compareTo方法进行排序。
2.6 开始分组
在reduce之前,会对结果进行一个归并或者分组操作。很显然,如果我们以自定义的类型去参与分组,结果是有问题的。所以我们需要保持和原始的key的类型分组一致。
也就是在分组的时候,我们只需要让(field1,field2)字段一参与分组。
如果我们指定了分组比较器,那么就使用我们自定义的 分组比较器。
否则还是按照OutputKey的compareTo方法进行比较,看是否是属于同一个key。
publicstatic
class SortGroupComparatorimplements
RawComparator<PairWritable> {
public intcompare(PairWritable
o1, PairWritable o2) {
int
l =o1.getFirst();
int
r =o2.getFirst();
return
l == r ?
0 : (l <r ? -1 :
1);
}
public intcompare(byte[]b1,
ints1,
intl1,
byte[]b2,
ints2,
intl2) {
return WritableComparator.compareBytes(b1,s1,
Integer.SIZE /8,
b2, s2, Integer.SIZE /8);
}
}
2.6 编写mapper函数和reduce函数
publicstatic
class SortMapperextendsMapper<LongWritable, Text, PairWritable, IntWritable> {
private PairWritable
outputKey = new
PairWritable();
private IntWritable
outputValue = new
IntWritable();
@Override
protected voidmap(LongWritable
key, Textvalue, Mapper<LongWritable, Text, PairWritable,
IntWritable>.Contextcontext)
throws IOException, InterruptedException {
if (value ==null) {
return;
}
String[]array =
value.toString().split("");
if (ArrayUtils.isEmpty(array)) {
return;
}
int
field1 = Integer.parseInt(array[0]);
int
field2 = Integer.parseInt(array[1]);
outputKey.set(field1,field2);
outputValue.set(field2);
context.write(outputKey,outputValue);
}
}
publicstatic
class SortReducerextends Reducer<PairWritable, IntWritable, Text, IntWritable> {
private staticfinal Text
SEPARATOR =new
Text("------------------------------------------------");
private final Textfirst =
newText();
@Override
protected voidreduce(PairWritable
key, Iterable<IntWritable>
values,
Reducer<PairWritable,IntWritable, Text, IntWritable>.Contextcontext)
throws IOException, InterruptedException {
context.write(SEPARATOR,null);
StringfVal = Integer.toString(key.first);
first.set(fVal);
for (IntWritable
value :values) {
context.write(first,value);
}
}
}
Hadoop支持对序列化的二进制流直接进行比较。相比于对序列化二进制流进行反序列化再进行序列化,这种方式效率更高。
RawComparator接口就是用来进行序列化字节之间的比较的。该接口继承了Comparator接口,提供了一个compare方法:publicint compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
b1:第一个对象所在字节数组
s1:该对象在b1中的起始位置
l1:该对象在b1中的长度
b2:第二个对象所在字节数组
s2:该对象在b2的起始位置
l2:该对象在b2中的长度
二 二次排序
有时候,一个文件有多个字段,但是我们希望首先根据第一个字段排序,然后key相同的情况下,在进行第二个字段的排序。
要点:
2.1自定义实现一个WritableComprable<T>的类,这个需要定义需要比较的字段。
因为我们只能对key比较,所以就应该在key上想办法。试想又想同时对2个字段排序,又只能对key排序,所以,我们就需要自定义一个复合类型由需要比较的两个字段组成,这个复合类型可以对多个字段进行比较。
publicstatic
classPairWritableimplementsWritableComparable<PairWritable>
{
private intfirst;
private intsecond;
public PairWritable() {
}
public PairWritable(intfirst,
intsecond) {
this.first =first;
this.second =second;
}
public voidwrite(DataOutputout)
throws IOException {
out.writeInt(first);
out.writeInt(second);
}
public voidreadFields(DataInputin)
throws IOException {
this.first =in.readInt();
this.second =in.readInt();
}
public voidset(intleft,
intright) {
first =
left;
second =
right;
}
@Override
public inthashCode() {
final
intprime =
31;
int
result =1;
result =
prime *result +
first;
result =
prime *result +
second;
return
result;
}
@Override
public booleanequals(Object
obj) {
if (this ==obj)
return
true;
if (obj ==null)
return
false;
if (getClass() !=obj.getClass())
return
false;
PairWritable
other = (PairWritable)obj;
if (first !=other.first)
return
false;
if (second !=other.second)
return
false;
return
true;
}
public intgetFirst() {
return
first;
}
public intgetSecond() {
return
second;
}
/** A Comparator that compares serializedIntPair. */
public staticclass Comparator
extends WritableComparator{
public
Comparator() {
super(PairWritable.class);
}
@Override
public
intcompare(byte[]b1,
ints1,
intl1,
byte[]b2,
ints2,
intl2) {
return
compareBytes(b1,s1,
l1, b2,
s2, l2);
}
}
static {// register this comparator
WritableComparator.define(PairWritable.class,new
Comparator());
}
public intcompareTo(PairWritableo) {
if (first !=o.first)
{
return
first < o.first ? -1 :1;
}else
if (second !=o.second) {
return
second < o.second ? -1 :1;
}else {
return
0;
}
}
}
2.2编写map函数 和 reduce函数
其中我们自定义的组合key类型作为OutputKey,而OutputValue还是和以前保持一致
2.3接着map就要开始shuffle操作, 首先要进行分区
以前的分区算法是基于key(也就是第一个字段),现在我们的key是复合类型,那势必可能和以前的分区不一样了,如果想保持以前的分区,我们需要自己实现。
publicstatic
class SortPartitionerextends
Partitioner<PairWritable, IntWritable> {
@Override
public intgetPartition(PairWritable
key, IntWritable value,
intnumPartitions) {
return (key.hashCode() & Integer.MAX_VALUE)
%numPartitions;
}
}
2.4 分区之后,我们知道会进行排序操作,默认的排序规则就是先根据partition排序,然后如果partition相同,则根据key排序。
Key要排序:
# 我们指定了比较器
如果我们对于key没有指定具体比较器,那么,就会根据我们指定的比较器进行排序
# 我们没有指定比较器
就会根据MapOutputKey也就是我们自定义的复合key的compareTo方法进行排序,当然我们之前的代码里已经实现了。
2.5 然后就是Reduce阶段的排序
它的原理跟map阶段默认排序是一样的。发生在merge的时候。也是没有指定比较器,就需要根据key的compareTo方法进行排序。
2.6 开始分组
在reduce之前,会对结果进行一个归并或者分组操作。很显然,如果我们以自定义的类型去参与分组,结果是有问题的。所以我们需要保持和原始的key的类型分组一致。
也就是在分组的时候,我们只需要让(field1,field2)字段一参与分组。
如果我们指定了分组比较器,那么就使用我们自定义的 分组比较器。
否则还是按照OutputKey的compareTo方法进行比较,看是否是属于同一个key。
publicstatic
class SortGroupComparatorimplements
RawComparator<PairWritable> {
public intcompare(PairWritable
o1, PairWritable o2) {
int
l =o1.getFirst();
int
r =o2.getFirst();
return
l == r ?
0 : (l <r ? -1 :
1);
}
public intcompare(byte[]b1,
ints1,
intl1,
byte[]b2,
ints2,
intl2) {
return WritableComparator.compareBytes(b1,s1,
Integer.SIZE /8,
b2, s2, Integer.SIZE /8);
}
}
2.6 编写mapper函数和reduce函数
publicstatic
class SortMapperextendsMapper<LongWritable, Text, PairWritable, IntWritable> {
private PairWritable
outputKey = new
PairWritable();
private IntWritable
outputValue = new
IntWritable();
@Override
protected voidmap(LongWritable
key, Textvalue, Mapper<LongWritable, Text, PairWritable,
IntWritable>.Contextcontext)
throws IOException, InterruptedException {
if (value ==null) {
return;
}
String[]array =
value.toString().split("");
if (ArrayUtils.isEmpty(array)) {
return;
}
int
field1 = Integer.parseInt(array[0]);
int
field2 = Integer.parseInt(array[1]);
outputKey.set(field1,field2);
outputValue.set(field2);
context.write(outputKey,outputValue);
}
}
publicstatic
class SortReducerextends Reducer<PairWritable, IntWritable, Text, IntWritable> {
private staticfinal Text
SEPARATOR =new
Text("------------------------------------------------");
private final Textfirst =
newText();
@Override
protected voidreduce(PairWritable
key, Iterable<IntWritable>
values,
Reducer<PairWritable,IntWritable, Text, IntWritable>.Contextcontext)
throws IOException, InterruptedException {
context.write(SEPARATOR,null);
StringfVal = Integer.toString(key.first);
first.set(fVal);
for (IntWritable
value :values) {
context.write(first,value);
}
}
}
相关文章推荐
- Hadoop MapReduce编程 API入门系列之二次排序(十六)
- 【Hadoop】MapReduce温度排序之二次排序
- MapReduce之二次排序
- MapReduce之二次排序
- MapReduce程序之二次排序与多次排序
- MapReduce之二次排序
- MapReduce之二次排序
- MapReduce之二次排序
- MapReduce之二次排序
- MapReduce之二次排序
- MapReduce之二次排序
- MapReduce之二次排序
- MapReduce之二次排序
- MapReduce之二次排序
- MapReduce之二次排序
- MapReduce之二次排序
- MapReduce之二次排序
- MapReduce之二次排序
- MapReduce之二次排序
- MapReduce之二次排序