Hadoop Map Reduce Secondary Sort
2014-05-27 14:30
246 查看
How do we sort the values within each key?
Hadoop: The Definitive Guide, 3rd Edition gives this answer:
1. Make the key a composite of the natural key and the natural value.
2. The sort comparator should order by the composite key, that is, the natural key
and natural value.
3. The partitioner and grouping comparator for the composite key should consider
only the natural key for partitioning and grouping.
An example:
natural key: id
natural value: name
Goal: sort names in ascending order within each id.
Step 1 : create the composite key class
Step 2: create the map class; use the composite key as the output key, and a Text containing the name as the output value.
Step 3 : create the reduce class, only output key,value
Step 4: set the natural-key partitioner, so that all records with the same id go to the same reducer.
Step 5: create the natural-key grouping comparator, so values are grouped by id only.
Step 6 : config the map reduce job
Hadoop: The Definitive Guide, 3rd Edition gives this answer:
1. Make the key a composite of the natural key and the natural value.
2. The sort comparator should order by the composite key, that is, the natural key
and natural value.
3. The partitioner and grouping comparator for the composite key should consider
only the natural key for partitioning and grouping.
An example:
natural key: id
natural value: name
Goal: sort names in ascending order within each id.
Step 1 : create the composite key class
/**
 * Composite key for secondary sort: the natural key ({@code id}) plus the
 * natural value ({@code name}) promoted into the key. The full sort order is
 * (id, name); partitioning and grouping elsewhere use id only.
 *
 * <p>Both fields may be null; serialization writes a one-byte marker per
 * field (-1 = absent, 0 = present) and the comparison helpers sort nulls last.
 */
public static class CompositeKey implements WritableComparable<CompositeKey> {

    /** Natural key: record id. May be null. */
    public Long id;
    /** Natural value promoted into the key for secondary sort. May be null. */
    public String name;

    /**
     * Deserializes this key from {@code in}.
     *
     * <p>FIX: explicitly reset a field to null when its marker byte is -1.
     * Hadoop reuses Writable instances across records, so without the else
     * branches a null field would silently keep the previous record's value.
     */
    public void readFields(DataInput in) throws IOException {
        if (in.readByte() != -1) {
            this.id = in.readLong();
        } else {
            this.id = null;
        }
        if (in.readByte() != -1) {
            this.name = Text.readString(in);
        } else {
            this.name = null;
        }
    }

    /** Serializes this key to {@code out}, writing a null marker per field. */
    public void write(DataOutput out) throws IOException {
        if (this.id == null) {
            out.writeByte(-1);
        } else {
            out.writeByte(0);
            out.writeLong(this.id);
        }
        if (this.name == null) {
            out.writeByte(-1);
        } else {
            out.writeByte(0);
            Text.writeString(out, this.name);
        }
    }

    /**
     * Orders by id first, then by name (both ascending, nulls last).
     * This is the full sort-comparator order for secondary sort.
     */
    public int compareTo(CompositeKey o) {
        int returnValue = checkNullsAndCompare(this.id, o.id);
        if (returnValue != 0) {
            return returnValue;
        }
        return checkNullsAndCompare(this.name, o.name);
    }

    /**
     * Null-safe comparison helper: non-null values compare naturally,
     * a null sorts after a non-null, and two nulls are equal.
     */
    private int checkNullsAndCompare(Object object1, Object object2) {
        if (object1 instanceof Comparable && object2 instanceof Comparable) {
            return ((Comparable) object1).compareTo(object2);
        } else if (object1 != null && object2 != null) {
            return compareStrings(object1.toString(), object2.toString());
        } else if (object1 == null && object2 != null) {
            return 1;  // null sorts last
        } else if (object1 != null && object2 == null) {
            return -1; // non-null sorts first
        } else {
            return 0;  // both null
        }
    }

    /** Lexicographic string comparison. */
    private int compareStrings(String string1, String string2) {
        return string1.compareTo(string2);
    }

    /** Consistent with {@link #equals(Object)}: hashes both fields. */
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((this.id == null) ? 0 : this.id.hashCode());
        result = prime * result + ((this.name == null) ? 0 : this.name.hashCode());
        return result;
    }

    /** Equality over both fields, null-safe. */
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        final CompositeKey other = (CompositeKey) obj;
        if (this.id == null) {
            if (other.id != null)
                return false;
        } else if (!this.id.equals(other.id))
            return false;
        if (this.name == null) {
            if (other.name != null)
                return false;
        } else if (!this.name.equals(other.name))
            return false;
        return true;
    }
}
Step 2: create the map class, use the composite key as key output,text that contain name as value output
/**
 * Mapper: parses "id name" lines into (CompositeKey(id, name), Text(name))
 * pairs so the shuffle sorts by (id, name).
 */
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, CompositeKey, Text> {
    // Reused across map() calls to avoid per-record allocation (standard
    // Hadoop idiom; output.collect serializes immediately, so reuse is safe).
    CompositeKey ck = new CompositeKey();
    Text v = new Text();

    public void map(LongWritable key, Text value, OutputCollector<CompositeKey, Text> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        // FIX: skip blank or malformed lines. The original called nextToken()
        // unconditionally, so an empty or one-field line threw
        // NoSuchElementException and failed the whole task.
        if (tokenizer.countTokens() < 2) {
            return;
        }
        String id = tokenizer.nextToken();
        String name = tokenizer.nextToken();
        // NOTE: a non-numeric id still throws NumberFormatException here,
        // matching the original behavior for bad numeric data.
        ck.id = Long.valueOf(id);
        ck.name = name;
        v.set(name);
        output.collect(ck, v);
    }
}
Step 3 : create the reduce class, only output key,value
/**
 * Reducer: unwraps the composite key and emits (id, name) pairs. Because the
 * sort comparator ordered by (id, name) and the grouping comparator groups by
 * id alone, the values iterator yields names already sorted ascending.
 */
public static class Reduce extends MapReduceBase implements Reducer<CompositeKey, Text, LongWritable, Text> {
    public void reduce(CompositeKey key, Iterator<Text> values, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException {
        for (; values.hasNext(); ) {
            Text name = values.next();
            output.collect(new LongWritable(key.id), name);
        }
    }
}
Step 4 : set the natural key partitioner,then the same id data go to the same reduce
/**
 * Partitioner that hashes only the natural key (id), so every record with the
 * same id is routed to the same reducer regardless of its name component.
 */
public static class NaturalKeyPartitioner implements org.apache.hadoop.mapred.Partitioner<CompositeKey, Text> {
    @Override
    public int getPartition(CompositeKey key, Text value, int num) {
        int hash = getHashCode(key);
        // Mask off the sign bit so the modulo result is never negative.
        return (hash & Integer.MAX_VALUE) % num;
    }

    @Override
    public void configure(JobConf arg0) {
        // No configuration required.
    }

    /** Hash of the natural key only; null id hashes to 0. */
    public int getHashCode(CompositeKey key) {
        if (key.id == null) {
            return 0;
        }
        return key.id.hashCode();
    }
}
Step 5 : create natural comparator for the value group
/**
 * Grouping comparator: compares composite keys by the natural key (id) only,
 * so one reduce() call receives all values sharing an id while the full
 * (id, name) sort order still applies within the group.
 */
public static class NaturalKeyComparator extends org.apache.hadoop.io.WritableComparator {
    protected NaturalKeyComparator() {
        // true => create key instances so compare() receives deserialized objects.
        super(CompositeKey.class, true);
    }

    public int compare(WritableComparable a, WritableComparable b) {
        CompositeKey key1 = (CompositeKey) a;
        CompositeKey key2 = (CompositeKey) b;
        // FIX: null-safe comparison. CompositeKey explicitly allows a null id
        // (its serialization writes a null marker), so the original
        // key1.id.compareTo(key2.id) could throw NullPointerException.
        // Nulls sort last, matching CompositeKey.checkNullsAndCompare.
        if (key1.id == null) {
            return (key2.id == null) ? 0 : 1;
        }
        if (key2.id == null) {
            return -1;
        }
        return key1.id.compareTo(key2.id);
    }
}
Step 6 : config the map reduce job
public static void main(String[] args) throws Exception { JobConf conf = new JobConf(SortValue.class); conf.setJobName("sortvalue"); conf.setPartitionerClass(NaturalKeyPartitioner.class); conf.setOutputValueGroupingComparator(NaturalKeyComparator.class); conf.setMapOutputKeyClass(CompositeKey.class); conf.setMapOutputValueClass(Text.class); conf.setOutputKeyClass(LongWritable.class); conf.setOutputValueClass(Text.class); conf.setMapperClass(Map.class); // conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); }
相关文章推荐
- 用MPI实现Hadoop Map/Reduce的TeraSort
- hadoop map reduce secondary sorting
- Hadoop Map/Reduce编程模型实现海量数据处理—数字求和-Hadoop学习
- Hadoop中Map-Reduce处理逻辑理解(转)
- Hadoop实战:使用Combiner提高Map/Reduce程序效率
- Hadoop学习总结之三:Map-Reduce入门
- hadoop中每个节点map和reduce个数的设置调优
- Hadoop 笔记之Map && Reduce数量确定
- hadoop2.2.0 yarn-site.xml--Map Reduce configuration
- Hadoop---在HDFS集群基础上搭建Map/Reduce集群
- 利用mrunit进行hadoop map/reduce单元测试
- hadoop学习笔记之深入了解map-reduce
- Hadoop之——Eclipse连接Hadoop / An internal error occurred during: "Map/Reduce location status updater"
- 基于C++的Hadoop Map/Reduce框架--HCE
- (转载)Hadoop -- Map-Reduce入门
- hadoop执行mapreduce任务,能够map,不能reduce,Shuffle阶段报错
- hadoop中map和reduce的数量设置
- Hadoop学习总结之四:Map-Reduce的过程解析
- Hadoop学习总结之四:Map-Reduce的过程解析
- Hadoop Map/Reduce Implementation