
Secondary Sort in MapReduce

Key points of secondary sort:

1、Composite key: define a custom key data type
-》implements WritableComparable

2、Keep the original grouping rule (group on the first field only): define a custom grouping comparator
-》implements RawComparator (or extends WritableComparator)

3、Keep the original partitioning rule (partition on the first field only): define a custom partitioner
-》extends Partitioner
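
To make the goal concrete, here is a hypothetical example (the input format is inferred from the mapper code below: one record per line, two comma-separated fields). With partitioning and grouping on the first field while the composite key sorts on both fields, each group's values come out in ascending order.

Input (hypothetical):
a,3
a,1
b,2
a,2

Output (shown for a single reduce task, for illustration):
a	1
a	2
a	3
b	2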


The code is as follows:

Main class:

package com.bigdata.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SecondaryMapReduce extends Configured implements Tool {

    // step 1: Mapper class
    public static class SortMapper extends Mapper<LongWritable, Text, PairWritable, IntWritable> {

        private PairWritable mapOutputKey = new PairWritable(); // custom composite key emitted by the mapper
        private IntWritable mapOutputValue = new IntWritable(); // value emitted by the mapper

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // read each line of the file, converting the Text to a String
            String lineValue = value.toString();
            String[] strs = lineValue.split(",");
            if (2 != strs.length) {
                // skip lines that do not have exactly two fields
                return;
            }
            // set the output key and value
            mapOutputKey.set(strs[0], Integer.valueOf(strs[1]));
            mapOutputValue.set(Integer.valueOf(strs[1]));
            // emit from the mapper
            context.write(mapOutputKey, mapOutputValue);
        }
    }

    // step 2: Reducer class
    public static class SortReducer extends Reducer<PairWritable, IntWritable, Text, IntWritable> {

        private Text outputKey = new Text(); // key emitted by the reducer

        @Override
        protected void reduce(PairWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable value : values) {
                outputKey.set(key.getFirst());
                context.write(outputKey, value);
            }
        }
    }

    // step 3: Driver
    public int run(String[] args) throws Exception {

        // get the cluster configuration
        Configuration configuration = this.getConf();

        // create a Job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        // entry point of the MapReduce program, i.e. which class in the jar is run
        job.setJarByClass(this.getClass());

        // configure the Job
        // input path
        Path inpath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inpath);

        // output path
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outpath);

        // Mapper
        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(PairWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // ============ shuffle ===============
        // 1. partitioning
        job.setPartitionerClass(FirstPartitioner.class);

        // 2. grouping
        job.setGroupingComparatorClass(FirstCrouingComparator.class);
        // ============ shuffle ===============

        // Reducer
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setNumReduceTasks(2); // number of reduce tasks

        // submit the Job to YARN
        boolean isSuccess = job.waitForCompletion(true);

        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {

        Configuration configuration = new Configuration();
        // hard-coded HDFS input/output paths (these override any command-line arguments)
        args = new String[] { "hdfs://bigdata-01.yushu.com:8020/sort/input",
                "hdfs://bigdata-01.yushu.com:8020/sort/output" };

        // run job
        int status = ToolRunner.run(configuration, new SecondaryMapReduce(), args);
        // exit program
        System.exit(status);
    }
}
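
Assuming the project is packaged into a jar (the jar file name below is hypothetical), the job can be submitted with the standard hadoop jar command; note that main() hard-codes the HDFS input and output paths, so any paths passed on the command line are ignored:

hadoop jar secondary-sort.jar com.bigdata.mapreduce.SecondaryMapReduce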

Custom key data type:

package com.bigdata.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class PairWritable implements WritableComparable<PairWritable> {

    private String first;
    private int second;

    public PairWritable() {

    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((first == null) ? 0 : first.hashCode());
        result = prime * result + second;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        PairWritable other = (PairWritable) obj;
        if (first == null) {
            if (other.first != null)
                return false;
        } else if (!first.equals(other.first))
            return false;
        if (second != other.second)
            return false;
        return true;
    }

    public PairWritable(String first, int second) {
        this.set(first, second);
    }

    public void set(String first, int second) {
        this.setFirst(first);
        this.setSecond(second);
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeInt(second);
    }

    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    public int compareTo(PairWritable o) {
        int comp = this.first.compareTo(o.getFirst());

        // if the first fields differ, return immediately; otherwise compare the second field
        if (0 != comp) {
            return comp;
        }
        return Integer.valueOf(getSecond()).compareTo(Integer.valueOf(o.getSecond()));
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }
}
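
A minimal sketch (not part of the original post; the demo class name is hypothetical) showing how this compareTo orders composite keys, first by the String field and then by the int field:

package com.bigdata.mapreduce;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PairWritableSortDemo {

    public static void main(String[] args) {
        List<PairWritable> keys = new ArrayList<PairWritable>();
        keys.add(new PairWritable("b", 2));
        keys.add(new PairWritable("a", 3));
        keys.add(new PairWritable("a", 1));

        // Collections.sort uses PairWritable.compareTo, the same ordering applied during the shuffle sort
        Collections.sort(keys);

        for (PairWritable key : keys) {
            System.out.println(key); // expected output: a 1, a 3, b 2 (fields are tab-separated)
        }
    }
}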

Custom partitioner class:

package com.bigdata.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstPartitioner extends Partitioner<PairWritable, IntWritable> {

    @Override
    public int getPartition(PairWritable key, IntWritable value, int numPartitions) {
        // hash only the first field so that all records sharing it go to the same reduce task
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
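
A quick hypothetical check (the demo class below is not part of the original code) that keys sharing the same first field always land in the same partition, regardless of the second field:

package com.bigdata.mapreduce;

import org.apache.hadoop.io.IntWritable;

public class FirstPartitionerDemo {

    public static void main(String[] args) {
        FirstPartitioner partitioner = new FirstPartitioner();
        int numPartitions = 2; // matches job.setNumReduceTasks(2) in the driver

        // both calls hash only getFirst(), so they print the same partition number
        System.out.println(partitioner.getPartition(new PairWritable("a", 1), new IntWritable(1), numPartitions));
        System.out.println(partitioner.getPartition(new PairWritable("a", 9), new IntWritable(9), numPartitions));
    }
}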


Custom grouping comparator class:

package com.bigdata.mapreduce;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class FirstCrouingComparator implements RawComparator<PairWritable> {

    // object-level comparison: group only on the first field
    public int compare(PairWritable o1, PairWritable o2) {
        return o1.getFirst().compareTo(o2.getFirst());
    }

    // byte-level comparison: the key is serialized as writeUTF(first) followed by writeInt(second),
    // so dropping the last 4 bytes compares only the first field; the comparison must start at the
    // given offsets s1 and s2, not at 0
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
    }
}
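
As a side note, a common alternative (a sketch, not the author's code; the class name is hypothetical) is to extend WritableComparator and compare deserialized keys instead of raw bytes; it is slightly slower but avoids reasoning about the serialized layout:

package com.bigdata.mapreduce;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class FirstGroupingComparatorAlt extends WritableComparator {

    protected FirstGroupingComparatorAlt() {
        // true asks WritableComparator to create PairWritable instances for deserializing the raw bytes
        super(PairWritable.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        PairWritable p1 = (PairWritable) a;
        PairWritable p2 = (PairWritable) b;
        // group only on the first field
        return p1.getFirst().compareTo(p2.getFirst());
    }
}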
Final output: (screenshot)

Result partitions: (screenshot)
