Implementing a Join Operation in MapReduce
2017-03-18 23:06
Requirements
The order table t_order contains the following data:

id | date | pid | amount |
---|---|---|---|
1001 | 20150710 | P0001 | 2 |
1002 | 20150710 | P0001 | 2 |
1002 | 20150710 | P0002 | 3 |
The product table t_product contains the following data:

id | name | category_id | price |
---|---|---|---|
P0001 | 小米5 | C01 | 2 |
P0002 | 锤子T1 | C01 | 3 |
The goal is the equivalent of the following SQL:

```sql
select a.id, a.date, b.name, b.category_id, b.price
from t_order a join t_product b on a.pid = b.id
```
Approach
This is a reduce-side join: the mapper tags each record with its source and emits the product id as the key, pid for t_order records and id for t_product records, so that all records sharing the same product id land in the same reduce() call. For key P0001, for example, the reducer receives the product record for 小米5 together with its two order records, and can copy the product fields onto each order to produce the joined rows. First, design an entity class that can carry the fields of either record type plus a flag marking which table it came from.
InfoBean.java:

```java
package tech.mrbcy.bigdata.mr.rjoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class InfoBean implements Writable {
    private int order_id = 0;
    private String dateString = "";
    private String p_id = "";
    private int amount = 0;
    private String pname = "";
    private String category_id = "";
    private float price = 0;
    private int flag = -1; // 0 = order record, 1 = product record

    public InfoBean() {}

    // Constructor for order records
    public InfoBean(int order_id, String dateString, String p_id, int amount) {
        super();
        this.order_id = order_id;
        this.dateString = dateString;
        this.p_id = p_id;
        this.amount = amount;
        this.flag = 0;
    }

    // Constructor for product records
    public InfoBean(String p_id, String pname, String category_id, float price) {
        super();
        this.p_id = p_id;
        this.pname = pname;
        this.category_id = category_id;
        this.price = price;
        this.flag = 1;
    }

    public int getFlag() { return flag; }
    public void setFlag(int flag) { this.flag = flag; }
    public int getOrder_id() { return order_id; }
    public void setOrder_id(int order_id) { this.order_id = order_id; }
    public String getDateString() { return dateString; }
    public void setDateString(String dateString) { this.dateString = dateString; }
    public String getP_id() { return p_id; }
    public void setP_id(String p_id) { this.p_id = p_id; }
    public int getAmount() { return amount; }
    public void setAmount(int amount) { this.amount = amount; }
    public String getPname() { return pname; }
    public void setPname(String pname) { this.pname = pname; }
    public String getCategory_id() { return category_id; }
    public void setCategory_id(String category_id) { this.category_id = category_id; }
    public float getPrice() { return price; }
    public void setPrice(float price) { this.price = price; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(order_id);
        out.writeUTF(dateString);
        out.writeUTF(p_id);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(category_id);
        out.writeFloat(price);
        out.writeInt(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read the fields in exactly the order write() produced them
        order_id = in.readInt();
        dateString = in.readUTF();
        p_id = in.readUTF();
        amount = in.readInt();
        pname = in.readUTF();
        category_id = in.readUTF();
        price = in.readFloat();
        flag = in.readInt();
    }

    @Override
    public String toString() {
        return "order_id=" + order_id + ", dateString=" + dateString + ", p_id=" + p_id
                + ", amount=" + amount + ", pname=" + pname + ", category_id=" + category_id
                + ", price=" + price + ", flag=" + flag;
    }
}
```
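One point worth checking when hand-writing a Writable: readFields() must consume the fields in exactly the same order that write() produced them. A minimal local round-trip sketch (not part of the job; the class name InfoBeanRoundTrip is just for illustration) could look like this:

```java
package tech.mrbcy.bigdata.mr.rjoin;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative only: serialize an InfoBean the way Hadoop would between map and
// reduce, read it back into a fresh instance, and print it to confirm that
// write() and readFields() agree.
public class InfoBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        InfoBean original = new InfoBean(1001, "20150710", "P0001", 2);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        InfoBean copy = new InfoBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy); // should show the same field values as the original bean
    }
}
```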
Implementing the MapReduce program
RJoin.java:

```java
package tech.mrbcy.bigdata.mr.rjoin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RJoin {

    static class RJoinMapper extends Mapper<LongWritable, Text, Text, InfoBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(",");

            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String name = inputSplit.getPath().getName();

            InfoBean bean = null;
            // Use the input file name to decide which kind of record this is
            if (name.startsWith("order")) {
                bean = new InfoBean(Integer.parseInt(fields[0]), fields[1], fields[2],
                        Integer.parseInt(fields[3]));
                context.write(new Text(fields[2]), bean); // key = pid
            } else {
                bean = new InfoBean(fields[0], fields[1], fields[2],
                        Float.parseFloat(fields[3]));
                context.write(new Text(fields[0]), bean); // key = product id
            }
        }
    }

    static class RJoinReducer extends Reducer<Text, InfoBean, InfoBean, NullWritable> {
        @Override
        protected void reduce(Text pid, Iterable<InfoBean> beans, Context context)
                throws IOException, InterruptedException {
            InfoBean productBean = new InfoBean();
            List<InfoBean> orderBeans = new ArrayList<InfoBean>();

            try {
                for (InfoBean bean : beans) {
                    if (bean.getFlag() == 0) {
                        // Order record: copy it out of the framework's reused object
                        InfoBean orderBean = new InfoBean();
                        BeanUtils.copyProperties(orderBean, bean);
                        orderBeans.add(orderBean);
                    } else {
                        // Product record: keep its fields for the join
                        BeanUtils.copyProperties(productBean, bean);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

            for (InfoBean orderBean : orderBeans) {
                // Copy the order fields onto productBean and emit one joined record
                productBean.setOrder_id(orderBean.getOrder_id());
                productBean.setDateString(orderBean.getDateString());
                productBean.setP_id(orderBean.getP_id());
                productBean.setAmount(orderBean.getAmount());
                context.write(productBean, NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "RJoin");

        job.setJarByClass(RJoin.class);
        job.setMapperClass(RJoinMapper.class);
        job.setReducerClass(RJoinReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(InfoBean.class);
        job.setOutputKeyClass(InfoBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
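Assuming the classes are packaged into a jar (the jar name rjoin.jar and the HDFS paths below are placeholders), the job can be submitted with hadoop jar, passing the input and output directories as args[0] and args[1]:

```
hadoop jar rjoin.jar tech.mrbcy.bigdata.mr.rjoin.RJoin /rjoin/input /rjoin/output
```

Note that the output directory must not already exist, or the job will fail at submission.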
Results
Input data: order.txt
```
1001,20150710,P0001,2
1002,20150710,P0001,3
1002,20150710,P0001,3
```
product.txt
```
P0001,小米5,C01,2000
P0002,锤子T1,C01,3000
```
Output:
```
order_id=1002, dateString=20150710, p_id=P0001, amount=3, pname=小米5, category_id=C01, price=2000.0, flag=1
order_id=1002, dateString=20150710, p_id=P0001, amount=3, pname=小米5, category_id=C01, price=2000.0, flag=1
order_id=1001, dateString=20150710, p_id=P0001, amount=2, pname=小米5, category_id=C01, price=2000.0, flag=1
```