
A Java Programmer's Road to Big Data (15): Pig Latin User-Defined Functions

2017-12-15 13:54

Filter functions

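Every filter function extends the FilterFunc class and implements its abstract exec() method, which returns a Boolean: true keeps the record and false drops it.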

Example code:

package com.udf.filter;

import org.apache.pig.FilterFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;

import java.io.IOException;

// Accepts a record only if its quality code is 0, 1, 4, 5 or 9
public class IsGoodQuantity extends FilterFunc {

    @Override
    public Boolean exec(Tuple tuple) throws IOException {
        // A missing or empty tuple never passes the filter
        if (tuple == null || tuple.size() == 0) {
            return false;
        }
        try {
            Object object = tuple.get(0);
            if (object == null) {
                return false;
            }
            // The field is declared as quality:int, so Pig passes an Integer
            int i = (Integer) object;
            return i == 0 || i == 1 || i == 4 || i == 5 || i == 9;
        } catch (ExecException e) {
            throw new IOException(e);
        }
    }
}


Once the code is written, package it as a JAR file, then use the REGISTER statement to tell Pig where that file is.

grunt> records = LOAD '/home/jackeyzhe/hadoop-book/input/ncdc/micro-tab/sample.txt'
>> AS (year:chararray, temperature:int, quality:int);
grunt> DUMP records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)
(1994,24,2)
grunt> REGISTER pigFilterUdf.jar;
grunt> filtered_records = FILTER records BY temperature != 9999 AND
>> com.udf.filter.IsGoodQuantity(quality);
grunt> DUMP filtered_records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)


As the output shows, the record whose quality is 2 has been filtered out.
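To check the filter logic without starting Pig, the predicate can be exercised in plain Java. The sketch below is a Pig-free restatement of IsGoodQuantity.exec() combined with the `temperature != 9999` condition from the FILTER statement; the class and method names (QualityFilterSketch, isGoodQuality, filter) are hypothetical, and each record is modelled as an Object[] of {year, temperature, quality} with a non-null temperature assumed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Pig-free restatement of
//   FILTER records BY temperature != 9999 AND IsGoodQuantity(quality)
public class QualityFilterSketch {

    // Same rule as IsGoodQuantity.exec(): null is rejected and only
    // the codes 0, 1, 4, 5 and 9 count as good quality
    static boolean isGoodQuality(Integer quality) {
        if (quality == null) {
            return false;
        }
        int q = quality;
        return q == 0 || q == 1 || q == 4 || q == 5 || q == 9;
    }

    // Each record is {year, temperature, quality}, mirroring the LOAD schema;
    // temperature is assumed to be non-null here
    static List<Object[]> filter(List<Object[]> records) {
        List<Object[]> kept = new ArrayList<>();
        for (Object[] r : records) {
            int temperature = (Integer) r[1];
            if (temperature != 9999 && isGoodQuality((Integer) r[2])) {
                kept.add(r);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Object[]> records = Arrays.asList(
                new Object[]{"1950", 0, 1},
                new Object[]{"1950", 22, 1},
                new Object[]{"1950", -11, 1},
                new Object[]{"1949", 111, 1},
                new Object[]{"1949", 78, 1},
                new Object[]{"1994", 24, 2});
        // Prints the five records with quality 1; the quality-2 record is dropped
        for (Object[] r : filter(records)) {
            System.out.println(Arrays.toString(r));
        }
    }
}
```

Keeping the rule in a small static method like this makes it easy to unit-test edge cases (null, unknown codes) separately from the Pig wiring.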

Eval functions

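A custom eval function extends the EvalFunc class. Note that EvalFunc is parameterized by the function's return type; for the Trim function below, that type is String.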
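The role of the type parameter can be illustrated without Pig. In the sketch below, SimpleEvalFunc, TrimSketch and LengthSketch are hypothetical stand-ins (not Pig classes), and a List<Object> plays the part of a Tuple; the point is only that the generic argument fixes what exec() returns, as EvalFunc<String> does for Trim.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, Pig-free stand-in for EvalFunc<T>: T fixes exec()'s return type
abstract class SimpleEvalFunc<T> {
    abstract T exec(List<Object> tuple);
}

// Returns a String, analogous to Trim extends EvalFunc<String>
class TrimSketch extends SimpleEvalFunc<String> {
    @Override
    String exec(List<Object> tuple) {
        if (tuple == null || tuple.isEmpty() || tuple.get(0) == null) {
            return null;
        }
        return ((String) tuple.get(0)).trim();
    }
}

// Same base class with a different T: returns an Integer
class LengthSketch extends SimpleEvalFunc<Integer> {
    @Override
    Integer exec(List<Object> tuple) {
        if (tuple == null || tuple.isEmpty() || tuple.get(0) == null) {
            return null;
        }
        return ((String) tuple.get(0)).length();
    }
}

public class EvalFuncSketch {
    public static void main(String[] args) {
        String trimmed = new TrimSketch().exec(Arrays.<Object>asList("  pig  "));
        Integer length = new LengthSketch().exec(Arrays.<Object>asList("pig"));
        System.out.println("[" + trimmed + "] " + length); // prints "[pig] 3"
    }
}
```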

Example code:

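import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Removes leading and trailing whitespace from a chararray field
public class Trim extends EvalFunc<String> {
    @Override
    public String exec(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() == 0) {
            return null;
        }
        try {
            Object object = tuple.get(0);
            if (object == null) {
                return null;
            }
            return ((String) object).trim();
        } catch (ExecException e) {
            throw new IOException(e);
        }
    }

    // Declares a single chararray argument, so Pig can cast other
    // argument types (such as bytearray) before calling exec()
    @Override
    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
        funcList.add(new FuncSpec(this.getClass().getName(),
                new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY))));
        return funcList;
    }
}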
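Why declare getArgToFuncMapping() at all? As I understand Pig's type checking, a field with no declared type reaches a UDF as a bytearray (a DataByteArray), not a String, so the `(String) object` cast in exec() would throw a ClassCastException; declaring a CHARARRAY argument lets Pig insert a cast first. The sketch below models this in plain Java, with a raw byte[] standing in for DataByteArray and a hypothetical castToChararray() helper standing in for Pig's conversion (modelled as UTF-8 decoding).

```java
import java.nio.charset.StandardCharsets;

// Pig-free illustration of why Trim declares a CHARARRAY argument.
// byte[] is an assumed stand-in for Pig's DataByteArray.
public class ArgCastSketch {

    // What Trim.exec() does: assumes the argument is already a String
    static String trimAssumingString(Object arg) {
        return ((String) arg).trim();
    }

    // Hypothetical helper modelling the cast Pig inserts when the UDF
    // declares CHARARRAY; here simply UTF-8 decoding of the bytes
    static String castToChararray(Object arg) {
        if (arg instanceof byte[]) {
            return new String((byte[]) arg, StandardCharsets.UTF_8);
        }
        return (String) arg;
    }

    public static void main(String[] args) {
        Object raw = "  1950 ".getBytes(StandardCharsets.UTF_8);
        try {
            trimAssumingString(raw);
        } catch (ClassCastException e) {
            System.out.println("without a declared type: ClassCastException");
        }
        System.out.println("[" + trimAssumingString(castToChararray(raw)) + "]"); // prints "[1950]"
    }
}
```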


Load functions

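A load function extends LoadFunc and implements its abstract methods: setLocation(), getInputFormat(), prepareToRead() and getNext().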

Example code:

CutLoadFunc.java

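package com.udf.load;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import java.io.IOException;
import java.util.List;

// Loads fixed-width text, cutting each line into fields by 1-based,
// inclusive column ranges such as '16-19,88-92,93-93'
public class CutLoadFunc extends LoadFunc {

    private static final Log LOG = LogFactory.getLog(CutLoadFunc.class);

    private final List<Range> ranges;
    private final TupleFactory tupleFactory = TupleFactory.getInstance();
    private RecordReader reader;

    public CutLoadFunc(String cutPattern) {
        ranges = Range.parse(cutPattern);
    }

    // Tell the input format which path(s) to read
    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }

    // Read the input line by line
    @Override
    public InputFormat getInputFormat() throws IOException {
        return new TextInputFormat();
    }

    // Keep the record reader for use in getNext()
    @Override
    public void prepareToRead(RecordReader recordReader, PigSplit pigSplit) throws IOException {
        this.reader = recordReader;
    }

    // Return one tuple per line, or null when the input is exhausted
    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!reader.nextKeyValue()) {
                return null;
            }
            Text value = (Text) reader.getCurrentValue();
            String line = value.toString();
            Tuple tuple = tupleFactory.newTuple(ranges.size());
            for (int i = 0; i < ranges.size(); i++) {
                Range range = ranges.get(i);
                if (range.getEnd() > line.length()) {
                    // Line too short for this range: leave the field null
                    LOG.warn(String.format("Range end (%s) is longer than line length (%s)",
                            range.getEnd(), line.length()));
                    continue;
                }
                // Fields are untyped bytearrays; the AS clause casts them
                tuple.set(i, new DataByteArray(range.getSubstring(line)));
            }
            return tuple;
        } catch (InterruptedException e) {
            throw new ExecException(e);
        }
    }
}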
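Pig drives a loader by calling prepareToRead() once with a RecordReader and then calling getNext() repeatedly until it returns null. The Pig-free sketch below mirrors that loop and CutLoadFunc's handling of short lines; CutLoopSketch is hypothetical, an Iterator<String> stands in for the RecordReader, a List<String> stands in for a Tuple, and each range is an int[] {start, end} (1-based, inclusive).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical, Pig-free sketch of CutLoadFunc's read loop
public class CutLoopSketch {

    private final List<int[]> ranges;
    private Iterator<String> reader;

    CutLoopSketch(List<int[]> ranges) {
        this.ranges = ranges;
    }

    // Stand-in for prepareToRead(): remember the source of lines
    void prepareToRead(Iterator<String> reader) {
        this.reader = reader;
    }

    // Stand-in for getNext(): null means "no more input"; a range that runs
    // past the end of a short line leaves that field null, like the `continue`
    List<String> getNext() {
        if (!reader.hasNext()) {
            return null;
        }
        String line = reader.next();
        List<String> tuple = new ArrayList<>();
        for (int[] r : ranges) {
            tuple.add(r[1] > line.length() ? null : line.substring(r[0] - 1, r[1]));
        }
        return tuple;
    }

    public static void main(String[] args) {
        CutLoopSketch loader = new CutLoopSketch(Arrays.asList(new int[]{1, 4}, new int[]{6, 7}));
        loader.prepareToRead(Arrays.asList("1950 22", "1949").iterator());
        List<String> t;
        while ((t = loader.getNext()) != null) { // keep calling until null
            System.out.println(t);               // [1950, 22] then [1949, null]
        }
    }
}
```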


Range.java

package com.udf.load;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// A 1-based, inclusive column range: 16-19 covers columns 16 to 19
public class Range {
    private final int start;
    private final int end;

    public Range(int start, int end) {
        this.start = start;
        this.end = end;
    }

    public int getStart() {
        return start;
    }

    public int getEnd() {
        return end;
    }

    // Convert to Java's 0-based, end-exclusive substring indices
    public String getSubstring(String line) {
        return line.substring(start - 1, end);
    }

    @Override
    public int hashCode() {
        return start * 37 + end;
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof Range)) {
            return false;
        }
        Range other = (Range) obj;
        return this.start == other.start && this.end == other.end;
    }

    // Parse a spec such as "16-19,88-92,93-93" into a list of ranges
    public static List<Range> parse(String rangeSpec) throws IllegalArgumentException {
        if (rangeSpec.length() == 0) {
            return Collections.emptyList();
        }
        List<Range> ranges = new ArrayList<Range>();
        String[] specs = rangeSpec.split(",");
        for (String spec : specs) {
            String[] split = spec.split("-");
            // Reject specs like "16" or "16-19-20" instead of failing with
            // an ArrayIndexOutOfBoundsException
            if (split.length != 2) {
                throw new IllegalArgumentException("Invalid range: " + spec);
            }
            try {
                ranges.add(new Range(Integer.parseInt(split[0]), Integer.parseInt(split[1])));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException(e.getMessage());
            }
        }
        return ranges;
    }
}


Loading the data with CutLoadFunc and dumping the result:

grunt> records = LOAD '/home/jackeyzhe/hadoop-book/input/ncdc/micro/sample.txt'
>> USING com.udf.load.CutLoadFunc('16-19,88-92,93-93')
>> AS (year:int, temperature:int, quality:int);
grunt> DUMP records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)
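To see how the spec '16-19,88-92,93-93' maps onto a fixed-width record, the sketch below cuts a synthetic 93-character line in which only the year (columns 16-19), temperature (88-92) and quality (93) positions are filled; everything else is padding, so this is not real NCDC data. CutSpecDemo and its parse() (a compact restatement of Range.parse()) are hypothetical. The final int conversion uses Java's Integer.parseInt, which accepts a leading '+'; Pig's own bytearray-to-int cast is not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical worked example of the cut spec used with CutLoadFunc
public class CutSpecDemo {

    // Compact restatement of Range.parse(): "a-b,c-d" -> {{a,b},{c,d}}
    static List<int[]> parse(String spec) {
        List<int[]> ranges = new ArrayList<>();
        for (String part : spec.split(",")) {
            String[] ends = part.split("-");
            ranges.add(new int[]{Integer.parseInt(ends[0]), Integer.parseInt(ends[1])});
        }
        return ranges;
    }

    // Write text so that it starts at 1-based column `start`
    static void put(StringBuilder line, int start, String text) {
        line.replace(start - 1, start - 1 + text.length(), text);
    }

    static List<String> cut(String line, String spec) {
        List<String> fields = new ArrayList<>();
        for (int[] r : parse(spec)) {
            fields.add(line.substring(r[0] - 1, r[1])); // 1-based, inclusive end
        }
        return fields;
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 93; i++) {
            sb.append(' '); // synthetic padding
        }
        put(sb, 16, "1950");
        put(sb, 88, "+0022");
        put(sb, 93, "1");
        List<String> fields = cut(sb.toString(), "16-19,88-92,93-93");
        System.out.println(fields);                          // prints [1950, +0022, 1]
        System.out.println(Integer.parseInt(fields.get(1))); // prints 22
    }
}
```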