您的位置:首页 > 其它

一个计算HBase表中行数的MapReduce程序

2015-10-06 07:41 337 查看
public class RowCounter{

static final String NAME="rowcounter";

static class RowCounterMapper extends TableMapper<ImmutableBytesWritable ,Result>{

public static enum Counters {ROWS}

@overrideion

public void map(ImmutableBytesWritable row, Result values, Context contet ){

for(KeyVlaue value : values.list()){

if(value.getValue().length >0){

context.getCounter(Counters.ROWS).increments(1);

break

}}}}

public static Job createSubmitatableJob(Configuration conf, String [] args)

String tableName = args[0];

Job job= new Job(conf, NAME +"_"+tableName);

job.setJarByClASS(RowConter.class)

StringBuilder sb= new StringBuilder();

final int columnoffset= 1;

for(int i=columnoffset; i<args.length;i++){

if(i > columnoffset){

sb.append(" ");

}

sb.append(args[i)

}

Scan scan = new Scan();

scan.setFilter (new FirstKeyOnlyFilter());

if(sb.length>0){

for (String columnName : sb.toString().split(" "){

String [] fields = columnName.split(" :");

if(filds.length ==1 ){

scan.addFamily(Bytes.toBytes(fields[0])

else{

scan.addColmn(Bytes.toBytes(fields[0],Bytes.toBytes(fields[1]);}}

//Second argument is the table name

job.setOutputFormatClass(NullOutputFormat.class);

TableMapRecueUtils.intiTableMapperJob(tableName, scan, RowCounterMapper.class,ImmutableBytesWritable.calss,Result.class,job);

job..setNmReduceTasks(0);

return job;

}

public static void main(String[] args) {

Configuration conf = HBaseConfiguration.create();

String[] otherArgs = new GenericOptionsParser(conf, args).getRemainningArgs();

if(otherArgs.length<1){

System.err.println("Usage:RowConter talbenam[e [<clolmn1 >< column2>..]

System.exit(-1);



job job= createSubmittableJob(conf, otherArgs);

System.exit(job.waitForCompletion(true) :0?1);

}

出处:《Hadoop权威指南》第 509 页
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: