MapReduce实现join操作
2016-03-01 14:48
405 查看
数据准备
首先是准备好数据。这个倒已经是一个熟练的过程,所要做的是把示例数据准备好,记住路径和字段分隔符。准备好下面两张表:
(1)m_ys_lab_jointest_a(以下简称表A)
建表语句为:
[sql]
view plain
copy
print?
create table if not exists m_ys_lab_jointest_a (
id bigint,
name string
)
row format delimited
fields terminated by '9'
lines terminated by '10'
stored as textfile;
数据:
id name 1 北京 2 天津 3 河北 4 山西 5 内蒙古 6 辽宁 7 吉林 8 黑龙江 |
建表语句为:
[sql]
view plain
copy
print?
create table if not exists m_ys_lab_jointest_b (
id bigint,
statyear bigint,
num bigint
)
row format delimited
fields terminated by '9'
lines terminated by '10'
stored as textfile;
数据:
id statyear num 1 2010 1962 1 2011 2019 2 2010 1299 2 2011 1355 4 2010 3574 4 2011 3593 9 2010 2303 9 2011 2347 |
m_ys_lab_jointest_ab
id name statyear num 1 北京 2011 2019 1 北京 2010 1962 2 天津 2011 1355 2 天津 2010 1299 4 山西 2011 3593 4 山西 2010 3574 |
计算模型
整个计算过程是:(1)在map阶段,把所有记录标记成<key, value>的形式,其中key是id,value则根据来源不同取不同的形式:来源于表A的记录,value的值为"a#"+name;来源于表B的记录,value的值为"b#"+score。
(2)在reduce阶段,先把每个key下的value列表拆分为分别来自表A和表B的两部分,分别放入两个向量中。然后遍历两个向量做笛卡尔积,形成一条条最终结果。
如下图所示:
代码
代码如下:[java]
view plain
copy
print?
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Vector;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
/**
* MapReduce实现Join操作
*/
public class MapRedJoin {
public static final String DELIMITER = "\u0009"; // 字段分隔符
// map过程
public static class MapClass extends MapReduceBase implements
Mapper<LongWritable, Text, Text, Text> {
public void configure(JobConf job) {
super.configure(job);
}
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,
Reporter reporter) throws IOException, ClassCastException {
// 获取输入文件的全路径和名称
String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();
// 获取记录字符串
String line = value.toString();
// 抛弃空记录
if (line == null || line.equals("")) return;
// 处理来自表A的记录
if (filePath.contains("m_ys_lab_jointest_a")) {
String[] values = line.split(DELIMITER); // 按分隔符分割出字段
if (values.length < 2) return;
String id = values[0]; // id
String name = values[1]; // name
output.collect(new Text(id), new Text("a#"+name));
}
// 处理来自表B的记录
else if (filePath.contains("m_ys_lab_jointest_b")) {
String[] values = line.split(DELIMITER); // 按分隔符分割出字段
if (values.length < 3) return;
String id = values[0]; // id
String statyear = values[1]; // statyear
String num = values[2]; //num
output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num));
}
}
}
// reduce过程
public static class Reduce extends MapReduceBase
implements Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
Vector<String> vecA = new Vector<String>(); // 存放来自表A的值
Vector<String> vecB = new Vector<String>(); // 存放来自表B的值
while (values.hasNext()) {
String value = values.next().toString();
if (value.startsWith("a#")) {
vecA.add(value.substring(2));
} else if (value.startsWith("b#")) {
vecB.add(value.substring(2));
}
}
int sizeA = vecA.size();
int sizeB = vecB.size();
// 遍历两个向量
int i, j;
for (i = 0; i < sizeA; i ++) {
for (j = 0; j < sizeB; j ++) {
output.collect(key, new Text(vecA.get(i) + DELIMITER +vecB.get(j)));
}
}
}
}
protected void configJob(JobConf conf) {
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(Text.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setOutputFormat(ReportOutFormat.class);
}
}
技术细节
下面说一下其中的若干技术细节:(1)由于输入数据涉及两张表,我们需要判断当前处理的记录是来自表A还是来自表B。Reporter类getInputSplit()方法可以获取输入数据的路径,具体代码如下:
[align=left]String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();[/align]
[align=left](2)map的输出的结果,同id的所有记录(不管来自表A还是表B)都在同一个key下保存在同一个列表中,在reduce阶段需要将其拆开,保存为相当于笛卡尔积的m x n条记录。由于事先不知道m、n是多少,这里使用了两个向量(可增长数组)来分别保存来自表A和表B的记录,再用一个两层嵌套循环组织出我们需要的最终结果。[/align]
(3)在MapReduce中可以使用System.out.println()方法输出,以方便调试。不过System.out.println()的内容不会在终端显示,而是输出到了stdout和stderr这两个文件中,这两个文件位于logs/userlogs/attempt_xxx目录下。可以通过web端的历史job查看中的“Analyse
This Job”来查看stdout和stderr的内容。
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- PropertyChangeListener简单理解
- c++11 + SDL2 + ffmpeg +OpenAL + java = Android播放器
- 插入排序
- 冒泡排序
- 堆排序
- 快速排序
- 二叉查找树