
Hadoop source code analysis: block management (part 1)

2012-05-16 15:11
More at http://www.kakuka.net

6.1 block allocation

6.1.1 Block allocation overview

The client's DFSOutputStream calls the namenode's addBlock() RPC method; the namenode then calls the FSNamesystem's getAdditionalBlock(), passing the file name and the client name.
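As a rough sketch of this client/namenode contract, the exchange looks like the following. All class names here are simplified stand-ins for illustration only (not the real Hadoop types); the point is that the client sends only a path and its client name, and the namenode decides both the block identity and where its replicas go:

```java
import java.util.List;

// Simplified stand-ins for the real Hadoop types; names are illustrative only.
class SketchBlock {
    final long blockId;
    SketchBlock(long blockId) { this.blockId = blockId; }
}

class SketchLocatedBlock {
    final SketchBlock block;
    final List<String> targets;   // datanode addresses chosen by the namenode
    final long offset;            // offset of this block within the file
    SketchLocatedBlock(SketchBlock block, List<String> targets, long offset) {
        this.block = block;
        this.targets = targets;
        this.offset = offset;
    }
}

// The shape of the RPC: the client sends the path and its client name;
// the namenode picks the block id and the datanode targets.
interface AddBlockProtocol {
    SketchLocatedBlock addBlock(String src, String clientName);
}

public class AddBlockSketch {
    public static void main(String[] args) {
        // A toy "namenode" that always hands out block 1 on three datanodes.
        AddBlockProtocol namenode = (src, clientName) ->
            new SketchLocatedBlock(new SketchBlock(1L),
                                   List.of("dn1:50010", "dn2:50010", "dn3:50010"),
                                   0L);
        SketchLocatedBlock lb = namenode.addBlock("/user/alice/data.txt", "DFSClient_1");
        // The client would now stream the block to lb.targets in a pipeline.
        System.out.println(lb.targets.size());  // 3
    }
}
```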
public LocatedBlock getAdditionalBlock(String src,
                                       String clientName
                                       ) throws IOException {
  // fileLength, blockSize, clientNode, replication and newBlock
  // are declared earlier in the method (declarations omitted here)
  synchronized (this) {
    // (some checks omitted)
    INodeFileUnderConstruction pendingFile = checkLease(src, clientName);

    //
    // If we fail this, bad things happen!
    //
    if (!checkFileProgress(pendingFile, false)) {
      throw new NotReplicatedYetException("Not replicated yet:" + src);
    }
    fileLength = pendingFile.computeContentSummary().getLength();
    blockSize = pendingFile.getPreferredBlockSize();
    clientNode = pendingFile.getClientNode();
    replication = (int)pendingFile.getReplication();
  }

  // choose targets for the new block to be allocated.
  DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
                                                         clientNode,
                                                         null,
                                                         blockSize);
  if (targets.length < this.minReplication) {
    throw new IOException("File " + src + " could only be replicated to " +
                          targets.length + " nodes, instead of " +
                          minReplication);
  }

  // Allocate a new block and record it in the INode.
  synchronized (this) {
    // returns every inode on the path; the last one is the leaf
    // (the file itself, if src points to a file)
    INode[] pathINodes = dir.getExistingPathINodes(src);
    int inodesLen = pathINodes.length;
    checkLease(src, clientName, pathINodes[inodesLen-1]);
    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)
                                             pathINodes[inodesLen - 1];

    if (!checkFileProgress(pendingFile, false)) {
      throw new NotReplicatedYetException("Not replicated yet:" + src);
    }

    // allocate new block; record block locations in INode.
    newBlock = allocateBlock(src, pathINodes);
    pendingFile.setTargets(targets);

    // one more block is now scheduled for each of these datanodes
    for (DatanodeDescriptor dn : targets) {
      dn.incBlocksScheduled();
    }
  }

  // Create next block
  return new LocatedBlock(newBlock, targets, fileLength);
}
After a series of checks, this function calls replicator.chooseTarget to pick all the targets for the block (detailed below), then wraps the result and returns it. Note that at this point the block has not yet been stored on any disk; all that has happened is that the block has been bound to the file it belongs to. Along the call path there is this fragment:
namesystem.blocksMap.addINode(block, fileNode);
BlockInfo blockInfo = namesystem.blocksMap.getStoredBlock(block);
fileNode.addBlock(blockInfo);
As you can see, at this point the block is only bound to the file (i.e., the INode). Then, back in getAdditionalBlock, pendingFile.setTargets(targets) records the datanodes this block is to be replicated to.
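A minimal standalone sketch of that binding, using simplified stand-ins for BlocksMap, BlockInfo and the file INode (the real classes carry much more state, e.g. replica locations), might look like this:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for Block / BlockInfo / INodeFile; illustrative only.
class Blk {
    final long id;
    Blk(long id) { this.id = id; }
    @Override public boolean equals(Object o) { return o instanceof Blk && ((Blk) o).id == id; }
    @Override public int hashCode() { return Long.hashCode(id); }
}

class BlkInfo {
    final Blk block;
    Object inode;                 // back-pointer: which file owns this block
    BlkInfo(Blk block) { this.block = block; }
}

class FileNode {
    final List<BlkInfo> blocks = new ArrayList<>();
    void addBlock(BlkInfo info) { blocks.add(info); }
}

// The map the namenode keeps in memory: block -> stored block info.
class SimpleBlocksMap {
    private final Map<Blk, BlkInfo> map = new HashMap<>();
    void addINode(Blk block, FileNode file) {
        BlkInfo info = map.computeIfAbsent(block, BlkInfo::new);
        info.inode = file;        // bind block -> file
    }
    BlkInfo getStoredBlock(Blk block) { return map.get(block); }
}

public class BlocksMapSketch {
    public static void main(String[] args) {
        SimpleBlocksMap blocksMap = new SimpleBlocksMap();
        FileNode fileNode = new FileNode();
        Blk block = new Blk(42L);

        // The same three steps as in the call-path fragment above:
        blocksMap.addINode(block, fileNode);
        BlkInfo blockInfo = blocksMap.getStoredBlock(block);
        fileNode.addBlock(blockInfo);  // bind file -> block

        System.out.println(fileNode.blocks.size());  // 1
    }
}
```

Note the binding goes both ways: the blocks map points from the block to its owning file, and the file's inode keeps an ordered list of its blocks.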

6.1.2 Block allocation algorithm

Now that we have seen the big picture, let us look at how a block is allocated, or more precisely, how a block's targets (datanodes) are chosen. We start from ReplicationTargetChooser.chooseTarget. Rather than walking the whole call stack, we pull out only the key parts, and consider the case of the first allocation call.
DatanodeDescriptor[] chooseTarget(int numOfReplicas,
                                  DatanodeDescriptor writer,
                                  List<Node> choosenNodes,
                                  List<Node> excludedNodes,
                                  long blocksize) {
  // (some parts omitted)

  // In the common case the client is not itself a datanode,
  // so this branch is usually taken.
  if (!clusterMap.contains(writer)) {
    writer = null;
  }

  // the key call
  DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
                                              excludedNodes, blocksize,
                                              maxNodesPerRack, results);

  results.removeAll(choosenNodes);

  // sorting nodes to form a pipeline, ordered by network distance
  return getPipeline((writer==null)?localNode:writer,
                     results.toArray(new DatanodeDescriptor[results.size()]));
}
Here is the key part:
private DatanodeDescriptor chooseTarget(int numOfReplicas,
                                        DatanodeDescriptor writer,
                                        List<Node> excludedNodes,
                                        long blocksize,
                                        int maxNodesPerRack,
                                        List<DatanodeDescriptor> results) {

  // (the checks for non-first allocations are omitted)

  int numOfResults = results.size();  // 0 on the first allocation
  boolean newBlock = (numOfResults == 0);
  if (writer == null && !newBlock) {
    writer = (DatanodeDescriptor)results.get(0);
  }

  try {
    switch(numOfResults) {
    case 0:
      // When the client is not a datanode, this simply picks a random
      // datanode as the "local" node, i.e. the primary (tentatively).
      writer = chooseLocalNode(writer, excludedNodes,
                               blocksize, maxNodesPerRack, results);
      if (--numOfReplicas == 0) {
        break;
      }
      // ... (cases not reached on the first allocation are omitted)
    default:
      // pick the remaining targets at random
      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes,
                   blocksize, maxNodesPerRack, results);
    }
  } catch (NotEnoughReplicasException e) {
    FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
                          + numOfReplicas);
  }
  return writer;
}
Putting these functions together: on the first allocation, one datanode is chosen at random as the primary, and then some more datanodes are chosen at random as the remaining targets. No sophisticated placement algorithm shows up here; the later adjustment phases may add more.
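To make that summary concrete, here is a toy sketch of the first-allocation behavior. This is not the real ReplicationTargetChooser: rack topology, the writer-local preference, excluded nodes and the pipeline sort are all omitted; it only shows "one random primary, then random distinct targets for the rest":

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;

public class ChooseTargetSketch {
    // Pick `replication` distinct datanodes at random; the first one in the
    // returned list plays the role of the "local"/primary target.
    static List<String> chooseTargets(List<String> cluster, int replication, Random rnd) {
        List<String> candidates = new ArrayList<>(cluster);
        Collections.shuffle(candidates, rnd);       // random, distinct picks
        int n = Math.min(replication, candidates.size());
        return candidates.subList(0, n);
    }

    public static void main(String[] args) {
        List<String> cluster = List.of("dn1", "dn2", "dn3", "dn4", "dn5");
        List<String> targets = chooseTargets(cluster, 3, new Random(0));
        System.out.println(targets.size());                // 3
        System.out.println(new HashSet<>(targets).size()); // 3: all distinct
    }
}
```

Shuffling a copy of the candidate list and taking a prefix is just a compact way to get distinct random picks; the real chooser instead draws nodes one at a time against the cluster topology and an exclusion set.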