7 MapReduce进阶之shuffle阶段
2017-03-12 08:31
239 查看
Shuffle阶段说明
shuffle阶段主要包括map阶段的combine(压缩)、group、sort、partition以及reducer阶段的合并排序。Map阶段通过shuffle后会将输出数据按照reduce的分区分文件的保存,文件内容是按照定义的sort进行排序好的。Map阶段完成后会通知ApplicationMaster,然后AM会通知Reduce进行数据的拉取,在拉取过程中进行reduce端的shuffle过程。数据如果直接放到hdfs上会多次备份浪费hdfs资源。数据放在运行节点的本地磁盘上。
用户自定义Combiner
Combiner可以减少Map阶段的中间输出结果数,降低网络开销。默认情况下是没有Combiner的。用户自定义的Combiner要求是Reducer的子类,以Map的输出 key,value作为Combiner的输入key,value和输出key,value,也就是说Combiner的输入和输出必须是一样的。可以通过job.setCombinerClass设置combiner的处理类,MapReduce框架不保证一定会调用该类的方法。
用户自定义Partitoner
Partitioner是用于确定map输出的key,value对应的处理reducer是哪个节点。默认MapReduce任务reduce个数为1个,此时Partitioner其实没有什么效果,但是当我们将reduce个数修改为多个的时候,partitioner就会决定key所对应reduce的节点序号(从0开始)。可以通过job.setPartitionerClass方法指定Partitioner类,默认情况下使用HashPartitioner(默认调用key的hashCode方法)。
用户自定义Group
GroupingComparator是用于将Map输出的key,value进行分组组合成key,List的关键类,直白来讲就是用于确定key1和key2是否属于同一组,如果是同一组,就将map的输出value进行组合。要求我们自定义的类实现自接口RawComparator,可以通过job.setGroupingComparatorClass方法指定比较类。默认情况下使用WritableComparator,但是最终调用key的compareTo方法进行比较。
用户自定义Sort
SortComparator是用于将Map输出的key,value进行key排序的关键类, 直白来讲就是用于确定key1所属组和key2所属组那个在前,那个在后。要求我们自定义的类实现自接口RawComparator,可以通过job.setSortComparatorClass方法指定比较类。默认情况下使用WritableComparator,但是最终调用key的compareTo方法进行比较。
用户自定义Reducer的Shuffle
在reduce端拉取map的输出数据的时候,会进行shuffle(合并排序),MapReduce框架以插件模式提供了一个自定义的方式,我们可以通过实现接口ShuffleConsumerPlugin,并指定参数mapreduce.job.reduce.shuffle.consumer.plugin.class来指定自定义的shuffle规则,但是一般情况下,直接采用默认的类org.apache.hadoop.mapreduce.task.reduce.Shuffle。案例–二次排序
hadoop默认只对key进行排序,有时候我们需要将value部分也进行排序,这种情况下有两种方式实现,第一种,我们将排序放到reducer端进行,但是这种方式当数据量比较大的时候,会比较消耗内存。那么另外一种方式就是二次排序。二次排序的内部实行其实是先按照key+value组合的方式进行排序,然后根据单独key进行分组的一种实行方式。要求reducer个数为2,而且奇数到第一个reducer进行处理,偶数到第二个reducer进行处理。数据格式分析
map输入: 71 70
map输出: 71,70 70
reduce输入:
reduce输出:71 70
sort --> group key + value value 先排序 接着按照key分组,此时value便是有序的
IntPair —— 自定义输出key数据类型
package com.beifeng.shuffle; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /** * 自定义输出key数据类型 * * @author Administrator * */ public class IntPair implements WritableComparable<IntPair> { private int first; private int second; public IntPair() { super(); } public IntPair(int first, int second) { super(); this.first = first; this.second = second; } public int getFirst() { return first; } public void setFirst(int first) { this.first = first; } public int getSecond() { return second; } public void setSecond(int second) { this.second = second; } @Override public void write(DataOutput out) throws IOException { out.writeInt(this.first); out.writeInt(this.second); } @Override public void readFields(DataInput in) throws IOException { this.first = in.readInt(); this.second = in.readInt(); } @Override public int compareTo(IntPair o) { if (o == this) { return 0; } // 先按照first排序 int tmp = Integer.compare(this.first, o.first); if (tmp != 0) { return tmp; } // 再按照second排序 tmp = Integer.compare(this.second, o.second); return tmp; } }
IntPairGrouping
package com.beifeng.shuffle; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * 自定义分组类 * * @author Administrator * */ public class IntPairGrouping extends WritableComparator { public IntPairGrouping() { super(IntPair.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { IntPair key1 = (IntPair) a; IntPair key2 = (IntPair) b; return Integer.compare(key1.getFirst(), key2.getFirst()); } }
IntPairPartitioner
package com.beifeng.shuffle; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Partitioner; public class IntPairPartitioner extends Partitioner<IntPair, IntWritable> { @Override public int getPartition(IntPair key, IntWritable value, int numPartitions) { if (numPartitions >= 2) { int first = key.getFirst(); if (first % 2 == 0) { // 偶数,需要使用第二个reducer进行处理 return 1; } else { // 奇数,需要第一个reducer 进行处理,返回值范围是0 - num-1 return 0; } } else { throw new IllegalArgumentException("reducer个数必须大于1"); } } }
DemoRunner
package com.beifeng.shuffle; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 主类 * * @author Administrator * */ public class DemoRunner { /** * 处理mapper类 * * @author Administrator * */ static class DemoMapper extends Mapper<Object, Text, IntPair, IntWritable> { @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] strs = line.split("\\s"); if (strs.length == 2) { // 要求数据每行两个 int first = Integer.valueOf(strs[0]); int second = Integer.valueOf(strs[1]); context.write(new IntPair(first, second), new IntWritable(second)); } else { System.err.println("数据异常" + line); } } } /** * 自定义reducer类 * * @author Administrator * */ static class DemoReducer extends Reducer<IntPair, IntWritable, IntWritable, Text> { @Override protected void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { Integer preKey = key.getFirst(); StringBuffer sb = new StringBuffer(); // 保存结果 for (IntWritable value : values) { int curKey = key.getFirst(); if (preKey == curKey) { // 表示是同一个key,但是value是不一样的,或者value是排序好的 sb.append(value.get()).append(","); } else { // 表示是新的一个key,先输出旧的key对应的value信息,然后修改key值和stringbuffer的值 context.write(new IntWritable(preKey), new Text(sb.toString())); preKey = curKey; sb = new StringBuffer(); sb.append(value.get()).append(","); } } // 输出最后的结果信息 context.write(new IntWritable(preKey), new Text(sb.toString())); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://192.168.175.110:8020"); Job job = Job.getInstance(conf, "demo-job"); job.setJarByClass(DemoRunner.class); job.setMapperClass(DemoMapper.class); job.setReducerClass(DemoReducer.class); job.setMapOutputKeyClass(IntPair.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Text.class); // group by class job.setGroupingComparatorClass(IntPairGrouping.class); // 设置partitioner,要求reducer个数大于1 job.setPartitionerClass(IntPairPartitioner.class); job.setNumReduceTasks(2); // 输入输出路径 FileInputFormat.addInputPaths(job, "/beifeng/input/intpair/"); FileOutputFormat.setOutputPath(job, new Path("/beifeng/07/output/" + System.currentTimeMillis())); // 提交 job.waitForCompletion(true); } }
#
相关文章推荐
- MapReduce性能优化_6. 优化 Shuffle & Sort 阶段
- MapReduce combiner阶段 与shuffle阶段的区别
- hadoop概念-MapReduce各个执行阶段及Shuffle过程详解
- mapreduce 的map shuffle reduce 阶段解析
- [大牛翻译系列]Hadoop(13)MapReduce 性能调优:优化洗牌(shuffle)和排序阶段
- MapReduce --- shuffle阶段图解
- MapReduce的Shuffle阶段
- hadoop执行mapreduce任务,能够map,不能reduce,Shuffle阶段报错
- 【Hadoop】MapReduce笔记(三):MapReduce的Shuffle和Sort阶段详解
- 通过编程方式详解MapReduce之Shuffle 三个阶段
- mapreduce的shuffle阶段详解
- Hadoop基础教程-第7章 MapReduce进阶(7.1 MapReduce过程)(草稿)
- MapReduce:详解Shuffle过程
- Hadoop : MapReduce中的核心Shuffle和Sort分析
- Hadoop系列之四:MapReduce进阶
- Hadoop深入学习:MapReduce的Shuffle过程详解
- Mongodb高级进阶 MapReduce
- MapReduce阶段map的setup() 和cleanup()
- MapReduce错误处理,任务调度及Shuffle过程
- 彻底了解mapreduce核心Shuffle