
Building an HBase Index Table: Differences Between Running from Eclipse and Running Directly on the Hadoop Cluster

2015-03-15 15:00

Purpose: to contrast a program written to be submitted from Eclipse with one written to run directly on the Hadoop cluster, in order to better understand how Hadoop works. The difference boils down to this: when submitting from Eclipse, the code itself must carry the MapReduce and HBase configuration, and must set both the input-table information and the output table via TableOutputFormat.
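To make that difference concrete before the full listing, here is a minimal sketch of just the Eclipse-only settings, using the cluster addresses that appear in the program below (hbase.master, fs.default.name, mapred.job.tracker and the jar name must of course match your own environment):

Configuration conf = HBaseConfiguration.create();
// needed in both modes: where to find ZooKeeper
conf.set("hbase.zookeeper.quorum", "172.16.2.34,172.16.2.54,172.16.2.57");
// [Eclipse-only] point the client at the remote cluster services
conf.set("hbase.master", "172.16.2.42:60000");
conf.set("fs.default.name", "hdfs://172.16.2.42:9000/");
conf.set("mapred.job.tracker", "172.16.2.42:9001");
// [Eclipse-only] ship the job classes yourself, since there is no `hadoop jar` wrapper
conf.set("mapred.jar", "IndexBuilder.jar");
// [Eclipse-only] fix the output table for TableOutputFormat
conf.set(TableOutputFormat.OUTPUT_TABLE, "heroes-name-email");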



The index-building program follows. The unmarked code is the program as written for running directly on Hadoop; the lines flagged with [Eclipse-only] comments (a colored background in the original post) are the extra configuration added for running from Eclipse:



import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.GenericOptionsParser;

public class IndexBuilder {
    /** the column family containing the indexed row key */
    public static final byte[] INDEX_COLUMN = Bytes.toBytes("INDEX");
    /** the qualifier containing the indexed row key */
    public static final byte[] INDEX_QUALIFIER = Bytes.toBytes("ROW");

    /**
     * Internal Mapper to be run by Hadoop.
     */
    public static class Map extends
            Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Writable> {
        private byte[] family;
        private HashMap<byte[], ImmutableBytesWritable> indexes;

        @Override
        protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
                throws IOException, InterruptedException {
            // indexes maps <column qualifier, index table name>
            for (java.util.Map.Entry<byte[], ImmutableBytesWritable> index : indexes.entrySet()) {
                byte[] qualifier = index.getKey();
                ImmutableBytesWritable tableName = index.getValue(); // name of the index table
                byte[] value = result.getValue(family, qualifier);   // cell value for this family/qualifier
                if (value != null) {
                    // original: row 123 attribute:phone 555-1212
                    // index:    row 555-1212 INDEX:ROW 123
                    Put put = new Put(value);                             // old cell value becomes the new row key
                    put.add(INDEX_COLUMN, INDEX_QUALIFIER, rowKey.get()); // old row key becomes the new cell value
                    context.write(tableName, put);
                }
            }
        }

        // setup() is a Mapper lifecycle method; it runs exactly once, when the task starts
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration configuration = context.getConfiguration();
            String tableName = configuration.get("index.tablename");
            String[] fields = configuration.getStrings("index.fields");
            String familyName = configuration.get("index.familyname");
            family = Bytes.toBytes(familyName);
            indexes = new HashMap<byte[], ImmutableBytesWritable>();
            for (String field : fields) {
                // if the table is "people" and the field to index is "email", then the
                // index table will be called "people-email"
                indexes.put(Bytes.toBytes(field), // <column qualifier, index table name>
                        new ImmutableBytesWritable(Bytes.toBytes(tableName + "-" + field)));
            }
        }
    }

    /**
     * Job configuration.
     */
    public static Job configureJob(Configuration conf, String[] args) throws IOException {
        String tableName = args[0];    // source table name
        String columnFamily = args[1]; // column family
        System.out.println("****" + tableName);
        conf.set(TableInputFormat.SCAN, convertScanToString(new Scan()));
        conf.set(TableInputFormat.INPUT_TABLE, tableName);
        conf.set("index.tablename", tableName);
        conf.set("index.familyname", columnFamily);
        String[] fields = new String[args.length - 2];
        for (int i = 0; i < fields.length; i++) {
            fields[i] = args[i + 2];   // qualifiers of the columns to index
        }
        conf.setStrings("index.fields", fields);
        //conf.set("index.familyname", "attributes");
        // job parameters
        Job job = new Job(conf, tableName);
        job.setJarByClass(IndexBuilder.class);
        job.setMapperClass(Map.class);
        job.setNumReduceTasks(0);      // map-only job
        job.setInputFormatClass(TableInputFormat.class);
        // note: TableOutputFormat sends every Put to the single table named in
        // TableOutputFormat.OUTPUT_TABLE and ignores the map output key;
        // MultiTableOutputFormat (imported above) would instead route each Put
        // to the table named by the key that map() emits
        job.setOutputFormatClass(TableOutputFormat.class);
        return job;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "172.16.2.34,172.16.2.54,172.16.2.57");
        // [Eclipse-only] the settings from here through getRemainingArgs() below
        // are needed only when submitting from Eclipse
        // MapReduce configuration
        conf.set("hbase.master", "172.16.2.42:60000");
        conf.set("fs.default.name", "hdfs://172.16.2.42:9000/");
        conf.set("hadoop.job.user", "hadoop");
        conf.set("mapred.job.tracker", "172.16.2.42:9001");
        // HBase configuration
        conf.set("mapred.jar", "IndexBuilder.jar"); // ship the packaged classes to the cluster
        conf.set(TableOutputFormat.OUTPUT_TABLE, "heroes-name-email"); // output table
        //conf.set(TableOutputFormat.OUTPUT_TABLE, "heroes-name");
        //conf.set(TableOutputFormat.OUTPUT_TABLE, "heroes-email");
        String[] ars = new String[]{"heroes", "info", "name", "email"};
        //String[] ars = new String[]{"heroes", "info", "name"};
        //String[] ars = new String[]{"heroes", "info", "email"};
        // input arguments: table name, column family, indexed column(s)
        // [Eclipse-only] use the hard-coded arguments:
        String[] otherArgs = new GenericOptionsParser(conf, ars).getRemainingArgs();
        // when running directly on the cluster, use the command-line arguments instead:
        //String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 3) {
            System.err.println("Only " + otherArgs.length + " arguments supplied, required: 3");
            System.err.println("Usage: IndexBuilder <TABLE_NAME> <COLUMN_FAMILY> <ATTR> [<ATTR> ...]");
            System.exit(-1);
        }
        Job job = configureJob(conf, otherArgs);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    // serialize the Scan with Writable and Base64-encode it so it can be passed
    // to the map tasks through the Configuration (TableInputFormat.SCAN)
    private static String convertScanToString(Scan scan) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(out);
        scan.write(dos);
        return Base64.encodeBytes(out.toByteArray());
    }
}
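As an aside, not part of the original program: HBase also ships TableMapReduceUtil, a helper that wires up the input format and serializes the Scan for you, replacing the hand-written convertScanToString(). A sketch of the same job setup, assuming Map were declared as extends TableMapper<ImmutableBytesWritable, Writable> (TableMapper is simply a Mapper fixed to the <ImmutableBytesWritable, Result> input types used here):

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

Job job = new Job(conf, tableName);
job.setJarByClass(IndexBuilder.class);
// sets the input format, the input table and TableInputFormat.SCAN in one call
TableMapReduceUtil.initTableMapperJob(
        tableName,                    // input table
        new Scan(),                   // full-table scan
        Map.class,                    // mapper (must extend TableMapper)
        ImmutableBytesWritable.class, // map output key
        Writable.class,               // map output value
        job);
job.setNumReduceTasks(0);
job.setOutputFormatClass(TableOutputFormat.class);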





Next, create the original HBase table heroes and the new (index) tables.

After populating heroes, a scan shows:

hbase(main):001:0> scan 'heroes'

ROW COLUMN+CELL

1 column=info:email, timestamp=1426341753586, value=kxt@123.com

1 column=info:name, timestamp=1426341638677, value=kxt

2 column=info:email, timestamp=1426341769212, value=susu@123.com

2 column=info:name, timestamp=1426341665835, value=susu

3 column=info:email, timestamp=1426341783242, value=ddd@123.com

3 column=info:name, timestamp=1426341686374, value=ddd

4 column=info:email, timestamp=1426341800260, value=shaonian@123.com

4 column=info:name, timestamp=1426341700472, value=shaonian

5 column=info:email, timestamp=1426341820389, value=chundiao@123.com

5 column=info:name, timestamp=1426341719619, value=chundiao

5 row(s) in 0.7650 seconds
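How these rows were loaded is not shown above; they can be inserted from the shell with put, or with the Java client. A minimal sketch for the first row, using the client API of the same HBase generation as the program (HTable and Put.add):

HTable table = new HTable(conf, "heroes");
Put p = new Put(Bytes.toBytes("1"));   // row key "1"
p.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("kxt"));
p.add(Bytes.toBytes("info"), Bytes.toBytes("email"), Bytes.toBytes("kxt@123.com"));
table.put(p);
table.close();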



The index tables are keyed on the name and email columns respectively and are named heroes-name and heroes-email, with column family INDEX. The row key is the indexed column value and the cell value is the original table's row key; this mapping is implemented in the program above.

Create them in the shell:

create 'heroes-name','INDEX'

create 'heroes-email','INDEX'

create 'heroes-name-email','INDEX'
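Equivalently, the index tables can be created from Java; a minimal sketch with HBaseAdmin, assuming the same client Configuration conf as in the program above:

HBaseAdmin admin = new HBaseAdmin(conf);
for (String name : new String[]{"heroes-name", "heroes-email", "heroes-name-email"}) {
    HTableDescriptor desc = new HTableDescriptor(name); // table name
    desc.addFamily(new HColumnDescriptor("INDEX"));     // single column family INDEX
    if (!admin.tableExists(name)) {
        admin.createTable(desc);
    }
}
admin.close();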





Running the program directly on the cluster:

[hadoop@Masterpc test]$ hadoop jar IndexBuilder.jar IndexBuilder heroes info name email

****heroes

15/03/15 14:17:55 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT

15/03/15 14:17:55 INFO zookeeper.ZooKeeper: Client environment:host.name=Masterpc.hadoop

15/03/15 14:17:55 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_25

15/03/15 14:17:55 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation

...

15/03/15 14:17:55 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp

15/03/15 14:17:55 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA>

15/03/15 14:17:55 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux

15/03/15 14:17:55 INFO zookeeper.ZooKeeper: Client environment:os.arch=i386

15/03/15 14:17:55 INFO zookeeper.ZooKeeper: Client

...

15/03/15 14:17:56 INFO mapred.JobClient: Running job: job_201503122348_0016

15/03/15 14:17:57 INFO mapred.JobClient: map 0% reduce 0%

15/03/15 14:18:11 INFO mapred.JobClient: map 100% reduce 0%

15/03/15 14:18:16 INFO mapred.JobClient: Job complete: job_201503122348_0016

15/03/15 14:18:16 INFO mapred.JobClient: Counters: 28

...

15/03/15 14:18:16 INFO mapred.JobClient: HBase Counters

15/03/15 14:18:16 INFO mapred.JobClient: NUM_SCANNER_RESTARTS=0

...

15/03/15 14:18:16 INFO mapred.JobClient: Rack-local map tasks=1

15/03/15 14:18:16 INFO mapred.JobClient: File Output Format Counters

15/03/15 14:18:16 INFO mapred.JobClient: Bytes Written=0





Then examine the output index tables:

hbase(main):002:0> scan 'heroes-name'

ROW COLUMN+CELL

chundiao column=INDEX:ROW, timestamp=1426400285365, value=5

ddd column=INDEX:ROW, timestamp=1426400285365, value=3

kxt column=INDEX:ROW, timestamp=1426400285365, value=1

shaonian column=INDEX:ROW, timestamp=1426400285365, value=4

susu column=INDEX:ROW, timestamp=1426400285365, value=2

5 row(s) in 0.0870 seconds



hbase(main):011:0> scan 'heroes-email'

ROW COLUMN+CELL

chundiao@123.com column=INDEX:ROW, timestamp=1426401159664, value=5

ddd@123.com column=INDEX:ROW, timestamp=1426401159664, value=3

kxt@123.com column=INDEX:ROW, timestamp=1426401159664, value=1

shaonian@123.com column=INDEX:ROW, timestamp=1426401159664, value=4

susu@123.com column=INDEX:ROW, timestamp=1426401159664, value=2
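The payoff of these index tables is the reverse lookup: to find a hero by name, read the index row first, then fetch the original row. A minimal sketch, using the table and family names above (Get/Result client API of the same HBase generation):

HTable index = new HTable(conf, "heroes-name");
Result ir = index.get(new Get(Bytes.toBytes("kxt")));          // the indexed value is the row key
byte[] originalRow = ir.getValue(Bytes.toBytes("INDEX"), Bytes.toBytes("ROW")); // -> "1"
HTable heroes = new HTable(conf, "heroes");
Result hero = heroes.get(new Get(originalRow));                // the full original row
System.out.println(Bytes.toString(
        hero.getValue(Bytes.toBytes("info"), Bytes.toBytes("email")))); // prints kxt@123.com
index.close();
heroes.close();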



Running from Eclipse: note that the input-table information and the output table name must be set in code (the [Eclipse-only] lines in the listing above). Here both the name and email indexes are written to the single table heroes-name-email:

conf.set(TableOutputFormat.OUTPUT_TABLE, "heroes-name-email"); // output table
//conf.set(TableOutputFormat.OUTPUT_TABLE, "heroes-name");
//conf.set(TableOutputFormat.OUTPUT_TABLE, "heroes-email");
String[] ars = new String[]{"heroes", "info", "name", "email"};
//String[] ars = new String[]{"heroes", "info", "name"};
//String[] ars = new String[]{"heroes", "info", "email"};
// input arguments: table name, column family, indexed column(s)

The Eclipse console output:

****heroes

15/03/15 14:53:59 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT

15/03/15 14:53:59 INFO zookeeper.ZooKeeper: Client environment:host.name=wd-PC

15/03/15 14:53:59 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_25

...

15/03/15 14:54:00 INFO mapreduce.TableOutputFormat: Created table instance for heroes-name-email

15/03/15 14:54:00 ERROR mapreduce.TableInputFormatBase: Cannot resolve the host name for /172.16.2.34 because of javax.naming.NameNotFoundException: DNS name not found [response code 3]; remaining name '34.2.16.172.in-addr.arpa'

15/03/15 14:54:00 INFO mapred.JobClient: Running job: job_201503122348_0024

15/03/15 14:54:01 INFO mapred.JobClient: map 0% reduce 0%

15/03/15 14:54:15 INFO mapred.JobClient: map 100% reduce 0%

15/03/15 14:54:20 INFO mapred.JobClient: Job complete: job_201503122348_0024

15/03/15 14:54:20 INFO mapred.JobClient: Counters: 28

15/03/15 14:54:20 INFO mapred.JobClient: Map-Reduce Framework

15/03/15 14:54:20 INFO mapred.JobClient: Spilled Records=0

15/03/15 14:54:20 INFO mapred.JobClient: Virtual memory (bytes) snapshot=535232512

15/03/15 14:54:20 INFO mapred.JobClient: Map input records=5

15/03/15 14:54:20 INFO mapred.JobClient: SPLIT_RAW_BYTES=72

15/03/15 14:54:20 INFO mapred.JobClient: Map output records=10

15/03/15 14:54:20 INFO mapred.JobClient: Physical memory (bytes) snapshot=70590464

15/03/15 14:54:20 INFO mapred.JobClient: CPU time spent (ms)=170

15/03/15 14:54:20 INFO mapred.JobClient: Total committed heap usage (bytes)=38010880

15/03/15 14:54:20 INFO mapred.JobClient: HBase Counters

15/03/15 14:54:20 INFO mapred.JobClient: NUM_SCANNER_RESTARTS=0

...

15/03/15 14:54:20 INFO mapred.JobClient: Rack-local map tasks=1

15/03/15 14:54:20 INFO mapred.JobClient: File Output Format Counters

15/03/15 14:54:20 INFO mapred.JobClient: Bytes Written=0



Check the output: both indexes were written to a single table.

hbase(main):023:0> scan 'heroes-name-email'

ROW COLUMN+CELL

chundiao column=INDEX:ROW, timestamp=1426402274405, value=5

chundiao@123.com column=INDEX:ROW, timestamp=1426402274405, value=5

ddd column=INDEX:ROW, timestamp=1426402274405, value=3

ddd@123.com column=INDEX:ROW, timestamp=1426402274405, value=3

kxt column=INDEX:ROW, timestamp=1426402274405, value=1

kxt@123.com column=INDEX:ROW, timestamp=1426402274405, value=1

shaonian column=INDEX:ROW, timestamp=1426402274405, value=4

shaonian@123.com column=INDEX:ROW, timestamp=1426402274405, value=4

susu column=INDEX:ROW, timestamp=1426402274405, value=2

susu@123.com column=INDEX:ROW, timestamp=1426402274405, value=2

10 row(s) in 0.0500 seconds



hbase(main):024:0>