您的位置:首页 > 其它

mapreduce操作单表关联数据

2015-12-10 11:42 232 查看
mapreduuce操作单表关联数据,数据结构如下

child parent

Tom Lucy

Tom Jack

Jone Lucy

Jone Jack

Lucy Mary

Lucy Ben

Jack Alice

Jack Jesse

Terry Alice

Terry Jesse

Philip Terry

Philip Alma

Mark Terry

Mark Alma

要求:

要求从 给出的数据中 寻找所 关心的数据,它是对 原始数据所包含信息的 挖掘。下面进入这个实例。

实例中给出 child-parent(孩子——父母)表,要求输出 grandchild-grandparent(孙子——爷奶)表

图解:



设计思路:

分析这个实例,显然需要进行单表连接,连接的是 左表的 parent 列和 右表的 child 列,且 左表和 右表是

同一个表。连接结果中 除去连接的两列就是所需要的结果 “grandchild--grandparent”表。MapReduce 解

决这个实例, 首先应该考虑如何实现 表的 自连接; 其次就是 连接列的 设置;最后是 结果的 整理。考虑

到 MapReduce 的 shuffle 过程会将相同的 key 会连接在一起,所以可以将 map 结果的 key 设置成 待连接

的 列,然后列中相同的值就自然会连接在一起了。再与最开始的分析联系起来:要连接的是左表的 parent

列和右表的 child 列,且左表和右表是同一个表,所以在 map阶段将 读入数据 分割成 child 和 parent 之后,

会将 parent 设置成 key,child 设置成 value进行输出,并作为 左表;再将对 同一对 child 和 parent 中的 child

设置成 key,parent 设置成value 进行输出,作为 右表。为了 区分输出中的 左右表,需要在输出的 value 中再

再加上左右表左右表的 信息,比如在 value 的 String 最开始处加上符 字符 1 表示 左表,加上符 字符 2 表示 右

表。这样在 map 的结果中就形成了左表和右表,然后在 shuffle 过程中完成连接。reduce 接收到连接的结果,

其中每个 key 的 value-list 就包含了“grandchild--grandparent”关系。取出每个key 的 value-list 进行解析,将 左表

中的 child 放入一个 数组, 右表中的 parent 放入一个 数组,然后对 两个数组求笛卡尔积就是最后的结果了。

代码:

package com.hebut.mr;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class STjoin {

public static class STjoinMap extends Mapper<Object, Text, Text, Text> {

@Override
protected void map(Object key, Text value,
Mapper<Object, Text, Text, Text>.Context context)
throws IOException, InterruptedException {

String childname = new String();
String parentname = new String();
String relationtype = new String();

//  输入的一行预处理文本
StringTokenizer itr = new StringTokenizer(value.toString());
String[] values = new String[2];
int i = 0;
while (itr.hasMoreTokens()) {
values[i] = itr.nextToken();
i++;
}

if (values[0].compareTo("child") != 0) {
childname = values[0];
parentname = values[1];
//  输出左表
relationtype = "1";
context.write(new Text(values[1]), new Text(relationtype + "+"
+ childname + "+" + parentname));
//  输出右表
relationtype = "2";
context.write(new Text(values[0]), new Text(relationtype + "+"
+ childname + "+" + parentname));
}

}
}

public static int time = 0;

public static class STjoinReduce extends Reducer<Text, Text, Text, Text> {

@Override
protected void reduce(Text key, Iterable<Text> values,
Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
//  输出表头
if(0==time){
context.write(new Text("grandchild"), new Text("grandparent"));
time++;

}
int grandchildnum = 0;
String[] grandchild = new String[10];
int grandparentnum = 0;
String[] grandparent = new String[10];
Iterator<Text> ite = values.iterator();
while (ite.hasNext()) {
String record = ite.next().toString();
int len = record.length();
int i = 2;
if (0 == len) {
continue;
}
//  取得左右表标识
char relationtype = record.charAt(0);
//  定义孩子和父母变量
String childname = new String();
String parentname = new String();
//  获取 value‐list 中 value 的 child
while (record.charAt(i) != '+') {
childname += record.charAt(i);
i++;
}
i = i + 1;
//  获取 value‐list 中 value 的 parent
while (i < len) {
parentname += record.charAt(i);
i++;
}
//  左表,取出child放入grandchildren
if ('1' == relationtype) {
grandchild[grandchildnum] = childname;
grandchildnum++;
}
//  右表,取出 parent 放入 grandparent
if ('2' == relationtype) {
grandparent[grandparentnum] = parentname;
grandparentnum++;
}

}
if (0 != grandchildnum && 0 != grandparentnum) {
for (int m = 0; m < grandchildnum;m++) {
for(int n=0;n<grandparentnum;n++){

context.write(new Text(grandchild[m]), new Text(grandparent
));

}
}
}

}

}

public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {

Job job = new Job();
job.setJarByClass(STjoin.class);
job.setJobName("STjoin");
FileInputFormat.addInputPath(job, new Path(
"hdfs://hadoop:9000/user/hadoop/STjoin_in/"));
FileOutputFormat.setOutputPath(job, new Path(
"hdfs://hadoop:9000/user/hadoop/STjoin_out/"));
//  设置 Map 和 Reduce 处理类
job.setMapperClass(STjoin.STjoinMap.class);
job.setReducerClass(STjoin.STjoinReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);

}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: