Hive UDTF开发指南
2015-12-07 00:21
375 查看
在这篇文章中,我们将深入了解用户定义表函数(UDTF),该函数的实现是通过继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF这个抽象通用类,UDTF相对UDF更为复杂,但是通过它,我们读入一个数据域,输出多行多列,而UDF只能输出单行单列。
下面要创建hive外部表,在hive shell中执行
同样,可能我们也想输出多列,而不是输出单列。
以上所有的要求我们可以用UDTF去完成。
1、姓和名 两个分开的列
2、所有记录都包含姓名
3、每条记录或有包含多个人名(eg Nick and Nicole Smith)
为了达到这个实例目的,我们将实现以下API:
我们为输入的string参数定义了数据格式PrimitiveObjectInspector
定义输出数据格式(objectinspectors) 需要我们先定义两个属性名称,因为(objectinspectors)需要读取每一个属性(在这个实例中,两个属性都是string类型)。
我们主要的处理逻辑放在这个比较直观的processInputRecord方法当中。分开逻辑处理有利我们进行更简单的单元测试,而不用涉及到繁琐的objectinspector。
最后,一旦得到结果就可以对其进行forward,把基注册为hive处理后的输出记录对象。
代码
文章中所有的代码可以在这里找到:hive examples、GitHub repository示例数据
首先先创建一张包含示例数据的表:people,该表只有name一列,该列中包含了一个或多个名字,该表数据保存在people.txt文件中。~$ cat ./people.txt John Smith John and Ann White Ted Green Dorothy把该文件上载到hdfs目录/user/matthew/people中:
hadoop fs -mkdir people hadoop fs -put ./people.txt people
下面要创建hive外部表,在hive shell中执行
CREATE EXTERNAL TABLE people (name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ESCAPED BY '' LINES TERMINATED BY '\n' STORED AS TEXTFILE LOCATION '/user/matthew/people';
UDTF的输出值
上一文章讲解的UDF与GenericUDF函数是操作单个数据域。它们必须要返回一个值。但是这并不适用于所用的数据处理任务。Hive可以存储许多类型的数据,而有时候我们并不想单数据域输入、单数据域输出。对于每一行的输入,可能我们想输出多行,又或是不输出,举个例子,想一下函数explode(一个hive内置函数)的作用。同样,可能我们也想输出多列,而不是输出单列。
以上所有的要求我们可以用UDTF去完成。
实例
首先我们先假设我们想清洗people这张表中的人名,这个新的表有:1、姓和名 两个分开的列
2、所有记录都包含姓名
3、每条记录或有包含多个人名(eg Nick and Nicole Smith)
为了达到这个实例目的,我们将实现以下API:
org.apache.hadoop.hive.ql.udf.generic.GenericUDTF我们将覆盖以下三个方法:
//该方法中,我们将指定输入输出参数:输入参数的ObjectInspector与输出参数的StructObjectInspector abstract StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException; //我们将处理一条输入记录,输出若干条结果记录 abstract void process(Object[] record) throws HiveException; //当没有记录处理的时候该方法会被调用,用来清理代码或者产生额外的输出 abstract void close() throws HiveException;
代码实现
完整代码
public class NameParserGenericUDTF extends GenericUDTF { private PrimitiveObjectInspector stringOI = null; @Override public StructObjectInspector initialize(ObjectInspector[] args) UDFArgumentException { if (args.length != 1) { throw new UDFArgumentException("NameParserGenericUDTF() takes exactly one argument"); } if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE && ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) { throw new UDFArgumentException("NameParserGenericUDTF() takes a string as a parameter"); } // 输入格式(inspectors) stringOI = (PrimitiveObjectInspector) args[0]; // 输出格式(inspectors) -- 有两个属性的对象 List<String> fieldNames = new ArrayList<String>(2); List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2); fieldNames.add("name"); fieldNames.add("surname"); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); } public ArrayList<Object[]> processInputRecord(String name){ ArrayList<Object[]> result = new ArrayList<Object[]>(); // 忽略null值与空值 if (name == null || name.isEmpty()) { return result; } String[] tokens = name.split("\\s+"); if (tokens.length == 2){ result.add(new Object[] { tokens[0], tokens[1] }); }else if (tokens.length == 4 && tokens[1].equals("and")){ result.add(new Object[] { tokens[0], tokens[3] }); result.add(new Object[] { tokens[2], tokens[3] }); } return result; } @Override public void process(Object[] record) throws HiveException { final String name = stringOI.getPrimitiveJavaObject(record[0]).toString(); ArrayList<Object[]> results = processInputRecord(name); Iterator<Object[]> it = results.iterator(); while (it.hasNext()){ Object[] r = it.next(); forward(r); } } @Override public void close() throws HiveException { // do nothing } }以上代码可以从:github目录 check 下来。
代码走读
该UDTF以string类型作为参数,返回一个拥有两个属性的对象,与GenericUDF比较相似,指定输入输出数据格式(objectinspector),以便hive能识别输入与输出。我们为输入的string参数定义了数据格式PrimitiveObjectInspector
stringOI = (PrimitiveObjectInspector) args[0]
定义输出数据格式(objectinspectors) 需要我们先定义两个属性名称,因为(objectinspectors)需要读取每一个属性(在这个实例中,两个属性都是string类型)。
List<String> fieldNames = new ArrayList<String>(2); fieldNames.add("name"); fieldNames.add("surname"); List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
我们主要的处理逻辑放在这个比较直观的processInputRecord方法当中。分开逻辑处理有利我们进行更简单的单元测试,而不用涉及到繁琐的objectinspector。
最后,一旦得到结果就可以对其进行forward,把基注册为hive处理后的输出记录对象。
while (it.hasNext()){ Object[] r = it.next(); forward(r); } }
使用该UDTF函数
我们可以在hive中创建我们自己的函数mvn package cp target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar ./ext.jar然后在hive中使用
ADD JAR ./ext.jar; CREATE TEMPORARY FUNCTION process_names as 'com.matthewrathbone.example.NameParserGenericUDTF'; SELECT adTable.name, adTable.surname FROM people lateral view process_names(name) adTable as name, surname;输出
OK John Smith John White Ann White Ted Green
原文链接
http://beekeeperdata.com/posts/hadoop/2015/07/26/Hive-UDTF-Tutorial.html相关文章推荐
- 详解HDFS Short Circuit Local Reads
- Hadoop_2.1.0 MapReduce序列图
- 使用Hadoop搭建现代电信企业架构
- 分享Hive的一份胶片资料
- 单机版搭建Hadoop环境图文教程详解
- Hadoop生态上几个技术的关系与区别:hive、pig、hbase 关系与区别
- hadoop常见错误以及处理方法详解
- hadoop 单机安装配置教程
- hadoop的hdfs文件操作实现上传文件到hdfs
- hadoop实现grep示例分享
- Apache Hadoop版本详解
- linux下搭建hadoop环境步骤分享
- hadoop client与datanode的通信协议分析
- hadoop中一些常用的命令介绍
- Hadoop单机版和全分布式(集群)安装
- 用PHP和Shell写Hadoop的MapReduce程序
- hadoop map-reduce中的文件并发操作
- Hadoop1.2中配置伪分布式的实例
- java结合HADOOP集群文件上传下载
- 用python + hadoop streaming 分布式编程(一) -- 原理介绍,样例程序与本地调试