您的位置:首页 > 编程语言

hbase0.98 endpoint实现group分组求和代码

2014-09-01 16:24 381 查看
1,制作test.proto文件:

// Java package that the generated classes are placed in.
option java_package = "com.coprocessor.group.generated";

// Name of the generated outer class; the endpoint and the client in this
// article reference the messages and the service as GroupProtos.*.
option java_outer_classname = "GroupProtos";

// Ask protoc to also generate the abstract service class
// (GroupProtos.GroupService) that the endpoint extends.
option java_generic_services = true;

// Generate equals()/hashCode() for the message classes.
option java_generate_equals_and_hash = true;

// Favour fast generated code over small generated code.
option optimize_for = SPEED;

// Request sent to every region hosting the table.
message GroupRequest {

// Positional arguments. The endpoint in this article reads them as:
//   params[0] = qualifier of the column whose values are summed
//   params[1] = qualifier of the column to group by
//   params[2] = column family that holds both columns
repeated string params = 1;

}

// Per-region answer returned by the endpoint.
message GroupResponse {

// The endpoint fills this with a JSON object encoded as a string, mapping
// each group value to its sum, with the sum written as a quoted string
// (for example {"2014-09-01":"42"}). The client parses it with Jackson.
required string key_value = 1;

}

// RPC service implemented by the region-side endpoint coprocessor.
service GroupService {

// Despite the "Count" in its name, the implementation in this article
// sums the values of one column grouped by the values of another.
rpc getCountByGroup(GroupRequest)

returns (GroupResponse);

}

2,利用protoc生成GroupProtos.java文件(该文件由工具自动生成,就不粘贴了):

执行命令 protoc --java_out=. test.proto

3,编写CountByGroupEndpoint.java
/**
 * Region-side endpoint coprocessor that scans the hosting region and sums the
 * values of one column, grouped by the value of another column in the same
 * column family.
 *
 * <p>The request carries three positional parameters: index 0 is the qualifier
 * whose values are summed, index 1 is the qualifier to group by, and index 2 is
 * the column family. The response is a JSON object serialised as a string,
 * mapping each group value to its sum written as a quoted decimal string, e.g.
 * {@code {"2014-09-01":"42"}}. Each region answers independently; merging the
 * per-region answers is left to the caller.
 */
public class CountByGroupEndpoint extends GroupProtos.GroupService
        implements Coprocessor, CoprocessorService {

    private static final Log LOG = LogFactory.getLog(CountByGroupEndpoint.class);

    /** Explicit charset so results do not depend on the region server's platform default. */
    private static final java.nio.charset.Charset UTF8 =
            java.nio.charset.Charset.forName("UTF-8");

    /** Number of positional parameters the request must carry. */
    private static final int REQUIRED_PARAMS = 3;

    private RegionCoprocessorEnvironment env;

    /**
     * Scans the whole region and returns the per-group sums as a JSON string.
     *
     * <p>Rows without the group column are collected under the empty-string
     * group; rows without the summed column (or with a blank value) contribute 0.
     * Any failure is reported through {@code controller} and {@code done} is then
     * invoked with {@code null}.
     *
     * @param controller receives the exception if the scan or parsing fails
     * @param request    must carry at least three params (see class comment)
     * @param done       callback invoked exactly once with the response, or
     *                   {@code null} on failure
     */
    @Override
    public void getCountByGroup(RpcController controller, GroupRequest request,
            RpcCallback<GroupResponse> done) {
        GroupProtos.GroupResponse response = null;
        InternalScanner scanner = null;
        try {
            // Fail with a reportable error instead of an IndexOutOfBoundsException.
            if (request.getParamsCount() < REQUIRED_PARAMS) {
                throw new IOException("Expected " + REQUIRED_PARAMS
                        + " params (sum qualifier, group qualifier, family) but got "
                        + request.getParamsCount());
            }
            byte[] sumQualifier = request.getParams(0).getBytes(UTF8);
            byte[] groupQualifier = request.getParams(1).getBytes(UTF8);
            byte[] family = request.getParams(2).getBytes(UTF8);

            scanner = env.getRegion().getScanner(new Scan());

            Map<String, Long> sums = new HashMap<String, Long>();
            List<Cell> cells = new ArrayList<Cell>();
            boolean hasMore;
            do {
                hasMore = scanner.next(cells);
                // A call that produced no cells (presumably only when the region has
                // nothing to return) must not create a phantom "" group with sum 0.
                if (!cells.isEmpty()) {
                    accumulate(sums, Result.create(cells), family, groupQualifier, sumQualifier);
                }
                cells.clear();
            } while (hasMore);

            // Build a map-shaped JSON string; the client parses it back into a map.
            String json = toJson(sums);
            LOG.info("===str===============" + json);
            response = GroupProtos.GroupResponse.newBuilder().setKeyValue(json).build();
        } catch (IOException ioe) {
            ResponseConverter.setControllerException(controller, ioe);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException closeFailure) {
                    // The result is already computed; a failed close is worth noting
                    // but should not turn a successful call into a failure.
                    LOG.warn("Failed to close region scanner", closeFailure);
                }
            }
        }
        done.run(response);
    }

    /** Adds one row's value to the running sum of the group that row belongs to. */
    private static void accumulate(Map<String, Long> sums, Result row, byte[] family,
            byte[] groupQualifier, byte[] sumQualifier) throws IOException {
        byte[] groupBytes = row.getValue(family, groupQualifier);
        String group = (groupBytes == null) ? "" : new String(groupBytes, UTF8);
        long amount = parseAmount(row.getValue(family, sumQualifier));
        Long previous = sums.get(group);
        sums.put(group, previous == null ? amount : previous + amount);
    }

    /**
     * Parses a cell value stored as decimal text into a long; a missing or blank
     * value counts as 0. Parsing as long (not int) keeps large values intact, and
     * a malformed value becomes an IOException so the RPC layer can report it.
     */
    private static long parseAmount(byte[] raw) throws IOException {
        if (raw == null) {
            return 0L;
        }
        String text = new String(raw, UTF8).trim();
        if (text.isEmpty()) {
            return 0L;
        }
        try {
            return Long.parseLong(text);
        } catch (NumberFormatException nfe) {
            throw new IOException("Cannot parse value to sum as a long: '" + text + "'", nfe);
        }
    }

    /**
     * Serialises the sums as a JSON object whose values are quoted decimal
     * strings; an empty map yields {@code {}}.
     */
    private static String toJson(Map<String, Long> sums) {
        StringBuilder json = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Long> entry : sums.entrySet()) {
            if (!first) {
                json.append(',');
            }
            first = false;
            json.append('"');
            appendEscaped(json, entry.getKey());
            json.append("\":\"").append(entry.getValue()).append('"');
        }
        return json.append('}').toString();
    }

    /** Appends text with quotes, backslashes and control characters JSON-escaped. */
    private static void appendEscaped(StringBuilder out, String text) {
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (c == '"' || c == '\\') {
                out.append('\\').append(c);
            } else if (c < 0x20) {
                out.append(String.format("\\u%04x", (int) c));
            } else {
                out.append(c);
            }
        }
    }

    /** Exposes this instance as the protobuf service to register for the region. */
    @Override
    public Service getService() {
        return this;
    }

    /**
     * Remembers the region environment used later for scanning.
     *
     * @throws CoprocessorException if the coprocessor is not loaded on a table region
     */
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    /** Nothing to release: the scanner is closed within each call. */
    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
    }

}

4,编写client文件

/**
 * Command-line client that invokes the group-sum endpoint on every region of
 * the {@code xun_traffic} table, merges the per-region answers and prints the
 * combined sums together with the elapsed time.
 */
public class GroupClient {

    /**
     * Calls the endpoint on all regions, merges the results and prints them.
     *
     * @param args unused
     * @throws ServiceException if the coprocessor call fails at the RPC layer
     * @throws Throwable        anything else raised by the coprocessor call
     */
    public static void main(String[] args) throws ServiceException, Throwable {
        Winutils.createWinutils();

        System.out.println("begin.....");
        long begin_time = System.currentTimeMillis();

        // Connection settings come from whatever HBaseConfiguration.create() loads;
        // set e.g. "hbase.zookeeper.quorum" on the config here to override them.
        Configuration config = HBaseConfiguration.create();
        String table_name = "xun_traffic";

        Map<byte[], String> results;
        HTable table = new HTable(config, table_name);
        try {
            // null start/end keys: run the call against every region of the table.
            results = table.coprocessorService(GroupService.class, null, null,
                    new Batch.Call<GroupProtos.GroupService, String>() {
                        public String call(GroupProtos.GroupService counter) throws IOException {
                            ServerRpcController controller = new ServerRpcController();
                            BlockingRpcCallback<GroupProtos.GroupResponse> rpcCallback =
                                    new BlockingRpcCallback<GroupProtos.GroupResponse>();

                            // Positional params: sum qualifier, group qualifier, family.
                            GroupProtos.GroupRequest request = GroupProtos.GroupRequest.newBuilder()
                                    .addParams("id")
                                    .addParams("date")
                                    .addParams("f")
                                    .build();

                            counter.getCountByGroup(controller, request, rpcCallback);
                            GroupProtos.GroupResponse response = rpcCallback.get();
                            if (controller.failedOnException()) {
                                throw controller.getFailedOn();
                            }
                            return (response != null && response.hasKeyValue())
                                    ? response.getKeyValue() : "";
                        }
                    });
        } finally {
            // Release the table even when the coprocessor call throws.
            table.close();
        }

        if (results.size() > 0) {
            System.out.println("results size===" + results.size());

            Map<String, Long> combin = new HashMap<String, Long>();
            ObjectMapper objectMapper = new ObjectMapper();
            for (String regionJson : results.values()) {
                // A region that produced no response maps to ""; there is nothing to parse.
                if (regionJson == null || regionJson.length() == 0) {
                    continue;
                }
                // Jackson returns the sums as Strings because the endpoint quotes them;
                // putAll converts every value to a real Long while merging.
                @SuppressWarnings("unchecked")
                Map<String, Long> regionSums = objectMapper.readValue(regionJson, Map.class);
                putAll(combin, regionSums);
            }

            // Print the merged group sums.
            System.out.println("map=====" + combin);
        } else {
            System.out.println("没有任何返回结果");
        }

        long end_time = System.currentTimeMillis();
        System.out.println("end:" + (end_time - begin_time));
    }

    /**
     * Merges one region's sums into the combined map, adding values of keys
     * that are already present. Each region only returns its own data, so all
     * regions have to be merged to get the table-wide result.
     *
     * <p>Values are converted through their string form, so maps produced by a
     * raw JSON parse (whose values may really be Strings or Integers rather than
     * Longs) are accepted for both arguments.
     *
     * @param combin the combined map, updated in place
     * @param map2   the sums to merge into {@code combin}
     * @throws NumberFormatException if a value's string form is not a decimal long
     */
    public static void putAll(Map<String, Long> combin, Map<String, Long> map2) {
        // Wildcard views let the values be read as plain Objects, avoiding the
        // implicit cast to Long that would fail on String values.
        Map<String, ?> source = map2;
        Map<String, ?> current = combin;
        for (Map.Entry<String, ?> entry : source.entrySet()) {
            String key = entry.getKey();
            long total = Long.parseLong(String.valueOf(entry.getValue()));
            Object previous = current.get(key);
            if (previous != null) {
                total += Long.parseLong(String.valueOf(previous));
            }
            combin.put(key, total);
        }
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: