您的位置:首页 > 其它

hbase put regionserver处理分析

2014-04-15 13:24 381 查看

RegionServer端put数据流程分析:

client端通过MultiServerCallable.call调用rs的rpc的multi方法。
regionServer实例ClientProtos.ClientService.BlockingInterface接口。

publicMultiResponse
multi(finalRpcControllerrpcc,finalMultiRequest
request)
throwsServiceException {

//rpc controller is how we bring in data via the back door; it isunprotobuf'ed data.
//It is also the conduit via which we pass back data.
PayloadCarryingRpcControllercontroller= (PayloadCarryingRpcController)rpcc;
CellScannercellScanner=
controller!=
null?controller.cellScanner():null;
if(controller!=
null)controller.setCellScanner(null);
List<CellScannable>cellsToReturn=
null;
MultiResponse.BuilderresponseBuilder= MultiResponse.newBuilder();
RegionActionResult.BuilderregionActionResultBuilder=
RegionActionResult.newBuilder();

得到当前提交的数据,数据按region,list的方式传入过来。
for(RegionAction
regionAction:
request.getRegionActionList()){
this.requestCount.add(regionAction.getActionCount());
regionActionResultBuilder.clear();
HRegion
region;
try{
从当前regionserver中的onlnieRegions中得到请求的region.
1.从onlineRegions中取出HRegion实例,如果不为空,按如下流程走,否则:执行到5
2.如果onlineRegions列表中不包含此region,从movedRegions列表中拿到region,region的moved超时是2分钟,
如果movedRegions列表中能拿到此region,同时move时间超时,并从movedRegions列表中移出引region返回null,
否则返回正在moved的region,如果movedRegions中返回的region不为null,throwRegionMovedException
3.从regionsInTransitionInRS中获取此region,如果能拿到,同时拿到的值为true,表示region还在做opening操作。
Throw
RegionOpeningException
4.如果以上得到的值都为null,表示此server中没有此region,throw
NotServingRegionException
此时基本上只有一个可能,region在做split.或者move到其它server(刚完成move,client请求时不在此server)
5.如果1中拿到region,表示正常,region在此server中。

region=
getRegion(regionAction.getRegion());
}
catch(IOException
e){
regionActionResultBuilder.setException(ResponseConverter.buildException(e));
responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
continue;
// For this region it's a failure.
}
检查是否是原子操作:
if(regionAction.hasAtomic()&&
regionAction.getAtomic()){
//How does this call happen? It may need some work to play well w/ thesurroundings.
//Need to return an item per Action along w/ Action index. TODO.
Try{
执行原子操作:保证一个region中所有的action的操作的mvcc的值相同,如果有一个操作失败,整体rollback
mutateRows(region,regionAction.getActionList(),cellScanner);
}
catch(IOException
e){
//As it's atomic, we may expect it's a global failure.
regionActionResultBuilder.setException(ResponseConverter.buildException(e));
}
}
else{
//doNonAtomicRegionMutation manages the exception internally
执行非原子操作:见doNonAtomicRegionMutation流程分析
cellsToReturn=
doNonAtomicRegionMutation(region,regionAction,cellScanner,
regionActionResultBuilder,cellsToReturn);
}
responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
}
//Load the controller with the Cells to return.
if(cellsToReturn!=
null&& !cellsToReturn.isEmpty()&&
controller!=
null){
如果client端传入有数据返回的压缩编码方式,把要返回的数据添加到cellsToReturn列表中。
同时response要返回的result只返回result的大小
如果是批量的get/append/increment操作时,建议是把codec的配置设置上。
此时表示是get/append/increment的请求,生成一个CellScanner实例,此实时是查询得到的所有Result
clent通过此CellScanner来获取数据。
controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
}
returnresponseBuilder.build();
}

doNonAtomicRegionMutation处理流程分析:

用于处理非原子性的put/delete/get操作。

privateList<CellScannable>doNonAtomicRegionMutation(finalHRegion
region,
finalRegionAction
actions,finalCellScannercellScanner,
finalRegionActionResult.Builder
builder,List<CellScannable>cellsToReturn){
//Gather up CONTIGUOUS Puts and Deletes in this mutations List. Ideais that rather than
do
//one at a time, we instead pass them in batch. Be aware that thecorresponding
//ResultOrException instance that matches each Put or Delete is thenadded down in the
//doBatchOp call. We should be staying aligned though the Put andDelete are deferred/batched
List<ClientProtos.Action>mutations
=null;
对request中指定region中所有的action进行迭代
for(ClientProtos.Action
action:actions.getActionList()){
ClientProtos.ResultOrException.BuilderresultOrExceptionBuilder=
null;
try{
Result
r= null;
如果此action是get操作,直接执行get
if(action.hasGet()){
Get
get= ProtobufUtil.toGet(action.getGet());
r=
region.get(get);
}
elseif(action.hasMutation()){
MutationTypetype
=action.getMutation().getMutateType();
if(type
!=MutationType.PUT&&
type!=
MutationType.DELETE&&
mutations!=
null&&
!mutations.isEmpty()){
//Flush out any Puts or Deletes already collected.
doBatchOp(builder,region,mutations,cellScanner);
mutations.clear();
}
switch(type)
{
如果kv中存在原来的数据,把新kv的数据添加到oldkv的后面,kv的大小等于oldkvsize+newkvsize
如果是新的kv,直接当成一列进行存储
caseAPPEND:
r=
append(region,action.getMutation(),cellScanner);
break;
值自动增加。
caseINCREMENT:
r=
increment(region,action.getMutation(),cellScanner);
break;
如果操作是put/delete,把所有的操作action集合在一起,
casePUT:
caseDELETE:
//Collect the individual mutations and apply in a batch
if(mutations==
null){
mutations=
newArrayList<ClientProtos.Action>(actions.getActionCount());
}
mutations.add(action);
break;
default:
thrownewDoNotRetryIOException("Unsupportedmutate
type: " + type.name());
}
}
else{
thrownewHBaseIOException("UnexpectedAction
type");
}
如果是get/append/increment操作,每一次都会得到一个result,把result添加到返回的response中。
if(r
!= null){
ClientProtos.Result
pbResult=
null;
如果client端传入有数据返回的压缩编码方式,把要返回的数据添加到cellsToReturn列表中。
同时response要返回的result只返回result的大小
如果是批量的get/append/increment操作时,建议是把codec的配置设置上。
if(isClientCellBlockSupport()){
pbResult= ProtobufUtil.toResultNoData(r);
// Hard to guess the size here. Just make a rough guess.
if(cellsToReturn==
null)cellsToReturn=
newArrayList<CellScannable>();
cellsToReturn.add(r);
}
else{
如果没有配置codec时,把result的所有数据写入到response中,当成clientrequest的响应信息,
这样如果批量的值比较大时,可能会影响到响应的速度。
pbResult= ProtobufUtil.toResult(r);
}
resultOrExceptionBuilder=
ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
}
//Could get to here and there was no result and no exception. Presumeswe added
//a Put or Delete to the collecting Mutations List for adding later. In this
//case the corresponding ResultOrException instance for the Put orDelete will be added
//down in the doBatchOp method call rather than up here.
}
catch(IOException
ie){
resultOrExceptionBuilder= ResultOrException.newBuilder().
setException(ResponseConverter.buildException(ie));
}
if(resultOrExceptionBuilder!=
null){
//Propagate index.
把第一个子的响应信息添加到集合列表中,等待执行完成统一进行响应。
resultOrExceptionBuilder.setIndex(action.getIndex());
builder.addResultOrException(resultOrExceptionBuilder.build());
}
}
//Finish up any outstanding mutations
针对put/delete操作,执行批量操作。
if(mutations!=
null&& !mutations.isEmpty()){
doBatchOp(builder,region,mutations,cellScanner);
}
returncellsToReturn;
}
针对put/delete操作的批量操作处理方法
protectedvoid
doBatchOp(finalRegionActionResult.Builder
builder,final HRegion
region,
finalList<ClientProtos.Action>mutations,finalCellScannercells)
{
Mutation[]mArray
=newMutation[mutations.size()];
longbefore
=EnvironmentEdgeManager.currentTimeMillis();
booleanbatchContainsPuts=
false,batchContainsDelete=
false;
try{
inti
= 0;
迭代要操作的所有action,生成put/delete的mutation实例
for(ClientProtos.Action
action:mutations){
MutationProto
m= action.getMutation();
Mutationmutation;
if(m.getMutateType()==
MutationType.PUT){
mutation= ProtobufUtil.toPut(m,cells);
batchContainsPuts=
true;
}
else{
mutation= ProtobufUtil.toDelete(m,cells);
batchContainsDelete=
true;
}
mArray[i++]=
mutation;
}

requestCount.add(mutations.size());
如果不是metatable的写入,也就是用户表写入,
if(!region.getRegionInfo().isMetaTable()){
检查并等待全局flush的完成。
1.检查全局的flush是否超过hbase.regionserver.global.memstore.upperLimit配置的值,默认是0.4
如果当前rs中所有的memstore的size总和超过了此值,强制进行flsuh,并等待flush完成。线程wait,
等待MemStoreFlusher.flushRegion去notify此线程的等待。
2.检查全局的flush是否超过hbase.regionserver.global.memstore.lowerLimit配置,默认为0.35
如果rs中所有的memstore的size总和超过了此值,发起flush请求,不等待flush完成,执行下面流程
cacheFlusher.reclaimMemStoreMemory();
}
通过HRegion执行更新操作。
OperationStatus
codes[]=
region.batchMutate(mArray,false);
处理更新后每一条数据是否成功的信息,并添加到response中。
for(i
= 0; i<
codes.length;i++) {
intindex
=mutations.get(i).getIndex();
Exception
e= null;
switch(codes[i].getOperationStatusCode()){
caseBAD_FAMILY:
出现这种情况表示要更新的action中指定的cf不存在或cf的name为null,
e=
newNoSuchColumnFamilyException(codes[i].getExceptionMsg());
builder.addResultOrException(getResultOrException(e,index));
break;

caseSANITY_CHECK_FAILURE:
出现这种情况表示要更新的action中有kv的timestamp的值超出了当前rs中的时间,
通过hbase.hregion.keyvalue.timestamp.slop.millisecs配置可超出的时间范围,默认为不控制
e=
newFailedSanityCheckException(codes[i].getExceptionMsg());
builder.addResultOrException(getResultOrException(e,index));
break;

default:
e=
newDoNotRetryIOException(codes[i].getExceptionMsg());
builder.addResultOrException(getResultOrException(e,index));
break;

caseSUCCESS:
正常结束
builder.addResultOrException(getResultOrException(ClientProtos.Result.getDefaultInstance(),index));
break;
}
}
}catch(IOException
ie){
for(inti
= 0; i<
mutations.size();i++) {
builder.addResultOrException(getResultOrException(ie,mutations.get(i).getIndex()));
}
}
更新监控数据
longafter
=EnvironmentEdgeManager.currentTimeMillis();
if(batchContainsPuts){
metricsRegionServer.updatePut(after-
before);
}
if(batchContainsDelete){
metricsRegionServer.updateDelete(after-
before);
}
}

Hregion.batchMutate:

OperationStatus[]batchMutate(Mutation[]mutations,
booleanisReplay)
throwsIOException {
初始化批量执行处理程序,把每一个action的status设置为OperationStatus.NOT_RUN
BatchOperationInProgress<Mutation>batchOp
=
newBatchOperationInProgress<Mutation>(mutations);

booleaninitialized=
false;
如果处理还没有结束,一直迭代,
Hregion.BatchOperationInProgress.nextIndexToProcess==要处理的action个数表示完成处理
while(!batchOp.isDone()){
if(!isReplay){
checkReadOnly();
}
checkResources();

longnewSize;
if(isReplay){
startRegionOperation(Operation.REPLAY_BATCH_MUTATE);
}
else{
startRegionOperation(Operation.BATCH_MUTATE);
}

try{
if(!initialized){
if(!isReplay){
region请求加一,
this.writeRequestsCount.increment();
迭代每一个action,执行cp的prePut/preDelete操作。
doPreMutationHook(batchOp);
}
initialized=
true;
}
执行更新操作,
检查cf是否合法,如果是put,检查put传入的cf是否在table中存在,不存在此action的status为BAD_FAMILY

检查put的timestamp是否合法,需要在rs当前时间的一个合理范围内。

不在范围内此action.status为SANITY_CHECK_FAILURE

检查delete的cf是否合法。

如果是put的action,更新put的所有kv中timestamp为当前的rs时间

如果是delete的action,通过get先读取每一个cf中的数据,检查是否需要执行删除,
删除的timestamp为Long.MAX_VALUE
在以上检查完成后,如果部分检查不合法的action,它们的状态为非OperationStatusCode.NOT_RUN状态
把所有现在是NOT_RUN状态的action添加到对应的cf的memstore中。
每一个store中kv的MvccVersion的值为mvcc中memstoreWrite的值(regionmax
seqid + 1)
根据更新的action中所有的store(cf),分别调用region中不同的store的add(HStore.add)方法添加到memstore中,
一个action中不同的store中所有的kv的mvccversion的都相同。
把更新过的每一个action的status的状态设置为OperationStatus.SUCCESS
写入wal日志,通过append方式添加日志,
日志的flush通过hbase.regionserver.optionallogflushinterval进行配置,默认为1*1000ms,-1表示实时更新
或可以通过table定义的时候设置DURABILITY属性,可设置为SYNC_WAL/FSYNC_WAL表示实时更新日志
得到当此添加的所的kv的总大小。

longaddedSize
=doMiniBatchMutation(batchOp,isReplay);

把当前更新的size添加到rs中的全局memstore的大小,atomicGlobalMemstoreSize
把当前更新的size添加到当前region的memstore中。memstoreSize
newSize=
this.addAndGetGlobalMemstoreSize(addedSize);
}
finally{
closeRegionOperation();
}
检查当前region中memstore的大小是否超过hbase.hregion.memstore.flush.size配置的大小,默认1024*1024*128L
如果需要flush,通过MemStoreFlusher.requestFlush(HRegion)发起flush请求
if(isFlushSize(newSize)){
requestFlush();
}
}
returnbatchOp.retCodeDetails;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: