您的位置:首页 > 其它

7 MapReduce进阶之shuffle阶段

2017-03-12 08:31 239 查看

Shuffle阶段说明

shuffle阶段主要包括map阶段的combine(压缩)、group、sort、partition以及reducer阶段的合并排序。Map阶段通过shuffle后会将输出数据按照reduce的分区分文件的保存,文件内容是按照定义的sort进行排序好的。Map阶段完成后会通知ApplicationMaster,然后AM会通知Reduce进行数据的拉取,在拉取过程中进行reduce端的shuffle过程。

数据如果直接放到hdfs上会多次备份浪费hdfs资源。数据放在运行节点的本地磁盘上。

用户自定义Combiner

Combiner可以减少Map阶段的中间输出结果数,降低网络开销。默认情况下是没有Combiner的。用户自定义的Combiner要求是Reducer的子类,以Map的输出 key,value作为Combiner的输入key,value和输出key,value,也就是说Combiner的输入和输出必须是一样的。

可以通过job.setCombinerClass设置combiner的处理类,MapReduce框架不保证一定会调用该类的方法。

用户自定义Partitoner

Partitioner是用于确定map输出的key,value对应的处理reducer是哪个节点。默认MapReduce任务reduce个数为1个,此时Partitioner其实没有什么效果,但是当我们将reduce个数修改为多个的时候,partitioner就会决定key所对应reduce的节点序号(从0开始)。

可以通过job.setPartitionerClass方法指定Partitioner类,默认情况下使用HashPartitioner(默认调用key的hashCode方法)。



用户自定义Group

GroupingComparator是用于将Map输出的key,value进行分组组合成key,List的关键类,直白来讲就是用于确定key1和key2是否属于同一组,如果是同一组,就将map的输出value进行组合。

要求我们自定义的类实现自接口RawComparator,可以通过job.setGroupingComparatorClass方法指定比较类。默认情况下使用WritableComparator,但是最终调用key的compareTo方法进行比较。

用户自定义Sort

SortComparator是用于将Map输出的key,value进行key排序的关键类, 直白来讲就是用于确定key1所属组和key2所属组那个在前,那个在后。

要求我们自定义的类实现自接口RawComparator,可以通过job.setSortComparatorClass方法指定比较类。默认情况下使用WritableComparator,但是最终调用key的compareTo方法进行比较。

用户自定义Reducer的Shuffle

在reduce端拉取map的输出数据的时候,会进行shuffle(合并排序),MapReduce框架以插件模式提供了一个自定义的方式,我们可以通过实现接口ShuffleConsumerPlugin,并指定参数mapreduce.job.reduce.shuffle.consumer.plugin.class来指定自定义的shuffle规则,但是一般情况下,直接采用默认的类org.apache.hadoop.mapreduce.task.reduce.Shuffle。

案例–二次排序

hadoop默认只对key进行排序,有时候我们需要将value部分也进行排序,这种情况下有两种方式实现,第一种,我们将排序放到reducer端进行,但是这种方式当数据量比较大的时候,会比较消耗内存。那么另外一种方式就是二次排序。二次排序的内部实行其实是先按照key+value组合的方式进行排序,然后根据单独key进行分组的一种实行方式。要求reducer个数为2,而且奇数到第一个reducer进行处理,偶数到第二个reducer进行处理。





数据格式分析

map输入: 71 70

map输出: 71,70 70

reduce输入:

reduce输出:71 70

sort --> group

key + value     value 先排序
接着按照key分组,此时value便是有序的


IntPair —— 自定义输出key数据类型

