在Hbase Endpoint Coprocessor中使用coprocessorProxy操作例子与问题解析
2015-06-29 23:03
465 查看
转载请注明原创为Michael Zhang:/article/8890125.html
一、先说注意事项吧:
1、Coprocessor启动有三种方式:配置文件、shell和程序中指定,我使用的是程序指定:
上段代码中的addCoprocessor就是指定该表启动coprocessor操作。但前提是必须重启HBase才能把jar包载入进来。
2、如果客户端连接后出现如下问题:No matching handler **** for protocol in *** region,说明jar包还没有载入到HBaes中,确保HBase已经重启,另外检查代码中addCoprocessor("ict.wde.test.RowCountServer");的类名“RowCountServer”是否写正确了
二、说下步骤
2.1编写服务端代码:
1)接口类(固定格式)
上述两个类打包jar后放入hbase的lib目录下
2.2客户端代码
一、先说注意事项吧:
1、Coprocessor启动有三种方式:配置文件、shell和程序中指定,我使用的是程序指定:
static { EP_TABLE_DISCRIPTOR = new HTableDescriptor("epTest"); HColumnDescriptor family = new HColumnDescriptor("_tis".getBytes()); family.setInMemory(true); family.setMaxVersions(1); EP_TABLE_DISCRIPTOR.addFamily(family); try { EP_TABLE_DISCRIPTOR.addCoprocessor("ict.wde.test.RowCountServer"); } catch (IOException ioe) { }
上段代码中的addCoprocessor就是指定该表启动coprocessor操作。但前提是必须重启HBase才能把jar包载入进来。
2、如果客户端连接后出现如下问题:No matching handler **** for protocol in *** region,说明jar包还没有载入到HBaes中,确保HBase已经重启,另外检查代码中addCoprocessor("ict.wde.test.RowCountServer");的类名“RowCountServer”是否写正确了
二、说下步骤
2.1编写服务端代码:
1)接口类(固定格式)
package ict.wde.test; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import java.io.File; import java.io.IOException; /** * Created by Michael on 2015/6/22. */ public interface RowCountProtocol extends Coprocessor, CoprocessorProtocol { public long getRowCount() throws IOException; public long getRowCount(Filter filter) throws IOException; public String getStr() throws IOException; //public long getKeyValue() throws IOException; }2)真正起作用的类
package ict.wde.test; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.ipc.ProtocolSignature; import java.io.IOException; /** * Created by Michael on 2015/6/27. */ public class RowCountServer implements RowCountProtocol { @Override public void start(CoprocessorEnvironment env) throws IOException { } @Override public void stop(CoprocessorEnvironment env) throws IOException { } @Override public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException { return new ProtocolSignature(3, null); } @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { return 3; } @Override public long getRowCount() throws IOException { return this.getRowCount(new FirstKeyOnlyFilter()); } @Override public long getRowCount(Filter filter) throws IOException { return this.getRowCount(filter, false); } @Override public String getStr() throws IOException { String name = "Hello Doctor Michael Zhang, again!"; return name; } // @Override // public long getKeyValueCount() { // return 0; // } public long getRowCount(Filter filter, boolean countKeyValue) throws IOException { Scan scan = new Scan(); scan.setMaxVersions(1); if (filter != null) { scan.setFilter(filter); } return 1; } }
上述两个类打包jar后放入hbase的lib目录下
2.2客户端代码
import ict.wde.test.RowCountProtocol; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.filter.Filter; import java.io.IOException; /** * Created by Michael on 2015/6/30. */ public class EndpointTestClient { private final HTableInterface table; private final Configuration conf; private final RowCountProtocol server; private static final HTableDescriptor EP_TABLE_DISCRIPTOR; static { EP_TABLE_DISCRIPTOR = new HTableDescriptor("epTest"); HColumnDescriptor family = new HColumnDescriptor("_tis".getBytes()); family.setInMemory(true); family.setMaxVersions(1); EP_TABLE_DISCRIPTOR.addFamily(family); try { EP_TABLE_DISCRIPTOR.addCoprocessor("ict.wde.test.RowCountServer"); } catch (IOException ioe) { } } public EndpointTestClient(Configuration config) throws IOException { conf = config; table = initTidTable(); server = table.coprocessorProxy(RowCountProtocol.class, "0".getBytes()); } private HTableInterface initTidTable() throws IOException { HBaseAdmin admin = new HBaseAdmin(conf); if (!admin.tableExists("epTest")) { admin.createTable(EP_TABLE_DISCRIPTOR); } admin.close(); return new HTable(conf, "epTest"); } public String getStr() throws IOException { return server.getStr(); } }启动类:
import org.apache.hadoop.conf.Configuration; import java.io.IOException; /** * Created by Michael on 2015/6/22. */ public class EndpointExample { // private final HTableInterface table; // private static final Configuration conf; // private static final HTableDescriptor EP_TABLE_DISCRIPTOR; // // static { // conf = new Configuration(); // conf.set("hbase.zookeeper.quorum", "ccf04:2181"); // // EP_TABLE_DISCRIPTOR = new HTableDescriptor("epTest"); // HColumnDescriptor family = new HColumnDescriptor("_tis".getBytes()); // family.setInMemory(true); // family.setMaxVersions(1); // EP_TABLE_DISCRIPTOR.addFamily(family); // try { // EP_TABLE_DISCRIPTOR.addCoprocessor("ict.wde.test.RowCountServer"); // } catch (IOException ioe) { // // } // // table = initTidTable(); // } // // private HTableInterface initTidTable() throws IOException { // HBaseAdmin admin = new HBaseAdmin(conf); // if (!admin.tableExists("epTest")) { // admin.createTable(EP_TABLE_DISCRIPTOR); // } // admin.close(); // return new HTable(conf, "epTest"); // } public static void main(String[] agrs) throws IOException { Configuration conf = new Configuration(); conf.set("hbase.zookeeper.quorum", "ccf04:2181"); EndpointTestClient client = new EndpointTestClient(conf); String name = client.getStr(); System.out.println(name); } }
相关文章推荐
- windows动态链接库[DLL]与Linux共享库[SO]技术浅析
- vsphere6.0的vc架构部分更新(1)
- shell编程注意点
- shell编程注意点
- Linux常见问题(CentOS)
- Linux bashrc与profile的区别
- 在CentOS上安装MySQL
- RHEL7 - LINUX中的UID
- Windows和Linux下使用C语言写库和运用
- so库与系统硬件架构不符合异常
- OOP
- Linux中基本I/O 重定向的符号及其用法和文件标识符
- Docker的save和export命令的区别
- Hadoop 推测式任务 Hadoop Speculative Task
- 如何自定义菜单(点击菜单键,弹出一个popwindow)
- Linux文件,文件描述符以及dup()和dup2()
- 为什么要fork两次来避免产生僵尸进程
- opencv 数字图象处理 镜像变换 cvFlip()
- Centos6.6 安装Subversion服务
- 【原创】Linux opensource-src-4.3.2.tar.gz的安装。