
Developing a Custom Hive UDTF

2015-10-13 11:05
[Author]: kwu

A UDTF (User-Defined Table-Generating Function) handles the case where one input row must produce multiple output rows (a one-to-many mapping). The steps for developing a custom Hive UDTF are as follows:
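To make the one-to-many idea concrete, here is a plain-Java sketch that does not touch the Hive API at all. The helper `explodeRow` and the class `OneToManyDemo` are hypothetical names chosen for illustration: one input row (an id plus a comma-separated list) becomes one output row per list element, similar in spirit to Hive's built-in `explode` UDTF.

```java
import java.util.ArrayList;
import java.util.List;

public class OneToManyDemo {
    // Hypothetical helper (not a Hive API): one input row in, many output rows out.
    static List<String[]> explodeRow(String id, String csv) {
        List<String[]> rows = new ArrayList<>();
        for (String item : csv.split(",")) {
            // Each element becomes its own output row, keeping the id column.
            rows.add(new String[] {id, item});
        }
        return rows;
    }

    public static void main(String[] args) {
        // One input row ("u1", "a,b,c") yields three output rows.
        for (String[] row : explodeRow("u1", "a,b,c")) {
            System.out.println(row[0] + "\t" + row[1]);
        }
    }
}
```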

1. Extend org.apache.hadoop.hive.ql.udf.generic.GenericUDTF and implement its three methods: initialize, process and close.

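2. Hive first calls initialize, which returns a description of the rows the UDTF will produce (the number of columns and their types) as a StructObjectInspector.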

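3. After initialization, Hive calls process, where the actual work is done. Inside process, each call to forward() emits one output row; to emit several columns, put the column values into an array and pass that array to forward().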

4. Finally, Hive calls close(), where any resources that need cleaning up are released.

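5. Code example. The logic is simple: the input string is first split on "\001". The first field has the form userId_cookieId; the remaining fields are JSON objects from which the keys sex, age, ppt, degree, favor and commercial are extracted. The result is 8 string columns, and since forward() is called once, each input row yields exactly one output row.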


package com.hexun.udtf;

import java.util.ArrayList;

import net.sf.json.JSON;
import net.sf.json.JSONSerializer;

import org.apache.commons.beanutils.PropertyUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

public class UDTFDratio extends GenericUDTF {

    // JSON keys whose values fill col3..col8, in this order.
    private static final String[] JSON_FIELDS =
            {"sex", "age", "ppt", "degree", "favor", "commercial"};

    // Inspector for the single string argument, saved in initialize().
    private StringObjectInspector inputOI;

    @Override
    public void close() throws HiveException {
        // Nothing to clean up.
    }

    // Returns the description of the output rows (number of columns, types).
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args)
            throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentLengthException(
                    "UDTFDratio takes only one argument");
        }
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
                || ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory()
                        != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentException(
                    "UDTFDratio takes a string as its parameter");
        }
        inputOI = (StringObjectInspector) args[0];

        // Eight string columns named col1..col8.
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        for (int i = 1; i <= 8; i++) {
            fieldNames.add("col" + i);
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }
        return ObjectInspectorFactory.getStandardStructObjectInspector(
                fieldNames, fieldOIs);
    }

    // Processes one input value and returns the result through forward().
    @Override
    public void process(Object[] args) throws HiveException {
        // Read the argument through its ObjectInspector: the raw object may be a
        // lazy or Writable type rather than a java.lang.String.
        String input = inputOI.getPrimitiveJavaObject(args[0]);
        if (input == null) {
            return; // skip NULL inputs instead of failing with a NullPointerException
        }
        String[] splited = input.split("\001");
        String[] result = new String[8];

        for (int i = 0; i < splited.length; i++) {
            if (i == 0) {
                // First field has the form "<userId>_<cookieId>".
                String head = splited[i];
                int sep = head.indexOf('_');
                if (sep >= 0) {
                    result[0] = head.substring(0, sep);  // userId
                    result[1] = head.substring(sep + 1); // cookieId
                } else {
                    result[0] = head; // no '_': whole field as userId, cookieId stays NULL
                }
            } else {
                // Remaining fields are JSON objects.
                Object o;
                try {
                    JSON jo = JSONSerializer.toJSON(splited[i]);
                    o = JSONSerializer.toJava(jo);
                } catch (Exception e) {
                    e.printStackTrace(); // malformed JSON: leave col3..col8 as they are
                    continue;
                }
                for (int k = 0; k < JSON_FIELDS.length; k++) {
                    try {
                        Object v = PropertyUtils.getProperty(o, JSON_FIELDS[k]);
                        if (v != null) {
                            result[2 + k] = v.toString();
                        }
                    } catch (Exception e) {
                        // Missing key: leave this column NULL and keep reading the others.
                    }
                }
            }
        }
        // forward() is called once, so each input row yields one 8-column row.
        forward(result);
    }
}

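JAR files used by the example code: json-lib plus its runtime dependencies commons-beanutils, commons-collections, commons-lang, commons-logging and ezmorph (the exact versions appear in the add jar commands below).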



6. Operations in the Hive CLI. Before registering the UDTF, first add the JSON dependency JARs, then the JAR containing the UDTF:


add jar /opt/softwares/lib/commons-beanutils-1.7.0.jar;
add jar /opt/softwares/lib/commons-collections-3.2.jar;
add jar /opt/softwares/lib/commons-lang-2.4.jar;
add jar /opt/softwares/lib/commons-logging-1.1.3.jar;
add jar /opt/softwares/lib/ezmorph-1.0.3.jar;
add jar /opt/softwares/lib/json-lib-2.2.3-jdk15.jar;
add jar /opt/softwares/UDF.jar;

create temporary function explode_map3 as 'com.hexun.udtf.UDTFDratio';

insert into table stage.dratio PARTITION (day='${yesterday}') select explode_map3(datadratio) as (col1,col2,col3,col4,col5,col6,col7,col8) from stage.dratio_tmp;
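The call order described in steps 1 to 4 can be exercised without a Hive cluster. The sketch below is a plain-Java model, not the Hive API: `MiniUDTF`, `HeadSplitter` and `LifecycleDemo` are hypothetical names. The driver calls initialize once, process once per input row and close once, and every forward() call collects one row. `HeadSplitter` reuses only the first-field logic of UDTFDratio (split on "\001", then split userId_cookieId); JSON parsing is left out so that the sketch needs only the JDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for GenericUDTF: same call order, NOT the real Hive class.
abstract class MiniUDTF {
    private final List<Object[]> collected = new ArrayList<>();

    abstract List<String> initialize();   // returns the output column names
    abstract void process(Object[] args); // called once per input row
    abstract void close();                // called once after the last row

    // Each forward() call emits one output row; copy it in case the caller reuses the array.
    protected void forward(Object[] row) {
        collected.add(row.clone());
    }

    List<Object[]> rows() {
        return collected;
    }
}

// Mirrors UDTFDratio's first-field handling: split on \001, then "userId_cookieId".
class HeadSplitter extends MiniUDTF {
    @Override
    List<String> initialize() {
        return Arrays.asList("col1", "col2");
    }

    @Override
    void process(Object[] args) {
        if (args[0] == null) {
            return; // skip NULL input, as in the fixed UDTF
        }
        String head = args[0].toString().split("\001")[0];
        int sep = head.indexOf('_');
        String[] result = new String[2];
        if (sep >= 0) {
            result[0] = head.substring(0, sep);
            result[1] = head.substring(sep + 1);
        } else {
            result[0] = head; // no '_': cookieId stays null
        }
        forward(result);
    }

    @Override
    void close() {
        // nothing to release
    }
}

public class LifecycleDemo {
    // Drives a MiniUDTF the way the steps describe: initialize, process per row, close.
    static List<Object[]> run(MiniUDTF udtf, List<String> inputs) {
        udtf.initialize();
        for (String in : inputs) {
            udtf.process(new Object[] {in});
        }
        udtf.close();
        return udtf.rows();
    }

    public static void main(String[] args) {
        List<Object[]> out = run(new HeadSplitter(),
                Arrays.asList("u1_c1\001{\"sex\":\"m\"}", "u2_c2"));
        for (Object[] r : out) {
            System.out.println(Arrays.toString(r));
        }
    }
}
```

Running `main` prints one line per input row: `[u1, c1]` and then `[u2, c2]`.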