Hive+UDTF简单示例
2014-05-18 20:58
337 查看
转自http://blog.csdn.net/wangzhun0129/article/details/16988381
UDTF(User-Defined Table-Generating Functions) 用来解决输入一行输出多行(one-to-many mapping) 的需求。
我们来看一个简单的例子。
现在有一个名为studentScore.txt的文本,里面的内容如下:
A 90
A 80
A 70
A 50
B 60
B 90
B 95
B 80
我们 要统计 A和B的总分。
1)将studentScore.txt上传至hdfs.
bin/hadoop fs -put /home/wangzhun/tmp/studentScore.txt wz
2) 在hive里面,创建studentScore表,并导入 数据
create table studentScore(name string,score int) ROW FORMAT DELIMITED Fields TERMINATED BY '\t';
load data inpath '/user/root/wz/studentScore.txt' overwrite into table studentScore;
3)编写UDTF。
[java] view
plaincopy
package com.wz.udf;
import java.util.ArrayList;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
/**
 * UDTF that collapses per-row (name, score) pairs into one "name:total"
 * summary row per student.
 *
 * NOTE(review): correctness relies on input rows arriving grouped by
 * student name — the sample file is pre-sorted and the job runs in a
 * single mapper. Unsorted input would produce multiple partial totals
 * per student; confirm ordering guarantees before reusing this.
 */
public class staticScore extends GenericUDTF{
    Integer nTotalScore = Integer.valueOf(0); // running total for the current student
    Object forwardObj[] = new Object[1];      // reusable single-column output row
    String strStudent;                        // name being accumulated; "" before the first row

    /**
     * Emits the final student's total once input is exhausted.
     * Fixed: previously this emitted a bogus ":0" row when the input
     * table was empty (process() never ran); now it only forwards a row
     * if at least one student was seen.
     */
    @Override
    public void close() throws HiveException {
        if (strStudent != null && !strStudent.isEmpty()) {
            forwardObj[0] = strStudent + ":" + String.valueOf(nTotalScore);
            forward(forwardObj);
        }
    }

    /**
     * Declares the single string output column "col1" and resets the
     * accumulator state.
     *
     * @param args inspectors for the call arguments; exactly two are
     *             expected (student name, score)
     * @throws UDFArgumentException if the argument count is wrong
     */
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        // Validate arity up front — a wrong call site fails fast with a
        // clear message instead of an ArrayIndexOutOfBoundsException later.
        if (args.length != 2) {
            throw new UDFArgumentLengthException(
                "staticScore expects exactly two arguments: name, score");
        }
        strStudent = "";
        nTotalScore = 0;
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("col1");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    /**
     * Accumulates one (name, score) row. When the name differs from the
     * previous row's, the previous student's total is forwarded first and
     * the accumulator restarts at zero.
     */
    @Override
    public void process(Object[] args) throws HiveException {
        if (!strStudent.isEmpty() && !strStudent.equals(args[0].toString())) {
            // Name changed: flush the finished student's total.
            String[] newRes = new String[1];
            newRes[0] = strStudent + ":" + String.valueOf(nTotalScore);
            forward(newRes);
            nTotalScore = 0;
        }
        strStudent = args[0].toString();
        nTotalScore += Integer.parseInt(args[1].toString());
    }
}
4)打包编译成jar包.
javac -classpath /home/wangzhun/hadoop/hadoop-0.20.2/hadoop-0.20.2-core.jar:/home/wangzhun/hive/hive-0.8.1/lib/hive-exec-0.8.1.jar staticScore.java
jar -cvf staticScore.jar com/wz/udf/staticScore.class
5) 在hive里面添加jar包,创建临时函数,并执行得到结果。
add jar /home/wangzhun/hive/hive-0.8.1/lib/staticScore.jar;
create temporary function statics as 'com.wz.udf.staticScore';
[plain] view
plaincopy
hive> create temporary function statics as 'com.wz.udf.staticScore';
OK
Time taken: 0.213 seconds
hive> select statics(studentScore.name,studentScore.score) as col1 from studentScore;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201311282251_0001, Tracking URL = http://localhost:50030/jobdetails.jsp?jobid=job_201311282251_0001
Kill Command = /home/wangzhun/hadoop/hadoop-0.20.2/bin/../bin/hadoop job -Dmapred.job.tracker=localhost:9001 -kill job_201311282251_0001
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2013-11-28 23:02:24,612 Stage-1 map = 0%, reduce = 0%
2013-11-28 23:02:30,689 Stage-1 map = 100%, reduce = 0%
2013-11-28 23:02:33,732 Stage-1 map = 100%, reduce = 100%
Ended Job = job_201311282251_0001
MapReduce Jobs Launched:
Job 0: Map: 1 HDFS Read: 40 HDFS Write: 12 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
A:290
B:325
Time taken: 34.356 seconds
UDTF(User-Defined Table-Generating Functions) 用来解决输入一行输出多行(one-to-many mapping) 的需求。
我们来看一个简单的例子。
现在有一个名为studentScore.txt的文本,里面的内容如下:
A 90
A 80
A 70
A 50
B 60
B 90
B 95
B 80
我们 要统计 A和B的总分。
1)将studentScore.txt上传至hdfs.
bin/hadoop fs -put /home/wangzhun/tmp/studentScore.txt wz
2) 在hive里面,创建studentScore表,并导入 数据
create table studentScore(name string,score int) ROW FORMAT DELIMITED Fields TERMINATED BY '\t';
load data inpath '/user/root/wz/studentScore.txt' overwrite into table studentScore;
3)编写UDTF。
[java] view
plaincopy
package com.wz.udf;
import java.util.ArrayList;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
/**
 * GenericUDTF that turns consecutive (name, score) input rows into one
 * "name:total" output row per student.
 */
public class staticScore extends GenericUDTF{
    Integer nTotalScore = Integer.valueOf(0); // running score total
    Object forwardObj[] = new Object[1];      // single-column row buffer
    String strStudent;                        // student currently being summed

    /** Declares one string output column named "col1" and clears state. */
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        strStudent = "";
        ArrayList<String> columnNames = new ArrayList<String>();
        ArrayList<ObjectInspector> columnInspectors = new ArrayList<ObjectInspector>();
        columnNames.add("col1");
        columnInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnInspectors);
    }

    /**
     * Folds one (name, score) row into the accumulator; a change of name
     * flushes the previous student's total before restarting the sum.
     */
    @Override
    public void process(Object[] args) throws HiveException {
        String currentName = args[0].toString();
        boolean studentChanged = !strStudent.isEmpty() && !strStudent.equals(currentName);
        if (studentChanged) {
            // Emit the finished student's "name:total" row.
            String[] summaryRow = { strStudent + ":" + String.valueOf(nTotalScore) };
            forward(summaryRow);
            nTotalScore = 0;
        }
        strStudent = currentName;
        nTotalScore += Integer.parseInt(args[1].toString());
    }

    /** Flushes the last student's total when the input stream ends. */
    @Override
    public void close() throws HiveException {
        forwardObj[0] = strStudent + ":" + String.valueOf(nTotalScore);
        forward(forwardObj);
    }
}
4)打包编译成jar包.
javac -classpath /home/wangzhun/hadoop/hadoop-0.20.2/hadoop-0.20.2-core.jar:/home/wangzhun/hive/hive-0.8.1/lib/hive-exec-0.8.1.jar staticScore.java
jar -cvf staticScore.jar com/wz/udf/staticScore.class
5) 在hive里面添加jar包,创建临时函数,并执行得到结果。
add jar /home/wangzhun/hive/hive-0.8.1/lib/staticScore.jar;
create temporary function statics as 'com.wz.udf.staticScore';
[plain] view
plaincopy
hive> create temporary function statics as 'com.wz.udf.staticScore';
OK
Time taken: 0.213 seconds
hive> select statics(studentScore.name,studentScore.score) as col1 from studentScore;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201311282251_0001, Tracking URL = http://localhost:50030/jobdetails.jsp?jobid=job_201311282251_0001
Kill Command = /home/wangzhun/hadoop/hadoop-0.20.2/bin/../bin/hadoop job -Dmapred.job.tracker=localhost:9001 -kill job_201311282251_0001
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2013-11-28 23:02:24,612 Stage-1 map = 0%, reduce = 0%
2013-11-28 23:02:30,689 Stage-1 map = 100%, reduce = 0%
2013-11-28 23:02:33,732 Stage-1 map = 100%, reduce = 100%
Ended Job = job_201311282251_0001
MapReduce Jobs Launched:
Job 0: Map: 1 HDFS Read: 40 HDFS Write: 12 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
A:290
B:325
Time taken: 34.356 seconds
相关文章推荐
- Hive+UDTF简单示例
- Hive+UDTF简单示例
- Hive+UDTF简单示例
- Hive+UDAF简单示例
- Hive+UDAF简单示例
- ADODC控件简单应用示例
- 对于构造方法的简单代码示例
- 简单的CodeDOM示例
- 构建WebService常用特性[含简单示例程序]
- 最简单的支持中文的示例
- 用JDOM包实现生成XML文件的简单示例
- Java 5.0 新增 Autoboxing & Unboxing 功能的简单示例
- 异步Socket通信的一个简单示例
- Java RMI 简单示例
- Tomcat4/5连接池的设置及简单应用示例
- DHTML的简单示例,动态改变页面
- Tomcat4/5连接池的设置及简单应用示例
- XML 简单接口 (SAX2)用Visual Basic 实现的示例
- [ASP]小偷程序原理和简单示例
- Tomcat4/5连接池的设置及简单应用示例