您的位置:首页 > 其它

MR-5.MapReduce常见Joins方法

2016-01-26 19:32 363 查看

Map-Site joins

使用场景:一张表十分小,一张表很大
用法:在提交作业时先将小表文件放到作业的DistributedCache,然后从DistributedCache取出该小表,然后对该小表 join key/value解析分割放到内存中(可以放到hashmap等容器中)。然后扫描达标,看大表的每条记录的joinkey/value数值是否能够在内存中找到相同的join key的记录,否则则直接输出结果。

(1)数据源
气象站数据表stations
stationID stationName
011990-99999 SIHCCAJ***RI
012650-99999 TYNSET-HANM

天气记录records
stationID Timestamp Temperature
012650-99999 194903241200 111
012650-99999 195003241800 78
012650-99999 195103241900 0
011990-99999 195203242000 22
011990-99999 195303242000 11


(2)统计方法

分析: 由于stations是小表,records是大表。并且两张表的key都是stationID ,通过该键进行join,采用map-site join方法。
(1) 案例说明
public class MapSiteJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

/**
* 小表放入内存,加快join速度
*/
Map<String, String> hashMap = new HashMap<String, String>();

/**
* MapTask任务示例启动加载,仅加载1次。加载的数据放入内容
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// 获得当前作业的DistributedCache相关文件
String line = null;
BufferedReader br = null;
// 从本次获取(这里:MapTask启动的时候会同DistributedCache拷贝一份,有几个MapTask就拷贝几份,集群模式下运行
Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());

System.out.println("获取的路径是:  " + localCacheFiles[0].toString());
for (Path path : localCacheFiles) {
br = new BufferedReader(new FileReader(path.toString()));
while ((line = br.readLine()) != null) {
	String[] split = line.split("\t");
	hashMap.put(split[0], split[1]);
}
}
}

/**
* 根据records大表记录得ID,来从内存中获取对应该的数值,然后重新封装value
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String line = value.toString();
String[] split = line.split("\t");
String name = hashMap.get(split[0]);
if (!StringUtils.isBlank(name)) {
Text k = new Text();
Text v = new Text();
k.set(split[0]);
v.set(name + "\t" + split[1] + "\t" + split[2]);
context.write(k, v);
}
}
}


public class MapSideJoinMain extends Configured implements Tool{

	public int run(String[] args) throws Exception {
		String[] remainingArgs = new GenericOptionsParser(args).getRemainingArgs();
		if(remainingArgs.length!=3){
			System.out.println("Usage:  <inputpath>   <outpath>  <distributedPath> ");
			return -2;
		}
		
		String inputPath = args[0];
		String outPath = args[1];
		String distributedPath = args[2];
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		fs.delete(new Path(outPath), true);

		Job job  = Job.getInstance(conf,MapSideJoinMain.class.getSimpleName());
		DistributedCache.addCacheFile(new Path(distributedPath).toUri(),job.getConfiguration());//为job添加缓存文件-缓存文件的路径

		job.setJarByClass(MapSideJoinMain.class);
		
		job.setJarByClass(MapSideJoinMain.class);
		job.setMapperClass(MapSiteJoinMapper.class);
		job.setNumReduceTasks(0);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		FileInputFormat.addInputPath(job, new Path(inputPath));
		FileOutputFormat.setOutputPath(job, new Path(outPath));
		return job.waitForCompletion(true)?0:1;
	}
	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new MapSideJoinMain(), args);
		System.exit(exitCode);
	}
}


运行结果

012650-99999 TYNSET-HANM 194903241200 111

012650-99999 TYNSET-HANM 195003241800 78

012650-99999 TYNSET-HANM 195103241900 0

011990-99999 SIHCCAJ***RI 195203242000 22

011990-99999 SIHCCAJ***RI 195303242000 11

Reduce-Site joins

在reduce端的join不要钱输入数据集符合特定的结构,因而reduce端连接比map端连接更为常用。但是由于两个数据集均要经过MapReduce的shffle过程,故reduce端连接的效率低一些。Reduce端join实现原理:mapper为各个记录标记源,并且使用连接键作为map输出键,是键相同的记录放到同一个reducer中。



map端工作:为了区分两种不同数据源key/value数据对,对两个输入文件file1,file2每条记录进行打标签,每条记录增加tag标记,tag=0表示file1,tag=1表示file2。



reduce端工作:reduce函数获取key相同来自file1和file2文件的valuelist,然后对同一个key的file1和file2进行join操作。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: