您的位置:首页 > 运维架构

玩过hadoop,hive的你们可以回顾回顾

2016-03-31 16:31 309 查看
这几年,大家都是从 Hadoop、Hive 一路走过来的,Hadoop 现在都快到 3.0 了。下面贴一下我去年写的 UDAF,你可以看看其中的每个环节自己是不是都清楚。

import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.Text;

/**
* Created by xxx on 2014/11/23.
*/

/**
 * Hive generic UDAF that concatenates the values of one primitive column into a single string.
 *
 * <p>The output carries literal markers that show which aggregation stage produced each piece:
 * {@code iterate} prefixes the first value of a buffer with {@code "000"} and separates later
 * values with {@code ","}; {@code terminatePartial} prefixes its output with
 * {@code "terminatePar:"}; {@code merge} separates partial results with {@code "|"}.
 * NOTE(review): these markers look like deliberate stage tracing and are kept byte-for-byte;
 * confirm before removing them for production use.
 */
public class ConcatUDAF extends AbstractGenericUDAFResolver {

    /**
     * Returns a new evaluator. The argument types are not inspected here; the evaluator's
     * {@code init} validates the inspectors it is given.
     *
     * @param parameters type information of the call arguments (unused)
     * @return a fresh {@link ConcatUDAFEvaluator}
     */
    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) {
        return new ConcatUDAFEvaluator();
    }

    /**
     * Evaluator holding the concatenation logic. The value returned by every terminate method
     * is a {@link Text}, matching the writable string inspector returned by {@code init}.
     */
    public static class ConcatUDAFEvaluator extends GenericUDAFEvaluator {

        // Fully qualified so that no new file-level import is required.
        private static final java.util.logging.Logger LOG =
                java.util.logging.Logger.getLogger(ConcatUDAFEvaluator.class.getName());

        /** Inspector for {@code parameters[0]} as passed to {@code init}; used to read inputs and partials. */
        private transient PrimitiveObjectInspector vectorsFieldOI;
        private Text result;

        /**
         * Stores the inspector of the first argument and declares a writable string result.
         *
         * @param m          aggregation mode, forwarded to the superclass
         * @param parameters inspectors of the arguments; the first one must be primitive
         * @return the writable string object inspector
         * @throws HiveException if no inspector is supplied or the first one is not primitive
         */
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            if (parameters == null || parameters.length == 0) {
                throw new HiveException("ConcatUDAF requires one argument");
            }
            if (!(parameters[0] instanceof PrimitiveObjectInspector)) {
                throw new HiveException("ConcatUDAF only accepts a primitive argument");
            }
            vectorsFieldOI = (PrimitiveObjectInspector) parameters[0];
            PrimitiveTypeInfo typeInfo = TypeInfoFactory.stringTypeInfo;
            return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(typeInfo);
        }

        /**
         * class for storing string concat value.
         */
        static class ConcatHiveStringAgg extends AbstractAggregationBuffer {
            /** True until a value has been appended by {@code iterate} or {@code merge}. */
            boolean empty;
            /** Concatenation accumulated so far; {@code reset} sets it to the empty string. */
            HiveVarchar str;
        }

        /** Creates a buffer and resets it to the empty state. */
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            ConcatHiveStringAgg agg = new ConcatHiveStringAgg();
            reset(agg);
            return agg;
        }

        /** Marks the buffer empty and replaces its accumulator with a fresh empty value. */
        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ConcatHiveStringAgg bdAgg = (ConcatHiveStringAgg) agg;
            bdAgg.empty = true;
            bdAgg.str = new HiveVarchar();
            bdAgg.str.setValue("");
        }

        /** Set once the first failure in {@code iterate} has been logged, to avoid flooding the log. */
        boolean warned = false;

        /**
         * Appends one input value to the buffer. NULL inputs are skipped instead of being
         * appended as the text "null". The first value of a buffer is prefixed with "000",
         * later values are separated by ",".
         *
         * <p>Failures are still treated as best-effort (the row is dropped), but the first one
         * is now logged instead of being discarded silently.
         */
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            if (parameters == null || parameters.length == 0 || parameters[0] == null) {
                return;
            }
            try {
                ConcatHiveStringAgg myagg = (ConcatHiveStringAgg) agg;
                String value = PrimitiveObjectInspectorUtils.getString(parameters[0], vectorsFieldOI);
                if (value != null) {
                    append(myagg, "000", ",", value);
                }
            } catch (Exception e) {
                if (!warned) {
                    warned = true;
                    LOG.log(java.util.logging.Level.WARNING,
                            getClass().getSimpleName()
                                    + " ignoring a failed iterate() call; similar failures will not be logged again",
                            e);
                }
            }
        }

        /**
         * Returns the partial result: "terminatePar:" followed by the accumulated string, or
         * "nothing" when the buffer has no accumulator.
         */
        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            ConcatHiveStringAgg myagg = (ConcatHiveStringAgg) agg;
            if (myagg.str == null) {
                // The original assigned this value and then dereferenced the null anyway.
                result = new Text("nothing");
            } else {
                result = new Text("terminatePar:" + myagg.str.toString());
            }
            return result;
        }

        /**
         * Appends a partial result to the buffer, separated from existing content by "|".
         * A null partial is ignored.
         */
        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            if (partial != null) {
                ConcatHiveStringAgg myagg = (ConcatHiveStringAgg) agg;
                String value = PrimitiveObjectInspectorUtils.getString(partial, vectorsFieldOI);
                if (value != null) {
                    append(myagg, "", "|", value);
                }
            }
        }

        /**
         * Returns the final result: the accumulated string, or "nothing" when the buffer has
         * no accumulator.
         */
        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            ConcatHiveStringAgg myagg = (ConcatHiveStringAgg) agg;
            if (myagg.str == null) {
                result = new Text("nothing");
            } else {
                result = new Text(myagg.str.toString());
            }
            return result;
        }

        /**
         * Appends {@code value} to the buffer. {@code separator} is used when the accumulated
         * text is non-blank, {@code firstPrefix} otherwise. A missing accumulator is treated
         * as empty and recreated.
         */
        private static void append(ConcatHiveStringAgg agg, String firstPrefix, String separator,
                String value) {
            String current = agg.str == null ? null : agg.str.toString();
            if (current == null) {
                current = "";
            }
            if (agg.str == null) {
                agg.str = new HiveVarchar();
            }
            String glue = current.trim().length() != 0 ? separator : firstPrefix;
            agg.str.setValue(current + glue + value);
            agg.empty = false;
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: