您的位置:首页 > 运维架构

Hive可扩展接口UDF

2016-06-20 10:30 375 查看

Hive 可扩展接口学习笔记

Hive提供灵活的接口,以使用户能更灵活地处理数据。可扩展接口分为三种:UDF、UDTF、和UDAF

UDF

UDF是最常用到的接口,用来处理字段并返回一个单一的值。

接口类型:

UDF提供两种接口:

简单API
org.apache.hadoop.hive.ql.exec.UDF


UDF接口可以用来读取并返回初级类型。这里说的初级类型指的是hadoop和hive的可写类型:Text, IntWritable, LongWritable,

DoubleWritable等。

复杂API
org.apache.hadoop.hive.ql.udf.generic.GenericUDF


GenericUDF接口可以用来处理内嵌的数据结构,像Map, List和Set.

使用举例(简单API):

某字段由多组数据组成,数据间由逗号分隔,此UDF返回此字段内所有数据之和

package com.dokia.hive.MyUDF;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class SumComma extends UDF{
public Text evaluate(final Text s) {
if (s == null) return null;
String[] nums = s.toString().split(",");
long sum = 0;
for (String num : nums) {
try {
long data = Long.parseLong(num);
sum += data;
} catch (NumberFormatException ex) {}
}
return new Text(String.valueOf(sum));
}
}


将此类打包成SumComma.jar,并加载到hive中,有三种方法:

临时添加UDF,回话结束后函数自动销毁,每次新回话需要add jar并且create temporary function

> add jar SumComma.jar;
> create temporary function sum_comma as 'com.dokia.hive.MyUDF.SumComma';


然后就可以在sql中使用函数sum_comma了。

也可以将上述命令通过文件导入hive,获得的同样是临时函数

$ hive -i hive_init;


自定义UDF注册为hive内置函数,比较危险,而且需要编译hive,不建议

参考资料: hive利器 自定义UDF+重编译hive

UDF中需要注意字段为null的情况,因为在数据库中null字段还是很常见的。

使用举例(复杂API):

GenericUDF接口的继承需要实现三个方法:

// 处理字段的方法
abstract Object evaluate evaluate(GenericUDF.DeferredObject[] arguments);
// 返回一个字符串,表示函数名称
abstract String getDisplayString(String[] children);
// 在处理字段之前检测字段的类型
abstract ObjectInspector initialize(ObjectInspector[] arguments);


Maven依赖

<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>


UDTF

UDTF可以输出多行多列

使用UDTF需要实现接口
org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
, 并实现其三个方法:

// 判别输入输出格式
abstract StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;
// 处理输入数据,并输出处理后的结构
abstract void process(Object[] record) throws HiveException;
// 通知UDTF数据已经处理完
abstruct void close() throws HiveException;


Hive sql使用接口:

# 从people表格中获取name, 并将其拆分为name和surname两部分输出
SELECT
adTable.name,
adTable.surname
FROM people
lateral view process_names(name) adTable as name, surname;


UDAF

UDAF可以一次处理一整个列的数据,并进行聚合操作.(类似于sum()和count())

使用UDAF需要实现两个接口:

org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator


前者检查输入参数,确定需要使用的reslover的类型,后者是处理数据逻辑的主要部分,需要实现以下几个方法:

// 确定输入输出数据的类型
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;
// 存储数据处理结果 (中间结果和最终结果)
abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;
// 重新设置聚合buffer
public void reset(AggregationBuffer agg) throws HiveException;
// 从输入表格中读数据 a typical Mapper
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;
// 处理部分数据
public Object terminalPartial(AggregationBuffer agg) throws HiveException;
// 把部分聚合结果相加
public void merge(AggregationBuffer agg, Object partial) throws HiveException;
// 输出最终结果 the Reducer
public Object terminate(AggregationBuffer agg) throws HiveException;


参考文档:

Apache Hive Customization Tutorial Series

Hive Extension Examples
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  UDF hadoop hive