您的位置:首页 > 运维架构

hadoop自定义排序 步骤1.4

2014-01-01 18:46 429 查看
将如下数字,以第一列升序,如果第一列相同再以第二列升序的方式排序。

3       2

2       2

3       1

1       1

3       3

3       1

2       1

--------------------------------------------

1       1

2       1

2       2

3       1

3       2

3       3


因为hadoop排序只能以key2进行排序,所以我们需要自定义key2类型,并且覆写equals,hashCode,compareTo三个方法。

Mapper

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, SortWritable, LongWritable> {

protected void map(LongWritable key, Text value,
org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, SortWritable, LongWritable>.Context context)
throws java.io.IOException, InterruptedException {
String[] splits = value.toString().split("\t");
String first = splits[0];
String second = splits[1];
SortWritable sw = new SortWritable(Long.parseLong(first), Long.parseLong(second));
context.write(sw, new LongWritable(Long.parseLong(second)));
};
}
Reduce
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReduce extends Reducer<SortWritable, LongWritable, LongWritable, LongWritable> {

    /**
     * Writes the two components of the already-sorted composite key back
     * out as the two output columns; the values are not needed.
     */
    @Override
    protected void reduce(SortWritable key2, Iterable<LongWritable> values2, Context context)
            throws java.io.IOException, InterruptedException {
        context.write(new LongWritable(key2.first), new LongWritable(key2.second));
    }
}
自定义Key2,需要实现WritableComparable接口,而不是Writable接口
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite map-output key holding two long columns. Sorts ascending by the
 * first column and breaks ties with the second. It must implement
 * WritableComparable (not merely Writable) so Hadoop can both serialize it
 * and sort/group by it during the shuffle.
 */
public class SortWritable implements WritableComparable<SortWritable> {
Long first = 1l;
Long second = 1l;

public SortWritable() {
}

public SortWritable(Long first, Long second) {
this.first = first;
this.second = second;
}

@Override
public void readFields(DataInput in) throws IOException {
// Must read fields in exactly the order write() emits them.
this.first = in.readLong();
this.second = in.readLong();
}

@Override
public void write(DataOutput out) throws IOException {
out.writeLong(first);
out.writeLong(second);
}

@Override
public int compareTo(SortWritable o) {
// Long.compare avoids two bugs in the original: '==' on boxed Long
// compared references (wrong outside the -128..127 cache), and casting
// (a - b) to int could overflow and invert the sort order.
int cmp = Long.compare(this.first, o.first);
return cmp != 0 ? cmp : Long.compare(this.second, o.second);
}

@Override
public int hashCode() {
// 31 * a + b distributes better than a + b and stays consistent with equals().
return 31 * first.hashCode() + second.hashCode();
}

@Override
public boolean equals(Object obj) {
if (obj instanceof SortWritable) {
SortWritable sw = (SortWritable) obj;
// Compare boxed Long values with equals(), not '==' (reference identity).
return this.first.equals(sw.first) && this.second.equals(sw.second);
}
return false;
}

}
测试类:
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class SortTest {
private static final String INPUT_PATH = "hdfs://xxc:9000/input";
private static final String OUT_PATH = "hdfs://xxc:9000/out";

/**
 * Configures and submits the secondary-sort job: the mapper emits the
 * composite SortWritable key, a single reducer writes both columns out.
 * Exits non-zero when the job fails instead of silently returning.
 */
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
Configuration conf = new Configuration();

// Delete stale output so the job does not abort on an existing path.
FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
Path outPath = new Path(OUT_PATH);
if(fileSystem.exists(outPath)){
fileSystem.delete(outPath, true);
}

Job job = new Job(conf,SortTest.class.getSimpleName());
// Required when running on a cluster: without setJarByClass, Hadoop
// cannot locate the jar containing MyMapper/MyReduce/SortWritable.
job.setJarByClass(SortTest.class);

FileInputFormat.setInputPaths(job, INPUT_PATH);
job.setInputFormatClass(TextInputFormat.class);

job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(SortWritable.class);
job.setMapOutputValueClass(LongWritable.class);

job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);

job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(LongWritable.class);

FileOutputFormat.setOutputPath(job, outPath);
job.setOutputFormatClass(TextOutputFormat.class);

// Propagate the job result: original ignored it and always exited 0.
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: