MapReduce Secondary Sort
2017-11-21 15:17
Key points of secondary sort (a small worked example follows this list):
1. Build a composite key as a custom data type
-> implement WritableComparable
2. Preserve the original grouping behavior (group by the first field only) with a custom grouping rule
-> implement RawComparator
3. Preserve the original partitioning behavior (partition by the first field only) with a custom partitioner
-> extend Partitioner
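To make the goal concrete, here is a tiny hypothetical example (the data is invented for illustration, not from the original post). Given comma-separated input of the form first,second:
a,3
a,1
b,2
a,2
After the shuffle, each reducer receives one group per first field, with that group's values already sorted by the second field, so the final output is:
a	1
a	2
a	3
b	2
(With the partitioner below and two reducers, "a" and "b" happen to hash to different partitions, so these rows land in two separate output files.)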
The code is as follows:
Main class:
package com.bigdata.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SecondaryMapReduce extends Configured implements Tool {

    // step 1: Mapper class
    public static class SortMapper extends Mapper<LongWritable, Text, PairWritable, IntWritable> {

        private PairWritable mapOutputKey = new PairWritable(); // composite map output key
        private IntWritable mapOutputValue = new IntWritable(); // map output value

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // read each line and convert Text to String
            String lineValue = value.toString();
            String[] strs = lineValue.split(",");
            if (2 != strs.length) {
                // skip malformed lines that do not have exactly two fields
                return;
            }
            // set the output key and value
            mapOutputKey.set(strs[0], Integer.valueOf(strs[1]));
            mapOutputValue.set(Integer.valueOf(strs[1]));
            // emit
            context.write(mapOutputKey, mapOutputValue);
        }
    }

    // step 2: Reducer class
    public static class SortReducer extends Reducer<PairWritable, IntWritable, Text, IntWritable> {

        private Text outputKey = new Text(); // reduce output key

        @Override
        protected void reduce(PairWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable value : values) {
                outputKey.set(key.getFirst());
                context.write(outputKey, value);
            }
        }
    }

    // step 3: Driver
    public int run(String[] args) throws Exception {
        // get the cluster configuration
        Configuration configuration = this.getConf();
        // create a Job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        // entry point of the MapReduce program: which class in the jar to run
        job.setJarByClass(this.getClass());

        // input path
        Path inpath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inpath);
        // output path
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outpath);

        // set the Mapper
        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(PairWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // ============ shuffle ===============
        // 1. partitioning
        job.setPartitionerClass(FirstPartitioner.class);
        // 2. grouping
        job.setGroupingComparatorClass(FirstGroupingComparator.class);
        // ============ shuffle ===============

        // set the Reducer
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(2); // number of reduce tasks

        // submit the job to YARN
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // hard-coded paths for testing; these override any command-line arguments
        args = new String[] {
                "hdfs://bigdata-01.yushu.com:8020/sort/input",
                "hdfs://bigdata-01.yushu.com:8020/sort/output"
        };
        // run the job
        int status = ToolRunner.run(configuration, new SecondaryMapReduce(), args);
        // exit the program
        System.exit(status);
    }
}
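The job could then be launched with something like the following (the jar name here is hypothetical):
hadoop jar secondary-sort.jar com.bigdata.mapreduce.SecondaryMapReduce /sort/input /sort/output
Note that main() above hard-codes the HDFS input and output paths, so any paths passed on the command line are overwritten before the job runs.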
Custom key data type:
package com.bigdata.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class PairWritable implements WritableComparable<PairWritable> {

    private String first;
    private int second;

    public PairWritable() {
    }

    public PairWritable(String first, int second) {
        this.set(first, second);
    }

    public void set(String first, int second) {
        this.setFirst(first);
        this.setSecond(second);
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    public void write(DataOutput out) throws IOException {
        // serialized layout: a 2-byte UTF length plus the string bytes, then a 4-byte int;
        // the raw grouping comparator below relies on this layout
        out.writeUTF(first);
        out.writeInt(second);
    }

    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    public int compareTo(PairWritable o) {
        int comp = this.first.compareTo(o.getFirst());
        // if the first fields differ, their order decides; otherwise fall back to the second field
        if (0 != comp) {
            return comp;
        }
        return Integer.valueOf(getSecond()).compareTo(Integer.valueOf(o.getSecond()));
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((first == null) ? 0 : first.hashCode());
        result = prime * result + second;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        PairWritable other = (PairWritable) obj;
        if (first == null) {
            if (other.first != null)
                return false;
        } else if (!first.equals(other.first))
            return false;
        return second == other.second;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }
}
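A minimal sketch (not part of the original post) of the ordering that compareTo produces; it assumes PairWritable is compiled in the same package:
package com.bigdata.mapreduce;

import java.util.Arrays;

public class PairWritableDemo {
    public static void main(String[] args) {
        PairWritable[] keys = {
                new PairWritable("b", 2),
                new PairWritable("a", 3),
                new PairWritable("a", 1)
        };
        // sorts by first ascending, then by second ascending
        Arrays.sort(keys);
        for (PairWritable key : keys) {
            System.out.println(key); // prints a 1, then a 3, then b 2 (tab-separated)
        }
    }
}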
Custom partitioner class. Partitioning must again depend only on the first field; the bitwise AND with Integer.MAX_VALUE clears the sign bit so the partition index can never be negative:
package com.bigdata.mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class FirstPartitioner extends Partitioner<PairWritable, IntWritable> {
    @Override
    public int getPartition(PairWritable key, IntWritable value, int numPartitions) {
        // hash only the first field; mask the sign bit to keep the index non-negative
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
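A quick sketch (again hypothetical, assuming the classes above) confirming that keys sharing a first field always map to the same partition:
package com.bigdata.mapreduce;

import org.apache.hadoop.io.IntWritable;

public class FirstPartitionerDemo {
    public static void main(String[] args) {
        FirstPartitioner partitioner = new FirstPartitioner();
        // both keys have first = "a", so the partition index is identical
        // no matter what the second field is
        int p1 = partitioner.getPartition(new PairWritable("a", 1), new IntWritable(1), 2);
        int p2 = partitioner.getPartition(new PairWritable("a", 9), new IntWritable(9), 2);
        System.out.println(p1 == p2); // true
    }
}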
Custom grouping comparator class. Grouping must also fall back to the first field alone, so that all values sharing one first value are passed to a single reduce() call:
package com.bigdata.mapreduce;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;
public class FirstGroupingComparator implements RawComparator<PairWritable> {
    // object-level compare: group solely by the first field
    public int compare(PairWritable o1, PairWritable o2) {
        return o1.getFirst().compareTo(o2.getFirst());
    }

    // byte-level compare: the serialized key is [2-byte UTF length][string bytes][4-byte int],
    // so dropping the trailing 4 bytes compares only the first field
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
    }
}
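A small sketch (hypothetical, assuming the classes above) showing the byte-level compare in action: two keys with the same first field but different second fields compare as equal, which is exactly what merges them into one reduce group.
package com.bigdata.mapreduce;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class GroupingComparatorDemo {
    private static byte[] serialize(PairWritable key) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        key.write(new DataOutputStream(buffer));
        return buffer.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] k1 = serialize(new PairWritable("a", 1));
        byte[] k2 = serialize(new PairWritable("a", 9));
        FirstGroupingComparator comparator = new FirstGroupingComparator();
        // prints 0: the trailing 4-byte ints differ, but they are excluded from the compare
        System.out.println(comparator.compare(k1, 0, k1.length, k2, 0, k2.length));
    }
}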
Final output: with setNumReduceTasks(2), the results are split across two partition files, one per reduce task, each internally sorted by the composite key.