
Using coprocessorProxy in an HBase Endpoint Coprocessor: a worked example and troubleshooting notes

2015-06-29 23:03
Please credit the original author, Michael Zhang, when reposting: /article/8890125.html

I. A few notes first:

1. There are three ways to load a coprocessor: via the configuration file, via the shell, or programmatically. I use the programmatic approach:

static {
    EP_TABLE_DISCRIPTOR = new HTableDescriptor("epTest");
    HColumnDescriptor family = new HColumnDescriptor("_tis".getBytes());
    family.setInMemory(true);
    family.setMaxVersions(1);
    EP_TABLE_DISCRIPTOR.addFamily(family);
    try {
        // Attach the endpoint coprocessor to the table descriptor.
        EP_TABLE_DISCRIPTOR.addCoprocessor("ict.wde.test.RowCountServer");
    } catch (IOException ioe) {
        // Ignored in this demo; in real code this should at least be logged.
    }
}


The addCoprocessor call in the snippet above is what attaches the coprocessor to the table. The prerequisite, however, is that HBase must be restarted so that the jar containing the coprocessor class actually gets loaded.
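For reference, the other two loading methods look roughly as follows; the jar path and priority in the shell command are illustrative placeholders, not values from this article:

Cluster-wide, in hbase-site.xml (the class must already be on every region server's classpath):

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>ict.wde.test.RowCountServer</value>
</property>

Per table, from the HBase shell (attribute format: jar path | class | priority | arguments):

hbase> alter 'epTest', METHOD => 'table_att', 'coprocessor' => 'hdfs:///user/hbase/rowcount-endpoint.jar|ict.wde.test.RowCountServer|1001|'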

2. If the client fails after connecting with an error like No matching handler **** for protocol in *** region, the jar has not been loaded into HBase. Make sure HBase has actually been restarted, and also double-check that the class name in addCoprocessor("ict.wde.test.RowCountServer") ("RowCountServer") is spelled correctly.
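When the coprocessor has been attached as a table attribute, as above, a quick sanity check is to describe the table in the HBase shell; the printed descriptor should list a coprocessor entry:

hbase> describe 'epTest'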

II. The steps

2.1 Writing the server-side code:

1) The protocol interface (fixed format)

package ict.wde.test;

import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;

import java.io.IOException;

/**
 * Created by Michael on 2015/6/22.
 * The client-facing protocol: every method declared here can be invoked
 * remotely through the coprocessor proxy.
 */
public interface RowCountProtocol extends Coprocessor, CoprocessorProtocol {

    public long getRowCount() throws IOException;

    public long getRowCount(Filter filter) throws IOException;

    public String getStr() throws IOException;

    // public long getKeyValue() throws IOException;
}
2) The class that does the real work

package ict.wde.test;

import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;

import java.io.IOException;

/**
 * Created by Michael on 2015/6/27.
 * Server-side implementation of the endpoint; one instance is loaded per region.
 */
public class RowCountServer implements RowCountProtocol {

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        // Called when the coprocessor is loaded for a region; nothing to do in this demo.
    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        // Called when the coprocessor is unloaded; nothing to do in this demo.
    }

    @Override
    public ProtocolSignature getProtocolSignature(String protocol,
            long clientVersion, int clientMethodsHash) throws IOException {
        return new ProtocolSignature(3, null);
    }

    @Override
    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        return 3;
    }

    @Override
    public long getRowCount() throws IOException {
        return this.getRowCount(new FirstKeyOnlyFilter());
    }

    @Override
    public long getRowCount(Filter filter) throws IOException {
        return this.getRowCount(filter, false);
    }

    @Override
    public String getStr() throws IOException {
        String name = "Hello Doctor Michael Zhang, again!";
        return name;
    }

    // @Override
    // public long getKeyValueCount() {
    //     return 0;
    // }

    public long getRowCount(Filter filter, boolean countKeyValue) throws IOException {
        Scan scan = new Scan();
        scan.setMaxVersions(1);
        if (filter != null) {
            scan.setFilter(filter);
        }
        // Placeholder: this demo exercises getStr(), so no real scan is executed here.
        // See the scan-based sketch below for what an actual count could look like.
        return 1;
    }

}
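For reference, here is a minimal sketch of what a real scan-based count could look like with this 0.94-era API. It is not the code used in this article: it assumes the CoprocessorEnvironment passed to start() has been saved in a field named env, and it follows the classic per-region row-count endpoint pattern.

// Sketch only. Additional imports needed: java.util.ArrayList, java.util.List,
// org.apache.hadoop.hbase.KeyValue,
// org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment,
// org.apache.hadoop.hbase.regionserver.InternalScanner.
// Assumes a field: private CoprocessorEnvironment env;  // assigned in start(env)
public long getRowCount(Filter filter, boolean countKeyValue) throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(1);
    if (filter != null) {
        scan.setFilter(filter);
    }
    // Scan only the region this coprocessor instance is attached to.
    InternalScanner scanner =
            ((RegionCoprocessorEnvironment) env).getRegion().getScanner(scan);
    long count = 0;
    try {
        List<KeyValue> row = new ArrayList<KeyValue>();
        boolean hasMore;
        do {
            row.clear();
            hasMore = scanner.next(row);
            if (!row.isEmpty()) {
                count += countKeyValue ? row.size() : 1;
            }
        } while (hasMore);
    } finally {
        scanner.close();
    }
    return count;
}

Because each region only counts its own rows, a table-wide total would have to be summed on the client, for example via coprocessorExec as sketched at the end of section 2.2 below.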


Package the two classes above into a jar, place it under HBase's lib directory on every region server, and restart HBase.
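Packaging itself can be as simple as the following, run from the source root (the jar name is arbitrary; hbase classpath prints the classpath of the local HBase installation):

javac -cp "$(hbase classpath)" ict/wde/test/RowCountProtocol.java ict/wde/test/RowCountServer.java
jar cvf rowcount-endpoint.jar ict/wde/test/*.class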

2.2 The client code

import ict.wde.test.RowCountProtocol;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;

import java.io.IOException;

/**
 * Created by Michael on 2015/6/30.
 */
public class EndpointTestClient {

    private final HTableInterface table;
    private final Configuration conf;
    private final RowCountProtocol server;

    private static final HTableDescriptor EP_TABLE_DISCRIPTOR;

    static {
        EP_TABLE_DISCRIPTOR = new HTableDescriptor("epTest");
        HColumnDescriptor family = new HColumnDescriptor("_tis".getBytes());
        family.setInMemory(true);
        family.setMaxVersions(1);
        EP_TABLE_DISCRIPTOR.addFamily(family);
        try {
            EP_TABLE_DISCRIPTOR.addCoprocessor("ict.wde.test.RowCountServer");
        } catch (IOException ioe) {
            // Ignored in this demo; in real code this should at least be logged.
        }
    }

    public EndpointTestClient(Configuration config) throws IOException {
        conf = config;
        table = initTidTable();
        // The proxy talks to the single region that contains row key "0".
        server = table.coprocessorProxy(RowCountProtocol.class, "0".getBytes());
    }

    private HTableInterface initTidTable() throws IOException {
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists("epTest")) {
            admin.createTable(EP_TABLE_DISCRIPTOR);
        }
        admin.close();
        return new HTable(conf, "epTest");
    }

    public String getStr() throws IOException {
        return server.getStr();
    }
}
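One caveat about coprocessorProxy: it only reaches the single region hosting the given row key ("0" here). If the server-side counting were fully implemented, a table-wide total would instead use coprocessorExec, which fans the call out to every region and returns one result per region. Below is a minimal sketch under that assumption, using a hypothetical helper method name getTotalRowCount added to the client class:

// Sketch only. Additional imports needed: java.util.Map,
// org.apache.hadoop.hbase.client.coprocessor.Batch.
public long getTotalRowCount() throws Throwable {
    // null start/end keys mean "all regions of the table".
    Map<byte[], Long> perRegion = table.coprocessorExec(
            RowCountProtocol.class, null, null,
            new Batch.Call<RowCountProtocol, Long>() {
                public Long call(RowCountProtocol instance) throws IOException {
                    return instance.getRowCount();
                }
            });
    long total = 0;
    for (Long count : perRegion.values()) {
        total += count;
    }
    return total;
}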
The driver class:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

import java.io.IOException;

/**
* Created by Michael on 2015/6/22.
*/
public class EndpointExample {

    public static void main(String[] args) throws IOException {

        // HBaseConfiguration.create() also loads hbase-default.xml / hbase-site.xml.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "ccf04:2181");

        EndpointTestClient client = new EndpointTestClient(conf);
        String name = client.getStr();
        System.out.println(name);
    }
}