hbase使用Java的一些基础操作
2017-05-23 14:27
465 查看
建表:
代码1:
某电商网站,后台有买家信息表buyer,每注册一名新用户网站后台会产生一条日志,并写入hbase中。
数据格式为:用户ID(buyer_id),注册日期(reg_date),注册IP(reg_ip),买家状态(buyer_status,0表示冻结,1表示正常),字段间以“,”分割(下方示例即为逗号分隔),数据内容如下:
用户ID 注册日期 注册IP 买家状态
20385,2010-05-04,124.64.242.30,1
20386,2010-05-05,117.136.0.172,1
20387,2010-05-06,114.94.44.230,1
下面的这些代码都这样执行(编译后直接用 java 命令运行,见文末示例):
通过Java Api与HBase交互的一些常用的操作整合:
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
public class HBaseTest2 {
    // Shared, statically-initialized HBase client configuration.
    static Configuration conf = null;
    static {
        conf = HBaseConfiguration.create();
        // ZooKeeper quorum of the target cluster -- adjust for your environment.
        conf.set("hbase.zookeeper.quorum", "192.168.205.153");
    }

    /**
     * Print each cell (family / qualifier / value / timestamp) of a result list.
     * Result.list() returns null for an empty result, so a null list is a no-op
     * (the original code would NPE in that case).
     */
    private static void printKeyValues(List<KeyValue> kvs) {
        if (kvs == null) {
            return;
        }
        for (KeyValue kv : kvs) {
            System.out.println("family:" + Bytes.toString(kv.getFamily()));
            System.out.println("qualifier:" + Bytes.toString(kv.getQualifier()));
            System.out.println("value:" + Bytes.toString(kv.getValue()));
            System.out.println("Timestamp:" + kv.getTimestamp());
            System.out.println("-------------------------------------------");
        }
    }

