MR-5.MapReduce常见Joins方法
2016-01-26 19:32
363 查看
Map-Site joins
使用场景:一张表十分小,一张表很大用法:在提交作业时先将小表文件放到作业的DistributedCache,然后从DistributedCache取出该小表,然后对该小表 join key/value解析分割放到内存中(可以放到hashmap等容器中)。然后扫描达标,看大表的每条记录的joinkey/value数值是否能够在内存中找到相同的join key的记录,否则则直接输出结果。
(1)数据源
气象站数据表stations
stationID stationName
011990-99999 SIHCCAJ***RI
012650-99999 TYNSET-HANM
天气记录records
stationID Timestamp Temperature
012650-99999 194903241200 111
012650-99999 195003241800 78
012650-99999 195103241900 0
011990-99999 195203242000 22
011990-99999 195303242000 11
(2)统计方法
分析: 由于stations是小表,records是大表。并且两张表的key都是stationID ,通过该键进行join,采用map-site join方法。
(1) 案例说明
public class MapSiteJoinMapper extends Mapper<LongWritable, Text, Text, Text> { /** * 小表放入内存,加快join速度 */ Map<String, String> hashMap = new HashMap<String, String>(); /** * MapTask任务示例启动加载,仅加载1次。加载的数据放入内容 */ @Override protected void setup(Context context) throws IOException, InterruptedException { // 获得当前作业的DistributedCache相关文件 String line = null; BufferedReader br = null; // 从本次获取(这里:MapTask启动的时候会同DistributedCache拷贝一份,有几个MapTask就拷贝几份,集群模式下运行 Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration()); System.out.println("获取的路径是: " + localCacheFiles[0].toString()); for (Path path : localCacheFiles) { br = new BufferedReader(new FileReader(path.toString())); while ((line = br.readLine()) != null) { String[] split = line.split("\t"); hashMap.put(split[0], split[1]); } } } /** * 根据records大表记录得ID,来从内存中获取对应该的数值,然后重新封装value */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] split = line.split("\t"); String name = hashMap.get(split[0]); if (!StringUtils.isBlank(name)) { Text k = new Text(); Text v = new Text(); k.set(split[0]); v.set(name + "\t" + split[1] + "\t" + split[2]); context.write(k, v); } } }
public class MapSideJoinMain extends Configured implements Tool{ public int run(String[] args) throws Exception { String[] remainingArgs = new GenericOptionsParser(args).getRemainingArgs(); if(remainingArgs.length!=3){ System.out.println("Usage: <inputpath> <outpath> <distributedPath> "); return -2; } String inputPath = args[0]; String outPath = args[1]; String distributedPath = args[2]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); fs.delete(new Path(outPath), true); Job job = Job.getInstance(conf,MapSideJoinMain.class.getSimpleName()); DistributedCache.addCacheFile(new Path(distributedPath).toUri(),job.getConfiguration());//为job添加缓存文件-缓存文件的路径 job.setJarByClass(MapSideJoinMain.class); job.setJarByClass(MapSideJoinMain.class); job.setMapperClass(MapSiteJoinMapper.class); job.setNumReduceTasks(0); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outPath)); return job.waitForCompletion(true)?0:1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MapSideJoinMain(), args); System.exit(exitCode); } }
运行结果
012650-99999 TYNSET-HANM 194903241200 111 012650-99999 TYNSET-HANM 195003241800 78 012650-99999 TYNSET-HANM 195103241900 0 011990-99999 SIHCCAJ***RI 195203242000 22 011990-99999 SIHCCAJ***RI 195303242000 11 |
Reduce-Site joins
在reduce端的join不要钱输入数据集符合特定的结构,因而reduce端连接比map端连接更为常用。但是由于两个数据集均要经过MapReduce的shffle过程,故reduce端连接的效率低一些。Reduce端join实现原理:mapper为各个记录标记源,并且使用连接键作为map输出键,是键相同的记录放到同一个reducer中。map端工作:为了区分两种不同数据源key/value数据对,对两个输入文件file1,file2每条记录进行打标签,每条记录增加tag标记,tag=0表示file1,tag=1表示file2。
reduce端工作:reduce函数获取key相同来自file1和file2文件的valuelist,然后对同一个key的file1和file2进行join操作。
相关文章推荐
- C++拷贝构造函数详解
- Dynamic Web Module 3.0 requires Java 1.6 or newer
- <OpenGL>Introduction
- POJ3071-Football(概率DP+滚动数组)
- Android的一些编码规范和优化
- sqlserver2005T-SQL查询语句的逻辑查询处理1
- Qt之进程间通信(Windows消息)
- Service 与 Activity之间的通信新方式
- android进程间的通信方式AIDL
- Qt之进程间通信(Windows消息)
- java火车站售票系统之多线程并发访问
- hadoop 第一个程序 wordcount 详解
- Swift传值传的是什么
- 如何解决C/C++中stack overflow问题
- MR-5.MapReduce排序
- mysql-otp 驱动中设置utf8mb4
- 设计模式笔记
- MR-5.MapReduce分布式缓存(Distribute Cache)
- windows DOS环境下看星球大战
- Weblogic注册为Windows的系统服务项+删除服务项