
Hadoop MapReduce Secondary Sort

2014-05-27 14:30
How do you sort the values that a reducer sees for a single key?

Hadoop: The Definitive Guide, 3rd Edition gives this answer:

1. Make the key a composite of the natural key and the natural value.

2. The sort comparator should order by the composite key, that is, the natural key and natural value.

3. The partitioner and grouping comparator for the composite key should consider only the natural key for partitioning and grouping.

An example:

natural key: id

natural value: name

Goal: within each id, sort the names in ascending order.
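
To make the goal concrete, here is a hypothetical input (one "id name" pair per line) and the output we want, assuming a single reducer; the data is illustrative only:

input:
2 carol
1 bob
1 alice
2 dave

expected output (TextOutputFormat separates key and value with a tab):
1	alice
1	bob
2	carol
2	dave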

Step 1: create the composite key class

// Requires: java.io.DataInput, java.io.DataOutput, java.io.IOException,
// org.apache.hadoop.io.Text, org.apache.hadoop.io.WritableComparable
public static class CompositeKey implements WritableComparable<CompositeKey> {
    public Long id;      // natural key
    public String name;  // natural value

    // A marker byte precedes each field: -1 means null, 0 means a value follows.
    public void readFields(DataInput in) throws IOException {
        if (in.readByte() != -1) {
            this.id = in.readLong();
        } else {
            this.id = null; // reset: Hadoop reuses key instances across records
        }
        if (in.readByte() != -1) {
            this.name = Text.readString(in);
        } else {
            this.name = null;
        }
    }

    public void write(DataOutput out) throws IOException {
        if (this.id == null) {
            out.writeByte(-1);
        } else {
            out.writeByte(0);
            out.writeLong(this.id);
        }
        if (this.name == null) {
            out.writeByte(-1);
        } else {
            out.writeByte(0);
            Text.writeString(out, this.name);
        }
    }

    // Orders by id first, then by name; the shuffle uses this ordering by
    // default for this key class, so values end up sorted by name within an id.
    public int compareTo(CompositeKey o) {
        int returnValue = checkNullsAndCompare(this.id, o.id);
        if (returnValue != 0) {
            return returnValue;
        }
        return checkNullsAndCompare(this.name, o.name);
    }

    // Null-safe comparison; a null field sorts after any non-null value.
    private int checkNullsAndCompare(Object object1, Object object2) {
        if (object1 instanceof Comparable && object2 instanceof Comparable) {
            return ((Comparable) object1).compareTo(object2);
        } else if (object1 != null && object2 != null) {
            return object1.toString().compareTo(object2.toString());
        } else if (object1 == null && object2 != null) {
            return 1;
        } else if (object1 != null && object2 == null) {
            return -1;
        } else {
            return 0;
        }
    }

    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((this.id == null) ? 0 : this.id.hashCode());
        result = prime * result + ((this.name == null) ? 0 : this.name.hashCode());
        return result;
    }

    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null || getClass() != obj.getClass())
            return false;
        final CompositeKey other = (CompositeKey) obj;
        if (this.id == null ? other.id != null : !this.id.equals(other.id))
            return false;
        if (this.name == null ? other.name != null : !this.name.equals(other.name))
            return false;
        return true;
    }
}
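
As a quick sanity check of the ordering (hypothetical values, not part of the job): two keys with the same id compare by name, and a different id dominates regardless of name.

CompositeKey k1 = new CompositeKey();
k1.id = 1L;
k1.name = "alice";
CompositeKey k2 = new CompositeKey();
k2.id = 1L;
k2.name = "bob";
// same id, so the names decide the order
assert k1.compareTo(k2) < 0;
k2.id = 2L;
k2.name = "aaa";
// a smaller id wins no matter what the names are
assert k1.compareTo(k2) < 0;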


Step 2: create the Map class, which emits the composite key as the output key and a Text containing the name as the output value

public static class Map extends MapReduceBase implements
        Mapper<LongWritable, Text, CompositeKey, Text> {
    // Reused across calls; safe because collect() serializes them immediately.
    private final CompositeKey ck = new CompositeKey();
    private final Text v = new Text();

    public void map(LongWritable key, Text value,
            OutputCollector<CompositeKey, Text> output, Reporter reporter)
            throws IOException {
        // Each input line is expected to be "id name", whitespace-separated.
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        String id = tokenizer.nextToken();
        String name = tokenizer.nextToken();
        ck.id = Long.valueOf(id);
        ck.name = name;
        v.set(name);
        output.collect(ck, v);
    }
}
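
Note that the name travels twice: in the key, where it participates in sorting, and in the value, where it survives to the reducer's output. For the hypothetical input line "1 bob" from the sample above, this mapper emits:

// key   = CompositeKey{id=1, name="bob"}
// value = Text("bob")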


Step 3: create the Reduce class, which simply writes out each (id, name) pair

public static class Reduce extends MapReduceBase implements
        Reducer<CompositeKey, Text, LongWritable, Text> {
    public void reduce(CompositeKey key, Iterator<Text> values,
            OutputCollector<LongWritable, Text> output, Reporter reporter)
            throws IOException {
        // Values arrive already sorted by name: the shuffle sorted on the
        // full composite key, and the grouping comparator merged all
        // composite keys sharing an id into this single reduce call.
        while (values.hasNext()) {
            output.collect(new LongWritable(key.id), values.next());
        }
    }
}
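
One subtlety worth knowing: when a grouping comparator is in effect, Hadoop updates the key instance in place as the values iterator advances, so key.name tracks the current value while key.id stays the same for the whole group; that is why writing key.id once per value is safe here.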


Step 4: set a natural-key partitioner, so that all records with the same id go to the same reducer

public static class NaturalKeyPartitioner implements
        org.apache.hadoop.mapred.Partitioner<CompositeKey, Text> {
    @Override
    public int getPartition(CompositeKey key, Text value, int num) {
        // Hash only the natural key (id); masking with Integer.MAX_VALUE
        // keeps the result non-negative.
        return (getHashCode(key) & Integer.MAX_VALUE) % num;
    }

    @Override
    public void configure(JobConf conf) {
        // no configuration needed
    }

    public int getHashCode(CompositeKey key) {
        return (key.id == null) ? 0 : key.id.hashCode();
    }
}
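
A quick hypothetical check that two keys sharing an id land in the same partition, whatever their names:

NaturalKeyPartitioner p = new NaturalKeyPartitioner();
CompositeKey k1 = new CompositeKey();
k1.id = 1L;
k1.name = "alice";
CompositeKey k2 = new CompositeKey();
k2.id = 1L;
k2.name = "zoe";
// only id is hashed, so the partition is the same for any reducer count
assert p.getPartition(k1, null, 4) == p.getPartition(k2, null, 4);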


Step 5: create a natural-key grouping comparator, so that all composite keys sharing an id feed a single reduce call

public static class NaturalKeyComparator extends
        org.apache.hadoop.io.WritableComparator {
    protected NaturalKeyComparator() {
        super(CompositeKey.class, true); // true: instantiate keys for compare()
    }

    // Compare by the natural key (id) only, ignoring the name, so that
    // composite keys sharing an id are grouped together.
    public int compare(WritableComparable a, WritableComparable b) {
        CompositeKey key1 = (CompositeKey) a;
        CompositeKey key2 = (CompositeKey) b;
        return key1.id.compareTo(key2.id);
    }
}
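
Note that no sort comparator is set anywhere: map output is sorted with the default comparator for CompositeKey, which falls back to its compareTo() and therefore orders by (id, name), exactly the full composite ordering that point 2 of the recipe asks for. Only partitioning and grouping are narrowed to the natural key.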


Step 6: configure the MapReduce job

public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(SortValue.class);
    conf.setJobName("sortvalue");

    // Secondary-sort wiring: partition and group by the natural key (id);
    // sorting uses CompositeKey.compareTo(), i.e. the full composite key.
    conf.setPartitionerClass(NaturalKeyPartitioner.class);
    conf.setOutputValueGroupingComparator(NaturalKeyComparator.class);

    conf.setMapOutputKeyClass(CompositeKey.class);
    conf.setMapOutputValueClass(Text.class);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);

    conf.setMapperClass(Map.class);
    // No combiner: Reduce's output types (LongWritable, Text) do not match
    // the map output types (CompositeKey, Text) that a combiner must emit.
    // conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
}
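
Assuming the job is packaged into a jar (the jar name and HDFS paths below are hypothetical), it can be run in the usual way:

hadoop jar sortvalue.jar SortValue /user/hadoop/sortvalue/input /user/hadoop/sortvalue/output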