使用MapReduce求解join问题
2017-06-05 14:24
253 查看
背景
有两张表,以文件的形式存储在hdfs中,如下:学生基本信息 001 jyw nan 002 lq nv 003 jl n 学生考试成绩信息 001 english 90 001 math 92 002 chinese 99
现在需要求解每一个学生参加的考试的成绩以及学生信息
sql就类似于:
select stu.*,score.grade from stu join score on stu.id = score.stuid
使用MapReduce解决这个问题
第一种方法,将id作为map输出的key,这样读出来的信息,id相同的就会被分配到同一个组中,然后就可以在reduce中进行关联
public class Demo1 { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); job.setJarByClass(Demo1.class); job.setMapperClass(Demo1Mapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(InfoBean.class); job.setReducerClass(Demo1Reducer.class); job.setOutputKeyClass(InfoBean.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job,new Path("F:\\wc\\input")); FileOutputFormat.setOutputPath(job,new Path("F:\\wc\\output")); job.waitForCompletion(true); } } class Demo1Mapper extends Mapper<LongWritable,Text,Text,InfoBean>{ InfoBean infoBean = new InfoBean(); Text k = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //每次读取一个文件的时候,应该先要能拿到这个文件的文件名 // 从而区分开学生信息以及考试信息 FileSplit inputSplit = (FileSplit) context.getInputSplit(); String fileName = inputSplit.getPath().getName(); String[] split = value.toString().split("\t"); //学生的基本信息 if(fileName.startsWith("stu")){ infoBean.setInfoBean(split[0],split[1],split[2],"","","0"); }else{ infoBean.setInfoBean(split[0],"","",split[1],split[2],"1"); } k.set(split[0]); context.write(k,infoBean); } } class Demo1Reducer extends Reducer<Text,InfoBean,InfoBean,NullWritable>{ @Override protected void reduce(Text key, Iterable<InfoBean> values, Context context) throws IOException, InterruptedException { InfoBean stuInfo = new InfoBean(); ArrayList<InfoBean> scoreBeans = new ArrayList<InfoBean>(); //需要先将学生信息与考试信息区分开来 for(InfoBean infoBean:values){ if(infoBean.getFlag().equals("0")){ stuInfo.setInfoBean(infoBean.getId(),infoBean.getName(),infoBean.getSex(),"","",""); }else{ scoreBeans.add(new InfoBean(infoBean.getId(),"","",infoBean.getCname(),infoBean.getScore(),"")); } } //将学生信息和考试信息join起来 for(InfoBean infoBean:scoreBeans){ infoBean.setName(stuInfo.getName()); infoBean.setSex(stuInfo.getSex()); context.write(infoBean,NullWritable.get()); } } }
这种方法的一个缺点:
在reduce端很有可能会造成数据的倾斜 ,有的学生可能修了很多的课,但是有的学生可能这一学期并没有修什么课(例如大四的学生,基本不会去上课),这就造成了有的reduce端任务繁重(因为有很多学生的选课信息被发送到了这个reduce端),而有的reduce端就会很清闲,因为获取到的学生可能并没有选课
第二种方法使用map端的join
在这里我们可以使用DistributeCache先将较小的那个文件加载到map端(读取到内存中)太大的时候也可以考虑使用其它的存储方式,例如数据库,redis等技术,然后在map的时候直接对大表进行join,不需要reduce端public class Demo2 { public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); job.setJarByClass(Demo2.class); job.setMapperClass(Demo2Mapper.class); job.setMapOutputKeyClass(InfoBean.class); job.setMapOutputValueClass(NullWritable.class); job.setNumReduceTasks(0); //将文件上传到每一个task运行节点的工作目录下 job.addCacheFile(new URI("file:/F:/stu01.txt")); FileInputFormat.setInputPaths(job,new Path("F:\\wc\\input")); FileOutputFormat.setOutputPath(job,new Path("F:\\wc\\output")); job.waitForCompletion(true); } } class Demo2Mapper extends Mapper<LongWritable,Text,Text,NullWritable>{ Map<String,String> stuInfo = new Hashtable<String, String>(); Text t = new Text(); @Override protected void setup(Context context) throws IOException, InterruptedException { //获取到所有的上传的文件的URI URI[] cacheFiles = context.getCacheFiles(); //从当前工作目录下读取出学生信息,数据较少的情况下,可以直接放在内存中 BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(new File(cacheFiles[0])))); String line; while((line=reader.readLine())!=null){ String[] split = line.split("\t"); stuInfo.put(split[0],split[1]+"\t"+split[2]); } reader.close(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split("\t"); if(stuInfo.containsKey(split[0])){ t.set(stuInfo.get(split[0])+"\t"+value.toString()); context.write(t,NullWritable.get()); } } }
相关文章推荐
- 凸包问题求解 使用蛮力算法
- 求解 关于 套汇问题 要求 使用 C++
- ASPX界面里的link标签中使用<%=myurl %>的问题求解
- 使用单链表求解约瑟夫环问题 (利用java中的LinkedList)
- 使用单链表求解约瑟夫环问题 (自定义单链表)
- 求解在SQL中使用了where列所遇到的问题
- poj-1322-Chocolate 使用动态规划求解的一种概率问题的算法
- 使用 Matlab 的 bvp4c 求解边值问题
- 使用 Matlab 的 bvp4c 求解边值问题
- 求解SDP问题—使用SeDuMi和YALMIP
- 使用不同的算法求解0-1背包问题
- zoj 2859 RMQ问题 使用分割法求解
- 使用顺序表求解约瑟夫环问题
- Hadoop MapReduce进阶 使用DataJoin包实现Join
- Hadoop MapReduce进阶 使用DataJoin包实现Join
- VS2003升级到2005后使用AjaxControlToolkit的一诡异问题(已找到解决办法,求解原因)
- Spring3mvc,在使用ajax时发生的问题,求解?
- 迷宫求解问题——堆栈的使用
- 使用顺序表求解约瑟夫环问题 (自定义顺序表)
- (使用数的便利求解层次性问题8.1.2)POJ 2003 Hire and Fire(元素的插入与删除)