
Implementing the row_number analytic function in Hive with a custom UDF

2013-01-26 00:30
Our team used to implement row_number with transform. Implementing it as a UDF makes everyday use more convenient and avoids transform's relatively cumbersome syntax.
The test table used:

hive> desc row_number_test;
OK
id1     int
id2     string
age     int
score   double
name    string

hive> select * from row_number_test;
OK
2 t04 25 60.0 youlia
1 t01 20 85.0 liujiannan
1 t02 24 70.0 zengqiu
2 t03 30 88.0 hongqu
2 t03 27 70.0 yongqi
1 t02 19 75.0 wangdong
1 t02 24 70.0 zengqiu
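Before the queries below will run, the compiled UDF jar has to be added to the Hive session and the function registered. A minimal sketch (the jar path is illustrative; the class name matches the code at the end of this post):

hive> add jar /path/to/rowincrement.jar;
hive> create temporary function row_number as 'org.rowincrement.Row_number';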

To use it, you first partition and sort the data in a subquery. For example, this Oracle SQL:

select row_number() over (partition by id1 order by age desc) from row_number_test;

translates into the following Hive statement:

select row_number(id1)   -- pass the partition by column into row_number
from (select * from row_number_test distribute by id1 sort by id1, age desc) a;

Here distribute by id1 routes all rows with the same id1 to the same reducer, and sort by orders them within that reducer, so the UDF sees each partition's rows consecutively and can reset its counter whenever id1 changes.

If you partition by two columns:

select row_number() over (partition by id1, id2 order by score) from row_number_test;

the Hive equivalent is:

select row_number(id1, id2)   -- pass the partition by columns into row_number
from (select * from row_number_test distribute by id1, id2 sort by id1, id2, score) a;

Here are the query results:

1.
select id1,id2,age,score,name,row_number(id1) rn from (select * from row_number_test distribute by id1 sort by id1,age desc) a;


OK
2 t03 30 88.0 hongqu 1
2 t03 27 70.0 yongqi 2
2 t04 25 60.0 youlia 3
1 t02 24 70.0 zengqiu 1
1 t02 24 70.0 zengqiu 2
1 t01 20 85.0 liujiannan 3
1 t02 19 75.0 wangdong 4

2.
select id1,id2,age,score,name,row_number(id1,id2) rn from (select * from row_number_test distribute by id1,id2 sort by id1,id2,score) a;

(Note: with more than one reducer, rows from different groups may come back interleaved, as below; the numbering within each (id1, id2) group is still correct.)


OK
2 t04 25 60.0 youlia 1
1 t02 24 70.0 zengqiu 1
2 t03 27 70.0 yongqi 1
1 t02 24 70.0 zengqiu 2
1 t02 19 75.0 wangdong 3
1 t01 20 85.0 liujiannan 1
2 t03 30 88.0 hongqu 2

The code is below. Only the one-argument and two-argument evaluate methods are implemented; for more partition columns, add overloads following the same pattern. The code is for reference only:

package org.rowincrement;

import org.apache.hadoop.hive.ql.exec.UDF;

/**
 * A stateful UDF that emulates row_number(). It relies on each partition's
 * rows arriving consecutively and in order, which the
 * "distribute by ... sort by ..." subquery guarantees within a reducer.
 */
public class Row_number extends UDF {

    private String lastKey1 = null;
    private String lastKey2 = null;
    private int rowNum = 1;

    // one partition column: reset the counter whenever the key changes
    public int evaluate(Object key1) {
        String k1 = key1 == null ? "" : key1.toString();
        if (!k1.equals(lastKey1)) {
            lastKey1 = k1;
            rowNum = 1;
        }
        return rowNum++;
    }

    // two partition columns: reset the counter when either key changes
    public int evaluate(Object key1, Object key2) {
        String k1 = key1 == null ? "" : key1.toString();
        String k2 = key2 == null ? "" : key2.toString();
        if (!k1.equals(lastKey1) || !k2.equals(lastKey2)) {
            lastKey1 = k1;
            lastKey2 = k2;
            rowNum = 1;
        }
        return rowNum++;
    }

    // simple local test: the counter restarts each time the key changes
    public static void main(String[] args) {
        Row_number t = new Row_number();
        System.out.println(t.evaluate(123));  // 1
        System.out.println(t.evaluate(123));  // 2
        System.out.println(t.evaluate(123));  // 3
        System.out.println(t.evaluate(1234)); // 1
        System.out.println(t.evaluate(1234)); // 2
        System.out.println(t.evaluate(1234)); // 3
        System.out.println(t.evaluate(1235)); // 1
    }
}
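As a closing usage sketch (untested, and assuming the function is registered as row_number as shown earlier), the same pattern supports top-N-per-group queries; for example, the highest-scoring row of each (id1, id2) group:

select id1, id2, age, score, name
from (select a.*, row_number(a.id1, a.id2) rn
      from (select * from row_number_test distribute by id1, id2 sort by id1, id2, score desc) a) b
where b.rn = 1;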