
Secondary Sort in MapReduce

2017-12-11 02:24
Below is my understanding of secondary sort:

In an ordinary MapReduce job, the map phase emits (k, v) pairs and the framework sorts them by calling compareTo on the key, so only k determines the order:

The input data:

20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
70 55
70 56
70 57
70 58
1 2
3 4
5 6
7 82
203 21
50 512
50 522
50 53
530 54
40 511
20 53
20 522
60 56
60 57
740 58
63 61
730 54
71 55
71 56
73 57
74 58
12 211
31 42
50 62
7 8


The result of the simple key-only sort (the divider lines are written by the reducer, one per group):

--------------------
1       2
--------------------
3       4
--------------------
5       6
--------------------
7       8
7       82
--------------------
12      211
--------------------
20      522
20      53
20      21
--------------------
31      42
--------------------
40      511
--------------------
50      512
50      54
50      51
50      52
50      62
50      53
50      53
50      522
--------------------
60      51
60      57
60      56
60      61
60      57
60      56
60      52
60      53
--------------------
63      61
--------------------
70      56
70      55
70      54
70      58
70      58
70      57
--------------------
71      55
71      56
--------------------
73      57
--------------------
74      58
--------------------
203     21
--------------------
530     54
--------------------
730     54
--------------------
740     58


As you can see, the sort only ordered the keys k; the values v under the same k were not sorted, for example:

50      512
50      54
50      51
50      52
50      62
50      53
50      53
50      522


So to get a secondary sort in MapReduce, it is enough to override the key's compareTo method so that it compares both fields.

The custom key must implement the WritableComparable interface:

package IntSort;

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Created by hubo on 2017/12/10
 */
public class IntPair implements WritableComparable<IntPair> {
    int first;
    int second;

    public int getFirst() {
        return first;
    }

    public int getSecond() {
        return second;
    }

    public void set(int left, int right) {
        first = left;
        second = right;
    }

    // Deserialization: reconstruct an IntPair from the binary stream
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        first = dataInput.readInt();
        second = dataInput.readInt();
    }

    // Serialization: write the IntPair to the binary stream
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(first);
        dataOutput.writeInt(second);
    }

    // Key comparison: the first field decides, and the second field
    // breaks ties -- this is what produces the secondary sort
    @Override
    public int compareTo(IntPair o) {
        if (first != o.first) {
            return first < o.first ? -1 : 1;
        } else if (second != o.second) {
            return second < o.second ? -1 : 1;
        } else {
            return 0;
        }
    }

    // hashCode/equals so that with more than one reducer the default
    // HashPartitioner sends logically equal keys to the same reducer
    @Override
    public int hashCode() {
        return first * 157 + second;
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof IntPair)) {
            return false;
        }
        IntPair other = (IntPair) obj;
        return first == other.first && second == other.second;
    }
}
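
To see the ordering compareTo produces, here is a throwaway check of mine (my own sketch, runnable outside Hadoop; IntPairCheck is not part of the job):

package IntSort;

// Hypothetical sanity check for IntPair.compareTo
public class IntPairCheck {
    public static void main(String[] args) {
        IntPair a = new IntPair();
        IntPair b = new IntPair();
        a.set(50, 512);
        b.set(50, 54);
        // same first field, so the second field decides: 512 > 54
        System.out.println(a.compareTo(b)); // prints 1
        b.set(60, 1);
        // different first field, so only it matters: 50 < 60
        System.out.println(a.compareTo(b)); // prints -1
    }
}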


The Mapper:

package IntSort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Created by hubo on 2017/12/10
 */
public class Map extends Mapper<LongWritable, Text, IntPair, LongWritable> {
    // reused across calls to avoid allocating a new key per record
    private final IntPair intkey = new IntPair();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // each input line is "first second"; both fields go into the
        // composite key, and the second field is also emitted as the value
        String[] values = value.toString().split(" ");
        intkey.set(Integer.parseInt(values[0]), Integer.parseInt(values[1]));
        context.write(intkey, new LongWritable(Integer.parseInt(values[1])));
    }
}


The Reducer:

package IntSort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by hubo on 2017/12/10
 */
public class Red extends Reducer<IntPair, LongWritable, Text, LongWritable> {
    private final Text PI = new Text("-------------------");

    @Override
    protected void reduce(IntPair key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // a null value makes TextOutputFormat print only the key,
        // so this writes one bare divider line per group
        context.write(PI, null);
        for (LongWritable value : values) {
            context.write(new Text(Integer.toString(key.getFirst())), value);
        }
    }
}


The MyComparator I wrote is used for grouping: it compares only the first field, so in the output above everything between two divider lines is one group.

package IntSort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Created by hubo on 2017/12/10
 */
public class MyComparator extends WritableComparator {

    protected MyComparator() {
        // true: let the comparator create IntPair instances to deserialize into
        super(IntPair.class, true);
    }

    // group only on the first field, so all (first, *) keys
    // reach the same reduce() call together
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        IntPair a1 = (IntPair) a;
        IntPair b1 = (IntPair) b;

        int l = a1.getFirst();
        int r = b1.getFirst();

        return Integer.compare(l, r);
    }
}
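
One piece of the classic secondary-sort pattern that this job leaves out: with more than one reducer, a custom partitioner on the first field is also needed, otherwise keys that share a first value can be hashed to different reducers and the grouping comparator never sees them together. A minimal sketch under that assumption (FirstPartitioner is my name, not part of the original code):

package IntSort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner: route records by the first field only,
// so every (first, *) key lands on the same reducer
public class FirstPartitioner extends Partitioner<IntPair, LongWritable> {
    @Override
    public int getPartition(IntPair key, LongWritable value, int numPartitions) {
        // mask off the sign bit before taking the modulus
        return (key.getFirst() & Integer.MAX_VALUE) % numPartitions;
    }
}

It would be registered in the driver with job.setPartitionerClass(FirstPartitioner.class). With the single default reducer, the job runs correctly without it.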


The driver:

package IntSort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Created by hubo on 2017/12/10
 */
public class IntRun {
    public static final String in = "/data/input";
    public static final String out = "/data/output";

    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {

        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hdp01:9000");
        // note: the correct property name is yarn.resourcemanager.hostname
        // (the original "yarn.resourcemanager.name" is not a YARN property)
        conf.set("yarn.resourcemanager.hostname", "hdp01");

        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf, "number");

            job.setJarByClass(IntRun.class);

            job.setMapperClass(Map.class);
            job.setMapOutputKeyClass(IntPair.class);
            job.setMapOutputValueClass(LongWritable.class);

            job.setReducerClass(Red.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            // grouping: one reduce() call per distinct first field
            job.setGroupingComparatorClass(MyComparator.class);

            FileInputFormat.addInputPath(job, new Path(in));

            // remove a stale output directory so the job can be rerun
            Path outpath = new Path(out);
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);

            job.waitForCompletion(true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
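
To run it, the classes would be packaged into a jar and submitted with the standard launcher (the jar name here is my placeholder):

hadoop jar intsort.jar IntSort.IntRun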


Output: with the two-field compareTo in place, the values within each divider-delimited group now come out in ascending order as well.
