您的位置:首页 > 其它

Hive 查询优化

2015-09-18 15:57 183 查看
hive.groupby.skewindata

  用于数据存在倾斜的情况,比如:

select ip, count(*) from access_log group by ip


如果某些ip地址对应的访问量非常大,相应的 reducer 执行就会很慢。可以通过将该变量设置为 true 来解决这个问题。当 hive.groupby.skewindata=true 的时候,生成的查询计划会有两个MR任务,第一个任务中 Map 生成的结果会随机的分配到 reducer 上面,每个 reducer 做局部聚合操作,并且输出结果。第二个MR任务在进行一次聚合操作。

map side join

map side join 在某些情况下可以大幅提升查询效率,map side join 有两种实现方式

/*+ MAPJOIN(aliasname), MAPJOIN(anothertable) */ 在这种方式中,这一段注释风格的代码必须放在 SELECT 后面,比如:SELECT /*+ MAPJOIN(c) */ * FROM orders o JOIN cities c ON (o.city_id = c.id);

通过 hive 选项来控制,set hive.auto.convert.join=true,hive 会自动对小于 hive.mapjoin.smalltable.filesize 的表做 map join。PS 在0.11.0版本点hive中,hive.auto.convert.join默认为true,早期的版本中,默认为 false

left semi join

只留下左表中存join key在右表中存在的记录。

select * from a left semi join b on (a.key = b.key)


当 a 表中当 key 在 b 表中存在的时候,输出 a 表中的记录,否则不输出。要注意跟 left outer join 的区别。

order by 和 sort by

order by 是全局排序,要在一个 reducer 完成, sort by 是局部排序,只在单个 reducer 上作,通常和 distribute by 配合使用,distribute by key 会把 key 相同的记录放入分发给相同的reducer。cluster by key 等价于 distribute by key sort by key,但是这里的排序只能是倒序,不能只等 asc 或者 desc。

cluster by 和 distribute by

distribute by : 根据distribute by 后面指定的字段来分配reducer,字段相同的会被分配到相同的reducer

cluster by : distribute by + sort by

distinct 功能的实现

如果只有一个 distinct 语句,比如:

select ip, count(distinct userid) from logs group by ip


在 map 阶段使用 ip 和 userid 作为组合 key,该组合 key 出现的数量作为 value,同时使用 ip 这个字段作为 partition key;在 reduce 阶段,由于 ip 和 userid 的组合 key 已经排序,所以可以根据 LastIP 来判断 ip 地址某一个 ip 地址是否结束,从而统计出 userid 的数量,同时避免了把所有 userid 加载到内存。

 如果有多个 distinct 语句,比如:

select ip, count(distinct userid), count(distinct cookie) from logs group by ip


如果继续使用上面提到的方法, 则需要多次排序,因为不可能同时按照 ip + userid 和 ip + cookie 这两个组合来排序,所以也就不可能通过 LastIP 来判断某个 ip 地址的访问是否已经结束。

此时,可以通过增加虚拟列的方式,对于 logs 表中的每一行,map 产生两条记录,map key 分别是 ip + 0 + userid, ip + 1 + cookie,partition key 是 ip;在 reduce 阶段,由于已经按照map key 排序,所以此时仍然可以按照 LastIP 的方式分别统计对应于该 ip 地址的 userid 和 cookie 的数量。

动态分区

在 hive 中,如果需要根据输入的 key, 把结果写入到不同的目录(分区)中,需要用到动态分区。使用动态分区,先要做设置。

使用动态分区的时候要注意,动态分区是不指定值的(静态分区要指定),在 INSERT ... SELECT ... 这种类型的查询中,所有动态分区的值需要在 SELECT 语句中给出,而且是在末尾按照顺序给出,这里又要分动态分区和静态分区可能同时出现的情况,

当只有动态分区的时候:

INSERT OVERWRITE TABLE T PARTITION (ds, hr)
SELECT key, value, ds, hr FROM srcpart WHERE ds is not null and hr>10;


即有动态分区又有静态分区的情况:

INSERT OVERWRITE TABLE T PARTITION (ds='2010-03-03', hr)
SELECT key, value, /*ds,*/ hr FROM srcpart WHERE ds is not null and hr>10;


注意动态分区和静态分区的顺序不能更改(静态分区在前面):

-- throw an exception
INSERT OVERWRITE TABLE T PARTITION (ds, hr = 11)
SELECT key, value, ds/*, hr*/ FROM srcpart WHERE ds is not null and hr=11;


set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

set hive.exec.dynamic.partitions.pernode=50000;
set hive.exec.dynamic.partitions.partitions=50000;
set hive.exec.max.created.files=500000;
set mapred.reduce.tasks =20000;
set hive.merge.mapfiles=true;

drop table itemset;
create table itemset(
auctions string)
partitioned by (category_id string)
row format delimited
fields terminated by '\t'
stored as textfile
location '/data/itemset';

insert overwrite table itemset partition(category_id)
select auctions, category_id from source_table distribute by category_id;


其中

hive.exec.dynamic.partitions.pernode 用来设置每个节点上面可以生成的分区数量

hive.exec.dynamic.partitions.partitions 用来设置动态分区的最大数量

hive.exec.max.created.files 能够创建的最多的文件数

UDF

hive udf 可以用 java 实现,也可以用 python,如果使用 java,需要继承特定的几个类,
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: