您的位置:首页 > 运维架构

Hadoop MapReduce之Join示例

2013-12-10 11:20 211 查看
MR中的数据连接是数据处理中经常遇到的问题,可以用一些上层框架来实现此功能,比如Hive、Pig等。这里用MR实现主要是为了理解连接的思路。MR中的连接可以在Reduce端做,也可以在Map端做,本文分别展示这两种连接方式,想了解更多连接的内容可以参考《Hadoop in Action》5.2章节。

代码文件下载:http://download.csdn.net/detail/lihm0_1/6691649

reduce端的连接方式

 

用户订单表order:

UserID | OrderID | Country | state
1 | B100001 | USA | 0
2 | B100002 | Russia | 0
3 | B100003 | Brazil | 0
1 | B100004 | USA | 0

用户信息表user:

UserID | UserName | RegTime
1 | Lamb | 2009-01-22
2 | Byrd | 2010-08-14
3 | Daniel | 2007-03-21
4 | Simon | 2011-11-13
要对上面两个表做关联,关联键为UserID,输出结果如下:
Country | UserName | UserID | OrderID
USA | Lamb | 1 | B100001
USA | Lamb | 1 | B100004
Brazil | Daniel | 3 | B100003
Russia | Byrd | 2 | B100002
连接思路为Map端读入两个文件的数据,输出KEY:UserID  输出Value:Object(包含来自哪个文件的标记),在Reduce做连接,来自订单表的每条记录做一个输出,格式为上面定义的格式,这里假设user表的记录是不重复的。下面看具体操作



执行作业:



查看结果:



具体代码如下:

package com.join.reduceside;
/**
* 该类是一个JAVA BEAN,用于实例化文件中的每条数据,由于需要在不同节点间传输,需要覆盖Writable方法
*/
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

/**
 * Java bean holding one row of either the order file or the user file.
 *
 * <p>Instances are emitted as map output values, so the class implements
 * {@link Writable}. Every field is serialized with {@code writeUTF}, which
 * rejects {@code null}; all fields therefore default to the empty string and
 * {@link #write(DataOutput)} substitutes the empty string for any field that a
 * setter reset to {@code null}.
 */
public class Record implements Writable{
    private String userID = "";
    private String userName = "";
    private String regTime = "";
    private String orderID = "";
    private String country = "";
    private String state = "";
    /** Marker naming the input file the row came from (e.g. "order" or "user"). */
    private String from = "";

    /** Creates an empty record; every field is the empty string. */
    public Record() {
    }

    /**
     * Creates a record carrying only the user-table columns.
     *
     * @param userID   the user id
     * @param userName the user name
     * @param regTime  the registration time
     */
    public Record(String userID, String userName, String regTime) {
        this.userID = userID;
        this.userName = userName;
        this.regTime = regTime;
    }

    /**
     * Copy constructor.
     *
     * @param record the record whose field values are copied; must not be null
     */
    public Record(Record record) {
        this.userID = record.userID;
        this.userName = record.userName;
        this.regTime = record.regTime;
        this.orderID = record.orderID;
        this.country = record.country;
        this.state = record.state;
        this.from = record.from;
    }

    public String getState() {
        return state;
    }

    public void setState(String state) {
        this.state = state;
    }

    public String getUserID() {
        return userID;
    }

