Hadoop MapReduce: self join on a single column
2017-04-20 21:47
package mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Self_join {

    // Input: one "child parent" pair per line, separated by a single space.
    public static class Map extends Mapper<Object, Text, Text, Text> {
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] ss = line.split(" ", 2);
            // Emit each record twice: keyed by the parent (tag "left_", carrying
            // the child) and keyed by the child (tag "right_", carrying the
            // parent), so the table joins with itself on that column in the reducer.
            context.write(new Text(ss[1]), new Text("left_" + ss[0]));
            context.write(new Text(ss[0]), new Text("right_" + ss[1]));
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        private static int time = 0;
        private static List<String> ch = new ArrayList<String>();
        private static List<String> g = new ArrayList<String>();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Write the header row once (only meaningful with a single reducer).
            if (time == 0) {
                context.write(new Text("grandchild"), new Text("grandparent"));
                time++;
            }
            ch.clear();
            g.clear();
            for (Text v : values) {
                String p = v.toString();
                if (p.startsWith("left_")) {
                    ch.add(p.substring("left_".length()));    // children of key
                } else if (p.startsWith("right_")) {
                    g.add(p.substring("right_".length()));    // parents of key
                }
            }
            // Cross product: every child of key paired with every parent of key
            // yields a (grandchild, grandparent) row. Nested for-each loops are
            // required here; reusing a single Iterator for the inner loop would
            // exhaust it after the first child and drop all later pairs.
            for (String c : ch) {
                for (String gp : g) {
                    context.write(new Text(c), new Text(gp));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Selfjoin");
        job.setJarByClass(Self_join.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
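The join logic above can be checked without a Hadoop cluster. The sketch below is a plain-Java simulation of the same map/shuffle/reduce flow: it tags each (child, parent) record twice, groups by key, and pairs the "left" children with the "right" parents under each key. The class name, sample names, and helper method are illustrative assumptions, not part of the original job.

```java
import java.util.*;

// Minimal local sketch (no Hadoop) of the self-join pairing logic.
public class SelfJoinSketch {
    public static List<String> grandPairs(List<String[]> records) {
        // Shuffle stage simulated with two maps: key -> tagged values.
        Map<String, List<String>> left = new HashMap<>();   // key -> its children
        Map<String, List<String>> right = new HashMap<>();  // key -> its parents
        for (String[] r : records) {
            String child = r[0], parent = r[1];
            left.computeIfAbsent(parent, k -> new ArrayList<>()).add(child);
            right.computeIfAbsent(child, k -> new ArrayList<>()).add(parent);
        }
        // Reduce stage: cross product of children and parents per key.
        List<String> out = new ArrayList<>();
        for (String key : left.keySet())
            for (String c : left.get(key))
                for (String gp : right.getOrDefault(key, Collections.emptyList()))
                    out.add(c + "\t" + gp);
        return out;
    }

    public static void main(String[] args) {
        // Tom is Lucy's child, Lucy is Mary's child -> Tom is Mary's grandchild.
        List<String[]> recs = Arrays.asList(
                new String[]{"Tom", "Lucy"},
                new String[]{"Lucy", "Mary"});
        System.out.println(SelfJoinSketch.grandPairs(recs)); // prints [Tom	Mary]
    }
}
```

With the intermediate record "Lucy" acting as the join key, the single output row (Tom, Mary) matches what the MapReduce job would write for the same two input lines.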