hadoop MapReduce 三种连接
2014-12-25 17:22
316 查看
为了实现内连接和外连接,MapReduce中有三种连接策略,如下所示。这三种连接策略有的在map阶段,有的在reduce阶段。它们都针对MapReduce的排序-合并(sort-merge)的架构进行了优化。
重分区连接(Repartition join)——reduce端连接。使用场景:连接两个或多个大型数据集。
复制连接(Replication join)——map端连接。使用场景:待连接的数据集中有一个数据集小到可以完全放在缓存中。
半连接(Semi-join)——另一个map端连接。使用场景:待连接的数据集中有一个数据集非常大,但同时这个数据集可以被过滤成小到可以放在内存中。
数据模型:
tb_dim_city.dat
tb_user_profiles.dat
reduce端连接:
Map端的连接
Semi连接
注意事项:
只有reduce连接实例能在eclipse中运行,另外两个必须打成JAR包放到hadoop上去运行。
参考: http://zengzhaozheng.blog.51cto.com/8219051/1392961
重分区连接(Repartition join)——reduce端连接。使用场景:连接两个或多个大型数据集。
复制连接(Replication join)——map端连接。使用场景:待连接的数据集中有一个数据集小到可以完全放在缓存中。
半连接(Semi-join)——另一个map端连接。使用场景:待连接的数据集中有一个数据集非常大,但同时这个数据集可以被过滤成小到可以放在内存中。
数据模型:
tb_dim_city.dat
0|其他|9999|9999|0 1|长春|1|901|1 2|吉林|2|902|1 3|四平|3|903|1 4|松原|4|904|1 5|通化|5|905|1 6|辽源|6|906|1 7|白城|7|907|1 8|白山|8|908|1
tb_user_profiles.dat
1|2G|123|1 2|3G|333|2 3|3G|555|1 4|2G|777|3 5|3G|666|4
reduce端连接:
package com.mr.reduceSideJoin; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; public class CombineValues implements WritableComparable<CombineValues> { private Text joinKey;// 链接关键字 private Text flag;// 文件来源标志 private Text secondPart;// 除了链接键外的其他部分 public void setJoinKey(Text joinKey) { this.joinKey = joinKey; } public void setFlag(Text flag) { this.flag = flag; } public void setSecondPart(Text secondPart) { this.secondPart = secondPart; } public Text getFlag() { return flag; } public Text getSecondPart() { return secondPart; } public Text getJoinKey() { return joinKey; } public CombineValues() { this.joinKey = new Text(); this.flag = new Text(); this.secondPart = new Text(); } @Override public void write(DataOutput out) throws IOException { this.joinKey.write(out); this.flag.write(out); this.secondPart.write(out); } @Override public void readFields(DataInput in) throws IOException { this.joinKey.readFields(in); this.flag.readFields(in); this.secondPart.readFields(in); } @Override public int compareTo(CombineValues o) { return this.joinKey.compareTo(o.getJoinKey()); } @Override public String toString() { // TODO Auto-generated method stub return "[flag=" + this.flag.toString() + ",joinKey=" + this.joinKey.toString() + ",secondPart=" + this.secondPart.toString() + "]"; } }
package com.mr.reduceSideJoin; import java.io.IOException; import java.util.ArrayList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool { private static final Logger logger = LoggerFactory .getLogger(ReduceSideJoin_LeftOuterJoin.class); public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, CombineValues> { private CombineValues combineValues = new CombineValues(); private Text flag = new Text(); private Text joinKey = new Text(); private Text secondPart = new Text(); @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { // 获得文件输入路径 String pathName = ((FileSplit) context.getInputSplit()).getPath() .toString(); // 数据来自tb_dim_city.dat文件,标志即为"0" if (pathName.endsWith("tb_dim_city.dat")) { String[] valueItems = value.toString().split("\\|"); // 过滤格式错误的记录 if (valueItems.length != 5) { return; } flag.set("0"); joinKey.set(valueItems[0]); secondPart.set(valueItems[1] + "\t" + valueItems[2] + "\t" + valueItems[3] + "\t" + valueItems[4]); combineValues.setFlag(flag); combineValues.setJoinKey(joinKey); combineValues.setSecondPart(secondPart); context.write(combineValues.getJoinKey(), combineValues); }// 数据来自于tb_user_profiles.dat,标志即为"1" else if (pathName.endsWith("tb_user_profiles.dat")) { String[] valueItems = value.toString().split("\\|"); // 过滤格式错误的记录 if (valueItems.length != 4) { return; } flag.set("1"); joinKey.set(valueItems[3]); secondPart.set(valueItems[0] + "\t" + valueItems[1] + "\t" + valueItems[2]); combineValues.setFlag(flag); combineValues.setJoinKey(joinKey); combineValues.setSecondPart(secondPart); context.write(combineValues.getJoinKey(), combineValues); } } } public static class LeftOutJoinReducer extends Reducer<Text, CombineValues, Text, Text> { // 存储一个分组中的左表信息 private ArrayList<Text> leftTable = new ArrayList<Text>(); // 存储一个分组中的右表信息 private ArrayList<Text> rightTable = new ArrayList<Text>(); private Text secondPar = null; private Text output = new Text(); /** * 一个分组调用一次reduce函数;相同key的数据进了同一个reduce,这样就实现了join。 */ @Override protected void reduce(Text key, Iterable<CombineValues> value, Context context) throws IOException, InterruptedException { leftTable.clear(); rightTable.clear(); /** * 将分组中的元素按照文件分别进行存放 这种方法要注意的问题: 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM, * 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最 * 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。 */ for (CombineValues cv : value) { secondPar = new Text(cv.getSecondPart().toString()); // 左表tb_dim_city if ("0".equals(cv.getFlag().toString().trim())) { leftTable.add(secondPar); } // 右表tb_user_profiles else if ("1".equals(cv.getFlag().toString().trim())) { rightTable.add(secondPar); } } logger.info("tb_dim_city:" + leftTable.toString()); logger.info("tb_user_profiles:" + rightTable.toString()); // 这里体现了左连接 for (Text leftPart : leftTable) { for (Text rightPart : rightTable) { output.set(leftPart + "\t" + rightPart); // leftTable中有数据 rightTable中没有数据 就无法进到这一步 // rightTable中有数据 leftTable中没有数据 外面的循环就进不去 context.write(key, output); } } } } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); // 获得配置文件对象 Job job = new Job(conf, "LeftOutJoinMR"); job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class); FileInputFormat.addInputPath(job, new Path(args[0])); // 设置map输入文件路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 设置reduce输出文件路径 job.setMapperClass(LeftOutJoinMapper.class); job.setReducerClass(LeftOutJoinReducer.class); job.setInputFormatClass(TextInputFormat.class); // 设置文件输入格式 job.setOutputFormatClass(TextOutputFormat.class);// 使用默认的output格式 // 设置map的输出key和value类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(CombineValues.class); // 设置reduce的输出key和value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); return job.isSuccessful() ? 0 : 1; } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { try { Tool rdf = new ReduceSideJoin_LeftOuterJoin(); int returnCode = ToolRunner.run(rdf, args); System.exit(returnCode); } catch (Exception e) { System.out.println(e.getMessage()); } } }
Map端的连接
package com.mr.mapSideJoin; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.HashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MapSideJoinMain extends Configured implements Tool { private static final Logger logger = LoggerFactory .getLogger(MapSideJoinMain.class); public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, Text> { private HashMap<String, String> city_info = new HashMap<String, String>(); private Text outPutKey = new Text(); private Text outPutValue = new Text(); private String mapInputStr = null; private String mapInputSpit[] = null; private String city_secondPart = null; /** * 此方法在每个task开始之前执行,这里主要用作从DistributedCache * 中取到tb_dim_city文件,并将里边记录取出放到内存中。 */ @Override protected void setup(Context context) throws IOException, InterruptedException { BufferedReader br = null; // 获得当前作业的DistributedCache相关文件 Path[] distributePaths = DistributedCache .getLocalCacheFiles(context.getConfiguration()); String cityInfo = null; for (Path p : distributePaths) { if (p.toString().endsWith("tb_dim_city.dat")) { // 读缓存文件,并放到mem中 br = new BufferedReader(new FileReader(p.toString())); while (null != (cityInfo = br.readLine())) { String[] cityPart = cityInfo.split("\\|", 5); if (cityPart.length == 5) { city_info.put(cityPart[0], cityPart[1] + "\t" + cityPart[2] + "\t" + cityPart[3] + "\t" + cityPart[4]); } } } } } /** * Map端的实现相当简单,直接判断tb_user_profiles.dat中的 * cityID是否存在我的map中就ok了,这样就可以实现Map Join了 */ @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { // 排掉空行 if (value == null || value.toString().equals("")) { return; } mapInputStr = value.toString(); mapInputSpit = mapInputStr.split("\\|", 4); // 过滤非法记录 if (mapInputSpit.length != 4) { return; } // 判断链接字段是否在map中存在 city_secondPart = city_info.get(mapInputSpit[3]); if (city_secondPart != null) { this.outPutKey.set(mapInputSpit[3]); this.outPutValue.set(city_secondPart + "\t" + mapInputSpit[0] + "\t" + mapInputSpit[1] + "\t" + mapInputSpit[2]); context.write(outPutKey, outPutValue); } } } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); // 获得配置文件对象 DistributedCache.addCacheFile(new Path(args[0]).toUri(), conf);// 为该job添加缓存文件 Job job = new Job(conf, "MapJoinMR"); job.setNumReduceTasks(0); FileInputFormat.addInputPath(job, new Path(args[0])); // 设置map输入文件路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 设置reduce输出文件路径 job.setJarByClass(MapSideJoinMain.class); job.setMapperClass(LeftOutJoinMapper.class); job.setInputFormatClass(TextInputFormat.class); // 设置文件输入格式 job.setOutputFormatClass(TextOutputFormat.class);// 使用默认的output格式 // 设置map的输出key和value类型 job.setMapOutputKeyClass(Text.class); // 设置reduce的输出key和value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); return job.isSuccessful() ? 0 : 1; } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { try { int returnCode = ToolRunner.run(new MapSideJoinMain(), args); System.exit(returnCode); } catch (Exception e) { // TODO Auto-generated catch block logger.error(e.getMessage()); } } }
Semi连接
package com.mr.SemiJoin; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.mr.reduceSideJoin.CombineValues; public class SemiJoin extends Configured implements Tool { private static final Logger logger = LoggerFactory .getLogger(SemiJoin.class); public static class SemiJoinMapper extends Mapper<Object, Text, Text, CombineValues> { private CombineValues combineValues = new CombineValues(); private HashSet<String> joinKeySet = new HashSet<String>(); private Text flag = new Text(); private Text joinKey = new Text(); private Text secondPart = new Text(); /** * 将参加join的key从DistributedCache取出放到内存中,以便在map端将要参加join的key过滤出来。b */ @Override protected void setup(Context context) throws IOException, InterruptedException { BufferedReader br = null; // 获得当前作业的DistributedCache相关文件 Path[] distributePaths = DistributedCache .getLocalCacheFiles(context.getConfiguration()); String joinKeyStr = null; for (Path p : distributePaths) { if (p.toString().endsWith("joinKey.dat")) { // 读缓存文件,并放到mem中 br = new BufferedReader(new FileReader(p.toString())); while (null != (joinKeyStr = br.readLine())) { joinKeySet.add(joinKeyStr); } } } } @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { // 获得文件输入路径 String pathName = ((FileSplit) context.getInputSplit()).getPath() .toString(); // 数据来自tb_dim_city.dat文件,标志即为"0" if (pathName.endsWith("tb_dim_city.dat")) { String[] valueItems = value.toString().split("\\|"); // 过滤格式错误的记录 if (valueItems.length != 5) { return; } // 过滤掉不需要参加join的记录 if (joinKeySet.contains(valueItems[0])) { flag.set("0"); joinKey.set(valueItems[0]); secondPart.set(valueItems[1] + "\t" + valueItems[2] + "\t" + valueItems[3] + "\t" + valueItems[4]); combineValues.setFlag(flag); combineValues.setJoinKey(joinKey); combineValues.setSecondPart(secondPart); context.write(combineValues.getJoinKey(), combineValues); } else { return; } }// 数据来自于tb_user_profiles.dat,标志即为"1" else if (pathName.endsWith("tb_user_profiles.dat")) { String[] valueItems = value.toString().split("\\|"); // 过滤格式错误的记录 if (valueItems.length != 4) { return; } // 过滤掉不需要参加join的记录 if (joinKeySet.contains(valueItems[3])) { flag.set("1"); joinKey.set(valueItems[3]); secondPart.set(valueItems[0] + "\t" + valueItems[1] + "\t" + valueItems[2]); combineValues.setFlag(flag); combineValues.setJoinKey(joinKey); combineValues.setSecondPart(secondPart); context.write(combineValues.getJoinKey(), combineValues); } else { return; } } } } public static class SemiJoinReducer extends Reducer<Text, CombineValues, Text, Text> { // 存储一个分组中的左表信息 private ArrayList<Text> leftTable = new ArrayList<Text>(); // 存储一个分组中的右表信息 private ArrayList<Text> rightTable = new ArrayList<Text>(); private Text secondPar = null; private Text output = new Text(); /** * 一个分组调用一次reduce函数 */ @Override protected void reduce(Text key, Iterable<CombineValues> value, Context context) throws IOException, InterruptedException { leftTable.clear(); rightTable.clear(); /** * 将分组中的元素按照文件分别进行存放 这种方法要注意的问题: 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM, * 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最 * 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。 */ for (CombineValues cv : value) { secondPar = new Text(cv.getSecondPart().toString()); // 左表tb_dim_city if ("0".equals(cv.getFlag().toString().trim())) { leftTable.add(secondPar); } // 右表tb_user_profiles else if ("1".equals(cv.getFlag().toString().trim())) { rightTable.add(secondPar); } } logger.info("tb_dim_city:" + leftTable.toString()); logger.info("tb_user_profiles:" + rightTable.toString()); for (Text leftPart : leftTable) { for (Text rightPart : rightTable) { output.set(leftPart + "\t" + rightPart); context.write(key, output); } } } } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); // 获得配置文件对象 DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf); Job job = new Job(conf, "LeftOutJoinMR"); job.setJarByClass(SemiJoin.class); FileInputFormat.addInputPath(job, new Path(args[0])); // 设置map输入文件路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 设置reduce输出文件路径 job.setMapperClass(SemiJoinMapper.class); job.setReducerClass(SemiJoinReducer.class); job.setInputFormatClass(TextInputFormat.class); // 设置文件输入格式 job.setOutputFormatClass(TextOutputFormat.class);// 使用默认的output格式 // 设置map的输出key和value类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(CombineValues.class); // 设置reduce的输出key和value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); return job.isSuccessful() ? 0 : 1; } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { try { int returnCode = ToolRunner.run(new SemiJoin(), args); System.exit(returnCode); } catch (Exception e) { logger.error(e.getMessage()); } } }
注意事项:
只有reduce连接实例能在eclipse中运行,另外两个必须打成JAR包放到hadoop上去运行。
参考: http://zengzhaozheng.blog.51cto.com/8219051/1392961
相关文章推荐
- hadoop MapReduce 三种连接
- hadoop MapReduce 三种连接
- 使用Eclipse插件连接配置Mapreduce说明与教程(hadoop-eclipse-plugin 2.6)
- Hadoop教程(四):理解MapReduce、MapReduce计数器和连接、MapReduce Hadoop程序连接数据
- 【问题】spark运行python写的mapreduce任务,hadoop平台报错,java.net.ConnectException: 连接超时
- *****MapReduce连接:重分区连接【里面分析了org.apache.hadoop.contrib.utils.join包中的基础数据join原理和优化后的抽象类】
- win7使用eclipse连接hadoop集群,运行mapreduce报错之Failed to set permissions of path
- Hadoop Mapreduce 连接(Join)之一:重分区连接(Repartition join)
- 【Mapreduce】解决eclipse远程连接MapReduce,提示HADOOP_HOME or hadoop.home.dir are not set.
- Hadoop Mapreduce分区、分组、连接以及辅助排序(也叫二次排序)过程详解
- Windows 使用Eclipse配置连接hadoop,编译运行MapReduce --本地调试WordCount
- 大数据学习之路-第03天-配置hadoop,使用符号连接的方式,让三种配置形态共存。
- windows MyEclipse下连接hadoop并且编写调试mapReduce程序
- Centos6.5 64位 安装Hadoop2.7.0, MapReduce日志分析, Hive2.1.0, JDBC连接Hive查询 (4)
- Hadoop MapReduce例子-新版API多表连接Join之模仿订单配货
- Centos6.5 64位 安装Hadoop2.7.0, MapReduce日志分析, Hive2.1.0, JDBC连接Hive查询 (1)
- hadoop学习;自定义Input/OutputFormat;类引用mapreduce.mapper;三种模式
- hadoop学习;自定义Input/OutputFormat;类引用mapreduce.mapper;三种模式
- Centos6.5 64位 安装Hadoop2.7.0, MapReduce日志分析, Hive2.1.0, JDBC连接Hive查询 (2)
- Hadoop基础教程-第7章 MapReduce进阶(7.5 MapReduce 连接)