您的位置:首页 > 其它

hbase0.98自定义过滤器

2017-12-08 16:03 239 查看
一般来说调整表设计就可以优化访问模式。但是有时你已经把表设计调整得尽可能好了,为不同访问模式优化得尽可能好了。当你仍然需要减少返回客户端的数据或者现有的过滤器还不能满足需求时,这就是考虑使用过滤器的时候了。

自定义filter继承的接口Filter,或是FilterBase。

以下是重写的方法:

    filterRowKey根据RK决定行是否被过滤

    filterKeyValue根据KV决定行或列(ReturnCode)是否被过滤

    filterRow(keyvalues)根据制定列值决定行是否过滤

    filterRow在之前Filter结束后,进行过滤

     reset当一行读完,为了读下一行,清空当前行数据

    filterAllRemainning当返回true,停止scan

过滤器的执行流程图:





自定义过滤器步骤:

 

1、用protobuf工具创建序列化类(可能因为HBase0.95内部引入了Google-Protobuf作为中间数据组织方式后才需要这步,本例是在0.98.7上编写的,0.95版本以前的没试过需不需要这步),有关protobuf的详细用法请查阅相关资料,这里不介绍:

a、下载2.5版本proto.exe(hbase0.98.7内protobuf版本是2.5),在proto.exe所在文件夹下用记事本编写一个.proto文件,如下:

option java_package = "cn.cstor.cproc.java"
;

option java_outer_classname = "MyKeyFilterProto" ;

 

message MyKeyFilter{

required string keyType = 1 ;

required string comp = 2 ;

required string value = 3 ;

optional int32 offset = 4 ;

optional int32 len = 5 ;

 

}

b、因为本人在windows环境下编码,在cmd中进入proto.exe所在文件夹,执行命令: protoc.exe --java_out=./ test.proto ,则可以找到的一个生成的FirstProtobuf.java文件。将该文  件拷到myeclipse中对应的package下。

 

2、编写自己的过滤器,如下代码(该例实现分页过滤器):

package cn.cstor.cproc.java.util;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import java.util.List;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.hbase.KeyValue;

import org.apache.hadoop.hbase.exceptions.DeserializationException;

import org.apache.hadoop.hbase.filter.FilterBase;

import com.google.protobuf.InvalidProtocolBufferException;

 

public class RowPaginationFilter extends FilterBase {

static final Log LOG = LogFactory.getLog(RowPaginationFilter.class);

private int rowsAccepted = 0;

private int offset = 0;

private int limit = 0;

public RowPaginationFilter() {

}

 

public RowPaginationFilter(int offset, int limit) {

this.offset = offset;

this.limit = limit;

}

@Override

public void reset() {

// noop

}

@Override

public boolean filterAllRemaining() {

return this.rowsAccepted > this.limit + this.offset;

}

@Override

public boolean filterRowKey(byte[] rowKey, int offset, int length) {

return false;

}

public ReturnCode filterKeyValue(KeyValue v) {

return ReturnCode.INCLUDE;

}

@SuppressWarnings("deprecation")

@Override

public void filterRow(List ignored) {

try {

super.filterRow(ignored);

} catch (IOException e) {

e.printStackTrace();

}

}

// true to exclude row, false to include row.

@Override

public boolean filterRow() {

boolean isExclude = this.rowsAccepted < this.offset || this.rowsAccepted >= this.limit + this.offset;

rowsAccepted++;

return isExclude;

}

public void readFields(final DataInput in) throws IOException {

this.offset = in.readInt();

this.limit = in.readInt();

}

public void write(final DataOutput out) throws IOException {

out.write(offset);

out.write(limit);

}

@Override  //重写该方法

    public byte[] toByteArray() {

RowPaginationProto.RowPaginationFilter.Builder builder =

RowPaginationProto.RowPaginationFilter.newBuilder();

 

        builder.setLimit(this.limit);

        builder.setOffset(this.offset);

        return builder.build().toByteArray();

    }

    

   //定义该方法,用于对象反序列化操作

    public static RowPaginationFilter parseFrom(final byte [] bytes) throws DeserializationException {

    RowPaginationProto.RowPaginationFilter proto = null;

        try {

            proto = RowPaginationProto.RowPaginationFilter.parseFrom(bytes);

        } catch (InvalidProtocolBufferException e) {

            throw new DeserializationException(e);

        }

        return new RowPaginationFilter(proto.getOffset(),proto.getLimit());

    }

}

 

注:自订filter中必须重写这两个个方法:
    public static Filter parseFrom(final byte [] pbBytes) throwsDeserializationException

    public byte [] toByteArray()

 

3、将xxxx和xxxx这两个类用myeclipse打成jar包部署到hbase中(放到lib目录中或者放任意目录通过修改hbase_evn.sh配置文件HBASE_CLASSPATH指定该jar包路径也可以)注:jar包要分发到所有 regionserver上,因为过滤器是在各regionserver上执行的。

 

4、重启HBASE,测试过滤器是否生效。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hbase filter