    public void setUserID(String userID) {
        this.userID = userID;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getRegTime() {
        return regTime;
    }

    public void setRegTime(String regTime) {
        this.regTime = regTime;
    }

    public String getOrderID() {
        return orderID;
    }

    public void setOrderID(String orderID) {
        this.orderID = orderID;
    }

    public String getCountry() {
        return country;
    }

    public void setCountry(String country) {
        this.country = country;
    }

    public String getFrom() {
        return from;
    }

    public void setFrom(String from) {
        this.from = from;
    }

    @Override
    public String toString() {
        return "coutry:" + this.getCountry() + "  username:" + this.getUserName() + " userid:"
                + this.getUserID() + "  regTime:" + this.getRegTime() + " orderid:" + this.orderID
                + " state:" + this.state;
    }

    ////////////////////////////////////////////////////////
    // Writable
    ////////////////////////////////////////////////////////

    /**
     * Restores all fields, in the same order in which {@link #write(DataOutput)}
     * emits them.
     *
     * @param din the stream to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput din) throws IOException {
        this.userID = din.readUTF();
        this.userName = din.readUTF();
        this.regTime = din.readUTF();
        this.orderID = din.readUTF();
        this.country = din.readUTF();
        this.state = din.readUTF();
        this.from = din.readUTF();
    }

    /**
     * Serializes all fields. A null field is written as the empty string
     * because {@code writeUTF} cannot encode null.
     *
     * @param dout the stream to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput dout) throws IOException {
        dout.writeUTF(nullToEmpty(this.userID));
        dout.writeUTF(nullToEmpty(this.userName));
        dout.writeUTF(nullToEmpty(this.regTime));
        dout.writeUTF(nullToEmpty(this.orderID));
        dout.writeUTF(nullToEmpty(this.country));
        dout.writeUTF(nullToEmpty(this.state));
        dout.writeUTF(nullToEmpty(this.from));
    }

    /** Returns the given string, or the empty string when it is null. */
    private static String nullToEmpty(String value) {
        return null == value ? "" : value;
    }

}


map端的操作类:

package com.join.reduceside;
/**
* Map端代码,读入order和user数据并实例化,传输到Reduce端进行连接
*/
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class JoinMap extends Mapper<LongWritable, Text, Text, Record>{
private static Record record;
protected void map(LongWritable key, Text value, Context context)
throws java.io.IOException ,InterruptedException {
record = new Record();
String line = value.toString();
String[] fields = line.split(",");
if(fields.length == 4){	//来自order表
record.setUserID(fields[0]);
record.setOrderID(fields[1]);
record.setCountry(fields[2]);
record.setState(fields[3]);
record.setFrom("order");//标记来自哪个文件
context.write(new Text(record.getUserID()), record);
}else{//来自user表
record.setUserID(fields[0]);
record.setUserName(fields[1]);
record.setRegTime(fields[2]);
record.setFrom("user");//标记来自哪个文件
context.write(new Text(record.getUserID()), record);
}
};
}

Reduce端的操作类,用于数据连接:

package com.join.reduceside;
/**
* Reduce操作类,用于关联表数据
*/
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReduce extends Reducer<Text, Record, NullWritable, Text> {

protected void reduce(Text key, java.lang.Iterable<Record> values, Context context)
throws java.io.IOException ,InterruptedException {
Record userRecord = null;
Record tmpRecord = null;
List<Record> records = new ArrayList<Record>();
Iterator<Record> it = values.iterator();
while(it.hasNext()){
tmpRecord = it.next();
if(tmpRecord.getFrom().equals("user") && null == userRecord){//来自用户表
userRecord = new Record(tmpRecord);//注意这里一定要新创建一个Record记录,负责会被覆盖,因为这里的Iterator已经被重写
}else{//来自订单表
records.add(new Record(tmpRecord));
}
}
//遍历order记录并输出,空白字段从user实例中获取
for(Record orderRecord : records){
context.write(NullWritable.get(),
new Text(orderRecord.getCountry() + ","  +
userRecord.getUserName() +"," +
userRecord.getUserID() +"," +
orderRecord.getOrderID()
));
}
}
}

作业启动:

public class JoinJob {

/**
* @param args
*/
public static void main(String[] args) throws Exception {
// 创建一个job
Configuration conf = new Configuration();
Job job = new Job(conf, "Join");
job.setJarByClass(JoinJob.class);

// 设置输入输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Record.class);

// 设置map和reduce类
job.setMapperClass(JoinMap.class);
job.setReducerClass(JoinReduce.class);

// 设置输入输出流
FileInputFormat.addInputPath(job, new Path("/tmp/user.txt"));
FileInputFormat.addInputPath(job, new Path("/tmp/order.txt"));
FileOutputFormat.setOutputPath(job, new Path("/tmp/output"));

job.waitForCompletion(true);
return;

}

}

Map端的连接方式

package com.join.mapside;
/**
* Map端代码,利用缓存中的记录循环比较读入的行记录,符合条件的做Map输出
*/
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.join.reduceside.Record;

public class JoinMap extends Mapper<LongWritable, Text, Text, Record>{
//小表数据缓存在Map中,每个Map在setup中做一次即可
private Map<String,Record> users = new HashMap<String,Record>();
private static Record record;

protected void setup(Context context) throws java.io.IOException ,InterruptedException {
String recordLine;
String[] recordFields;
//获得小表数据文件
Path[] userFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
System.out.println(userFiles);
Path userFile = userFiles[0];
BufferedReader br = new BufferedReader(new FileReader(userFile.toString()));
//循环读取数据,并存放在Map中
while(null != (recordLine = br.readLine())){
recordFields = recordLine.split(",");
users.put(recordFields[0], new Record(recordFields[0],recordFields[1],recordFields[2]));
}
};

protected void map(LongWritable key, Text value, Context context)
throws java.io.IOException ,InterruptedException {
record = new Record();
String line = value.toString();
String[] fields = line.split(",");
if(users.containsKey(fields[0])){
record.setCountry(fields[2]);
record.setUserName(users.get(fields[0]).getUserName());
record.setUserID(fields[0]);
record.setOrderID(fields[1]);
context.write(new Text(record.getUserID()), record);
}
};
}

package com.join.mapside;
/**
* Reduce操作类,输出已经连接好的数据
*/
import java.util.Iterator;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.join.reduceside.Record;

/**
 * Reducer of the map-side join. The records arrive already joined, so each
 * one is only formatted and written out.
 */
public class JoinReduce extends Reducer<Text, Record, NullWritable, Text> {

    /**
     * Writes every record of the group as one line in the format
     * {@code Country,UserName,UserID,OrderID}.
     *
     * @param key     the user id shared by all values
     * @param values  the joined records emitted by the mapper
     * @param context the task context used to emit the output lines
     */
    protected void reduce(Text key, java.lang.Iterable<Record> values, Context context)
            throws java.io.IOException, InterruptedException {
        for (Record joined : values) {
            StringBuilder line = new StringBuilder();
            line.append(joined.getCountry()).append(",");
            line.append(joined.getUserName()).append(",");
            line.append(joined.getUserID()).append(",");
            line.append(joined.getOrderID());
            context.write(NullWritable.get(), new Text(line.toString()));
        }
    }
}

package com.join.mapside;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.join.reduceside.Record;

public class JoinJob {

/**
* @param args
*/
public static void main(String[] args) throws Exception {
// 创建一个job
Configuration conf = new Configuration();
Job job = new Job(conf, "Join_Map_Side");
job.setJarByClass(JoinJob.class);

// 设置输入输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Record.class);

// 设置map和reduce类
job.setMapperClass(JoinMap.class);
job.setReducerClass(JoinReduce.class);

//添加缓存文件
DistributedCache.addCacheFile(new Path("/tmp/user.txt").toUri(),
job.getConfiguration());

// 设置输入输出流
FileInputFormat.addInputPath(job, new Path("/tmp/order.txt"));
FileOutputFormat.setOutputPath(job, new Path("/tmp/output"));

job.waitForCompletion(true);
return;

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: