您的位置:首页 > 其它

mapreduce(四):单表关联查询

2016-07-04 13:52 363 查看
本文主要通过输入文件中的child字段和parent字段进行单表关联查询,推导出哪些用户具有child与grandparent关系。


1、 输入数据分析

输入文件数据示例:
child parent
Tom Jack
Jack Alice
Jack Jesse


第1列表示child,第2列表示parent,我们需要根据child和parent的关系得出child和grantparent的关系。比如说Tom的parent是Jack,Jack的parent是Alice和Jesse,由此我们可以得出Tom的grantparent是{Alice,Jesse}。


2、 Map过程

首先使用默认的TextInputFormat类对输入文件进行处理,得到文本中每行的偏移量及其内容。Map过程首先将输入分割成child和parent,然后正序输出一次作为右表,反序输出一次作为左表,需要注意的是在输出的value中必须加上左右表区别标志,其中左表标识符为1,右表标识符为2,如图所示。




3、 Reduce过程

Reduce过程首先对输入< key,values >即<”Lucy”,[“1 Tom”,”2 Mary”,”2 Ben”]>的values值进行遍历获取到单元信息(例如”1 Tom”),然后将单元信息中的用户ID(例如Tom)按照左表、右表标识符分别存入到grandChild集合和grandParent集合,最后对grandChild集合和grandParent集合进行笛卡尔积运算得到child与grandParent的关系,并进行输出,如图所示。



<pre name="code" class="java">package com.mr5;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;

import com.mr1.EJob;
import com.mr3.CombinationKey;
import com.mr3.DefinedComparator;
import com.mr3.DefinedGroupSort;
import com.mr3.DefinedPartition;
import com.mr3.SecondSortMR;
import com.mr3.SecondSortMR.SortMapper;
import com.mr3.SecondSortMR.SortReducer;

/*
* 输入文件数据示例:
child parent Tom Jack Jack Alice Jack Jesse*
* 输出结果:
* Tom Alice
* Tom Jesse
*
*
* */

public class SingletonTableJoin {

public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text>{
Text child=new Text();
Text parent=new Text();
String line="";
@Override
protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
if (value!=null) {
line=value.toString();
String[] values=line.split(" ");
if (values.length>=2) {
child.set(values[0]);
parent.set("parent "+values[1]);
context.write(child, parent);
parent.set(values[1]);
child.set("child "+values[0]);
context.write(parent, child);
}
}
}
}

public static class JoinReducer extends Reducer<Text,Text,Text,Text>{

String value="";
HashSet<String> childs=new HashSet<>();
HashSet<String> parents=new HashSet<>();
Text child=new Text();
Text parent=new Text();
@Override
protected void reduce(Text key, Iterable<Text> values,Context context)throws IOException, InterruptedException {
for (Text text : values) {
value=text.toString();
if (value.contains("child")) {
childs.add(value.split(" ")[1]);
}
if (value.contains("parent")) {
parents.add(value.split(" ")[1]);
}
}

if (childs.size()!=0&&parents.size()!=0) {
for (String str1 : childs) {
child.set(str1);
for (String str2 : parents) {
parent.set(" "+str2);
context.write(child, parent);
}
}
}
childs.clear();
parents.clear();
}

}
public static void main(String[] args) throws Exception {
File jarFile = EJob.createTempJar("bin");
ClassLoader classLoader = EJob.getClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);

Configuration conf = new Configuration(true);
conf.set("fs.default.name", "hdfs://192.168.56.111:9000");
conf.set("hadoop.job.user", "root");
conf.set("mapreduce.framework.name", "yarn");
conf.set("mapreduce.jobtracker.address", "192.168.56.111:9001");
conf.set("yarn.resourcemanager.hostname", "192.168.56.111");
conf.set("yarn.resourcemanager.admin.address", "192.168.56.111:8033");
conf.set("yarn.resourcemanager.address", "192.168.56.111:8032");
conf.set("yarn.resourcemanager.resource-tracker.address","192.168.56.111:8031");
conf.set("yarn.resourcemanager.scheduler.address","192.168.56.111:8030");
conf.setBoolean("mapreduce.app-submission.cross-platform", true);
String[] otherArgs = new String[2];
otherArgs[0] = "hdfs://192.168.56.111:9000/testdata/SingletonTableJoin.txt";// 计算原文件目录,需提前在里面存入文件
String time = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
otherArgs[1] = "hdfs://192.168.56.111:9000/output/output" + time;// 计算后的计算结果存储目录,每次程序执行的结果目录不能相同,所以添加时间标签

Job job = new Job(conf, "SingletonTableJoin job");
job.setJarByClass(SingletonTableJoin.class);
((JobConf) job.getConfiguration()).setJar(jarFile.toString());// 环境变量调用,添加此句则可在eclipse中直接提交mapreduce任务,如果将该java文件打成jar包,需要将该句注释掉,否则在执行时反而找不到环境变量
System.out.println("Job start!");
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);

//设置reduce的输出key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

if (job.waitForCompletion(true)) {
System.out.println("ok!");
} else {
System.out.println("error!");
System.exit(0);
}
}

}



                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: