您的位置:首页 > 其它

使用MapReduce求解join问题

2017-06-05 14:24 253 查看

背景

有两张表,以文件的形式存储在hdfs中,如下:

学生基本信息
001 jyw nan
002 lq  nv
003 jl  n

学生考试成绩信息
001 english 90
001 math    92
002 chinese 99


现在需要求解每一个学生参加的考试的成绩以及学生信息

sql就类似于:

select stu.*,score.grade from stu join score on stu.id = score.stuid


使用MapReduce解决这个问题

第一种方法,将id作为map输出的key,这样读出来的信息,id相同的就会被分配到同一个组中,然后就可以在reduce中进行关联

public class Demo1 {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();

Job job = Job.getInstance(configuration);
job.setJarByClass(Demo1.class);

job.setMapperClass(Demo1Mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(InfoBean.class);

job.setReducerClass(Demo1Reducer.class);
job.setOutputKeyClass(InfoBean.class);
job.setOutputValueClass(NullWritable.class);

FileInputFormat.setInputPaths(job,new Path("F:\\wc\\input"));

FileOutputFormat.setOutputPath(job,new Path("F:\\wc\\output"));

job.waitForCompletion(true);
}
}

class Demo1Mapper extends Mapper<LongWritable,Text,Text,InfoBean>{

InfoBean infoBean = new InfoBean();

Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//每次读取一个文件的时候,应该先要能拿到这个文件的文件名
// 从而区分开学生信息以及考试信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
String fileName = inputSplit.getPath().getName();

String[] split = value.toString().split("\t");

//学生的基本信息
if(fileName.startsWith("stu")){
infoBean.setInfoBean(split[0],split[1],split[2],"","","0");
}else{
infoBean.setInfoBean(split[0],"","",split[1],split[2],"1");
}

k.set(split[0]);
context.write(k,infoBean);
}
}

class Demo1Reducer extends Reducer<Text,InfoBean,InfoBean,NullWritable>{

@Override
protected void reduce(Text key, Iterable<InfoBean> values, Context context) throws IOException, InterruptedException {
InfoBean stuInfo = new InfoBean();

ArrayList<InfoBean> scoreBeans = new ArrayList<InfoBean>();

//需要先将学生信息与考试信息区分开来
for(InfoBean infoBean:values){
if(infoBean.getFlag().equals("0")){
stuInfo.setInfoBean(infoBean.getId(),infoBean.getName(),infoBean.getSex(),"","","");
}else{
scoreBeans.add(new InfoBean(infoBean.getId(),"","",infoBean.getCname(),infoBean.getScore(),""));
}
}

//将学生信息和考试信息join起来
for(InfoBean infoBean:scoreBeans){
infoBean.setName(stuInfo.getName());
infoBean.setSex(stuInfo.getSex());
context.write(infoBean,NullWritable.get());
}
}
}


这种方法的一个缺点:

在reduce端很有可能会造成数据的倾斜 ,有的学生可能修了很多的课,但是有的学生可能这一学期并没有修什么课(例如大四的学生,基本不会去上课),这就造成了有的reduce端任务繁重(因为有很多学生的选课信息被发送到了这个reduce端),而有的reduce端就会很清闲,因为获取到的学生可能并没有选课

第二种方法使用map端的join

在这里我们可以使用DistributeCache先将较小的那个文件加载到map端(读取到内存中)太大的时候也可以考虑使用其它的存储方式,例如数据库,redis等技术,然后在map的时候直接对大表进行join,不需要reduce端

public class Demo2 {
public static void main(String[] args) throws Exception {

Configuration configuration = new Configuration();

Job job = Job.getInstance(configuration);
job.setJarByClass(Demo2.class);

job.setMapperClass(Demo2Mapper.class);
job.setMapOutputKeyClass(InfoBean.class);
job.setMapOutputValueClass(NullWritable.class);

job.setNumReduceTasks(0);

//将文件上传到每一个task运行节点的工作目录下
job.addCacheFile(new URI("file:/F:/stu01.txt"));

FileInputFormat.setInputPaths(job,new Path("F:\\wc\\input"));
FileOutputFormat.setOutputPath(job,new Path("F:\\wc\\output"));
job.waitForCompletion(true);
}
}

class Demo2Mapper extends Mapper<LongWritable,Text,Text,NullWritable>{

Map<String,String> stuInfo = new Hashtable<String, String>();
Text t = new Text();
@Override
protected void setup(Context context) throws IOException, InterruptedException {

//获取到所有的上传的文件的URI
URI[] cacheFiles = context.getCacheFiles();

//从当前工作目录下读取出学生信息,数据较少的情况下,可以直接放在内存中
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(new File(cacheFiles[0]))));

String line;
while((line=reader.readLine())!=null){
String[] split = line.split("\t");
stuInfo.put(split[0],split[1]+"\t"+split[2]);
}
reader.close();
}

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
if(stuInfo.containsKey(split[0])){
t.set(stuInfo.get(split[0])+"\t"+value.toString());
context.write(t,NullWritable.get());
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: