hadoop之BlockPoolManager源码分析
2017-11-22 09:11
369 查看
在HDFS Federation架构中,一个HDFS集群可以创建多个命名空间,每一个DataNode都可以存储多个BlockPool的数据块,所以在DataNode上定义了一个BlockPoolManager用于管理DataNode上所有的块池。
DataNode其他模块要对BlockPool进行操作,必须通过BlockPoolManager来执行,每一个DataNode都有一个BlockPoolManager的实例。
一 BPServiceActor分析
BPServiceActor负责与一个NameNode进行通信,每一个BPServiceActor都是一个独立的线程,主要功能:
>>与NameNode进行第一次握手,获取命名空间的信息
>>向NameNode注册当前DataNode
>>定期向NameNode发送心跳,增量块汇报,全量块汇报,缓存块汇报等
>>执行NameNode传回的指令
static
final Log
LOG = DataNode.LOG;
//NameNode
地址
finalInetSocketAddress
nnAddr;
//NameNode
状态
HAServiceStatestate;
//所持有的BPOfferService对象
final BPOfferService
bpos;
//当前的工作线程
ThreadbpThread;
//向Name Node发送RPC请求的代理
DatanodeProtocolClientSideTranslatorPBbpNamenode;
//当前BPServiceActor的运行状态,初始状态时CONNECTING
static
enum RunningState {
CONNECTING, INIT_FAILED,
RUNNING, EXITED,
FAILED;
}
private
volatile RunningState
runningState = RunningState.CONNECTING;
//用于保存2次块汇报之间Data Node存储数据块的变化
private
final Map<DatanodeStorage, PerStoragePendingIncrementalBR>
pendingIncrementalBRperStorage = Maps.newHashMap();
//DataNode对象的引用
private
final DataNode
dn;
//用于记录Data Node的注册信息
private DatanodeRegistration
bpRegistration;
// Initialization.
// NOTE(review): the enclosing method signature and the opening of the retry loop are
// not part of this excerpt; the `break` and the extra closing brace below presumably
// belong to an enclosing `while (true)` loop -- confirm against the Hadoop source.
try {
// Handshake with the NameNode and register the DataNode.
connectToNNAndHandshake();
break;
}
catch (IOException ioe) {
// The initial handshake failed: set the running state to INIT_FAILED.
runningState = RunningState.INIT_FAILED;
if (shouldRetryInit()) {
// Retry until all namenode's of BPOS failed initialization
sleepAndLogInterrupts(5000,
"initializing");
}
else {
// No retry: mark this actor as FAILED and leave the method.
runningState = RunningState.FAILED;
return;
}
}
}
// Initialization succeeded: set the running state to RUNNING.
runningState = RunningState.RUNNING;
// Call offerService() in a loop; it sends heartbeats, block reports, incremental
// block reports and cache reports to the NameNode.
while (shouldRun()) {
try {
offerService();
}
catch (Exception ex) {
// The exception is not handled any further; the loop keeps going for as long as
// shouldRun() holds (presumably until the BPServiceActor or the DataNode stops).
sleepAndLogInterrupts(5000,
"offeringservice");
}
}
// The service loop has ended: set the running state to EXITED.
runningState = RunningState.EXITED;
/**
 * Connects to the NameNode and performs the two-step handshake.
 *
 * @throws IOException declared by the signature; raised by the calls made here
 */
