使用hadoop的datajoin包进行关系型join操作
2012-08-15 21:01
495 查看
datajoin包在hadoop的contrib目录下,我们也可以在src下面看见其源码,它的源码很小,我建议大体看看以了解其原理。
利用datajoin进行join操作,在《Hadoop in action》里面已经讲的十分清楚,在这里只提及值得注意的几个地方。
TaggedMapOutput的目的是标识数据,让我们知道哪个记录是从哪里来的。
DataJoinMapperBase类中的generateInputTag在map任务开始时被调用(还没进行map函数),其目的是生成tag,并自动存储于DataJoinMapperBase类的inputTag中。
generateTaggedMapOutput()用于生存带标签的数据,这可以让我们知道数据的来源,在我们想进行left join /right join的时候很有用,同时对于过滤数据等操作也有帮助。
datajoin在当前是用旧API写的,也就是说Mapper子类是实现Mapper接口而不是扩展Mapper虚类,但是MapperChain.addMapper虽然是在旧API目录下面,但是却只支持扩展虚类的方式,相信当你数据流比较长的时候这会给你带来麻烦,这个问题我没有解决,重写datajoin包可能是一种好的方式,但你也可以手动执行多个作业来间接达到目的。
下面是我自己想的一个练习,由于上述的第4点的限制,这其实是一个不完整的练习。
数据是《hadoop in action》的数据:
题目:选出id为10,000到1000,000,000的用户数据进行right join(保留右边,左边为如果没有对应用户信息则设为NULL)。
说明:我们设想订单可以是匿名用户购买的(比如淘宝网),现在我希望知道这个id范围的一些订单信息。
在generateTaggedMapOutput中我们进行数据过滤,把10000到1000,000,000间的数据选出来,然后在combine进行left join,我们知道combine函数是决定联结方式的地方。
参考: [1] <<hadoop in action>>
利用datajoin进行join操作,在《Hadoop in action》里面已经讲的十分清楚,在这里只提及值得注意的几个地方。
TaggedMapOutput的目的是标识数据,让我们知道哪个记录是从哪里来的。
DataJoinMapperBase类中的generateInputTag在map任务开始时被调用(还没进行map函数),其目的是生成tag,并自动存储于DataJoinMapperBase类的inputTag中。
generateTaggedMapOutput()用于生存带标签的数据,这可以让我们知道数据的来源,在我们想进行left join /right join的时候很有用,同时对于过滤数据等操作也有帮助。
datajoin在当前是用旧API写的,也就是说Mapper子类是实现Mapper接口而不是扩展Mapper虚类,但是MapperChain.addMapper虽然是在旧API目录下面,但是却只支持扩展虚类的方式,相信当你数据流比较长的时候这会给你带来麻烦,这个问题我没有解决,重写datajoin包可能是一种好的方式,但你也可以手动执行多个作业来间接达到目的。
下面是我自己想的一个练习,由于上述的第4点的限制,这其实是一个不完整的练习。
数据是《hadoop in action》的数据:
datajoin_customers文件 | datajoin_orders文件 |
10001,Stephanie Leung,555-555-5555 10002,Edward Kim,123-456-7890 10003,Jose Madriz,281-330-8004 10004,David Stork,408-555-0000 102343,Posa Wu, 12387887989 | 10003,A,12.95,02-Jun-2008 10001,B,88.25,20-May-2008 10002,C,32.00,30-Nov-2007 10003,D,25.02,22-Jan-2009 21312,F,32.00,23-Jan-2010 |
说明:我们设想订单可以是匿名用户购买的(比如淘宝网),现在我希望知道这个id范围的一些订单信息。
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase; import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase; import org.apache.hadoop.contrib.utils.join.TaggedMapOutput; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class DataJoin extends Configured implements Tool{ public static class Mapper extends DataJoinMapperBase{ protected Text generateInputTag(String inputFile) { return new Text(inputFile); } protected TaggedMapOutput generateTaggedMapOutput(Object value) { TaggedMapOutput ret = new MyTaggedWritable((Text)value); String ck = ((Text)value).toString().split(",", 2)[0]; if(10000 > Long.valueOf(ck) || 1000000000 < Long.valueOf(ck)){ return null; } ret.setTag(new Text(this.inputTag)); return ret; } protected Text generateGroupKey(TaggedMapOutput aRecord) { return new Text(aRecord.getData().toString().split(",")[0]); } } public static class Reducer extends DataJoinReducerBase{ protected TaggedMapOutput combine(Object[] tags, Object[] values) { if(tags.length < 1) return null; if(tags.length == 1 && ((Text)tags[0]).toString().endsWith("customers")){ return null; } String retStr = ""; if(tags.length == 1 && ((Text)tags[0]).toString().endsWith("orders")){ retStr = "NULL,"; } for(int i = 0; i < values.length; i++){ if(i > 0) retStr += ","; retStr +=((MyTaggedWritable)values[i]).getData().toString().split(",",2)[1]; } TaggedMapOutput ret = new MyTaggedWritable(new Text(retStr)); ret.setTag((Text)tags[0]); return ret; } } public static class MyTaggedWritable extends TaggedMapOutput{ public Text data; public MyTaggedWritable(){ this.data = new Text(""); //必须有,否则反序列化时出错。 } public MyTaggedWritable(Text data){ this.data = data; } public void readFields(DataInput in) throws IOException { this.tag.readFields(in); this.data.readFields(in); } public void write(DataOutput out) throws IOException { this.tag.write(out); this.data.write(out); } public Writable getData() { return this.data; } } public int run(String[] arg0) throws Exception { Configuration conf = getConf(); JobConf job = new JobConf(conf, DataJoin.class); job.setJarByClass(DataJoin.class); Path in = new Path(arg0[0]); Path out = new Path(arg0[1]); FileInputFormat.setInputPaths(job,in); FileOutputFormat.setOutputPath(job, out); job.setJobName("DataJoin"); job.setMapperClass(Mapper.class); job.setReducerClass(Reducer.class); job.setInputFormat(TextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(MyTaggedWritable.class); job.setOutputValueClass(Text.class); job.set("mapred.textoutputformat.separator", ","); JobClient.runJob(job); return 0; } public static void main(String args[]) throws Exception{ int res = ToolRunner.run(new Configuration(), new DataJoin(),args); System.exit(res); } }
在generateTaggedMapOutput中我们进行数据过滤,把10000到1000,000,000间的数据选出来,然后在combine进行left join,我们知道combine函数是决定联结方式的地方。
参考: [1] <<hadoop in action>>
相关文章推荐
- 使用Hadoop的datajoin包进行关系型join操作【hadoop关系型join,源码解析及如何应用jar包中的抽象类】
- Hadoop MapReduce进阶 使用DataJoin包实现Join
- Hadoop MapReduce进阶 使用DataJoin包实现Join
- 在MySQL中使用JOIN语句进行连接操作的详细教程
- Hadoop之使用python实现数据集合间join操作
- Hadoop之使用python实现数据集合间join操作
- Hadoop MapReduce进阶 使用DataJoin包实现Join
- 【大数据系列】windows环境下搭建hadoop开发环境使用api进行基本操作
- MySQL 使用locate函数对 GROUP_CONCAT 列进行 JOIN 操作
- Hadoop使用Java进行文件修改删除操作
- 在MySQL中使用JOIN语句进行连接操作的详细教程
- Hadoop MapReduce进阶 使用DataJoin包实现Join
- Hadoop MapReduce进阶 使用DataJoin包实现Join
- hadoop(02)、使用JAVA API对HDFS进行基本操作
- Hadoop使用DistributedCache进行join
- Hbase总结(三)--使用spring-data-hadoop进行hbase的读写操作
- 使用MFC的CFile文件类和CArchive串行化类进行二进制文件读写操作
- 使用coding进行项目代码管理(全程可视化操作!)
- 如何使用ASP进行打印操作
- 使用使用for in 语句,并对数组中元素进行了增删操作,报错却不知怎么办?