HBase Coprocessor Introduction: Notes

Coprocessors can be loaded globally, on all tables and regions hosted by the region server; these are known as system coprocessors. Alternatively, the administrator can specify which coprocessors should be loaded on all regions of a given table; these are known as table coprocessors. In short, loading is either global or per table.

There are two kinds of coprocessor. One is the observer, which is like a trigger in a conventional database; the other is the endpoint, a dynamic RPC endpoint that resembles a stored procedure.


The idea behind observers is that we can insert user code by overriding upcall methods provided by the coprocessor framework. The callback functions are executed from core HBase code when certain events occur. The coprocessor framework handles all of the details of invoking callbacks during various base HBase activities; the coprocessor need only supply the desired additional or alternate functionality. In other words, you write only the code that handles the event.

More than one observer can be loaded at one place (region, master, or WAL) to extend functionality. They are chained and executed sequentially in order of their assigned priorities. Nothing prevents a coprocessor implementor from communicating internally between the contexts of his or her installed observers, which allows comprehensive coverage of HBase functions.
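
The chaining described above can be modeled without HBase. The following is a minimal sketch, not the framework's implementation: PutObserver, TracingObserver, and ObserverChainSketch are hypothetical names invented for illustration, and the priority values 100 and 200 are arbitrary. It only shows that observers registered at the same place run in ascending priority order around a core operation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of an observer chain; none of these types are HBase classes.
final class ObserverChainSketch {

    /** Stand-in for an observer interface: one pre-hook and one post-hook. */
    interface PutObserver {
        void prePut(String row, List<String> trace);
        void postPut(String row, List<String> trace);
    }

    /** An observer that only records when it was called. */
    static final class TracingObserver implements PutObserver {
        private final String name;
        TracingObserver(String name) { this.name = name; }
        @Override public void prePut(String row, List<String> trace) { trace.add(name + ".prePut"); }
        @Override public void postPut(String row, List<String> trace) { trace.add(name + ".postPut"); }
    }

    /** Pairs an observer with the priority it was registered under. */
    private static final class Entry {
        final int priority;
        final PutObserver observer;
        Entry(int priority, PutObserver observer) {
            this.priority = priority;
            this.observer = observer;
        }
    }

    private final List<Entry> chain = new ArrayList<>();

    /** Lower priority values run first, following the natural order of integers. */
    void register(int priority, PutObserver observer) {
        chain.add(new Entry(priority, observer));
        chain.sort(Comparator.comparingInt((Entry e) -> e.priority));
    }

    /** Runs every pre-hook, then the core operation, then every post-hook. */
    List<String> put(String row) {
        List<String> trace = new ArrayList<>();
        for (Entry e : chain) e.observer.prePut(row, trace);
        trace.add("store " + row);   // stands in for the core write
        for (Entry e : chain) e.observer.postPut(row, trace);
        return trace;
    }

    static List<String> demo() {
        ObserverChainSketch region = new ObserverChainSketch();
        region.register(200, new TracingObserver("audit"));
        region.register(100, new TracingObserver("acl"));  // registered later, runs earlier
        return region.put("row1");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the "acl" observer runs before "audit" even though it was registered second, because only the priority value decides the order.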

As mentioned above, various events cause observer methods to be invoked on loaded observers. The set of events and method signatures is presented in the HBase API, beginning with HBase version 0.92; these are the functions that events trigger. (Please be aware that the API could change in the future, due to HBase client API changes or possibly other reasons. We tried to stabilize the API before the 0.92 release, but there is no guarantee.)

The RegionObserver interface provides callbacks for:
preOpen, postOpen: Called before and after the region is reported as online to the master.
preFlush, postFlush: Called before and after the memstore is flushed into a new store file.
preGet, postGet: Called before and after a client makes a Get request.
preExists, postExists: Called before and after the client tests for existence using a Get.
prePut and postPut: Called before and after the client stores a value.
preDelete and postDelete: Called before and after the client deletes a value.

Please refer to HBase 0.92 javadoc to get the whole list of declared methods.

We provide a convenient abstract class, BaseRegionObserver, which implements all RegionObserver methods with default behaviors, so you can focus on the events you are interested in without having to handle the upcalls for all of them.


    package org.apache.hadoop.hbase.coprocessor;

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Get;

    // Sample access-control coprocessor. It uses RegionObserver and
    // intercepts the preXXX() methods to check the user's privileges for the
    // given table and column family.
    // AccessDeniedException and permissionGranted() are not defined in this
    // excerpt; they must be supplied by the surrounding code.
    public class AccessControlCoprocessor extends BaseRegionObserver {
      @Override
      public void preGet(final ObserverContext<RegionCoprocessorEnvironment> c,
          final Get get, final List<KeyValue> result) throws IOException {
        // check permissions..
        if (!permissionGranted()) {
          throw new AccessDeniedException("User is not allowed to access.");
        }
      }

      // override prePut(), preDelete(), etc.
    }


As mentioned previously, observers can be thought of as database triggers. Endpoints, on the other hand, are more powerful, resembling stored procedures. One can invoke an endpoint at any time from the client. The endpoint implementation will then be executed remotely at the target region or regions, and the results of those executions will be returned to the client.

Endpoint is an interface for dynamic RPC extension. The endpoint implementation is installed on the server side and can then be invoked with HBase RPC. The client library provides convenience methods for invoking such dynamic interfaces.

In order to build and use your own endpoint, you need to:
Have a new protocol interface which extends CoprocessorProtocol.
Implement the Endpoint interface. The implementation will be loaded into and executed from the region context, that is, on the region server.
Extend the abstract class BaseEndpointCoprocessor. This convenience class hides some internal details that the implementer need not necessarily be concerned about, such as coprocessor framework class loading.

On the client side, the Endpoint can be invoked by two new HBase client APIs:

1) Executing against a single region:

    HTableInterface.coprocessorProxy(Class<T> protocol, byte[] row)

2) Executing over a range of regions:

    HTableInterface.coprocessorExec(Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T,R> callable)

Note that the HBase client has the responsibility for dispatching parallel endpoint invocations to the target regions, and for collecting the returned results to present to the application code.

This is like a lightweight MapReduce job: the "map" is the endpoint execution performed in the region server on every target region, and the "reduce" is the final aggregation at the client. Meanwhile, the coprocessor framework on the server side and in the client library is like the MapReduce framework, moving tedious distributed systems programming details behind a clean API, so the programmer can focus on the application.

The endpoint program:

    // A sample protocol for performing aggregation at regions.
    public static interface ColumnAggregationProtocol
    extends CoprocessorProtocol {
      // Perform aggregation for a given column at the region. The aggregation
      // will include all the rows inside the region. It can be extended to
      // allow passing start and end rows for a fine-grained aggregation.
      public long sum(byte[] family, byte[] qualifier) throws IOException;
    }

    // Aggregation implementation at a region.
    public static class ColumnAggregationEndpoint extends BaseEndpointCoprocessor
    implements ColumnAggregationProtocol {
      @Override
      public long sum(byte[] family, byte[] qualifier)
      throws IOException {
        // aggregate at each region
        Scan scan = new Scan();
        scan.addColumn(family, qualifier);
        long sumResult = 0;
        // getEnvironment() returns the generic CoprocessorEnvironment, so it
        // is cast to the region-level environment to reach the region.
        InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
            .getRegion().getScanner(scan);
        try {
          List<KeyValue> curVals = new ArrayList<KeyValue>();
          boolean hasMore = false;
          do {
            curVals.clear();
            hasMore = scanner.next(curVals);
            // next() may return no cells (for example on an empty region),
            // so guard before reading the first one.
            if (!curVals.isEmpty()) {
              KeyValue kv = curVals.get(0);
              sumResult += Bytes.toLong(kv.getValue());
            }
          } while (hasMore);
        } finally {
          scanner.close();
        }
        return sumResult;
      }
    }
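
The "lightweight MapReduce" analogy can be tried out without a cluster. This is a minimal sketch under stated assumptions: Region, exec, and EndpointMapReduceSketch are hypothetical stand-ins invented here, not HBase classes, and the calls run sequentially in one process rather than over RPC. It shows the shape of the computation only: one partial sum per region (the "map"), then a final addition at the caller (the "reduce").

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical in-memory model; none of these types are HBase classes.
final class EndpointMapReduceSketch {

    /** Stand-in for one table region: a sorted slice of rows keyed by row key. */
    static final class Region {
        final String startKey;                       // "" marks the first region
        final TreeMap<String, Long> rows = new TreeMap<>();
        Region(String startKey) { this.startKey = startKey; }
        Region put(String row, long value) { rows.put(row, value); return this; }
    }

    /** The "map" step: runs next to the data and returns one partial result. */
    static long sum(Region region) {
        long total = 0;
        for (long value : region.rows.values()) total += value;
        return total;
    }

    /** Applies the endpoint to each region and collects results by region start key. */
    static <R> Map<String, R> exec(List<Region> regions, Function<Region, R> endpoint) {
        Map<String, R> results = new LinkedHashMap<>();
        for (Region region : regions) results.put(region.startKey, endpoint.apply(region));
        return results;
    }

    static Map<String, Long> demoPartials() {
        List<Region> table = List.of(
            new Region("").put("a", 1).put("b", 2),
            new Region("m").put("m", 10).put("q", 20),
            new Region("t").put("x", 100));
        return exec(table, EndpointMapReduceSketch::sum);
    }

    /** The "reduce" step: the caller adds up the per-region partial sums. */
    static long demoTotal() {
        long total = 0;
        for (long partial : demoPartials().values()) total += partial;
        return total;
    }

    public static void main(String[] args) {
        System.out.println(demoPartials() + " -> " + demoTotal());
    }
}
```

Only the small per-region results cross the boundary between exec and its caller, which is the point of pushing the aggregation to where the data lives.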


Client invocations are performed through new methods on HTableInterface:
    public <T extends CoprocessorProtocol> T coprocessorProxy(
        Class<T> protocol, byte[] row);

    public <T extends CoprocessorProtocol, R> Map<byte[],R> coprocessorExec(
        Class<T> protocol, byte[] startKey, byte[] endKey,
        Batch.Call<T,R> callable) throws IOException, Throwable;

    public <T extends CoprocessorProtocol, R> void coprocessorExec(
        Class<T> protocol, byte[] startKey, byte[] endKey,
        Batch.Call<T,R> callable, Batch.Callback<R> callback)
        throws IOException, Throwable;

These listings follow the byte[]-based forms quoted earlier; check the HTableInterface javadoc of your release for the exact signatures.

Here is the client side example of calling ColumnAggregationEndpoint. It invokes the remote procedure ColumnAggregationProtocol.sum(...). Note that HBase dispatches the coprocessorExec call to multiple regions under the hood; the application sees only a simple API.

    // util, TEST_TABLE, TEST_FAMILY and TEST_QUALIFIER come from the
    // surrounding test harness and are not defined in this excerpt.
    HTableInterface table = new HTable(util.getConfiguration(), TEST_TABLE);
    Map<byte[], Long> results;

    // Empty start and end keys cover all regions of the table.
    results = table.coprocessorExec(ColumnAggregationProtocol.class,
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
        new Batch.Call<ColumnAggregationProtocol, Long>() {
          public Long call(ColumnAggregationProtocol instance)
              throws IOException {
            return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
          }
        });

    // The "reduce" step: add up the per-region sums at the client.
    long sumResult = 0;
    for (Map.Entry<byte[], Long> e : results.entrySet()) {
      sumResult += e.getValue();
    }

The above example is actually a simplified version of HBASE-1512. You can refer to that JIRA or the HBase source code of org.apache.hadoop.hbase.coprocessor.AggregateImplementation for more detail.

Below is a visualization of dynamic RPC invocation for this example. The application code on the client side performs a batch call. This initiates parallel RPC invocations of the registered dynamic protocol on every target table region. The results of those invocations are returned as they become available. The client library manages this parallel communication on behalf of the application, including messy details such as routing, retries, and error handling, until all results are returned (or an unrecoverable error occurs). Then the client library rolls up the responses into a Map and hands it over to the application. If an unrecoverable error occurs, an exception is thrown for the application code to catch and act on.
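
The dispatch-and-collect behavior described above can be sketched with plain java.util.concurrent. This is a rough model under assumptions, not the HBase client's code: ParallelExecSketch, execAll, callWithRetries, and the retry budget of 3 are all hypothetical, and treating IOException as retryable is a simplification chosen for the example. It shows parallel per-region calls, results gathered as they complete, a roll-up into a Map, and an exception once a region cannot be reached.

```java
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a client-side batch call; not HBase code.
final class ParallelExecSketch {

    static final int MAX_ATTEMPTS = 3;   // assumed retry budget

    /** Retries one per-region call; IOException is treated as transient here. */
    static <R> R callWithRetries(Callable<R> call) throws IOException {
        IOException last = null;
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                return call.call();
            } catch (IOException e) {
                last = e;                // try again
            } catch (Exception e) {
                throw new IOException("unrecoverable failure", e);
            }
        }
        throw new IOException("gave up after " + MAX_ATTEMPTS + " attempts", last);
    }

    /** Runs all region calls in parallel and rolls the results up into a Map. */
    static <R> Map<String, R> execAll(Map<String, Callable<R>> callsByRegion)
            throws IOException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, callsByRegion.size()));
        try {
            ExecutorCompletionService<Map.Entry<String, R>> completed =
                    new ExecutorCompletionService<>(pool);
            for (Map.Entry<String, Callable<R>> e : callsByRegion.entrySet()) {
                completed.submit(() -> Map.entry(e.getKey(), callWithRetries(e.getValue())));
            }
            Map<String, R> results = new TreeMap<>();
            for (int i = 0; i < callsByRegion.size(); i++) {
                // take() hands back calls in completion order, not submission order
                Map.Entry<String, R> entry = completed.take().get();
                results.put(entry.getKey(), entry.getValue());
            }
            return results;
        } catch (ExecutionException e) {
            throw new IOException("endpoint call failed", e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    static Map<String, Long> demo() {
        AtomicInteger attemptsOnR2 = new AtomicInteger();
        Map<String, Callable<Long>> calls = new TreeMap<>();
        calls.put("r1", () -> 5L);
        calls.put("r2", () -> {
            if (attemptsOnR2.incrementAndGet() == 1) throw new IOException("transient");
            return 7L;
        });
        try {
            return execAll(calls);
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    static boolean failsWhenRegionNeverRecovers() {
        Map<String, Callable<Long>> calls = new TreeMap<>();
        calls.put("r1", () -> { throw new IOException("always down"); });
        try {
            execAll(calls);
            return false;
        } catch (IOException e) {
            return true;                 // surfaced to the application code
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
        System.out.println(failsWhenRegionNeverRecovers());
    }
}
```

In demo(), region "r2" fails once and succeeds on the retry, so the caller never sees the transient error; in the second method the retries run out and the failure reaches the caller as an exception.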

Coprocessor Management


Build Your Own Coprocessor

We now assume you have your coprocessor code ready, compiled, and packaged as a jar file. The following sections show how the coprocessor framework can be configured to load it; they cover configuration and deployment only.

(We should have a template coprocessor that helps users get started quickly. Currently there are some built-in coprocessors that can serve as examples and as a starting point for implementing a new coprocessor. However, they are scattered over the code base. As discussed in HBASE-5273, some coprocessor samples will be provided under src/example/coprocessor of the HBase source code.)

Coprocessor Deployment

Currently we provide two options for deploying coprocessor extensions: load from configuration, which happens when the master or region servers start up; or load from table attribute, which is dynamic loading when the table is (re)opened. Because most users will set table attributes by way of the 'alter' command of the HBase shell, let's call this load from shell.

Option 1: Load from Configuration

When a region is opened, the framework tries to read coprocessor class names supplied as the configuration entries:
hbase.coprocessor.region.classes: for RegionObservers and Endpoints
hbase.coprocessor.master.classes: for MasterObservers
hbase.coprocessor.wal.classes: for WALObservers


Here is an example of the hbase-site.xml where one RegionObserver is configured for all the HBase tables:


    <property>
      <name>hbase.coprocessor.region.classes</name>
      <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
    </property>

If multiple classes are specified for loading, the class names must be comma separated. The framework will then try to load all the configured classes using the default class loader. This means the jar file must reside on the server side HBase classpath.

If loaded in this manner, the coprocessors will be active on all regions of all tables. This is the system coprocessor introduced earlier. The first listed coprocessor will be assigned the priority Coprocessor.Priority.SYSTEM. Each subsequent coprocessor in the list will have its priority value incremented by one (which reduces its priority; priorities have the natural sort order of Integers).

We have not really discussed priority, but it should be reasonably clear how the priority given to a coprocessor affects how it integrates with other coprocessors. When calling out to registered observers, the framework executes their callback methods in the sorted order of their priority. Ties are broken arbitrarily.

Option 2: Load from Shell

Coprocessors can also be configured to load on a per table basis, via the shell command ``alter'' + ``table_att''. An observer loaded this way applies only to the table named in the shell command.

    hbase(main):005:0>  alter 't1', METHOD => 'table_att',
      'coprocessor'=>'hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2'
    Updating all regions with the new schema...
    1/1 regions updated.
    Done.
    0 row(s) in 1.0730 seconds

    hbase(main):006:0> describe 't1'
    DESCRIPTION                                                        ENABLED
     {NAME => 't1', coprocessor$1 => 'hdfs:///foo.jar|com.foo.FooRegio false
     nObserver|1001|arg1=1,arg2=2', FAMILIES => [{NAME => 'c1', DATA_B
     LOCK_ENCODING => 'NONE', BLOOMFILTER => 'NONE', REPLICATION_SCOPE
      => '0', VERSIONS => '3', COMPRESSION => 'NONE', MIN_VERSIONS =>
     '0', TTL => '2147483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZ
     E => '65536', IN_MEMORY => 'false', ENCODE_ON_DISK => 'true', BLO
     CKCACHE => 'true'}, {NAME => 'f1', DATA_BLOCK_ENCODING => 'NONE',
      BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', VERSIONS => '3'
     , COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '2147483647'
     , KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY
      => 'false', ENCODE_ON_DISK => 'true', BLOCKCACHE => 'true'}]}
    1 row(s) in 0.0190 seconds

The coprocessor framework will try to read the class information from the coprocessor table attribute value. The value contains four pieces of information, separated by ``|'':
File path: The jar file containing the coprocessor implementation must be located somewhere all region servers can read it. The file could be copied onto the local disk of every region server, but we recommend storing it in HDFS instead. If no file path is given, the framework will attempt to load the class from the server classpath using the default class loader.
Class name: The full class name of the coprocessor.
Priority: An integer. The framework uses priorities to determine the execution sequence of all configured observers registered at the same hook. This field can be left blank, in which case the framework assigns a default priority value.
Arguments: This field is passed to the coprocessor implementation.
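
To make the four-field layout concrete, here is a minimal parsing sketch. It is not the framework's parser: CoprocessorSpecSketch and DEFAULT_PRIORITY (with its value 1000) are hypothetical, and reading the arguments as comma separated key=value pairs is an assumption based only on the arg1=1,arg2=2 sample in the shell session.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for a "path|class|priority|args" attribute value; not HBase code.
final class CoprocessorSpecSketch {

    static final int DEFAULT_PRIORITY = 1000;   // assumed value, for illustration only

    final String jarPath;            // null means: load from the server classpath
    final String className;
    final int priority;
    final Map<String, String> args;

    private CoprocessorSpecSketch(String jarPath, String className, int priority,
                                  Map<String, String> args) {
        this.jarPath = jarPath;
        this.className = className;
        this.priority = priority;
        this.args = args;
    }

    static CoprocessorSpecSketch parse(String value) {
        // The -1 limit keeps empty trailing fields such as "|com.Foo||".
        String[] parts = value.split("\\|", -1);
        if (parts.length < 2 || parts[1].trim().isEmpty()) {
            throw new IllegalArgumentException("class name is required: " + value);
        }
        String path = parts[0].trim();
        String rawPriority = parts.length > 2 ? parts[2].trim() : "";
        String rawArgs = parts.length > 3 ? parts[3].trim() : "";

        Map<String, String> args = new LinkedHashMap<>();
        if (!rawArgs.isEmpty()) {
            for (String pair : rawArgs.split(",")) {
                String[] kv = pair.split("=", 2);
                args.put(kv[0].trim(), kv.length > 1 ? kv[1].trim() : "");
            }
        }
        return new CoprocessorSpecSketch(
            path.isEmpty() ? null : path,
            parts[1].trim(),
            rawPriority.isEmpty() ? DEFAULT_PRIORITY : Integer.parseInt(rawPriority),
            args);
    }

    public static void main(String[] args) {
        CoprocessorSpecSketch spec =
            parse("hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2");
        System.out.println(spec.jarPath + " " + spec.className + " "
            + spec.priority + " " + spec.args);
    }
}
```

A value such as |com.foo.FooRegionObserver|| parses to a spec with no jar path and the assumed default priority, mirroring the two optional fields described in the list.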

You can also remove a loaded coprocessor at shell, by ``alter'' + ``table_att_unset'' command:
    hbase(main):007:0> alter 't1', METHOD => 'table_att_unset',
    hbase(main):008:0*   NAME => 'coprocessor$1'
    Updating all regions with the new schema...
    1/1 regions updated.
    Done.
    0 row(s) in 1.1130 seconds

    hbase(main):009:0> describe 't1'
    DESCRIPTION                                                        ENABLED
     {NAME => 't1', FAMILIES => [{NAME => 'c1', DATA_BLOCK_ENCODING => false
     'NONE', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', VERSION
     S => '3', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '214
     7483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN
     _MEMORY => 'false', ENCODE_ON_DISK => 'true', BLOCKCACHE => 'true
     '}, {NAME => 'f1', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER =>
     'NONE', REPLICATION_SCOPE => '0', VERSIONS => '3', COMPRESSION =>
      'NONE', MIN_VERSIONS => '0', TTL => '2147483647', KEEP_DELETED_C
     ELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', ENCO
     DE_ON_DISK => 'true', BLOCKCACHE => 'true'}]}
    1 row(s) in 0.0180 seconds

Current Status

Several JIRAs were opened for coprocessor development. HBASE-2000 functioned as the umbrella for coprocessor development. HBASE-2001 covered coprocessor framework development. HBASE-2002 covered RPC extensions for endpoints. Code resulting from work on these issues was committed to HBase trunk in 2010 and is available beginning with the 0.92.0 release.

Future Work

Parallel Computation Framework

By way of endpoints, we have a new dynamic way to inject user code into the processing of actions on individual table regions, and with the corresponding client side support we can interrogate them all in parallel and return results to the client in a flexible manner. This is immediately useful for building batch data processing and aggregation on top of HBase. However, you need to understand some internal HBase details to develop such applications.

Various JIRAs have been opened that consider exposing an additional framework for parallel computation that can provide a convenient but powerful higher level of abstraction. Options under consideration include MapReduce APIs similar to those provided by Hadoop; support for scriptlets, i.e. Ruby script fragments sent server side to perform work; or something akin to, or directly supporting, the Cascading framework (http://cascading.org) on the server for working with data flows more abstractly.

However, as far as I know, none of them is under construction right now.

External Coprocessor Host (HBASE-4047)

Where HBase coprocessors deviate substantially from the design of Google's BigTable coprocessors is that we have reimagined them as a framework for internal extension. In contrast, BigTable coprocessors run as separate processes colocated with tablet servers. The essential trade-off is between performance, flexibility and possibility on the one hand, and the ability to control and enforce resource usage on the other.

We are considering developing a coprocessor that is a generic host for another coprocessor. The host installs in-process into the master or region servers, but the user code will be loaded into a forked child process. An eventing model and umbilical protocol
over a bidirectional pipe between the parent and child will provide the user code loaded into the child the same semantics as if it were loaded internally to the parent. However, we immediately isolate the user code from HBase internals, and eventually expect
to use available resource management capabilities on the platform to further limit child resource consumption as desired by system administrators or the application designer.

Code Weaving (HBASE-2058)

Right now there are no constraints on what actions a coprocessor can take. We do not protect against malicious actions or faults accidentally introduced by a coprocessor. As an alternative to external coprocessor hosts we could introduce code weaving and code
transformation at coprocessor load time. We would weave in a configurable set of policies for constraining the actions a coprocessor can take. For example:
Improve fault isolation and system integrity protections via various static checks
Wrap heap allocations to enforce limits
Monitor CPU time via instrumentation injected into method and loop headers
Reject static or dynamic method invocations to APIs considered unsafe

And More...

The coprocessor framework provides possibilities to extend HBase. There are some more identified applications which can be built on top of coprocessors:
HBase isolation and allocation (HBASE-4120)
Secondary indexing: http://wiki.apache.org/hadoop/Hbase/SecondaryIndexing
Search in HBase (HBASE-3529)
HBase table and region access statistics.
and more ...


