Hive Simple UDAF Development (extends UDAF)
2013-11-22 15:53
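A Hive UDAF (User-Defined Aggregation Function) lets you plug your own aggregation logic into Hive queries. Hive supports two kinds of UDAF: simple and generic. As the names suggest, a simple UDAF is fairly easy to write, but it costs performance because Hive calls it through Java reflection, and some features, such as variable-length argument lists, are not available. A generic UDAF can use every feature, but it is more complex and less intuitive to write.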
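The reflection cost mentioned above comes from how a simple UDAF's methods are called: Hive does not know their signatures at compile time, so it looks them up and invokes them reflectively (old-style UDAFs are wrapped by a bridge class, GenericUDAFBridge). The sketch below is only a simplified illustration of that pattern in plain Java, not Hive's actual bridge code; SumEvaluator and sumViaReflection are hypothetical names, and the example does not need the Hive jars.

```java
import java.lang.reflect.Method;

public class ReflectionDispatchSketch {

    // Hypothetical evaluator with a simple-UDAF-style iterate/terminate pair.
    // Plain Java only: it does not depend on the Hive jars.
    public static class SumEvaluator {
        private long sum = 0;

        public boolean iterate(Integer value) {
            if (value != null) {
                sum += value;
            }
            return true;
        }

        public Long terminate() {
            return sum;
        }
    }

    // Drives the evaluator through java.lang.reflect instead of direct calls,
    // roughly the way a reflective bridge has to for a simple UDAF.
    public static Long sumViaReflection(Integer... rows) {
        try {
            SumEvaluator evaluator = new SumEvaluator();
            // Methods are found by name and parameter types at runtime.
            Method iterate = SumEvaluator.class.getMethod("iterate", Integer.class);
            Method terminate = SumEvaluator.class.getMethod("terminate");
            for (Integer row : rows) {
                // Every row goes through Method.invoke, which costs more than a direct call.
                iterate.invoke(evaluator, row);
            }
            return (Long) terminate.invoke(evaluator);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sumViaReflection(1, 2, null, 4)); // prints 7
    }
}
```

A generic UDAF avoids this per-row reflective dispatch because its evaluator is called through a fixed abstract class.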
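1. The following two classes must be imported: org.apache.hadoop.hive.ql.exec.UDAF and org.apache.hadoop.hive.ql.exec.UDAFEvaluator.
2. The function class must extend UDAF, and its inner Evaluator class must implement the UDAFEvaluator interface.
3. The Evaluator must implement the following methods: init, iterate, terminatePartial, merge and terminate.
a) init implements the init method of the UDAFEvaluator interface and resets the aggregation state.
b) iterate receives the arguments of one input row and updates the internal state. It returns a boolean.
c) terminatePartial takes no arguments. Once iterate has processed its rows, it returns the partial aggregation state. terminatePartial plays a role similar to a Hadoop Combiner.
d) merge receives a result returned by terminatePartial and merges it into the current state. It returns a boolean.
e) terminate returns the final result of the aggregate function.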
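The call order in step 3 can be simulated in plain Java. This is a minimal sketch under stated assumptions: AvgEvaluator and AvgState are hypothetical classes modeled on the count/sum average in Hive's UDAFExampleAvg, they do not implement the real UDAFEvaluator interface (so the example runs without the Hive jars), and the average() driver only imitates the map-side/reduce-side call sequence; it is not Hive's execution engine.

```java
import java.util.Arrays;
import java.util.List;

public class UdafLifecycleSketch {

    // Partial state handed from terminatePartial() to merge().
    public static class AvgState {
        long count;
        double sum;
    }

    // Plain-Java stand-in for an Evaluator; a real one would implement
    // org.apache.hadoop.hive.ql.exec.UDAFEvaluator inside a UDAF subclass.
    public static class AvgEvaluator {
        private AvgState state = new AvgState();

        public void init() {                      // a) reset the aggregation state
            state = new AvgState();
        }

        public boolean iterate(Double value) {    // b) consume one input row
            if (value != null) {
                state.count++;
                state.sum += value;
            }
            return true;
        }

        public AvgState terminatePartial() {      // c) hand the partial state onward
            return state.count == 0 ? null : state;
        }

        public boolean merge(AvgState other) {    // d) fold in another partial state
            if (other != null) {
                state.count += other.count;
                state.sum += other.sum;
            }
            return true;
        }

        public Double terminate() {               // e) final result; null for no rows
            return state.count == 0 ? null : state.sum / state.count;
        }
    }

    // Each split is aggregated by its own evaluator (map side); one more
    // evaluator merges all partial states and produces the result (reduce side).
    public static Double average(List<List<Double>> splits) {
        AvgEvaluator reducer = new AvgEvaluator();
        reducer.init();
        for (List<Double> split : splits) {
            AvgEvaluator mapper = new AvgEvaluator();
            mapper.init();
            for (Double v : split) {
                mapper.iterate(v);
            }
            reducer.merge(mapper.terminatePartial());
        }
        return reducer.terminate();
    }

    public static void main(String[] args) {
        List<List<Double>> splits = Arrays.asList(
                Arrays.asList(1.0, 2.0, 3.0),
                Arrays.asList(4.0, null),
                Arrays.<Double>asList());
        System.out.println(average(splits)); // prints 2.5
    }
}
```

The empty split shows why merge must accept null: terminatePartial returns null when it saw no rows, and merge simply skips it.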
Code example: a UDAF that computes a merchant's average price for merchant star ratings:
package com.dianping.credit.udaf.avgprice;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

/**
 * Computes the average price of a shop.
 * @author xinchun.wang
 */
public final class AvgPriceUDAF extends UDAF {

    /**
     * The internal state of an aggregation for average.
     *
     * Note that this is only needed if the internal state cannot be represented
     * by a primitive.
     *
     * The internal state can also contain fields with types like
     * ArrayList<String> and HashMap<String,Double> if needed.
     * Here it holds the lists of review average prices (old and new reviews).
     */
    public static class UDAFAvgPriceState {
        private List<Integer> oldPriceList = new ArrayList<Integer>();
        private List<Integer> newPriceList = new ArrayList<Integer>();
    }

    /**
     * The actual class for doing the aggregation. Hive will automatically look
     * for all internal classes of the UDAF that implement UDAFEvaluator.
     */
    public static class UDAFAvgPriceEvaluator implements UDAFEvaluator {

        UDAFAvgPriceState state;

        public UDAFAvgPriceEvaluator() {
            super();
            state = new UDAFAvgPriceState();
            init();
        }

        /**
         * Reset the state of the aggregation.
         */
        public void init() {
            state.oldPriceList = new ArrayList<Integer>();
            state.newPriceList = new ArrayList<Integer>();
        }

        /**
         * Iterate through one row of original data.
         *
         * The number and types of the arguments must match the way this
         * UDAF is called from the Hive command line.
         *
         * This function should always return true.
         */
        public boolean iterate(Integer avgPrice, Integer old) {
            if (avgPrice != null) {
                // old == 1 marks a price from an old review; a null flag is treated as new
                if (old != null && old == 1) {
                    state.oldPriceList.add(avgPrice);
                } else {
                    state.newPriceList.add(avgPrice);
                }
            }
            return true;
        }

        /**
         * Terminate a partial aggregation and return the state. If the state is
         * a primitive, just return primitive Java classes like Integer or
         * String.
         */
        public UDAFAvgPriceState terminatePartial() {
            // This is SQL standard - average of zero items should be null.
            return (state.oldPriceList.isEmpty() && state.newPriceList.isEmpty())
                    ? null : state;
        }

        /**
         * Merge with a partial aggregation.
         *
         * This function should always have a single argument which has the same
         * type as the return value of terminatePartial().
         *
         * Merges the review average-price lists.
         */
        public boolean merge(UDAFAvgPriceState o) {
            if (o != null) {
                state.oldPriceList.addAll(o.oldPriceList);
                state.newPriceList.addAll(o.newPriceList);
            }
            return true;
        }

        /**
         * Terminates the aggregation and returns the final result:
         * the shop's average price.
         * CalcAvgPriceUtil is the author's own helper class; it is not shown in this post.
         */
        public Integer terminate() {
            Integer avgPrice;
            if (state.oldPriceList.size() >= 8 && state.newPriceList.size() >= 12) {
                // Enough data: old reviews weigh 2/10, new reviews 8/10.
                avgPrice = (CalcAvgPriceUtil.calcInterquartileMean(state.oldPriceList) * 2
                        + CalcAvgPriceUtil.calcInterquartileMean(state.newPriceList) * 8) / 10;
            } else {
                // Otherwise pool all prices into a single list.
                state.newPriceList.addAll(state.oldPriceList);
                avgPrice = CalcAvgPriceUtil.calcInterquartileMean(state.newPriceList);
            }
            // This is SQL standard - average of zero items should be null.
            return avgPrice == 0 ? null : avgPrice;
        }
    }

    private AvgPriceUDAF() {
        // prevent instantiation
    }
}

References:
http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleAvg.java?view=markup

Using the UDAF
As with a UDF, build the jar and then run the following in Hive:
add jar /data/deploy/honesty_online/shop_avgprice/avgprice.jar;
CREATE TEMPORARY FUNCTION calc_avgprice AS 'com.dianping.credit.udaf.avgprice.AvgPriceUDAF';

SELECT A.shopid,
       calc_avgprice(A.avgprice, A.addtime) AS shop_avgprice,
       '$cal_dt' AS hp_statdate
FROM dpstg_credit_shop_avgprice_review_list A
INNER JOIN dpstg_credit_shop_tuan_avgprice B ON A.shopid = B.shopid
WHERE B.no_tuan_review_count >= 5
  AND A.tuan_review = 0
GROUP BY A.shopid;
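The terminate() method in the example relies on CalcAvgPriceUtil.calcInterquartileMean, which the post does not show. The sketch below is a hypothetical reconstruction for experimenting with the weighting rule outside Hive: interquartileMean is an assumed, simplified interquartile mean (sort, drop floor(n/4) values from each end, average the rest with integer division), and shopAvgPrice repeats the thresholds (at least 8 old and 12 new prices) and the 2:8 weighting from terminate(). The author's real helper may compute quartiles differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class AvgPriceSketch {

    // Hypothetical stand-in for CalcAvgPriceUtil.calcInterquartileMean.
    // Simplified interquartile mean; assumes the list contains no nulls
    // (iterate() already filters them out).
    public static int interquartileMean(List<Integer> prices) {
        if (prices.isEmpty()) {
            return 0; // 0 makes the final result null, as in terminate()
        }
        List<Integer> sorted = new ArrayList<>(prices);
        Collections.sort(sorted);
        int trim = sorted.size() / 4;
        List<Integer> middle = sorted.subList(trim, sorted.size() - trim);
        long sum = 0;
        for (int p : middle) {
            sum += p;
        }
        return (int) (sum / middle.size());
    }

    // Same rule as terminate(): weight old/new reviews 2:8 when both lists are
    // large enough, otherwise pool all prices; 0 is reported as null.
    public static Integer shopAvgPrice(List<Integer> oldPrices, List<Integer> newPrices) {
        int avg;
        if (oldPrices.size() >= 8 && newPrices.size() >= 12) {
            avg = (interquartileMean(oldPrices) * 2 + interquartileMean(newPrices) * 8) / 10;
        } else {
            List<Integer> all = new ArrayList<>(newPrices);
            all.addAll(oldPrices);
            avg = interquartileMean(all);
        }
        return avg == 0 ? null : avg;
    }

    public static void main(String[] args) {
        // Four values: one dropped from each end, mean of 20 and 30.
        System.out.println(interquartileMean(Arrays.asList(10, 20, 30, 1000))); // prints 25
        // Too few old prices, so all five prices are pooled.
        System.out.println(shopAvgPrice(Arrays.asList(100), Arrays.asList(50, 60, 70, 80))); // prints 70
    }
}
```

Unlike terminate(), shopAvgPrice copies the lists before pooling, so the inputs are left unchanged.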