Reduce侧联接
2015-08-26 21:23
351 查看
案例分析前提,了解其原理,以及术语
原理部分:
原理:
1、mapper端输入后,将数据封装成TaggedMapOutput类型,此类型封装数据源(tag)和值(value);
2、map阶段输出的结果不在是简单的一条数据,而是一条记录。记录=数据源(tag)+数据值(value).
3、combine接收的是一个组合:不同数据源却有相同组键的值;
4、不同数据源的每一条记录只能在一个combine中出现;
如图:
![](http://img.blog.csdn.net/20150826202931789)
![](http://img.blog.csdn.net/20150826202947368)
MapperClass.java
TaggedWritable.java
Reduce.java
Drive.java
Customers.txt
Orders.txt
所碰到问题有几点,提出来和大家分析一下
1。第一个问题是DataJoinMapperBase包的问题,前面已经解决了
2。第二个问题是原来的程序会报一个
所以你需要给一个构造方法
3。第三个问题是我有两个多个文件怎么导入,你将多个文件放入同一个文件夹里然后用
就可以导入多个文件啦,同理也可以拼file*.txt之类的
4。有的时候我为了测试一个工程,从test1测试到了test20,为了方便我们输出的时候总是要创建一个新的目录,解决方案如下
5。日志:
放在src目录下即可
术语部分: 1.Data Source:基本与关系数据库中的表相似,形式为:(例子中为CSV格式) 2.Tag:由于记录类型(Customers或Orders)与记录本身分离,标记一个Record会确保特殊元数据会一致存在于记录中。在这个目的下,我们将使用每个record自身的Data source名称标记每个record。 3.Group Key:Group Key类似于关系数据库中的链接键(join key),在我们的例子中,group key就是Customer ID(第一列的3)。由于datajoin包允许用户自定义group key,所以其较之关系数据库中的join key更一般、平常。
原理部分:
原理:
1、mapper端输入后,将数据封装成TaggedMapOutput类型,此类型封装数据源(tag)和值(value);
2、map阶段输出的结果不在是简单的一条数据,而是一条记录。记录=数据源(tag)+数据值(value).
3、combine接收的是一个组合:不同数据源却有相同组键的值;
4、不同数据源的每一条记录只能在一个combine中出现;
如图:
1.利用datajoin包来实现join: --------------------- Hadoop的datajoin包中有三个需要我们继承的类:DataJoinMapperBase,DataJoinReducerBase,TaggedMapOutput。正如其名字一样,我们的MapClass将会扩展DataJoinMapperBase,Reduce类会扩展DataJoinReducerBase。这个datajoin包已经实现了map()和reduce()方法,因此我们的子类只需要实现一些新方法来设置一些细节。 在用DataJoinMapperBase和DataJoinReducerBase之前,我们需要弄清楚我们贯穿整个程序使用的新的虚数据类TaggedMapOutput。 根据之前我们在图Advance MapReduce的数据流中所展示的那样,mapper输出一个包(由一个key和一个value(tagged record)组成)。datajoin包将key设置为Text类型,将value设置为TaggedMapOutput类型(TaggedMapOutput是一个将我们的记录使用一个Text类型的tag包装起来的数据类型)。它实现了getTag()和setTag(Text tag)方法。它还定义了一个getData()方法,我们的子类将实现这个方法来处理record记录。我们并没有明确地要求子类实现setData()方法,但我们最好还是实现这个方法以实现程序的对称性(或者在构造函数中实现)。作为Mapper的输出,TaggedMapOutput需要是Writable类型,因此的子类还需要实现readFields()和write()方法。 DataJoinMapperBase: ------------------- 回忆join数据流图,mapper的主要功能就是打包一个record使其能够和其他拥有相同group key的记录去向一个Reducer。DataJoinMapperBase完成所有的打包工作,这个类定义了三个虚类让我们的子类实现: protected abstract Text generateInputTag(String inputFile); protected abstract TaggedMapOutput generateTaggedMapOutut(Object value); protected abstract Text generateGroupKey(TaggedMapOutput aRecored); 在一个map任务开始之前为所有这个map任务会处理的记录定义一个tag(Text),结果将保存到DataJoinMapperBase的inputTag变量中,我们也可以保存filename至inputFile变量中以待后用。 在map任务初始化之后,DataJoinMapperBase的map()方法会对每一个记录执行。它调用了两个我们还没有实现的虚方法:generateTaggedMapOutput()以及generateGroupKey(aRecord);(详见代码) DataJoinReducerBase: -------------------- DataJoinMapperBase将我们所需要做的工作以一个full outer join的方式简化。我们的Reducer子类只需要实现combine()方法来滤除掉我们不需要的组合来得到我们需要的(inner join, left outer join等)。同时我们也在combiner()中将我们的组合格式化为输出格式。
/hadoop-2.6.0/share/hadoop/tools/lib 程序需要自己手动导入Jar包
MapperClass.java
package com.yc.zzg.test; import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase; import org.apache.hadoop.contrib.utils.join.TaggedMapOutput; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MapperClass extends DataJoinMapperBase{ @Override protected Text generateGroupKey(TaggedMapOutput arg0) { String line = ((Text)arg0.getData()).toString(); String[] tokens = line.split(","); String groupKey = tokens[0]; return new Text(groupKey); } @Override protected Text generateInputTag(String arg0) { return new Text(arg0); } @Override protected TaggedMapOutput generateTaggedMapOutput(Object arg0) { TaggedWritable tw = new TaggedWritable((Text)arg0); tw.setTag(this.inputTag); return tw; } }
TaggedWritable.java
package com.yc.zzg.test; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.contrib.utils.join.TaggedMapOutput; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; public class TaggedWritable extends TaggedMapOutput { private Writable data; public TaggedWritable() { } public TaggedWritable(Writable data) { this.tag = new Text(""); this.data = data; } @Override public Writable getData() { return data; } public void setData(Writable data) { this.data = data; } /** *TaggedMapOutput实现了Writable */ @Override public void readFields(DataInput in) throws IOException { this.tag.readFields(in); //加入此部分代码,否则,可能报空指针异常 String temp=in.readUTF(); if (this.data == null|| !this.data.getClass().getName().equals(temp)) { try { this.data = (Writable) ReflectionUtils.newInstance( Class.forName(temp), null); } catch (ClassNotFoundException e) { e.printStackTrace(); } } this.data.readFields(in); } @Override public void write(DataOutput out) throws IOException { this.tag.write(out); //此行代码很重要 out.writeUTF(this.data.getClass().getName()); this.data.write(out); } }
Reduce.java
package com.yc.zzg.test; import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase; import org.apache.hadoop.contrib.utils.join.TaggedMapOutput; import org.apache.hadoop.io.Text; public class Reduce extends DataJoinReducerBase { @Override protected TaggedMapOutput combine(Object[] tags, Object[] values) { // if(tags.length<2)return null; //用来控制内连接 StringBuffer joinData = new StringBuffer(); for(Object value: values){ TaggedWritable tw = (TaggedWritable)value; String recordLine = ((Text)tw.getData()).toString(); String[] tokens = recordLine.split(",",2); joinData.append(tokens[1]); } TaggedWritable rtv = new TaggedWritable(new Text(new String(joinData))); rtv.setTag((Text)tags[0]); return rtv; } }
Drive.java
package com.yc.zzg.test; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Drive { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); JobConf job = new JobConf(conf, Drive.class); Path in = new Path("hdfs://localhost:9000/input/inputtest/*"); Path out = new Path("hdfs://localhost:9000/output/test20"); FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, out); job.setJobName("DataJoin"); job.setMapperClass(MapperClass.class); job.setReducerClass(Reduce.class); job.setInputFormat(TextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(TaggedWritable.class); JobClient.runJob(job); } }
Customers.txt
1,Stephanie Leung,555-555-5555 2,Edward Kim,123-456-7890 3,Jose Madriz,281-330-8004 4,David Stork,408-555-0000
Orders.txt
3,A,12.95,02-Jun-2008 1,B,88.25,20-May-2008 2,C,32.00,30-Nov-2007 3,D,25.02,22-Jan-2009
所碰到问题有几点,提出来和大家分析一下
1。第一个问题是DataJoinMapperBase包的问题,前面已经解决了
2。第二个问题是原来的程序会报一个
java.lang.Exception: java.lang.RuntimeException: java.lang.NoSuchMethodException: com.yc.zzg.test.TaggedWritable.<init>() Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: com.yc.zzg.test.TaggedWritable.<init>()
所以你需要给一个构造方法
public TaggedWritable() { }
3。第三个问题是我有两个多个文件怎么导入,你将多个文件放入同一个文件夹里然后用
Path in = new Path("hdfs://localhost:9000/input/inputtest/*");
就可以导入多个文件啦,同理也可以拼file*.txt之类的
4。有的时候我为了测试一个工程,从test1测试到了test20,为了方便我们输出的时候总是要创建一个新的目录,解决方案如下
1。hadoop需要把集群上的core-site.xml和hdfs-site.xml放到当前工程下。eclipse工作目录的bin文件夹下面 2。 FileSystem fs=FileSystem.get(conf); if(fs.exists(out)){ fs.delete(out, true); System.out.println("输出路径存在,已删除!"); }
5。日志:
log4j.rootLogger=info, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n
放在src目录下即可
相关文章推荐
- 折叠Collapse插件
- linux 根据sp和fp查调用栈
- html 利用 frameset 进行简单的框架布局
- PHP的curl带验证码模拟登陆学校的教务系统(验证码已被自动识别,不需要输入)
- 02类型转化 进程空间 数组2015/08/26
- 设计模式之策略模式
- 最新制作 Mac 电脑 U 盘启动盘笔记
- javascirpt对象创建
- HDU3342 - Legal or Not 拓补排序模板
- (转)Qt中如何使用样式表QPalette以及相关注意事项
- js获取?后面具体参数的值
- Object -C NSSet -- 笔记
- 创建园子第一天
- EasyUI DataGrid 的常用设置
- 01文件循环位移加密 C预处理宏2015/08/24
- Windows Azure Active Directory (3) China Azure AD增加新用户
- Xcode中 黄色文件夹与蓝色文件夹的个人反思
- arcgis sever发布地图服务,使用arcmap版本不同发布不成功的探讨
- Codeforces Round #317 [AimFund Thanks-Round] (Div. 1) B. Minimization 贪心 dp
- ACM网络流模板 最大流ISAP URAL1774