您的位置:首页 > 其它

MapReduce中实现Join操作

2017-03-18 23:06 369 查看

需求

订单数据表t_order的数据如下:

| id | date | pid | amount |
|------|----------|-------|--------|
| 1001 | 20150710 | P0001 | 2 |
| 1002 | 20150710 | P0001 | 2 |
| 1002 | 20150710 | P0002 | 3 |
商品信息表t_product的数据如下:

| id | name | category_id | price |
|-------|--------|-------------|-------|
| P0001 | 小米5 | C01 | 2 |
| P0002 | 锤子T1 | C01 | 3 |
要求用MapReduce实现

select a.id, a.date, b.name, b.category_id, b.price from t_order a join t_product b on a.pid = b.id

思路

将t_order中pid与t_product中id相同的记录放到同一个Reduce中去。

设计实体类

InfoBean.java

package tech.mrbcy.bigdata.mr.rjoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Value object that carries either an order record or a product record
 * between the map and reduce phases of the join job.
 *
 * <p>One bean holds the columns of both record kinds; the {@code flag}
 * property tells which kind was populated: {@code 0} for an order,
 * {@code 1} for a product, {@code -1} when built with the no-arg constructor.
 * Columns that do not belong to the populated kind keep their defaults
 * ({@code 0} or the empty string).
 */
public class InfoBean implements Writable {

    /** Flag value set by the order constructor. */
    private static final int FLAG_ORDER = 0;
    /** Flag value set by the product constructor. */
    private static final int FLAG_PRODUCT = 1;
    /** Flag value of a bean created by the no-arg constructor. */
    private static final int FLAG_UNSET = -1;

    // Order columns.
    private int orderId = 0;
    private String date = "";
    private String productId = "";
    private int quantity = 0;

    // Product columns (productId above is shared by both kinds).
    private String productName = "";
    private String categoryId = "";
    private float unitPrice = 0;

    // 0 = order, 1 = product.
    private int kind = FLAG_UNSET;

    /** Creates an empty bean; needed so the bean can be instantiated and then filled by {@link #readFields}. */
    public InfoBean() {
    }

    /**
     * Creates an order bean and marks it with flag {@code 0}.
     *
     * @param order_id   order identifier
     * @param dateString order date as text
     * @param p_id       identifier of the ordered product
     * @param amount     ordered quantity
     */
    public InfoBean(int order_id, String dateString, String p_id, int amount) {
        this.orderId = order_id;
        this.date = dateString;
        this.productId = p_id;
        this.quantity = amount;
        this.kind = FLAG_ORDER;
    }

    /**
     * Creates a product bean and marks it with flag {@code 1}.
     *
     * @param p_id        product identifier
     * @param pname       product name
     * @param category_id category identifier
     * @param price       product price
     */
    public InfoBean(String p_id, String pname, String category_id, float price) {
        this.productId = p_id;
        this.productName = pname;
        this.categoryId = category_id;
        this.unitPrice = price;
        this.kind = FLAG_PRODUCT;
    }

    /** @return the record-kind flag ({@code 0} order, {@code 1} product) */
    public int getFlag() {
        return this.kind;
    }

    /** @param flag the record-kind flag to store */
    public void setFlag(int flag) {
        this.kind = flag;
    }

    /** @return the order identifier */
    public int getOrder_id() {
        return this.orderId;
    }

    /** @param order_id the order identifier to store */
    public void setOrder_id(int order_id) {
        this.orderId = order_id;
    }

    /** @return the order date as text */
    public String getDateString() {
        return this.date;
    }

    /** @param dateString the order date to store */
    public void setDateString(String dateString) {
        this.date = dateString;
    }

    /** @return the product identifier */
    public String getP_id() {
        return this.productId;
    }

    /** @param p_id the product identifier to store */
    public void setP_id(String p_id) {
        this.productId = p_id;
    }

    /** @return the ordered quantity */
    public int getAmount() {
        return this.quantity;
    }

    /** @param amount the ordered quantity to store */
    public void setAmount(int amount) {
        this.quantity = amount;
    }

    /** @return the product name */
    public String getPname() {
        return this.productName;
    }

    /** @param pname the product name to store */
    public void setPname(String pname) {
        this.productName = pname;
    }

    /** @return the category identifier */
    public String getCategory_id() {
        return this.categoryId;
    }

    /** @param category_id the category identifier to store */
    public void setCategory_id(String category_id) {
        this.categoryId = category_id;
    }

    /** @return the product price */
    public float getPrice() {
        return this.unitPrice;
    }

    /** @param price the product price to store */
    public void setPrice(float price) {
        this.unitPrice = price;
    }

    /**
     * Serializes every field. The field order here must stay in lockstep
     * with {@link #readFields(DataInput)}.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.orderId);
        out.writeUTF(this.date);
        out.writeUTF(this.productId);
        out.writeInt(this.quantity);
        out.writeUTF(this.productName);
        out.writeUTF(this.categoryId);
        out.writeFloat(this.unitPrice);
        out.writeInt(this.kind);
    }

    /**
     * Restores every field, reading them in exactly the order
     * {@link #write(DataOutput)} emitted them.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readInt();
        this.date = in.readUTF();
        this.productId = in.readUTF();
        this.quantity = in.readInt();
        this.productName = in.readUTF();
        this.categoryId = in.readUTF();
        this.unitPrice = in.readFloat();
        this.kind = in.readInt();
    }

    /** Renders all fields as comma-separated {@code name=value} pairs. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("order_id=").append(this.orderId);
        text.append(", dateString=").append(this.date);
        text.append(", p_id=").append(this.productId);
        text.append(", amount=").append(this.quantity);
        text.append(", pname=").append(this.productName);
        text.append(", category_id=").append(this.categoryId);
        text.append(", price=").append(this.unitPrice);
        text.append(", flag=").append(this.kind);
        return text.toString();
    }

}


实现MapReduce程序

package tech.mrbcy.bigdata.mr.rjoin;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RJoin {
static class RJoinMapper extends Mapper<LongWritable, Text, Text, Info
bfde
Bean>{

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(",");
FileSplit inputSplit = (FileSplit) context.getInputSplit();
String name = inputSplit.getPath().getName();

InfoBean bean = null;
// 通过文件名判断是哪种数据
if(name.startsWith("order")){
bean = new InfoBean(Integer.parseInt(fields[0]), fields[1],
fields[2], Integer.parseInt(fields[3]));
context.write(new Text(fields[2]), bean);
}else{
bean = new InfoBean(fields[0], fields[1],
fields[2], Float.parseFloat(fields[3]));
context.write(new Text(fields[0]), bean);
}

}

}

static class RJoinReducer extends Reducer<Text, InfoBean, InfoBean, NullWritable>{

@Override
protected void reduce(Text pid, Iterable<InfoBean> beans,Context context)
throws IOException, InterruptedException {
InfoBean productBean = new InfoBean();
List<InfoBean> orderBeans = new ArrayList<InfoBean>();

try {
for(InfoBean bean : beans){
if(bean.getFlag() == 0){
InfoBean orderBean = new InfoBean();
BeanUtils.copyProperties(orderBean, bean);
orderBeans.add(orderBean);
}else{
BeanUtils.copyProperties(productBean, bean);
}
}
} catch (Exception e) {
e.printStackTrace();
}

for(InfoBean orderBean : orderBeans){
// 把orderBean的数据拷贝到productBean里面
productBean.setOrder_id(orderBean.getOrder_id());
productBean.setDateString(orderBean.getDateString());
productBean.setP_id(orderBean.getP_id());
productBean.setAmount(orderBean.getAmount());
context.write(productBean, NullWritable.get());
}
}

}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf,"Flow Sum");
job.setJarByClass(RJoin.class);
job.setMapperClass(RJoinMapper.class);
job.setReducerClass(RJoinReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(InfoBean.class);
job.setOutputKeyClass(InfoBean.class);
job.setOutputValueClass(NullWritable.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}


运行结果

输入数据:

order.txt

1001,20150710,P0001,2
1002,20150710,P0001,3
1002,20150710,P0001,3


product.txt

P0001,小米5,C01,2000
P0002,锤子T1,C01,3000


输出:

order_id=1002, dateString=20150710, p_id=P0001, amount=3, pname=小米5, category_id=C01, price=2000.0, flag=1
order_id=1002, dateString=20150710, p_id=P0001, amount=3, pname=小米5, category_id=C01, price=2000.0, flag=1
order_id=1001, dateString=20150710, p_id=P0001, amount=2, pname=小米5, category_id=C01, price=2000.0, flag=1
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: