Hadoop实现多表关联
2015-12-06 16:58
435 查看
对于用Hadoop实现多表关联的算法,在理解mapreduce编程模型的基础上,实现起来并不难!
如下有两个文件:分别为factory.txt和addressID.txt
思路如下:可以根据两个文件中都存在的address数字进行关联。由于经过shuffle和sort后最后给reduce的是一个<key,(value-list)>,因此可以将数字作为这里的key,那么每个数字对应的<value-list>就是包含工厂名称以及地址的一个集合,在这个集合中,所有工厂名都属于这个集合中的那个地址,现在的问题就是要如何区分出工厂和地址,这很简单,在map里直接对地址和工厂名加入相应的标志就可以了。最后在reduce里进行解析,去除工厂名标志和地址标志,对单个地址和多个或一个工厂名称的集合做笛卡尔积,就可以求出每个工厂名与其对应的地址。
下面附上代码:
最后生成的结果如下:
如下有两个文件:分别为factory.txt和addressID.txt
factoryname addressed Beijing Red Star 1 Shenzhen Thunder 3 Guangzhou Honda 2 Beijing Rising 1 Guangzhou Development Bank 2 Tencent 3 Bank of Beijing 1
addressID addressname 1 Beijing 2 Guangzhou 3 Shenzhen 4 Xian最终输出的文件,要实现工厂名称与其地址的一对一对应。
思路如下:可以根据两个文件中都存在的address数字进行关联。由于经过shuffle和sort后最后给reduce的是一个<key,(value-list)>,因此可以将数字作为这里的key,那么每个数字对应的<value-list>就是包含工厂名称以及地址的一个集合,在这个集合中,所有工厂名都属于这个集合中的那个地址,现在的问题就是要如何区分出工厂和地址,这很简单,在map里直接对地址和工厂名加入相应的标志就可以了。最后在reduce里进行解析,去除工厂名标志和地址标志,对单个地址和多个或一个工厂名称的集合做笛卡尔积,就可以求出每个工厂名与其对应的地址。
下面附上代码:
import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.GenericOptionsParser; public class MTjoin { private static int time = 0; public static class Map extends Mapper<Object, Text, Text, Text> { public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = new String(); line = value.toString(); if (line.contains("factoryname") || line.contains("addressID")) { return; } String factoryname = new String(); int i = 0; while (line.charAt(i)>='9' || line.charAt(i)<='0') { i++; } if (line.charAt(0)>='9' || line.charAt(0)<='0') { int j = i-1; while(line.charAt(j)!=' ') j--; String[] values = {line.substring(0,j),line.substring(i)}; context.write(new Text(values[1]), new Text("name+"+values[0])); //the left table }else { int j = i+1; while(line.charAt(j)!=' ') j++; String[] values = {line.substring(0,j),line.substring(j)}; context.write(new Text(values[0]), new Text("address+"+values[1])); //the right table } } } public static class Reduce extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ if (time == 0) { context.write(new Text("factoryName"), new Text("address")); time ++; } ArrayList<String> factories = new ArrayList<>(); ArrayList<String> addresses = new ArrayList<>(); Iterator iterator = values.iterator(); while(iterator.hasNext()){ String record = iterator.next().toString(); if (record.startsWith("name+")) { factories.add(record.substring(5)); }else if (record.startsWith("address+")) { addresses.add(record.substring(8)); }else { System.err.println("there is an error!"); } } if (factories.size()!=0 && addresses.size()!=0) { for(int m=0; m<addresses.size();m++){ for(int n=0; n<factories.size();n++){ context.write(new Text(factories.get(n)), new Text(addresses.get(m))); } } } } } public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); Job job = new Job(conf,"multiple table join"); if (otherArgs.length!=2) { System.err.println("11111111111111111!!!!!!!!!!!"); System.exit(2); } FileSystem fileSystem = FileSystem.get(URI.create(otherArgs[1]), conf); Path path = new Path(otherArgs[1]); if (fileSystem.exists(path)) { fileSystem.delete(path); } job.setJarByClass(MTjoin.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0]));; FileOutputFormat.setOutputPath(job, path); System.exit(job.waitForCompletion(true) ? 0:1); } }map程序中要生成一个左表和一个右表,然后在reduce里面进行关联,由于map输出只有两列数据,因此要包含四列左右表的信息就要将四列混合成两列,但是对于不同的表要加上左表或者右表的标志,在这里,我的左表第二个元素前面都加了“name+”,右表第二个元素前面都加了“address+”,这个标志帮助我们在reduce程序中解析出我们需要的信息。
最后生成的结果如下:
factoryName address Bank of Beijing Beijing Beijing Rising Beijing Beijing Red Star Beijing Guangzhou Development Bank Guangzhou Guangzhou Honda Guangzhou Tencent Shenzhen Shenzhen Thunder Shenzhen
相关文章推荐
- 详解HDFS Short Circuit Local Reads
- Hadoop_2.1.0 MapReduce序列图
- 使用Hadoop搭建现代电信企业架构
- 肯特·贝克:改变人生的代码整理魔法
- 你应该学习哪种编程语言?
- 单机版搭建Hadoop环境图文教程详解
- [转]我们需要一种其他人能使用的编程语言
- DB2编程序技巧(1)
- DB2编程序技巧 (四)
- 女人VS编程_国庆快乐
- DB2编程序技巧 (六)
- DB2编程序技巧 (三)
- DB2编程序技巧 (九)
- DB2编程序技巧 (七)
- DB2编程序小小技巧
- DB2编程序技巧 (五)
- 动易2006序列号破解算法公布
- DB2编程序技巧 (一)
- DB2编程序技巧 (八)