MR案例:Map-Join
2015-08-21 15:50
176 查看
适用场景:一张表十分小【key不可重复】、一张表非常大。
用法:在Job提交时,首先将小表加载到 DistributedCache 分布式缓存中,然后从DistributeCache中读取小表解析成 key/value 保存到内存中(可以放在Hash Map等容器中)。然后扫描大表中的每条记录的 key 是否能在内存中找到相同 join key 的记录,如果有则直接输出结果。
更多参考分布式缓存DistributedCache
用法:在Job提交时,首先将小表加载到 DistributedCache 分布式缓存中,然后从DistributeCache中读取小表解析成 key/value 保存到内存中(可以放在Hash Map等容器中)。然后扫描大表中的每条记录的 key 是否能在内存中找到相同 join key 的记录,如果有则直接输出结果。
package join.map; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.HashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * map-join中小表的数据如下: * * 1 Beijing * 2 Guangzhou * 3 Shenzhen * 4 Xian * * 大表的数据如下: * * Beijing Red Star 1 * Shenzhen Thunder 3 * Guangzhou Honda 2 * Beijing Rising 1 * Guangzhou Development Bank 2 * Tencent 3 * Back of Beijing 1 */ public class MapJoin { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MapJoin2.class); //此方法已过时,被job.addCacheFile()所取代 //DistributedCache.addCacheFile(new URI("hdfs://10.16.17.182:9000/test/in/address.txt"), conf); //加载小表到 分布式缓存DistributedCache job.addCacheFile(new Path(args[0]).toUri()); job.setMapperClass(MJMapper.class); job.setNumReduceTasks(0); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(args[1])); FileOutputFormat.setOutputPath(job, new Path(args[2])); System.exit(job.waitForCompletion(true)? 0:1); } public static class MJMapper extends Mapper<LongWritable, Text, Text, Text>{ /** * 此map是存放小表数据用的 * 注意小表的key是不能重复的,类似与数据库的外键表 * 在这里的小表,就相当于一个外键表 * **/ private HashMap<String, String> map=new HashMap<String, String>(); @Override protected void setup(Context context) throws IOException, InterruptedException { BufferedReader br=null; // 读取文件流 String line; // 获取DistributedCached里面 的共享文件 Path[] paths = context.getLocalCacheFiles(); for(Path path : paths){ if(path.getName().indexOf("address") >= 0){ //如果是 address文件 br=new BufferedReader(new FileReader(path.toString())); while((line=br.readLine()) != null){ //读取文件中的每一行 String[] splited = line.split("\t"); map.put(splited[0], splited[1]); //将小表解析成 key/value 存放进map } } } } /** * map阶段读取并处理大表中的数据 * 小表中的数据是加载到HashMap中的,无需从hdfs读取 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if(value==null || ("").equals(value.toString())){ //跳过空值 return; } String[] splited = value.toString().split("\t"); if(map.get(splited[1]) != null){ //map中大表的 key 对应的 value 不为空 Text keyOut = new Text(splited[0]); //key=大表的第一列 Text valueOut = new Text(map.get(splited[1])); //value=小表的第二列 context.write(keyOut, valueOut); } } } }
更多参考分布式缓存DistributedCache
相关文章推荐
- Git 忽略一些文件不加入版本控制
- js+css实现文字散开重组动画特效代码分享
- 大写数字金额
- jq api
- 一步一步学习MFC
- Android中的md5加密
- nginx+keepalived实现双机热备
- python中的dict
- 链表之排序(插入、选择、归并、快速、冒泡)
- 集群redis创建
- CSS3:绘制图形
- 织梦cms如何调用含有某一关键词的文章?
- 咖啡斑治疗的治疗方法
- MVC中导航菜单,选中项的高亮问题。。
- 设计角色状态权限控制问题
- Oil Deposits
- 权限验证原理篇
- 谁让APP工程师产生了泡沫?
- AngularJS自定义Echarts标签 — 雷达图Radar
- android 组件CheckBox实例