A Java Programmer's Road to Big Data (15): Pig Latin User-Defined Functions
2017-12-15 13:54
Filter Functions
All filter functions extend the FilterFunc class and implement its abstract exec() method, which returns a Boolean. Example code:
```java
package com.udf.filter;

import java.io.IOException;

import org.apache.pig.FilterFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;

public class IsGoodQuantity extends FilterFunc {
    @Override
    public Boolean exec(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() == 0) {
            return false;
        }
        try {
            Object object = tuple.get(0);
            if (object == null) {
                return false;
            }
            int i = (Integer) object;
            return i == 0 || i == 1 || i == 4 || i == 5 || i == 9;
        } catch (ExecException e) {
            throw new IOException(e);
        }
    }
}
```
After writing the code, first package it into a jar, then register the jar's path with the REGISTER statement.
```
grunt> records = LOAD '/home/jackeyzhe/hadoop-book/input/ncdc/micro-tab/sample.txt'
>>     AS (year:chararray, temperature:int, quality:int);
grunt> DUMP records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)
(1994,24,2)
grunt> REGISTER pigFilterUdf.jar
grunt> filtered_records = FILTER records BY temperature != 9999 AND
>>     com.udf.filter.IsGoodQuantity(quality);
grunt> DUMP filtered_records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)
```
As the output shows, the record with quality 2 has been filtered out.
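The heart of the UDF is the quality-code predicate. It can be exercised on its own with a plain-Java sketch that needs no Pig dependency; `isGoodQuality` is a hypothetical helper mirroring the body of `exec()`:

```java
// Standalone sketch of the quality-code predicate used by IsGoodQuantity.
// isGoodQuality is a hypothetical helper; the real UDF wraps this logic
// in FilterFunc.exec(Tuple) together with the null checks shown above.
public class QualityCheck {
    // Quality codes 0, 1, 4, 5 and 9 mark a reading as usable.
    static boolean isGoodQuality(int code) {
        return code == 0 || code == 1 || code == 4 || code == 5 || code == 9;
    }

    public static void main(String[] args) {
        System.out.println(isGoodQuality(1)); // a 1950 record with quality 1
        System.out.println(isGoodQuality(2)); // the 1994 record, filtered out
    }
}
```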
Eval Functions
A user-defined eval function extends the EvalFunc class. Note that EvalFunc is parameterized by the function's return type; here that type is String. Example code:
```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class Trim extends EvalFunc<String> {
    @Override
    public String exec(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() == 0) {
            return null;
        }
        try {
            Object object = tuple.get(0);
            if (object == null) {
                return null;
            }
            return ((String) object).trim();
        } catch (ExecException e) {
            throw new IOException(e);
        }
    }

    @Override
    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
        funcList.add(new FuncSpec(this.getClass().getName(),
                new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY))));
        return funcList;
    }
}
```
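The null handling in `exec()` matters: Pig fields can be null, and the UDF must pass null through rather than throw. The behavior can be sketched in plain Java with no Pig dependency (`safeTrim` is a hypothetical helper mirroring the body of `exec()`):

```java
// Standalone sketch of the null handling in Trim.exec (no Pig dependency).
// safeTrim is a hypothetical helper: like the UDF, it returns null for a
// missing value instead of throwing a NullPointerException.
public class TrimSketch {
    static String safeTrim(Object field) {
        if (field == null) {
            return null;          // Pig propagates null through the pipeline
        }
        return ((String) field).trim();
    }

    public static void main(String[] args) {
        System.out.println(safeTrim("  pomegranate ")); // "pomegranate"
        System.out.println(safeTrim(null));             // null
    }
}
```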
Load Functions
A load function extends LoadFunc and implements the relevant abstract methods. Example code:
CutLoadFunc.java
```java
import java.io.IOException;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class CutLoadFunc extends LoadFunc {
    private static final Log LOG = LogFactory.getLog(CutLoadFunc.class);

    private final List<Range> ranges;
    private final TupleFactory tupleFactory = TupleFactory.getInstance();
    private RecordReader reader;

    public CutLoadFunc(String cutPattern) {
        ranges = Range.parse(cutPattern);
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new TextInputFormat();
    }

    @Override
    public void prepareToRead(RecordReader recordReader, PigSplit pigSplit)
            throws IOException {
        this.reader = recordReader;
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!reader.nextKeyValue()) {
                return null;
            }
            Text value = (Text) reader.getCurrentValue();
            String line = value.toString();
            Tuple tuple = tupleFactory.newTuple(ranges.size());
            for (int i = 0; i < ranges.size(); i++) {
                Range range = ranges.get(i);
                if (range.getEnd() > line.length()) {
                    LOG.warn(String.format(
                            "Range end (%s) is longer than line length (%s)",
                            range.getEnd(), line.length()));
                    continue;
                }
                tuple.set(i, new DataByteArray(range.getSubstring(line)));
            }
            return tuple;
        } catch (InterruptedException e) {
            throw new ExecException(e);
        }
    }
}
```
Range.java
```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class Range {
    private final int start;
    private final int end;

    public Range(int start, int end) {
        this.start = start;
        this.end = end;
    }

    public int getStart() {
        return start;
    }

    public int getEnd() {
        return end;
    }

    public String getSubstring(String line) {
        return line.substring(start - 1, end);
    }

    @Override
    public int hashCode() {
        return start * 37 + end;
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof Range)) {
            return false;
        }
        Range other = (Range) obj;
        return this.start == other.start && this.end == other.end;
    }

    public static List<Range> parse(String rangeSpec)
            throws IllegalArgumentException {
        if (rangeSpec.length() == 0) {
            return Collections.emptyList();
        }
        List<Range> ranges = new ArrayList<Range>();
        String[] specs = rangeSpec.split(",");
        for (String spec : specs) {
            String[] split = spec.split("-");
            try {
                ranges.add(new Range(Integer.parseInt(split[0]),
                        Integer.parseInt(split[1])));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException(e.getMessage());
            }
        }
        return ranges;
    }
}
```
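Note that Range numbers columns from 1 and treats the end column as inclusive, like the Unix cut command, while String.substring is 0-based with an exclusive end. A plain-Java sketch of that mapping (`cut` is a hypothetical helper mirroring `getSubstring`):

```java
// Standalone sketch of Range.getSubstring's 1-based, inclusive indexing.
// cut is a hypothetical helper; the real class additionally parses specs
// like "16-19,88-92,93-93" into a List<Range>.
public class RangeSketch {
    static String cut(String line, int start, int end) {
        // Columns are numbered from 1 and the end column is inclusive,
        // so only the start needs shifting for String.substring.
        return line.substring(start - 1, end);
    }

    public static void main(String[] args) {
        // Columns 16-19 of this line (1-based) hold the year.
        String line = "ABCDEFGHIJKLMNO1950";
        System.out.println(cut(line, 16, 19)); // "1950"
    }
}
```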
Load result:
```
grunt> records = LOAD '/home/jackeyzhe/hadoop-book/input/ncdc/micro/sample.txt'
>>     USING com.udf.load.CutLoadFunc('16-19,88-92,93-93')
>>     AS (year:int, temperature:int, quality:int);
grunt> DUMP records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)
```