您的位置:首页 > 其它

MR案例:Map-Join

2015-08-21 15:50 176 查看
适用场景:一张表十分小【key不可重复】、一张表非常大。
用法:在Job提交时,首先将小表加载到 DistributedCache 分布式缓存中,然后从DistributeCache中读取小表解析成 key/value 保存到内存中(可以放在Hash Map等容器中)。然后扫描大表中的每条记录的 key 是否能在内存中找到相同 join key 的记录,如果有则直接输出结果。

package join.map;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
* map-join中小表的数据如下:
*
*    1    Beijing
*    2    Guangzhou
*    3    Shenzhen
*    4    Xian
*
* 大表的数据如下:
*
*    Beijing Red Star              1
*    Shenzhen Thunder              3
*    Guangzhou Honda               2
*    Beijing Rising                1
*    Guangzhou Development Bank    2
*    Tencent                       3
*    Back of Beijing               1
*/
public class MapJoin {

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(MapJoin2.class);

//此方法已过时,被job.addCacheFile()所取代
//DistributedCache.addCacheFile(new URI("hdfs://10.16.17.182:9000/test/in/address.txt"), conf);

//加载小表到 分布式缓存DistributedCache
     job.addCacheFile(new Path(args[0]).toUri());

job.setMapperClass(MJMapper.class);
job.setNumReduceTasks(0);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));

System.exit(job.waitForCompletion(true)? 0:1);
}

public static class MJMapper extends Mapper<LongWritable, Text, Text, Text>{

/**
* 此map是存放小表数据用的
* 注意小表的key是不能重复的,类似与数据库的外键表
* 在这里的小表,就相当于一个外键表
* **/
private HashMap<String, String> map=new HashMap<String, String>();

@Override
protected void setup(Context context) throws IOException, InterruptedException {

BufferedReader br=null;        // 读取文件流
String line;

// 获取DistributedCached里面 的共享文件
        Path[] paths = context.getLocalCacheFiles();

for(Path path : paths){
if(path.getName().indexOf("address") >= 0){        //如果是 address文件
            br=new BufferedReader(new FileReader(path.toString()));

while((line=br.readLine()) != null){    //读取文件中的每一行
               String[] splited = line.split("\t");

map.put(splited[0], splited[1]);    //将小表解析成 key/value 存放进map
}
}
}
}

/**
* map阶段读取并处理大表中的数据
* 小表中的数据是加载到HashMap中的,无需从hdfs读取
*/
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

if(value==null || ("").equals(value.toString())){    //跳过空值
return;
}

String[] splited = value.toString().split("\t");

if(map.get(splited[1]) != null){    //map中大表的 key 对应的 value 不为空

Text keyOut = new Text(splited[0]);        //key=大表的第一列
Text valueOut = new Text(map.get(splited[1]));    //value=小表的第二列

context.write(keyOut, valueOut);
}
}
}
}


更多参考分布式缓存DistributedCache
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: