Hadoop 0.20.2 Bloom filter 应用示例
2015-12-23 10:19
411 查看
1. 简单介绍
参见《Hadoop in Action》P102 以及《Hadoop实战(第2版)》(陆嘉恒)P69
![](http://img.blog.csdn.net/20140604115057843?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvaGV6aGl4dWU=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
![](http://img.blog.csdn.net/20140604115123796?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvaGV6aGl4dWU=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
2. 案例
网上大部分的说明只是按照《Hadoop in Action》中的示例代码给出。这里使用的是 Hadoop 0.20.2 版本,该版本中已经实现了 BloomFilter。
案例文件如下:
customers.txt
1,Stephanie Leung,555-555-5555
2,Edward Kim,123-456-7890
3,Jose Madriz,281-330-8004
4,David Stork,408-555-0000
-----------------------------------------------------------------
orders.txt
3,A,12.95,02-Jun-2008
1,B,88.25,20-May-2008
2,C,32.00,30-Nov-2007
3,D,25.02,22-Jan-2009
5,E,34.59,05-Jan-2010
6,F,28.67,16-Jan-2008
7,G,49.82,24-Jan-2009
两个文件通过customer ID关联。
3. 代码
参见《Hadoop in Action》P102 以及《Hadoop实战(第2版)》(陆嘉恒)P69
2. 案例
网上大部分的说明只是按照《Hadoop in Action》中的示例代码给出。这里使用的是 Hadoop 0.20.2 版本,该版本中已经实现了 BloomFilter。
案例文件如下:
customers.txt
1,Stephanie Leung,555-555-5555
2,Edward Kim,123-456-7890
3,Jose Madriz,281-330-8004
4,David Stork,408-555-0000
-----------------------------------------------------------------
orders.txt
3,A,12.95,02-Jun-2008
1,B,88.25,20-May-2008
2,C,32.00,30-Nov-2007
3,D,25.02,22-Jan-2009
5,E,34.59,05-Jan-2010
6,F,28.67,16-Jan-2008
7,G,49.82,24-Jan-2009
两个文件通过customer ID关联。
3. 代码
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.bloom.BloomFilter; import org.apache.hadoop.util.bloom.Key; import org.apache.hadoop.util.hash.Hash; public class BloomMRMain { public static class BloomMapper extends Mapper<Object, Text, Text, Text> { BloomFilter bloomFilter = new BloomFilter(10000, 6, Hash.MURMUR_HASH); protected void setup(Context context) throws IOException ,InterruptedException { Configuration conf = context.getConfiguration(); String path = "hdfs://localhost:9000/user/hezhixue/input/customers.txt"; Path file = new Path(path); FileSystem hdfs = FileSystem.get(conf); FSDataInputStream dis = hdfs.open(file); BufferedReader reader = new BufferedReader(new InputStreamReader(dis)); String temp; while ((temp = reader.readLine()) != null) { // System.out.println("bloom filter temp:" + temp); String[] tokens = temp.split(","); if (tokens.length > 0) { bloomFilter.add(new Key(tokens[0].getBytes())); } } } protected void map(Object key, Text value, Context context) throws IOException ,InterruptedException { //获得文件输入路径 String pathName = ((FileSplit) context.getInputSplit()).getPath().toString(); if (pathName.contains("customers")) { String data = value.toString(); String[] tokens = 
data.split(","); if (tokens.length == 3) { String outKey = tokens[0]; String outVal = "0" + ":" + tokens[1] + "," + tokens[2]; context.write(new Text(outKey), new Text(outVal)); } } else if (pathName.contains("orders")) { String data = value.toString(); String[] tokens = data.split(","); if (tokens.length == 4) { String outKey = tokens[0]; System.out.println("in map and outKey:" + outKey); if (bloomFilter.membershipTest(new Key(outKey.getBytes()))) { String outVal = "1" + ":" + tokens[1] + "," + tokens[2]+ "," + tokens[3]; context.write(new Text(outKey), new Text(outVal)); } } } } } public static class BloomReducer extends Reducer<Text, Text, Text, Text> { ArrayList<Text> leftTable = new ArrayList<Text>(); ArrayList<Text> rightTable = new ArrayList<Text>(); protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException ,InterruptedException { leftTable.clear(); rightTable.clear(); for (Text val : values) { String outVal = val.toString(); System.out.println("key: " + key.toString() + " : " + outVal); int index = outVal.indexOf(":"); String flag = outVal.substring(0, index); if ("0".equals(flag)) { leftTable.add(new Text(outVal.substring(index+1))); } else if ("1".equals(flag)) { rightTable.add(new Text(outVal.substring(index + 1))); } } if (leftTable.size() > 0 && rightTable.size() > 0) { for(Text left : leftTable) { for (Text right : rightTable) { context.write(key, new Text(left.toString() + "," + right.toString())); } } } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: BloomMRMain <in> <out>"); System.exit(2); } Job job = new Job(conf, "BloomMRMain"); job.setJarByClass(BloomMRMain.class); job.setMapperClass(BloomMapper.class); job.setReducerClass(BloomReducer.class); job.setInputFormatClass(TextInputFormat.class); 
job.setOutputFormatClass(TextOutputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
相关文章推荐
- stl算法copy
- <php+mysql>Mac配置APACHE+PHP+MYSQL+PHPMYADMIN
- Linux 进程管理
- 在Powershell ISE中添加sharepoint的智能提示,Enable SharePoint PowerShell Commandlets in the PowerShell ISE
- DROP TABLE、TRUNCATE TABLE和DELETE的区别
- linux 下shutdown命令关闭多个tomcat问题
- /bin/bash: prebuilts/misc/linux-x86/bison/bison: No such file or directory
- shell set 命令
- arm linux 启动流程分析
- Linux/Unix 桌面趣事:桌面上追逐的猫和老鼠
- linux内核进程详解
- Unity调用外部EXE或Shell命令
- spatialhadoop2.3源码阅读(十一) ShapeRecordReader & SpatialRecordReader[Grid Index MapReuce]
- 分布式系统全链路监控方案设计
- Hadoop生态系统全表
- web在线打印,打印阅览,打印维护,打印设计
- Linux系统下处理已删除文件继续占用磁盘空间的问题
- uptime命令查看系统启动时间和运行时间、查看linux系统负载
- 网站攻击的三种手段及预防方法
- openfire的配置搭建