
Log Replay Analysis



HBase log replay happens in two situations: log replay during master startup and log replay when a region server (RS) goes offline.

The hbase.master.distributed.log.replay setting controls whether the log split runs before or after the regions are reopened.

If it is true, the split runs after the reopen; otherwise it runs before.
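For reference, the flag can be flipped programmatically as well as in hbase-site.xml; a minimal sketch using the property name quoted above (the default in this article is false):

// A minimal sketch: enable distributed log replay so the split runs after the regions are reopened.
// Equivalent to setting hbase.master.distributed.log.replay=true in hbase-site.xml.
Configuration conf = HBaseConfiguration.create();
conf.setBoolean("hbase.master.distributed.log.replay", true);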

Log replay analysis when an RS goes offline

The master detects the RS going offline

The master watches each RS's znode in ZooKeeper through RegionServerTracker. When a node is deleted (the RS has gone offline), nodeDeleted is triggered:

public void nodeDeleted(String path) {
  if (path.startsWith(watcher.rsZNode)) {
    // Parse the RS name out of the RS path in ZK and turn it into a ServerName instance.
    String serverName = ZKUtil.getNodeName(path);
    // .................... (some log output omitted here)
    ServerName sn = ServerName.parseServerName(serverName);
    // If the offline RS is no longer in ServerManager's onlineServers, do nothing.
    if (!serverManager.isServerOnline(sn)) {
      // .................... (some log output omitted here)
      return;
    }
    // Remove this RS from the RegionServerTracker.onlineServers list.
    remove(sn);
    // Call ServerManager.expireServer to handle the shutdown.
    this.serverManager.expireServer(sn);
  }
}
ServerManager.expireServer handles the server expiration:
public synchronized void expireServer(final ServerName serverName) {
  // .................... (some code omitted here)

  // Add the RS to the deadservers list.
  this.deadservers.add(serverName);
  // Remove the RS from the onlineServers list.
  this.onlineServers.remove(serverName);
  synchronized (onlineServers) {
    onlineServers.notifyAll();
  }
  // Remove the RS from rsAdmins (the container of RPC interfaces used to call region servers).
  this.rsAdmins.remove(serverName);
  // .................... (some code omitted here)

  // Check whether this RS carries the meta region; if so submit a MetaServerShutdownHandler,
  // otherwise submit a ServerShutdownHandler.
  boolean carryingMeta = services.getAssignmentManager().isCarryingMeta(serverName);
  if (carryingMeta) {
    this.services.getExecutorService().submit(new MetaServerShutdownHandler(this.master,
        this.services, this.deadservers, serverName));
  } else {
    this.services.getExecutorService().submit(new ServerShutdownHandler(this.master,
        this.services, this.deadservers, serverName, true));
  }
  // .................... (some log output omitted here)
}

MetaServerShutdownHandler.process flow:

public void process() throws IOException {
  boolean gotException = true;
  try {
    AssignmentManager am = this.services.getAssignmentManager();
    try {
      // Check whether the HLog needs splitting; shouldSplitHlog is true when this handler is created.
      if (this.shouldSplitHlog) {
        LOG.info("Splitting hbase:meta logs for " + serverName);

        // Check whether hbase.master.distributed.log.replay is set to true (default false).
        if (this.distributedLogReplay) {
          // First run prepareLogReplay for the meta region.
          // See the MasterFileSystem.prepareLogReplay analysis.
          Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
          regions.add(HRegionInfo.FIRST_META_REGIONINFO);
          this.services.getMasterFileSystem().prepareLogReplay(serverName, regions);
        } else {
          // Without distributedLogReplay, call splitMetaLog to split the RS's logs now and wait
          // for the split to finish. See the MasterFileSystem.splitMetaLog analysis.
          this.services.getMasterFileSystem().splitMetaLog(serverName);
        }

        // Remove the meta region's assignment from AssignmentManager.RegionStates.lastAssignments.
        am.getRegionStates().logSplit(HRegionInfo.FIRST_META_REGIONINFO);
      }
    } catch (IOException ioe) {
      // .................... (some code omitted here)
    }

    // Assign meta if we were carrying it.
    // Check again: region may be assigned to other where because of RIT timeout
    // If this server still carries the meta region (its open has not completed and it is still
    // in region-in-transition):
    if (am.isCarryingMeta(serverName)) {
      LOG.info("Server " + serverName + " was carrying META. Trying to assign.");
      // Set the meta region's state in RegionStates to offline,
      // remove it from regionsInTransition,
      // remove this server's meta assignment from serverHoldings,
      // remove the meta assignment from regionAssignments,
      // remove the meta region from regionsToReopen,
      // remove the meta region from regionPlans.
      am.regionOffline(HRegionInfo.FIRST_META_REGIONINFO);
      // Wait for the meta region to be assigned.
      // hbase.catalog.verification.retries sets the number of assignment retries, default 10.
      // hbase.catalog.verification.timeout sets the interval between retries, default 1000ms.
      verifyAndAssignMetaWithRetries();

      // If the meta location in ZK has expired and been deleted, re-assign meta and wait for the
      // assignment to finish.
    } else if (!this.services.getCatalogTracker().isMetaLocationAvailable()) {
      // the meta location as per master is null. This could happen in case when meta assignment
      // in previous run failed, while meta znode has been updated to null. We should try to
      // assign the meta again.
      // No meta location is registered in ZK: assign the meta region and wait for the assignment.
      // hbase.catalog.verification.retries sets the number of assignment retries, default 10.
      // hbase.catalog.verification.timeout sets the interval between retries, default 1000ms.
      verifyAndAssignMetaWithRetries();
    } else {
      LOG.info("META has been assigned to otherwhere, skip assigning.");
    }

    try {
      // When distributedLogReplay is true, wait until the meta region's replay
      // region-in-transition has cleared, i.e. RegionStates.regionsInTransition no longer
      // contains the meta region. The wait timeout is configured by
      // hbase.master.log.replay.wait.region.timeout, default 15000ms; if the region in
      // transition is not cleared within the timeout, the method returns false.
      if (this.shouldSplitHlog && this.distributedLogReplay) {
        if (!am.waitOnRegionToClearRegionsInTransition(HRegionInfo.FIRST_META_REGIONINFO,
            regionAssignmentWaitTimeout)) {
          // .................... (some code omitted here)
        }

        // Run the log split and wait for it to finish. With distributedLogReplay the region
        // assignment has already completed at this point, so the split of the log starts now.
        // See the MasterFileSystem.splitMetaLog analysis.
        this.services.getMasterFileSystem().splitMetaLog(serverName);
      }
    } catch (Exception ex) {
      // .................... (some code omitted here)
    }

    gotException = false;
  } finally {
    if (gotException) {
      // If we had an exception, this.deadServers.finish will be skipped in super.process()
      this.deadServers.finish(serverName);
    }
  }
  // Replay the logs of the non-meta regions on this RS and assign those regions;
  // see the ServerShutdownHandler.process flow.
  super.process();
}

MasterFileSystem.prepareLogReplay analysis

This method only does its work when hbase.master.distributed.log.replay is set to true.

public void prepareLogReplay(ServerName serverName, Set<HRegionInfo> regions) throws IOException {
  // Sanity checks: distributed log replay must be enabled and the set of regions to replay
  // must not be empty.
  if (!this.distributedLogReplay) {
    return;
  }
  // mark regions in recovering state
  if (regions == null || regions.isEmpty()) {
    return;
  }
  try {
    // SplitLogManager.markRegionsRecoveringInZK adds the region paths under /hbase/recovering-regions.
    this.splitLogManager.markRegionsRecoveringInZK(serverName, regions);
  } catch (KeeperException e) {
    throw new IOException(e);
  }
}

Marking the regions as recovering for distributedLogReplay:
void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions)
    throws KeeperException {
  // Sanity checks: distributed log replay must be enabled and the set of regions to replay
  // must not be empty.
  if (userRegions == null || !this.distributedLogReplay) {
    return;
  }

  try {
    this.recoveringRegionLock.lock();
    // mark that we're creating recovering znodes
    // Record the current time in SplitLogManager as the last recovering-node creation time.
    this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTimeMillis();

    // Iterate over every region to be replayed; for the meta region there is only one iteration.
    for (HRegionInfo region : userRegions) {
      String regionEncodeName = region.getEncodedName();
      // hbase.splitlog.zk.retries sets the maximum retries for creating the child znode, default 3.
      long retries = this.zkretries;

      do {
        // Create a child znode named after the region under the path configured by
        // zookeeper.znode.recovering.regions, by default /hbase/recovering-regions/region-name.
        String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName);
        long lastRecordedFlushedSequenceId = -1;
        try {
          // Get the region's largest flushed seqid. It is stored in
          // ServerManager.flushedSequenceIdByRegion, which records the largest seqid flushed
          // for each region.
          long lastSequenceId = this.master.getServerManager().getLastFlushedSequenceId(
              regionEncodeName.getBytes());
          // .................... (some code omitted here)

          // Check whether recovering-regions in ZK already contains this region.
          byte[] data = ZKUtil.getData(this.watcher, nodePath);
          if (data == null) {
            // recovering-regions does not yet contain replay info for this region:
            // write the region's last flushed seqid to the replay path.
            ZKUtil.createSetData(this.watcher, nodePath,
                ZKUtil.positionToByteArray(lastSequenceId));
          } else {
            // recovering-regions already contains replay info for this region: read the seqid
            // recorded by the previous recovery. If it is smaller than the region's current last
            // flushed seqid, update the region's replay seqid in ZK to the newer value;
            // otherwise leave it unchanged (the recorded seqid is already the newer one).
            lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
            if (lastRecordedFlushedSequenceId < lastSequenceId) {
              // update last flushed sequence id in the region level
              ZKUtil.setData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId));
            }
          }
          // go one level deeper with server name
          // Create a child znode for the offline server under recovering-regions/region-name.
          nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());
          // If the region's current flushed seqid is not larger than the seqid recorded by the
          // previous recovery (the recorded seqid is the newer one), use the recorded seqid as
          // the last flushed seqid.
          if (lastSequenceId <= lastRecordedFlushedSequenceId) {
            // the newly assigned RS failed even before any flush to the region
            lastSequenceId = lastRecordedFlushedSequenceId;
          }

          // Record the last flushed seqid under /hbase/recovering-regions/region-name/server-name.
          ZKUtil.createSetData(this.watcher, nodePath,
              ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null));

          // .................... (some code omitted here)
          break;
        } catch (KeeperException e) {
          // .................... (some code omitted here)
        }
      } while ((--retries) > 0 && (!this.stopper.isStopped()));
    }
  } finally {
    this.recoveringRegionLock.unlock();
  }
}
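As an aside, the region-level marker written above can be read back with the same helpers the method uses. A minimal sketch for illustration (not part of the HBase source), assuming watcher is the master's ZooKeeperWatcher:

// Read back the last flushed seqid recorded for a recovering region.
// Reuses only the calls shown in markRegionsRecoveringInZK above.
static long readLastFlushedSeqId(ZooKeeperWatcher watcher, String regionEncodedName)
    throws KeeperException {
  String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodedName);
  byte[] data = ZKUtil.getData(watcher, nodePath);   // null when the region is not recovering
  return (data == null) ? -1 : SplitLogManager.parseLastFlushedSequenceIdFrom(data);
}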

MasterFileSystem.splitMetaLog analysis

splitMetaLog takes the server whose logs need splitting (the offline server) and delegates to the splitLog method. Since only the meta region's HLogs are split here, META_FILTER is passed to splitLog to select the meta logs (a rough sketch of the wrapper entry points follows).
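A rough sketch of the single-server entry points used in this article, reconstructed for illustration rather than copied from the source:

// Sketch only: both entry points funnel into splitLog(Set<ServerName>, PathFilter) shown below.
public void splitMetaLog(final ServerName serverName) throws IOException {
  Set<ServerName> serverNames = new HashSet<ServerName>();
  serverNames.add(serverName);
  splitLog(serverNames, META_FILTER);        // only the .meta HLog files
}

public void splitLog(final ServerName serverName) throws IOException {
  Set<ServerName> serverNames = new HashSet<ServerName>();
  serverNames.add(serverName);
  splitLog(serverNames, NON_META_FILTER);    // the non-meta (user-region) HLog files
}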

The shared splitLog(Set<ServerName>, PathFilter) implementation:

public void splitLog(final Set<ServerName> serverNames, PathFilter filter) throws IOException {
  long splitTime = 0, splitLogSize = 0;
  // Under the /hbase/WALs log directory (the .logs directory in older versions), collect the log
  // directories named after the offline servers, e.g. /hbase/WALs/server-name1,
  // and rename each of them to /hbase/WALs/server-name1-splitting.
  List<Path> logDirs = getLogDirs(serverNames);
  // Add all of the offline region servers to SplitLogManager.deadWorkers, where the
  // SplitLogManager.TimeoutMonitor thread handles them periodically
  // (see the SplitLogManager.TimeoutMonitor analysis).
  splitLogManager.handleDeadWorkers(serverNames);
  splitTime = EnvironmentEdgeManager.currentTimeMillis();
  // Run the HLog split; see the SplitLogManager.splitLogDistributed analysis.
  splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);
  splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;

  // .................... (some code omitted here: metrics)
}

MasterFileSystem.splitLog analysis

splitLog takes the servers whose logs need splitting (the offline servers) and delegates to the same splitLog(Set<ServerName>, PathFilter) method listed above. Since only the non-meta regions' HLogs are split here, NON_META_FILTER is passed in to select the non-meta logs; the method body is otherwise identical to the listing in the splitMetaLog analysis.


SplitLogManager.splitLogDistributed analysis

This method splits the offline server's HLogs by region: it creates the split tasks and waits until the splits complete.
public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
    PathFilter filter) throws IOException {

  // .................... (some code omitted here: metrics and logging)

  // List all log files under /hbase/WALs/server-name-splitting.
  // If the filter is META_FILTER only the .meta HLog files are returned, otherwise all HLog files.
  FileStatus[] logfiles = getFileList(logDirs, filter);

  // .................... (some code omitted here: metrics and logging)

  long totalSize = 0;
  TaskBatch batch = new TaskBatch();
  Boolean isMetaRecovery = (filter == null) ? null : false;
  for (FileStatus lf : logfiles) {

    // .................... (some code omitted here: metrics and logging)

    totalSize += lf.getLen();
    // Strip the /hbase prefix from the log file path, e.g. /WALs/server-name-splitting/aaa.meta.
    String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);

    // enqueueSplitTask does the following:
    // 1. URL-encode the HLog path (minus /hbase) with URLEncoder.encode ("/" becomes %2F).
    // 2. Add it as a child znode under the path configured by zookeeper.znode.splitlog
    //    (default splitWAL).
    // 3. Add a Task instance to SplitLogManager.tasks, keyed by the znode path created in step 2.
    //    The Task's status is set to IN_PROGRESS, its batch is set to the TaskBatch created above,
    //    and batch.installed is incremented to count one more task in the batch.
    // 4. With up to hbase.splitlog.zk.retries retries (default 3), create a SplitLogTask whose
    //    originServer is the master's ServerName and whose state is
    //    ZooKeeperProtos.SplitLogTask.State.UNASSIGNED, register the znode in ZK and write the
    //    SplitLogTask into it.
    // 5. Region servers watch the splitWAL path in ZK; see "Split log handling on the region server".
    // 6. The master watches the state changes of each SplitLogTask through
    //    SplitLogManager.nodeDataChanged; see the SplitLogManager.nodeDataChanged analysis.
    if (!enqueueSplitTask(pathToLog, batch)) {
      throw new IOException("duplicate log split scheduled for " + lf.getPath());
    }
  }

  // Wait for the splits to complete:
  // a. the number of tasks in the batch whose status is TerminationStatus.IN_PROGRESS drops to 0,
  // b. the splitWAL path has no child znodes left,
  // c. each iteration waits on the batch until nodeDataChanged (or someone else) notifies it.
  waitForSplittingCompletion(batch, status);
  // remove recovering regions from ZK
  if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
    // .................... (some code omitted here: logging)
    isMetaRecovery = true;
  }

  // Delete the given servers' child znodes under each region path of recovering-regions in ZK
  // (if a region path has no server children left, the region path itself is deleted).
  // If isMetaRecovery is true, only the meta region's recovering path is removed.
  this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery);

  // If any log split failed, throw an IOException right away.
  if (batch.done != batch.installed) {
    batch.isDead = true;
    // .................... (some code omitted here: logging)
    throw new IOException(msg);
  }
  for (Path logDir : logDirs) {
    status.setStatus("Cleaning up log directory...");
    try {
      // Delete the server-name-splitting log directory under WALs.
      if (fs.exists(logDir) && !fs.delete(logDir, false)) {
        LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
      }
    } catch (IOException ioe) {
      // .................... (some code omitted here: logging)
    }
    SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
  }
  // .................... (some code omitted here: metrics and logging)
  return totalSize;
}
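Conceptually, the wait described in a-c above boils down to a loop of the following shape (a simplified sketch, not the actual waitForSplittingCompletion source):

// Sketch: block until every task in the batch has left IN_PROGRESS, relying on
// nodeDataChanged (or the TimeoutMonitor) to notify the batch when a task's state changes.
synchronized (batch) {
  while (batch.done + batch.error < batch.installed) {   // tasks still IN_PROGRESS
    try {
      batch.wait(100);                                   // woken by notify on the batch
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      break;
    }
  }
}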

ServerShutdownHandler.process flow:

ServerShutdownHandler handles the offline processing of the non-meta regions: reassigning the regions and splitting the logs.

public void process() throws IOException {
  boolean hasLogReplayWork = false;
  final ServerName serverName = this.serverName;
  try {
    // .................... (some code omitted here)

    AssignmentManager am = services.getAssignmentManager();
    if (isCarryingMeta() // hbase:meta
        || !am.isFailoverCleanupDone()) {
      this.services.getServerManager().processDeadServer(serverName, this.shouldSplitHlog);
      return;
    }

    // .................... (some code omitted here)

    NavigableMap<HRegionInfo, Result> hris = null;
    while (!this.server.isStopped()) {
      try {
        // Scan the meta table to get the list of all user regions hosted on the offline region server.
        this.server.getCatalogTracker().waitForMeta();
        hris = MetaReader.getServerUserRegions(this.server.getCatalogTracker(), this.serverName);
        break;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted", e);
      } catch (IOException ioe) {
        // .................... (some code omitted here)
      }
    }
    if (this.server.isStopped()) {
      throw new IOException("Server is stopped");
    }

    try {
      // shouldSplitHlog defaults to true when the ServerShutdownHandler is created.
      if (this.shouldSplitHlog) {
        LOG.info("Splitting logs for " + serverName + " before assignment.");

        // Check whether hbase.master.distributed.log.replay is set to true (default false).
        if (this.distributedLogReplay) {
          LOG.info("Mark regions in recovery before assignment.");
          Set<ServerName> serverNames = new HashSet<ServerName>();
          serverNames.add(serverName);
          // With distributedLogReplay enabled, only prepare the log split here;
          // see the MasterFileSystem.prepareLogReplay analysis.
          this.services.getMasterFileSystem().prepareLogReplay(serverNames);
        } else {
          // Without distributedLogReplay, split the logs now and wait for the split to finish;
          // see the MasterFileSystem.splitLog analysis.
          this.services.getMasterFileSystem().splitLog(serverName);
        }
        // Remove all region assignments of this server from RegionStates.lastAssignments.
        // Remove from RegionStates.processedServers the servers older than the limit configured by
        // hbase.master.maximum.logsplit.keeptime, default 7200000ms (2 hours).
        am.getRegionStates().logSplit(serverName);
      } else {
        LOG.info("Skipping log splitting for " + serverName);
      }
    } catch (IOException ioe) {
      resubmit(serverName, ioe);
    }
    // .................... (some code omitted here)

    // am.processServerShutdown(serverName) does the following:
    // 1. Remove from AssignmentManager.regionPlans every region plan that involves this server.
    // 2. From regionStates.serverHoldings get all region assignments of this server; for regions
    //    in state online/splitting/merging set the state to offline and remove them from
    //    regionsInTransition and regionAssignments; for regions in state splitting/merging also
    //    delete the region's region-in-transition znode.
    // 3. Find in regionsInTransition all regions on this server whose transition state is
    //    PENDING_OPEN/OPENING/FAILED_OPEN/FAILED_CLOSE/OFFLINE, and return them.
    // 4. For the regions returned in step 3, delete their region-in-transition znodes
    //    (the ZK path is configured by zookeeper.znode.unassigned).
    // 5. Note: the regions returned in step 3 are not in the offline server's assignments but are
    //    in regionsInTransition, i.e. they were about to open on this server when it died.
    //    The purpose of this method is to delete the znodes of the regions that were in transition
    //    on the offline server, set their state to offline, and let the logic below reassign them.
    List<HRegionInfo> regionsInTransition = am.processServerShutdown(serverName);

    // .................... (some code omitted here)

    // Add the in-transition regions found above to the list of regions to assign.
    List<HRegionInfo> toAssignRegions = new ArrayList<HRegionInfo>();
    toAssignRegions.addAll(regionsInTransition);

    // Iterate regions that were on this server and assign them
    if (hris != null) {
      RegionStates regionStates = am.getRegionStates();

      // Iterate over all user regions of the offline server found in the meta table.
      for (Map.Entry<HRegionInfo, Result> e : hris.entrySet()) {
        HRegionInfo hri = e.getKey();

        // If the region is already in the in-transition list, move on to the next one.
        if (regionsInTransition.contains(hri)) {
          continue;
        }
        String encodedName = hri.getEncodedName();
        Lock lock = am.acquireRegionLock(encodedName);
        try {
          RegionState rit = regionStates.getRegionTransitionState(hri);

          // Check whether the region's table has been deleted or disabled; if not, proceed as follows.
          if (processDeadRegion(hri, e.getValue(), am, server.getCatalogTracker())) {
            ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
            if (addressFromAM != null && !addressFromAM.equals(this.serverName)) {
              // .................... (some code omitted here)
              continue;
            }
            if (rit != null) {
              if (rit.getServerName() != null && !rit.isOnServer(serverName)) {
                // .................... (some code omitted here)
                continue;
              }
              try {
                // .................... (some code omitted here)

                // Delete the region's region-in-transition znode (the ZK path is configured by
                // zookeeper.znode.unassigned) and set the region's state to offline.
                ZKAssign.deleteNodeFailSilent(services.getZooKeeper(), hri);
                regionStates.updateRegionState(hri, State.OFFLINE);
              } catch (KeeperException ke) {
                this.server.abort("Unexpected ZK exception deleting unassigned node " + hri, ke);
                return;
              }
            } else if (regionStates.isRegionInState(hri, State.SPLITTING_NEW, State.MERGING_NEW)) {
              // If the region was about to split or merge, set its state back to offline.
              regionStates.regionOffline(hri);
            }

            // Add the region to the list of regions to assign.
            toAssignRegions.add(hri);

          } else if (rit != null) {
            // The region's table is now disabled: set the region's state to offline.
            // a. If the region's event type in ZK is M_ZK_REGION_CLOSING/RS_ZK_REGION_CLOSED,
            //    delete the region's region-in-transition znode
            //    (the ZK path is configured by zookeeper.znode.unassigned).
            // b. If the region's event type in ZK is RS_ZK_REGION_CLOSED/M_ZK_REGION_OFFLINE,
            //    delete the region's znode as well.
            if (rit.isPendingCloseOrClosing()
                && am.getZKTable().isDisablingOrDisabledTable(hri.getTable())) {
              // .................... (some code omitted here)
              regionStates.updateRegionState(hri, State.OFFLINE);
              am.deleteClosingOrClosedNode(hri, rit.getServerName());
              am.offlineDisabledRegion(hri);
            } else {
              LOG.warn("THIS SHOULD NOT HAPPEN: unexpected region in transition "
                  + rit + " not to be assigned by SSH of server " + serverName);
            }
          }
        } finally {
          lock.unlock();
        }
      }
    }

    try {
      // Assign the collected regions in one batch.
      am.assign(toAssignRegions);
    } catch (InterruptedException ie) {
      LOG.error("Caught " + ie + " during round-robin assignment");
      throw new IOException(ie);
    }

    if (this.shouldSplitHlog && this.distributedLogReplay) {
      // wait for region assignment completes
      for (HRegionInfo hri : toAssignRegions) {
        try {
          // This branch is only reached when distributedLogReplay is true: in that mode the RS
          // does not replay logs on open (there is no recovered.edits directory under the region,
          // so opening the region triggers no replay), which makes it safe to wait here for each
          // region's open to finish. The wait ends when the region's RegionInTransition has been
          // removed from RegionStates.regionsInTransition, or when the assignment wait timeout
          // hbase.master.log.replay.wait.region.timeout (default 15000ms) expires.
          // The assign registers a region-in-transition znode in ZK for the RS to process; this
          // method keeps iterating until the timeout or until regionsInTransition no longer
          // contains the region's transition. Each iteration waits on regionStates until
          // AssignmentManager.nodeDataChanged/nodeDeleted notifies it: nodeDataChanged updates
          // regionsInTransition and nodeDeleted removes the region from it.
          if (!am.waitOnRegionToClearRegionsInTransition(hri, regionAssignmentWaitTimeout)) {
            // Wait here is to avoid log replay hits current dead server and incur a RPC timeout
            // when replay happens before region assignment completes.
            LOG.warn("Region " + hri.getEncodedName() + " didn't complete assignment in time");
          }
        } catch (InterruptedException ie) {
          throw new InterruptedIOException("Caught " + ie
              + " during waitOnRegionToClearRegionsInTransition");
        }
      }
      // submit logReplay work
      // With distributedLogReplay enabled, the region assignment is now complete:
      // run the log split and wait for it to finish; see the MasterFileSystem.splitLog analysis.
      this.services.getExecutorService().submit(
          new LogReplayHandler(this.server, this.services, this.deadServers, this.serverName));
      hasLogReplayWork = true;
    }
  } finally {
    this.deadServers.finish(serverName);
  }

  if (!hasLogReplayWork) {
    LOG.info("Finished processing of shutdown of " + serverName);
  }
}

Split log handling on the region server

The region server starts a SplitLogWorker thread at startup. Its run method waits for the master to create the splitWAL znode in ZK; once the splitWAL path has been registered, it runs taskLoop, which checks for split-log work every 5 seconds by default (the thread waits with a 5000ms timeout). nodeChildrenChanged watches changes to the children of the splitWAL path and notifies this thread, and nodeDataChanged tracks the state changes of each SplitLogTask.

public void run() {
  try {
    // .................... (some code omitted here)

    // wait for master to create the splitLogZnode
    int res = -1;
    while (res == -1 && !exitWorker) {
      try {
        // Check whether the master has registered the splitWAL znode in ZK.
        res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
      } catch (KeeperException e) {
        // ignore
        LOG.warn("Exception when checking for " + watcher.splitLogZNode + " ... retrying", e);
      }
      if (res == -1) {
        try {
          // .................... (some code omitted here)
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          // .................... (some code omitted here)
          exitWorker = true;
          break;
        }
      }
    }

    if (!exitWorker) {
      // Periodically check for and run HLog split tasks.
      taskLoop();
    }
  } catch (Throwable t) {
    // .................... (some code omitted here)
  } finally {
    LOG.info("SplitLogWorker " + this.serverName + " exiting");
  }
}

Checking for and running HLog split tasks:
private void taskLoop() {
  while (!exitWorker) {
    int seq_start = taskReadySeq;
    // Get the paths of all server logs that need splitting.
    List<String> paths = getTaskList();
    if (paths == null) {
      LOG.warn("Could not get tasks, did someone remove " + this.watcher.splitLogZNode
          + " ... worker thread exiting.");
      return;
    }
    // pick meta wal firstly
    // Pick which server's HLog split to try first: by default a random index, but if one of the
    // logs to split belongs to the meta region, start from that one.
    int offset = (int) (Math.random() * paths.size());
    for (int i = 0; i < paths.size(); i++) {
      if (HLogUtil.isMetaFile(paths.get(i))) {
        offset = i;
        break;
      }
    }
    int numTasks = paths.size();
    for (int i = 0; i < numTasks; i++) {
      // Compute the execution order starting from offset; e.g. with paths.size() = 6 and
      // offset = 5 the order is 5, 0, 1, 2, 3, 4.
      int idx = (i + offset) % paths.size();
      // don't call ZKSplitLog.getNodeName() because that will lead to
      // double encoding of the path name
      // The maximum number of split tasks a server runs at the same time is configured by
      // hbase.regionserver.wal.max.splitters, default 2.
      // calculateAvailableSplitters looks at the number of live region servers and the number of
      // logs to split, averages them to estimate how many split tasks this server should take,
      // never exceeding hbase.regionserver.wal.max.splitters; tasksInProgress is incremented for
      // every task taken. (A sketch of this calculation follows the method.)
      if (this.calculateAvailableSplitters(numTasks) > 0) {
        // This server still has capacity for a split task:
        // grabTask updates the SplitLogTask of this to-be-split server name under splitWAL in ZK
        // to SplitLogTask.Owned and records in ZK the RS currently doing the split, creates an
        // HLogSplitterHandler instance and starts a thread to run it, and increments
        // tasksInProgress (see the HLogSplitterHandler.process analysis).
        // It then waits 500-1000ms before trying the next task, so other region servers also get
        // a chance to grab tasks.
        // Note: this block is normally not reached on the very first pass, because the splitWAL
        // path in ZK may still be empty; the loop then falls through to the wait below, where the
        // thread waits until nodeChildrenChanged notifies it.
        grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
      } else {
        LOG.debug("Current region server " + this.serverName + " has "
            + this.tasksInProgress.get() + " tasks in progress and can't take more.");
        break;
      }
      if (exitWorker) {
        return;
      }
    }
    SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
    synchronized (taskReadyLock) {
      // This pass is done; wait here unless new offline servers were registered under splitWAL
      // in ZK while this pass was running.
      while (seq_start == taskReadySeq) {
        try {
          // Put the thread into the wait state until nodeChildrenChanged notifies it.
          taskReadyLock.wait(checkInterval);
          if (this.server != null) {
            // check to see if we have stale recovering regions in our internal memory state
            // When distributedLogReplay is enabled the log split starts only after the regions
            // have been reopened, so get the list of regions being replayed and iterate over
            // them: check in recovering-regions whether each region still needs log splitting;
            // if recovering-regions no longer contains the region, remove it from the RS's
            // recoveringRegions list and set the HRegion's recovering flag to false.
            // Then go back to the top of taskLoop and split the logs for the remaining regions.
            Map<String, HRegion> recoveringRegions = this.server.getRecoveringRegions();
            if (!recoveringRegions.isEmpty()) {
              // Make a local copy to prevent ConcurrentModificationException when other threads
              // modify recoveringRegions
              List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
              for (String region : tmpCopy) {
                String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region);
                try {
                  if (ZKUtil.checkExists(this.watcher, nodePath) == -1) {
                    HRegion r = recoveringRegions.remove(region);
                    if (r != null) {
                      r.setRecovering(false);
                    }
                    LOG.debug("Mark recovering region:" + region + " up.");
                  } else {
                    // .................... (some code omitted here)
                    break;
                  }
                } catch (KeeperException e) {
                  // .................... (some code omitted here)
                  break;
                }
              }
            }
          }
        } catch (InterruptedException e) {
          // .................... (some code omitted here)
          exitWorker = true;
          return;
        }
      }
    }

  }
}
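The capacity check referenced above can be sketched roughly as follows. This is a simplified reconstruction, not the HBase source: it assumes a maxConcurrentTasks field holding the hbase.regionserver.wal.max.splitters value and that ZKUtil.listChildrenNoWatch is available to count the live region servers.

// Sketch only: how many more split tasks this RS can reasonably take right now.
private int calculateAvailableSplitters(int numTasks) throws KeeperException {
  // Count the live region servers registered under the rs znode (at least this server itself).
  List<String> liveServers = ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
  int availableRSs = Math.max(1, (liveServers == null) ? 0 : liveServers.size());
  // Spread the tasks evenly over the live servers (rounding up), capped by the configured maximum.
  int expectedTasksPerRS = Math.max(1, (numTasks + availableRSs - 1) / availableRSs);
  int cap = Math.min(expectedTasksPerRS, maxConcurrentTasks);
  // Remaining capacity: the cap minus the tasks this server is already running.
  return cap - tasksInProgress.get();
}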

HLogSplitterHandler.process flow analysis

HLogSplitterHandler is the handler that actually processes the HLog; it runs the TaskExecutor.exec method passed into it. The TaskExecutor is an anonymous implementation created in the SplitLogWorker constructor.

public HLogSplitterHandler(final Server server, String curTask,
    final MutableInt curTaskZKVersion,
    CancelableProgressable reporter,
    AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor) {
  // The EventType is RS_LOG_REPLAY.
  super(server, EventType.RS_LOG_REPLAY);
  this.curTask = curTask;
  this.wal = ZKSplitLog.getFileName(curTask);
  this.reporter = reporter;
  this.inProgressTasks = inProgressTasks;
  // Increment the region server's count of running split-log tasks, occupying one slot.
  this.inProgressTasks.incrementAndGet();
  this.serverName = server.getServerName();
  this.zkw = server.getZooKeeper();
  this.curTaskZKVersion = curTaskZKVersion;
  // See the last parameter of the SplitLogWorker constructor.
  this.splitTaskExecutor = splitTaskExecutor;
}

public void process() throws IOException {
  long startTime = System.currentTimeMillis();
  try {
    // Run the split-log executor (see the SplitLogTaskExecutor.exec analysis below)
    // and get the status it returns.
    Status status = this.splitTaskExecutor.exec(wal, reporter);
    switch (status) {
    case DONE:
      // Successful completion: call endTask, setting the SplitLogTask under the server-name
      // znode of the splitWAL path in ZK to SplitLogTask.Done.
      endTask(zkw, new SplitLogTask.Done(this.serverName),
          SplitLogCounters.tot_wkr_task_done, curTask, curTaskZKVersion.intValue());
      break;
    case PREEMPTED:
      // The split task was preempted by another worker: do nothing further.
      SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
      LOG.warn("task execution prempted " + wal);
      break;
    case ERR:
      // An error occurred during execution: call endTask, setting the SplitLogTask under the
      // server-name znode of the splitWAL path in ZK to SplitLogTask.Err.
      if (server != null && !server.isStopped()) {
        endTask(zkw, new SplitLogTask.Err(this.serverName),
            SplitLogCounters.tot_wkr_task_err, curTask, curTaskZKVersion.intValue());
        break;
      }
      // if the RS is exiting then there is probably a tons of stuff
      // that can go wrong. Resign instead of signaling error.
      //$FALL-THROUGH$
    case RESIGNED:
      // The task was given up: call endTask, setting the SplitLogTask under the server-name
      // znode of the splitWAL path in ZK to SplitLogTask.Resigned.
      if (server != null && server.isStopped()) {
        LOG.info("task execution interrupted because worker is exiting " + curTask);
      }
      endTask(zkw, new SplitLogTask.Resigned(this.serverName),
          SplitLogCounters.tot_wkr_task_resigned, curTask, curTaskZKVersion.intValue());
      break;
    }
  } finally {
    LOG.info("worker " + serverName + " done with task " + curTask + " in "
        + (System.currentTimeMillis() - startTime) + "ms");
    // Decrement the region server's count of running split-log tasks, freeing one slot.
    this.inProgressTasks.decrementAndGet();
  }
}

SplitLogTaskExecutor.exec analysis:

public Status exec(String filename, CancelableProgressable p) {
  Path rootdir;
  FileSystem fs;
  try {
    rootdir = FSUtils.getRootDir(conf);
    fs = rootdir.getFileSystem(conf);
  } catch (IOException e) {
    LOG.warn("could not find root dir or fs", e);
    // Return RESIGNED (give up) if getting the HDFS /hbase directory or its FileSystem fails.
    return Status.RESIGNED;
  }
  // TODO have to correctly figure out when log splitting has been
  // interrupted or has encountered a transient error and when it has
  // encountered a bad non-retry-able persistent error.
  try {
    // Run the log split: create an HLogSplitter instance and call its splitLogFile method,
    // which returns true or false. During the split it periodically sends a heartbeat to this
    // HLog-replay znode in ZK; if sending the heartbeat fails, false is returned.
    // The heartbeat interval is configured by hbase.splitlog.report.period,
    // default hbase.splitlog.manager.timeout (120000) / 3.
    // The heartbeat simply re-registers this server name in ZK and reads the previously
    // registered version; a stale version means PREEMPTED (another server took the task).
    // 1. hbase.regionserver.hlog.splitlog.buffersize sets the buffer size for reading the source
    //    HLog, default 128*1024*1024.
    // 2. hbase.regionserver.hlog.splitlog.writer.threads sets the number of OutputSink writer threads.
    // 3. hbase.regionserver.wal.logreplay.batch.size defaults to 64.
    // 4. If distributedLogReplay is true the OutputSink used is HLogSplitter.LogReplayOutputSink,
    //    otherwise an HLogSplitter.LogRecoveredEditsOutputSink instance is created.
    // 5. hbase.hlog.split.skip.errors configures whether split errors are skipped, default false.
    // 6. hbase.splitlog.report.interval.loglines configures the number of entries read per
    //    progress report, default 1024.
    // While reading, entries whose seqid is smaller than the region's seqid, or than the seqid
    // stored in recovering-regions, are skipped. When writing the output, a recovered.edits
    // directory is created under each region name and the HLog data is written into it.
    // See the HLogSplitter.splitLogFile source code for the details.
    if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
        fs, conf, p, sequenceIdChecker, watcher)) {
      // Another server preempted the task; this is detected through the periodic registration
      // of this server's ownership of the HLog split in ZK.
      return Status.PREEMPTED;
    }
  } catch (InterruptedIOException iioe) {
    LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);
    return Status.RESIGNED;
  } catch (IOException e) {
    Throwable cause = e.getCause();
    if (e instanceof RetriesExhaustedException
        && (cause instanceof NotServingRegionException
            || cause instanceof ConnectException
            || cause instanceof SocketTimeoutException)) {
      LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, "
          + "resigning", e);
      return Status.RESIGNED;
    } else if (cause instanceof InterruptedException) {
      LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
      return Status.RESIGNED;
    } else if (cause instanceof KeeperException) {
      LOG.warn("log splitting of " + filename + " hit ZooKeeper issue, resigning", e);
      return Status.RESIGNED;
    }
    LOG.warn("log splitting of " + filename + " failed, returning error", e);
    return Status.ERR;
  }
  return Status.DONE;
}
}
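For reference, the knobs listed in the commentary above can be set programmatically as well as in hbase-site.xml. A sketch with illustrative values (the defaults quoted in this article where it gives them):

// Sketch of tuning the split/replay knobs mentioned above.
Configuration conf = HBaseConfiguration.create();
conf.setInt("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024); // source HLog read buffer (article default)
conf.setInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);             // OutputSink writer threads (illustrative value)
conf.setBoolean("hbase.hlog.split.skip.errors", false);                        // do not skip split errors (article default)
conf.setInt("hbase.splitlog.report.interval.loglines", 1024);                  // entries read per progress report (article default)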

When SplitLogWorker.nodeDataChanged observes a state change in ZK and the task is in none of the states below, it calls stopTask to stop the worker thread:
String taskpath = currentTask;
if (taskpath != null && taskpath.equals(path)) {
  // have to compare data. cannot compare version because then there
  // will be race with attemptToOwnTask()
  // cannot just check whether the node has been transitioned to
  // UNASSIGNED because by the time this worker sets the data watch
  // the node might have made two transitions - from owned by this
  // worker to unassigned to owned by another worker
  if (!slt.isOwned(this.serverName) && !slt.isDone(this.serverName)
      && !slt.isErr(this.serverName) && !slt.isResigned(this.serverName)) {
    LOG.info("task " + taskpath + " preempted from " + serverName
        + ", current task state and owner=" + slt.toString());
    stopTask();
  }
}
Stopping the worker thread:
void stopTask() {
  LOG.info("Sending interrupt to stop the worker thread");
  worker.interrupt(); // TODO interrupt often gets swallowed, do what else?
}

SplitLogManager.nodeDataChanged flow analysis

The region server performs the split and updates the state of the SplitLogTask under splitWAL in ZK according to the outcome. On the master side, SplitLogManager.nodeDataChanged watches the splitWAL path: it looks up the changed task in the tasks list, moves the task's status from IN_PROGRESS to SUCCESS (or to the error status), increments the done or error counter of the task's TaskBatch, and calls notify on the TaskBatch to wake up the waiting thread. waitForSplittingCompletion calls TaskBatch.wait on every check, so the notify is what wakes it up.
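Conceptually, the master-side completion handling amounts to something like the following stand-in sketch (simplified field and method names, not the actual HBase source):

// Stand-in for what happens when a worker marks its SplitLogTask as Done in ZK.
// "task" is the entry found in SplitLogManager.tasks for the changed znode path.
void handleTaskDone(Task task) {
  task.status = TerminationStatus.SUCCESS;   // was IN_PROGRESS while the worker held it
  TaskBatch batch = task.batch;
  synchronized (batch) {
    batch.done++;                            // an Err outcome would increment batch.error instead
    batch.notify();                          // wake up waitForSplittingCompletion, which waits on the batch
  }
}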

Data replay on region open analysis

HRegionServer.openRegion --> OpenRegionHandler.process --> openRegion -->
HRegion.openRegion --> create the HRegion instance and call r.openHRegion(reporter) --> initialize
--> initializeRegionInternals --> initializeRegionStores --> replayRecoveredEditsIfAny
Note: the per-store maximum seqids passed into the log replay do not include the seqids of bulk-loaded HFiles, whereas the maximum seqid across all stores computed at region open to derive the next sequence id does include the seqids of bulk-loaded HFiles.

protected long replayRecoveredEditsIfAny(final Path regiondir,
    Map<byte[], Long> maxSeqIdInStores,
    final CancelableProgressable reporter, final MonitoredTask status)
    throws UnsupportedEncodingException, IOException {
  // Take the smallest of the per-store maximum seqids that have been flushed to disk.
  long minSeqIdForTheRegion = -1;
  for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
    if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
      minSeqIdForTheRegion = maxSeqIdInStore;
    }
  }
  long seqid = minSeqIdForTheRegion;

  FileSystem fs = this.fs.getFileSystem();
  // List all files under the region's recovered.edits directory, excluding files ending in .temp,
  // sorted by file name. Each file is named after the largest seqid it contains, so this sorts
  // the files by seqid, smallest first.
  NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Found " + (files == null ? 0 : files.size())
        + " recovered edits file(s) under " + regiondir);
  }
  // No log files to replay: return the smallest seqid across the stores; no replay is needed.
  if (files == null || files.isEmpty()) return seqid;

  for (Path edits : files) {
    // Check that the edits file exists.
    if (edits == null || !fs.exists(edits)) {
      LOG.warn("Null or non-existent edits file: " + edits);
      continue;
    }
    // If the file is empty, delete it; an empty file needs no replay.
    if (isZeroLengthThenDelete(fs, edits)) continue;

    long maxSeqId = Long.MAX_VALUE;
    String fileName = edits.getName();
    // If the largest seqid in the file is not larger than the smallest store seqid of the region,
    // the file does not need to be replayed.
    maxSeqId = Math.abs(Long.parseLong(fileName));
    if (maxSeqId <= minSeqIdForTheRegion) {
      if (LOG.isDebugEnabled()) {
        String msg = "Maximum sequenceid for this log is " + maxSeqId
            + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
            + ", skipped the whole file, path=" + edits;
        LOG.debug(msg);
      }
      continue;
    }

    try {
      // Read every KeyValue in the edits to replay and find its store.
      // If the KV's seqid is smaller than the store's maximum seqid, the KV does not need
      // replaying; otherwise add the KV to the store, take the size of the added KV and:
      // a. add it to this region's entry in RegionServerAccounting.replayEditsPerRegion,
      //    which tracks the memory used by this region's replay;
      // b. add it to RegionServerAccounting.atomicGlobalMemstoreSize, the global memstore usage;
      // c. add it to this region's memstore size, HRegion.memstoreSize;
      // d. check whether the memstore has reached the flush size configured by
      //    hbase.hregion.memstore.flush.size (default 1024*1024*128L), and flush if so;
      // f. return the latest seqid.
      seqid = replayRecoveredEdits(edits, maxSeqIdInStores, reporter);
    } catch (IOException e) {
      // On a replay error, check whether hbase.hregion.edits.replay.skip.errors is set to true.
      // Older versions used hbase.skip.errors; the default is false, i.e. do not skip errors.
      boolean skipErrors = conf.getBoolean(
          HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
          conf.getBoolean("hbase.skip.errors",
              HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
      if (conf.get("hbase.skip.errors") != null) {
        LOG.warn(
            "The property 'hbase.skip.errors' has been deprecated. Please use "
            + HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
      }
      // If errors are skipped, rename the edits file to editsname.systime and move it aside
      // under the region's root directory.
      if (skipErrors) {
        Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
        LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
            + "=true so continuing. Renamed " + edits + " as " + p, e);
      } else {
        throw e;
      }
    }
  }
  // The edits size added into rsAccounting during this replaying will not
  // be required any more. So just clear it.
  // Clear this region's replay KV size from RegionServerAccounting.replayEditsPerRegion.
  if (this.rsAccounting != null) {
    this.rsAccounting.clearRegionReplayEditsSize(this.getRegionName());
  }
  // If anything was replayed, the seqid after the replay is larger than the stores' original
  // minimum seqid: force a flush of the region.
  if (seqid > minSeqIdForTheRegion) {
    // Then we added some edits to memory. Flush and cleanup split edit files.
    internalFlushcache(null, seqid, status);
  }
  // Now delete the content of recovered edits. We're done w/ them.
  // Delete all files under the region's recovered.edits directory.
  for (Path file : files) {
    if (!fs.delete(file, false)) {
      LOG.error("Failed delete of " + file);
    } else {
      LOG.debug("Deleted recovered.edits file=" + file);
    }
  }
  return seqid;
}
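A tiny worked illustration of the skip rule above (illustrative numbers, not HBase code paths, just the arithmetic):

// With per-store flushed max seqids of 10 and 25, the region minimum decides which files to skip.
long[] storeMaxSeqIds = { 10L, 25L };
long minSeqIdForTheRegion = Math.min(storeMaxSeqIds[0], storeMaxSeqIds[1]);   // = 10
long fileMaxSeqId = Long.parseLong("0000000000000000008");                    // taken from the edits file name
boolean skipWholeFile = fileMaxSeqId <= minSeqIdForTheRegion;                 // true: 8 <= 10, skip the file
// A file named 0000000000000000030 would be replayed, and each KV in it is still
// filtered against its own store's maximum seqid.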

Log replay when distributedLogReplay is true

When hbase.master.distributed.log.replay is set to true, the OutputSink implementation chosen when the HLogSplitter instance is created during splitLog is HLogSplitter.LogReplayOutputSink.

This implementation bypasses the recovered.edits directory and replays the data directly into the regions. See the related source code for the details.