
Secondary Sort in MapReduce

2014-04-14 11:58

Use case

     An interviewer once asked me a MapReduce question: "How would you join two tables with MapReduce?"

     I answered, "With two jobs: the first one ..., the second one ...."

     "Two jobs? Do you know secondary sort?"

     "No." So over the past couple of days I went back and studied secondary sort in MapReduce.

     In some situations you need the values handed to a reduce() call to be sorted, and that is where secondary sort comes in: it sorts the value list that has been grouped under each key by value.

Example

Input data:
1 2
3 4
5 6
7 8
7 82
12 211
20 21
20 53
20 522
31 42
40 511
50 51
50 52
50 53
50 53
50 54
50 62
50 512
50 522
60 51
60 52
60 53
60 56
60 56
60 57
60 57
60 61
63 61
70 54
70 55
70 56
70 57
70 58
70 58
71 55
71 56
73 57
74 58
203 21
530 54
730 54
740 58
Output data:

1 2
3 4
5 6
7 8
7 82
12 211
20 21
20 53
20 522
31 42
40 511
50 51
50 52
50 53
50 53
50 54
50 62
50 512
50 522
60 51
60 52
60 53
60 56
60 56
60 57
60 57
60 61
63 61
70 54
70 55
70 56
70 57
70 58
70 58
71 55
71 56
73 57
74 58
203 21
530 54
730 54
740 58

How secondary sort works

     I used to assume that, inside a Reducer, all the keys behind one reduce(key, valueList) call were identical. After studying MapReduce more deeply, I learned that as long as certain conditions are met these keys can actually differ. Let me first walk through the MapReduce process.

The mechanism
    Towards the end of a map task, the framework applies the following in turn:
    1. The partitioner set with job.setPartitionerClass() splits the <key, value> pairs into partitions, and each partition maps to one reducer. The default partitioner is HashPartitioner.
    2. The pairs are sorted with the key comparator set by job.setSortComparatorClass(); by default, keys are compared with their compareTo() method.
   On the reduce side:
    1. Once the reducer has received all the map output assigned to it, it again sorts all the pairs with the key comparator set by job.setSortComparatorClass().
    2. It then builds one value iterator per key. This is where grouping comes in, via the comparator set with job.setGroupingComparatorClass(): any two keys that this comparator considers equal belong to the same group, their values go into the same value iterator, and the key exposed for that iterator is the first key of the group.
    3. Finally, the Reducer's reduce() function runs once per group, taking as input all the (key, value) pairs of that group (see the driver sketch below).
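As a minimal driver-side sketch of where these three hooks are wired up (MyPartitioner, MyKeyComparator and MyGroupingComparator are placeholders for your own classes; the full example further down configures a partitioner and a grouping comparator this way, but registers its key comparator through WritableComparator.define() instead of setSortComparatorClass()):

Job job = new Job(conf, "secondary sort");
// Map side, step 1: decide which reducer each <key, value> pair goes to
// (default: HashPartitioner).
job.setPartitionerClass(MyPartitioner.class);
// Map side, step 2 and reduce side, step 1: how keys are ordered during sorting
// (default: the key's compareTo()).
job.setSortComparatorClass(MyKeyComparator.class);
// Reduce side, step 2: which keys count as "equal" and therefore share one
// reduce() call and one value iterator.
job.setGroupingComparatorClass(MyGroupingComparator.class);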

Design
    Given this behaviour, secondary sort can be set up as follows: where the pairs used to be <key, value>, the new key is the composite (key, value), so the pairs become <(key, value), value>.

    The grouping comparator set with setGroupingComparatorClass() compares only the key part of the composite (key, value), so every <(key, value), value> pair that shares the same key lands in the same reduce() call. The composite key's compareTo() compares the key first and the value second. As a result, in the <(key, value), valueList> that reduce() receives, valueList is already a value-sorted list. The example code below implements exactly this design.

Example code

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.examples;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

/**
* This is an example Hadoop Map/Reduce application.
* It reads the text input files that must contain two integers per a line.
* The output is sorted by the first and second number and grouped on the
* first number.
*
* To run: bin/hadoop jar build/hadoop-examples.jar secondarysort
*            <i>in-dir</i> <i>out-dir</i>
*/
public class SecondarySort {

  /**
   * Define a pair of integers that are writable.
   * They are serialized in a byte comparable format.
   */
  public static class IntPair
      implements WritableComparable<IntPair> {
    private int first = 0;
    private int second = 0;

    /**
     * Set the left and right values.
     */
    public void set(int left, int right) {
      first = left;
      second = right;
    }
    public int getFirst() {
      return first;
    }
    public int getSecond() {
      return second;
    }
    /**
     * Read the two integers.
     * Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE -> -1
     */
    @Override
    public void readFields(DataInput in) throws IOException {
      first = in.readInt() + Integer.MIN_VALUE;
      second = in.readInt() + Integer.MIN_VALUE;
    }
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(first - Integer.MIN_VALUE);
      out.writeInt(second - Integer.MIN_VALUE);
    }
    @Override
    public int hashCode() {
      return first * 157 + second;
    }
    @Override
    public boolean equals(Object right) {
      if (right instanceof IntPair) {
        IntPair r = (IntPair) right;
        return r.first == first && r.second == second;
      } else {
        return false;
      }
    }
    /** A Comparator that compares serialized IntPair. */
    public static class Comparator extends WritableComparator {
      public Comparator() {
        super(IntPair.class);
      }

      public int compare(byte[] b1, int s1, int l1,
                         byte[] b2, int s2, int l2) {
        return compareBytes(b1, s1, l1, b2, s2, l2);
      }
    }

    static {                                    // register this comparator
      WritableComparator.define(IntPair.class, new Comparator());
    }

    @Override
    public int compareTo(IntPair o) {
      if (first != o.first) {
        return first < o.first ? -1 : 1;
      } else if (second != o.second) {
        return second < o.second ? -1 : 1;
      } else {
        return 0;
      }
    }
  }

  /**
   * Partition based on the first part of the pair.
   */
  public static class FirstPartitioner extends Partitioner<IntPair, IntWritable> {
    @Override
    public int getPartition(IntPair key, IntWritable value,
                            int numPartitions) {
      return Math.abs(key.getFirst() * 127) % numPartitions;
    }
  }

  /**
   * Compare only the first part of the pair, so that reduce is called once
   * for each value of the first part.
   */
  public static class FirstGroupingComparator
      implements RawComparator<IntPair> {
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8,
                                             b2, s2, Integer.SIZE / 8);
    }

    @Override
    public int compare(IntPair o1, IntPair o2) {
      int l = o1.getFirst();
      int r = o2.getFirst();
      return l == r ? 0 : (l < r ? -1 : 1);
    }
  }

  /**
   * Read two integers from each line and generate a key, value pair
   * as ((left, right), right).
   */
  public static class MapClass
      extends Mapper<LongWritable, Text, IntPair, IntWritable> {

    private final IntPair key = new IntPair();
    private final IntWritable value = new IntWritable();

    @Override
    public void map(LongWritable inKey, Text inValue,
                    Context context) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(inValue.toString());
      int left = 0;
      int right = 0;
      if (itr.hasMoreTokens()) {
        left = Integer.parseInt(itr.nextToken());
        if (itr.hasMoreTokens()) {
          right = Integer.parseInt(itr.nextToken());
        }
        key.set(left, right);
        value.set(right);
        context.write(key, value);
      }
    }
  }

  /**
   * A reducer class that emits the values of each group, preceded by a
   * separator line.
   */
  public static class Reduce
      extends Reducer<IntPair, IntWritable, Text, IntWritable> {
    private static final Text SEPARATOR =
        new Text("------------------------------------------------");
    private final Text first = new Text();

    @Override
    public void reduce(IntPair key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      context.write(SEPARATOR, null);
      first.set(Integer.toString(key.getFirst()));
      for (IntWritable value : values) {
        context.write(first, value);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: secondarysort <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "secondary sort");
    job.setJarByClass(SecondarySort.class);
    job.setMapperClass(MapClass.class);
    job.setReducerClass(Reduce.class);

    // group and partition by the first int in the pair
    job.setPartitionerClass(FirstPartitioner.class);
    job.setGroupingComparatorClass(FirstGroupingComparator.class);

    // the map output is IntPair, IntWritable
    job.setMapOutputKeyClass(IntPair.class);
    job.setMapOutputValueClass(IntWritable.class);

    // the reduce output is Text, IntWritable
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

}
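Note that the job's actual output looks slightly different from the "Output data" listing at the top: the Reduce class above writes a separator line before each group, and TextOutputFormat puts a tab between key and value. For the group whose first number is 50, the output should therefore look roughly like this (inferred from the code, not copied from a real run):

------------------------------------------------
50	51
50	52
50	53
50	53
50	54
50	62
50	512
50	522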

Joining two tables with secondary sort

We can use secondary sort to implement a join of two tables.
Suppose we want to join table A and table B on a field ID. While processing table A we emit <(ID, A), record>, and while processing table B we emit <(ID, B), record>. In the map phase we partition on ID alone, so all records with the same ID reach the same reducer. In the reduce phase we also group on ID, so a single reduce() call handles the full list of records for that ID. Thanks to the sort on the composite key, the first part of that list holds the records from table A and the rest holds the records from table B, and the reduce() function can join the two parts, as the sketch below illustrates.

References

1. http://my.oschina.net/u/556624/blog/140719