private void connectToNNAndHandshake() throws IOException {
  // Obtain the RPC proxy for the NameNode.
  bpNamenode = dn.connectToNN(nnAddr);

  // First handshake: fetch the namespace information and hand it to the BPOfferService.
  final NamespaceInfo namespaceInfo = retrieveNamespaceInfo();
  bpos.verifyAndSetNamespaceInfo(namespaceInfo);

  // Second handshake: register this DataNode with the NameNode.
  register(namespaceInfo);
}
private
void offerService()
throwsException {
long fullBlockReportLeaseId =
0;
while (shouldRun()) {
try {
final long
startTime =
scheduler.monotonicNow();
//判断是否发送心跳信息
final boolean
sendHeartbeat =
scheduler.isHeartbeatDue(startTime);
HeartbeatResponse
resp = null;
//如果要发送心跳
if (sendHeartbeat) {
boolean requestBlockReportLease = (fullBlockReportLeaseId ==
0) &&
scheduler.isBlockReportDue(startTime);
scheduler.scheduleNextHeartbeat();
if (!dn.areHeartbeatsDisabledForTests()) {
//发送心跳信息
resp = sendHeartBeat(requestBlockReportLease);
assert
resp != null;
if (resp.getFullBlockReportLeaseId() !=
0) {
fullBlockReportLeaseId =
resp.getFullBlockReportLeaseId();
}
dn.getMetrics().addHeartbeat(scheduler.monotonicNow() -
startTime);
//对心跳响应中携带的NameNode的HA状态进行处理
bpos.updateActorStatesFromHeartbeat(
this,
resp.getNameNodeHaState());
state = resp.getNameNodeHaState().getState();
if (state == HAServiceState.ACTIVE) {
handleRollingUpgradeStatus(resp);
}
long startProcessCommands =
monotonicNow();
//处理响应中带回的Name Node指令
if (!processCommand(resp.getCommands()))
continue;
long endProcessCommands =
monotonicNow();
}
}
if (sendImmediateIBR ||
sendHeartbeat) {
reportReceivedDeletedBlocks();
}
List<DatanodeCommand>
cmds = null;
boolean forceFullBr =
scheduler.forceFullBlockReport.getAndSet(false);
if (forceFullBr) {
LOG.info("Forcinga full block report to " +
nnAddr);
}
if ((fullBlockReportLeaseId !=
0) || forceFullBr) {
cmds = blockReport(fullBlockReportLeaseId);
fullBlockReportLeaseId =
0;
}
processCommand(cmds ==
null ? null :
cmds.toArray(new
DatanodeCommand[cmds.size()]));
if (!dn.areCacheReportsDisabledForTests()) {
DatanodeCommand cmd =
cacheReport();
processCommand(new
DatanodeCommand[]{
cmd });
}
//
// There is no work to do; sleep until hearbeat timer elapses,
// or work arrives, and then iterate again.
//
long waitTime =
scheduler.getHeartbeatWaitTime();
synchronized(pendingIncrementalBRperStorage) {
if (waitTime >
0 && !sendImmediateIBR) {
try {
pendingIncrementalBRperStorage.wait(waitTime);
}
catch (InterruptedException
ie) {
LOG.warn("BPOfferServicefor " +
this +
" interrupted");
}
}
}
// synchronized
}
catch(RemoteException
re) {
String
reClass =
re.getClassName();
if (UnregisteredNodeException.class.getName().equals(reClass)
||
DisallowedDatanodeException.class.getName().equals(reClass)
||
IncorrectVersionException.class.getName().equals(reClass))
{
LOG.warn(this +
" is shutting down",
re);
shouldServiceRun = false;
return;
}
LOG.warn("RemoteExceptionin offerService",
re);
try {
long sleepTime = Math.min(1000,
dnConf.heartBeatInterval);
Thread.sleep(sleepTime);
}
catch (InterruptedException
ie) {
Thread.currentThread().interrupt();
}
}
catch (IOException e) {
LOG.warn("IOExceptionin offerService",
e);
}
processQueueMessages();
}
// while (shouldRun())
}
// offerService
/**
 * Updates which actor is tracked as talking to the active NameNode, based on the HA
 * status carried in a heartbeat response. Runs under the write lock.
 *
 * @param actor the actor that received the heartbeat response
 * @param nnHaState the HA status (state and txid) reported by that actor's NameNode
 */
void updateActorStatesFromHeartbeat(BPServiceActor actor, NNHAStatusHeartbeat nnHaState) {
  writeLock();
  try {
    // txid reported by the NameNode.
    final long claimedTxId = nnHaState.getTxId();
    // Does this NameNode claim to be active?
    final boolean claimsActive = nnHaState.getState() == HAServiceState.ACTIVE;
    // Is this actor the one currently tracked as active?
    final boolean trackedAsActive = bpServiceToActive == actor;

    if (claimsActive && !trackedAsActive) {
      // The NameNode claims to be active but is not the one tracked as active.
      if (claimedTxId <= lastActiveClaimTxId) {
        // The claim is not newer than the last recorded one: ignore it.
        return;
      }
      // Newer claim: track this actor as the active one from now on.
      bpServiceToActive = actor;
    } else if (trackedAsActive && !claimsActive) {
      // The NameNode tracked as active no longer claims to be active.
      bpServiceToActive = null;
    }

    // Record the txid when this actor is (now) the active one.
    if (bpServiceToActive == actor) {
      assert claimedTxId >= lastActiveClaimTxId;
      lastActiveClaimTxId = claimedTxId;
    }
  } finally {
    writeUnlock();
  }
}
二 BPOfferService
BPOfferService就是对DataNode每一个BlockPool进行管理的类。
重要的字段:
//握手之后获取到的Namespace信息
NamespaceInfobpNSInfo;
//当前Block Pool在Name Node上的注册信息,这个信息是在Data Node注册阶段获取的
volatileDatanodeRegistration
bpRegistration;
//当前DataNode的引用
private
final DataNode
dn;
//BPServiceActor的引用,这个代表着是Active的Name Node对应的对象
privateBPServiceActor
bpServiceToActive = null;
//当前命名空间中所有NameNode对应的BPServiceActor对象
private
final List<BPServiceActor>
bpServices = new
CopyOnWriteArrayList<BPServiceActor>();
//每当收到一个NameNode的时候,就记录最近的txid
private
long lastActiveClaimTxId = -1;
主要的方法分类:
1)触发汇报
trySendErrorReport(),reportRemoteBadBlock,reportBadBlock()实现了向NameNode发送错误汇报,汇报远程坏块以及本地坏块的操作,会直接调用BPServiceActor对应操作
/**
 * Enqueues an error report on every actor in {@code bpServices}; each actor gets its
 * own {@code ErrorReportAction} instance.
 *
 * @param errCode error code to report
 * @param errMsg error message to report
 */
void trySendErrorReport(int errCode, String errMsg) {
  for (final BPServiceActor serviceActor : bpServices) {
    serviceActor.bpThreadEnqueue(new ErrorReportAction(errCode, errMsg));
  }
}
2)添加与删除数据块操作
当DataNode接收一个新的数据块时,比如客户端通过数据流管道写入一个数据块,或者通过DataTransferProtocol流式接口复制一个数据块的时候,都会调用BPOfferService.notifyNamenodeReceivedBlock()。
当DataNode删除一个已有的数据块的时候,会调用BPOfferService.notifyNamenodeDeletedBlock()方法通知命名空间。
3)响应NameNode的指令
/**
 * Dispatches a command returned by a NameNode.
 *
 * @param cmd the command to process; {@code null} is accepted and treated as a no-op
 * @param actor the actor that received the command
 * @return {@code true} for a {@code null} command, {@code false} after a
 *     re-registration request, otherwise the result of the active/standby handler
 * @throws IOException declared by the signature; raised by the calls made here
 */
boolean processCommandFromActor(DatanodeCommand cmd, BPServiceActor actor) throws IOException {
  if (cmd == null) {
    return true;
  }

  // The NameNode asks the DataNode to register again: delegate to the actor.
  if (cmd.getAction() == DatanodeProtocol.DNA_REGISTER) {
    actor.reRegister();
    return false;
  }

  writeLock();
  try {
    // Commands from the actor tracked as active go to processCommandFromActive,
    // all others to processCommandFromStandby.
    return (actor == bpServiceToActive)
        ? processCommandFromActive(cmd, actor)
        : processCommandFromStandby(cmd, actor);
  } finally {
    writeUnlock();
  }
}
processCommandFromStandby处理来自Standby NameNode的指令,直接忽略即可。这样可以防止在HA部署下出现脑裂的情况,也就是Active NameNode和Standby NameNode同时向DataNode下指令。所以BPOfferService对象并不执行Standby NameNode返回的指令。
三 BlockPoolManager
BlockPoolManager类负责管理所有的BPOfferService实例,对外提供添加、删除、启动、关闭BPOfferService的接口。所有对BPOfferService的操作,都必须通过BlockPoolManager类提供的方法来执行。
DataNode启动的时候,会初始化BlockPoolManager对象,然后调用BlockPoolManager.refreshNamenodes()完成对BlockPoolManager的构造
/** BPOfferService instances keyed by nameservice (namespace) id. */
private final Map<String, BPOfferService> bpByNameserviceId = Maps.newHashMap();

/** BPOfferService instances keyed by block pool id. */
private final Map<String, BPOfferService> bpByBlockPoolId = Maps.newHashMap();

/** List of BPOfferService instances held by this manager. */
private final List<BPOfferService> offerServices = Lists.newArrayList();

/** Reference to the DataNode. */
private final DataNode dn;
DataNode定义了一个BlockPoolManager用于管理DataNode上所有的块池。
DataNode其他模块要对BlockPool进行操作,必须通过BlockPoolManager来执行,每一个DataNode都有一个BlockPoolManager的实例。
一 BPServiceActor分析
BPServiceActor负责与一个NameNode进行通信,每一个BPServiceActor都是一个独立的线程,主要功能:
>>与NameNode进行第一次握手,获取命名空间的信息
>>向NameNode注册当前DataNode
>>定期向NameNode发送心跳,增量块汇报,全量块汇报,缓存块汇报等
>>执行NameNode传回的指令
static
final Log
LOG = DataNode.LOG;
//NameNode
地址
finalInetSocketAddress
nnAddr;
//NameNode
状态
HAServiceStatestate;
//所持有的BPOfferService对象
final BPOfferService
bpos;
//当前的工作线程
ThreadbpThread;
//向Name Node发送RPC请求的代理
DatanodeProtocolClientSideTranslatorPBbpNamenode;
//当前BPServiceActor的运行状态,初始状态时CONNECTING
static
enum RunningState {
CONNECTING, INIT_FAILED,
RUNNING, EXITED,
FAILED;
}
private
volatile RunningState
runningState = RunningState.CONNECTING;
//用于保存2次块汇报之间Data Node存储数据块的变化
private
final Map<DatanodeStorage, PerStoragePendingIncrementalBR>
pendingIncrementalBRperStorage = Maps.newHashMap();
//DataNode对象的引用
private
final DataNode
dn;
//用于记录Data Node的注册信息
private DatanodeRegistration
bpRegistration;
// Initialization.
// NOTE(review): the enclosing method signature and the opening of the retry loop are
// not part of this excerpt; the `break` and the extra closing brace below presumably
// belong to an enclosing `while (true)` loop -- confirm against the Hadoop source.
try {
// Handshake with the NameNode and register the DataNode.
connectToNNAndHandshake();
break;
}
catch (IOException ioe) {
// The initial handshake failed: set the running state to INIT_FAILED.
runningState = RunningState.INIT_FAILED;
if (shouldRetryInit()) {
// Retry until all namenode's of BPOS failed initialization
sleepAndLogInterrupts(5000,
"initializing");
}
else {
// No retry: mark this actor as FAILED and leave the method.
runningState = RunningState.FAILED;
return;
}
}
}
// Initialization succeeded: set the running state to RUNNING.
runningState = RunningState.RUNNING;
// Call offerService() in a loop; it sends heartbeats, block reports, incremental
// block reports and cache reports to the NameNode.
while (shouldRun()) {
try {
offerService();
}
catch (Exception ex) {
// The exception is not handled any further; the loop keeps going for as long as
// shouldRun() holds (presumably until the BPServiceActor or the DataNode stops).
sleepAndLogInterrupts(5000,
"offeringservice");
}
}
// The service loop has ended: set the running state to EXITED.
runningState = RunningState.EXITED;
/**
 * Connects to the NameNode and performs the two-step handshake.
 *
 * @throws IOException declared by the signature; raised by the calls made here
 */
private void connectToNNAndHandshake() throws IOException {
  // Obtain the RPC proxy for the NameNode.
  bpNamenode = dn.connectToNN(nnAddr);

  // First handshake: fetch the namespace information and hand it to the BPOfferService.
  final NamespaceInfo namespaceInfo = retrieveNamespaceInfo();
  bpos.verifyAndSetNamespaceInfo(namespaceInfo);

  // Second handshake: register this DataNode with the NameNode.
  register(namespaceInfo);
}
private
void offerService()
throwsException {
long fullBlockReportLeaseId =
0;
while (shouldRun()) {
try {
final long
startTime =
scheduler.monotonicNow();
//判断是否发送心跳信息
final boolean
sendHeartbeat =
scheduler.isHeartbeatDue(startTime);
HeartbeatResponse
resp = null;
//如果要发送心跳
if (sendHeartbeat) {
boolean requestBlockReportLease = (fullBlockReportLeaseId ==
0) &&
scheduler.isBlockReportDue(startTime);
scheduler.scheduleNextHeartbeat();
if (!dn.areHeartbeatsDisabledForTests()) {
//发送心跳信息
resp = sendHeartBeat(requestBlockReportLease);
assert
resp != null;
if (resp.getFullBlockReportLeaseId() !=
0) {
fullBlockReportLeaseId =
resp.getFullBlockReportLeaseId();
}
dn.getMetrics().addHeartbeat(scheduler.monotonicNow() -
startTime);
//对心跳响应中携带的NameNode的HA状态进行处理
bpos.updateActorStatesFromHeartbeat(
this,
resp.getNameNodeHaState());
state = resp.getNameNodeHaState().getState();
if (state == HAServiceState.ACTIVE) {
handleRollingUpgradeStatus(resp);
}
long startProcessCommands =
monotonicNow();
//处理响应中带回的Name Node指令
if (!processCommand(resp.getCommands()))
continue;
long endProcessCommands =
monotonicNow();
}
}
if (sendImmediateIBR ||
sendHeartbeat) {
reportReceivedDeletedBlocks();
}
List<DatanodeCommand>
cmds = null;
boolean forceFullBr =
scheduler.forceFullBlockReport.getAndSet(false);
if (forceFullBr) {
LOG.info("Forcinga full block report to " +
nnAddr);
}
if ((fullBlockReportLeaseId !=
0) || forceFullBr) {
cmds = blockReport(fullBlockReportLeaseId);
fullBlockReportLeaseId =
0;
}
processCommand(cmds ==
null ? null :
cmds.toArray(new
DatanodeCommand[cmds.size()]));
if (!dn.areCacheReportsDisabledForTests()) {
DatanodeCommand cmd =
cacheReport();
processCommand(new
DatanodeCommand[]{
cmd });
}
//
// There is no work to do; sleep until hearbeat timer elapses,
// or work arrives, and then iterate again.
//
long waitTime =
scheduler.getHeartbeatWaitTime();
synchronized(pendingIncrementalBRperStorage) {
if (waitTime >
0 && !sendImmediateIBR) {
try {
pendingIncrementalBRperStorage.wait(waitTime);
}
catch (InterruptedException
ie) {
LOG.warn("BPOfferServicefor " +
this +
" interrupted");
}
}
}
// synchronized
}
catch(RemoteException
re) {
String
reClass =
re.getClassName();
if (UnregisteredNodeException.class.getName().equals(reClass)
||
DisallowedDatanodeException.class.getName().equals(reClass)
||
IncorrectVersionException.class.getName().equals(reClass))
{
LOG.warn(this +
" is shutting down",
re);
shouldServiceRun = false;
return;
}
LOG.warn("RemoteExceptionin offerService",
re);
try {
long sleepTime = Math.min(1000,
dnConf.heartBeatInterval);
Thread.sleep(sleepTime);
}
catch (InterruptedException
ie) {
Thread.currentThread().interrupt();
}
}
catch (IOException e) {
LOG.warn("IOExceptionin offerService",
e);
}
processQueueMessages();
}
// while (shouldRun())
}
// offerService
/**
 * Updates which actor is tracked as talking to the active NameNode, based on the HA
 * status carried in a heartbeat response. Runs under the write lock.
 *
 * @param actor the actor that received the heartbeat response
 * @param nnHaState the HA status (state and txid) reported by that actor's NameNode
 */
void updateActorStatesFromHeartbeat(BPServiceActor actor, NNHAStatusHeartbeat nnHaState) {
  writeLock();
  try {
    // txid reported by the NameNode.
    final long claimedTxId = nnHaState.getTxId();
    // Does this NameNode claim to be active?
    final boolean claimsActive = nnHaState.getState() == HAServiceState.ACTIVE;
    // Is this actor the one currently tracked as active?
    final boolean trackedAsActive = bpServiceToActive == actor;

    if (claimsActive && !trackedAsActive) {
      // The NameNode claims to be active but is not the one tracked as active.
      if (claimedTxId <= lastActiveClaimTxId) {
        // The claim is not newer than the last recorded one: ignore it.
        return;
      }
      // Newer claim: track this actor as the active one from now on.
      bpServiceToActive = actor;
    } else if (trackedAsActive && !claimsActive) {
      // The NameNode tracked as active no longer claims to be active.
      bpServiceToActive = null;
    }

    // Record the txid when this actor is (now) the active one.
    if (bpServiceToActive == actor) {
      assert claimedTxId >= lastActiveClaimTxId;
      lastActiveClaimTxId = claimedTxId;
    }
  } finally {
    writeUnlock();
  }
}
二 BPOfferService
BPOfferService就是对DataNode每一个BlockPool进行管理的类。
重要的字段:
//握手之后获取到的Namespace信息
NamespaceInfobpNSInfo;
//当前Block Pool在Name Node上的注册信息,这个信息是在Data Node注册阶段获取的
volatileDatanodeRegistration
bpRegistration;
//当前DataNode的引用
private
final DataNode
dn;
//BPServiceActor的引用,这个代表着是Active的Name Node对应的对象
privateBPServiceActor
bpServiceToActive = null;
//当前命名空间中所有NameNode对应的BPServiceActor对象
private
final List<BPServiceActor>
bpServices = new
CopyOnWriteArrayList<BPServiceActor>();
//每当收到一个NameNode的时候,就记录最近的txid
private
long lastActiveClaimTxId = -1;
主要的方法分类:
1)触发汇报
trySendErrorReport(),reportRemoteBadBlock,reportBadBlock()实现了向NameNode发送错误汇报,汇报远程坏块以及本地坏块的操作,会直接调用BPServiceActor对应操作
/**
 * Enqueues an error report on every actor in {@code bpServices}; each actor gets its
 * own {@code ErrorReportAction} instance.
 *
 * @param errCode error code to report
 * @param errMsg error message to report
 */
void trySendErrorReport(int errCode, String errMsg) {
  for (final BPServiceActor serviceActor : bpServices) {
    serviceActor.bpThreadEnqueue(new ErrorReportAction(errCode, errMsg));
  }
}
2)添加与删除数据块操作
当DataNode接收一个新的数据块时,比如客户端通过数据流管道写入一个数据块,或者通过DataTransferProtocol流式接口复制一个数据块的时候,都会调用BPOfferService.notifyNamenodeReceivedBlock()。
当DataNode删除一个已有的数据块的时候,会调用BPOfferService.notifyNamenodeDeletedBlock()方法通知命名空间。
3)响应NameNode的指令
/**
 * Dispatches a command returned by a NameNode.
 *
 * @param cmd the command to process; {@code null} is accepted and treated as a no-op
 * @param actor the actor that received the command
 * @return {@code true} for a {@code null} command, {@code false} after a
 *     re-registration request, otherwise the result of the active/standby handler
 * @throws IOException declared by the signature; raised by the calls made here
 */
boolean processCommandFromActor(DatanodeCommand cmd, BPServiceActor actor) throws IOException {
  if (cmd == null) {
    return true;
  }

  // The NameNode asks the DataNode to register again: delegate to the actor.
  if (cmd.getAction() == DatanodeProtocol.DNA_REGISTER) {
    actor.reRegister();
    return false;
  }

  writeLock();
  try {
    // Commands from the actor tracked as active go to processCommandFromActive,
    // all others to processCommandFromStandby.
    return (actor == bpServiceToActive)
        ? processCommandFromActive(cmd, actor)
        : processCommandFromStandby(cmd, actor);
  } finally {
    writeUnlock();
  }
}
processCommandFromStandby处理来自Standby NameNode的指令,直接忽略即可。这样可以防止在HA部署下出现脑裂的情况,也就是Active NameNode和Standby NameNode同时向DataNode下指令。所以BPOfferService对象并不执行Standby NameNode返回的指令。
三 BlockPoolManager
BlockPoolManager类负责管理所有的BPOfferService实例,对外提供添加、删除、启动、关闭BPOfferService的接口。所有对BPOfferService的操作,都必须通过BlockPoolManager类提供的方法来执行。
DataNode启动的时候,会初始化BlockPoolManager对象,然后调用BlockPoolManager.refreshNamenodes()完成对BlockPoolManager的构造
/** BPOfferService instances keyed by nameservice (namespace) id. */
private final Map<String, BPOfferService> bpByNameserviceId = Maps.newHashMap();

/** BPOfferService instances keyed by block pool id. */
private final Map<String, BPOfferService> bpByBlockPoolId = Maps.newHashMap();

/** List of BPOfferService instances held by this manager. */
private final List<BPOfferService> offerServices = Lists.newArrayList();

/** Reference to the DataNode. */
private final DataNode dn;
相关文章推荐
- 研磨Hadoop源码(六)ResourceManager启动分析2
- 研磨Hadoop源码(五)ResourceManager启动分析1
- 研磨Hadoop源码(六)ResourceManager启动分析2
- 研磨Hadoop源码(五)ResourceManager启动分析1
- Hadoop源码分析13: IPC流程(8) Server的wait、notify
- Hadoop源码分析26 JobTracker主要容器和线程
- HBase源码分析之org.apache.hadoop.hbase.catalog包
- 源码分析之ServiceManager类分析
- Hadoop源码分析,map输入
- Hadoop-源码分析--HDFS读取文件
- [FileZilla Client 源码分析一]CContextManager与CState类
- Hadoop2源码分析-HDFS datanode核心模块分析
- 源码分析Hadoop FileInputFormat如何分片
- hadoop源码分析系列(三)——org.apache.hadoop.fs包 ----(下)
- Hadoop源码分析之NameNode的启动与停止(续)
- Hadoop源码分析-Text
- Android多用户之UserManagerService源码分析
- Hadoop RCFile存储格式详解(源码分析、代码示例)
- Kubernetes Eviction Manager源码分析
- Hadoop源码分析笔记(十五):名字节点--启动和停止