您的位置:首页 > Web前端 > Node.js

DataNode与NameNode交互机制相关代码分析 推荐

2014-11-25 11:23 477 查看
HDFS Federation是为解决HDFS单点故障而提出的NameNode水平扩展方案,该方案允许HDFS创建多个Namespace以提高集群的扩展性和隔离性。在Federation中新增了block-pool的概念,block-pool就是属于单个Namespace的一组block,每个DataNode为所有的block-pool存储block,可以理解block-pool是一个重新将block划分的逻辑概念,同一个DataNode中可以存储属于多个block-pool的多个block。所以在NameNode和DataNode通信相关的代码方面,也做了很大的改动以支持上述特性。
在cdh3x中,DataNode与NameNode交互主要集中在DataNode这个类中,类结构比较简单,随着Federation概念的引入,新增了一些比较重要的类来管理逻辑层面划分的block-pool和block-pool下的block分布,并以block-pool为单位来与NameNode进行相关的通信。类图如下


BPServiceActor类实现Runnable接口,以线程的方式运行,一个BPServiceActor实例可以和一个active或standby模式的NameNode实例进行交互,它是真正的任务执行者。主要有四大职能
1.预先与NameNode进行握手

2.向NameNode注册
3.周期性的向NameNode发送心跳
4.处理NameNode发送回的命令
一个BPOfferService实例代表在某个DataNode上的某个block-pool(一个block-pool对应一个独立的Namespace),对block-pool对应的active和standby状态的NameNode进行交互的操作。BPOfferService管理和每个NameNode进行实际通信的BPServiceActor实例,并作为代理与处于active状态和standby状态的两个NameNode进行交互,同时标识与active状态NameNode通信的BPServiceActor实例。相关代码如下
class BPOfferService {
static final Log LOG = DataNode.LOG;

//本block-pool服务代表的Namespace信息,和NameNode握手的第一阶段分配所得
NamespaceInfo bpNSInfo;

//block-pool所在DataNode相关的注册信息,和NameNode握手的第二阶段分配所得
volatile DatanodeRegistration bpRegistration;

//所属datanode实例
private final DataNode dn;

//代表和当前active状态的NameNode关联的BPServiceActor实例
//如果所有NameNode处于standby状态,此属性可以为空
//如果此属性非空,则必指向bpServices集合中的某个实例
private BPServiceActor bpServiceToActive = null;

//在本nameservice服务下指向所有NameNode的BPServiceActor实例
//不论代表的NameNode是active状态还是standby状态
private List<BPServiceActor> bpServices = new CopyOnWriteArrayList<BPServiceActor>();

//构造方法中根据NameNode的地址来初始化BPServiceActor,并加入到bpServices集合中
BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
Preconditions.checkArgument(!nnAddrs.isEmpty(),
"Must pass at least one NN.");
this.dn = dn;

for (InetSocketAddress addr : nnAddrs) {
this.bpServices.add(new BPServiceActor(addr, this));
}
}
}
BlockPoolManager类主要用于管理DataNode上的BPOfferService,对BPOfferService对象的创建,删除,启动,停止,关闭的操作都需要通过BlockPoolManager提供的方法来控制。代码如下
class BlockPoolManager {
private static final Log LOG = DataNode.LOG;

//nameserviceId和BPOfferService的映射集合
private final Map<String, BPOfferService> bpByNameserviceId = Maps.newHashMap();
//blockPoolId和BPOfferService的映射集合
private final Map<String, BPOfferService> bpByBlockPoolId = Maps.newHashMap();
//BPOfferService集合
private final List<BPOfferService> offerServices = Lists.newArrayList();
//当前所属的datanode实例
private final DataNode dn;

//更新NameNode列表时的lock
private final Object refreshNamenodesLock = new Object();

BlockPoolManager(DataNode dn) {
this.dn = dn;
}
}
BlockPoolSliceScanner类用于扫描block-pool下的block文件并校验文件是否损坏,它对block和最后的校验时间进行跟踪,目前不提供修改block元数据的操作。一个DataNode对应一个DataBlockScanner,DataBlockScanner对不同block-pool的BlockPoolSliceScanner进行管理。
BlockPoolSliceStorage用于管理DataNode上对应同一个block pool的BlockPoolSlices集合,由于一个DataNode上可能会挂载多个存储设备,即逻辑上对应多个volume,一个BlockPoolSlice对应一个volume,所以对同一个DataNode上的同一个block pool,可以管理多个BlockPoolSlice。BlockPoolSliceStorage的主要职能如下
1.对新生成的block-pool对应的存储进行格式化
2.恢复存储状态以保持一致性
3.在升级的时候对block-pool进行快照处理
4.回滚block-pool到上一个快照
5.删除快照并提交block
在cdh3x中,DataNode启动过程中与NameNode交互的操作,都是在DataNode类中进行的,包括握手,注册,数据块上报和发送心跳等。代码调用关系如下图所示

握手




注册




数据块上报




发送心跳



在cdh5.1中,这些操作最终都交给了BPServiceActor来处理,下面来详细分析下具体的代码和相互间的调用关系。
BlockPoolManager在startDataNode方法中被实例化,startDataNode调用关系如下




DataNode.startDataNode(Configuration conf,
List<StorageLocation> dataDirs,
SecureResources resources
) throws IOException {
blockPoolManager = new BlockPoolManager(this);
//刷新加载NameNodes
blockPoolManager.refreshNamenodes(conf);
}

BlockPoolManager.refreshNamenodes(Configuration conf)
throws IOException {
LOG.info("Refresh request received for nameservices: "
+ conf.get(DFSConfigKeys.DFS_NAMESERVICES));

//地址映射列表,Map<nameserviceId,<namenodeId,nnAddress>>
Map<String, Map<String, InetSocketAddress>> newAddressMap =
DFSUtil.getNNServiceRpcAddresses(conf);

synchronized (refreshNamenodesLock) {
doRefreshNamenodes(newAddressMap);
}
}

BlockPoolManager.doRefreshNamenodes(Map<String, Map<String, InetSocketAddress>> addrMap){
Set<String> toRefresh = Sets.newLinkedHashSet();
Set<String> toAdd = Sets.newLinkedHashSet();
Set<String> toRemove;

synchronized (this) {
for (String nameserviceId : addrMap.keySet()) {
if (bpByNameserviceId.containsKey(nameserviceId)) {
//已经存在,可能有更新的nameserviceId
toRefresh.add(nameserviceId);
} else {
//加入新的nameserviceId
toAdd.add(nameserviceId);
}
}

//找出bpByNameserviceId存在的,但不存在于addrMap的nameserviceId
//等待删除
toRemove = Sets.newHashSet(Sets.difference(
bpByNameserviceId.keySet(), addrMap.keySet()));

//启动新的nameservice
if (!toAdd.isEmpty()) {
for (String nsToAdd : toAdd) {
ArrayList<InetSocketAddress> addrs =
Lists.newArrayList(addrMap.get(nsToAdd).values());
//根据NameNode地址集合创建新的BPOfferService实例
BPOfferService bpos = createBPOS(addrs);
//建立nameserviceId到BPOfferService的映射
bpByNameserviceId.put(nsToAdd, bpos);
//加入到offerServices集合
offerServices.add(bpos);
}
}
//启动BPOfferService服务
startAll();
}

//删除toRemove中的nameserviceId的映射关系,并停止相关服务
if (!toRemove.isEmpty()) {
for (String nsToRemove : toRemove) {
BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
bpos.stop();
bpos.join();
}
}

//刷新变化的nameserviceId
if (!toRefresh.isEmpty()) {
for (String nsToRefresh : toRefresh) {
BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
ArrayList<InetSocketAddress> addrs =
Lists.newArrayList(addrMap.get(nsToRefresh).values());
bpos.refreshNNList(addrs);
}
}
}

BlockPoolManager.startAll() throws IOException {
try {
UserGroupInformation.getLoginUser().doAs(
new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
for (BPOfferService bpos : offerServices) {
//启动BPOfferService服务
bpos.start();
}
return null;
}
});
} catch (InterruptedException ex) {
IOException ioe = new IOException();
ioe.initCause(ex.getCause());
throw ioe;
}
}

BPOfferService.start() {
for (BPServiceActor actor : bpServices) {
//启动BPServiceActor服务
actor.start();
}
}
经过层层调用之后,真正和NameNode进行通信的BPServiceActor服务被启动,启动后的BPServiceActor开始和它对应状态的NameNode进行握手注册等一系列操作,BPServiceActor服务对应的NameNode可能是active或standby状态。详细代码如下
BPServiceActor.run() {
LOG.info(this + " starting to offer service");

try {
while (true) {
try {
//连接到NameNode并进行握手
connectToNNAndHandshake();
break;
} catch (IOException ioe) {
runningState = RunningState.INIT_FAILED;
if (shouldRetryInit()) {
LOG.error("Initialization failed for " + this + " "
+ ioe.getLocalizedMessage());
sleepAndLogInterrupts(5000, "initializing");
} else {
runningState = RunningState.FAILED;
LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
return;
}
}
}

runningState = RunningState.RUNNING;

while (shouldRun()) {
try {
//循环调用offerService()
//在本方法中,周期性的向NameNode发送心跳并执行NameNode返回的相关命令
offerService();
} catch (Exception ex) {
LOG.error("Exception in BPOfferService for " + this, ex);
sleepAndLogInterrupts(5000, "offering service");
}
}
runningState = RunningState.EXITED;
} catch (Throwable ex) {
LOG.warn("Unexpected exception in block pool " + this, ex);
runningState = RunningState.FAILED;
} finally {
LOG.warn("Ending block pool service for: " + this);
cleanUp();
}
}

BPServiceActor.connectToNNAndHandshake() throws IOException {
//连接到NameNode并获得NameNode代理对象
bpNamenode = dn.connectToNN(nnAddr);

//第一阶段获取NamespaceInfo
NamespaceInfo nsInfo = retrieveNamespaceInfo();

//校验namespaceInfo是否和HA中的其他NameNode信息一致
//并建立blockPoolManager和BPOfferService的对应关系
bpos.verifyAndSetNamespaceInfo(nsInfo);

//第二阶段向NameNode注册
register();
}
上述主要分析了加入Federation特性和HA特性后,DataNode和NameNode在代码层面交互方式的改变,相比之前的代码,逻辑更加清晰并且类之间的耦合度更低。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop