Pig、Hive、MapReduce 解决分组 Top K 问题
2013-12-09 21:55
555 查看
问题:
有如下数据文件 city.txt (id, city, value)
cat city.txt
1 wh 500
2 bj 600
3 wh 100
4 sh 400
5 wh 200
6 bj 100
7 sh 200
8 bj 300
9 sh 900
需要按 city分组聚合,然后从每组数据中取出前两条value最大的记录。
1、这是实际业务中经常会遇到的 group TopK 问题,下面来看看 pig 如何解决:
结果:
这几行代码其实也实现了mysql中的 group_concat 函数的功能:
结果:
2、下面我们再来看看hive如何处理group topk的问题:
本质上HSQL和sql有很多相同的地方,但HSQL目前功能还有很多缺失,至少不如原生态的SQL功能强大,
比起PIG也有些差距,如果SQL中这类分组topk的问题如何解决呢?
http://my.oschina.net/leejun2005/blog/78904
但是这种写法在HQL中直接报语法错误了,下面我们只能用hive udf的思路来解决了:
排序city和value,然后对city计数,最后where过滤掉city列计数器大于k的行即可。
好了,上代码:
(1)定义UDF:
(2)注册jar、建表、导数据,查询:
(3)结果:
可以看到,hive相比pig来说,处理起来稍微复杂了点,但随着hive的日渐完善,以后比pig更简洁也说不定。
REF:hive中分组取前N个值的实现
http://baiyunl.iteye.com/blog/1466343
3、最后我们来看一下原生态的MR:
hadoop jar GroupTopK.jar GroupTopK /tmp/decli/record_new.txt /tmp/1
结果:
hadoop fs -cat /tmp/1/part-r-00000
0 12869695
0 12869971
0 12869976
1 12869813
1 12869870
1 12869951
......
数据验证:
awk '$3==0{print $1}' record_new.txt|sort -nr|head -3
12869976
12869971
12869695
可以看到结果没有问题。
注:测试数据由以下脚本生成:
http://my.oschina.net/leejun2005/blog/76631
PS:
如果说hive类似sql的话,那pig就类似plsql存储过程了:程序编写更自由,逻辑能处理的更强大了。
pig中还能直接通过反射调用java的静态类中的方法,这块内容请参考之前的相关pig博文。
附几个HIVE UDAF链接,有兴趣的同学自己看下:
Hive UDAF和UDTF实现group by后获取top值http://blog.csdn.net/liuzhoulong/article/details/7789183
hive中自定义函数(UDAF)实现多行字符串拼接为一行http://blog.sina.com.cn/s/blog_6ff05a2c0100tjw4.html
编写Hive UDAFhttp://www.fuzhijie.me/?p=118
Hive UDAF开发http://richiehu.blog.51cto.com/2093113/386113
有如下数据文件 city.txt (id, city, value)
cat city.txt
1 wh 500
2 bj 600
3 wh 100
4 sh 400
5 wh 200
6 bj 100
7 sh 200
8 bj 300
9 sh 900
需要按 city分组聚合,然后从每组数据中取出前两条value最大的记录。
1、这是实际业务中经常会遇到的 group TopK 问题,下面来看看 pig 如何解决:
1 | a = load '/data/city.txt' using PigStorage( ' ' ) as (id:chararray, city:chararray, value: int ); |
2 | b = group a by city; |
3 | c = foreach b {c1= order a by value desc ; c2=limit c1 2; generate group ,c2.value;}; |
4 | d = stream c through `sed 's/[(){}]//g' `; |
5 | dump d; |
1 | (bj,600,300) |
2 | (sh,900,400) |
3 | (wh,500,200) |
1 | a = load '/data/city.txt' using PigStorage( ' ' ) as (id:chararray, city:chararray, value: int ); |
2 | b = group a by city; |
3 | c = foreach b {c1= order a by value desc ; generate group ,c1.value;}; |
4 | d = stream c through `sed 's/[(){}]//g' `; |
5 | dump d; |
1 | (bj,600,300,100) |
2 | (sh,900,400,200) |
3 | (wh,500,200,100) |
本质上HSQL和sql有很多相同的地方,但HSQL目前功能还有很多缺失,至少不如原生态的SQL功能强大,
比起PIG也有些差距,如果SQL中这类分组topk的问题如何解决呢?
1 | select * from city where |
2 | 2>( select count (1) from city where cname=a.cname and value>a.value) |
3 | distribute by a.cname sort by a.cname,a.value desc ; |
但是这种写法在HQL中直接报语法错误了,下面我们只能用hive udf的思路来解决了:
排序city和value,然后对city计数,最后where过滤掉city列计数器大于k的行即可。
好了,上代码:
(1)定义UDF:
01 | package com.example.hive.udf; |
02 | import org.apache.hadoop.hive.ql. exec .UDF; |
03 |
04 | public final class Rank extends UDF{ |
05 | private int counter; |
06 | private String last_key; |
07 | public int evaluate(final String key ){ |
08 | if ( ! key .equalsIgnoreCase(this.last_key) |
09 | this.counter = 0; |
10 | this.last_key = key ; |
11 | } |
12 | return this.counter++; |
13 | } |
14 | } |
1 | add jar Rank.jar; |
2 | create temporary function rank as 'com.example.hive.udf.Rank' ; |
3 | create table city(id int ,cname int ) row format delimited fields terminated by ' ' ; |
4 | LOAD DATA LOCAL INPATH 'city.txt' OVERWRITE INTO TABLE city; |
5 | select cname, from ( |
6 | select cname,rank(cname) from ( |
7 | select id, from city by cname sort by cname,value desc |
8 | )a |
9 | )b where csum < 2; |
1 | bj 600 |
2 | bj 300 |
3 | sh 900 |
4 | sh 400 |
5 | wh 500 |
6 | wh 200 |
REF:hive中分组取前N个值的实现
3、最后我们来看一下原生态的MR:
01 | import java.io.IOException; |
02 | import java.util.TreeSet; |
03 |
04 | import org.apache.hadoop.conf.Configuration; |
05 | import org.apache.hadoop.fs.Path; |
06 | import org.apache.hadoop.io.IntWritable; |
07 | import org.apache.hadoop.io.LongWritable; |
08 | import org.apache.hadoop.io.Text; |
09 | import org.apache.hadoop.mapreduce.Job; |
10 | import org.apache.hadoop.mapreduce.Mapper; |
11 | import org.apache.hadoop.mapreduce.Reducer; |
12 | import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; |
13 | import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; |
14 | import org.apache.hadoop.util.GenericOptionsParser; |
15 |
16 | public class GroupTopK { |
17 | // 这个 MR 将会取得每组年龄中 id 最大的前 3 个 |
18 | // 测试数据由脚本生成:http://my.oschina.net/leejun2005/blog/76631 |
19 | public static class GroupTopKMapper extends |
20 | Mapper<LongWritable, Text, IntWritable, LongWritable> { |
21 | IntWritable outKey = new IntWritable(); |
22 | LongWritable outValue = new LongWritable(); |
23 | String[] valArr = null ; |
24 |
25 | public void map(LongWritable key, Text value, Context context) |
26 | throws IOException, InterruptedException { |
27 | valArr = value.toString().split( "\t" ); |
28 | outKey.set(Integer.parseInt(valArr[ 2 ])); // age int |
29 | outValue.set(Long.parseLong(valArr[ 0 ])); // id long |
30 | context.write(outKey, outValue); |
31 | } |
32 | } |
33 |
34 | public static class GroupTopKReducer extends |
35 | Reducer<IntWritable, LongWritable, IntWritable, LongWritable> { |
36 |
37 | LongWritable outValue = new LongWritable(); |
38 |
39 | public void reduce(IntWritable key, Iterable<LongWritable> values, |
40 | Context throws IOException, InterruptedException { |
41 | TreeSet<Long> idTreeSet = new TreeSet<Long>(); |
42 | for (LongWritable |
43 | idTreeSet.add(val.get()); |
44 | if (idTreeSet.size() > 3 ) { |
45 | idTreeSet.remove(idTreeSet.first()); |
46 | } |
47 | } |
48 | for (Long |
49 | outValue.set(id); |
50 | context.write(key, outValue); |
51 | } |
52 | } |
53 | } |
54 |
55 | public static void main(String[] throws Exception { |
56 | Configuration conf = new Configuration(); |
57 | String[] otherArgs = new GenericOptionsParser(conf, args) |
58 | .getRemainingArgs(); |
59 |
60 | System.out.println(otherArgs.length); |
61 | System.out.println(otherArgs[ 0 ]); |
62 | System.out.println(otherArgs[ 1 ]); |
63 |
64 | if (otherArgs.length != 3 ) { |
65 | System.err.println( "Usage: GroupTopK <in> <out>" ); |
66 | System.exit( 2 ); |
67 | } |
68 | Job job = new Job(conf, "GroupTopK" ); |
69 | job.setJarByClass(GroupTopK. class ); |
70 | job.setMapperClass(GroupTopKMapper. class ); |
71 | job.setReducerClass(GroupTopKReducer. class ); |
72 | job.setNumReduceTasks( 1 ); |
73 | job.setOutputKeyClass(IntWritable. class ); |
74 | job.setOutputValueClass(LongWritable. class ); |
75 | FileInputFormat.addInputPath(job, new Path(otherArgs[ 1 ])); |
76 | FileOutputFormat.setOutputPath(job, new Path(otherArgs[ 2 ])); |
77 | System.exit(job.waitForCompletion( true ) ? 0 : 1 ); |
78 | } |
79 | } |
结果:
hadoop fs -cat /tmp/1/part-r-00000
0 12869695
0 12869971
0 12869976
1 12869813
1 12869870
1 12869951
......
数据验证:
awk '$3==0{print $1}' record_new.txt|sort -nr|head -3
12869976
12869971
12869695
可以看到结果没有问题。
注:测试数据由以下脚本生成:
PS:
如果说hive类似sql的话,那pig就类似plsql存储过程了:程序编写更自由,逻辑能处理的更强大了。
pig中还能直接通过反射调用java的静态类中的方法,这块内容请参考之前的相关pig博文。
附几个HIVE UDAF链接,有兴趣的同学自己看下:
Hive UDAF和UDTF实现group by后获取top值
hive中自定义函数(UDAF)实现多行字符串拼接为一行
编写Hive UDAF
Hive UDAF开发
相关文章推荐
- Pig、Hive、MapReduce 解决分组 Top K 问题(转)
- Pig、Hive、MapReduce 解决分组 Top K 问题
- Pig、Hive、MapReduce 解决分组 Top K 问题
- Pig、Hive、MapReduce 解决分组 Top K 问题
- Pig、Hive、MapReduce解决分组Top K问题(组内排序)
- Pig、Hive、Map Reduce 解决分组 Top K 问题
- Pig、Hive 解决分组 TopK 问题
- mapreduce在hive中执行成功,在beeline中执行失败的问题解决
- hadoop mapreduce 解决 top K问题
- hadoop mapreduce 解决 top K问题
- Hive中跑MapReduce Job出现OOM问题分析及解决
- 解决集群Hive表注释为?的编码问题
- SQL分组求每组最大值问题的解决方法收集 (转载)
- Hive:解决Hive创建文件数过多的问题
- 堆排序解决 top k 问题
- 记Hadoop2.5.0线上mapreduce任务执行map任务划分的一次问题解决
- 使用Hive-JDBC遇到的一些问题解决
- 【转】 hive安装配置及遇到的问题解决
- spark 从HIVE读数据导入hbase中发生空指针(java.lang.NullPointerException)问题的解决
- 两个Hive无法启动问题的解决