Hive Simple UDAF Development (extends UDAF)
2013-12-08 22:29
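Hive UDAFs (User-Defined Aggregation Functions) are a useful feature for bringing custom aggregation logic into Hive queries. Hive has two kinds of UDAF: simple and generic. As the names suggest, simple UDAFs are fairly easy to write, but they incur a performance penalty because Hive calls them through Java reflection, and some features, such as variable-length argument lists, are not available. Generic UDAFs can use every feature, but they are more complex and less intuitive to write.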
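The reflection cost mentioned above can be illustrated with a small plain-Java sketch. It does not use Hive: `EvaluatorMarker`, `MaxUdafSketch` and `ReflectiveDispatchDemo` are hypothetical names standing in for `UDAFEvaluator` and a UDAF class. It mimics, in simplified form, what a simple-UDAF framework has to do: locate the nested evaluator class reflectively and call `iterate` through `Method.invoke` for every row. Hive's real resolution logic is more involved than this.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for Hive's UDAFEvaluator interface (no Hive dependency).
interface EvaluatorMarker {
    void init();
}

// Hypothetical "simple UDAF": the outer class only groups its evaluator class.
class MaxUdafSketch {
    public static class MaxEvaluator implements EvaluatorMarker {
        private Integer max;

        public void init() { max = null; }

        public boolean iterate(Integer v) {
            if (v != null && (max == null || v > max)) max = v;
            return true;
        }

        public Integer terminate() { return max; }
    }
}

class ReflectiveDispatchDemo {
    // Finds the first nested class implementing the marker interface,
    // similar to the lookup the Hive javadoc describes for UDAFEvaluator.
    static Class<?> findEvaluator(Class<?> udaf) {
        for (Class<?> c : udaf.getDeclaredClasses()) {
            if (EvaluatorMarker.class.isAssignableFrom(c)) return c;
        }
        throw new IllegalArgumentException("no evaluator in " + udaf.getName());
    }

    static Object aggregate(Class<?> udaf, Integer... rows) {
        try {
            Class<?> evalClass = findEvaluator(udaf);
            Object eval = evalClass.getDeclaredConstructor().newInstance();
            evalClass.getMethod("init").invoke(eval);
            // Every row goes through Method.invoke: this per-row reflective call
            // is the kind of overhead a generic UDAF avoids.
            Method iterate = evalClass.getMethod("iterate", Integer.class);
            for (Integer row : rows) {
                iterate.invoke(eval, row);
            }
            return evalClass.getMethod("terminate").invoke(eval);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("reflective call failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(aggregate(MaxUdafSketch.class, 3, null, 7, 5)); // prints 7
    }
}
```

Calling `aggregate` with no rows returns null, because `terminate()` reports the untouched state.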
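1. The following two classes must be imported: org.apache.hadoop.hive.ql.exec.UDAF and
org.apache.hadoop.hive.ql.exec.UDAFEvaluator.
2. The function class must extend UDAF, and its inner Evaluator class must implement the UDAFEvaluator interface.
3. The Evaluator must implement the following methods:
init, iterate, terminatePartial, merge, and terminate.
a) init implements the init method of the UDAFEvaluator interface and resets the aggregation state.
b) iterate receives the input arguments of one row and accumulates them into the internal state. It returns boolean.
c) terminatePartial takes no arguments; after iterate has processed its rows, it returns the partial aggregation state. terminatePartial plays a role similar to a Hadoop Combiner.
d) merge receives the result of terminatePartial and merges it into the current state. It returns boolean.
e) terminate returns the final result of the aggregate function.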
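The method contract above can be sketched in plain Java without Hive on the classpath. This is a hypothetical average evaluator (`AvgEvaluatorSketch` and `AvgState` are illustrative names) whose methods follow the same init/iterate/terminatePartial/merge/terminate pattern; `runTwoSplits` imitates two map-side evaluators whose `terminatePartial()` results are merged by one reduce-side evaluator. How Hive actually schedules these calls depends on the query plan, so treat this as a model rather than Hive's implementation.

```java
import java.util.Arrays;
import java.util.List;

// The "partial" object that terminatePartial() hands to the next stage.
class AvgState {
    long count;
    double sum;
}

class AvgEvaluatorSketch {
    private final AvgState state = new AvgState();

    AvgEvaluatorSketch() { init(); }

    // init(): reset the aggregation state.
    void init() { state.count = 0; state.sum = 0; }

    // iterate(): fold one input row into the state; always returns true.
    boolean iterate(Double value) {
        if (value != null) { state.count++; state.sum += value; }
        return true;
    }

    // terminatePartial(): expose the partial state (combiner-like step).
    AvgState terminatePartial() { return state.count == 0 ? null : state; }

    // merge(): combine a partial state produced by another evaluator.
    boolean merge(AvgState other) {
        if (other != null) { state.count += other.count; state.sum += other.sum; }
        return true;
    }

    // terminate(): final result; the average of zero rows is null (SQL semantics).
    Double terminate() { return state.count == 0 ? null : state.sum / state.count; }

    // One "map-side" evaluator per split.
    static AvgState mapSide(List<Double> split) {
        AvgEvaluatorSketch mapper = new AvgEvaluatorSketch();
        for (Double v : split) mapper.iterate(v);
        return mapper.terminatePartial();
    }

    // One "reduce-side" evaluator merges the partials and produces the result.
    static Double runTwoSplits(List<Double> split1, List<Double> split2) {
        AvgEvaluatorSketch reducer = new AvgEvaluatorSketch();
        reducer.merge(mapSide(split1));
        reducer.merge(mapSide(split2));
        return reducer.terminate();
    }

    public static void main(String[] args) {
        // (1 + 2 + 6) / 3; the NULL row is ignored
        System.out.println(runTwoSplits(Arrays.asList(1.0, 2.0, null), Arrays.asList(6.0))); // prints 3.0
    }
}
```

Keeping count and sum (rather than a running average) is what makes the partials mergeable: two averages cannot be combined correctly without their counts.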
Code example: a UDAF that computes a merchant's average price
package com.dianping.credit.udaf.avgprice;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

/**
 * Computes the average price of a merchant (shop).
 * @author xinchun.wang
 */
public final class AvgPriceUDAF extends UDAF {

  /**
   * The internal state of an aggregation for average.
   *
   * Note that this is only needed if the internal state cannot be represented
   * by a primitive.
   *
   * The internal state can also contain fields with types like
   * ArrayList<String> and HashMap<String,Double> if needed. Here it holds the
   * lists of per-review average prices, split into old and new reviews.
   */
  public static class UDAFAvgPriceState {
    private List<Integer> oldPriceList = new ArrayList<Integer>();
    private List<Integer> newPriceList = new ArrayList<Integer>();
  }

  /**
   * The actual class for doing the aggregation. Hive will automatically look
   * for all internal classes of the UDAF that implement UDAFEvaluator.
   */
  public static class UDAFAvgPriceEvaluator implements UDAFEvaluator {
    UDAFAvgPriceState state;

    public UDAFAvgPriceEvaluator() {
      super();
      state = new UDAFAvgPriceState();
      init();
    }

    /**
     * Reset the state of the aggregation.
     */
    public void init() {
      state.oldPriceList = new ArrayList<Integer>();
      state.newPriceList = new ArrayList<Integer>();
    }

    /**
     * Iterate through one row of original data.
     *
     * The number and type of arguments must match those used when calling
     * this UDAF from the Hive command line.
     *
     * This function should always return true.
     */
    public boolean iterate(Integer avgPrice, Integer old) {
      if (avgPrice != null) {
        // old == 1 marks an old review; a NULL flag counts as a new review
        if (old != null && old == 1) {
          state.oldPriceList.add(avgPrice);
        } else {
          state.newPriceList.add(avgPrice);
        }
      }
      return true;
    }

    /**
     * Terminate a partial aggregation and return the state. If the state is
     * a primitive, just return primitive Java classes like Integer or
     * String.
     */
    public UDAFAvgPriceState terminatePartial() {
      // A partial aggregation that collected no prices is returned as null.
      return (state.oldPriceList.isEmpty() && state.newPriceList.isEmpty()) ? null
          : state;
    }

    /**
     * Merge with a partial aggregation.
     *
     * This function should always have a single argument which has the same
     * type as the return value of terminatePartial().
     *
     * Merges the review price lists.
     */
    public boolean merge(UDAFAvgPriceState o) {
      if (o != null) {
        state.oldPriceList.addAll(o.oldPriceList);
        state.newPriceList.addAll(o.newPriceList);
      }
      return true;
    }

    /**
     * Terminates the aggregation and returns the final result: the merchant's
     * average price.
     */
    public Integer terminate() {
      // CalcAvgPriceUtil is the author's own helper class (not shown in this article).
      Integer avgPrice = 0;
      if (state.oldPriceList.size() >= 8
          && state.newPriceList.size() >= 12) {
        // Enough reviews in both lists: weight old reviews 20% and new reviews 80%.
        avgPrice = (CalcAvgPriceUtil.calcInterquartileMean(state.oldPriceList) * 2
            + CalcAvgPriceUtil.calcInterquartileMean(state.newPriceList) * 8) / 10;
      } else {
        // Otherwise take one interquartile mean over all reviews.
        state.newPriceList.addAll(state.oldPriceList);
        avgPrice = CalcAvgPriceUtil.calcInterquartileMean(state.newPriceList);
      }
      // An average of 0 is treated as "no price" and returned as NULL.
      return avgPrice == 0 ? null : avgPrice;
    }
  }

  private AvgPriceUDAF() {
    // prevent instantiation
  }
}
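The code above calls `CalcAvgPriceUtil.calcInterquartileMean`, which the article does not show. The following is a hypothetical sketch of such a helper (`CalcAvgPriceUtilSketch` is an illustrative name), assuming "interquartile mean" means averaging what remains after dropping the lowest and highest quarter of the sorted prices, with integer arithmetic; the author's real implementation may differ. `weightedAvgPrice` reproduces the 2:8 weighting and the fallback from `terminate()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// HYPOTHETICAL stand-in for the author's CalcAvgPriceUtil (not shown in the article).
final class CalcAvgPriceUtilSketch {
    private CalcAvgPriceUtilSketch() {}

    // Assumed definition: sort, drop size/4 values at each end, average the rest.
    static int calcInterquartileMean(List<Integer> prices) {
        if (prices.isEmpty()) return 0; // assumption: 0 means "no price"; terminate() maps it to NULL
        List<Integer> sorted = new ArrayList<>(prices);
        Collections.sort(sorted);
        int cut = sorted.size() / 4; // number of values dropped at each end
        List<Integer> middle = sorted.subList(cut, sorted.size() - cut);
        long sum = 0;
        for (int p : middle) sum += p;
        return (int) (sum / middle.size()); // integer (truncating) mean
    }

    // Same rule as terminate(): 20% old / 80% new when both lists are large
    // enough, otherwise one interquartile mean over all prices.
    static int weightedAvgPrice(List<Integer> oldPrices, List<Integer> newPrices) {
        if (oldPrices.size() >= 8 && newPrices.size() >= 12) {
            return (calcInterquartileMean(oldPrices) * 2
                    + calcInterquartileMean(newPrices) * 8) / 10;
        }
        List<Integer> all = new ArrayList<>(newPrices);
        all.addAll(oldPrices);
        return calcInterquartileMean(all);
    }

    public static void main(String[] args) {
        // sorted: 5 10 20 30 40 50 999 1000 -> drop 2 per end -> mean of 20..50
        System.out.println(calcInterquartileMean(Arrays.asList(1000, 10, 20, 30, 40, 50, 5, 999))); // prints 35
    }
}
```

Trimming the extremes is what keeps a few outlier reviews (5 and 1000 in the example) from dominating the merchant's price.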
Reference: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleAvg.java?view=markup
Using the UDAF
As with a UDF, build the code into a jar, then run the following in Hive:
add jar /data/deploy/honesty_online/shop_avgprice/avgprice.jar;
CREATE TEMPORARY FUNCTION calc_avgprice AS 'com.dianping.credit.udaf.avgprice.AvgPriceUDAF';
SELECT
  A.shopid,
  calc_avgprice(A.avgprice, A.addtime) AS shop_avgprice,
  '$cal_dt' AS hp_statdate
FROM dpstg_credit_shop_avgprice_review_list A
INNER JOIN dpstg_credit_shop_tuan_avgprice B ON A.shopid = B.shopid
WHERE B.no_tuan_review_count >= 5 AND A.tuan_review = 0
GROUP BY A.shopid;
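In the query above, `GROUP BY A.shopid` means `calc_avgprice` aggregates each shop's rows separately. A plain-Java sketch of that grouping might look like this; each row is modeled as a hypothetical `{shopid, avgprice, oldFlag}` array, and `GroupBySketch` and `PriceLists` are illustrative names. The final step is a plain integer mean, a deliberate simplification of the interquartile rule in `terminate()`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupBySketch {
    // Per-group state, analogous to UDAFAvgPriceState.
    static final class PriceLists {
        final List<Integer> oldPrices = new ArrayList<>();
        final List<Integer> newPrices = new ArrayList<>();
    }

    // Each row: {shopid, avgprice, oldFlag}; avgprice and oldFlag may be null.
    static Map<Integer, Integer> aggregate(List<Integer[]> rows) {
        Map<Integer, PriceLists> groups = new LinkedHashMap<>();
        for (Integer[] row : rows) {
            // One state per shop, created on the shop's first row.
            PriceLists state = groups.computeIfAbsent(row[0], k -> new PriceLists());
            Integer price = row[1];
            Integer old = row[2];
            if (price != null) { // same routing as iterate()
                if (old != null && old == 1) state.oldPrices.add(price);
                else state.newPrices.add(price);
            }
        }
        Map<Integer, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, PriceLists> e : groups.entrySet()) {
            List<Integer> all = new ArrayList<>(e.getValue().newPrices);
            all.addAll(e.getValue().oldPrices);
            int sum = 0;
            for (int p : all) sum += p;
            // A shop with no usable prices yields null, like terminate() returning NULL.
            result.put(e.getKey(), all.isEmpty() ? null : sum / all.size());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer[]> rows = new ArrayList<>();
        rows.add(new Integer[] {1, 100, 1});
        rows.add(new Integer[] {1, 200, 0});
        rows.add(new Integer[] {2, null, 0});
        System.out.println(aggregate(rows)); // prints {1=150, 2=null}
    }
}
```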
-----------------------------
Developing a UDAF in Hive is somewhat more involved than developing a UDF, but it is not difficult.
Here is an example:
package org.hrj.hive.udf;

import org.apache.hadoop.hive.ql.exec.NumericUDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;

// NumericUDAF extends UDAF and is the base class for numeric aggregates such as sum and avg.
public class UDAFSum_Sample extends NumericUDAF {

  public static class Evaluator implements UDAFEvaluator {
    private boolean mEmpty; // true until a non-NULL value has been seen
    private double mSum;

    public Evaluator() {
      super();
      init();
    }

    public void init() {
      mSum = 0;
      mEmpty = true;
    }

    public boolean iterate(DoubleWritable o) {
      if (o != null) {
        mSum += o.get();
        mEmpty = false;
      }
      return true;
    }

    public DoubleWritable terminatePartial() {
      // This is SQL standard - sum of zero items should be null.
      return mEmpty ? null : new DoubleWritable(mSum);
    }

    public boolean merge(DoubleWritable o) {
      if (o != null) {
        mSum += o.get();
        mEmpty = false;
      }
      return true;
    }

    public DoubleWritable terminate() {
      // This is SQL standard - sum of zero items should be null.
      return mEmpty ? null : new DoubleWritable(mSum);
    }
  }
}
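The null handling in UDAFSum_Sample (a sum over zero non-NULL rows returns NULL, not 0) can be checked without a cluster. This sketch copies the evaluator's logic into plain Java, with `java.lang.Double` replacing Hive's `DoubleWritable`; `SumEvaluatorSketch` and its `sumSplits` helper are hypothetical names, and `sumSplits` only imitates running each split through its own evaluator before merging the partials.

```java
class SumEvaluatorSketch {
    private boolean empty; // true until a non-NULL value has been seen
    private double sum;

    SumEvaluatorSketch() { init(); }

    void init() { sum = 0; empty = true; }

    boolean iterate(Double o) {
        if (o != null) { sum += o; empty = false; }
        return true;
    }

    // A split that saw only NULLs (or no rows) reports null, not 0.0.
    Double terminatePartial() { return empty ? null : sum; }

    boolean merge(Double o) {
        if (o != null) { sum += o; empty = false; }
        return true;
    }

    Double terminate() { return empty ? null : sum; }

    // Runs each split through its own evaluator, then merges the partials.
    static Double sumSplits(Double[]... splits) {
        SumEvaluatorSketch reducer = new SumEvaluatorSketch();
        for (Double[] split : splits) {
            SumEvaluatorSketch mapper = new SumEvaluatorSketch();
            for (Double v : split) mapper.iterate(v);
            reducer.merge(mapper.terminatePartial());
        }
        return reducer.terminate();
    }

    public static void main(String[] args) {
        System.out.println(sumSplits(new Double[] {1.5, null}, new Double[] {null}, new Double[] {2.5})); // prints 4.0
        System.out.println(sumSplits(new Double[] {null})); // prints null
    }
}
```

The separate `empty` flag is needed because `sum == 0` cannot distinguish "no rows" from rows that really add up to zero.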
1. Compile the Java file into Sum_Sample.jar.
2. Start the Hive CLI and run:
hive> add jar Sum_Sample.jar;
hive> create temporary function sum_test as 'org.hrj.hive.udf.UDAFSum_Sample';
hive> select sum_test(t.num) from t;
hive> drop temporary function sum_test;
hive> quit;
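Notes on UDAF development:
1. Both org.apache.hadoop.hive.ql.exec.UDAF and org.apache.hadoop.hive.ql.exec.UDAFEvaluator must be imported.
2. The function class must extend UDAF, and its inner Evaluator class must implement the UDAFEvaluator interface.
3. The Evaluator must implement init, iterate, terminatePartial, merge, and terminate:
1) init is similar to a constructor and initializes the UDAF's state.
2) iterate receives the input arguments and accumulates them into the internal state. It returns boolean.
3) terminatePartial takes no arguments; once iterate has processed its rows, it returns the partial result. Together, iterate and terminatePartial play a role similar to a Hadoop Combiner.
4) merge receives the result of terminatePartial and merges it into the current state. It returns boolean.
5) terminate returns the final result of the aggregate function.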