Solving the Group Top-K Problem with Pig, Hive, and MapReduce

2013-12-09 21:55
Problem:

We have the following data file, city.txt (columns: id, city, value):

cat city.txt
1 wh 500
2 bj 600
3 wh 100
4 sh 400
5 wh 200
6 bj 100
7 sh 200
8 bj 300
9 sh 900

We need to group the data by city and, within each group, take the two records with the largest value.

1. This is the group Top-K problem that comes up all the time in real-world work. First, let's see how Pig solves it:

a = load '/data/city.txt' using PigStorage(' ') as (id:chararray, city:chararray, value:int);
b = group a by city;
-- within each group, sort by value descending and keep the top 2 rows
c = foreach b {c1 = order a by value desc; c2 = limit c1 2; generate group, c2.value;};
-- pipe through sed to strip the tuple/bag punctuation from the output
d = stream c through `sed 's/[(){}]//g'`;
dump d;
Result:

(bj,600,300)
(sh,900,400)
(wh,500,200)

The following variant of these few lines (with the limit step dropped) also implements the functionality of MySQL's group_concat function:

a = load '/data/city.txt' using PigStorage(' ') as (id:chararray, city:chararray, value:int);
b = group a by city;
-- no limit this time: emit all of each group's values in descending order
c = foreach b {c1 = order a by value desc; generate group, c1.value;};
d = stream c through `sed 's/[(){}]//g'`;
dump d;
Result:

(bj,600,300,100)
(sh,900,400,200)
(wh,500,200,100)
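
For reference, the MySQL query this mimics would look roughly like the following — a sketch only, assuming a MySQL table city(id, city, value) matching the data file above:

select city, group_concat(value order by value desc) from city group by city;
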
2. Next, let's see how Hive handles the group Top-K problem:

HiveQL shares a great deal with SQL, but it is still missing many features: it is not as powerful as native SQL, and it also falls somewhat short of Pig. In plain SQL, this kind of group Top-K problem would be written as follows:

select * from city a
-- a row survives when fewer than 2 rows of the same city have a larger value
where 2 > (select count(1) from city where cname = a.cname and value > a.value)
distribute by a.cname sort by a.cname, a.value desc;

http://my.oschina.net/leejun2005/blog/78904

But this form throws a syntax error outright in HQL, so we have to fall back on a Hive UDF instead:

The idea: sort by city and value, keep a running counter over the city column, and finally use WHERE to filter out any row whose counter for its city has reached k.
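
To make this concrete, on the sample data the sorted stream and the per-city counter look like this (rows with counter < 2 survive):

bj 600 -> 0 (keep)
bj 300 -> 1 (keep)
bj 100 -> 2 (filtered)
sh 900 -> 0 (keep)
sh 400 -> 1 (keep)
sh 200 -> 2 (filtered)
wh 500 -> 0 (keep)
wh 200 -> 1 (keep)
wh 100 -> 2 (filtered)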

OK, on to the code:

(1) Define the UDF:

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

public final class Rank extends UDF {
    private int counter;
    private String last_key;

    // Returns a 0-based running count of how many times the given key has
    // been seen consecutively; the counter resets whenever the key changes.
    public int evaluate(final String key) {
        if (!key.equalsIgnoreCase(this.last_key)) {
            this.counter = 0;
            this.last_key = key;
        }
        return this.counter++;
    }
}
(2) Register the jar, create the table, load the data, and run the query:

add jar Rank.jar;
create temporary function rank as 'com.example.hive.udf.Rank';
create table city(id int, cname string, value int)
row format delimited fields terminated by ' ';
LOAD DATA LOCAL INPATH 'city.txt' OVERWRITE INTO TABLE city;
-- distribute by cname routes all rows of one city to the same reducer, and
-- sort by cname, value desc hands the UDF each city's rows consecutively in
-- descending value order, so the counter ranks rows within the city
select cname, value from (
  select cname, rank(cname) csum, value from (
    select id, cname, value from city
    distribute by cname sort by cname, value desc
  ) a
) b
where csum < 2;
(3) Result:

bj 600
bj 300
sh 900
sh 400
wh 500
wh 200

As you can see, Hive is a bit more cumbersome than Pig here, but as Hive keeps maturing it may well end up even more concise than Pig.
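
In fact, Hive 0.11 added windowing functions, so on newer versions the same query can be written without a custom UDF at all. A minimal sketch, assuming row_number() is available:

select cname, value from (
  select cname, value,
         row_number() over (partition by cname order by value desc) rn
  from city
) t
where rn <= 2;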

REF: taking the top N values per group in Hive
http://baiyunl.iteye.com/blog/1466343

3. Finally, let's look at raw MapReduce:

import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class GroupTopK {
    // This MR job takes the 3 largest ids within each age group.
    // Test data generated by the script at: http://my.oschina.net/leejun2005/blog/76631
    public static class GroupTopKMapper extends
            Mapper<LongWritable, Text, IntWritable, LongWritable> {
        IntWritable outKey = new IntWritable();
        LongWritable outValue = new LongWritable();
        String[] valArr = null;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            valArr = value.toString().split("\t");
            outKey.set(Integer.parseInt(valArr[2])); // age, int
            outValue.set(Long.parseLong(valArr[0])); // id, long
            context.write(outKey, outValue);
        }
    }

    public static class GroupTopKReducer extends
            Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {

        LongWritable outValue = new LongWritable();

        public void reduce(IntWritable key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            // Keep a TreeSet capped at 3 elements: whenever it grows past 3,
            // evict the smallest, so only the 3 largest ids survive.
            TreeSet<Long> idTreeSet = new TreeSet<Long>();
            for (LongWritable val : values) {
                idTreeSet.add(val.get());
                if (idTreeSet.size() > 3) {
                    idTreeSet.remove(idTreeSet.first());
                }
            }
            for (Long id : idTreeSet) {
                outValue.set(id);
                context.write(key, outValue);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();

        // The class name is also passed on the command line (see the hadoop
        // jar invocation below), so we expect 3 remaining arguments; check
        // before touching the array to avoid an index-out-of-bounds error.
        if (otherArgs.length != 3) {
            System.err.println("Usage: GroupTopK <classname> <in> <out>");
            System.exit(2);
        }
        System.out.println(otherArgs.length);
        System.out.println(otherArgs[0]);
        System.out.println(otherArgs[1]);

        Job job = new Job(conf, "GroupTopK");
        job.setJarByClass(GroupTopK.class);
        job.setMapperClass(GroupTopKMapper.class);
        job.setReducerClass(GroupTopKReducer.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

hadoop jar GroupTopK.jar GroupTopK /tmp/decli/record_new.txt /tmp/1
Result:

hadoop fs -cat /tmp/1/part-r-00000
0 12869695
0 12869971
0 12869976
1 12869813
1 12869870
1 12869951
......

Verify against the raw data:

awk '$3==0{print $1}' record_new.txt | sort -nr | head -3
12869976
12869971
12869695

As you can see, the results check out.

Note: the test data was generated by the script at http://my.oschina.net/leejun2005/blog/76631.

PS:

If Hive is analogous to SQL, then Pig is more like a PL/SQL stored procedure: programs can be structured more freely, and more complex logic can be expressed. Pig can even invoke methods of static Java classes directly via reflection; see the earlier Pig posts for details.

Appendix: a few Hive UDAF links for anyone interested:

Hive UDAF and UDTF for taking top values after a group by: http://blog.csdn.net/liuzhoulong/article/details/7789183

Custom Hive UDAF to concatenate multiple rows of strings into one row: http://blog.sina.com.cn/s/blog_6ff05a2c0100tjw4.html

Writing a Hive UDAF: http://www.fuzhijie.me/?p=118

Hive UDAF development: http://richiehu.blog.51cto.com/2093113/386113