
HBase source code analysis (1): how client data is written into HBase

2013-04-20 14:13

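Roughly, inserting data into HBase works like this:

The client submits the request to a region server (with some client-side buffering along the way).
The region server receives the request; if it is a put, it writes the data into the memstore.
After every memstore operation, the region checks whether the memstore has exceeded a threshold; if it has, a flush() starts, which persists the in-memory KeyValue pairs to the HStore (that is, to HFiles).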
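To make the server-side half of these steps concrete, here is a minimal Java sketch, not HBase's actual classes: writes go into a sorted in-memory map standing in for the memstore, and once an estimated size passes a threshold the contents are "flushed" into an immutable sorted snapshot standing in for an HFile. The class name ToyStore, the character-count size estimate, and the threshold are all hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical toy model of memstore + flush; not the real HBase MemStore/Store code.
class ToyStore {
  private final long flushThreshold;                             // stand-in for the memstore flush size
  private TreeMap<String, String> memstore = new TreeMap<>();    // sorted by row key, like a memstore
  private long memstoreSize = 0;                                 // crude size estimate in characters
  private final List<SortedMap<String, String>> files = new ArrayList<>(); // flushed "HFiles"

  ToyStore(long flushThreshold) {
    this.flushThreshold = flushThreshold;
  }

  // Write one cell, then check the threshold after the memstore operation.
  void put(String rowKey, String value) {
    String old = memstore.put(rowKey, value);
    memstoreSize += rowKey.length() + value.length();
    if (old != null) {
      memstoreSize -= rowKey.length() + old.length();            // overwrite: drop the old estimate
    }
    if (memstoreSize > flushThreshold) {
      flush();
    }
  }

  // Persist the current memstore as an immutable sorted snapshot and start a fresh one.
  void flush() {
    if (memstore.isEmpty()) {
      return;
    }
    files.add(Collections.unmodifiableSortedMap(memstore));
    memstore = new TreeMap<>();
    memstoreSize = 0;
  }

  int fileCount() { return files.size(); }
  long memstoreSize() { return memstoreSize; }
  List<SortedMap<String, String>> files() { return files; }
}
```

Because each snapshot is already sorted, a real store can write it out sequentially; that is the main reason the memstore is kept as a sorted structure.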

Now let's follow a single piece of data on its way into HBase:

Client side:

HTable.java performs the put:

public void put(final Put put) throws IOException {
  doPut(Arrays.asList(put));
}


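The put method delegates to doPut.

doPut validates the Put and checks whether each KeyValue exceeds the size limit, which can be configured with the parameter hbase.client.keyvalue.maxsize (by default this value is unlimited). It then calls writeBuffer.add(put) to write the data into the local buffer. The buffered data is flushed when it exceeds the local buffer size writeBufferSize (2097152 bytes, i.e. 2 MB, by default), when auto-commit autoFlush is enabled (it is on by default, i.e. true), or when you call flushCommits() manually.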

private void doPut(final List<Put> puts) throws IOException {
  int n = 0;
  for (Put put : puts) {
    validatePut(put);
    writeBuffer.add(put); // write the data into the local (client-side) buffer
    currentWriteBufferSize += put.heapSize();

    // we need to periodically see if the writebuffer is full instead of waiting until the end of the List
    n++;
    if (n % DOPUT_WB_CHECK == 0 && currentWriteBufferSize > writeBufferSize) {
      flushCommits();
    }
  }
  if (autoFlush || currentWriteBufferSize > writeBufferSize) {
    flushCommits();
  }
}
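The buffering rules described above (check the buffer every DOPUT_WB_CHECK puts, then flush at the end if autoFlush is on or the buffer is over writeBufferSize) can be modeled in isolation. This is a hypothetical sketch: ToyWriteBuffer and CHECK_EVERY are made-up names, the check interval of 10 is an assumption, byte-array length stands in for Put.heapSize(), and flushCommits() assumes every send succeeds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the buffering logic in HTable.doPut()/flushCommits().
class ToyWriteBuffer {
  static final int CHECK_EVERY = 10;       // plays the role of DOPUT_WB_CHECK (value assumed)
  private final long writeBufferSize;      // the article gives 2097152 bytes as HBase's default
  private final boolean autoFlush;
  private final List<byte[]> writeBuffer = new ArrayList<>();
  private long currentWriteBufferSize = 0;
  private int flushCount = 0;              // how many times flushCommits() ran

  ToyWriteBuffer(long writeBufferSize, boolean autoFlush) {
    this.writeBufferSize = writeBufferSize;
    this.autoFlush = autoFlush;
  }

  void put(List<byte[]> puts) {
    int n = 0;
    for (byte[] p : puts) {
      writeBuffer.add(p);
      currentWriteBufferSize += p.length;  // stand-in for put.heapSize()
      // periodic check, so a very long list is not held in memory until the end
      n++;
      if (n % CHECK_EVERY == 0 && currentWriteBufferSize > writeBufferSize) {
        flushCommits();
      }
    }
    if (autoFlush || currentWriteBufferSize > writeBufferSize) {
      flushCommits();
    }
  }

  // The real client sends the buffered Puts to region servers here;
  // this sketch assumes every send succeeds and simply empties the buffer.
  void flushCommits() {
    flushCount++;
    writeBuffer.clear();
    currentWriteBufferSize = 0;
  }

  int flushCount() { return flushCount; }
  int buffered() { return writeBuffer.size(); }
}
```

With autoFlush left at its default of true, every put() call ends in a flush, which is why bulk-loading clients commonly turn autoFlush off and rely on the size threshold instead.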


The flushCommits() code:

public void flushCommits() throws IOException {
  try {
    Object[] results = new Object[writeBuffer.size()];
    try {
      // connect to the remote region servers and submit the requests here
      this.connection.processBatch(writeBuffer, tableName, pool, results);
    } catch (InterruptedException e) {
      throw new IOException(e);
    } finally {
      // mutate list so that it is empty for complete success, or contains
      // only failed records results are returned in the same order as the
      // requests in list walk the list backwards, so we can remove from list
      // without impacting the indexes of earlier members
      for (int i = results.length - 1; i >= 0; i--) {
        if (results[i] instanceof Result) {
          // successful Puts are removed from the list here.
          writeBuffer.remove(i);
        }
      }
    }
  } finally {
    if (clearBufferOnFail) {
      writeBuffer.clear();
      currentWriteBufferSize = 0;
    } else {
      // the write buffer was adjusted by processBatchOfPuts
      currentWriteBufferSize = 0;
      for (Put aPut : writeBuffer) {
        currentWriteBufferSize += aPut.heapSize();
      }
    }
  }
}
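The comment in the inner finally block describes the key trick of flushCommits(): successful entries are removed by index, so the loop walks backwards, and removing element i never shifts the indexes of elements 0..i-1 that are still to be examined. A minimal sketch of just that step, assuming a plain List and a hypothetical OK marker object in place of HBase's Result:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper isolating the backward-removal step of flushCommits().
class BackwardRemoval {
  // Marker standing in for HBase's Result: "this entry succeeded".
  static final Object OK = new Object();

  // Remove every buffered entry whose result is OK; failed entries (null or exceptions) stay.
  static <T> void removeSucceeded(List<T> buffer, Object[] results) {
    if (buffer.size() != results.length) {
      throw new IllegalArgumentException("results must be the same size as buffer");
    }
    // Walk backwards so removing index i does not move any index below i.
    for (int i = results.length - 1; i >= 0; i--) {
      if (results[i] == OK) {
        buffer.remove(i);
      }
    }
  }
}
```

Walking forwards instead would skip the element that slides into position i after each removal.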

Let's look at how HConnectionImplementation, the implementation class of HConnection.java, implements processBatch:

public void processBatch(List<? extends Row> list,
    final byte[] tableName,
    ExecutorService pool,
    Object[] results) throws IOException, InterruptedException {

  // results must be the same size as list
  if (results.length != list.size()) {
    throw new IllegalArgumentException("argument results must be the same size as argument list");
  }

  processBatchCallback(list, tableName, pool, results, null);
}

public <R> void processBatchCallback(
    List<? extends Row> list,
    byte[] tableName,
    ExecutorService pool,
    Object[] results,
    Batch.Callback<R> callback)
    throws IOException, InterruptedException {

  // results must be the same size as list
  if (results.length != list.size()) {
    throw new IllegalArgumentException(
        "argument results must be the same size as argument list");
  }
  if (list.isEmpty()) {
    return;
  }

  // Keep track of the most recent servers for any given item for better
  // exceptional reporting.  We keep HRegionLocation to save on parsing.
  // Later below when we use lastServers, we'll pull what we need from
  // lastServers.
  HRegionLocation [] lastServers = new HRegionLocation[results.length];
  List<Row> workingList = new ArrayList<Row>(list);
  boolean retry = true;
  // count that helps presize actions array
  int actionCount = 0;
  Throwable singleRowCause = null;

  for (int tries = 0; tries < numRetries && retry; ++tries) {

    // sleep first, if this is a retry
    if (tries >= 1) {
      long sleepTime = getPauseTime(tries);
      LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
      Thread.sleep(sleepTime);
    }
    // step 1: break up into regionserver-sized chunks and build the data structs
    Map<HRegionLocation, MultiAction<R>> actionsByServer =
        new HashMap<HRegionLocation, MultiAction<R>>();
    for (int i = 0; i < workingList.size(); i++) {
      Row row = workingList.get(i);
      if (row != null) {
        HRegionLocation loc = locateRegion(tableName, row.getRow(), true);
        byte[] regionName = loc.getRegionInfo().getRegionName();

        MultiAction<R> actions = actionsByServer.get(loc);
        if (actions == null) {
          actions = new MultiAction<R>();
          actionsByServer.put(loc, actions);
        }

        Action<R> action = new Action<R>(row, i);
        lastServers[i] = loc;
        actions.add(regionName, action);
      }
    }

    // step 2: make the requests

    Map<HRegionLocation, Future<MultiResponse>> futures =
        new HashMap<HRegionLocation, Future<MultiResponse>>(
            actionsByServer.size());

    for (Entry<HRegionLocation, MultiAction<R>> e: actionsByServer.entrySet()) {
      futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
    }

    // step 3: collect the failures and successes and prepare for retry

    for (Entry<HRegionLocation, Future<MultiResponse>> responsePerServer
        : futures.entrySet()) {
      HRegionLocation loc = responsePerServer.getKey();

      try {
        Future<MultiResponse> future = responsePerServer.getValue();
        MultiResponse resp = future.get();

        if (resp == null) {
          // Entire server failed
          LOG.debug("Failed all for server: " + loc.getHostnamePort() +
              ", removing from cache");
          continue;
        }

        for (Entry<byte[], List<Pair<Integer,Object>>> e : resp.getResults().entrySet()) {
          byte[] regionName = e.getKey();
          List<Pair<Integer, Object>> regionResults = e.getValue();
          for (Pair<Integer, Object> regionResult : regionResults) {
            if (regionResult == null) {
              // if the first/only record is 'null' the entire region failed.
              LOG.debug("Failures for region: " +
                  Bytes.toStringBinary(regionName) +
                  ", removing from cache");
            } else {
              // Result might be an Exception, including DNRIOE
              results[regionResult.getFirst()] = regionResult.getSecond();
              if (callback != null && !(regionResult.getSecond() instanceof Throwable)) {
                callback.update(e.getKey(),
                    list.get(regionResult.getFirst()).getRow(),
                    (R)regionResult.getSecond());
              }
            }
          }
        }
      } catch (ExecutionException e) {
        LOG.warn("Failed all from " + loc, e);
      }
    }

    // step 4: identify failures and prep for a retry (if applicable).

    // Find failures (i.e. null Result), and add them to the workingList (in
    // order), so they can be retried.
    retry = false;
    workingList.clear();
    actionCount = 0;
    for (int i = 0; i < results.length; i++) {
      // if null (fail) or instanceof Throwable && not instanceof DNRIOE
      // then retry that row. else dont.
      if (results[i] == null ||
          (results[i] instanceof Throwable &&
              !(results[i] instanceof DoNotRetryIOException))) {

        retry = true;
        actionCount++;
        Row row = list.get(i);
        workingList.add(row);
        deleteCachedLocation(tableName, row.getRow());
      } else {
        if (results[i] != null && results[i] instanceof Throwable) {
          actionCount++;
        }
        // add null to workingList, so the order remains consistent with the original list argument.
        workingList.add(null);
      }
    }
  }

  if (retry) {
    // Simple little check for 1 item failures.
    if (singleRowCause != null) {
      throw new IOException(singleRowCause);
    }
  }

  List<Throwable> exceptions = new ArrayList<Throwable>(actionCount);
  List<Row> actions = new ArrayList<Row>(actionCount);
  List<String> addresses = new ArrayList<String>(actionCount);

  for (int i = 0 ; i < results.length; i++) {
    if (results[i] == null || results[i] instanceof Throwable) {
      exceptions.add((Throwable)results[i]);
      actions.add(list.get(i));
      addresses.add(lastServers[i].getHostnamePort());
    }
  }

  if (!exceptions.isEmpty()) {
    throw new RetriesExhaustedWithDetailsException(exceptions,
        actions,
        addresses);
  }
}
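Step 4 decides per row whether to retry: a null slot (the server or region failed) or a retryable exception puts the row back into workingList, while successes and DoNotRetryIOException failures get a null placeholder so indexes stay aligned with the original list; before each retry the loop sleeps getPauseTime(tries). The sketch below reproduces that classification plus a capped backoff. ToyDoNotRetryException, RetryPlanner, and the BACKOFF multipliers are hypothetical stand-ins; HBase's own backoff table and base pause may differ.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Local stand-in for org.apache.hadoop.hbase.DoNotRetryIOException (hypothetical).
class ToyDoNotRetryException extends IOException {
  ToyDoNotRetryException(String msg) { super(msg); }
}

class RetryPlanner {
  // Hypothetical backoff multipliers; the last one is reused for all later attempts.
  static final int[] BACKOFF = {1, 1, 2, 4, 8};

  // Pause before attempt number `tries`, capped at the last multiplier.
  static long pauseTime(long basePauseMs, int tries) {
    int idx = Math.min(tries, BACKOFF.length - 1);
    return basePauseMs * BACKOFF[idx];
  }

  // Mirrors step 4: null results and retryable Throwables are retried; everything
  // else becomes a null placeholder so positions match the original list.
  static <T> List<T> nextWorkingList(List<T> original, Object[] results) {
    List<T> working = new ArrayList<>(original.size());
    for (int i = 0; i < results.length; i++) {
      boolean retry = results[i] == null
          || (results[i] instanceof Throwable
              && !(results[i] instanceof ToyDoNotRetryException));
      working.add(retry ? original.get(i) : null);
    }
    return working;
  }

  // True if at least one row still needs another attempt.
  static boolean needsRetry(List<?> working) {
    for (Object o : working) {
      if (o != null) {
        return true;
      }
    }
    return false;
  }
}
```

Keeping the null placeholders means results[i], lastServers[i], and workingList.get(i) always refer to the same original row across attempts.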


The data is submitted to the region server over RPC:

private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,
    final MultiAction<R> multi, final byte [] tableName) {
  final HConnection connection = this;
  return new Callable<MultiResponse>() {
    public MultiResponse call() throws IOException {
      return getRegionServerWithoutRetries(
          new ServerCallable<MultiResponse>(connection, tableName, null) {
            public MultiResponse call() throws IOException {
              return server.multi(multi);
            }
            @Override
            public void connect(boolean reload) throws IOException {
              server =
                  connection.getHRegionConnection(loc.getHostname(), loc.getPort());
            }
          }
      );
    }
  };
}
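Steps 1 to 3 of processBatchCallback, together with createCallable(), form a scatter/gather: rows are grouped by the server hosting them, one Callable per server is submitted to the ExecutorService, and each server's answers are written back into results at the rows' original indexes. The sketch below shows that pattern with plain Java types; FanOut and the locator function are hypothetical (locator stands in for locateRegion()), and the Callable echoes a string instead of making an RPC.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch of steps 1-3 of processBatchCallback() plus createCallable().
class FanOut {
  // Step 1: group row indexes by owning server.
  static Map<String, List<Integer>> groupByServer(List<String> rows,
                                                  Function<String, String> locator) {
    Map<String, List<Integer>> byServer = new HashMap<>();
    for (int i = 0; i < rows.size(); i++) {
      byServer.computeIfAbsent(locator.apply(rows.get(i)), k -> new ArrayList<>()).add(i);
    }
    return byServer;
  }

  // Steps 2-3: one Callable per server, then copy answers back to their original index.
  static Object[] submitAll(List<String> rows, Function<String, String> locator,
                            ExecutorService pool) {
    Object[] results = new Object[rows.size()];
    Map<String, List<Integer>> groups = groupByServer(rows, locator);
    Map<String, Future<List<String>>> futures = new HashMap<>();
    for (Map.Entry<String, List<Integer>> e : groups.entrySet()) {
      String server = e.getKey();
      List<Integer> idx = e.getValue();
      Callable<List<String>> call = () -> {
        // A real client would make one multi() RPC to `server` here; we just echo.
        List<String> out = new ArrayList<>();
        for (int i : idx) {
          out.add(server + ":" + rows.get(i));
        }
        return out;
      };
      futures.put(server, pool.submit(call));
    }
    for (Map.Entry<String, Future<List<String>>> e : futures.entrySet()) {
      List<Integer> idx = groups.get(e.getKey());
      try {
        List<String> out = e.getValue().get();
        for (int k = 0; k < idx.size(); k++) {
          results[idx.get(k)] = out.get(k);
        }
      } catch (ExecutionException ex) {
        // Whole server failed: its slots stay null, which step 4 treats as "retry".
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    return results;
  }
}
```

Sending one request per server rather than per row is what makes batched puts cheap: the number of RPCs scales with the number of region servers touched, not with the number of rows.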


Obtaining the RPC proxy for a region server:

HRegionInterface getHRegionConnection(final String hostname, final int port,
    final InetSocketAddress isa, final boolean master)
    throws IOException {
  if (master) getMaster();
  HRegionInterface server;
  String rsName = null;
  if (isa != null) {
    rsName = Addressing.createHostAndPortStr(isa.getHostName(),
        isa.getPort());
  } else {
    rsName = Addressing.createHostAndPortStr(hostname, port);
  }
  // See if we already have a connection (common case)
  server = this.servers.get(rsName);
  if (server == null) {
    // create a unique lock for this RS (if necessary)
    this.connectionLock.putIfAbsent(rsName, rsName);
    // get the RS lock
    synchronized (this.connectionLock.get(rsName)) {
      // do one more lookup in case we were stalled above
      server = this.servers.get(rsName);
      if (server == null) {
        try {
          if (clusterId.hasId()) {
            conf.set(HConstants.CLUSTER_ID, clusterId.getId());
          }
          // Only create isa when we need to.
          InetSocketAddress address = isa != null? isa:
              new InetSocketAddress(hostname, port);
          // definitely a cache miss. establish an RPC for this RS
          server = (HRegionInterface) HBaseRPC.waitForProxy(
              serverInterfaceClass, HRegionInterface.VERSION,
              address, this.conf,
              this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
          this.servers.put(Addressing.createHostAndPortStr(
              address.getHostName(), address.getPort()), server);
        } catch (RemoteException e) {
          LOG.warn("RemoteException connecting to RS", e);
          // Throw what the RemoteException was carrying.
          throw e.unwrapRemoteException();
        }
      }
    }
  }
  return server;
}
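getHRegionConnection() combines a per-server lock with a second lookup inside the synchronized block, so concurrent callers create at most one RPC proxy per region server while the common cache-hit path takes no lock at all. A generic sketch of the same pattern, where ProxyCache and the factory function (standing in for HBaseRPC.waitForProxy) are hypothetical:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical cache of per-server proxies using the same locking pattern.
class ProxyCache<P> {
  private final ConcurrentMap<String, P> servers = new ConcurrentHashMap<>();
  private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();
  private final Function<String, P> factory;  // expensive connection setup
  final AtomicInteger created = new AtomicInteger();

  ProxyCache(Function<String, P> factory) {
    this.factory = factory;
  }

  P get(String host, int port) {
    String key = host + ":" + port;
    P server = servers.get(key);              // fast path: already connected, no lock
    if (server == null) {
      locks.putIfAbsent(key, new Object());   // one lock object per server
      synchronized (locks.get(key)) {
        server = servers.get(key);            // re-check: another thread may have won
        if (server == null) {
          server = factory.apply(key);
          created.incrementAndGet();
          servers.put(key, server);
        }
      }
    }
    return server;
  }
}
```

The per-key lock means a slow connection attempt to one region server does not block callers that want a different server. This sketch locks on a dedicated Object per key, whereas the original locks on the rsName String stored in connectionLock.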


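When performing inserts, the HBase client caches information about recently used region servers. If the cache already holds the location of the relevant region, the client uses it directly and connects to that region server; otherwise it has to look the region up first (in HBase of this era, by reading the -ROOT- and .META. catalog tables, whose entry point is found through ZooKeeper, rather than by an RPC to the master). Client operations such as put, get, and delete are each wrapped in an Action object, and a series of actions is submitted together; this is the MultiAction.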
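The region-location cache can be pictured as a sorted map from each region's start key to its location: a row belongs to the region with the greatest start key not greater than the row, provided the row is below that region's end key. This sketch assumes String keys (real HBase row keys are byte[] compared lexicographically) and hypothetical names; on a miss the caller would consult the catalog tables, and evict() plays the role of deleteCachedLocation().

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical client-side region location cache for a single table.
class RegionLocationCache {
  private static final class Loc {
    final String endKey;   // exclusive; "" means "until the end of the table"
    final String server;

    Loc(String endKey, String server) {
      this.endKey = endKey;
      this.server = server;
    }
  }

  // Keyed by region start key; the first region of a table starts at "".
  private final TreeMap<String, Loc> byStartKey = new TreeMap<>();

  void cache(String startKey, String endKey, String server) {
    byStartKey.put(startKey, new Loc(endKey, server));
  }

  // The cached entry whose [startKey, endKey) range contains row, or null.
  private Map.Entry<String, Loc> covering(String row) {
    Map.Entry<String, Loc> e = byStartKey.floorEntry(row);
    if (e == null) {
      return null;
    }
    Loc loc = e.getValue();
    boolean inRange = loc.endKey.isEmpty() || row.compareTo(loc.endKey) < 0;
    return inRange ? e : null;
  }

  // Cached server for row, or null on a cache miss.
  String locate(String row) {
    Map.Entry<String, Loc> e = covering(row);
    return e == null ? null : e.getValue().server;
  }

  // Drop the cached region containing row, e.g. after a failed request.
  void evict(String row) {
    Map.Entry<String, Loc> e = covering(row);
    if (e != null) {
      byStartKey.remove(e.getKey());
    }
  }
}
```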

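Server side:

Operations the client submits over RPC end up in HRegionServer.multi(MultiAction<R> multi), which handles the insert requests:

Take each action object, determine which type it is (put/get/delete), and perform the corresponding operation
Look up the row lock for each put (from the lock id it carries, if the client holds one)
Call HRegion.put to write the data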

HRegionServer.java

@SuppressWarnings("unchecked")
@Override
public <R> MultiResponse multi(MultiAction<R> multi) throws IOException {
  checkOpen();
  MultiResponse response = new MultiResponse();
  for (Map.Entry<byte[], List<Action<R>>> e : multi.actions.entrySet()) {
    byte[] regionName = e.getKey();
    List<Action<R>> actionsForRegion = e.getValue();
    // sort based on the row id - this helps in the case where we reach the
    // end of a region, so that we don't have to try the rest of the
    // actions in the list.
    Collections.sort(actionsForRegion);
    Row action;
    List<Action<R>> puts = new ArrayList<Action<R>>();
    for (Action<R> a : actionsForRegion) {
      action = a.getAction();
      int originalIndex = a.getOriginalIndex();

      try {
        // determine which kind of operation this action is
        if (action instanceof Delete) {
          delete(regionName, (Delete) action);
          response.add(regionName, originalIndex, new Result());
        } else if (action instanceof Get) {
          response.add(regionName, originalIndex, get(regionName, (Get) action));
        } else if (action instanceof Put) {
          puts.add(a);  // wont throw.
        } else if (action instanceof Exec) {
          ExecResult result = execCoprocessor(regionName, (Exec)action);
          response.add(regionName, new Pair<Integer, Object>(
              a.getOriginalIndex(), result.getValue()
          ));
        } else {
          LOG.debug("Error: invalid Action, row must be a Get, Delete, " +
              "Put or Exec.");
          throw new DoNotRetryIOException("Invalid Action, row must be a " +
              "Get, Delete or Put.");
        }
      } catch (IOException ex) {
        response.add(regionName, originalIndex, ex);
      }
    }

    // We do the puts with result.put so we can get the batching efficiency
    // we so need. All this data munging doesn't seem great, but at least
    // we arent copying bytes or anything.
    if (!puts.isEmpty()) {
      try {
        HRegion region = getRegion(regionName);

        if (!region.getRegionInfo().isMetaTable()) {
          this.cacheFlusher.reclaimMemStoreMemory();
        }

        List<Pair<Put,Integer>> putsWithLocks =
            Lists.newArrayListWithCapacity(puts.size());
        for (Action<R> a : puts) {
          Put p = (Put) a.getAction();

          Integer lock;
          try {
            // get the row lock for this Put (if the client supplied a lock id)
            lock = getLockFromId(p.getLockId());
          } catch (UnknownRowLockException ex) {
            response.add(regionName, a.getOriginalIndex(), ex);
            continue;
          }
          putsWithLocks.add(new Pair<Put, Integer>(p, lock));
        }

        this.requestCount.addAndGet(puts.size());
        // write the data into the region
        OperationStatus[] codes =
            region.put(putsWithLocks.toArray(new Pair[]{}));

        for( int i = 0 ; i < codes.length ; i++) {
          OperationStatus code = codes[i];

          Action<R> theAction = puts.get(i);
          Object result = null;

          if (code.getOperationStatusCode() == OperationStatusCode.SUCCESS) {
            result = new Result();
          } else if (code.getOperationStatusCode()
              == OperationStatusCode.BAD_FAMILY) {
            result = new NoSuchColumnFamilyException(code.getExceptionMsg());
          }
          // FAILURE && NOT_RUN becomes null, aka: need to run again.

          response.add(regionName, theAction.getOriginalIndex(), result);
        }
      } catch (IOException ioe) {
        // fail all the puts with the ioe in question.
        for (Action<R> a: puts) {
          response.add(regionName, a.getOriginalIndex(), ioe);
        }
      }
    }
  }
  return response;
}
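The essential bookkeeping in multi() is that every result is stored under the action's original index, Puts are collected so they can be applied as one batch, and a null result (FAILURE or NOT_RUN) tells the client to retry. The sketch below models only that bookkeeping for a single region; ToyMulti, Kind, Status, and BatchPut are hypothetical stand-ins for HBase's Action, OperationStatusCode, and HRegion.put(Pair[]), and Gets/Deletes simply record a marker string.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of HRegionServer.multi() for one region.
class ToyMulti {
  enum Kind { PUT, GET, DELETE }
  enum Status { SUCCESS, BAD_FAMILY, FAILURE, NOT_RUN }

  static final class Action {
    final Kind kind;
    final String row;
    final int originalIndex;  // position in the client's original list

    Action(Kind kind, String row, int originalIndex) {
      this.kind = kind;
      this.row = row;
      this.originalIndex = originalIndex;
    }
  }

  // Stand-in for HRegion.put(Pair[]): returns one status per Put, in the same order.
  interface BatchPut {
    Status[] apply(List<Action> puts);
  }

  static Map<Integer, Object> multi(List<Action> actions, BatchPut batchPut) {
    Map<Integer, Object> response = new HashMap<>();
    List<Action> sorted = new ArrayList<>(actions);
    sorted.sort(Comparator.comparing((Action a) -> a.row));  // sort by row, as multi() does
    List<Action> puts = new ArrayList<>();
    for (Action a : sorted) {
      if (a.kind == Kind.PUT) {
        puts.add(a);  // deferred so all Puts for the region go in one batch
      } else {
        // Gets/Deletes run one at a time; here we only record a marker.
        response.put(a.originalIndex, a.kind + ":" + a.row);
      }
    }
    if (!puts.isEmpty()) {
      Status[] codes = batchPut.apply(puts);
      for (int i = 0; i < codes.length; i++) {
        Object result = null;  // FAILURE / NOT_RUN stay null: the client will retry them
        if (codes[i] == Status.SUCCESS) {
          result = "OK";
        } else if (codes[i] == Status.BAD_FAMILY) {
          result = new IllegalArgumentException("no such column family");
        }
        response.put(puts.get(i).originalIndex, result);
      }
    }
    return response;
  }
}
```

This null-means-retry convention is exactly what step 4 of processBatchCallback relies on when it rebuilds workingList.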

The put operation in HRegion.java:

/**
 * @param put
 * @param lockid
 * @param writeToWAL
 * @throws IOException
 */
public void put(Put put, Integer lockid, boolean writeToWAL)
    throws IOException {
  // check whether the region is read-only; if it is, an exception is thrown
  checkReadOnly();

  // Do a rough check that we have resources to accept a write.  The check is
  // 'rough' in that between the resource check and the call to obtain a
  // read lock, resources may run out.  For now, the thought is that this
  // will be extremely rare; we'll deal with it when it happens.
  checkResources();
  // acquire the region lock for the duration of this operation
  startRegionOperation();
  this.writeRequestsCount.increment();
  try {
    // We obtain a per-row lock, so other clients will block while one client
    // performs an update. The read lock is released by the client calling
    // #commit or #abort or if the HRegionServer lease on the lock expires.
    // See HRegionServer#RegionListener for how the expire on HRegionServer
    // invokes a HRegion#abort.
    byte [] row = put.getRow();
    // If we did not pass an existing row lock, obtain a new one
    Integer lid = getLock(lockid, row, true);

    try {
      // All edits for the given row (across all column families) must happen atomically.
      internalPut(put, put.getClusterId(), writeToWAL);
    } finally {
      if(lockid == null) releaseRowLock(lid);
    }
  } finally {
    closeRegionOperation();
  }
}
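HRegion.put() takes a row lock only when the caller did not pass one (lockid == null) and, in the finally block, releases only a lock it acquired itself. The sketch below isolates that rule with java.util.concurrent locks; ToyRegion is hypothetical, it lets the caller pass the lock object directly instead of HBase's lock-id scheme, and it never removes entries from its lock table, which a real implementation would need to do.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical per-row locking illustrating "release only what you acquired".
class ToyRegion {
  private final ConcurrentMap<String, ReentrantLock> rowLocks = new ConcurrentHashMap<>();
  private final ConcurrentMap<String, String> data = new ConcurrentHashMap<>();

  // Acquire (creating if needed) the lock for one row.
  ReentrantLock lockRow(String row) {
    ReentrantLock l = rowLocks.computeIfAbsent(row, k -> new ReentrantLock());
    l.lock();
    return l;
  }

  // callerLock != null means the client already holds the row lock (like a non-null lockid).
  void put(String row, String value, ReentrantLock callerLock) {
    ReentrantLock lock = (callerLock != null) ? callerLock : lockRow(row);
    try {
      data.put(row, value);  // all edits for the row happen while the lock is held
    } finally {
      if (callerLock == null) {
        lock.unlock();       // only release a lock we took ourselves
      }
    }
  }

  String get(String row) { return data.get(row); }

  boolean isLocked(String row) {
    ReentrantLock l = rowLocks.get(row);
    return l != null && l.isLocked();
  }
}
```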

checkResources():

Before the put is actually executed, some necessary checks are performed. Let's look at the checkResources() method.

private void checkResources() {

  // If catalog region, do not impose resource constraints or block updates.
  if (this.getRegionInfo().isMetaRegion()) return;

  boolean blocked = false;
  while (this.memstoreSize.get() > this.blockingMemStoreSize) {
    requestFlush();
    if (!blocked) {
      LOG.info("Blocking updates for '" + Thread.currentThread().getName() +
          "' on region " + Bytes.toStringBinary(getRegionName()) +
          ": memstore size " +
          StringUtils.humanReadableInt(this.memstoreSize.get()) +
          " is >= than blocking " +
          StringUtils.humanReadableInt(this.blockingMemStoreSize) + " size");
    }
    blocked = true;
    synchronized(this) {
      try {
        wait(threadWakeFrequency);
      } catch (InterruptedException e) {
        // continue;
      }
    }
  }
  if (blocked) {
    LOG.info("Unblocking updates for region " + this + " '"
        + Thread.currentThread().getName() + "'");
  }
}

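As you can see, when the total memstore size of the HRegion exceeds blockingMemStoreSize, a flush is requested and the writing thread blocks, waking up periodically to re-check, until the memstore size drops back to an acceptable level.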
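The blocking rule can be sketched with a Java monitor: writers loop while memstoreSize exceeds blockingMemStoreSize, request a flush, and wait up to threadWakeFrequency milliseconds, while a flusher that shrinks the memstore calls notifyAll(). In this hypothetical sketch, blockingMemStoreSize is computed as the flush size times a block multiplier (HBase derives it from configuration, as far as I know via hbase.hregion.memstore.block.multiplier; treat the formula as an assumption), and requestFlush() merely counts requests.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of HRegion.checkResources(): block writers while the memstore is too big.
class BlockingRegion {
  final AtomicLong memstoreSize = new AtomicLong();
  final AtomicInteger flushRequests = new AtomicInteger();
  final long blockingMemStoreSize;
  final long threadWakeFrequencyMs;

  BlockingRegion(long flushSize, long blockMultiplier, long threadWakeFrequencyMs) {
    this.blockingMemStoreSize = flushSize * blockMultiplier;  // assumed formula
    this.threadWakeFrequencyMs = threadWakeFrequencyMs;
  }

  // Real HBase hands a request to the flusher; here we only count it.
  void requestFlush() {
    flushRequests.incrementAndGet();
  }

  // Returns true if the caller had to block at least once.
  boolean checkResources() {
    boolean blocked = false;
    while (memstoreSize.get() > blockingMemStoreSize) {
      requestFlush();
      blocked = true;
      synchronized (this) {
        try {
          wait(threadWakeFrequencyMs);  // wake on notify or after the timeout, then re-check
        } catch (InterruptedException e) {
          // Unlike the original (which ignores the interrupt), restore the flag and give up.
          Thread.currentThread().interrupt();
          return blocked;
        }
      }
    }
    return blocked;
  }

  // Called by a (simulated) flusher after persisting `bytes` of memstore data.
  void flushed(long bytes) {
    memstoreSize.addAndGet(-bytes);
    synchronized (this) {
      notifyAll();
    }
  }
}
```

The timed wait matters: even if a notification is missed, the writer re-checks the size at least every threadWakeFrequency milliseconds.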

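internalPut consists of the following steps:

checkFamilies: check the column families
updateKVTimestamps: update the KeyValue timestamps
addFamilyMapToWALEdit: write the edit to the write-ahead log (WAL)
applyFamilyMapToMemstore: write the data into the memstore
isFlushSize: check whether the memstore has grown large enough to be flushed to an HFile
Release the lock
If the flush size was reached, flush the memstore data to an HFile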
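The order of these steps matters: families are validated before anything is written, the WAL entry is written before the memstore is updated so the edit can be replayed after a crash, and the flush check comes last. A hypothetical Java sketch of that sequence, with simplified cells and a list of strings standing in for the WAL; LATEST_TIMESTAMP mirrors HBase's convention of using Long.MAX_VALUE to mean "stamp with the server's current time", and the size comparison in the flush check is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical walk-through of the internalPut() steps; not HBase's actual code.
class ToyInternalPut {
  static final long LATEST_TIMESTAMP = Long.MAX_VALUE;  // "use the server's time"

  static final class Cell {
    final String family;
    final String qualifier;
    long timestamp;          // mutable: step 2 may rewrite it
    final String value;

    Cell(String family, String qualifier, long timestamp, String value) {
      this.family = family;
      this.qualifier = qualifier;
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  final Set<String> families;
  final List<String> wal = new ArrayList<>();      // stand-in for the write-ahead log
  final List<Cell> memstore = new ArrayList<>();
  final long flushSize;
  long memstoreSize = 0;

  ToyInternalPut(Set<String> families, long flushSize) {
    this.families = families;
    this.flushSize = flushSize;
  }

  // Returns true when the caller should request a flush.
  boolean internalPut(List<Cell> cells, long now) {
    for (Cell c : cells) {                         // 1. checkFamilies
      if (!families.contains(c.family)) {
        throw new IllegalArgumentException("No such column family: " + c.family);
      }
    }
    for (Cell c : cells) {                         // 2. updateKVTimestamps
      if (c.timestamp == LATEST_TIMESTAMP) {
        c.timestamp = now;
      }
    }
    for (Cell c : cells) {                         // 3. WAL first, for crash recovery
      wal.add(c.family + ":" + c.qualifier + "@" + c.timestamp + "=" + c.value);
    }
    for (Cell c : cells) {                         // 4. apply to the memstore
      memstore.add(c);
      memstoreSize += c.value.length();
    }
    return memstoreSize > flushSize;               // 5. isFlushSize
  }
}
```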

This article reflects only my personal understanding; corrections are welcome wherever anything is wrong.