    /**
     * Create a table with the given column families.
     *
     * @param tableName table name
     * @param family    column family names
     */
    public static void creatTable(String tableName, String[] family) throws Exception {
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            if (admin.tableExists(tableName)) {
                // Was System.exit(0): killing the whole JVM from a library
                // method is hostile to callers -- report and return instead.
                System.out.println("table Exists!");
                return;
            }
            HTableDescriptor desc = new HTableDescriptor(tableName);
            for (String familyName : family) {
                desc.addFamily(new HColumnDescriptor(familyName));
            }
            admin.createTable(desc);
            System.out.println("create table Success!");
        } finally {
            admin.close(); // release the connection held by the admin
        }
    }

    /**
     * Insert one row into a table with the fixed families "article" and "author":
     * column1/value1 are written under "article", column2/value2 under "author".
     *
     * @param rowKey    row key of the new row
     * @param tableName table name
     * @param column1   qualifiers for the "article" family
     * @param value1    values matching column1 (same length expected)
     * @param column2   qualifiers for the "author" family
     * @param value2    values matching column2 (same length expected)
     */
    public static void addData(String rowKey, String tableName,
            String[] column1, String[] value1, String[] column2, String[] value2)
            throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            Put put = new Put(Bytes.toBytes(rowKey));
            // Walk the table's actual families so anything else is skipped.
            for (HColumnDescriptor cf : table.getTableDescriptor().getColumnFamilies()) {
                String familyName = cf.getNameAsString();
                if (familyName.equals("article")) {
                    for (int j = 0; j < column1.length; j++) {
                        put.add(Bytes.toBytes(familyName),
                                Bytes.toBytes(column1[j]), Bytes.toBytes(value1[j]));
                    }
                }
                if (familyName.equals("author")) {
                    for (int j = 0; j < column2.length; j++) {
                        put.add(Bytes.toBytes(familyName),
                                Bytes.toBytes(column2[j]), Bytes.toBytes(value2[j]));
                    }
                }
            }
            table.put(put);
            System.out.println("add data Success!");
        } finally {
            table.close(); // always release the table handle
        }
    }

    /**
     * Fetch a whole row by row key, printing every cell.
     *
     * @return the (possibly empty) Result for the row
     */
    public static Result getResult(String tableName, String rowKey) throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            Result result = table.get(new Get(Bytes.toBytes(rowKey)));
            printKeyValues(result.list());
            return result;
        } finally {
            table.close();
        }
    }

    /**
     * Full-table scan: print every cell of every row.
     */
    public static void getResultScann(String tableName) throws IOException {
        HTable table = new HTable(conf, tableName);
        ResultScanner rs = null;
        try {
            rs = table.getScanner(new Scan());
            for (Result r : rs) {
                printKeyValues(r.list());
            }
        } finally {
            // rs stays null if getScanner() threw -- guard before closing
            // (the original called rs.close() unconditionally).
            if (rs != null) {
                rs.close();
            }
            table.close();
        }
    }

    /**
     * Fetch and print a single column of a row.
     */
    public static void getResultByColumn(String tableName, String rowKey,
            String familyName, String columnName) throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            Get get = new Get(Bytes.toBytes(rowKey));
            // restrict the Get to the requested family:qualifier
            get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
            printKeyValues(table.get(get).list());
        } finally {
            table.close();
        }
    }

    /**
     * Overwrite (i.e. add a new version of) a single cell.
     *
     * @param value the new cell value
     */
    public static void updateTable(String tableName, String rowKey,
            String familyName, String columnName, String value)
            throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.add(Bytes.toBytes(familyName), Bytes.toBytes(columnName),
                    Bytes.toBytes(value));
            table.put(put);
            System.out.println("update table Success!");
        } finally {
            table.close();
        }
    }

    /**
     * Fetch and print up to 5 versions of a single cell.
     */
    public static void getResultByVersion(String tableName, String rowKey,
            String familyName, String columnName) throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
            get.setMaxVersions(5); // ask for up to 5 versions of the cell
            List<KeyValue> kvs = table.get(get).list();
            printKeyValues(kvs);
            // Second pass (KeyValue.toString) reuses the result we already
            // have; the original issued a redundant second table.get(get) RPC.
            if (kvs != null) {
                Iterator<KeyValue> it = kvs.iterator();
                while (it.hasNext()) {
                    System.out.println(it.next().toString());
                }
            }
        } finally {
            table.close();
        }
    }

    /**
     * Delete all versions of one column of a row.
     */
    public static void deleteColumn(String tableName, String rowKey,
            String familyName, String columnName) throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            Delete deleteColumn = new Delete(Bytes.toBytes(rowKey));
            deleteColumn.deleteColumns(Bytes.toBytes(familyName),
                    Bytes.toBytes(columnName));
            table.delete(deleteColumn);
            // was "...is deleted!" with a missing space
            System.out.println(familyName + ":" + columnName + " is deleted!");
        } finally {
            table.close();
        }
    }

    /**
     * Delete an entire row.
     */
    public static void deleteAllColumn(String tableName, String rowKey)
            throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            table.delete(new Delete(Bytes.toBytes(rowKey)));
            System.out.println("all columns are deleted!");
        } finally {
            table.close();
        }
    }

    /**
     * Disable and then drop a table.
     */
    public static void deleteTable(String tableName) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            admin.disableTable(tableName); // a table must be disabled before deletion
            admin.deleteTable(tableName);
            // was "...is deleted!" with a missing space
            System.out.println(tableName + " is deleted!");
        } finally {
            admin.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // Create table:
        // String tableName = "blog2"; String[] family = { "article", "author" };
        // creatTable(tableName, family);
        // Insert data:
        // String[] column1 = { "title", "content", "tag" };
        // String[] value1 = { "Head First HBase",
        //     "HBase is the Hadoop database. Use it when you need random, realtime read/write access to your Big Data.",
        //     "Hadoop,HBase,NoSQL" };
        // String[] column2 = { "name", "nickname" };
        // String[] value2 = { "nicholas", "lee" };
        // addData("rowkey1", "blog2", column1, value1, column2, value2);
        // Delete one column:
        // deleteColumn("blog2", "rowkey1", "author", "nickname");
        // Delete a whole row:
        // deleteAllColumn("blog2", "rowkey1");
        // Drop the table:
        // deleteTable("blog2");
        // Get a row:
        // getResult("blog2", "rowkey1");
        // Get one column:
        // getResultByColumn("blog2", "rowkey1", "author", "name");
        // updateTable("blog2", "rowkey1", "author", "name", "bin");
        // getResultByColumn("blog2", "rowkey1", "author", "name");
        // Scan the whole table:
        // getResultScann("blog2");
        // Get multiple versions of one column:
        getResultByVersion("blog2", "rowkey1", "author", "name");
    }
}
MapReduce读取HBase并写入HBase:
这个代码这样执行:
代码1:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; public class CreatTableTest { public static void main(String[] args) throws IOException { //设置HBase数据库的连接配置参数 Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "192.168.8.71"); // Zookeeper的地址 conf.set("hbase.zookeeper.property.clientPort", "2181"); String tableName = "emp"; String[] family = { "basicinfo","deptinfo"}; HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); //创建表对象 HTableDescriptor hbaseTableDesc = new HTableDescriptor(tableName); for(int i = 0; i < family.length; i++) { //设置表字段 hbaseTableDesc.addFamily(new HColumnDescriptor(family[i])); } //判断表是否存在,不存在则创建,存在则打印提示信息 if(hbaseAdmin.tableExists(tableName)) { System.out.println("TableExists!"); /** 这个方法是用来结束当前正在运行中的java虚拟机。如何status是非零参数,那么表示是非正常退出。 System.exit(0)是将你的整个虚拟机里的内容都停掉了 ,而dispose()只是关闭这个窗口,但是并没有停止整个application exit() 。无论如何,内存都释放了!也就是说连JVM都关闭了,内存里根本不可能还有什么东西 System.exit(0)是正常退出程序,而System.exit(1)或者说非0表示非正常退出程序 System.exit(status)不管status为何值都会退出程序。和return 相比有以下不同点: return是回到上一层,而System.exit(status)是回到最上层 */ System.exit(0); } else{ hbaseAdmin.createTable(hbaseTableDesc); System.out.println("Create table Success!"); } } }代码2:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;

/** Creates table "mytb" with a single column family "mycf". */
public class CreateMyTable {
    public static void main(String[] args) throws MasterNotRunningException,
            ZooKeeperConnectionException, IOException {
        String tableName = "mytb";
        String columnFamily = "mycf";
        create(tableName, columnFamily);
    }

    /** Build the client configuration (HDFS root dir + ZooKeeper quorum). */
    public static Configuration getConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.8.71:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");
        return conf;
    }

    /**
     * Create the table with one column family, unless it already exists.
     *
     * @param tableName    table to create
     * @param columnFamily its single column family
     */
    public static void create(String tableName, String columnFamily)
            throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
        HBaseAdmin hBaseAdmin = new HBaseAdmin(getConfiguration());
        try {
            if (hBaseAdmin.tableExists(tableName)) {
                System.err.println("Table exists!");
            } else {
                HTableDescriptor tableDesc = new HTableDescriptor(tableName);
                tableDesc.addFamily(new HColumnDescriptor(columnFamily));
                hBaseAdmin.createTable(tableDesc);
                System.err.println("Create Table SUCCESS!");
            }
        } finally {
            hBaseAdmin.close(); // release client resources (was leaked)
        }
    }
}
删除表:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;

