一个计算HBase表中行数的MapReduce程序
2015-10-06 07:41
337 查看
public class RowCounter{
static final String NAME="rowcounter";
static class RowCounterMapper extends TableMapper<ImmutableBytesWritable ,Result>{
public static enum Counters {ROWS}
@overrideion
public void map(ImmutableBytesWritable row, Result values, Context contet ){
for(KeyVlaue value : values.list()){
if(value.getValue().length >0){
context.getCounter(Counters.ROWS).increments(1);
break
}}}}
public static Job createSubmitatableJob(Configuration conf, String [] args)
String tableName = args[0];
Job job= new Job(conf, NAME +"_"+tableName);
job.setJarByClASS(RowConter.class)
StringBuilder sb= new StringBuilder();
final int columnoffset= 1;
for(int i=columnoffset; i<args.length;i++){
if(i > columnoffset){
sb.append(" ");
}
sb.append(args[i)
}
Scan scan = new Scan();
scan.setFilter (new FirstKeyOnlyFilter());
if(sb.length>0){
for (String columnName : sb.toString().split(" "){
String [] fields = columnName.split(" :");
if(filds.length ==1 ){
scan.addFamily(Bytes.toBytes(fields[0])
else{
scan.addColmn(Bytes.toBytes(fields[0],Bytes.toBytes(fields[1]);}}
//Second argument is the table name
job.setOutputFormatClass(NullOutputFormat.class);
TableMapRecueUtils.intiTableMapperJob(tableName, scan, RowCounterMapper.class,ImmutableBytesWritable.calss,Result.class,job);
job..setNmReduceTasks(0);
return job;
}
public static void main(String[] args) {
Configuration conf = HBaseConfiguration.create();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainningArgs();
if(otherArgs.length<1){
System.err.println("Usage:RowConter talbenam[e [<clolmn1 >< column2>..]
System.exit(-1);
}
job job= createSubmittableJob(conf, otherArgs);
System.exit(job.waitForCompletion(true) :0?1);
}
Page 509, "Hadoop: The Definitive Guide" (hadoop权威指南)
static final String NAME="rowcounter";
static class RowCounterMapper extends TableMapper<ImmutableBytesWritable ,Result>{
public static enum Counters {ROWS}
@overrideion
public void map(ImmutableBytesWritable row, Result values, Context contet ){
for(KeyVlaue value : values.list()){
if(value.getValue().length >0){
context.getCounter(Counters.ROWS).increments(1);
break
}}}}
public static Job createSubmitatableJob(Configuration conf, String [] args)
String tableName = args[0];
Job job= new Job(conf, NAME +"_"+tableName);
job.setJarByClASS(RowConter.class)
StringBuilder sb= new StringBuilder();
final int columnoffset= 1;
for(int i=columnoffset; i<args.length;i++){
if(i > columnoffset){
sb.append(" ");
}
sb.append(args[i)
}
Scan scan = new Scan();
scan.setFilter (new FirstKeyOnlyFilter());
if(sb.length>0){
for (String columnName : sb.toString().split(" "){
String [] fields = columnName.split(" :");
if(filds.length ==1 ){
scan.addFamily(Bytes.toBytes(fields[0])
else{
scan.addColmn(Bytes.toBytes(fields[0],Bytes.toBytes(fields[1]);}}
//Second argument is the table name
job.setOutputFormatClass(NullOutputFormat.class);
TableMapRecueUtils.intiTableMapperJob(tableName, scan, RowCounterMapper.class,ImmutableBytesWritable.calss,Result.class,job);
job..setNmReduceTasks(0);
return job;
}
public static void main(String[] args) {
Configuration conf = HBaseConfiguration.create();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainningArgs();
if(otherArgs.length<1){
System.err.println("Usage:RowConter talbenam[e [<clolmn1 >< column2>..]
System.exit(-1);
}
job job= createSubmittableJob(conf, otherArgs);
System.exit(job.waitForCompletion(true) :0?1);
}
Page 509, "Hadoop: The Definitive Guide" (hadoop权威指南)
相关文章推荐
- 黑马程序员——基础知识——数组
- OSChina 周二乱弹 —— 早起傻一天
- notepad++使用技巧总结
- 数据结构实践——队列数组
- Java程序,求学员的平均成绩
- Java程序员常犯的10个错误
- Virtualbox 的 Kern Service 出问题了,怎么办?
- meinJava
- 微软发布Lumia系列手机是绝地反击之战
- *LeetCode-Contains Duplicate III
- 修改
- CUDA学习:进一步理解块、线程
- unity简易小地图的实现(NGUI)
- Bubble Sort
- Selection Sort
- 如何解决HTC Desire 826无法使用谷歌服务的问题
- 浅谈vector存储内容与空间利用
- App 推荐:Spotify
- Binary Search
- 去掉UITableView的Cell选中状态