MapReduce/Hadoop的TopN解决方案之键唯一的情况
2017-11-02 15:03
393 查看
TopN问题:上星期访问次数最多的10个URL是哪些?所有猫中体重最大的10只猫是哪些?
本文使用 MapReduce/Hadoop的TopN解决方案,假设所有输入键都是唯一的。也就是说,对于一个给定的输入集合{<K,V>},所有K都是唯一的。
例如对于下面的猫,cat1不会再出现第二次
输入:cat.txt
期待输出:
一、用到的核心数据结构:Java中的SortedMap和TreeMap,其中SortedMap可以实现按key值排序。对于如下测试类
输出为:
二、code
四、扩展
1、Top5怎么办?传入另一个参数
2、不求前10个求后10个怎么办?将
本文使用 MapReduce/Hadoop的TopN解决方案,假设所有输入键都是唯一的。也就是说,对于一个给定的输入集合{<K,V>},所有K都是唯一的。
例如对于下面的猫,cat1不会再出现第二次
输入:cat.txt
12,cat1,cat1 13,cat2,cat2 14,cat3,cat3 15,cat4,cat4 10,cat5,cat5 100,cat100,cat100 200,cat200,cat200 300,cat300,cat300 1,cat001,cat001 67,cat67,cat67 22,cat22,cat22 23,cat23,cat23 1000,cat1000,cat1000 2000,cat2000,cat2000
期待输出:
2000 cat2000,cat2000 1000 cat1000,cat1000 300 cat300,cat300 200 cat200,cat200 100 cat100,cat100 67 cat67,cat67 23 cat23,cat23 22 cat22,cat22 15 cat4,cat4 14 cat3,cat3
一、用到的核心数据结构:Java中的SortedMap和TreeMap,其中SortedMap可以实现按key值排序。对于如下测试类
package topN_hadoop1; import java.util.Map.Entry; import java.util.SortedMap; import java.util.TreeMap; public class Test { public static void main(String[] args) { SortedMap<Integer, String> top = new TreeMap<Integer, String>(); top.put(1, "chenjie,1"); top.put(10, "zhanghan,10"); top.put(3 ,"renbo,3"); for(Entry< Integer, String> entry : top.entrySet()) { System.out.println(entry); } System.out.println("------------------------------------------------------"); System.out.println("firstKey:" + top.firstKey()); System.out.println("first:" + top.get(top.firstKey())); System.out.println("lastKey:" + top.lastKey()); System.out.println("last:" + top.get(top.lastKey())); top.remove(top.firstKey()); System.out.println("remove first "); System.out.println("------------------------------------------------------"); for(Entry< Integer, String> entry : top.entrySet()) { System.out.println(entry); } top.remove(top.lastKey()); System.out.println("remove last "); System.out.println("------------------------------------------------------"); for(Entry< Integer, String> entry : top.entrySet()) { System.out.println(entry); } } }
输出为:
1=chenjie,1 3=renbo,3 10=zhanghan,10 ------------------------------------------------------ firstKey:1 first:chenjie,1 lastKey:10 last:zhanghan,10 remove first ------------------------------------------------------ 3=renbo,3 10=zhanghan,10 remove last ------------------------------------------------------ 3=renbo,3
二、code
package topN_hadoop1; import java.io.IOException; import java.util.SortedMap; import java.util.TreeMap; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class TopNMapper extends Mapper<LongWritable,Text , NullWritable, Text> { private int N = 10; // default private SortedMap<Integer, String> top = new TreeMap<Integer, String>(); @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] lines = value.toString().split(","); String keyAsString = value.toString(); int frequency = Integer.valueOf(lines[0]); String compositeValue = keyAsString + "," + frequency; top.put(frequency, compositeValue); if (top.size() > N) { top.remove(top.firstKey()); } } @Override protected void setup(Context context) throws IOException, InterruptedException { this.N = context.getConfiguration().getInt("N", 10); // default is top 10 } @Override protected void cleanup(Context context) throws IOException, InterruptedException { for (String str : top.values()) { context.write(NullWritable.get(), new Text(str)); } } }
package topN_hadoop1; import java.io.IOException; import java.util.SortedMap; import java.util.TreeMap; import java.util.List; import java.util.ArrayList; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class TopNReducer extends Reducer<NullWritable, Text, IntWritable, Text> { private int N = 10; // default private SortedMap<Integer, String> top = new TreeMap<Integer, String>(); @Override public void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { //value: 2000,cat2000,cat2000,2000 String valueAsString = value.toString().trim(); System.out.println(value); String[] tokens = valueAsString.split(","); String url = tokens[1] + "," + tokens[2];//,cat2000,cat2000 int frequency = Integer.parseInt(tokens[0]);//2000 top.put(frequency, url); if (top.size() > N) { top.remove(top.firstKey()); } } // emit final top N List<Integer> keys = new ArrayList<Integer>(top.keySet()); for(int i=keys.size()-1; i>=0; i--){ context.write(new IntWritable(keys.get(i)), new Text(top.get(keys.get(i)))); } } @Override protected void setup(Context context) throws IOException, InterruptedException { this.N = context.getConfiguration().getInt("N", 10); // default is top 10 } }
package topN_hadoop1; import org.apache.log4j.Logger; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class TopNDriver extends Configured implements Tool { private static Logger THE_LOGGER = Logger.getLogger(TopNDriver.class); public int run(String[] args) throws Exception { Job job = new Job(getConf()); int N = Integer.parseInt(args[0]); // top N job.getConfiguration().setInt("N", N); job.setJobName("TopNDriver"); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setMapperClass(TopNMapper.class); job.setReducerClass(TopNReducer.class); job.setNumReduceTasks(1); // map()'s output (K,V) job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(Text.class); // reduce()'s output (K,V) job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Text.class); // args[1] = input directory // args[2] = output directory FileInputFormat.setInputPaths(job, new Path(args[1])); FileOutputFormat.setOutputPath(job, new Path(args[2])); boolean status = job.waitForCompletion(true); THE_LOGGER.info("run(): status="+status); return status ? 0 : 1; } private static final String INPATH = "input/cat.txt";// 输入文件路径 private static final String OUTPATH = "output/cat_out1";// 输出文件路径 public static void main(String[] args) throws Exception { args = new String[3]; args[0] = "10"; args[1] = INPATH; args[2] = OUTPATH; // Make sure there are exactly 3 parameters if (args.length != 3) { THE_LOGGER.warn("usage TopNDriver <N> <input> <output>"); System.exit(1); } THE_LOGGER.info("N="+args[0]); THE_LOGGER.info("inputDir="+args[1]); THE_LOGGER.info("outputDir="+args[2]); int returnStatus = ToolRunner.run(new TopNDriver(), args); System.exit(returnStatus); } }
四、扩展
1、Top5怎么办?传入另一个参数
2、不求前10个求后10个怎么办?将
if (top.size() > N) { top.remove(top.firstKey()); }改成top.lastKey()
相关文章推荐
- MapReduce/Hadoop的TopN解决方案之键不唯一的情况
- Spark的TopN解决方案(键唯一的情况、键不唯一的情况)
- 求高并发情况下生成唯一订单号解决方案
- 分布式情况下生成数据库唯一ID的解决方案
- 分布式情况下生成数据库唯一ID的解决方案
- MapReduce/Hadoop的二次排序解决方案
- Hadoop启动异常情况整理与解决方案01(能力工场--小马哥整理)
- Hadoop启动异常情况整理与解决方案01(能力工场--小马哥整理)
- Hadoop启动异常情况解决方案
- Hadoop2.6.0学习笔记(十)SPOF解决方案QJM
- : org.apache.hadoop.hbase.MasterNotRunningException: Retried 7 times解决方案
- 文字内容或标题过长,用 ... 进行省略的TD和DIV两种情况下的三种解决方案
- Hadoop 伪分布式模式 MapReduce 任务不能继续运行 解决方案
- 关于内层DIV设置margin-top不起作用的解决方案
- top查看多核cpu的使用情况
- Hadoop集群datanode磁盘不均衡的解决方案【转】
- 微信小程序处理用户拒绝授权情况及微信登录,登录保存等系列解决方案
- hadoop2.x通过Zookeeper来实现namenode的HA方案以及ResourceManager单点故障的解决方案
- 24点游戏探秘系列2:唯一解的情况
- PHP uniqid()函数可用于生成不重复的唯一标识符,该函数基于微秒级当前时间戳。在高并发或者间隔时长极短(如循环代码)的情况下,会出现大量重复数据。即使使用了第二个参数,也会重复,最好的方案是结