MapReuce-Join操作-初级优化
2015-12-30 10:52
447 查看
在上一篇《MapReduce-Join操作-初体验》的结论中说明了上述join方法的不足之处,这一篇中将说明针对上一篇的几个缺陷进行一些一些改进,主要是针对上一篇提到几点:
1.效率低是因为在reduce端遍历了两次集合
2.资源的消耗大是因为重新创建了List来放几乎所有的迭代器中的数据
3.不能适用于所有的业务是因为正式环境往往一个reduce的迭代器中的数据量巨大,而List的最大值为Integer.MAX_VALUE,所以 在数据量巨大的时候,会造成List越界的错误
针对这三个问题的分析可知道,这都是因为在reduce阶段需要先遍历迭代器来找出关于phone的那一个对象,然后再遍历一次进行join的拼接。针对这一问题:那么我们能不能把phone的直接放到第一位那就不用第一次的遍历了,而是改为直接第一个元素拿到phone对象后,直接进行拼接。这样的话,我们就可以助助二次排序的方案来解决这个问题。这里不详细说明二次排序如果有不明白的可以参考《MapReduce-自定义Key-二次排序》《MapReduce-三次排序-曾经想不通的二次排序》
那么下面就开始解决问题:
测试数据同上一篇:
用户数据:
uid,name,phoneid
1,tom,40
2,jack,20
3,seven,30
4,lee,10
5,smith,20
6,张三,10
7,李四,30
8,王五,20
goodid,name
10,苹果
20,三星
30,LG
40,华为
输出结果:
lee 苹果
张三 苹果
jack 三星
smith 三星
王五 三星
seven LG
李四 LG
tom 华为
定义key:
总结:
这里只是对前一篇博客中的join做了一个简单的优化处理,后面会逐步用一些其它的方法来做一些其它方案的做优化,下一篇《MapReduce-Join中级优化-hadoop自带datajoin的解决方法》会说明基本hadoop自带的jar怎么处理JOIN操作。
1.效率低是因为在reduce端遍历了两次集合
2.资源的消耗大是因为重新创建了List来放几乎所有的迭代器中的数据
3.不能适用于所有的业务是因为正式环境往往一个reduce的迭代器中的数据量巨大,而List的最大值为Integer.MAX_VALUE,所以 在数据量巨大的时候,会造成List越界的错误
针对这三个问题的分析可知道,这都是因为在reduce阶段需要先遍历迭代器来找出关于phone的那一个对象,然后再遍历一次进行join的拼接。针对这一问题:那么我们能不能把phone的直接放到第一位那就不用第一次的遍历了,而是改为直接第一个元素拿到phone对象后,直接进行拼接。这样的话,我们就可以助助二次排序的方案来解决这个问题。这里不详细说明二次排序如果有不明白的可以参考《MapReduce-自定义Key-二次排序》《MapReduce-三次排序-曾经想不通的二次排序》
那么下面就开始解决问题:
测试数据同上一篇:
用户数据:
uid,name,phoneid
1,tom,40
2,jack,20
3,seven,30
4,lee,10
5,smith,20
6,张三,10
7,李四,30
8,王五,20
goodid,name
10,苹果
20,三星
30,LG
40,华为
输出结果:
lee 苹果
张三 苹果
jack 三星
smith 三星
王五 三星
seven LG
李四 LG
tom 华为
定义key:
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class UserKey implements WritableComparable<UserKey>{ private String pno = ""; private boolean isPhone = false; public UserKey() { super(); } public UserKey(String pno, boolean isPhone) { super(); this.pno = pno; this.isPhone = isPhone; } public String getPno() { return pno; } public void setPno(String pno) { this.pno = pno; } public boolean isPhone() { return isPhone; } public void setPhone(boolean isPhone) { this.isPhone = isPhone; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(pno); out.writeBoolean(isPhone);; } @Override public void readFields(DataInput in) throws IOException { this.pno = in.readUTF(); this.isPhone = in.readBoolean(); } /** * 二次排序的比较器保证进入reduce迭代器时phone对象在迭代器的第一个元素处 */ @Override public int compareTo(UserKey o) { if(!this.pno.equals(o.getPno())) { return this.pno.compareTo(o.getPno()); } else { return isPhone ? -1: 1; } } }定制序列化对象:
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; public class User implements Writable { private String uno = ""; private String name = ""; private String pname = ""; private String pno = ""; private int flag = 0; public User() { } public User(User u) { super(); this.uno = u.uno; this.name = u.name; this.pname = u.pname; this.pno = u.pno; this.flag = u.flag; } public User(String uno, String name, String pname, String pno, int flag) { super(); this.uno = uno; this.name = name; this.pname = pname; this.pno = pno; this.flag = flag; } @Override public void readFields(DataInput input) throws IOException { this.uno = input.readUTF(); this.name = input.readUTF(); this.pname = input.readUTF(); this.pno = input.readUTF(); this.flag = input.readInt(); } @Override public void write(DataOutput output) throws IOException { output.writeUTF(uno); output.writeUTF(name); output.writeUTF(pname); output.writeUTF(pno); output.writeInt(flag); } public String getUno() { return uno; } public void setUno(String uno) { this.uno = uno; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getPname() { return pname; } public void setPname(String pname) { this.pname = pname; } public String getPno() { return pno; } public void setPno(String pno) { this.pno = pno; } public int getFlag() { return flag; } public void setFlag(int flag) { this.flag = flag; } @Override public String toString() { return name + " " + pname; } }定制分组比较器:(这里不详细讲解用法及执行过程可以参考《MapReduce-自定义比较器》)
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class GroupingComparator extends WritableComparator { public GroupingComparator() { super(UserKey.class, true); } public GroupingComparator(Class<? extends WritableComparable> keyClass, boolean createInstances) { super(keyClass, createInstances); } public GroupingComparator(Class<? extends WritableComparable> keyClass, Configuration conf, boolean createInstances) { super(keyClass, conf, createInstances); } public GroupingComparator(Class<? extends WritableComparable> keyClass) { super(keyClass); } /** * 只使用第一个字段进行分组比较,以保证同一pno的user和phone对象在同一个分组里 */ @Override public int compare(WritableComparable a, WritableComparable b) { UserKey a1 = (UserKey)a; UserKey b1 = (UserKey)b; return a1.getPno().compareTo(b1.getPno()); } }map阶段:
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class JoinMapper extends Mapper<LongWritable, Text, UserKey, User> { private User u = new User(); private UserKey uk = new UserKey(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); /** * 判断是否为空行 */ if(line.trim().length() <= 0) { return; } String[] arr = line.split(","); /** * 如果是用户数据则把UserKey中的isPhone字段设为false */ if (arr.length == 3) { u.setUno(arr[0]); u.setName(arr[1]); u.setFlag(0); uk.setPno(arr[2]); uk.setPhone(false); context.write(uk, u); } else if (arr.length == 2) { /** * 如果是手机数据则把UserKey中的isPhone字段设为false */ u.setPname(arr[1]); u.setPno(arr[0]); u.setFlag(1); uk.setPno(arr[0].trim()); uk.setPhone(true); /** * 都把要join的字段作为key,这样就可以让其到reduce函数处理时在同一个 * 迭代器中,这样就可以在reduce函数中做join的操作 */ context.write(uk , u); } } }reduce阶段:
import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class JoinReducer extends Reducer<UserKey, User, NullWritable, Text> { private Text value = new Text(); @Override protected void reduce(UserKey key, Iterable<User> values, Context context) throws IOException, InterruptedException { int num = 0; User phone = null; /** * 直接在第一个元素中取出phone数据,后面的数据直接拼接后输出 */ for(User e: values) { if(num == 0) { phone = new User(e); num ++; } else { e.setPno(phone.getPno()); e.setPname(phone.getPname()); value.set(e.toString()); context.write(null, value); } } } }启动函数:
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class JobMain { public static void main(String[] args) throws Exception{ Configuration configuration = new Configuration(); Job job = new Job(configuration, "join-job"); job.setJarByClass(JobMain.class); job.setMapperClass(JoinMapper.class); job.setMapOutputKeyClass(UserKey.class); job.setMapOutputValueClass(User.class); job.setGroupingComparatorClass(GroupingComparator.class); job.setReducerClass(JoinReducer.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(args[0])); Path outputDir = new Path(args[1]); FileSystem fs = FileSystem.get(configuration); if(fs.exists(outputDir)) { fs.delete(outputDir, true); } FileOutputFormat.setOutputPath(job, outputDir); System.exit(job.waitForCompletion(true)?0:1); } }运行结果:
总结:
这里只是对前一篇博客中的join做了一个简单的优化处理,后面会逐步用一些其它的方法来做一些其它方案的做优化,下一篇《MapReduce-Join中级优化-hadoop自带datajoin的解决方法》会说明基本hadoop自带的jar怎么处理JOIN操作。
相关文章推荐
- webrtc windows工程下载包含sln以及编译介绍
- 关于UILabel产生黑边的原因及去除方法
- iOS获取设备型号、设备类型等信息
- 【python】多个文件共用日志系统的重复打印问题
- Spring MVC的异常统一处理方法
- Raspbian安装xrdp远程
- 大端、小端、网络字节序
- Spring 拦截器与过滤器
- C++中异常处理
- 熟悉git命令的小游戏
- MATLAB匹配追踪
- redis cluster搭建
- Git CMD - config: Get and set repository or global options
- Spring MVC的异常统一处理方法
- iOS 9音频应用播放音频之控制播放速度
- mac php 安装 encrypt 扩展
- Netty 实现 WebSocket 聊天功能
- 从机器学习谈起
- 【JavaScript面向对象编程】20151229(函数,也是一种数据类型)
- app技术博客整理