Hadoop MapReduce Join Example
2013-12-10 11:20
Joining datasets is a common need in data processing, and higher-level frameworks such as Hive and Pig provide it out of the box. Implementing a join directly in MapReduce is mainly useful for understanding how a join actually works. A MapReduce join can be done either on the reduce side or on the map side; this post demonstrates both approaches. For more on joins, see section 5.2 of "Hadoop in Action".
Code download: http://download.csdn.net/detail/lihm0_1/6691649
Reduce-side join
We join a user order table (order) and a user info table (user) on UserID. The idea: the map phase reads both files and emits key = UserID, value = a Record object tagged with the file it came from; the reduce phase performs the join, writing one output line per order record in the format shown below. The user table is assumed to have at most one record per UserID. The details follow.
Each line of the order file carries UserID, OrderID, Country and State; each line of the user file carries UserID, UserName and RegTime (the column layout follows from how the mapper parses each line). Joined on UserID, the expected output is:
| Country | UserName | UserID | OrderID |
|---------|----------|--------|---------|
| USA     | Lamb     | 1      | B100001 |
| USA     | Lamb     | 1      | B100004 |
| Brazil  | Daniel   | 3      | B100003 |
| Russia  | Byrd     | 2      | B100002 |
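For reference, input lines consistent with this output could look as follows. Only the joined columns are known from the table above, so the State and RegTime values here are invented purely for illustration:

```
# /tmp/order.txt  (UserID,OrderID,Country,State -- State values assumed)
1,B100001,USA,NY
1,B100004,USA,CA
2,B100002,Russia,MOW
3,B100003,Brazil,SP

# /tmp/user.txt  (UserID,UserName,RegTime -- RegTime values assumed)
1,Lamb,2013-01-08
2,Byrd,2013-02-15
3,Daniel,2013-03-22
```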
Run the job:
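A typical invocation, assuming the classes are packed into a jar (the jar name join.jar here is made up):

```
hadoop jar join.jar com.join.reduceside.JoinJob
```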
Check the results:
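The joined output lands under /tmp/output and can be inspected with the HDFS shell; with a single reducer the part file is typically:

```
hadoop fs -cat /tmp/output/part-r-00000
```

The lines should match the joined table above.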
The full code follows. First, the Record bean, which wraps one line from either input file:
```java
package com.join.reduceside;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * A Java bean holding one record from either input file. Because instances
 * travel between nodes, it implements Writable.
 */
public class Record implements Writable {

    private String userID = "";
    private String userName = "";
    private String regTime = "";
    private String orderID = "";
    private String country = "";
    private String state = "";
    private String from = ""; // which file the record came from: "user" or "order"

    public Record() {
    }

    // Copy constructor: needed because Hadoop reuses the value object in reduce.
    public Record(Record record) {
        this.userID = record.userID;
        this.userName = record.userName;
        this.regTime = record.regTime;
        this.orderID = record.orderID;
        this.country = record.country;
        this.state = record.state;
        this.from = record.from;
    }

    // Convenience constructor used by the map-side join
    // (user file layout: UserID,UserName,RegTime).
    public Record(String userID, String userName, String regTime) {
        this.userID = userID;
        this.userName = userName;
        this.regTime = regTime;
        this.from = "user";
    }

    public String getUserID() { return userID; }
    public void setUserID(String userID) { this.userID = userID; }
    public String getUserName() { return userName; }
    public void setUserName(String userName) { this.userName = userName; }
    public String getRegTime() { return regTime; }
    public void setRegTime(String regTime) { this.regTime = regTime; }
    public String getOrderID() { return orderID; }
    public void setOrderID(String orderID) { this.orderID = orderID; }
    public String getCountry() { return country; }
    public void setCountry(String country) { this.country = country; }
    public String getState() { return state; }
    public void setState(String state) { this.state = state; }
    public String getFrom() { return from; }
    public void setFrom(String from) { this.from = from; }

    @Override
    public String toString() {
        return "country:" + country + " username:" + userName + " userid:" + userID
                + " regTime:" + regTime + " orderid:" + orderID + " state:" + state;
    }

    ////////////////////////////////////////////////////////
    // Writable
    ////////////////////////////////////////////////////////
    @Override
    public void readFields(DataInput din) throws IOException {
        this.userID = din.readUTF();
        this.userName = din.readUTF();
        this.regTime = din.readUTF();
        this.orderID = din.readUTF();
        this.country = din.readUTF();
        this.state = din.readUTF();
        this.from = din.readUTF();
    }

    @Override
    public void write(DataOutput dout) throws IOException {
        dout.writeUTF(this.userID);
        dout.writeUTF(this.userName);
        dout.writeUTF(this.regTime);
        dout.writeUTF(this.orderID);
        dout.writeUTF(this.country);
        dout.writeUTF(this.state);
        dout.writeUTF(this.from);
    }
}
```
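As a quick sanity check of the Writable implementation, a Record can be round-tripped through a byte stream. This is just an illustrative sketch (the class name RecordRoundTrip is made up), not part of the job:

```java
package com.join.reduceside;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RecordRoundTrip {
    public static void main(String[] args) throws IOException {
        Record r = new Record();
        r.setUserID("1");
        r.setUserName("Lamb");
        r.setFrom("user");

        // Serialize the way Hadoop would when shipping the object between nodes
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        r.write(new DataOutputStream(bos));

        // Deserialize into a fresh instance; its fields should match r
        Record copy = new Record();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(copy);
    }
}
```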
The map-side class:
```java
package com.join.reduceside;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Map side: reads order and user lines, wraps each in a Record tagged with
 * its source file, and emits it keyed by UserID for the reduce-side join.
 */
public class JoinMap extends Mapper<LongWritable, Text, Text, Record> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        Record record = new Record();
        String[] fields = value.toString().split(",");
        if (fields.length == 4) { // from the order table: UserID,OrderID,Country,State
            record.setUserID(fields[0]);
            record.setOrderID(fields[1]);
            record.setCountry(fields[2]);
            record.setState(fields[3]);
            record.setFrom("order"); // tag the source file
        } else {                   // from the user table: UserID,UserName,RegTime
            record.setUserID(fields[0]);
            record.setUserName(fields[1]);
            record.setRegTime(fields[2]);
            record.setFrom("user");  // tag the source file
        }
        context.write(new Text(record.getUserID()), record);
    }
}
```
The reduce-side class, which performs the join:
```java
package com.join.reduceside;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reduce side: joins the single user record with all order records
 * sharing the same UserID.
 */
public class JoinReduce extends Reducer<Text, Record, NullWritable, Text> {

    @Override
    protected void reduce(Text key, Iterable<Record> values, Context context)
            throws java.io.IOException, InterruptedException {
        Record userRecord = null;
        List<Record> orderRecords = new ArrayList<Record>();

        for (Record tmp : values) {
            if ("user".equals(tmp.getFrom()) && userRecord == null) { // from the user table
                // Copy the record: Hadoop reuses the object behind the values
                // iterator, so keeping the raw reference would get overwritten.
                userRecord = new Record(tmp);
            } else { // from the order table
                orderRecords.add(new Record(tmp));
            }
        }

        if (userRecord == null) {
            return; // no matching user record: skip these orders (inner join)
        }

        // Emit one line per order record, filling user fields from userRecord
        for (Record orderRecord : orderRecords) {
            context.write(NullWritable.get(),
                    new Text(orderRecord.getCountry() + ","
                            + userRecord.getUserName() + ","
                            + userRecord.getUserID() + ","
                            + orderRecord.getOrderID()));
        }
    }
}
```
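The `new Record(tmp)` copy inside the loop is essential: Hadoop reuses a single object behind the `values` iterator, so storing the reference itself would fill the list with duplicates of the last value read. A sketch of the broken variant:

```java
// BUGGY: every element ends up referring to the same reused instance,
// so after the loop all entries show the fields of the last record read.
List<Record> records = new ArrayList<Record>();
for (Record tmp : values) {
    records.add(tmp); // stores the reference Hadoop keeps rewriting
}
```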
The job driver:
```java
package com.join.reduceside;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JoinJob {

    public static void main(String[] args) throws Exception {
        // Create the job
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Join");
        job.setJarByClass(JoinJob.class);

        // Map output types differ from the final output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Record.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Set the mapper and reducer
        job.setMapperClass(JoinMap.class);
        job.setReducerClass(JoinReduce.class);

        // Input and output paths
        FileInputFormat.addInputPath(job, new Path("/tmp/user.txt"));
        FileInputFormat.addInputPath(job, new Path("/tmp/order.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/output"));

        job.waitForCompletion(true);
    }
}
```
Map-side join

When one table is small enough to fit in memory, the join can be done entirely in the map phase: the user file is shipped to every mapper through DistributedCache, loaded into an in-memory HashMap in setup(), and each incoming order line is matched against that map, so no reduce-side pairing is needed. The mapper:
```java
package com.join.mapside;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.join.reduceside.Record;

/**
 * Map side: joins each incoming order line against the cached user table,
 * emitting only matching records -- the join finishes in the map phase.
 */
public class JoinMap extends Mapper<LongWritable, Text, Text, Record> {

    // The small table is cached in a Map; each mapper loads it once in setup()
    private Map<String, Record> users = new HashMap<String, Record>();

    @Override
    protected void setup(Context context) throws java.io.IOException, InterruptedException {
        // Locate the user file shipped via DistributedCache
        Path[] userFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        Path userFile = userFiles[0];
        BufferedReader br = new BufferedReader(new FileReader(userFile.toString()));
        // Read every line and index it by UserID
        String recordLine;
        while (null != (recordLine = br.readLine())) {
            String[] recordFields = recordLine.split(",");
            users.put(recordFields[0],
                    new Record(recordFields[0], recordFields[1], recordFields[2]));
        }
        br.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        // Order line: UserID,OrderID,Country,State
        String[] fields = value.toString().split(",");
        if (users.containsKey(fields[0])) {
            Record record = new Record();
            record.setCountry(fields[2]);
            record.setUserName(users.get(fields[0]).getUserName());
            record.setUserID(fields[0]);
            record.setOrderID(fields[1]);
            context.write(new Text(record.getUserID()), record);
        }
    }
}
```
The reducer, which only writes out the already-joined records:

```java
package com.join.mapside;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.join.reduceside.Record;

/**
 * Reduce side: the records arrive already joined, so this just formats
 * and emits them.
 */
public class JoinReduce extends Reducer<Text, Record, NullWritable, Text> {

    @Override
    protected void reduce(Text key, Iterable<Record> values, Context context)
            throws java.io.IOException, InterruptedException {
        for (Record resultRecord : values) {
            context.write(NullWritable.get(),
                    new Text(resultRecord.getCountry() + ","
                            + resultRecord.getUserName() + ","
                            + resultRecord.getUserID() + ","
                            + resultRecord.getOrderID()));
        }
    }
}
```
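Since the join already finishes in the mapper, the reducer above does no real work. A common variation (not part of the original code) is to skip the reduce phase altogether and let the mapper emit the joined text directly; a minimal sketch of the driver change:

```java
// In the driver: run a map-only job, writing mapper output straight to HDFS.
job.setNumReduceTasks(0);
```

The mapper would then emit `Text` values itself instead of `Record` objects.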
The job driver, which registers the cached user file and feeds only the order file through the normal input path:

```java
package com.join.mapside;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.join.reduceside.Record;

public class JoinJob {

    public static void main(String[] args) throws Exception {
        // Create the job
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Join_Map_Side");
        job.setJarByClass(JoinJob.class);

        // Map output types differ from the final output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Record.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Set the mapper and reducer
        job.setMapperClass(JoinMap.class);
        job.setReducerClass(JoinReduce.class);

        // Ship the small table to every mapper
        DistributedCache.addCacheFile(new Path("/tmp/user.txt").toUri(), job.getConfiguration());

        // Only the order file goes through the normal input path
        FileInputFormat.addInputPath(job, new Path("/tmp/order.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/output"));

        job.waitForCompletion(true);
    }
}
```
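`DistributedCache` is the Hadoop 1.x API and is deprecated on Hadoop 2.x, where the same effect is achieved through methods on `Job` and the task context. A minimal sketch of the equivalent calls:

```java
// Driver (Hadoop 2.x): register the small file with the job.
job.addCacheFile(new java.net.URI("/tmp/user.txt"));

// Mapper.setup(): retrieve the localized cache files.
java.net.URI[] cacheFiles = context.getCacheFiles();
```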