
Finding the Top N Most-Searched Terms Among Hundreds of Millions of Records (MapReduce Implementation)

2015-09-18 16:40
Example program:

Log data (one search term and its search count per line, tab-separated):

二手车    1345
二手房    3416
洗衣机    2789

Input: N=2

Output:
二手房
洗衣机
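
For a data set this small, the expected output can be checked with a brute-force baseline that sorts every record by count and keeps the first N. The following is a minimal plain-Java sketch of that check, not the MapReduce approach itself; the class name `TopNBaseline` and the method `topN` are illustrative names that do not appear in the original program.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative baseline (hypothetical names): sort all records, keep the first n.
final class TopNBaseline {
    // Assumes every line has the form "<term>\t<count>".
    static List<String> topN(List<String> lines, int n) {
        return lines.stream()
                .map(line -> line.split("\t"))
                // Highest count first.
                .sorted((a, b) -> Integer.compare(
                        Integer.parseInt(b[1]), Integer.parseInt(a[1])))
                .limit(n)
                .map(fields -> fields[0])
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList(
                "二手车\t1345", "二手房\t3416", "洗衣机\t2789");
        System.out.println(topN(log, 2));
    }
}
```

With the three log lines above and N=2, this returns 二手房 followed by 洗衣机, matching the expected output. Sorting everything does not scale to hundreds of millions of records, which is why the MapReduce version keeps only N candidates per task.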


The map function is as follows:

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TopNMapper extends Mapper<Object, Text, NullWritable, Text> {
    // Local top N for this map task, ordered by ascending search count.
    // Note: terms with the same count share a key, so only the last one seen is kept.
    private final TreeMap<IntWritable, Text> tm = new TreeMap<IntWritable, Text>();
    // Number of terms to keep; hard-coded here (set it to 2 to reproduce the example above).
    private int N = 10;

    @Override
    protected void map(Object key, Text value,
            Mapper<Object, Text, NullWritable, Text>.Context context)
            throws IOException, InterruptedException {
        // Each input line is "<search term>\t<search count>".
        String[] fields = value.toString().split("\t");
        String word = fields[0];
        int num = Integer.parseInt(fields[1]);
        // Store fresh objects: reusing one mutable IntWritable/Text as key and value
        // would leave the TreeMap with a single entry that is overwritten on every call.
        tm.put(new IntWritable(num), new Text(word));
        // Evict the smallest count once more than N entries are held.
        if (tm.size() > N) tm.remove(tm.firstKey());
    }

    @Override
    protected void cleanup(
            Mapper<Object, Text, NullWritable, Text>.Context context)
            throws IOException, InterruptedException {
        // Emit the local top N as "<count> <term>" under a single null key,
        // so that all candidates arrive in the same reduce call.
        for (Map.Entry<IntWritable, Text> entry : tm.entrySet()) {
            Text value = new Text(entry.getKey() + " " + entry.getValue());
            context.write(NullWritable.get(), value);
        }
    }
}
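
Because the TreeMap in the mapper is keyed by the search count, two terms with the same count collide and only one of them survives. One possible alternative, sketched here in plain Java and not taken from the original post, is a bounded min-heap of (term, count) pairs. The names `BoundedTopN` and `Entry` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper: keeps the n largest (term, count) pairs, ties included.
final class BoundedTopN {
    static final class Entry {
        final String term;
        final int count;
        Entry(String term, int count) { this.term = term; this.count = count; }
    }

    private final int n;
    // Min-heap on count: the head is always the weakest candidate.
    private final PriorityQueue<Entry> heap =
            new PriorityQueue<>(Comparator.comparingInt((Entry e) -> e.count));

    BoundedTopN(int n) { this.n = n; }

    void offer(String term, int count) {
        heap.add(new Entry(term, count));
        if (heap.size() > n) {
            heap.poll(); // evict the entry with the smallest count
        }
    }

    // Returns the kept terms ordered by descending count.
    List<String> termsDescending() {
        List<Entry> sorted = new ArrayList<>(heap);
        sorted.sort(Comparator.comparingInt((Entry e) -> e.count).reversed());
        List<String> terms = new ArrayList<>();
        for (Entry e : sorted) {
            terms.add(e.term);
        }
        return terms;
    }
}
```

In a mapper, `offer` would take the place of the `tm.put` / `tm.remove` pair and `termsDescending` (or a variant that also returns the counts) would feed `cleanup`. The relative order of terms with equal counts is not defined in this sketch.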


The reduce function is as follows:

import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
    // Global top N, ordered by ascending search count.
    private final TreeMap<IntWritable, Text> tm = new TreeMap<IntWritable, Text>();
    // Number of terms to output; should not exceed the N used in TopNMapper.
    private int N = 10;

    @Override
    protected void reduce(NullWritable key, Iterable<Text> values,
            Reducer<NullWritable, Text, NullWritable, Text>.Context context)
            throws IOException, InterruptedException {
        for (Text val : values) {
            // Each value is "<count> <term>" as written by TopNMapper.cleanup();
            // the limit of 2 keeps terms that themselves contain spaces intact.
            String[] tmp = val.toString().split(" ", 2);
            // Store fresh objects rather than reusing mutable Writable instances,
            // which would leave the TreeMap with a single, repeatedly overwritten entry.
            tm.put(new IntWritable(Integer.parseInt(tmp[0])), new Text(tmp[1]));
            // Evict the smallest count once more than N entries are held.
            if (tm.size() > N) tm.remove(tm.firstKey());
        }
        // Write the terms from the highest count to the lowest.
        for (Text res : tm.descendingMap().values()) {
            context.write(NullWritable.get(), res);
        }
    }
}
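
Taken together, the mapper and reducer form a two-stage selection: each map task keeps a local top N for its input split, and the single reduce call merges those candidates into the global top N. The following plain-Java sketch simulates the two stages in memory without Hadoop. The class and method names are illustrative, the split boundaries are made up for the demonstration, and counts are assumed to be distinct.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hadoop-free simulation of the map-side and reduce-side top-N passes.
// Names are hypothetical; counts are assumed to be distinct.
final class TwoStageTopN {
    // Keeps only the n largest counts, as each map task does for its split.
    static TreeMap<Integer, String> localTopN(List<String> lines, int n) {
        TreeMap<Integer, String> tm = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split("\t");
            tm.put(Integer.parseInt(fields[1]), fields[0]);
            if (tm.size() > n) {
                tm.remove(tm.firstKey());
            }
        }
        return tm;
    }

    // Merges the per-split candidates, as the single reduce call does.
    static List<String> globalTopN(List<TreeMap<Integer, String>> partials, int n) {
        TreeMap<Integer, String> tm = new TreeMap<>();
        for (TreeMap<Integer, String> partial : partials) {
            for (Map.Entry<Integer, String> e : partial.entrySet()) {
                tm.put(e.getKey(), e.getValue());
                if (tm.size() > n) {
                    tm.remove(tm.firstKey());
                }
            }
        }
        // Highest count first, terms only.
        return new ArrayList<>(tm.descendingMap().values());
    }

    public static void main(String[] args) {
        List<String> split1 = Arrays.asList("二手车\t1345", "二手房\t3416");
        List<String> split2 = Arrays.asList("洗衣机\t2789");
        List<TreeMap<Integer, String>> partials = new ArrayList<>();
        partials.add(localTopN(split1, 2));
        partials.add(localTopN(split2, 2));
        System.out.println(globalTopN(partials, 2));
    }
}
```

The design works because any record in the global top N must also be in the top N of its own split, so discarding everything else on the map side loses nothing while reducing the data sent to the reducer to at most N records per map task.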