Hadoop--Map/Reduce实现多表链接
2014-07-07 10:27
501 查看
MR实现多表连接的原理和单表连接时一样的,甚至比单表连接还要简单。
在map阶段只需要根据文件的名称区分左表还是右表。使用关联的字段作为key2。
在reduce中对values中的值分别存储到一个左表list和右表list中。对左表list和右表list进行一个笛卡尔积完事。
在map阶段只需要根据文件的名称区分左表还是右表。使用关联的字段作为key2。
在reduce中对values中的值分别存储到一个左表list和右表list中。对左表list和右表list进行一个笛卡尔积完事。
import java.io.*; import java.util.*; import org.apache.hadoop.io.*; import org.apache.hadoop.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.hadoop.conf.*; import org.apache.hadoop.util.Tool; public class MTjoin extends Configured implements Tool { /* * 多表链接,与单表链接思路类似。将关联列作为map的key值,用数字区分左表和右表。在Reduce阶段对两个表进行笛卡尔积 * */ public static class Map extends Mapper<LongWritable,Text,Text,Text>{ public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{ String line=value.toString(); int linelen=line.length(); //去除文件首行 if(line.indexOf("factoryname")==-1&&line.indexOf("addressID")==-1) { //处理factory数据 if(line.charAt(linelen-2)==' ') { String facstr="1"+line.substring(0, linelen-2); String addrestr=String.valueOf(line.charAt(linelen-1)); context.write(new Text(addrestr), new Text(facstr)); }else{ String addreidstr=String.valueOf(line.charAt(0)); String addrenastr="2"+line.substring(1); context.write(new Text(addreidstr), new Text(addrenastr)); } } } } public static class Reduce extends Reducer<Text,Text,Text,Text>{ public void reduce(Text key,Iterable<Text> values,Context context)throws IOException, InterruptedException{ ArrayList<String> facarr=new ArrayList<String>(); ArrayList<String> addarr=new ArrayList<String>(); for(Text var:values){ if(var.toString().charAt(0)=='1') { facarr.add(var.toString().substring(1)); }else if(var.toString().charAt(0)=='2') { addarr.add(var.toString().substring(1)); } } if(facarr.size()!=0&&addarr.size()!=0) { for(int i=0;i<facarr.size();i++) { context.write(new Text(facarr.get(i)), new Text(addarr.get(0))); } } } } @Override public int run(String[] args) throws Exception { // TODO Auto-generated method stub Configuration conf=new Configuration(); Job job=new Job(conf,"MTjoin"); job.setJarByClass(MTjoin.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean success=job.waitForCompletion(true); return success?0:1; } public static void main(String[] args)throws Exception{ int ret=ToolRunner.run(new MTjoin(), args); System.exit(ret); } }
相关文章推荐
- Hadoop -- Map-Reduce具体实现详解
- HadoopMapReduce -Map-Reduce具体实现详解
- Hadoop MapReduce概念学习系列之map并发任务数和reduce并发任务数的原理和代码实现(十八)
- 用MPI实现Hadoop Map/Reduce的TeraSort
- Python实现用Hadoop的map/reduce对web日志进行统计
- 使用SAS实现HADOOP Map/Reduce程序-wordcount
- HadoopMapReduce --Map-Reduce具体实现详解
- Lucene-Hadoop, GFS中Map/Reduce的简单实现
- Hadoop Map/Reduce编程模型实现海量数据处理—数字求和-Hadoop学习
- 实例讲解hadoop中的map/reduce查询(python语言实现)
- Hadoop Map/Reduce编程模型实现海量数据处理—数字求和-Hadoop学习
- 实例讲解hadoop中的map/reduce查询(python语言实现)
- 实例讲解hadoop中的map/reduce查询(python语言实现)
- Hadoop-Map/Reduce实现实现倒排索引
- Python+Hadoop Streaming实现MapReduce(如何给map和reduce的脚本传递参数)
- Python+Hadoop Streaming实现MapReduce(如何给map和reduce的脚本传递参数)
- Python+Hadoop Streaming实现MapReduce(如何给map和reduce的脚本传递参数)
- Lucene-Hadoop, GFS中Map/Reduce的简单实现
- Hadoop Streaming: 使用Java以外的语言去实现Map/Reduce
- 实例讲解hadoop中的map/reduce查询(python语言实现)