玩过hadoop,hive的你们可以回顾回顾
2016-03-31 16:31
309 查看
这几年,大家都是从 Hadoop、Hive 一路走过来的,Hadoop 现在都快到 3.0 了。下面贴一下我去年写的 UDAF,看看每个环节你是不是都清楚。
import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.common.type.HiveVarchar; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.Text; /** * Created by xxx on 2014/11/23. */ public class ConcatUDAF extends AbstractGenericUDAFResolver { public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) { return new ConcatUDAFEvaluator(); } public static class ConcatUDAFEvaluator extends GenericUDAFEvaluator { private transient PrimitiveObjectInspector vectorsFieldOI; private Text result; @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { super.init(m, parameters); vectorsFieldOI = (PrimitiveObjectInspector) parameters[0]; PrimitiveTypeInfo typeInfo = TypeInfoFactory.stringTypeInfo; return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(typeInfo); } /** * class for storing string concat value. 
*/ static class ConcatHiveStringAgg extends AbstractAggregationBuffer { boolean empty; HiveVarchar str; } @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException { ConcatHiveStringAgg agg = new ConcatHiveStringAgg(); reset(agg); return agg; } @Override public void reset(AggregationBuffer agg) throws HiveException { ConcatHiveStringAgg bdAgg = (ConcatHiveStringAgg) agg; bdAgg.empty = true; bdAgg.str = new HiveVarchar(); bdAgg.str.setValue(""); } boolean warned = false; @Override public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { try { ConcatHiveStringAgg myagg = (ConcatHiveStringAgg) agg; String str = myagg.str.toString(); String f = "000"; if (null != str && str.trim().length() != 0) f = ","; myagg.str.setValue(str + f + parameters[0]); } catch (Exception e) { if (!warned) { warned = true; } } } @Override public Object terminatePartial(AggregationBuffer agg) throws HiveException { ConcatHiveStringAgg myagg = (ConcatHiveStringAgg) agg; if (myagg.str == null) { result = new Text("nothing"); } result = new Text("terminatePar:" + myagg.str.toString()); return result; } @Override public void merge(AggregationBuffer agg, Object partial) throws HiveException { if (partial != null) { ConcatHiveStringAgg myagg = (ConcatHiveStringAgg) agg; String str = myagg.str.toString(); String f = ""; if (null != str && str.trim().length() != 0) f = "|"; myagg.str.setValue(str + f + PrimitiveObjectInspectorUtils.getHiveChar( partial, vectorsFieldOI)); myagg.empty = false; } } @Override public Object terminate(AggregationBuffer agg) throws HiveException { ConcatHiveStringAgg myagg = (ConcatHiveStringAgg) agg; if (myagg.str == null) { result = new Text("nothing"); } result = new Text(myagg.str.toString()); return result; } } }
相关文章推荐
- Linux时间子系统之八:动态时钟框架(CONFIG_NO_HZ、tickless)【转】
- Linux 系统常用命令汇总(七) 安全设置
- Linux 系统常用命令汇总(六) 文件打包与压缩
- Linux 系统常用命令汇总(五) 磁盘管理
- Linux 系统常用命令汇总(四) 程序和资源管理
- Linux 系统常用命令汇总(三) 用户和用户组管理
- Linux 系统常用命令汇总(二) vi 文本编辑
- Linux发行版制作总结
- linux 中特殊符号用法详解
- FastDFS+Nginx轻量级分布式
- nginx配置location总结及rewrite规则写法
- Linux 系统常用命令汇总(一) 文件和目录操作
- 再生龙u盘制作及使用 - Linux - clonezilla
- 学习OpenGL(三)绘制点
- yum源配置
- Linux命令(1):cd命令
- 详解用ELK来分析Nginx服务器日志的方法
- Linux Net I/O
- shell脚本实例
- mac(linux) 上如何安装ant