/** Disables and drops table "mytb" if it exists. */
public class DeleteMyTable {
    public static void main(String[] args) throws IOException {
        String tableName = "mytb";
        delete(tableName);
    }

    /** Build the client configuration (HDFS root dir + ZooKeeper quorum). */
    public static Configuration getConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.8.71:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");
        return conf;
    }

    /**
     * Disable + delete the table; failures are reported but not rethrown
     * (deliberate best-effort behavior kept from the original).
     */
    public static void delete(String tableName) throws IOException {
        HBaseAdmin hAdmin = new HBaseAdmin(getConfiguration());
        try {
            if (hAdmin.tableExists(tableName)) {
                try {
                    hAdmin.disableTable(tableName); // must disable before delete
                    hAdmin.deleteTable(tableName);
                    System.err.println("Delete table Success");
                } catch (IOException e) {
                    // include the cause instead of silently swallowing it
                    System.err.println("Delete table Failed: " + e.getMessage());
                }
            } else {
                System.err.println("table not exists");
            }
        } finally {
            hAdmin.close(); // release client resources (was leaked)
        }
    }
}
写入数据:
某电商网站,后台有买家信息表buyer,每注册一名新用户网站后台会产生一条日志,并写入hbase中。
数据格式为:用户ID(buyer_id),注册日期(reg_date),注册IP(reg_ip),买家状态(buyer_status,0表示冻结,1表示正常),字段间以“,”分割(下方示例即为逗号分隔),数据内容如下:
用户ID 注册日期 注册IP 买家状态
20385,2010-05-04,124.64.242.30,1
20386,2010-05-05,117.136.0.172,1
20387,2010-05-06,114.94.44.230,1
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

/** Writes the sample buyer rows (rowkey = buyer_id, qualifier = "reg_date:field"). */
public class PutData {
    public static void main(String[] args) throws MasterNotRunningException,
            ZooKeeperConnectionException, IOException {
        String tableName = "mytb";
        String columnFamily = "mycf";
        put(tableName, "20385", columnFamily, "2010-05-04:reg_ip", "124.64.242.30");
        put(tableName, "20385", columnFamily, "2010-05-04:buyer_status", "1");
        put(tableName, "20386", columnFamily, "2010-05-05:reg_ip", "117.136.0.172");
        put(tableName, "20386", columnFamily, "2010-05-05:buyer_status", "1");
        put(tableName, "20387", columnFamily, "2010-05-06:reg_ip", "114.94.44.230");
        put(tableName, "20387", columnFamily, "2010-05-06:buyer_status", "1");
    }

    /** Build the client configuration (HDFS root dir + ZooKeeper quorum). */
    public static Configuration getConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.8.71:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");
        return conf;
    }

    /**
     * Write a single cell.
     *
     * @param tableName    target table
     * @param row          row key
     * @param columnFamily column family
     * @param column       column qualifier
     * @param data         cell value
     */
    public static void put(String tableName, String row, String columnFamily,
            String column, String data) throws IOException {
        HTable table = new HTable(getConfiguration(), tableName);
        try {
            Put put = new Put(Bytes.toBytes(row));
            put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column),
                    Bytes.toBytes(data));
            table.put(put);
            System.err.println("SUCCESS");
        } finally {
            // the original opened a new HTable per call and never closed it
            table.close();
        }
    }
}
查询:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; public class GetData { public static void main(String[] args) throws IOException { String tableName = "mytb"; get(tableName, "20386"); } public static Configuration getConfiguration() { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.rootdir", "hdfs://192.168.8.71:9000/hbase"); conf.set("hbase.zookeeper.quorum", "192.168.8.71"); return conf; } public static void get(String tableName, String rowkey) throws IOException { HTable table = new HTable(getConfiguration(), tableName); Get get = new Get(Bytes.toBytes(rowkey)); Result result = table.get(get); byte[] value1 = result.getValue("mycf".getBytes(), "2010-05-05:reg_ip".getBytes()); byte[] value2 = result.getValue("mycf".getBytes(), "2010-05-05:buyer_status".getBytes()); System.err.println("line1:SUCCESS"); System.err.println("line2:" + new String(value1) + "\t" + new String(value2)); } }
前面的这些代码都这样执行:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac GetData.java [hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/java GetData
通过Java Api与HBase交互的一些常用的操作整合:
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
public class HBaseTest2 {
    // Shared, statically-initialized HBase client configuration.
    static Configuration conf = null;
    static {
        conf = HBaseConfiguration.create();
        // ZooKeeper quorum of the target cluster -- adjust for your environment.
        conf.set("hbase.zookeeper.quorum", "192.168.205.153");
    }

    /**
     * Print each cell (family / qualifier / value / timestamp) of a result list.
     * Result.list() returns null for an empty result, so a null list is a no-op
     * (the original code would NPE in that case).
     */
    private static void printKeyValues(List<KeyValue> kvs) {
        if (kvs == null) {
            return;
        }
        for (KeyValue kv : kvs) {
            System.out.println("family:" + Bytes.toString(kv.getFamily()));
            System.out.println("qualifier:" + Bytes.toString(kv.getQualifier()));
            System.out.println("value:" + Bytes.toString(kv.getValue()));
            System.out.println("Timestamp:" + kv.getTimestamp());
            System.out.println("-------------------------------------------");
        }
    }

    /**
     * Create a table with the given column families.
     *
     * @param tableName table name
     * @param family    column family names
     */
    public static void creatTable(String tableName, String[] family) throws Exception {
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            if (admin.tableExists(tableName)) {
                // Was System.exit(0): killing the whole JVM from a library
                // method is hostile to callers -- report and return instead.
                System.out.println("table Exists!");
                return;
            }
            HTableDescriptor desc = new HTableDescriptor(tableName);
            for (String familyName : family) {
                desc.addFamily(new HColumnDescriptor(familyName));
            }
            admin.createTable(desc);
            System.out.println("create table Success!");
        } finally {
            admin.close(); // release the connection held by the admin
        }
    }

    /**
     * Insert one row into a table with the fixed families "article" and "author":
     * column1/value1 are written under "article", column2/value2 under "author".
     *
     * @param rowKey    row key of the new row
     * @param tableName table name
     * @param column1   qualifiers for the "article" family
     * @param value1    values matching column1 (same length expected)
     * @param column2   qualifiers for the "author" family
     * @param value2    values matching column2 (same length expected)
     */
    public static void addData(String rowKey, String tableName,
            String[] column1, String[] value1, String[] column2, String[] value2)
            throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            Put put = new Put(Bytes.toBytes(rowKey));
            // Walk the table's actual families so anything else is skipped.
            for (HColumnDescriptor cf : table.getTableDescriptor().getColumnFamilies()) {
                String familyName = cf.getNameAsString();
                if (familyName.equals("article")) {
                    for (int j = 0; j < column1.length; j++) {
                        put.add(Bytes.toBytes(familyName),
                                Bytes.toBytes(column1[j]), Bytes.toBytes(value1[j]));
                    }
                }
                if (familyName.equals("author")) {
                    for (int j = 0; j < column2.length; j++) {
                        put.add(Bytes.toBytes(familyName),
                                Bytes.toBytes(column2[j]), Bytes.toBytes(value2[j]));
                    }
                }
            }
            table.put(put);
            System.out.println("add data Success!");
        } finally {
            table.close(); // always release the table handle
        }
    }

    /**
     * Fetch a whole row by row key, printing every cell.
     *
     * @return the (possibly empty) Result for the row
     */
    public static Result getResult(String tableName, String rowKey) throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            Result result = table.get(new Get(Bytes.toBytes(rowKey)));
            printKeyValues(result.list());
            return result;
        } finally {
            table.close();
        }
    }

    /**
     * Full-table scan: print every cell of every row.
     */
    public static void getResultScann(String tableName) throws IOException {
        HTable table = new HTable(conf, tableName);
        ResultScanner rs = null;
        try {
            rs = table.getScanner(new Scan());
            for (Result r : rs) {
                printKeyValues(r.list());
            }
        } finally {
            // rs stays null if getScanner() threw -- guard before closing
            // (the original called rs.close() unconditionally).
            if (rs != null) {
                rs.close();
            }
            table.close();
        }
    }

    /**
     * Fetch and print a single column of a row.
     */
    public static void getResultByColumn(String tableName, String rowKey,
            String familyName, String columnName) throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            Get get = new Get(Bytes.toBytes(rowKey));
            // restrict the Get to the requested family:qualifier
            get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
            printKeyValues(table.get(get).list());
        } finally {
            table.close();
        }
    }

    /**
     * Overwrite (i.e. add a new version of) a single cell.
     *
     * @param value the new cell value
     */
    public static void updateTable(String tableName, String rowKey,
            String familyName, String columnName, String value)
            throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.add(Bytes.toBytes(familyName), Bytes.toBytes(columnName),
                    Bytes.toBytes(value));
            table.put(put);
            System.out.println("update table Success!");
        } finally {
            table.close();
        }
    }

    /**
     * Fetch and print up to 5 versions of a single cell.
     */
    public static void getResultByVersion(String tableName, String rowKey,
            String familyName, String columnName) throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
            get.setMaxVersions(5); // ask for up to 5 versions of the cell
            List<KeyValue> kvs = table.get(get).list();
            printKeyValues(kvs);
            // Second pass (KeyValue.toString) reuses the result we already
            // have; the original issued a redundant second table.get(get) RPC.
            if (kvs != null) {
                Iterator<KeyValue> it = kvs.iterator();
                while (it.hasNext()) {
                    System.out.println(it.next().toString());
                }
            }
        } finally {
            table.close();
        }
    }

    /**
     * Delete all versions of one column of a row.
     */
    public static void deleteColumn(String tableName, String rowKey,
            String familyName, String columnName) throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            Delete deleteColumn = new Delete(Bytes.toBytes(rowKey));
            deleteColumn.deleteColumns(Bytes.toBytes(familyName),
                    Bytes.toBytes(columnName));
            table.delete(deleteColumn);
            // was "...is deleted!" with a missing space
            System.out.println(familyName + ":" + columnName + " is deleted!");
        } finally {
            table.close();
        }
    }

    /**
     * Delete an entire row.
     */
    public static void deleteAllColumn(String tableName, String rowKey)
            throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
            table.delete(new Delete(Bytes.toBytes(rowKey)));
            System.out.println("all columns are deleted!");
        } finally {
            table.close();
        }
    }

    /**
     * Disable and then drop a table.
     */
    public static void deleteTable(String tableName) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            admin.disableTable(tableName); // a table must be disabled before deletion
            admin.deleteTable(tableName);
            // was "...is deleted!" with a missing space
            System.out.println(tableName + " is deleted!");
        } finally {
            admin.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // Create table:
        // String tableName = "blog2"; String[] family = { "article", "author" };
        // creatTable(tableName, family);
        // Insert data:
        // String[] column1 = { "title", "content", "tag" };
        // String[] value1 = { "Head First HBase",
        //     "HBase is the Hadoop database. Use it when you need random, realtime read/write access to your Big Data.",
        //     "Hadoop,HBase,NoSQL" };
        // String[] column2 = { "name", "nickname" };
        // String[] value2 = { "nicholas", "lee" };
        // addData("rowkey1", "blog2", column1, value1, column2, value2);
        // Delete one column:
        // deleteColumn("blog2", "rowkey1", "author", "nickname");
        // Delete a whole row:
        // deleteAllColumn("blog2", "rowkey1");
        // Drop the table:
        // deleteTable("blog2");
        // Get a row:
        // getResult("blog2", "rowkey1");
        // Get one column:
        // getResultByColumn("blog2", "rowkey1", "author", "name");
        // updateTable("blog2", "rowkey1", "author", "name", "bin");
        // getResultByColumn("blog2", "rowkey1", "author", "name");
        // Scan the whole table:
        // getResultScann("blog2");
        // Get multiple versions of one column:
        getResultByVersion("blog2", "rowkey1", "author", "name");
    }
}
MapReduce读取HBase并写入HBase:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; public class ReadWriteHBase { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { String hbaseTableName1 = "mytb1"; String hbaseTableName2 = "mytb2"; prepareTB1(hbaseTableName1,"mycf"); prepareTB2(hbaseTableName2); Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(ReadWriteHBase.class); job.setJobName("mrreadwritehbase"); Scan scan = new Scan(); scan.setCaching(500); scan.setCacheBlocks(false); TableMapReduceUtil.initTableMapperJob(hbaseTableName1, scan, doMapper.class, Text.class, Text.class, job); TableMapReduceUtil.initTableReducerJob(hbaseTableName2, doReducer.class, job); System.exit(job.waitForCompletion(true) ? 
1 : 0); } public static class doMapper extends TableMapper<Text, Text>{ @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { String rowID = Bytes.toString(key.get()); String rowValue = Bytes.toString(value.list().get(0).getValue()); /** * do map task */ context.write(new Text(rowID), new Text(rowValue)); } } public static class doReducer extends TableReducer<Text, Text, NullWritable>{ @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { System.out.println(key.toString()); for (Text value : values) { Put put = new Put(Bytes.toBytes(key.toString())); put.add(Bytes.toBytes("mycolumnfamily"), Bytes.toBytes("num"), Bytes.toBytes(String.valueOf(value))); context.write(NullWritable.get(), put); } } } public static void prepareTB1(String hbaseTableName, String hbaseColumnFamily) throws IOException{ HTableDescriptor tableDesc = new HTableDescriptor(hbaseTableName); HColumnDescriptor columnDesc = new HColumnDescriptor(hbaseColumnFamily); tableDesc.addFamily(columnDesc); Configuration cfg = HBaseConfiguration.create(); HBaseAdmin admin = new HBaseAdmin(cfg); if (admin.tableExists(hbaseTableName)) { System.out.println("Table exists,trying drop and create!"); admin.disableTable(hbaseTableName); admin.deleteTable(hbaseTableName); admin.createTable(tableDesc); } else { System.out.println("create table: "+ hbaseTableName); admin.createTable(tableDesc); } HTable mytb = new HTable(cfg, hbaseTableName); for (int i = 1; i <= 10; i++) { Put oneRow = new Put(Bytes.toBytes(String.valueOf(i))); oneRow.add(Bytes.toBytes(hbaseColumnFamily), Bytes.toBytes("num"), Bytes.toBytes(String.valueOf("Number:" + i))); mytb.put(oneRow); } } public static void prepareTB2(String hbaseTableName) throws IOException{ HTableDescriptor tableDesc = new HTableDescriptor(hbaseTableName); HColumnDescriptor columnDesc = new HColumnDescriptor("mycolumnfamily"); 
tableDesc.addFamily(columnDesc); Configuration cfg = HBaseConfiguration.create(); HBaseAdmin admin = new HBaseAdmin(cfg); if (admin.tableExists(hbaseTableName)) { System.out.println("Table exists,trying drop and create!"); admin.disableTable(hbaseTableName); admin.deleteTable(hbaseTableName); admin.createTable(tableDesc); } else { System.out.println("create table: "+ hbaseTableName); admin.createTable(tableDesc); } } }
这个代码这样执行:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac ReadWriteHBase.java [hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar ReadWriteHBase*class [hadoop@h71 q1]$ hadoop jar xx.jar ReadWriteHBase
相关文章推荐
- 使用Java进行图像处理的一些基础操作
- JAVA操作Hbase基础例子
- JAVA操作Hbase基础例子
- Java基础---使用foreach操作数组(三十三)
- 线程操作,比 handler更简单的AsyncTask 使用详解-基础java线程
- 使用java操作HBase
- 黑马程序员JAVA基础-数组以及一些基本操作
- Java基础---使用Arrays类操作Java中的数组(三十二)
- java 基础学习总结(附带eclipse一些常用操作)
- JAVA基础学习之IP简述使用、反射、正则表达式操作、网络爬虫、可变参数、了解和入门注解的应用、使用Eclipse的Debug功能(7)
- Java基础---使用循环操作Java中的数组(三十一)
- eclipse下使用java api 进行hbase的常用的操作所需的jar包,以及如何查看java build path下的jar是否使用
- 黑马程序员--Java基础加强--15.利用反射操作泛型IV【通过反射Method解析泛型方法思路】【通过Method对四种Type子接口类型进行解剖】【使用递归对任意复合泛型类型进行彻底解剖】【个人
- HBase Java API使用操作例子
- Java基础之IO流,以字节流的方式操作读写文件FileOutputStream和FileInputStream的使用
- 使用Java操作Windows注册表-Java基础-Java-编程开发
- Java中一些基础概念的使用详解
- 黑马程序员_Java基础_IO流_字节流,字节流操作文件,缓冲区字节流,流的使用规律,异常记录原理
- hbase使用-java操作
- 使用用Phoenix的Java api操作HBase