hadoop自定义排序 步骤1.4
2014-01-01 18:46
429 查看
将如下数字,以第一列升序排序;如果第一列相同,则以第二列升序的方式排序。
3 2
2 2
3 1
1 1
3 3
3 1
2 1
--------------------------------------------
1 1
2 1
2 2
3 1
3 2
3 3
因为Hadoop在shuffle阶段只能按map输出的key(即key2)排序,所以我们需要自定义key2类型,把两列数据封装成一个复合key,并且覆写equals、hashCode、compareTo三个方法。
Mapper
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MyMapper extends Mapper<LongWritable, Text, SortWritable, LongWritable> {
protected void map(LongWritable key, Text value,
org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, SortWritable, LongWritable>.Context context)
throws java.io.IOException, InterruptedException {
String[] splits = value.toString().split("\t");
String first = splits[0];
String second = splits[1];
SortWritable sw = new SortWritable(Long.parseLong(first), Long.parseLong(second));
context.write(sw, new LongWritable(Long.parseLong(second)));
};
}
Reduce
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class MyReduce extends Reducer<SortWritable, LongWritable, LongWritable, LongWritable> {

    /**
     * Emits the two components of the already-sorted composite key as the
     * final (first, second) output pair. The grouped values are ignored
     * because both columns are carried in the key itself.
     */
    @Override
    protected void reduce(SortWritable key2, Iterable<LongWritable> values2, Context context)
            throws java.io.IOException, InterruptedException {
        context.write(new LongWritable(key2.first), new LongWritable(key2.second));
    }
}
自定义Key2,需要实现WritableComparable接口,而不是Writable接口
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Composite map-output key holding both columns of one input line.
 * Sorts ascending by {@code first}, breaking ties ascending by {@code second}.
 */
public class SortWritable implements WritableComparable<SortWritable> {

    Long first = 1L;
    Long second = 1L;

    /** No-arg constructor required by Hadoop's Writable deserialization. */
    public SortWritable() {
    }

    public SortWritable(Long first, Long second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readLong();
        this.second = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(first);
        out.writeLong(second);
    }

    /**
     * Ascending by first, then ascending by second.
     * Long.compare fixes two bugs in the naive version: comparing boxed Longs
     * with {@code ==} (reference equality — only reliable inside the
     * [-128, 127] cache), and casting a long difference to int, which can
     * overflow and invert the ordering.
     */
    @Override
    public int compareTo(SortWritable o) {
        int byFirst = Long.compare(this.first, o.first);
        if (byFirst != 0) {
            return byFirst;
        }
        return Long.compare(this.second, o.second);
    }

    @Override
    public int hashCode() {
        // Multiplier makes (a, b) and (b, a) hash differently; still
        // consistent with equals as required by the hashCode contract.
        return 31 * first.hashCode() + second.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof SortWritable) {
            SortWritable sw = (SortWritable) obj;
            // Compare primitive values, not boxed references: == on Long
            // objects only works for values in the small-integer cache.
            return this.first.longValue() == sw.first.longValue()
                    && this.second.longValue() == sw.second.longValue();
        }
        return false;
    }
}
测试类:
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
public class SortTest {

    private static final String INPUT_PATH = "hdfs://xxc:9000/input";
    private static final String OUT_PATH = "hdfs://xxc:9000/out";

    /**
     * Configures and submits the sorting job: text input lines are mapped to
     * (SortWritable, LongWritable) pairs, sorted by the composite key during
     * the shuffle, and written back out as two tab-separated columns.
     */
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
        Configuration conf = new Configuration();

        // Delete a stale output directory first, otherwise the job fails on startup.
        FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        Path outPath = new Path(OUT_PATH);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }

        // Job.getInstance replaces the deprecated new Job(Configuration, String).
        Job job = Job.getInstance(conf, SortTest.class.getSimpleName());
        // Without setJarByClass the job's classes are not shipped to the cluster.
        job.setJarByClass(SortTest.class);

        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(SortWritable.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        FileOutputFormat.setOutputPath(job, outPath);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.waitForCompletion(true);
    }
}
3 2
2 2
3 1
1 1
3 3
3 1
2 1
--------------------------------------------
1 1
2 1
2 2
3 1
3 2
3 3
因为Hadoop在shuffle阶段只能按map输出的key(即key2)排序,所以我们需要自定义key2类型,把两列数据封装成一个复合key,并且覆写equals、hashCode、compareTo三个方法。
Mapper
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MyMapper extends Mapper<LongWritable, Text, SortWritable, LongWritable> {
protected void map(LongWritable key, Text value,
org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, SortWritable, LongWritable>.Context context)
throws java.io.IOException, InterruptedException {
String[] splits = value.toString().split("\t");
String first = splits[0];
String second = splits[1];
SortWritable sw = new SortWritable(Long.parseLong(first), Long.parseLong(second));
context.write(sw, new LongWritable(Long.parseLong(second)));
};
}
Reduce
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class MyReduce extends Reducer<SortWritable, LongWritable, LongWritable, LongWritable> {

    /**
     * Emits the two components of the already-sorted composite key as the
     * final (first, second) output pair. The grouped values are ignored
     * because both columns are carried in the key itself.
     */
    @Override
    protected void reduce(SortWritable key2, Iterable<LongWritable> values2, Context context)
            throws java.io.IOException, InterruptedException {
        context.write(new LongWritable(key2.first), new LongWritable(key2.second));
    }
}
自定义Key2,需要实现WritableComparable接口,而不是Writable接口
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Composite map-output key holding both columns of one input line.
 * Sorts ascending by {@code first}, breaking ties ascending by {@code second}.
 */
public class SortWritable implements WritableComparable<SortWritable> {

    Long first = 1L;
    Long second = 1L;

    /** No-arg constructor required by Hadoop's Writable deserialization. */
    public SortWritable() {
    }

    public SortWritable(Long first, Long second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readLong();
        this.second = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(first);
        out.writeLong(second);
    }

    /**
     * Ascending by first, then ascending by second.
     * Long.compare fixes two bugs in the naive version: comparing boxed Longs
     * with {@code ==} (reference equality — only reliable inside the
     * [-128, 127] cache), and casting a long difference to int, which can
     * overflow and invert the ordering.
     */
    @Override
    public int compareTo(SortWritable o) {
        int byFirst = Long.compare(this.first, o.first);
        if (byFirst != 0) {
            return byFirst;
        }
        return Long.compare(this.second, o.second);
    }

    @Override
    public int hashCode() {
        // Multiplier makes (a, b) and (b, a) hash differently; still
        // consistent with equals as required by the hashCode contract.
        return 31 * first.hashCode() + second.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof SortWritable) {
            SortWritable sw = (SortWritable) obj;
            // Compare primitive values, not boxed references: == on Long
            // objects only works for values in the small-integer cache.
            return this.first.longValue() == sw.first.longValue()
                    && this.second.longValue() == sw.second.longValue();
        }
        return false;
    }
}
测试类:
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
public class SortTest {

    private static final String INPUT_PATH = "hdfs://xxc:9000/input";
    private static final String OUT_PATH = "hdfs://xxc:9000/out";

    /**
     * Configures and submits the sorting job: text input lines are mapped to
     * (SortWritable, LongWritable) pairs, sorted by the composite key during
     * the shuffle, and written back out as two tab-separated columns.
     */
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
        Configuration conf = new Configuration();

        // Delete a stale output directory first, otherwise the job fails on startup.
        FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        Path outPath = new Path(OUT_PATH);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }

        // Job.getInstance replaces the deprecated new Job(Configuration, String).
        Job job = Job.getInstance(conf, SortTest.class.getSimpleName());
        // Without setJarByClass the job's classes are not shipped to the cluster.
        job.setJarByClass(SortTest.class);

        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(SortWritable.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        FileOutputFormat.setOutputPath(job, outPath);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.waitForCompletion(true);
    }
}
相关文章推荐
- hadoop自定义分组 步骤1.4
- Hadoop之MapReduce自定义二次排序流程实例详解
- 学习Hadoop第十五课(自定义排序)
- hadoop 自定义排序
- 从自定义排序深入理解单机hadoop执行mapreduce过程
- hadoop提交作业自定义排序和分组
- Hadoop入门案例(四)全排序之自定义分区 字符串(单词)排序
- 流量汇总(自定义jar包,在hadoop集群上 统计,排序,分组)之统计
- hadoop提交作业自定义排序和分组
- hadoop mapreduce自定义排序
- 【Hadoop】Hadoop MR 自定义排序
- hadoop的自定义排序
- Hadoop自定义Writable实现二次排序
- hadoop自定义排序、分组、分区(温度统计)
- Hadoop mapreduce自定义排序WritableComparable
- Hadoop之MapReduce自定义二次排序流程实例详解
- Hadoop自定义排序和自定义数据类型使用(setSortComparatorClass和setGroupingComparatorClass)
- hadoop自定义排序对比器和分组对比器
- Hadoop之MapReduce自定义二次排序流程实例详解
- Hadoop之MapReduce-自定义排序编程