您的位置:首页 > 编程语言 > Java开发

hbase使用Java的一些基础操作

2017-05-23 14:27 465 查看
建表:

代码1:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreatTableTest {
    /**
     * Creates the "emp" table with two column families ("basicinfo", "deptinfo")
     * unless it already exists.
     *
     * @throws IOException on connection or admin-RPC failure
     */
    public static void main(String[] args) throws IOException {
        // Connection settings for the HBase cluster.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");      // ZooKeeper quorum address
        conf.set("hbase.zookeeper.property.clientPort", "2181"); // ZooKeeper client port

        String tableName = "emp";
        String[] family = { "basicinfo", "deptinfo" };

        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
        try {
            // Describe the table and its column families.
            HTableDescriptor hbaseTableDesc = new HTableDescriptor(tableName);
            for (String f : family) {
                hbaseTableDesc.addFamily(new HColumnDescriptor(f));
            }
            // Create only when absent; otherwise just report.
            // (The original called System.exit(0) here; main() simply returning
            // is equivalent and does not bypass the finally block.)
            if (hbaseAdmin.tableExists(tableName)) {
                System.out.println("TableExists!");
            } else {
                hbaseAdmin.createTable(hbaseTableDesc);
                System.out.println("Create table Success!");
            }
        } finally {
            hbaseAdmin.close(); // release ZooKeeper/RPC resources (was leaked before)
        }
    }
}
代码2:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateMyTable {
    /** Creates table "mytb" with a single column family "mycf". */
    public static void main(String[] args)
            throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
        String tableName = "mytb";
        String columnFamily = "mycf";
        create(tableName, columnFamily);
    }

    /** Builds the client configuration pointing at the target cluster. */
    public static Configuration getConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.8.71:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");
        return conf;
    }

    /**
     * Creates {@code tableName} with one column family, skipping creation when
     * the table already exists.
     *
     * @param tableName    table to create
     * @param columnFamily name of the single column family
     * @throws IOException on connection or admin-RPC failure
     */
    public static void create(String tableName, String columnFamily)
            throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
        HBaseAdmin hBaseAdmin = new HBaseAdmin(getConfiguration());
        try {
            if (hBaseAdmin.tableExists(tableName)) {
                System.err.println("Table exists!");
            } else {
                HTableDescriptor tableDesc = new HTableDescriptor(tableName);
                tableDesc.addFamily(new HColumnDescriptor(columnFamily));
                hBaseAdmin.createTable(tableDesc);
                System.err.println("Create Table SUCCESS!");
            }
        } finally {
            hBaseAdmin.close(); // release ZooKeeper/RPC resources (was leaked before)
        }
    }
}
删除表:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
public class DeleteMyTable {

    /** Drops table "mytb" if it exists. */
    public static void main(String[] args) throws IOException {
        String tableName = "mytb";
        delete(tableName);
    }

    /** Builds the client configuration pointing at the target cluster. */
    public static Configuration getConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.8.71:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");
        return conf;
    }

    /**
     * Disables and then deletes {@code tableName}; a table must be disabled
     * before HBase will delete it.
     *
     * @param tableName table to drop
     * @throws IOException on connection failure (delete failures are reported,
     *                     not rethrown, matching the original best-effort intent)
     */
    public static void delete(String tableName) throws IOException {
        HBaseAdmin hAdmin = new HBaseAdmin(getConfiguration());
        try {
            if (hAdmin.tableExists(tableName)) {
                try {
                    hAdmin.disableTable(tableName);
                    hAdmin.deleteTable(tableName);
                    System.err.println("Delete table Success");
                } catch (IOException e) {
                    // Keep the best-effort behavior, but don't swallow the cause.
                    System.err.println("Delete table Failed ");
                    e.printStackTrace();
                }
            } else {
                System.err.println("table not exists");
            }
        } finally {
            hAdmin.close(); // release ZooKeeper/RPC resources (was leaked before)
        }
    }
}
写入数据:
某电商网站,后台有买家信息表buyer,每注册一名新用户网站后台会产生一条日志,并写入hbase中。

数据格式为:用户ID(buyer_id),注册日期(reg_date),注册IP(reg_ip),买家状态(buyer_status,0表示冻结 ,1表示正常),以","分割,数据内容如下:

用户ID   注册日期  注册IP   买家状态

20385,2010-05-04,124.64.242.30,1

20386,2010-05-05,117.136.0.172,1

20387,2010-05-06 ,114.94.44.230,1

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
public class PutData {
    /**
     * Writes the sample buyer-registration rows into table "mytb".
     * Row key is the buyer id; qualifiers embed the registration date.
     */
    public static void main(String[] args) throws MasterNotRunningException,
            ZooKeeperConnectionException, IOException {
        String tableName = "mytb";
        String columnFamily = "mycf";

        put(tableName, "20385", columnFamily, "2010-05-04:reg_ip", "124.64.242.30");
        put(tableName, "20385", columnFamily, "2010-05-04:buyer_status", "1");

        put(tableName, "20386", columnFamily, "2010-05-05:reg_ip", "117.136.0.172");
        put(tableName, "20386", columnFamily, "2010-05-05:buyer_status", "1");

        put(tableName, "20387", columnFamily, "2010-05-06:reg_ip", "114.94.44.230");
        put(tableName, "20387", columnFamily, "2010-05-06:buyer_status", "1");
    }

    /** Builds the client configuration pointing at the target cluster. */
    public static Configuration getConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.8.71:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");
        return conf;
    }

    /**
     * Puts a single cell {@code (row, columnFamily:column) = data}.
     *
     * @param tableName    target table
     * @param row          row key
     * @param columnFamily column family name
     * @param column       column qualifier
     * @param data         cell value
     * @throws IOException on connection or RPC failure
     */
    public static void put(String tableName, String row, String columnFamily,
            String column, String data) throws IOException {
        // NOTE: opening a fresh HTable per call is expensive; it is at least
        // closed now (before, every call leaked a table connection).
        HTable table = new HTable(getConfiguration(), tableName);
        try {
            Put put = new Put(Bytes.toBytes(row));
            put.add(Bytes.toBytes(columnFamily),
                    Bytes.toBytes(column),
                    Bytes.toBytes(data));
            table.put(put);
            System.err.println("SUCCESS");
        } finally {
            table.close();
        }
    }
}
查询:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class GetData {
    /** Reads back the sample row "20386" written by PutData. */
    public static void main(String[] args) throws IOException {
        String tableName = "mytb";
        get(tableName, "20386");
    }

    /** Builds the client configuration pointing at the target cluster. */
    public static Configuration getConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.8.71:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");
        return conf;
    }

    /**
     * Fetches the hard-coded "2010-05-05:*" columns of {@code rowkey} and
     * prints them.
     *
     * @param tableName table to read
     * @param rowkey    row key to fetch
     * @throws IOException on connection or RPC failure
     */
    public static void get(String tableName, String rowkey) throws IOException {
        HTable table = new HTable(getConfiguration(), tableName);
        try {
            Get get = new Get(Bytes.toBytes(rowkey));
            Result result = table.get(get);
            byte[] value1 = result.getValue("mycf".getBytes(), "2010-05-05:reg_ip".getBytes());
            byte[] value2 = result.getValue("mycf".getBytes(), "2010-05-05:buyer_status".getBytes());
            System.err.println("line1:SUCCESS");
            // getValue returns null when the cell is absent; guard against the
            // NullPointerException the original would throw in new String(null).
            System.err.println("line2:"
                    + (value1 == null ? "<missing>" : new String(value1)) + "\t"
                    + (value2 == null ? "<missing>" : new String(value2)));
        } finally {
            table.close(); // was leaked before
        }
    }
}

前面的这些代码都这样执行:

[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac GetData.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/java GetData

通过Java Api与HBase交互的一些常用的操作整合:

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseTest2 {

    // Shared client configuration; the static initializer runs once at class load.
    static Configuration conf = null;
    static {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.205.153");
    }

    /*
     * Create a table with the given column families; no-op (beyond a message)
     * when it already exists.
     * @tableName table name
     * @family    column-family names
     */
    public static void creatTable(String tableName, String[] family) throws Exception {
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            HTableDescriptor desc = new HTableDescriptor(tableName);
            for (String f : family) {
                desc.addFamily(new HColumnDescriptor(f));
            }
            if (admin.tableExists(tableName)) {
                // The original called System.exit(0) here, which kills the whole
                // JVM from a library method; returning normally is safer.
                System.out.println("table Exists!");
            } else {
                admin.createTable(desc);
                System.out.println("create table Success!");
            }
        } finally {
            admin.close(); // was leaked before
        }
    }

    /*
     * Add one row to a table with the fixed families "article" and "author".
     * @rowKey    row key
     * @tableName table name
     * @column1   qualifiers for the "article" family
     * @value1    values for column1 (parallel array)
     * @column2   qualifiers for the "author" family
     * @value2    values for column2 (parallel array)
     */
    public static void addData(String rowKey, String tableName,
            String[] column1, String[] value1, String[] column2, String[] value2)
            throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            Put put = new Put(Bytes.toBytes(rowKey));
            // Iterate the table's actual families so only existing ones are written.
            HColumnDescriptor[] columnFamilies = table.getTableDescriptor()
                    .getColumnFamilies();
            for (HColumnDescriptor cf : columnFamilies) {
                String familyName = cf.getNameAsString();
                if (familyName.equals("article")) { // cells for the article family
                    for (int j = 0; j < column1.length; j++) {
                        put.add(Bytes.toBytes(familyName),
                                Bytes.toBytes(column1[j]), Bytes.toBytes(value1[j]));
                    }
                }
                if (familyName.equals("author")) { // cells for the author family
                    for (int j = 0; j < column2.length; j++) {
                        put.add(Bytes.toBytes(familyName),
                                Bytes.toBytes(column2[j]), Bytes.toBytes(value2[j]));
                    }
                }
            }
            table.put(put);
            System.out.println("add data Success!");
        } finally {
            table.close(); // was leaked before
        }
    }

    /*
     * Fetch one row by row key and print every cell.
     * @tableName table name
     * @rowKey    row key
     */
    public static Result getResult(String tableName, String rowKey) throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            Result result = table.get(new Get(Bytes.toBytes(rowKey)));
            // Result.list() returns null for an empty row in this API version;
            // guard against the NPE the original would throw.
            if (result.list() != null) {
                for (KeyValue kv : result.list()) {
                    System.out.println("family:" + Bytes.toString(kv.getFamily()));
                    System.out.println("qualifier:" + Bytes.toString(kv.getQualifier()));
                    System.out.println("value:" + Bytes.toString(kv.getValue()));
                    System.out.println("Timestamp:" + kv.getTimestamp());
                    System.out.println("-------------------------------------------");
                }
            }
            return result;
        } finally {
            table.close(); // was leaked before
        }
    }

    /*
     * Full-table scan, printing every cell of every row.
     * @tableName table name
     */
    public static void getResultScann(String tableName) throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            // BUG FIX: the original declared rs = null outside the try and called
            // rs.close() in finally — an NPE if getScanner() itself threw.
            ResultScanner rs = table.getScanner(new Scan());
            try {
                for (Result r : rs) {
                    if (r.list() == null) {
                        continue; // empty row
                    }
                    for (KeyValue kv : r.list()) {
                        System.out.println("family:" + Bytes.toString(kv.getFamily()));
                        System.out.println("qualifier:" + Bytes.toString(kv.getQualifier()));
                        System.out.println("value:" + Bytes.toString(kv.getValue()));
                        System.out.println("timestamp:" + kv.getTimestamp());
                        System.out.println("-------------------------------------------");
                    }
                }
            } finally {
                rs.close();
            }
        } finally {
            table.close(); // was leaked before
        }
    }

    /*
     * Fetch a single column of a row and print its cells.
     * @tableName  table name
     * @rowKey     row key
     * @familyName column family
     * @columnName column qualifier
     */
    public static void getResultByColumn(String tableName, String rowKey,
            String familyName, String columnName) throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName)); // restrict to one column
            Result result = table.get(get);
            if (result.list() != null) { // null when the column is absent
                for (KeyValue kv : result.list()) {
                    System.out.println("family:" + Bytes.toString(kv.getFamily()));
                    System.out.println("qualifier:" + Bytes.toString(kv.getQualifier()));
                    System.out.println("value:" + Bytes.toString(kv.getValue()));
                    System.out.println("Timestamp:" + kv.getTimestamp());
                    System.out.println("-------------------------------------------");
                }
            }
        } finally {
            table.close(); // was leaked before
        }
    }

    /*
     * Update (put a new version of) one cell.
     * @tableName  table name
     * @rowKey     row key
     * @familyName column family
     * @columnName column qualifier
     * @value      new value
     */
    public static void updateTable(String tableName, String rowKey,
            String familyName, String columnName, String value)
            throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.add(Bytes.toBytes(familyName), Bytes.toBytes(columnName),
                    Bytes.toBytes(value));
            table.put(put);
            System.out.println("update table Success!");
        } finally {
            table.close(); // was leaked before
        }
    }

    /*
     * Print up to 5 versions of one cell.
     * @tableName  table name
     * @rowKey     row key
     * @familyName column family
     * @columnName column qualifier
     */
    public static void getResultByVersion(String tableName, String rowKey,
            String familyName, String columnName) throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
            get.setMaxVersions(5); // ask for up to 5 versions instead of only the latest
            Result result = table.get(get);
            if (result.list() != null) { // null when the column is absent
                for (KeyValue kv : result.list()) {
                    System.out.println("family:" + Bytes.toString(kv.getFamily()));
                    System.out.println("qualifier:" + Bytes.toString(kv.getQualifier()));
                    System.out.println("value:" + Bytes.toString(kv.getValue()));
                    System.out.println("Timestamp:" + kv.getTimestamp());
                    System.out.println("-------------------------------------------");
                }
                // The original issued a second, identical table.get(get) RPC just
                // to print KeyValue.toString(); reuse the result we already have.
                Iterator<KeyValue> it = result.list().iterator();
                while (it.hasNext()) {
                    System.out.println(it.next().toString());
                }
            }
        } finally {
            table.close(); // was leaked before
        }
    }

    /*
     * Delete all versions of one column of a row.
     * @tableName  table name
     * @rowKey     row key
     * @falilyName column family
     * @columnName column qualifier
     */
    public static void deleteColumn(String tableName, String rowKey,
            String falilyName, String columnName) throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            Delete deleteColumn = new Delete(Bytes.toBytes(rowKey));
            deleteColumn.deleteColumns(Bytes.toBytes(falilyName),
                    Bytes.toBytes(columnName));
            table.delete(deleteColumn);
            System.out.println(falilyName + ":" + columnName + "is deleted!");
        } finally {
            table.close(); // was leaked before
        }
    }

    /*
     * Delete an entire row.
     * @tableName table name
     * @rowKey    row key
     */
    public static void deleteAllColumn(String tableName, String rowKey)
            throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            table.delete(new Delete(Bytes.toBytes(rowKey)));
            System.out.println("all columns are deleted!");
        } finally {
            table.close(); // was leaked before
        }
    }

    /*
     * Disable and drop a table.
     * @tableName table name
     */
    public static void deleteTable(String tableName) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            admin.disableTable(tableName); // must disable before delete
            admin.deleteTable(tableName);
            System.out.println(tableName + "is deleted!");
        } finally {
            admin.close(); // was leaked before
        }
    }

    public static void main(String[] args) throws Exception {

        // 创建表
        // String tableName = "blog2"; String[] family = { "article","author" };
        // creatTable(tableName,family);

        // 为表添加数据
        // String[] column1 = { "title", "content", "tag" }; String[] value1 = {"Head First HBase",
        // "HBase is the Hadoop database. Use it when you need random, realtime read/write access to your Big Data."
        // , "Hadoop,HBase,NoSQL" }; String[] column2 = { "name", "nickname" };
        // String[] value2 = { "nicholas", "lee" }; addData("rowkey1", "blog2",
        // column1, value1, column2, value2);

        // 删除一列
        // deleteColumn("blog2", "rowkey1", "author", "nickname");

        // 删除所有列
        // deleteAllColumn("blog2", "rowkey1");

        //删除表
        // deleteTable("blog2");

        // 查询
        // getResult("blog2", "rowkey1");

        // 查询某一列的值
        // getResultByColumn("blog2", "rowkey1", "author", "name");
        // updateTable("blog2", "rowkey1", "author", "name","bin");
        // getResultByColumn("blog2", "rowkey1", "author", "name");

        // 遍历查询
        // getResultScann("blog2");

        // 查询某列的多版本
        getResultByVersion("blog2", "rowkey1", "author", "name");
    }
}
MapReduce读取HBase并写入HBase:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
public class ReadWriteHBase {
    /**
     * Recreates source table mytb1 (with 10 sample rows) and target table mytb2,
     * then runs a MapReduce job copying each row's first cell value from
     * mytb1 into mytb2's "mycolumnfamily:num" column.
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String hbaseTableName1 = "mytb1";
        String hbaseTableName2 = "mytb2";

        prepareTB1(hbaseTableName1, "mycf");
        prepareTB2(hbaseTableName2);

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);
        job.setJarByClass(ReadWriteHBase.class);
        job.setJobName("mrreadwritehbase");

        Scan scan = new Scan();
        scan.setCaching(500);       // batch 500 rows per scanner RPC
        scan.setCacheBlocks(false); // don't pollute the region-server block cache from an MR scan

        TableMapReduceUtil.initTableMapperJob(hbaseTableName1, scan, doMapper.class, Text.class, Text.class, job);
        TableMapReduceUtil.initTableReducerJob(hbaseTableName2, doReducer.class, job);
        // BUG FIX: the exit status was inverted (`? 1 : 0`), reporting failure
        // on success; the convention is 0 for success, non-zero for failure.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /** Mapper: emits (row key, value of the row's first cell). */
    public static class doMapper extends TableMapper<Text, Text> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Context context) throws IOException, InterruptedException {
            String rowID = Bytes.toString(key.get());
            // Takes only the first cell of the row.
            String rowValue = Bytes.toString(value.list().get(0).getValue());
            context.write(new Text(rowID), new Text(rowValue));
        }
    }

    /** Reducer: writes each incoming value as a Put into mycolumnfamily:num. */
    public static class doReducer extends TableReducer<Text, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            System.out.println(key.toString());
            for (Text value : values) {
                Put put = new Put(Bytes.toBytes(key.toString()));
                put.add(Bytes.toBytes("mycolumnfamily"), Bytes.toBytes("num"), Bytes.toBytes(String.valueOf(value)));
                context.write(NullWritable.get(), put);
            }
        }
    }

    /**
     * Drops/creates the source table and loads 10 sample rows
     * ("1".."10" -> "Number:i").
     */
    public static void prepareTB1(String hbaseTableName, String hbaseColumnFamily) throws IOException {
        HTableDescriptor tableDesc = new HTableDescriptor(hbaseTableName);
        tableDesc.addFamily(new HColumnDescriptor(hbaseColumnFamily));
        Configuration cfg = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(cfg);
        try {
            if (admin.tableExists(hbaseTableName)) {
                System.out.println("Table exists,trying drop and create!");
                admin.disableTable(hbaseTableName);
                admin.deleteTable(hbaseTableName);
                admin.createTable(tableDesc);
            } else {
                System.out.println("create table: " + hbaseTableName);
                admin.createTable(tableDesc);
            }
        } finally {
            admin.close(); // was leaked before
        }
        HTable mytb = new HTable(cfg, hbaseTableName);
        try {
            for (int i = 1; i <= 10; i++) {
                Put oneRow = new Put(Bytes.toBytes(String.valueOf(i)));
                oneRow.add(Bytes.toBytes(hbaseColumnFamily), Bytes.toBytes("num"), Bytes.toBytes(String.valueOf("Number:" + i)));
                mytb.put(oneRow);
            }
        } finally {
            mytb.close(); // was leaked before
        }
    }

    /** Drops/creates the empty target table with family "mycolumnfamily". */
    public static void prepareTB2(String hbaseTableName) throws IOException {
        HTableDescriptor tableDesc = new HTableDescriptor(hbaseTableName);
        tableDesc.addFamily(new HColumnDescriptor("mycolumnfamily"));
        Configuration cfg = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(cfg);
        try {
            if (admin.tableExists(hbaseTableName)) {
                System.out.println("Table exists,trying drop and create!");
                admin.disableTable(hbaseTableName);
                admin.deleteTable(hbaseTableName);
                admin.createTable(tableDesc);
            } else {
                System.out.println("create table: " + hbaseTableName);
                admin.createTable(tableDesc);
            }
        } finally {
            admin.close(); // was leaked before
        }
    }
}


这个代码这样执行:

[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac ReadWriteHBase.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar ReadWriteHBase*.class
[hadoop@h71 q1]$ hadoop jar xx.jar ReadWriteHBase
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: