hbase0.98 endpoint实现group分组求和代码
2014-09-01 16:24
381 查看
1,制作test.proto文件:
// Protobuf schema for the group-by-sum coprocessor endpoint.
// The generated Java code lands in com.coprocessor.group.generated.GroupProtos.
option java_package = "com.coprocessor.group.generated";
option java_outer_classname = "GroupProtos";
// Generic services must be enabled so that protoc emits the GroupService
// class that the endpoint implementation extends.
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
// Request parameters are positional, as read by the endpoint:
//   params[0] = qualifier of the column whose values are summed
//   params[1] = qualifier of the column whose value is the group key
//   params[2] = column family containing both columns
message GroupRequest {
repeated string params = 1;
}
// key_value carries a JSON object string mapping each group value to its sum
// for one region; the client parses it and merges the per-region maps.
message GroupResponse {
required string key_value = 1;
}
// RPC service exposed by the endpoint.
service GroupService {
rpc getCountByGroup(GroupRequest)
returns (GroupResponse);
}
2,利用protoc生成GroupProtos.java文件(该文件由工具自动生成,就不粘贴了):
执行命令 protoc --java_out=. test.proto
3,编写CountByGroupEndpoint.java
/**
 * Region-level coprocessor endpoint that groups the rows of one region by the
 * value of one column and sums the numeric (decimal string) value of another.
 *
 * <p>The request carries three positional parameters: {@code params[0]} is the
 * qualifier to sum, {@code params[1]} is the qualifier to group by and
 * {@code params[2]} is the column family of both. The response is a JSON object
 * string of the form {@code {"group":"sum",...}} (sums are emitted as quoted
 * strings); the client is expected to merge the objects of all regions.
 *
 * <p>Rows that lack the group-by column are aggregated under the empty-string
 * key. Rows that contain neither requested column are not returned by the scan
 * and therefore do not contribute.
 */
public class CountByGroupEndpoint extends GroupProtos.GroupService implements Coprocessor, CoprocessorService{
    private static final Log log = LogFactory.getLog(CountByGroupEndpoint.class);

    /**
     * Explicit charset for all String/byte[] conversions so results do not
     * depend on the region server's platform default. UTF-8 is assumed to match
     * how the table was written (it is what HBase's Bytes.toBytes(String) uses).
     */
    private static final java.nio.charset.Charset UTF8 = java.nio.charset.Charset.forName("UTF-8");

    private RegionCoprocessorEnvironment env;

    /**
     * Scans this region, aggregates the sums per group and hands the JSON
     * result to {@code done}.
     *
     * <p>On any I/O failure (including malformed requests and non-numeric cell
     * values) the exception is recorded on {@code controller} and {@code done}
     * is invoked with {@code null}.
     *
     * @param controller RPC controller that receives a failure, if any
     * @param request    positional parameters: sum qualifier, group qualifier, family
     * @param done       callback invoked exactly once with the response or {@code null}
     */
    @Override
    public void getCountByGroup(RpcController controller, GroupRequest request,
            RpcCallback<GroupResponse> done) {
        GroupProtos.GroupResponse response = null;
        InternalScanner scanner = null;
        try {
            if (request.getParamsCount() < 3) {
                throw new IOException(
                        "Expected 3 params (sum qualifier, group qualifier, family) but got "
                                + request.getParamsCount());
            }
            byte[] sumQualifier = request.getParams(0).getBytes(UTF8);
            byte[] groupQualifier = request.getParams(1).getBytes(UTF8);
            byte[] family = request.getParams(2).getBytes(UTF8);

            // Only the two columns involved are needed; do not read whole rows.
            Scan scan = new Scan();
            scan.addColumn(family, sumQualifier);
            scan.addColumn(family, groupQualifier);

            scanner = env.getRegion().getScanner(scan);
            List<Cell> cells = new ArrayList<Cell>();
            Map<String, Long> sums = new HashMap<String, Long>();
            boolean hasMore;
            do {
                hasMore = scanner.next(cells);
                // The final call may return no cells at all; that is not a row
                // and must not create a phantom "" group.
                if (!cells.isEmpty()) {
                    Result row = Result.create(cells);
                    byte[] sumBytes = row.getValue(family, sumQualifier);
                    String sumValue = sumBytes == null ? "" : new String(sumBytes, UTF8);
                    sumValue(family, groupQualifier, sums, row, sumValue);
                }
                cells.clear();
            } while (hasMore);

            // Build a JSON object string; the client parses it back into a map.
            String json = toJson(sums);
            if (log.isDebugEnabled()) {
                log.debug("===str===============" + json);
            }
            response = GroupProtos.GroupResponse.newBuilder().setKeyValue(json).build();
        } catch (IOException ioe) {
            ResponseConverter.setControllerException(controller, ioe);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException closeFailure) {
                    // The aggregation result is already computed; a failed close
                    // must not turn a successful call into a failure.
                    log.warn("Failed to close region scanner", closeFailure);
                }
            }
        }
        done.run(response);
    }

    /**
     * Adds one row's value to the running sum of its group.
     *
     * @param family           column family of the group-by column
     * @param groupByQualifier qualifier of the group-by column
     * @param map              running sums keyed by group value (mutated)
     * @param r                the current row
     * @param sumValue         textual value to add; {@code null} or blank counts as 0
     * @throws IOException if {@code sumValue} is not a valid decimal long
     */
    private void sumValue(byte[] family, byte[] groupByQualifier,
            Map<String, Long> map, Result r, String sumValue) throws IOException {
        byte[] groupBytes = r.getValue(family, groupByQualifier);
        String groupValue = groupBytes == null ? "" : new String(groupBytes, UTF8);

        long value = 0;
        String trimmed = sumValue == null ? "" : sumValue.trim();
        if (trimmed.length() > 0) {
            try {
                // Parse as long: the accumulator is a long and values may exceed int range.
                value = Long.parseLong(trimmed);
            } catch (NumberFormatException nfe) {
                // Surface as IOException so it is reported through the RPC
                // controller instead of escaping as an unchecked exception.
                throw new IOException("Non-numeric value '" + sumValue
                        + "' for group '" + groupValue + "'", nfe);
            }
        }

        Long previous = map.get(groupValue);
        map.put(groupValue, previous == null ? value : previous + value);
    }

    /** Serializes the sums as {@code {"key":"value",...}}; an empty map yields {@code {}}. */
    private static String toJson(Map<String, Long> sums) {
        StringBuilder json = new StringBuilder("{");
        boolean first = true;
        for (Entry<String, Long> entry : sums.entrySet()) {
            if (!first) {
                json.append(',');
            }
            first = false;
            json.append('"');
            appendJsonEscaped(json, entry.getKey());
            json.append("\":\"").append(entry.getValue()).append('"');
        }
        return json.append('}').toString();
    }

    /** Appends {@code text} with quotes, backslashes and control characters JSON-escaped. */
    private static void appendJsonEscaped(StringBuilder out, String text) {
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (c == '"' || c == '\\') {
                out.append('\\').append(c);
            } else if (c < 0x20) {
                out.append(String.format("\\u%04x", (int) c));
            } else {
                out.append(c);
            }
        }
    }

    /** Exposes this instance as the protobuf service implementation. */
    @Override
    public Service getService() {
        return this;
    }

    /**
     * Captures the region environment.
     *
     * @throws CoprocessorException if the coprocessor is not loaded on a table region
     */
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment)env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    /** Nothing to release. */
    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
    }
}
4,编写client文件
public class GroupClient {
public static void main(String[] args) throws ServiceException, Throwable {
Winutils.createWinutils();
// TODO Auto-generated method stub
System.out.println("begin.....");
long begin_time = System.currentTimeMillis();
Configuration config = HBaseConfiguration.create();
// String master_ip="192.168.150.128";
// String master_ip = args[0];
// String zk_ip = args[1];
String table_name = "xun_traffic";
// config.set("hbase.zookeeper.property.clientPort", "2181");
// config.set("hbase.zookeeper.quorum", zk_ip);
// config.set("hbase.master", master_ip + ":600000");
HTable table = new HTable(config, table_name);
Map<byte[], String> results = table.coprocessorService(GroupService.class, null, null,
new Batch.Call<GroupProtos.GroupService, String>() {
public String call(GroupProtos.GroupService counter)
throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<GroupProtos.GroupResponse> rpcCallback = new BlockingRpcCallback<GroupProtos.GroupResponse>();
GroupProtos.GroupRequest request = GroupProtos.GroupRequest.getDefaultInstance();
Builder newBuilder = request.newBuilder();
List list = new ArrayList();
list.add("id");
list.add("date");
list.add("f");
newBuilder.addAllParams(list);
GroupRequest build = newBuilder.build();
byte[] info = build.toByteArray();
GroupRequest parseFrom = request.parseFrom(info);
counter.getCountByGroup(controller, parseFrom,rpcCallback);
GroupProtos.GroupResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
return (response != null && response.hasKeyValue()) ? response.getKeyValue() : "";
}
});
table.close();
if (results.size() > 0) {
// System.out.println("results==="+results.values());
System.out.println("results size==="+results.size());
Map<String, Long> combin = new HashMap<String, Long>();
int i = 0;
for(Entry<byte[], String> entry : results.entrySet()){
// System.out.println("====="+new String(entry.getKey())+":"+entry.getValue());
ObjectMapper objectMapper = new ObjectMapper();
Map<String, Long> readValue = objectMapper.readValue(entry.getValue(),Map.class);
if (i == 0) {
combin.putAll(readValue);
} else {
putAll(combin, readValue);
}
i++;
}
//打印出分组求和的结果
System.out.println("map====="+combin);
} else {
System.out.println("没有任何返回结果");
}
long end_time = System.currentTimeMillis();
System.out.println("end:" + (end_time - begin_time));
}
//因为返回是每个region上的数据,所以要将所有的region合并
public static void putAll(Map<String, Long> combin, Map<String, Long> map2) {
Set<Map.Entry<String, Long>> set2 = map2.entrySet();
for (Iterator<Map.Entry<String, Long>> it2 = set2.iterator(); it2
.hasNext();) {
Map.Entry<String, Long> entry2 = it2.next();
String key = entry2.getKey();
// Long value = entry2.getValue();
Long value = Long.parseLong(entry2.getValue()+"");
Set<Map.Entry<String, Long>> set = combin.entrySet();
boolean flag = false;
for (Iterator<Map.Entry<String, Long>> it = set.iterator(); it
.hasNext();) {
Map.Entry<String, Long> entry = it.next();
String key2 = entry.getKey();
Long value2 = Long.parseLong(entry.getValue()+"");
if (key.equals(key2)) {
value = value + value2;
combin.put(key, value);
flag = true;
}
}
if (!flag) {
combin.put(key, value);
}
}
}
}
// Protobuf schema for the group-by-sum coprocessor endpoint.
// The generated Java code lands in com.coprocessor.group.generated.GroupProtos.
option java_package = "com.coprocessor.group.generated";
option java_outer_classname = "GroupProtos";
// Generic services must be enabled so that protoc emits the GroupService
// class that the endpoint implementation extends.
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
// Request parameters are positional, as read by the endpoint:
//   params[0] = qualifier of the column whose values are summed
//   params[1] = qualifier of the column whose value is the group key
//   params[2] = column family containing both columns
message GroupRequest {
repeated string params = 1;
}
// key_value carries a JSON object string mapping each group value to its sum
// for one region; the client parses it and merges the per-region maps.
message GroupResponse {
required string key_value = 1;
}
// RPC service exposed by the endpoint.
service GroupService {
rpc getCountByGroup(GroupRequest)
returns (GroupResponse);
}
2,利用protoc生成GroupProtos.java文件(该文件由工具自动生成,就不粘贴了):
执行命令 protoc --java_out=. test.proto
3,编写CountByGroupEndpoint.java
/**
 * Region-level coprocessor endpoint that groups the rows of one region by the
 * value of one column and sums the numeric (decimal string) value of another.
 *
 * <p>The request carries three positional parameters: {@code params[0]} is the
 * qualifier to sum, {@code params[1]} is the qualifier to group by and
 * {@code params[2]} is the column family of both. The response is a JSON object
 * string of the form {@code {"group":"sum",...}} (sums are emitted as quoted
 * strings); the client is expected to merge the objects of all regions.
 *
 * <p>Rows that lack the group-by column are aggregated under the empty-string
 * key. Rows that contain neither requested column are not returned by the scan
 * and therefore do not contribute.
 */
