您的位置:首页 > 其它

MapReduce实现join操作

2015-11-07 22:48 513 查看
MapReduce实现join操作

前阵子把MapReduce实现join操作的算法设想清楚了,但一直没有在代码层面落地。今天终于费了些功夫把整个流程走了一遭,期间经历了诸多麻烦并最终得以将其一一搞定,再次深切体会到,什么叫从计算模型到算法实现还有很多路要走。

数据准备

首先是准备好数据。这个倒已经是一个熟练的过程,所要做的是把示例数据准备好,记住路径和字段分隔符。
准备好下面两张表:
(1)m_ys_lab_jointest_a(以下简称表A)
建表语句为:

[sql] view
plaincopyprint?

create table if not exists m_ys_lab_jointest_a (

id bigint,

name string

)

row format delimited

fields terminated by '9'

lines terminated by '10'

stored as textfile;

数据:
id name

1 北京

2 天津

3 河北

4 山西

5 内蒙古

6 辽宁

7 吉林

8 黑龙江
(2)m_ys_lab_jointest_b(以下简称表B)
建表语句为:

[sql] view
plaincopyprint?

create table if not exists m_ys_lab_jointest_b (

id bigint,

statyear bigint,

num bigint

)

row format delimited

fields terminated by '9'

lines terminated by '10'

stored as textfile;

数据:

id statyear num

1 2010 1962

1 2011 2019

2 2010 1299

2 2011 1355

4 2010 3574

4 2011 3593

9 2010 2303

9 2011 2347
我们的目的是,以id为key做join操作,得到以下表:

m_ys_lab_jointest_ab

id name statyear num

1 北京 2011 2019

1 北京 2010 1962

2 天津 2011 1355

2 天津 2010 1299

4 山西 2011 3593

4 山西 2010 3574

计算模型

整个计算过程是:
(1)在map阶段,把所有记录标记成<key, value>的形式,其中key是id,value则根据来源不同取不同的形式:来源于表A的记录,value的值为"a#"+name;来源于表B的记录,value的值为"b#"+score。
(2)在reduce阶段,先把每个key下的value列表拆分为分别来自表A和表B的两部分,分别放入两个向量中。然后遍历两个向量做笛卡尔积,形成一条条最终结果。
如下图所示:



代码

代码如下:

[java] view
plaincopyprint?

import java.io.IOException;

import java.util.HashMap;

import java.util.Iterator;

import java.util.Vector;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.Writable;

import org.apache.hadoop.mapred.FileSplit;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.RecordWriter;

import org.apache.hadoop.mapred.Reducer;

import org.apache.hadoop.mapred.Reporter;

/**

* MapReduce实现Join操作

*/

public class MapRedJoin {

public static final String DELIMITER = "\u0009"; // 字段分隔符

// map过程

public static class MapClass extends MapReduceBase implements

Mapper<LongWritable, Text, Text, Text> {

public void configure(JobConf job) {

super.configure(job);

}

public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,

Reporter reporter) throws IOException, ClassCastException {

// 获取输入文件的全路径和名称

String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();

// 获取记录字符串

String line = value.toString();

// 抛弃空记录

if (line == null || line.equals("")) return;

// 处理来自表A的记录

if (filePath.contains("m_ys_lab_jointest_a")) {

String[] values = line.split(DELIMITER); // 按分隔符分割出字段

if (values.length < 2) return;

String id = values[0]; // id

String name = values[1]; // name

output.collect(new Text(id), new Text("a#"+name));

}

// 处理来自表B的记录

else if (filePath.contains("m_ys_lab_jointest_b")) {

String[] values = line.split(DELIMITER); // 按分隔符分割出字段

if (values.length < 3) return;

String id = values[0]; // id

String statyear = values[1]; // statyear

String num = values[2]; //num

output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num));

}

}

}

// reduce过程

public static class Reduce extends MapReduceBase

implements Reducer<Text, Text, Text, Text> {

public void reduce(Text key, Iterator<Text> values,

OutputCollector<Text, Text> output, Reporter reporter)

throws IOException {

Vector<String> vecA = new Vector<String>(); // 存放来自表A的值

Vector<String> vecB = new Vector<String>(); // 存放来自表B的值

while (values.hasNext()) {

String value = values.next().toString();

if (value.startsWith("a#")) {

vecA.add(value.substring(2));

} else if (value.startsWith("b#")) {

vecB.add(value.substring(2));

}

}

int sizeA = vecA.size();

int sizeB = vecB.size();

// 遍历两个向量

int i, j;

for (i = 0; i < sizeA; i ++) {

for (j = 0; j < sizeB; j ++) {

output.collect(key, new Text(vecA.get(i) + DELIMITER +vecB.get(j)));

}

}

}

}

protected void configJob(JobConf conf) {

conf.setMapOutputKeyClass(Text.class);

conf.setMapOutputValueClass(Text.class);

conf.setOutputKeyClass(Text.class);

conf.setOutputValueClass(Text.class);

conf.setOutputFormat(ReportOutFormat.class);

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: