您的位置:首页 > 其它

Hive 简单UDAF开发(extends UDAF)

2013-12-08 22:29 239 查看
Hive UDAF(User- Defined Aggregation Funcation)用户自定义聚合函数是一个很好的功能,集成了先进的数据处理。Hive有两种UDAF:简单和通用。顾名思义,简单的UDAF,写的相当简单的,但因为使用Java反射导致性能损失,而且有些特性不能使用,如可变长度参数列表。通用UDAF可以使用​​所有功能,但是UDAF就写的比较复杂,不直观。
1、一下两个包是必须的import org.apache.hadoop.hive.ql.exec.UDAF和
org.apache.hadoop.hive.ql.exec.UDAFEvaluator。
2、函数类需要继承UDAF类,内部类Evaluator实UDAFEvaluator接口。
3、Evaluator需要实现
init、iterate、terminatePartial、merge、terminate这几个函数。
a)init函数实现接口UDAFEvaluator的init函数。
b)iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean。
c)terminatePartial无参数,其为iterate函数轮转结束后,返回轮转数据,terminatePartial类似于hadoop的Combiner。
d)merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean。
e)terminate返回最终的聚集函数结果。
代码示例,用于计算商户星级的平均价格的UDAF

[java]
view plaincopy





package com.dianping.credit.udaf.avgprice;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

/**
* 计算商户的平均价格
* @author xinchun.wang
*
*/
public final class AvgPriceUDAF extends UDAF {

/**
* The internal state of an aggregation for average.
*
* Note that this is only needed if the internal state cannot be represented
* by a primitive.
*
* The internal state can also contains fields with types like
* ArrayList<String> and HashMap<String,Double> if needed. 初始化点评的平均价格列表
*/
public static class UDAFAvgPriceState {
private List<Integer> oldPriceList = new ArrayList<Integer>();
private List<Integer> newPriceList = new ArrayList<Integer>();

}

/**
* The actual class for doing the aggregation. Hive will automatically look
* for all internal classes of the UDAF that implements UDAFEvaluator.
*/
public static class UDAFAvgPriceEvaluator implements UDAFEvaluator {

UDAFAvgPriceState state;

public UDAFAvgPriceEvaluator() {
super();
state = new UDAFAvgPriceState();
init();
}

/**
* Reset the state of the aggregation.
*/
public void init() {
state.oldPriceList = new ArrayList<Integer>();
state.newPriceList = new ArrayList<Integer>();
}

/**
* Iterate through one row of original data.
*
* The number and type of arguments need to the same as we call this
* UDAF from Hive command line.
*
* This function should always return true.
*/
public boolean iterate(Integer avgPirce, Integer old) {
if (avgPirce != null) {
if (old == 1)
state.oldPriceList.add(avgPirce);
else
state.newPriceList.add(avgPirce);
}
return true;
}

/**
* Terminate a partial aggregation and return the state. If the state is
* a primitive, just return primitive Java classes like Integer or
* String.
*/
public UDAFAvgPriceState terminatePartial() {
// This is SQL standard - average of zero items should be null.
return (state.oldPriceList == null && state.newPriceList == null) ? null
: state;
}

/**
* Merge with a partial aggregation.
*
* This function should always have a single argument which has the same
* type as the return value of terminatePartial().
*
* 合并点评平均价格列表
*/
public boolean merge(UDAFAvgPriceState o) {
if (o != null) {
state.oldPriceList.addAll(o.oldPriceList);
state.newPriceList.addAll(o.newPriceList);
}
return true;
}

/**
* Terminates the aggregation and return the final result. 计算并返回商户平均价格
*/
public Integer terminate() {
// This is SQL standard - average of zero items should be null.
Integer avgPirce = 0;
if (state.oldPriceList.size() >= 8
&& state.newPriceList.size() >= 12) {
avgPirce = (CalcAvgPriceUtil.calcInterquartileMean(state.oldPriceList) * 2
+ CalcAvgPriceUtil.calcInterquartileMean(state.newPriceList) * 8) / 10;
} else {
state.newPriceList.addAll(state.oldPriceList);
avgPirce = CalcAvgPriceUtil.calcInterquartileMean(state.newPriceList);
}
return avgPirce == 0 ? null : avgPirce;
}
}

private AvgPriceUDAF() {
// prevent instantiation
}

}

参考文档: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleAvg.java?view=markup UDAF的使用
同UDF一样,编译好jar包后,在Hive中执行

[sql]
view plaincopy





add jar /data/deploy/honesty_online/shop_avgprice/avgprice.jar;

CREATE TEMPORARY FUNCTION calc_avgprice AS 'com.dianping.credit.udaf.AvgPriceUDAF';

SELECT
A.shopid,
calc_avgprice(A.avgprice, A.addtime) AS shop_avgprice,
'$cal_dt' as hp_statdate
FROM dpstg_credit_shop_avgprice_review_list A
INNER JOIN dpstg_credit_shop_tuan_avgprice B ON A.shopid = B.shopid
WHERE B.no_tuan_review_count >= 5 AND A.tuan_review = 0
GROUP BY A.shopid

-----------------------------

Hive进行UDAF开发,相对要比UDF复杂一些,不过也不是很难。

请看一个例子

package org.hrj.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

import org.apache.hadoop.hive.serde2.io.DoubleWritable;

public class UDAFSum_Sample extends NumericUDAF {

public static class Evaluator implements UDAFEvaluator {

private boolean mEmpty;

private double mSum;

public Evaluator() {

super();

init();

}

public void init() {

mSum = 0;

mEmpty = true;

}

public boolean iterate(DoubleWritable o) {

if (o != null) {

mSum += o.get();

mEmpty = false;

}

return true;

}

public DoubleWritable terminatePartial() {

// This is SQL standard - sum of zero items should be null.

return mEmpty ? null : new DoubleWritable(mSum);

}

public boolean merge(DoubleWritable o) {

if (o != null) {

mSum += o.get();

mEmpty = false;

}

return true;

}

public DoubleWritable terminate() {

// This is SQL standard - sum of zero items should be null.

return mEmpty ? null : new DoubleWritable(mSum);

}

}

}

1.将java文件编译成Sum_Sample.jar

2.进入hive

hive> add jar Sum_sample.jar;

hive> create temporary function sum_test as 'com.hrj.hive.udf.UDAFSum_Sample';

hive> select sum_test(t.num) from t;

hive> drop temporary function sum_test;

hive> quit;

关于UDAF开发注意点:

1.需要import org.apache.hadoop.hive.ql.exec.UDAF以及org.apache.hadoop.hive.ql.exec.UDAFEvaluator,这两个包都是必须的

2.函数类需要继承UDAF类,内部类Evaluator实现UDAFEvaluator接口

3.Evaluator需要实现 init、iterate、terminatePartial、merge、terminate这几个函数

1)init函数类似于构造函数,用于UDAF的初始化

2)iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean

3)terminatePartial无参数,其为iterate函数轮转结束后,返回乱转数据,iterate和terminatePartial类似于hadoop的Combiner

4)merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean

5)terminate返回最终的聚集函数结果
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: