
HBase Coprocessors

2016-03-29 22:38
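As mentioned earlier, operations such as aggregation and grouping are awkward in HBase, and joins are even more troublesome. Is there a better way? This post introduces HBase coprocessors. HBase has two kinds of coprocessor: Observer coprocessors and Endpoint coprocessors. The former resemble triggers and fire when a specific event occurs; the latter resemble stored procedures and run computations on the data. Observer coprocessors are useful in many situations, for example access control, foreign-key references or consistency checks, and secondary indexes. The main observer types are RegionObserver, RegionServerObserver, MasterObserver, and WALObserver.
How is a coprocessor loaded? There are two ways: static and dynamic.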

Static loading means declaring the coprocessor in hbase-site.xml.

<property>

<name>hbase.coprocessor.region.classes</name>

<value>org.myname.hbase.coprocessor.endpoint.SumEndPoint</value>

</property>

The name above can take one of the following forms:

hbase.coprocessor.region.classes for RegionObservers and Endpoints.

hbase.coprocessor.wal.classes for WALObservers.

hbase.coprocessor.master.classes for MasterObservers.

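The value is the fully qualified class name, which may be a custom class. To load several classes, list them all separated by commas (each one must be on the HBase classpath). A priority and arguments can also be specified. After deploying the custom classes, HBase must be restarted.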

Dynamic loading is done through the HBase shell or the HBase Java API and does not require restarting HBase.

hbase> disable 'users'

hbase> alter 'users', METHOD => 'table_att', 'coprocessor' => 'hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar|org.myname.hbase.coprocessor.RegionObserverExample|1073741823|arg1=1,arg2=2'

hbase> enable 'users'

hbase> describe 'users'
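The attribute value in the alter command packs four fields separated by `|`: the jar path, the class name, the priority, and optional arguments. As a rough illustration of that layout, the string could be split as below. This is only a sketch in plain Java, not HBase's own parser: `CoprocessorSpec` and its fields are hypothetical names, the host and port in `main` are placeholders, and falling back to `Integer.MAX_VALUE / 2` for a missing priority is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that splits a "path|class|priority|args" attribute value. */
final class CoprocessorSpec {
    final String jarPath;
    final String className;
    final int priority;
    final Map<String, String> args;

    private CoprocessorSpec(String jarPath, String className, int priority,
                            Map<String, String> args) {
        this.jarPath = jarPath;
        this.className = className;
        this.priority = priority;
        this.args = args;
    }

    static CoprocessorSpec parse(String spec) {
        // Limit -1 keeps trailing empty fields, so "path|class|123|" still yields four parts.
        String[] parts = spec.split("\\|", -1);
        if (parts.length < 2 || parts[1].trim().isEmpty()) {
            throw new IllegalArgumentException("expected at least path|class: " + spec);
        }
        // Assumption of this sketch: a missing priority falls back to Integer.MAX_VALUE / 2,
        // which equals the 1073741823 used in the shell command.
        int priority = Integer.MAX_VALUE / 2;
        if (parts.length > 2 && !parts[2].trim().isEmpty()) {
            priority = Integer.parseInt(parts[2].trim());
        }
        // Arguments are comma-separated key=value pairs, kept in their original order.
        Map<String, String> args = new LinkedHashMap<>();
        if (parts.length > 3 && !parts[3].trim().isEmpty()) {
            for (String pair : parts[3].split(",")) {
                String[] kv = pair.split("=", 2);
                if (kv.length != 2) {
                    throw new IllegalArgumentException("bad argument: " + pair);
                }
                args.put(kv[0].trim(), kv[1].trim());
            }
        }
        return new CoprocessorSpec(parts[0].trim(), parts[1].trim(), priority, args);
    }

    public static void main(String[] unused) {
        // Placeholder host and port; substitute the real NameNode address.
        CoprocessorSpec s = parse("hdfs://namenode:8020/user/hadoop/coprocessor.jar"
                + "|org.myname.hbase.coprocessor.RegionObserverExample|1073741823|arg1=1,arg2=2");
        System.out.println(s.className + " priority=" + s.priority + " args=" + s.args);
    }
}
```

Seeing the fields separately makes it easier to spot a misplaced `|` or a missing priority before the table is altered.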

The same can be done from Java code:

TableName tableName = TableName.valueOf("users");
String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar";
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();

// Disable the table while its definition is changed.
admin.disableTable(tableName);

// Build the table descriptor with its two column families.
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);

// Attribute value format: <jar path>|<class name>|<priority>
hTableDescriptor.setValue("COPROCESSOR$1", path + "|"
        + RegionObserverExample.class.getCanonicalName() + "|"
        + Coprocessor.PRIORITY_USER);

admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);

A coprocessor that is no longer wanted can also be removed:

hbase> alter 'users', METHOD => 'table_att_unset', NAME => 'coprocessor$1'

So how do you actually implement a custom coprocessor?

Implementing a coprocessor

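import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;

public class RegionObserverExample extends BaseRegionObserver {
    private static final byte[] ADMIN = Bytes.toBytes("admin");
    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("details");
    private static final byte[] COLUMN = Bytes.toBytes("Admin_det");
    private static final byte[] VALUE = Bytes.toBytes("You can't see Admin details");

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
            final List<Cell> results) throws IOException {
        // Hide the "admin" row: return a fixed cell and skip the normal read.
        if (Bytes.equals(get.getRow(), ADMIN)) {
            // (byte) 4 is the type code of a Put cell.
            Cell c = CellUtil.createCell(get.getRow(), COLUMN_FAMILY, COLUMN,
                    System.currentTimeMillis(), (byte) 4, VALUE);
            results.add(c);
            e.bypass();
        }
        // Forward to the deprecated KeyValue-based preGet hook found in older releases.
        List<KeyValue> kvs = new ArrayList<KeyValue>(results.size());
        for (Cell c : results) {
            kvs.add(KeyValueUtil.ensureKeyValue(c));
        }
        preGet(e, get, kvs);
        results.clear();
        results.addAll(kvs);
    }
}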
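The observer above rests on two ideas: a hook that runs before the real read, and a bypass flag that tells the server to skip its default processing. The following is a toy, in-memory model of that control flow in plain Java. `ToyRegion`, `HookContext`, and `PreGetHook` are hypothetical names invented for this sketch, and none of it is the HBase API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a pre-get hook with bypass; not HBase code. */
final class ToyRegion {
    /** Carries the bypass flag from the hook back to the caller. */
    static final class HookContext {
        private boolean bypass;

        void bypass() {
            bypass = true;
        }

        boolean isBypassed() {
            return bypass;
        }
    }

    /** Hypothetical hook interface, called before the stored value is read. */
    interface PreGetHook {
        void preGet(HookContext ctx, String row, List<String> results);
    }

    private final Map<String, String> store = new HashMap<>();
    private final PreGetHook hook;

    ToyRegion(PreGetHook hook) {
        this.hook = hook;
    }

    void put(String row, String value) {
        store.put(row, value);
    }

    List<String> get(String row) {
        List<String> results = new ArrayList<>();
        HookContext ctx = new HookContext();
        hook.preGet(ctx, row, results);
        // The default read runs only when the hook did not request a bypass.
        if (!ctx.isBypassed() && store.containsKey(row)) {
            results.add(store.get(row));
        }
        return results;
    }

    public static void main(String[] args) {
        ToyRegion region = new ToyRegion((ctx, row, results) -> {
            if (row.equals("admin")) {
                results.add("You can't see Admin details");
                ctx.bypass();
            }
        });
        region.put("admin", "secret");
        region.put("alice", "engineer");
        System.out.println(region.get("admin"));
        System.out.println(region.get("alice"));
    }
}
```

In this model, a read of the "admin" row returns only what the hook supplied, while any other row falls through to the stored value.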

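// Endpoint coprocessor: sums one column over the rows of a single region.
// SumService, SumRequest and SumResponse are classes generated from a protobuf
// definition that is not shown in this post.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

public class SumEndPoint extends SumService implements Coprocessor, CoprocessorService {
    private RegionCoprocessorEnvironment env;

    @Override
    public Service getService() {
        return this;
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        // do nothing
    }

    @Override
    public void getSum(RpcController controller, SumRequest request,
            RpcCallback<SumResponse> done) {
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(request.getFamily()));
        scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
        SumResponse response = null;
        InternalScanner scanner = null;
        try {
            scanner = env.getRegion().getScanner(scan);
            List<Cell> results = new ArrayList<Cell>();
            boolean hasMore = false;
            long sum = 0L;
            // Read the region batch by batch and add up every cell value.
            do {
                hasMore = scanner.next(results);
                for (Cell cell : results) {
                    sum = sum + Bytes.toLong(CellUtil.cloneValue(cell));
                }
                results.clear();
            } while (hasMore);
            response = SumResponse.newBuilder().setSum(sum).build();
        } catch (IOException ioe) {
            ResponseConverter.setControllerException(controller, ioe);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException ignored) {}
            }
        }
        // response stays null if the scan failed; the error is reported through the controller.
        done.run(response);
    }
}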
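The endpoint decodes every cell with `Bytes.toLong`, so it assumes each value was written as an 8-byte big-endian long, the layout that `Bytes.toBytes(long)` produces. A value stored as text, such as "1000", would not decode. The sketch below illustrates that assumption in plain Java, using `java.nio.ByteBuffer` in place of the HBase `Bytes` helper; `LongCellSum` and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

/** Plain-Java stand-in for summing cell values that hold 8-byte big-endian longs. */
final class LongCellSum {
    /** Encodes a long as 8 big-endian bytes (ByteBuffer's default byte order). */
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Sums the values, rejecting anything that is not exactly 8 bytes long. */
    static long sum(List<byte[]> values) {
        long sum = 0L;
        for (byte[] value : values) {
            if (value.length != Long.BYTES) {
                throw new IllegalArgumentException(
                        "expected 8 bytes, got " + value.length);
            }
            sum += ByteBuffer.wrap(value).getLong();
        }
        return sum;
    }

    public static void main(String[] args) {
        List<byte[]> salaries = Arrays.asList(encode(1000L), encode(2500L));
        System.out.println("Sum = " + sum(salaries));
    }
}
```

If the column was populated from strings, the values would need to be parsed as text before they can be summed.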

// Client side: call the SumEndPoint on every region of the 'users' table.
Configuration conf = HBaseConfiguration.create();

// Use the code below for HBase version 1.x.x or above.
Connection connection = ConnectionFactory.createConnection(conf);
TableName tableName = TableName.valueOf("users");
Table table = connection.getTable(tableName);

// Use the code below for HBase version 0.98.xx or below.
// HConnection connection = HConnectionManager.createConnection(conf);
// HTableInterface table = connection.getTable("users");

final SumRequest request = SumRequest.newBuilder()
        .setFamily("salaryDet").setColumn("gross").build();
try {
    // null start and end keys cover the whole table; the map holds one entry per region.
    Map<byte[], Long> results = table.coprocessorService(SumService.class, null, null,
            new Batch.Call<SumService, Long>() {
                @Override
                public Long call(SumService aggregate) throws IOException {
                    BlockingRpcCallback<SumResponse> rpcCallback =
                            new BlockingRpcCallback<SumResponse>();
                    aggregate.getSum(null, request, rpcCallback);
                    SumResponse response = rpcCallback.get();
                    return response.hasSum() ? response.getSum() : 0L;
                }
            });
    for (Long sum : results.values()) {
        System.out.println("Sum = " + sum);
    }
} catch (ServiceException e) {
    e.printStackTrace();
} catch (Throwable e) {
    e.printStackTrace();
}
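The client call returns a map with one entry per region, so the loop above prints a partial sum for each region rather than a table-wide total. Below is a minimal sketch of combining them, in plain Java with no HBase dependency. `RegionSums.total` is a hypothetical helper, and the sample map with its region names stands in for the `results` map returned by the call.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper that combines per-region partial sums into one total. */
final class RegionSums {
    /** Adds up the partial sums; a null entry is treated as 0. */
    static long total(Map<?, Long> perRegion) {
        long total = 0L;
        for (Long partial : perRegion.values()) {
            if (partial != null) {
                total += partial;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Made-up region names and values, standing in for the endpoint results.
        Map<String, Long> perRegion = new TreeMap<>();
        perRegion.put("region-a", 1200L);
        perRegion.put("region-b", 800L);
        System.out.println("Total = " + total(perRegion));
    }
}
```

The wildcard key type means the same helper would accept a `Map<byte[], Long>` like the one the client code receives.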

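Depending on the HBase version, these two examples may not run as written; check the documentation for your version and adjust them accordingly.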