package com.beifeng.shuffle;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
* 自定义输出key数据类型
*
* @author Administrator
*
*/
public class IntPair implements WritableComparable<IntPair> {
private int first;
private int second;

public IntPair() {
super();
}

public IntPair(int first, int second) {
super();
this.first = first;
this.second = second;
}

public int getFirst() {
return first;
}

public void setFirst(int first) {
this.first = first;
}

public int getSecond() {
return second;
}

public void setSecond(int second) {
this.second = second;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(this.first);
out.writeInt(this.second);
}

@Override
public void readFields(DataInput in) throws IOException {
this.first = in.readInt();
this.second = in.readInt();
}

@Override
public int compareTo(IntPair o) {
if (o == this) {
return 0;
}
// 先按照first排序
int tmp = Integer.compare(this.first, o.first);
if (tmp != 0) {
return tmp;
}
// 再按照second排序
tmp = Integer.compare(this.second, o.second);
return tmp;
}

}


IntPairGrouping

package com.beifeng.shuffle;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
* 自定义分组类
*
* @author Administrator
*
*/
public class IntPairGrouping extends WritableComparator {

public IntPairGrouping() {
super(IntPair.class, true);
}

@Override
public int compare(WritableComparable a, WritableComparable b) {
IntPair key1 = (IntPair) a;
IntPair key2 = (IntPair) b;
return Integer.compare(key1.getFirst(), key2.getFirst());
}
}


IntPairPartitioner

package com.beifeng.shuffle;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class IntPairPartitioner extends Partitioner<IntPair, IntWritable> {

@Override
public int getPartition(IntPair key, IntWritable value, int numPartitions) {
if (numPartitions >= 2) {
int first = key.getFirst();
if (first % 2 == 0) {
// 偶数,需要使用第二个reducer进行处理
return 1;
} else {
// 奇数,需要第一个reducer 进行处理,返回值范围是0 - num-1
return 0;
}
} else {
throw new IllegalArgumentException("reducer个数必须大于1");
}
}
}


DemoRunner

package com.beifeng.shuffle;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
* 主类
*
* @author Administrator
*
*/
public class DemoRunner {
/**
* 处理mapper类
*
* @author Administrator
*
*/
static class DemoMapper extends Mapper<Object, Text, IntPair, IntWritable> {

@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] strs = line.split("\\s");
if (strs.length == 2) { // 要求数据每行两个
int first = Integer.valueOf(strs[0]);
int second = Integer.valueOf(strs[1]);
context.write(new IntPair(first, second), new IntWritable(second));
} else {
System.err.println("数据异常" + line);
}
}
}

/**
* 自定义reducer类
*
* @author Administrator
*
*/
static class DemoReducer extends Reducer<IntPair, IntWritable, IntWritable, Text> {

@Override
protected void reduce(IntPair key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
Integer preKey = key.getFirst();
StringBuffer sb = new StringBuffer(); // 保存结果

for (IntWritable value : values) {
int curKey = key.getFirst();
if (preKey == curKey) {
// 表示是同一个key,但是value是不一样的,或者value是排序好的
sb.append(value.get()).append(",");
} else {
// 表示是新的一个key,先输出旧的key对应的value信息,然后修改key值和stringbuffer的值
context.write(new IntWritable(preKey), new Text(sb.toString()));
preKey = curKey;
sb = new StringBuffer();
sb.append(value.get()).append(",");
}
}

// 输出最后的结果信息
context.write(new IntWritable(preKey), new Text(sb.toString()));
}

}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.175.110:8020");

Job job = Job.getInstance(conf, "demo-job");
job.setJarByClass(DemoRunner.class);
job.setMapperClass(DemoMapper.class);
job.setReducerClass(DemoReducer.class);

job.setMapOutputKeyClass(IntPair.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);

// group by class
job.setGroupingComparatorClass(IntPairGrouping.class);
// 设置partitioner,要求reducer个数大于1
job.setPartitionerClass(IntPairPartitioner.class);
job.setNumReduceTasks(2);

// 输入输出路径
FileInputFormat.addInputPaths(job, "/beifeng/input/intpair/");
FileOutputFormat.setOutputPath(job, new Path("/beifeng/07/output/" + System.currentTimeMillis()));

// 提交
job.waitForCompletion(true);
}
}


#
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: