Finding the Top N Search Terms by Search Count Among Hundreds of Millions of Records (MapReduce Implementation)
2015-09-18 16:40
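Example:
Log data (one search term and its total search count per line, tab-separated):
二手车 (used cars) 1345
二手房 (second-hand homes) 3416
洗衣机 (washing machines) 2789
Input: N=2
Output: 二手房 洗衣机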
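As a single-machine reference point, the sketch below computes the expected output for the sample above with a bounded min-heap. The heap never holds more than N + 1 candidates, so memory stays proportional to N rather than to the number of distinct terms. The class name `TopNExample` and the tie-break on the term itself are assumptions for illustration; the post does not say how equal counts should be ranked.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class TopNExample {
    // "Worst" candidate first: smaller count, or the same count and a larger term.
    // The tie-break on the term is an assumption; the post does not specify one.
    private static final Comparator<Map.Entry<String, Integer>> WORST_FIRST =
            Map.Entry.<String, Integer>comparingByValue()
                    .thenComparing(Map.Entry.<String, Integer>comparingByKey().reversed());

    // Returns the n terms with the highest counts, highest first.
    public static List<String> topN(Map<String, Integer> counts, int n) {
        PriorityQueue<Map.Entry<String, Integer>> heap = new PriorityQueue<>(WORST_FIRST);
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            heap.offer(e);
            if (heap.size() > n) {
                heap.poll(); // evict the current worst candidate
            }
        }
        List<String> result = new ArrayList<>();
        while (!heap.isEmpty()) {
            result.add(heap.poll().getKey()); // worst to best
        }
        Collections.reverse(result); // best first
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("二手车", 1345);
        counts.put("二手房", 3416);
        counts.put("洗衣机", 2789);
        System.out.println(topN(counts, 2)); // [二手房, 洗衣机]
    }
}
```

Both the map and the reduce phase below apply this same "keep only the best N seen so far" idea; MapReduce just runs it once per input split and once more over the merged candidates.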
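The map function is shown below. Each map task keeps only the N largest counts from its own input split in a TreeMap, evicting the smallest entry whenever the map grows past N, and emits those local candidates in cleanup():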
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TopNMapper extends Mapper<Object, Text, NullWritable, Text> {
    // Local top N of this map task, ordered by count (ascending).
    // Keyed by count alone, so two terms with the same count overwrite each other.
    private final TreeMap<Integer, String> tm = new TreeMap<Integer, String>();
    private final int N = 10; // number of terms to keep (the example above uses N = 2)

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is "term<TAB>count".
        String[] fields = value.toString().split("\t");
        String word = fields[0];
        int num = Integer.parseInt(fields[1].trim());
        // Store immutable Integer/String values: reusing one mutable IntWritable/Text
        // pair as key and value would make every put() hit the same entry.
        tm.put(num, word);
        if (tm.size() > N) {
            tm.remove(tm.firstKey()); // drop the smallest count
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // Emit the local candidates as "count term" under one NullWritable key,
        // so a single reduce() call receives the candidates from every mapper.
        for (Map.Entry<Integer, String> entry : tm.entrySet()) {
            context.write(NullWritable.get(), new Text(entry.getKey() + " " + entry.getValue()));
        }
    }
}
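The TreeMap in the mapper is keyed by count, so terms that tie on count overwrite each other. One way to keep ties, sketched below in plain Java without Hadoop, is to order (term, count) pairs by count and then by term inside a TreeSet. `LocalTopN`, `add()` and `emit()` are hypothetical names standing in for map() and cleanup(), and the tie-break rule (the alphabetically larger term is evicted first) is an assumption.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class LocalTopN {
    // Smallest count first; among equal counts the larger term sorts first,
    // so it is the one evicted (an assumed tie-break).
    private static final Comparator<Map.Entry<String, Integer>> WORST_FIRST =
            Map.Entry.<String, Integer>comparingByValue()
                    .thenComparing(Map.Entry.<String, Integer>comparingByKey().reversed());

    private final TreeSet<Map.Entry<String, Integer>> best = new TreeSet<>(WORST_FIRST);
    private final int n;

    public LocalTopN(int n) {
        this.n = n;
    }

    // Counterpart of map(): one "term<TAB>count" line per call.
    public void add(String line) {
        String[] fields = line.split("\t");
        int count = Integer.parseInt(fields[1].trim());
        // An immutable (term, count) pair, so later lines cannot alter stored entries.
        best.add(new AbstractMap.SimpleImmutableEntry<>(fields[0], count));
        if (best.size() > n) {
            best.pollFirst(); // evict the current worst candidate
        }
    }

    // Counterpart of cleanup(): the "count term" records this task would send on, best first.
    public List<String> emit() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : best.descendingSet()) {
            out.add(e.getValue() + " " + e.getKey());
        }
        return out;
    }

    public static void main(String[] args) {
        LocalTopN task = new LocalTopN(2);
        // Made-up sample: "a" and "b" tie on 5; a TreeMap keyed by count alone would keep only one.
        for (String line : new String[] {"a\t5", "b\t5", "c\t1"}) {
            task.add(line);
        }
        System.out.println(task.emit()); // [5 a, 5 b]
    }
}
```

The output keeps the same "count term" format as the mapper above, so a reducer parsing that format would not need to change.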
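The reduce function is shown below. Because every mapper writes its candidates under the same NullWritable key, one reduce() call sees all the local top-N lists and selects the global top N from them: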
import java.io.IOException;
import java.util.TreeMap;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
    // Global top N, ordered by count (ascending); ties on count overwrite each other.
    private final TreeMap<Integer, String> tm = new TreeMap<Integer, String>();
    private final int N = 10; // must match the N used in TopNMapper

    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Every value is one local candidate in "count term" form.
        for (Text val : values) {
            // Hadoop reuses the same Text object across iterations, so copy it to a String.
            String[] tmp = val.toString().split(" ", 2); // split at the first space only
            tm.put(Integer.parseInt(tmp[0]), tmp[1]);
            if (tm.size() > N) {
                tm.remove(tm.firstKey()); // drop the smallest count
            }
        }
        // Write the global top N terms, highest count first.
        for (String word : tm.descendingMap().values()) {
            context.write(NullWritable.get(), new Text(word));
        }
    }
}
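Why is it enough for each mapper to forward only its local top N? Assuming each term appears on exactly one input line (the counts are already aggregated, as the `term<TAB>count` input suggests), a term in the global top N is also in the top N of its own split, because anything that outranks it inside the split also outranks it globally. The sketch below simulates this two-phase flow in memory; `TwoPhaseTopN` and its methods are hypothetical, each map in the list stands for one input split, and ties are broken by term as an assumption. If one term's count were spread over several lines, local pruning could drop it, and the counts would have to be summed first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TwoPhaseTopN {
    // Best first: higher count, then (an assumed tie-break) the alphabetically smaller term.
    private static final Comparator<Map.Entry<String, Integer>> BEST_FIRST =
            Map.Entry.<String, Integer>comparingByValue().reversed()
                    .thenComparing(Map.Entry.<String, Integer>comparingByKey());

    // Top n entries of one collection, best first.
    public static List<Map.Entry<String, Integer>> topN(Collection<Map.Entry<String, Integer>> entries, int n) {
        return entries.stream().sorted(BEST_FIRST).limit(n).collect(Collectors.toList());
    }

    // Each map in `splits` plays one map task's input. Every "mapper" forwards only
    // its local top n; the single "reducer" picks the top n among all forwarded candidates.
    public static List<String> run(List<Map<String, Integer>> splits, int n) {
        List<Map.Entry<String, Integer>> candidates = new ArrayList<>();
        for (Map<String, Integer> split : splits) {
            candidates.addAll(topN(split.entrySet(), n)); // map() + cleanup()
        }
        return topN(candidates, n).stream()                // reduce()
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Integer> split1 = new HashMap<>();
        split1.put("二手车", 1345);
        split1.put("二手房", 3416);
        Map<String, Integer> split2 = new HashMap<>();
        split2.put("洗衣机", 2789);
        System.out.println(run(Arrays.asList(split1, split2), 2)); // [二手房, 洗衣机]
    }
}
```

Splitting the sample data differently, or not at all, gives the same answer, which is the property the MapReduce version relies on.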