
A Simple Hive UDTF Example

2014-05-18 20:58
Reposted from http://blog.csdn.net/wangzhun0129/article/details/16988381

A UDTF (User-Defined Table-Generating Function) addresses the need to produce multiple output rows from a single input row (a one-to-many mapping).
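
To make the one-to-many idea concrete, here is a minimal sketch (not from the original post) of a GenericUDTF that splits a comma-separated string into one row per token. The class name explodeString and the output column name token are made up for illustration; the sketch assumes the same Hive 0.8-era GenericUDTF API used in the walkthrough below.

package com.wz.udf;

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Hypothetical example: one input row "a,b,c" produces three output rows.
public class explodeString extends GenericUDTF {

    private final Object[] outRow = new Object[1];

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        // Declare a single string output column named "token".
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("token");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        // One call to forward() per token: one input row becomes many output rows.
        for (String token : args[0].toString().split(",")) {
            outRow[0] = token;
            forward(outRow);
        }
    }

    @Override
    public void close() throws HiveException {
        // Nothing is buffered across rows, so there is nothing to flush here.
    }
}

After registering it with create temporary function (as in step 5 below), an input row holding "a,b,c" would come back as three rows, one per token.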

Now let's walk through a concrete end-to-end example.

Suppose we have a text file named studentScore.txt with the following contents:

A 90

A 80

A 70

A 50

B 60

B 90

B 95

B 80

We want to compute the total score for A and for B.
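
For the data above, the expected totals are 90 + 80 + 70 + 50 = 290 for A and 60 + 90 + 95 + 80 = 325 for B; the query at the end of the post returns exactly these values.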

1) Upload studentScore.txt to HDFS.

bin/hadoop fs -put /home/wangzhun/tmp/studentScore.txt wz

2) In Hive, create the studentScore table and load the data.

create table studentScore(name string, score int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

load data inpath '/user/root/wz/studentScore.txt' overwrite into table studentScore;

3) Write the UDTF.

package com.wz.udf;

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class staticScore extends GenericUDTF {

    Integer nTotalScore = Integer.valueOf(0);   // running total for the current student
    Object[] forwardObj = new Object[1];
    String strStudent;                          // name of the current student

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        strStudent = "";
        // Declare a single string output column named "col1".
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("col1");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        if (!strStudent.isEmpty() && !strStudent.equals(args[0].toString())) {
            // The student name has changed: emit the previous student's total.
            String[] newRes = new String[1];
            newRes[0] = strStudent + ":" + String.valueOf(nTotalScore);
            forward(newRes);
            nTotalScore = 0;
        }
        strStudent = args[0].toString();
        nTotalScore += Integer.parseInt(args[1].toString());
    }

    @Override
    public void close() throws HiveException {
        // Emit the total for the last student once all rows have been processed.
        forwardObj[0] = strStudent + ":" + String.valueOf(nTotalScore);
        forward(forwardObj);
    }
}
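
Because forward() simply hands rows to a Collector, the grouping logic above can be smoke-tested outside Hive. Below is a rough local driver, not part of the original post: the class staticScoreDriver is hypothetical, it wires in a Collector via GenericUDTF.setCollector and replays the sample rows, and it relies on rows for the same student arriving consecutively, as they do when studentScore.txt is read by a single mapper.

package com.wz.udf;

import org.apache.hadoop.hive.ql.udf.generic.Collector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Hypothetical local driver: feeds the sample rows through the UDTF and
// prints whatever it forwards, instead of sending the rows to Hive.
public class staticScoreDriver {
    public static void main(String[] argv) throws Exception {
        staticScore udtf = new staticScore();

        // Capture everything the UDTF forwards.
        udtf.setCollector(new Collector() {
            public void collect(Object input) {
                System.out.println(((Object[]) input)[0]);
            }
        });

        // initialize() ignores its arguments here, but must run to reset state.
        udtf.initialize(new ObjectInspector[] {
            PrimitiveObjectInspectorFactory.javaStringObjectInspector,
            PrimitiveObjectInspectorFactory.javaIntObjectInspector });

        String[][] rows = { {"A", "90"}, {"A", "80"}, {"A", "70"}, {"A", "50"},
                            {"B", "60"}, {"B", "90"}, {"B", "95"}, {"B", "80"} };
        for (String[] row : rows) {
            udtf.process(new Object[] { row[0], row[1] });
        }
        udtf.close(); // emits the total for the last student (B)
    }
}

Running it should print A:290 and B:325, matching the Hive output in step 5.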

4) Compile the class and package it into a jar.

javac -classpath /home/wangzhun/hadoop/hadoop-0.20.2/hadoop-0.20.2-core.jar:/home/wangzhun/hive/hive-0.8.1/lib/hive-exec-0.8.1.jar staticScore.java

jar -cvf staticScore.jar com/wz/udf/staticScore.class

5) In Hive, add the jar, create a temporary function, and run the query to get the result.

add jar /home/wangzhun/hive/hive-0.8.1/lib/staticScore.jar;

create temporary function statics as 'com.wz.udf.staticScore';

hive> create temporary function statics as 'com.wz.udf.staticScore';

OK

Time taken: 0.213 seconds

hive> select statics(studentScore.name,studentScore.score) as col1 from studentScore;

Total MapReduce jobs = 1

Launching Job 1 out of 1

Number of reduce tasks is set to 0 since there's no reduce operator

Starting Job = job_201311282251_0001, Tracking URL = http://localhost:50030/jobdetails.jsp?jobid=job_201311282251_0001
Kill Command = /home/wangzhun/hadoop/hadoop-0.20.2/bin/../bin/hadoop job -Dmapred.job.tracker=localhost:9001 -kill job_201311282251_0001

Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0

2013-11-28 23:02:24,612 Stage-1 map = 0%, reduce = 0%

2013-11-28 23:02:30,689 Stage-1 map = 100%, reduce = 0%

2013-11-28 23:02:33,732 Stage-1 map = 100%, reduce = 100%

Ended Job = job_201311282251_0001

MapReduce Jobs Launched:

Job 0: Map: 1 HDFS Read: 40 HDFS Write: 12 SUCESS

Total MapReduce CPU Time Spent: 0 msec

OK

A:290

B:325

Time taken: 34.356 seconds