Secondary Sort in MapReduce
2017-12-11 02:24
Here is my understanding of secondary sort.
In a normal MapReduce job, the map emits (k, v) pairs, and during the shuffle the framework sorts them by calling compareTo on the key k.
The input data, one (k, v) pair per line:
20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
70 55
70 56
70 57
70 58
1 2
3 4
5 6
7 82
203 21
50 512
50 522
50 53
530 54
40 511
20 53
20 522
60 56
60 57
740 58
63 61
730 54
71 55
71 56
73 57
74 58
12 211
31 42
50 62
7 8
The result of a plain key-only sort (the separator lines are written by the reducer, one per group):
--------------------
1 2
--------------------
3 4
--------------------
5 6
--------------------
7 8
7 82
--------------------
12 211
--------------------
20 522
20 53
20 21
--------------------
31 42
--------------------
40 511
--------------------
50 512
50 54
50 51
50 52
50 62
50 53
50 53
50 522
--------------------
60 51
60 57
60 56
60 61
60 57
60 56
60 52
60 53
--------------------
63 61
--------------------
70 56
70 55
70 54
70 58
70 58
70 57
--------------------
71 55
71 56
--------------------
73 57
--------------------
74 58
--------------------
203 21
--------------------
530 54
--------------------
730 54
--------------------
740 58
As you can see, only the keys are sorted; the values belonging to the same key are not. For example, for k = 50:

50 512
50 54
50 51
50 52
50 62
50 53
50 53
50 522
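The effect is easy to reproduce outside Hadoop. This is only a sketch, not the real shuffle: a stable sort on a handful of the pairs above, comparing the key alone, leaves the values of equal keys in arrival order.

```java
import java.util.Arrays;
import java.util.Comparator;

public class KeyOnlySort {
    public static void main(String[] args) {
        // A few of the (k, v) pairs from the data above, in input order.
        int[][] pairs = {
            {50, 512}, {60, 51}, {50, 54}, {50, 51}, {60, 57}, {50, 52}
        };
        // Sort by the key alone, as a plain MapReduce job does; the values
        // within each key keep whatever order they arrived in.
        Arrays.sort(pairs, Comparator.comparingInt(p -> p[0]));
        for (int[] p : pairs) {
            System.out.println(p[0] + " " + p[1]);
        }
    }
}
```

The run prints the 50-group as 512, 54, 51, 52: the values are untouched because the comparator never looks at them.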
So for a MapReduce secondary sort, pack both fields into the key and override the key's compareTo method.
The custom key must implement the WritableComparable interface:
package IntSort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Created by hubo on 2017/12/10
 */
public class IntPair implements WritableComparable<IntPair> {
    int first;
    int second;

    public int getFirst() {
        return first;
    }

    public int getSecond() {
        return second;
    }

    public void set(int left, int right) {
        first = left;
        second = right;
    }

    // Deserialization: read the binary stream back into an IntPair
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        first = dataInput.readInt();
        second = dataInput.readInt();
    }

    // Serialization: write the IntPair as binary onto the stream
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(first);
        dataOutput.writeInt(second);
    }

    // Key comparison: by first field, then by second field
    @Override
    public int compareTo(IntPair o) {
        if (first != o.first) {
            return first < o.first ? -1 : 1;
        } else if (second != o.second) {
            return second < o.second ? -1 : 1;
        } else {
            return 0;
        }
    }
}
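The two-level comparison above can be checked outside Hadoop with a plain-Java stand-in for IntPair (the Pair class below is mine, with the same compareTo logic but no Hadoop serialization):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Minimal stand-in for IntPair: same compareTo logic, no Hadoop I/O.
class Pair implements Comparable<Pair> {
    final int first, second;
    Pair(int f, int s) { first = f; second = s; }

    @Override
    public int compareTo(Pair o) {
        if (first != o.first)   return first  < o.first  ? -1 : 1;
        if (second != o.second) return second < o.second ? -1 : 1;
        return 0;
    }

    @Override
    public String toString() { return first + " " + second; }
}

public class PairSortDemo {
    public static void main(String[] args) {
        List<Pair> data = new ArrayList<>(List.of(
            new Pair(50, 512), new Pair(50, 54), new Pair(50, 51),
            new Pair(60, 57), new Pair(50, 52), new Pair(60, 51)));
        // Uses compareTo: first field ascending, then second field ascending
        Collections.sort(data);
        data.forEach(System.out::println);
    }
}
```

Now the 50-group comes out as 51, 52, 54, 512: the tie on the first field is broken by the second field, which is exactly what the secondary sort needs.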
The Mapper:
package IntSort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Created by hubo on 2017/12/10
 */
public class Map extends Mapper<LongWritable, Text, IntPair, LongWritable> {
    private final IntPair intkey = new IntPair();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] values = value.toString().split(" ");
        intkey.set(Integer.parseInt(values[0]), Integer.parseInt(values[1]));
        context.write(intkey, new LongWritable(Integer.parseInt(values[1])));
    }
}
The Reducer:
package IntSort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by hubo on 2017/12/10
 */
public class Red extends Reducer<IntPair, LongWritable, Text, LongWritable> {
    private final Text PI = new Text("-------------------");

    @Override
    protected void reduce(IntPair key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Write a separator line before each group
        context.write(PI, null);
        for (LongWritable value : values) {
            context.write(new Text(Integer.toString(key.getFirst())), value);
        }
    }
}
The MyComparator I wrote is used for grouping; the records between two separator lines form one group:
package IntSort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Created by hubo on 2017/12/10
 */
public class MyComparator extends WritableComparator {
    protected MyComparator() {
        super(IntPair.class, true);
    }

    // Compare by the first field only, so all records sharing the same
    // first value reach a single reduce call
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        IntPair a1 = (IntPair) a;
        IntPair b1 = (IntPair) b;
        int l = a1.getFirst();
        int r = b1.getFirst();
        return Integer.compare(l, r);
    }
}
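What the grouping comparator buys you can be simulated in plain Java (a rough sketch of the idea, not Hadoop's actual machinery): sort by the full composite key, then hand each consecutive run of records with the same first field to one "reduce call", whose values then arrive already sorted.

```java
import java.util.Arrays;
import java.util.Comparator;

public class GroupingDemo {
    public static void main(String[] args) {
        int[][] recs = { {60, 57}, {50, 512}, {50, 51}, {60, 51}, {50, 54} };
        // Shuffle sort: order by the composite key (first field, then second).
        Arrays.sort(recs, Comparator.<int[]>comparingInt(p -> p[0])
                                    .thenComparingInt(p -> p[1]));
        // Grouping: a run of records sharing the same first field goes to a
        // single "reduce call", mirroring setGroupingComparatorClass.
        int i = 0;
        while (i < recs.length) {
            System.out.println("-------------------");
            int group = recs[i][0];
            while (i < recs.length && recs[i][0] == group) {
                System.out.println(recs[i][0] + " " + recs[i][1]);
                i++;
            }
        }
    }
}
```

This prints two separated groups, 50 and 60, each with its values in ascending order, matching the separator-line format the reducer above produces.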
The driver:
package IntSort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Created by hubo on 2017/12/10
 */
public class IntRun {
    public static final String in = "/data/input";
    public static final String out = "/data/output";

    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hdp01:9000");
        conf.set("yarn.resourcemanager.name", "hdp01");
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf, "number");
            job.setJarByClass(IntRun.class);

            job.setMapperClass(Map.class);
            job.setMapOutputKeyClass(IntPair.class);
            job.setMapOutputValueClass(LongWritable.class);

            job.setReducerClass(Red.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            // Used for grouping
            job.setGroupingComparatorClass(MyComparator.class);

            FileInputFormat.addInputPath(job, new Path(in));
            Path outpath = new Path(out);
            if (fs.exists(outpath)) {
                // Recursively delete a previous run's output directory
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);

            job.waitForCompletion(true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The output: