MapReduce: implementing a join with a custom partitioner, grouping comparator, and sort comparator

2017-06-28 17:43
join1.txt:

1 a
2 b
3 c
4 d

join2.txt:
1 111
1 222
2 333
2 444
3 555
3 666
4 777
4 888
4 999
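
The goal is a reduce-side join of the two files on their first column: join1.txt holds one record per id, join2.txt holds several. Each mapper wraps the id in a composite key (JoinBean) together with a source tag (diff). The partitioner and the grouping comparator look at the id alone, so both files' records for an id meet in a single reduce() call, while the sort comparator orders the join1.txt record first within each group (a secondary sort).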

Custom key class:

package myhadoop;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class JoinBean implements WritableComparable&lt;JoinBean> {

    // Composite key: id is the join key; diff tags the source file
    // (0 = join1.txt, 1 = join2.txt) so the join1 record sorts first
    // within each id group.
    private int id;
    private int diff;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getDiff() {
        return diff;
    }

    public void setDiff(int diff) {
        this.diff = diff;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.diff = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeInt(diff);
    }

    @Override
    public int compareTo(JoinBean o) {
        // Order by id first, then by diff (secondary sort).
        int result = Integer.compare(this.id, o.getId());
        if (result != 0) {
            return result;
        }
        return Integer.compare(this.diff, o.getDiff());
    }

    @Override
    public int hashCode() {
        // Combine both fields without the trivial collisions of id + diff.
        // The job installs a custom partitioner, so this only matters if
        // the default HashPartitioner is ever used instead.
        return 31 * id + diff;
    }

    @Override
    public String toString() {
        return id + "..." + diff;
    }

}


Custom sort comparator:
package myhadoop;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class JoinSort extends WritableComparator {

    public JoinSort() {
        // Register JoinBean and let the comparator create instances
        // for deserialization.
        super(JoinBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Sort by id, then by diff, so the join1.txt record (diff = 0)
        // is the first value the reducer sees in each group.
        JoinBean join1 = (JoinBean) a;
        JoinBean join2 = (JoinBean) b;
        int result = Integer.compare(join1.getId(), join2.getId());
        if (result != 0) {
            return result;
        }
        return Integer.compare(join1.getDiff(), join2.getDiff());
    }

}
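
Note that once JoinSort is registered with setSortComparatorClass, the shuffle uses it instead of JoinBean.compareTo; the compareTo implementation in JoinBean duplicates the same ordering for completeness.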


Custom partitioner:
package myhadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class JoinPartition extends Partitioner&lt;JoinBean, Text> {

    @Override
    public int getPartition(JoinBean key, Text value, int num) {
        // Partition on the id only, so both sides of the join for the same
        // id reach the same reducer. Mask off the sign bit in case the
        // multiplication overflows to a negative value.
        return (key.getId() * 127 & Integer.MAX_VALUE) % num;
    }

}
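
Multiplying by 127 just spreads the ids before taking the modulus. Since the driver below sets setNumReduceTasks(1), every key lands in partition 0 here anyway, but the partitioner matters as soon as more reducers are configured.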


Custom grouping comparator:
package myhadoop;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class JoinGroup extends WritableComparator {

    public JoinGroup() {
        super(JoinBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Group by id only, so the join1 and join2 records for the same id
        // all arrive in a single reduce() call despite their different diff.
        JoinBean join1 = (JoinBean) a;
        JoinBean join2 = (JoinBean) b;
        return Integer.compare(join1.getId(), join2.getId());
    }

}
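
To see how the two comparators split the work, here is a minimal standalone sketch (ComparatorDemo is not part of the job; it only assumes the classes above and hadoop-common on the classpath). It sorts sample keys the way the shuffle would and prints a marker wherever JoinGroup would start a new reduce group:

package myhadoop;

import java.util.ArrayList;
import java.util.List;

public class ComparatorDemo {

    // Helper that builds a key the way the mappers do.
    private static JoinBean bean(int id, int diff) {
        JoinBean b = new JoinBean();
        b.setId(id);
        b.setDiff(diff);
        return b;
    }

    public static void main(String[] args) {
        // Keys for ids 1 and 2, in arbitrary arrival order.
        List&lt;JoinBean> keys = new ArrayList&lt;>();
        keys.add(bean(2, 1));
        keys.add(bean(1, 1));
        keys.add(bean(2, 0));
        keys.add(bean(1, 0));

        // The shuffle sorts with JoinSort: by id, then by diff.
        JoinSort sort = new JoinSort();
        keys.sort(sort::compare);

        // JoinGroup decides where one reduce() call ends and the next begins.
        JoinGroup group = new JoinGroup();
        JoinBean prev = null;
        for (JoinBean k : keys) {
            if (prev == null || group.compare(prev, k) != 0) {
                System.out.println("-- new reduce group --");
            }
            System.out.println(k);
            prev = k;
        }
    }

}

Running it prints two groups, 1...0 / 1...1 and 2...0 / 2...1, which is exactly how the reducer sees the data.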


The MapReduce job:
package myhadoop;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JoinMapperReduce {

    // Mapper for join1.txt: tag its records with diff = 0 so they sort first.
    static class JoinMapper1 extends Mapper&lt;LongWritable, Text, JoinBean, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Expects "id value" lines separated by a single space.
            String line = value.toString();
            String[] lines = line.split(" ");
            if (lines.length == 2) {
                JoinBean joinBean = new JoinBean();
                joinBean.setId(Integer.parseInt(lines[0]));
                joinBean.setDiff(0);
                context.write(joinBean, new Text(lines[1]));
            }
        }

    }

    // Mapper for join2.txt: tag its records with diff = 1.
    static class JoinMapper2 extends Mapper&lt;LongWritable, Text, JoinBean, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] lines = line.split(" ");
            if (lines.length == 2) {
                JoinBean joinBean = new JoinBean();
                joinBean.setId(Integer.parseInt(lines[0]));
                joinBean.setDiff(1);
                context.write(joinBean, new Text(lines[1]));
            }
        }

    }

    static class JoinReduce extends Reducer&lt;JoinBean, Text, Text, Text> {

        @Override
        protected void reduce(JoinBean key, Iterable&lt;Text> value, Context context)
                throws IOException, InterruptedException {
            // Thanks to the sort comparator, the first value of each group is
            // the join1.txt record (diff = 0); everything after it comes from
            // join2.txt. This assumes every id in join2.txt also appears in
            // join1.txt, as in the sample data.
            Iterator&lt;Text> values = value.iterator();
            String join1 = values.next().toString();
            while (values.hasNext()) {
                String join2 = values.next().toString();
                context.write(new Text(key.getId() + ""), new Text(join1 + " " + join2));
            }
        }

    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration, "join");

        job.setJarByClass(JoinMapperReduce.class);
        // Feed each input file through its own mapper.
        MultipleInputs.addInputPath(job, new Path("/test/join1.txt"), TextInputFormat.class, JoinMapper1.class);
        MultipleInputs.addInputPath(job, new Path("/test/join2.txt"), TextInputFormat.class, JoinMapper2.class);
        job.setReducerClass(JoinReduce.class);
        job.setMapOutputKeyClass(JoinBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setPartitionerClass(JoinPartition.class);
        job.setGroupingComparatorClass(JoinGroup.class);
        job.setSortComparatorClass(JoinSort.class);
        job.setNumReduceTasks(1);

        FileOutputFormat.setOutputPath(job, new Path("/test/output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}
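
Assuming the two input files are uploaded to HDFS and the classes are packaged into a jar (join.jar is an illustrative name), the job can be submitted like this:

[hadoop@master hadoop]$ hadoop fs -mkdir -p /test
[hadoop@master hadoop]$ hadoop fs -put join1.txt join2.txt /test/
[hadoop@master hadoop]$ hadoop jar join.jar myhadoop.JoinMapperReduce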


Output:
[hadoop@master hadoop]$ hadoop fs -cat /test/output/part-r-00000
1 a 222
1 a 111
2 b 444
2 b 333
3 c 666
3 c 555
4 d 999
4 d 888
4 d 777
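
Within each id, the join2.txt values come back in no particular order (222 before 111, and so on): those records carry identical keys (same id, diff = 1), so JoinSort considers them equal and the shuffle makes no promise about their relative order. The join1.txt value, by contrast, is always paired first, thanks to the secondary sort on diff.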