
HBase: pre-split regions at table-creation time to avoid hot spotting

2015-04-24 10:28
I recently ran into a hot-spotting problem while using HBase. I have three region servers, yet during ingestion all writes kept landing on a single machine, driving its I/O and CPU very high and eventually causing a memstore out-of-memory error. It turned out that a newly created table starts with just one region, so all data was being written to that one region on the first machine; only after the region grows past a threshold does it split into two. That is what caused the hot spot. The fix is to pre-create the regions (pre-split) when creating the table, which balances writes across region servers and dramatically speeds up ingestion. Below is what I gathered from asking on forums and reading around; pick whichever approach suits your needs:
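To make the idea concrete before the full client code below, here is a minimal sketch (using the same old-style HBaseAdmin API as the rest of this post; the table and family names are made up) showing that pre-splitting is just a matter of passing split keys to createTable:

// Minimal sketch: create a table pre-split into 4 regions using 3 fixed
// split keys. "demo_table" and "cf" are hypothetical names.
Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("demo_table"));
desc.addFamily(new HColumnDescriptor("cf"));
// 3 split keys => 4 regions: (-inf,"4"), ["4","8"), ["8","c"), ["c",+inf)
byte[][] splitKeys = { Bytes.toBytes("4"), Bytes.toBytes("8"), Bytes.toBytes("c") };
admin.createTable(desc, splitKeys);
admin.close();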

Common HBase API usage, and pre-splitting regions to solve hot spotting

First, a link that covers this in detail and is well worth reading: http://www.cnblogs.com/bdifn/p/3801737.html

Next, some code:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.kktest.hbase.HashChoreWoker;
import com.kktest.hbase.HashRowKeyGenerator;
import com.kktest.hbase.RowKeyGenerator;
import com.kktest.hbase.BitUtils;

/**
 * HBase client
 *
 * @author kuang hj
 */
@SuppressWarnings("all")
public class HBaseClient {

    private static Logger logger = LoggerFactory.getLogger(HBaseClient.class);
    private static Configuration config;
    static {
        config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum",
                "192.168.1.100:2181,192.168.1.101:2181,192.168.1.103:2181");
    }

    /**
     * Create a pre-split table, with split keys derived from random hashes.
     *
     * @throws Exception
     */
    public static void testHashAndCreateTable(String tableNameTmp,
            String columnFamily) throws Exception {
        // sample random hash keys; the 10 means 10 pre-split regions
        HashChoreWoker worker = new HashChoreWoker(1000000, 10);
        byte[][] splitKeys = worker.calcSplitKeys();

        HBaseAdmin admin = new HBaseAdmin(config);
        TableName tableName = TableName.valueOf(tableNameTmp);

        if (admin.tableExists(tableName)) {
            try {
                admin.disableTable(tableName);
            } catch (Exception e) {
                // table may already be disabled; ignore
            }
            admin.deleteTable(tableName);
        }

        HTableDescriptor tableDesc = new HTableDescriptor(tableName);
        HColumnDescriptor columnDesc = new HColumnDescriptor(
                Bytes.toBytes(columnFamily));
        columnDesc.setMaxVersions(1);
        tableDesc.addFamily(columnDesc);

        admin.createTable(tableDesc, splitKeys);

        admin.close();
    }

    /**
     * @Title: queryData
     * @Description: query a single row from HBase
     * @author kuang hj
     * @param tableName
     *            table name
     * @param rowkey
     *            rowkey
     * @return list of "key:value" strings for the row
     * @throws Exception
     */
    @SuppressWarnings("all")
    public static ArrayList<String> queryData(String tableName, String rowkey)
            throws Exception {
        ArrayList<String> list = new ArrayList<String>();
        logger.info("query start");
        HTable table = new HTable(config, tableName);

        Get get = new Get(rowkey.getBytes()); // look up by rowkey
        Result r = table.get(get);
        logger.info("query end");
        KeyValue[] kv = r.raw();
        for (int i = 0; i < kv.length; i++) {
            // iterate over each cell of the row
            String key = kv[i].getKeyString();
            // decode the value bytes (the original called
            // getValueArray().toString(), which prints an array reference)
            String value = Bytes.toString(kv[i].getValue());

            // append the result to the list
            list.add(key + ":" + value);
        }
        table.close();
        return list;
    }

    /**
     * Insert rows into a table.
     *
     * @param tableName
     * @param rowkey
     *            note: this argument is also used as the column-family name below
     */
    public static void insertData(String tableName, String rowkey) {
        HTable table = null;
        try {
            table = new HTable(config, tableName);
            // one Put per row; each row gets a unique rowkey built from the
            // generated hash prefix plus a sequence number
            for (int i = 1; i < 100; i++) {
                byte[] result = getNumRowkey(rowkey, i);
                Put put = new Put(result);
                // first column of the row
                put.add(rowkey.getBytes(), "name".getBytes(),
                        ("aaa" + i).getBytes());
                // second column of the row
                put.add(rowkey.getBytes(), "age".getBytes(),
                        ("bbb" + i).getBytes());
                // third column of the row
                put.add(rowkey.getBytes(), "address".getBytes(),
                        ("ccc" + i).getBytes());

                table.put(put);
            }
        } catch (Exception e1) {
            e1.printStackTrace();
        }
    }

    private static byte[] getNewRowkey(String rowkey) {
        byte[] result = null;

        // prepend a generated hash prefix to the plain rowkey
        RowKeyGenerator rkGen = new HashRowKeyGenerator();
        byte[] splitKeys = rkGen.nextId();

        byte[] rowkeytmp = rowkey.getBytes();

        result = new byte[splitKeys.length + rowkeytmp.length];
        System.arraycopy(splitKeys, 0, result, 0, splitKeys.length);
        System.arraycopy(rowkeytmp, 0, result, splitKeys.length,
                rowkeytmp.length);

        return result;
    }

    public static void main(String[] args) {
        RowKeyGenerator rkGen = new HashRowKeyGenerator();
        byte[] splitKeys = rkGen.nextId();
        // print a readable form rather than the byte[] reference
        System.out.println(Bytes.toStringBinary(splitKeys));
    }

    private static byte[] getNumRowkey(String rowkey, int i) {
        byte[] result = null;

        // rowkey layout: hash prefix + plain rowkey + encoded sequence number
        RowKeyGenerator rkGen = new HashRowKeyGenerator();
        byte[] splitKeys = rkGen.nextId();

        byte[] rowkeytmp = rowkey.getBytes();

        byte[] intVal = BitUtils.getByteByInt(i);
        result = new byte[splitKeys.length + rowkeytmp.length + intVal.length];
        System.arraycopy(splitKeys, 0, result, 0, splitKeys.length);
        System.arraycopy(rowkeytmp, 0, result, splitKeys.length,
                rowkeytmp.length);
        System.arraycopy(intVal, 0, result, splitKeys.length + rowkeytmp.length,
                intVal.length);

        return result;
    }

    /**
     * Drop a table.
     *
     * @param tableName
     */
    public static void dropTable(String tableName) {
        try {
            HBaseAdmin admin = new HBaseAdmin(config);
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        } catch (MasterNotRunningException e) {
            e.printStackTrace();
        } catch (ZooKeeperConnectionException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Scan the whole table.
     *
     * @param tableName
     */
    public static void QueryAll(String tableName) {
        HTable table = null;
        try {
            table = new HTable(config, tableName);
            ResultScanner rs = table.getScanner(new Scan());
            for (Result r : rs) {
                System.out.println("rowkey: " + new String(r.getRow()));
                for (KeyValue keyValue : r.raw()) {
                    System.out.println("column: " + new String(keyValue.getFamily())
                            + " ==== value: " + new String(keyValue.getValue()));
                }
            }
            rs.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Query a single row by rowkey.
     *
     * @param tableName
     */
    public static void QueryByCondition1(String tableName) {

        HTable table = null;
        try {
            table = new HTable(config, tableName);
            Get get = new Get("abcdef".getBytes()); // look up by rowkey
            Result r = table.get(get);
            System.out.println("rowkey: " + new String(r.getRow()));
            for (KeyValue keyValue : r.raw()) {
                System.out.println("column: " + new String(keyValue.getFamily())
                        + " ==== value: " + new String(keyValue.getValue()));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Query by rowkey prefix.
     *
     * @param tableName
     * @param rowkey
     */
    public static void queryByRowKey(String tableName, String rowkey) {
        try {
            HTable table = new HTable(config, tableName);
            Scan scan = new Scan();
            scan.setFilter(new PrefixFilter(rowkey.getBytes()));
            ResultScanner rs = table.getScanner(scan);
            KeyValue[] kvs = null;
            for (Result tmp : rs) {
                kvs = tmp.raw();
                for (KeyValue kv : kvs) {
                    // decode byte[] fields instead of printing array references
                    System.out.print(Bytes.toString(kv.getRow()) + " ");
                    System.out.print(Bytes.toString(kv.getFamily()) + " :");
                    System.out.print(Bytes.toString(kv.getQualifier()) + " ");
                    System.out.print(kv.getTimestamp() + " ");
                    System.out.println(Bytes.toString(kv.getValue()));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * Query with a single-column value filter.
     *
     * @param tableName
     */
    public static void QueryByCondition2(String tableName) {

        try {
            HTable table = new HTable(config, tableName);
            // match rows whose column "column1" has the value "aaa"
            Filter filter = new SingleColumnValueFilter(
                    Bytes.toBytes("column1"), null, CompareOp.EQUAL,
                    Bytes.toBytes("aaa"));
            Scan s = new Scan();
            s.setFilter(filter);
            ResultScanner rs = table.getScanner(s);
            for (Result r : rs) {
                System.out.println("rowkey: " + new String(r.getRow()));
                for (KeyValue keyValue : r.raw()) {
                    System.out.println("column: " + new String(keyValue.getFamily())
                            + " ==== value: " + new String(keyValue.getValue()));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Query with several filters combined in a FilterList
     * (AND semantics by default).
     *
     * @param tableName
     */
    public static void QueryByCondition3(String tableName) {

        try {
            HTable table = new HTable(config, tableName);

            List<Filter> filters = new ArrayList<Filter>();

            Filter filter1 = new SingleColumnValueFilter(
                    Bytes.toBytes("column1"), null, CompareOp.EQUAL,
                    Bytes.toBytes("aaa"));
            filters.add(filter1);

            Filter filter2 = new SingleColumnValueFilter(
                    Bytes.toBytes("column2"), null, CompareOp.EQUAL,
                    Bytes.toBytes("bbb"));
            filters.add(filter2);

            Filter filter3 = new SingleColumnValueFilter(
                    Bytes.toBytes("column3"), null, CompareOp.EQUAL,
                    Bytes.toBytes("ccc"));
            filters.add(filter3);

            FilterList filterList1 = new FilterList(filters);

            Scan scan = new Scan();
            scan.setFilter(filterList1);
            ResultScanner rs = table.getScanner(scan);
            for (Result r : rs) {
                System.out.println("rowkey: " + new String(r.getRow()));
                for (KeyValue keyValue : r.raw()) {
                    System.out.println("column: " + new String(keyValue.getFamily())
                            + " ==== value: " + new String(keyValue.getValue()));
                }
            }
            rs.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
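
A hypothetical driver tying the pieces together (the table and family names here are mine, not from the original post):

// Hypothetical usage sketch for the client above; "hash_split_table" and
// "info" are made-up names. Note insertData uses its second argument as
// both the rowkey seed and the column-family name.
public class HBaseClientDemo {
    public static void main(String[] args) throws Exception {
        HBaseClient.testHashAndCreateTable("hash_split_table", "info");
        HBaseClient.insertData("hash_split_table", "info");
        HBaseClient.QueryAll("hash_split_table");
    }
}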


HashChoreWoker:

import java.util.Iterator;
import java.util.TreeSet;

import org.apache.hadoop.hbase.util.Bytes;

/**
*
* @author kuang hj
*
*/
public class HashChoreWoker {
    // number of sample keys to generate
    private int baseRecord;
    // rowkey generator
    private RowKeyGenerator rkGen;
    // sampling stride: sample count divided by the number of regions
    private int splitKeysBase;
    // number of split keys
    private int splitKeysNumber;
    // split keys computed from the sample
    private byte[][] splitKeys;

    public HashChoreWoker(int baseRecord, int prepareRegions) {
        this.baseRecord = baseRecord;
        // instantiate the rowkey generator
        rkGen = new HashRowKeyGenerator();
        splitKeysNumber = prepareRegions - 1;
        splitKeysBase = baseRecord / prepareRegions;
    }

    public byte[][] calcSplitKeys() {
        splitKeys = new byte[splitKeysNumber][];
        // a TreeSet keeps the sampled keys sorted
        TreeSet<byte[]> rows = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
        for (int i = 0; i < baseRecord; i++) {
            rows.add(rkGen.nextId());
        }
        int pointer = 0;
        Iterator<byte[]> rowKeyIter = rows.iterator();
        int index = 0;
        while (rowKeyIter.hasNext()) {
            byte[] tempRow = rowKeyIter.next();
            rowKeyIter.remove();
            // take every splitKeysBase-th sample as a split point
            if ((pointer != 0) && (pointer % splitKeysBase == 0)) {
                if (index < splitKeysNumber) {
                    splitKeys[index] = tempRow;
                    index++;
                }
            }
            pointer++;
        }
        rows.clear();
        rows = null;
        return splitKeys;
    }
}
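
In other words, calcSplitKeys generates baseRecord hashed keys, keeps them sorted, and picks every (baseRecord / prepareRegions)-th one as a split point, so the resulting regions receive roughly equal shares of the hashed keyspace. With the values used above (1,000,000 samples, 10 regions) this yields 9 split keys.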


HashRowKeyGenerator:
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;

import com.kktest.hbase.BitUtils;
/**
 * Generates hashed rowkeys: an 8-character MD5-hex prefix followed by a
 * monotonically increasing id.
 */
public class HashRowKeyGenerator implements RowKeyGenerator {
    private static long currentId = 1;
    private static long currentTime = System.currentTimeMillis();

    public byte[] nextId() {
        try {
            currentTime = getRowKeyResult(Long.MAX_VALUE - currentTime);
            // low 4 bytes of the scrambled time and of the current id
            byte[] lowT = Bytes.copy(Bytes.toBytes(currentTime), 4, 4);
            byte[] lowU = Bytes.copy(Bytes.toBytes(currentId), 4, 4);
            // 8-char MD5 hex prefix followed by the full id
            byte[] result = Bytes.add(MD5Hash.getMD5AsHex(Bytes.add(lowT, lowU))
                    .substring(0, 8).getBytes(), Bytes.toBytes(currentId));
            return result;
        } finally {
            currentId++;
        }
    }

    /**
     * Scramble a long by reversing its decimal digits.
     *
     * @param tmpData
     * @return the digit-reversed value
     */
    public static long getRowKeyResult(long tmpData) {
        String str = String.valueOf(tmpData);
        StringBuffer sb = new StringBuffer();
        char[] charStr = str.toCharArray();
        // note: index 0 (the most significant digit) is skipped, which keeps
        // the reversed string short enough to parse back into a long
        for (int i = charStr.length - 1; i > 0; i--) {
            sb.append(charStr[i]);
        }

        return Long.parseLong(sb.toString());
    }
}
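
The RowKeyGenerator interface and the BitUtils helper imported above were not included in the original post. A minimal reconstruction consistent with how they are called (my sketch, not the author's code):

// Reconstructed sketches of the two missing helpers (hypothetical).
public interface RowKeyGenerator {
    byte[] nextId();
}

public class BitUtils {
    // big-endian 4-byte encoding of an int, as consumed by getNumRowkey
    public static byte[] getByteByInt(int i) {
        return new byte[] {
                (byte) (i >>> 24), (byte) (i >>> 16),
                (byte) (i >>> 8), (byte) i };
    }
}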


Finally, I recommend the About云 forum.