mapreduce,自定义分区,分组,排序实现join
2017-06-28 17:43
495 查看
join1.txt:
1 a
2 b
3 c
4 d
join2.txt:
1 111
1 222
2 333
2 444
3 555
3 666
4 777
4 888
4 999
自定义类:
package myhadoop;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class JoinBean implements WritableComparable<JoinBean> {
    // Composite key for the reduce-side join: record id plus a source tag.
    private int id;
    // 0 = record comes from join1.txt (left table), 1 = from join2.txt;
    // sorting diff after id guarantees the left record arrives first per group.
    private int diff;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getDiff() {
        return diff;
    }

    public void setDiff(int diff) {
        this.diff = diff;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read fields in exactly the order write() emits them.
        this.id = in.readInt();
        this.diff = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeInt(diff);
    }

    @Override
    public int compareTo(JoinBean o) {
        // Primary order: id ascending; secondary: diff ascending (left table first).
        int result = Integer.compare(this.id, o.getId());
        if (result != 0) {
            return result;
        }
        return Integer.compare(this.diff, o.getDiff());
    }

    @Override
    public boolean equals(Object obj) {
        // BUGFIX: hashCode() was overridden without equals(), violating the
        // equals/hashCode contract; equals is now consistent with compareTo.
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof JoinBean)) {
            return false;
        }
        JoinBean other = (JoinBean) obj;
        return id == other.id && diff == other.diff;
    }

    @Override
    public int hashCode() {
        // BUGFIX: the old hash was effectively (id + diff), so e.g. (1,2) and
        // (2,1) always collided; the standard 31-based mix avoids that.
        return 31 * id + diff;
    }

    @Override
    public String toString() {
        return id + "..." + diff;
    }
}
自定义排序:
package myhadoop;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class JoinSort extends WritableComparator {

    /** Registers JoinBean and lets the parent create key instances for comparison. */
    public JoinSort() {
        super(JoinBean.class, true);
    }

    /**
     * Full shuffle sort order: id ascending, then diff ascending, so the
     * left-table record (diff == 0) precedes its right-table matches.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        JoinBean left = (JoinBean) a;
        JoinBean right = (JoinBean) b;
        int byId = Integer.compare(left.getId(), right.getId());
        return byId != 0 ? byId : Integer.compare(left.getDiff(), right.getDiff());
    }
}
自定义分区:
package myhadoop;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class JoinPartition extends Partitioner<JoinBean, Text> {

    /**
     * Routes every record with the same id to the same reducer so both sides
     * of the join meet in one reduce call.
     *
     * BUGFIX: the original (key.getId() * 127) % num returns a negative value
     * for negative ids or when id * 127 overflows, which the framework rejects
     * with "Illegal partition"; masking the sign bit keeps it in [0, num).
     */
    @Override
    public int getPartition(JoinBean key, Text value, int num) {
        return (key.getId() * 127 & Integer.MAX_VALUE) % num;
    }
}
自定义分组:
package myhadoop;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class JoinGroup extends WritableComparator {

    /** Registers JoinBean so the framework can instantiate keys while grouping. */
    public JoinGroup() {
        super(JoinBean.class, true);
    }

    /**
     * Grouping comparator: two keys fall into the same reduce group whenever
     * their ids match, regardless of which input file (diff tag) produced them.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        JoinBean first = (JoinBean) a;
        JoinBean second = (JoinBean) b;
        return Integer.compare(first.getId(), second.getId());
    }
}
mapreduce:
package myhadoop;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class JoinMapperReduce {

    /** Tags records from join1.txt (the left table) with diff = 0. */
    static class JoinMapper1 extends Mapper<LongWritable, Text, JoinBean, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // BUGFIX: split on any run of whitespace so lines padded with
            // extra spaces/tabs still parse (split(" ") silently dropped them).
            String[] fields = value.toString().trim().split("\\s+");
            if (fields.length == 2) {
                JoinBean joinBean = new JoinBean();
                joinBean.setId(Integer.parseInt(fields[0]));
                joinBean.setDiff(0); // left table sorts first within its id group
                context.write(joinBean, new Text(fields[1]));
            }
        }
    }

    /** Tags records from join2.txt (the right table) with diff = 1. */
    static class JoinMapper2 extends Mapper<LongWritable, Text, JoinBean, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().trim().split("\\s+");
            if (fields.length == 2) {
                JoinBean joinBean = new JoinBean();
                joinBean.setId(Integer.parseInt(fields[0]));
                joinBean.setDiff(1); // right table sorts after the left record
                context.write(joinBean, new Text(fields[1]));
            }
        }
    }

    /**
     * Joins each group: the first value is the left-table record (JoinSort
     * orders diff = 0 first), every remaining value is a right-table match.
     *
     * NOTE(review): this assumes each id occurring in join2.txt also occurs in
     * join1.txt; an unmatched right-table id would have its first value consumed
     * as the "left" side — confirm the inputs satisfy this.
     */
    static class JoinReduce extends Reducer<JoinBean, Text, Text, Text> {
        @Override
        protected void reduce(JoinBean key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            Iterator<Text> values = value.iterator();
            // BUGFIX: guard against an empty group before calling next().
            if (!values.hasNext()) {
                return;
            }
            String left = values.next().toString();
            while (values.hasNext()) {
                String right = values.next().toString();
                context.write(new Text(Integer.toString(key.getId())),
                        new Text(left + " " + right));
            }
        }
    }

    /** Wires the two tagged mappers plus the custom partition/sort/group classes and runs the job. */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        // String[] outs = new GenericOptionsParser(configuration, args).getRemainingArgs();
        Job job = Job.getInstance(configuration, "join");
        job.setJarByClass(JoinMapperReduce.class);
        MultipleInputs.addInputPath(job, new Path("/test/join1.txt"), TextInputFormat.class, JoinMapper1.class);
        MultipleInputs.addInputPath(job, new Path("/test/join2.txt"), TextInputFormat.class, JoinMapper2.class);
        job.setReducerClass(JoinReduce.class);
        job.setMapOutputKeyClass(JoinBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setPartitionerClass(JoinPartition.class);
        job.setGroupingComparatorClass(JoinGroup.class);
        job.setSortComparatorClass(JoinSort.class);
        job.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(job, new Path("/test/output"));
        // Cleaned up: "== true" was redundant and there was a stray ';'.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
输出:
[hadoop@master hadoop]$ hadoop fs -cat /test/output/part-r-00000
1 a 222
1 a 111
2 b 444
2 b 333
3 c 666
3 c 555
4 d 999
4 d 888
4 d 777
join1.txt:
1 a
2 b
3 c
4 d
join2.txt:
1 111
1 222
2 333
2 444
3 555
3 666
4 777
4 888
4 999
自定义类:
package myhadoop;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class JoinBean implements WritableComparable<JoinBean> {
    // Composite key for the reduce-side join: record id plus a source tag.
    private int id;
    // 0 = record comes from join1.txt (left table), 1 = from join2.txt;
    // sorting diff after id guarantees the left record arrives first per group.
    private int diff;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getDiff() {
        return diff;
    }

    public void setDiff(int diff) {
        this.diff = diff;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read fields in exactly the order write() emits them.
        this.id = in.readInt();
        this.diff = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeInt(diff);
    }

    @Override
    public int compareTo(JoinBean o) {
        // Primary order: id ascending; secondary: diff ascending (left table first).
        int result = Integer.compare(this.id, o.getId());
        if (result != 0) {
            return result;
        }
        return Integer.compare(this.diff, o.getDiff());
    }

    @Override
    public boolean equals(Object obj) {
        // BUGFIX: hashCode() was overridden without equals(), violating the
        // equals/hashCode contract; equals is now consistent with compareTo.
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof JoinBean)) {
            return false;
        }
        JoinBean other = (JoinBean) obj;
        return id == other.id && diff == other.diff;
    }

    @Override
    public int hashCode() {
        // BUGFIX: the old hash was effectively (id + diff), so e.g. (1,2) and
        // (2,1) always collided; the standard 31-based mix avoids that.
        return 31 * id + diff;
    }

    @Override
    public String toString() {
        return id + "..." + diff;
    }
}
自定义排序:
package myhadoop;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class JoinSort extends WritableComparator {

    /** Registers JoinBean and lets the parent create key instances for comparison. */
    public JoinSort() {
        super(JoinBean.class, true);
    }

    /**
     * Full shuffle sort order: id ascending, then diff ascending, so the
     * left-table record (diff == 0) precedes its right-table matches.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        JoinBean left = (JoinBean) a;
        JoinBean right = (JoinBean) b;
        int byId = Integer.compare(left.getId(), right.getId());
        return byId != 0 ? byId : Integer.compare(left.getDiff(), right.getDiff());
    }
}
自定义分区:
package myhadoop;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class JoinPartition extends Partitioner<JoinBean, Text> {

    /**
     * Routes every record with the same id to the same reducer so both sides
     * of the join meet in one reduce call.
     *
     * BUGFIX: the original (key.getId() * 127) % num returns a negative value
     * for negative ids or when id * 127 overflows, which the framework rejects
     * with "Illegal partition"; masking the sign bit keeps it in [0, num).
     */
    @Override
    public int getPartition(JoinBean key, Text value, int num) {
        return (key.getId() * 127 & Integer.MAX_VALUE) % num;
    }
}
自定义分组:
package myhadoop;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class JoinGroup extends WritableComparator {

    /** Registers JoinBean so the framework can instantiate keys while grouping. */
    public JoinGroup() {
        super(JoinBean.class, true);
    }

    /**
     * Grouping comparator: two keys fall into the same reduce group whenever
     * their ids match, regardless of which input file (diff tag) produced them.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        JoinBean first = (JoinBean) a;
        JoinBean second = (JoinBean) b;
        return Integer.compare(first.getId(), second.getId());
    }
}
mapreduce:
package myhadoop;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class JoinMapperReduce {

    /** Tags records from join1.txt (the left table) with diff = 0. */
    static class JoinMapper1 extends Mapper<LongWritable, Text, JoinBean, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // BUGFIX: split on any run of whitespace so lines padded with
            // extra spaces/tabs still parse (split(" ") silently dropped them).
            String[] fields = value.toString().trim().split("\\s+");
            if (fields.length == 2) {
                JoinBean joinBean = new JoinBean();
                joinBean.setId(Integer.parseInt(fields[0]));
                joinBean.setDiff(0); // left table sorts first within its id group
                context.write(joinBean, new Text(fields[1]));
            }
        }
    }

    /** Tags records from join2.txt (the right table) with diff = 1. */
    static class JoinMapper2 extends Mapper<LongWritable, Text, JoinBean, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().trim().split("\\s+");
            if (fields.length == 2) {
                JoinBean joinBean = new JoinBean();
                joinBean.setId(Integer.parseInt(fields[0]));
                joinBean.setDiff(1); // right table sorts after the left record
                context.write(joinBean, new Text(fields[1]));
            }
        }
    }

    /**
     * Joins each group: the first value is the left-table record (JoinSort
     * orders diff = 0 first), every remaining value is a right-table match.
     *
     * NOTE(review): this assumes each id occurring in join2.txt also occurs in
     * join1.txt; an unmatched right-table id would have its first value consumed
     * as the "left" side — confirm the inputs satisfy this.
     */
    static class JoinReduce extends Reducer<JoinBean, Text, Text, Text> {
        @Override
        protected void reduce(JoinBean key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            Iterator<Text> values = value.iterator();
            // BUGFIX: guard against an empty group before calling next().
            if (!values.hasNext()) {
                return;
            }
            String left = values.next().toString();
            while (values.hasNext()) {
                String right = values.next().toString();
                context.write(new Text(Integer.toString(key.getId())),
                        new Text(left + " " + right));
            }
        }
    }

    /** Wires the two tagged mappers plus the custom partition/sort/group classes and runs the job. */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        // String[] outs = new GenericOptionsParser(configuration, args).getRemainingArgs();
        Job job = Job.getInstance(configuration, "join");
        job.setJarByClass(JoinMapperReduce.class);
        MultipleInputs.addInputPath(job, new Path("/test/join1.txt"), TextInputFormat.class, JoinMapper1.class);
        MultipleInputs.addInputPath(job, new Path("/test/join2.txt"), TextInputFormat.class, JoinMapper2.class);
        job.setReducerClass(JoinReduce.class);
        job.setMapOutputKeyClass(JoinBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setPartitionerClass(JoinPartition.class);
        job.setGroupingComparatorClass(JoinGroup.class);
        job.setSortComparatorClass(JoinSort.class);
        job.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(job, new Path("/test/output"));
        // Cleaned up: "== true" was redundant and there was a stray ';'.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
输出:
[hadoop@master hadoop]$ hadoop fs -cat /test/output/part-r-00000
1 a 222
1 a 111
2 b 444
2 b 333
3 c 666
3 c 555
4 d 999
4 d 888
4 d 777
相关文章推荐
- mapreduce,自定义排序,分区,分组实现按照年份升序排序,温度降序排序
- 「 Hadoop」mapreduce对温度数据进行自定义排序、分组、分区等
- 十一、理解MapReduce的二次排序功能,包括自定义数据类型、分区、分组、排序
- MapReduce的自定义排序、分区和分组
- Hadoop Mapreduce分区、分组、二次排序过程详解[转]
- 第二个MapReduce程序----flowcount(流量统计,自定义排序,自定义分区)
- MapReduce的分区与 分组二次排序
- MapReduce---自定义分区类实现全排序
- MapReduce处理二次排序(分区-排序-分组)
- Hadoop中自定义排序,分区,分组
- 一脸懵逼学习Hadoop中的MapReduce程序中自定义分组的实现
- Mapreduce中的 自定义类型、分组与二次排序
- Hadoop——自定义数据类型,实现WritableComparable, 并且 分组,排序
- Hadoop Mapreduce分区、分组、二次排序过程详解
- Hadoop Mapreduce分区、分组、连接以及辅助排序(也叫二次排序)过程详解
- Hadoop 自定义排序,自定义分区,自定义分组
- 自定义分区、数据类型、排序、分组
- MapReduce自定义分组实现
- MapReduce二次排序分区,分组优化
- HADOOP(2)__Mapreduce分区、排序、分组