public class CountByGroupEndpoint extends GroupProtos.GroupService implements Coprocessor, CoprocessorService{
    private static final Log log = LogFactory.getLog(CountByGroupEndpoint.class);

    /**
     * Explicit charset for all String/byte[] conversions so results do not
     * depend on the region server's platform default. UTF-8 is assumed to match
     * how the table was written (it is what HBase's Bytes.toBytes(String) uses).
     */
    private static final java.nio.charset.Charset UTF8 = java.nio.charset.Charset.forName("UTF-8");

    private RegionCoprocessorEnvironment env;

    /**
     * Scans this region, aggregates the sums per group and hands the JSON
     * result to {@code done}.
     *
     * <p>On any I/O failure (including malformed requests and non-numeric cell
     * values) the exception is recorded on {@code controller} and {@code done}
     * is invoked with {@code null}.
     *
     * @param controller RPC controller that receives a failure, if any
     * @param request    positional parameters: sum qualifier, group qualifier, family
     * @param done       callback invoked exactly once with the response or {@code null}
     */
    @Override
    public void getCountByGroup(RpcController controller, GroupRequest request,
            RpcCallback<GroupResponse> done) {
        GroupProtos.GroupResponse response = null;
        InternalScanner scanner = null;
        try {
            if (request.getParamsCount() < 3) {
                throw new IOException(
                        "Expected 3 params (sum qualifier, group qualifier, family) but got "
                                + request.getParamsCount());
            }
            byte[] sumQualifier = request.getParams(0).getBytes(UTF8);
            byte[] groupQualifier = request.getParams(1).getBytes(UTF8);
            byte[] family = request.getParams(2).getBytes(UTF8);

            // Only the two columns involved are needed; do not read whole rows.
            Scan scan = new Scan();
            scan.addColumn(family, sumQualifier);
            scan.addColumn(family, groupQualifier);

            scanner = env.getRegion().getScanner(scan);
            List<Cell> cells = new ArrayList<Cell>();
            Map<String, Long> sums = new HashMap<String, Long>();
            boolean hasMore;
            do {
                hasMore = scanner.next(cells);
                // The final call may return no cells at all; that is not a row
                // and must not create a phantom "" group.
                if (!cells.isEmpty()) {
                    Result row = Result.create(cells);
                    byte[] sumBytes = row.getValue(family, sumQualifier);
                    String sumValue = sumBytes == null ? "" : new String(sumBytes, UTF8);
                    sumValue(family, groupQualifier, sums, row, sumValue);
                }
                cells.clear();
            } while (hasMore);

            // Build a JSON object string; the client parses it back into a map.
            String json = toJson(sums);
            if (log.isDebugEnabled()) {
                log.debug("===str===============" + json);
            }
            response = GroupProtos.GroupResponse.newBuilder().setKeyValue(json).build();
        } catch (IOException ioe) {
            ResponseConverter.setControllerException(controller, ioe);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException closeFailure) {
                    // The aggregation result is already computed; a failed close
                    // must not turn a successful call into a failure.
                    log.warn("Failed to close region scanner", closeFailure);
                }
            }
        }
        done.run(response);
    }

    /**
     * Adds one row's value to the running sum of its group.
     *
     * @param family           column family of the group-by column
     * @param groupByQualifier qualifier of the group-by column
     * @param map              running sums keyed by group value (mutated)
     * @param r                the current row
     * @param sumValue         textual value to add; {@code null} or blank counts as 0
     * @throws IOException if {@code sumValue} is not a valid decimal long
     */
    private void sumValue(byte[] family, byte[] groupByQualifier,
            Map<String, Long> map, Result r, String sumValue) throws IOException {
        byte[] groupBytes = r.getValue(family, groupByQualifier);
        String groupValue = groupBytes == null ? "" : new String(groupBytes, UTF8);

        long value = 0;
        String trimmed = sumValue == null ? "" : sumValue.trim();
        if (trimmed.length() > 0) {
            try {
                // Parse as long: the accumulator is a long and values may exceed int range.
                value = Long.parseLong(trimmed);
            } catch (NumberFormatException nfe) {
                // Surface as IOException so it is reported through the RPC
                // controller instead of escaping as an unchecked exception.
                throw new IOException("Non-numeric value '" + sumValue
                        + "' for group '" + groupValue + "'", nfe);
            }
        }

        Long previous = map.get(groupValue);
        map.put(groupValue, previous == null ? value : previous + value);
    }

    /** Serializes the sums as {@code {"key":"value",...}}; an empty map yields {@code {}}. */
    private static String toJson(Map<String, Long> sums) {
        StringBuilder json = new StringBuilder("{");
        boolean first = true;
        for (Entry<String, Long> entry : sums.entrySet()) {
            if (!first) {
                json.append(',');
            }
            first = false;
            json.append('"');
            appendJsonEscaped(json, entry.getKey());
            json.append("\":\"").append(entry.getValue()).append('"');
        }
        return json.append('}').toString();
    }

    /** Appends {@code text} with quotes, backslashes and control characters JSON-escaped. */
    private static void appendJsonEscaped(StringBuilder out, String text) {
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (c == '"' || c == '\\') {
                out.append('\\').append(c);
            } else if (c < 0x20) {
                out.append(String.format("\\u%04x", (int) c));
            } else {
                out.append(c);
            }
        }
    }

    /** Exposes this instance as the protobuf service implementation. */
    @Override
    public Service getService() {
        return this;
    }

    /**
     * Captures the region environment.
     *
     * @throws CoprocessorException if the coprocessor is not loaded on a table region
     */
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment)env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    /** Nothing to release. */
    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
    }
}
4,编写client文件
public class GroupClient {
public static void main(String[] args) throws ServiceException, Throwable {
Winutils.createWinutils();
// TODO Auto-generated method stub
System.out.println("begin.....");
long begin_time = System.currentTimeMillis();
Configuration config = HBaseConfiguration.create();
// String master_ip="192.168.150.128";
// String master_ip = args[0];
// String zk_ip = args[1];
String table_name = "xun_traffic";
// config.set("hbase.zookeeper.property.clientPort", "2181");
// config.set("hbase.zookeeper.quorum", zk_ip);
// config.set("hbase.master", master_ip + ":600000");
HTable table = new HTable(config, table_name);
Map<byte[], String> results = table.coprocessorService(GroupService.class, null, null,
new Batch.Call<GroupProtos.GroupService, String>() {
public String call(GroupProtos.GroupService counter)
throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<GroupProtos.GroupResponse> rpcCallback = new BlockingRpcCallback<GroupProtos.GroupResponse>();
GroupProtos.GroupRequest request = GroupProtos.GroupRequest.getDefaultInstance();
Builder newBuilder = request.newBuilder();
List list = new ArrayList();
list.add("id");
list.add("date");
list.add("f");
newBuilder.addAllParams(list);
GroupRequest build = newBuilder.build();
byte[] info = build.toByteArray();
GroupRequest parseFrom = request.parseFrom(info);
counter.getCountByGroup(controller, parseFrom,rpcCallback);
GroupProtos.GroupResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
return (response != null && response.hasKeyValue()) ? response.getKeyValue() : "";
}
});
table.close();
if (results.size() > 0) {
// System.out.println("results==="+results.values());
System.out.println("results size==="+results.size());
Map<String, Long> combin = new HashMap<String, Long>();
int i = 0;
for(Entry<byte[], String> entry : results.entrySet()){
// System.out.println("====="+new String(entry.getKey())+":"+entry.getValue());
ObjectMapper objectMapper = new ObjectMapper();
Map<String, Long> readValue = objectMapper.readValue(entry.getValue(),Map.class);
if (i == 0) {
combin.putAll(readValue);
} else {
putAll(combin, readValue);
}
i++;
}
//打印出分组求和的结果
System.out.println("map====="+combin);
} else {
System.out.println("没有任何返回结果");
}
long end_time = System.currentTimeMillis();
System.out.println("end:" + (end_time - begin_time));
}
//因为返回是每个region上的数据,所以要将所有的region合并
public static void putAll(Map<String, Long> combin, Map<String, Long> map2) {
Set<Map.Entry<String, Long>> set2 = map2.entrySet();
for (Iterator<Map.Entry<String, Long>> it2 = set2.iterator(); it2
.hasNext();) {
Map.Entry<String, Long> entry2 = it2.next();
String key = entry2.getKey();
// Long value = entry2.getValue();
Long value = Long.parseLong(entry2.getValue()+"");
Set<Map.Entry<String, Long>> set = combin.entrySet();
boolean flag = false;
for (Iterator<Map.Entry<String, Long>> it = set.iterator(); it
.hasNext();) {
Map.Entry<String, Long> entry = it.next();
String key2 = entry.getKey();
Long value2 = Long.parseLong(entry.getValue()+"");
if (key.equals(key2)) {
value = value + value2;
combin.put(key, value);
flag = true;
}
}
if (!flag) {
combin.put(key, value);
}
}
}
}
相关文章推荐
- hbase0.98 endpoint实现group分组求和代码
- HBase 0.98增删改查java代码实现
- 我如何让echarts实现了分组(原创插件echarts.group代码分享)
- C语言实现的统计素数并求和代码分享
- 集算器实现几种常见内存分组的代码示例
- php number_format() 函数通过千位分组来格式化数字的实现代码
- php number_format() 函数通过千位分组来格式化数字的实现代码
- linq对datatable group分组并求和
- SQL Server中Group分组获取Top N方法实现
- 用纯代码实现RadioGroup,并且实现滚动条
- 微软BI 之SSRS 系列 - 使用分组 Group 属性实现基于父子递归关系的汇总报表
- Java 代码实现分组
- asp中实现随机分组程序的代码
- MongoVUE下实现MongoDB的Group分组查询
- [MSSQL]SQL中Group分组获取Top N方法实现可首选row_number
- 解决问题之道--使用Java实现数据的分组求和
- YbSoftwareFactory 代码生成插件【二】:二次开发之 IPlugInGroupRepository 接口的实现
- 用纯代码实现RadioGroup,并且实现滚动条
- MongoVUE下实现MongoDB的Group分组查询
- MongoDB学习笔记~管道中的分组实现group+distinct