您的位置:首页 > 编程语言

hbase 源代码解析(21) 自定义过滤器

2017-07-23 14:02 337 查看
自定义 filter 需要继承抽象类 Filter,或者继承 FilterBase(Filter 是抽象类而不是接口,所以是"继承"而不是"实现")
/**
 * Base contract for scan/get filters. Custom filters extend this class (or
 * {@code FilterBase}) and override the callbacks below.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class Filter {

  /** Verdicts a filter can return for a single cell from {@link #filterKeyValue(Cell)}. */
  public enum ReturnCode {
    /** Include the cell in the result. */
    INCLUDE,
    /** Include the cell, then move on to the next column. */
    INCLUDE_AND_NEXT_COL,
    /** Skip this cell and continue with the next one. */
    SKIP,
    /** Skip the rest of the current column. */
    NEXT_COL,
    /** Skip the rest of the current row. */
    NEXT_ROW,
    /** Seek to the next matching position; the target is obtained via {@code getNextKeyHint()}. */
    SEEK_NEXT_USING_HINT,
  }

  /** Scan-direction flag; written by {@link #setReversed(boolean)}. */
  protected transient boolean reversed;

  /** Resets the filter's state. */
  public abstract void reset() throws IOException;

  /**
   * Checks the row key. A row that does not qualify can be skipped right away,
   * avoiding all later checks (a prefix filter is the typical example).
   */
  public abstract boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException;

  /** Lets the filter end the scan early. */
  public abstract boolean filterAllRemaining() throws IOException;

  /** Decides what to do with a single cell. */
  public abstract ReturnCode filterKeyValue(final Cell v) throws IOException;

  /** Cell-based transformation hook; supersedes {@link #transform(KeyValue)}. */
  public abstract Cell transformCell(final Cell v) throws IOException;

  /**
   * KeyValue-based transformation hook.
   *
   * @deprecated use {@link #transformCell(Cell)}
   */
  @Deprecated
  public abstract KeyValue transform(final KeyValue currentKV) throws IOException;

  /**
   * Processes the cells of the current row together, once the earlier checks have
   * left data behind (a dependent-column filter is the typical example).
   */
  public abstract void filterRowCells(List<Cell> kvs) throws IOException;

  /** Row-level filtering flag; the scanner consults it before applying row-level callbacks. */
  public abstract boolean hasFilterRow();

  /**
   * Final row-level check made when data is still left after the earlier steps
   * (for example, a page filter checking whether the page is already full).
   */
  public abstract boolean filterRow() throws IOException;

  /**
   * KeyValue-based seek hint.
   *
   * @deprecated a Cell-based variant exists: {@link #getNextCellHint(Cell)}
   */
  @Deprecated
  public abstract KeyValue getNextKeyHint(final KeyValue currentKV) throws IOException;

  /** Cell-based seek hint. */
  public abstract Cell getNextCellHint(final Cell currentKV) throws IOException;

  /** Column-family check; {@code name} is the family name. */
  public abstract boolean isFamilyEssential(byte[] name) throws IOException;

  /** Serializes this filter to bytes. */
  public abstract byte[] toByteArray() throws IOException;

  /**
   * Always throws on the base class; concrete filters provide their own
   * {@code parseFrom}.
   *
   * @throws DeserializationException always
   */
  public static Filter parseFrom(final byte[] pbBytes) throws DeserializationException {
    throw new DeserializationException(
        "parseFrom called on base Filter, but should be called on derived type");
  }

  /** Compares the serialized fields of this filter with those of {@code other}. */
  abstract boolean areSerializedFieldsEqual(Filter other);

  /** Stores the scan-direction flag. */
  public void setReversed(boolean reversed) {
    this.reversed = reversed;
  }

  /** Returns the stored scan-direction flag. */
  public boolean isReversed() {
    return reversed;
  }
}
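流程如下:scan 或者 get 是调用的入口,基本流程就是下面这样(代码有删节)。不过在这段代码里没有看到 filterKeyValue 的调用——它很可能不在 nextInternal 里,而是在更底层逐个 cell 匹配的地方(HBase 1.x 中应该是 ScanQueryMatcher.match)被调用;如果你们找到了确切位置,告诉我一声。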
// Abridged excerpt of the scanner's per-row loop, quoted to show where the Filter
// callbacks are invoked. It is NOT compilable as shown: currentRow, offset, length,
// current and the initial*Progress values are used without a visible declaration,
// and the "} else {" further down has no visible matching "if", so the braces do
// not balance. Read it as an outline of the call order only.
//
// Visible order per row: hasFilterRow() -> filterRowKey(...) ->
// populateResult(...) -> filterRowCellsWithRet(...) -> filterRow().
private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
throws IOException {
 
// Loops until one of the return statements below is reached; `continue` retries
// with the next row after the current one has been filtered out.
while (true) {
  
boolean stopRow = isStopRow(currentRow, offset, length);
 
// Row-level filtering is active only when a filter is set and it reports hasFilterRow().
boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();
if (hasFilterRow) {
if (LOG.isTraceEnabled()) {
LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
+ " formed. Changing scope of limits that may create partials");
}
// Size and time limits are switched to row granularity (see the trace message above).
scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
}
// Row-key check: a true result means the row is counted as filtered and skipped.
if (filterRowKey(currentRow, offset, length)) {
incrementCountOfRowsFilteredMetric(scannerContext);
// early check, see HBASE-16296
// Per the article, this is where filterAllRemaining() actually gets called.
if (isFilterDoneInternal()) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
incrementCountOfRowsScannedMetric(scannerContext);// per the article, filter.reset() is called inside; NOTE(review): possibly meant for nextRow(...) on the next line - confirm
boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
results.clear();
continue;
}
// The row key passed: collect cells for this row from the store heap.
populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);
// Re-evaluate stopRow from the next cell in the heap (null means nothing is left).
Cell nextKv = this.storeHeap.peek();
stopRow = nextKv == null ||
isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
// Remember whether the row was empty before the row-level filter runs.
final boolean isEmptyRow = results.isEmpty();
 FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
if (hasFilterRow) {
// The article's note "this will call ..." wrapped onto the next line, which is
// commentary text rather than code. NOTE(review): "filterRowCells(cell)" there
// presumably means filterRow() - confirm against FilterWrapper.
filterRowCells(List) 和filterRowCells(cell)
ret = filter.filterRowCellsWithRet(results);
 
// Recompute progress after filtering: restore or clear the saved progress while
// keeping the time progress, then account for the cells that remain in results.
long timeProgress = scannerContext.getTimeProgress();
if (scannerContext.getKeepProgress()) {
scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
initialTimeProgress);
} else {
scannerContext.clearProgress();
}
scannerContext.setTimeProgress(timeProgress);
scannerContext.incrementBatchProgress(results.size());
for (Cell cell : results) {
scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
}
}
// Drop the row when it was empty, the row-level filter excluded it, or filterRow() rejects it.
if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
incrementCountOfRowsFilteredMetric(scannerContext);
results.clear();
boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
 if (!stopRow) continue;
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
 
// Optional second heap: when it may hold data for this row, add it to the results.
if (this.joinedHeap != null) {
boolean mayHaveData = joinedHeapMayHaveData(currentRow, offset, length);
if (mayHaveData) {
joinedContinuationRow = current;
populateFromJoinedHeap(results, scannerContext);
if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
return true;
}
}
}
// NOTE(review): the "if" that this "else" belongs to is not part of the excerpt.
} else {
populateFromJoinedHeap(results, scannerContext);
if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
return true;
}
}
// Report whether more values remain, based on stopRow.
if (stopRow) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} else {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
}
}
就这样。自定义完成后,打成 jar,然后要么 export HBASE_CLASSPATH(把该 jar 加入其中),要么将 jar 放到 hbase 安装目录的 lib 下面,最后重启 hbase。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hbase 源代码