hbase 源代码解析(21) 自定义过滤器
2017-07-23 14:02
337 查看
自定义过滤器需要继承抽象类 Filter,或者继承 FilterBase。Filter 的定义如下:
// Abstract base type for HBase filters, as excerpted by the article. Per the article, a
// custom filter derives from this class or from FilterBase.
// The comments below are English renderings of the article author's notes about when each
// hook is used; they are not the upstream Javadoc, and uncertain points are marked as such.
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class Filter {
// Return codes produced by filterKeyValue(Cell) to tell the caller what to do next.
public enum ReturnCode {
INCLUDE, // include the current item in the result
INCLUDE_AND_NEXT_COL, // include it, then move on; the constant name says "next column" although the article's note said "next row" — TODO confirm against the HBase Javadoc
SKIP, // skip to the next KeyValue and process that one
NEXT_COL, // skip the current column
NEXT_ROW, // skip the current row
SEEK_NEXT_USING_HINT, // seek to the next qualifying position; the article says this requires getNextKeyHint() (deprecated below — presumably getNextCellHint() is the current form)
}
// Direction flag; only written by setReversed() and read by isReversed() in this class.
protected transient boolean reversed;
// Resets the filter. The scanner walkthrough later in the article notes that reset() is
// invoked while advancing to the next row.
abstract public void reset() throws IOException;
// Decides on the row key alone whether the row qualifies; when it does not, the row can be
// skipped and the remaining checks avoided (e.g. a prefix filter).
// NOTE(review): in the scanner excerpt a true result leads to the row being counted as filtered.
abstract public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException;
// Lets the filter end the scan early.
abstract public boolean filterAllRemaining() throws IOException;
// Per-cell processing: filterKeyValue yields a ReturnCode for the cell, transformCell returns a Cell.
abstract public ReturnCode filterKeyValue(final Cell v) throws IOException;abstract public Cell transformCell(final Cell v) throws IOException;
@Deprecated // use Cell transformCell(final Cell)
abstract public KeyValue transform(final KeyValue currentKV) throws IOException;
// After the steps above, if data remains, the cells of the current row are processed
// together (the article cites a "dependent" filter — presumably DependentColumnFilter).
abstract public void filterRowCells(List<Cell> kvs) throws IOException;
// In the scanner excerpt, a true result here switches size/time limits to BETWEEN_ROWS and
// enables the row-level filtering step.
abstract public boolean hasFilterRow();
// Final row-level check once everything above has run and data still remains, e.g. a
// PageFilter checking whether the page is already full.
abstract public boolean filterRow() throws IOException;
// Seek hints; the KeyValue variant is deprecated, with a Cell variant declared right after it.
@Deprecated
abstract public KeyValue getNextKeyHint(final KeyValue currentKV) throws IOException;
abstract public Cell getNextCellHint(final Cell currentKV) throws IOException;
// NOTE(review): semantics not shown in this excerpt; takes a column-family name — see the HBase Javadoc.
abstract public boolean isFamilyEssential(byte[] name) throws IOException;
// Produces the byte[] form of this filter (presumably the counterpart of parseFrom — TODO confirm).
abstract public byte[] toByteArray() throws IOException;
// Always throws on the base class: per the message, parseFrom must be called on a derived type.
public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException {
throw new DeserializationException(
"parseFrom called on base Filter, but should be called on derived type");
}
// Package-private comparison hook; behavior is defined by subclasses and not shown here.
abstract boolean areSerializedFieldsEqual(Filter other);
// Stores the reversed flag.
public void setReversed(boolean reversed) {
this.reversed = reversed;
}public boolean isReversed() {
// Returns the flag stored by setReversed().
return this.reversed;
}
}流程如下: scan,或者get是调用的入口基本流程就是下面 一样,但是没有看到fiterKeyValue,如果你们找到了,告诉我一声。
// Region-scanner row loop, quoted by the article to show where the Filter hooks are invoked.
// NOTE(review): this is an abridged excerpt. currentRow/offset/length, current and the
// initial*Progress values are never declared here, and the "} else {" further down has no
// visible matching "if"; read it as a walkthrough, not as compilable code.
// Returns the hasMoreValues() result of the scanner state it sets, or true when a
// BETWEEN_CELLS limit is hit while reading from the joined heap.
private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
throws IOException {
while (true) {
boolean stopRow = isStopRow(currentRow, offset, length);
boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();
// A filter with row-level hooks must see whole rows, so limits are only enforced between rows.
if (hasFilterRow) {
if (LOG.isTraceEnabled()) {
LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
+ " formed. Changing scope of limits that may create partials");
}
scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
}
// Step 1: row-key check. A true result means the row is rejected and counted as filtered.
if (filterRowKey(currentRow, offset, length)) {
incrementCountOfRowsFilteredMetric(scannerContext);
// early check, see HBASE-16296
// Article note: this is where filterAllRemaining is actually invoked.
if (isFilterDoneInternal()) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}incrementCountOfRowsScannedMetric(scannerContext);
// Article note: filter.reset() is called inside (presumably inside nextRow below — TODO confirm).
boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
// Discard anything collected for the rejected row and try the next one.
results.clear();
continue;
}
// Step 2: collect the row's cells from the store heap, then peek at the following cell to
// re-evaluate whether the scan stops after this row.
populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);Cell nextKv = this.storeHeap.peek();
stopRow = nextKv == null ||
isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
final boolean isEmptyRow = results.isEmpty();FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
// Step 3: row-level filtering over the collected cells.
if (hasFilterRow) {
// Article note: this calls filterRowCells(List) and "filterRowCells(cell)"
// (the second is presumably filterRow() — verify in FilterWrapper).
ret = filter.filterRowCellsWithRet(results);
// Batch/size progress is reset or cleared and then recomputed from the cells that
// remain in results; the time progress read here is written back unchanged.
long timeProgress = scannerContext.getTimeProgress();
if (scannerContext.getKeepProgress()) {
scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
initialTimeProgress);
} else {
scannerContext.clearProgress();
}
scannerContext.setTimeProgress(timeProgress);
scannerContext.incrementBatchProgress(results.size());
for (Cell cell : results) {
scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
}
}
// Step 4: drop the row if it is empty, was excluded by the row filter, or filterRow() rejects it.
if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
incrementCountOfRowsFilteredMetric(scannerContext);
results.clear();
boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}if (!stopRow) continue;
// The stop row was reached, so there is nothing further to return.
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
// Step 5: if a joined heap exists and may hold data for this row, pull its cells in too.
if (this.joinedHeap != null) {
boolean mayHaveData = joinedHeapMayHaveData(currentRow, offset, length);
if (mayHaveData) {
joinedContinuationRow = current;
populateFromJoinedHeap(results, scannerContext);
if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
return true;
}
}
}
// NOTE(review): the "if" that this else belongs to was elided from the excerpt; the branch
// reads from the joined heap again and returns true when a BETWEEN_CELLS limit is reached.
} else {populateFromJoinedHeap(results, scannerContext);
if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
return true;
}
}
// Row accepted: report whether more rows may follow, based on the stop-row check.
if (stopRow) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} else {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
}
}就这样。自定义过滤器写好后,打成 jar 包,然后通过 export HBASE_CLASSPATH 把该 jar 加入 HBase 的 classpath,或者直接把 jar 放到 HBase 安装目录的 lib 目录下,最后重启 HBase 使其生效。
相关文章推荐
- hbase 源代码解析(22)部分流程图笔记
- hbase0.98自定义过滤器
- hbase 源代码解析(4) 的createTable 的 region assign
- Socket通信自定义mina 框架过滤器解析(处理粘包、断包问题)
- Qt之事件过滤器 截获消息通知 自定义消息处理事件解析.
- hbase 源代码解析(3) 的createTable 服务端解析第二部分
- hbase中分页过滤器详细解析
- SSM + Shiro 整合 (5)- 自定义过滤器及权限解析器、介绍权限匹配流程
- hbase 源代码解析(1) Connection
- hbase 源代码解析(2)HAdmin 的表创建过程
- HBase自定义过滤器
- hbase 源代码解析(23)truncate 和truncate_preserve流程分析
- nutch源代码--html的头信息解析
- 如何创建、使用以及解析自定义注解
- JavaScript自定义日期格式化函数详细解析
- h.264 SPS PPS解析源代码,C实现
- 08.Django自定义模板,自定义标签和自定义过滤器
- Scala操作hbase 最详细的代码解析
- 使用python解析apache日志并上传到HBase
- datagrid数据格式解析-自定义过滤函数loadFilter