
Block Management on the DataNode (4): FSDataset

2012-01-08 16:17

As the final post in this series on block management in the DataNode, this article takes a detailed look at the big component that manages file blocks directly on the DataNode's behalf: FSDatasetInterface. Since FSDatasetInterface is only an interface, the discussion focuses on its concrete implementation, FSDataset. FSDataset operates mainly on top of FSVolumeSet, and its core job is to create I/O streams for data blocks. Let us first look at the classes associated with FSDataset.



The sections below walk through FSDataset's implementation and the purpose of its main methods:

1. getBlockReport()

Once a DataNode has started up normally, it periodically reports all of the blocks stored on it to the NameNode. The reporting interval is configurable in the DataNode's configuration file via the property dfs.blockreport.intervalMsec.

public Block[] getBlockReport() {
  TreeSet<Block> blockSet = new TreeSet<Block>();
  volumes.getBlockInfo(blockSet); // collect every block stored on the "disks"
  Block blockTable[] = new Block[blockSet.size()];
  int i = 0;

  for (Iterator<Block> it = blockSet.iterator(); it.hasNext(); i++) {
    blockTable[i] = it.next();
  }

  return blockTable;
}
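Note that getBlockReport() gathers the blocks into a TreeSet, which both de-duplicates blocks reported by multiple volumes and returns them in sorted order. The effect can be sketched with a simplified, hypothetical stand-in for Hadoop's Block class (the real Block also carries a length and a generation stamp):

```java
import java.util.TreeSet;

public class BlockReportSketch {
  // Hypothetical stand-in for Hadoop's Block: ordered by block id.
  static class Block implements Comparable<Block> {
    final long id;
    Block(long id) { this.id = id; }
    public int compareTo(Block o) { return Long.compare(id, o.id); }
    public String toString() { return "blk_" + id; }
  }

  public static void main(String[] args) {
    TreeSet<Block> blockSet = new TreeSet<Block>();
    // Volumes may report blocks in arbitrary order; duplicates collapse.
    blockSet.add(new Block(42));
    blockSet.add(new Block(7));
    blockSet.add(new Block(42)); // duplicate, ignored by the set
    Block[] table = blockSet.toArray(new Block[0]);
    System.out.println(table.length); // 2
    System.out.println(table[0]);     // blk_7
  }
}
```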


2. invalidate(Block[])

After a DataNode has successfully registered with the NameNode, it periodically sends heartbeats to tell the NameNode that it is still working normally. When the NameNode receives a heartbeat, it sends back a response, and that response can piggyback commands for the DataNode, such as deleting blocks on the DataNode that are no longer needed (a block may become useless because a user deleted a file, for example). The heartbeat interval is configurable in the DataNode's configuration file via the property dfs.heartbeat.interval.

public void invalidate(Block invalidBlks[]) throws IOException {
  boolean error = false;
  for (int i = 0; i < invalidBlks.length; i++) {
    File f = null;
    FSVolume v;
    synchronized (this) {
      f = getFile(invalidBlks[i]); // locate the block's data file
      DatanodeBlockInfo dinfo = volumeMap.get(invalidBlks[i]); // look up the block's location info
      if (dinfo == null) {
        DataNode.LOG.warn("Unexpected error trying to delete block " + invalidBlks[i] + ". BlockInfo not found in volumeMap.");
        error = true;
        continue;
      }
      v = dinfo.getVolume();
      if (f == null) {
        DataNode.LOG.warn("Unexpected error trying to delete block " + invalidBlks[i] + ". Block not found in blockMap." + ((v == null) ? " " : " Block found in volumeMap."));
        error = true;
        continue;
      }
      if (v == null) {
        DataNode.LOG.warn("Unexpected error trying to delete block " + invalidBlks[i] + ". No volume for this block." + " Block found in blockMap. " + f + ".");
        error = true;
        continue;
      }
      File parent = f.getParentFile();
      if (parent == null) {
        DataNode.LOG.warn("Unexpected error trying to delete block " + invalidBlks[i] + ". Parent not found for file " + f + ".");
        error = true;
        continue;
      }
      v.clearPath(parent); // decrement the block count of each storage directory on this path
      volumeMap.remove(invalidBlks[i]); // drop the block's location mapping
    }

    File metaFile = getMetaFile(f, invalidBlks[i]); // the block's metadata file
    long blockSize = f.length() + metaFile.length();
    // delete both the block's data file and its metadata file
    if (!f.delete() || (!metaFile.delete() && metaFile.exists())) {
      DataNode.LOG.warn("Unexpected error trying to delete block " + invalidBlks[i] + " at file " + f);
      error = true;
      continue;
    }
    v.decDfsUsed(blockSize); // update the space usage of the block's volume
    DataNode.LOG.info("Deleting block " + invalidBlks[i] + " file " + f);
    if (f.exists()) {
      //
      // This is a temporary check especially for hadoop-1220.
      // This will go away in the future.
      //
      DataNode.LOG.info("File " + f + " was deleted but still exists!");
    }
  }

  if (error) {
    throw new IOException("Error in deleting blocks.");
  }
}



3. isValidBlock(Block)

When a block's replica count falls below target, the NameNode needs to replicate the block to raise it. The NameNode picks one DataNode that already holds the block as the source, and has it transfer the block to other DataNodes. Before sending the block, the source DataNode first validates it, which in practice means checking whether the block's data file exists on the local disk. (The check also fails while the DataNode is still in the middle of receiving that block.)

public boolean isValidBlock(Block b) {
  return validateBlockFile(b) != null;
}

/**
 * Find the file corresponding to the block and return it if it exists.
 */
File validateBlockFile(Block b) {
  // locate the block's data file
  File f = getFile(b);

  if (f != null && f.exists())
    return f;

  if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
    InterDatanodeProtocol.LOG.debug("b=" + b + ", f=" + f);
  }

  return null;
}


4. getLength(Block)

When a DataNode copies a block to another DataNode, one more check is needed: the block length recorded by the NameNode must match the actual length of the file on the DataNode.

public long getLength(Block b) throws IOException {
  return getBlockFile(b).length();
}

/**
 * Get File name for a given block.
 */
public synchronized File getBlockFile(Block b) throws IOException {
  File f = validateBlockFile(b);
  if (f == null) {
    if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
      InterDatanodeProtocol.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
    }
    throw new IOException("Block " + b + " is not valid.");
  }

  return f;
}


5. getStoredBlock(long)

When a DataNode recovers or synchronizes a block, it compares the block against its other replicas by generation stamp and size, which requires fetching the block's basic metadata. (This method can return metadata not only for blocks already stored on the DataNode, but also for blocks the DataNode is currently receiving.)

/* find the metadata file for a block's data file */
private static File findMetaFile(final File blockFile) throws IOException {
  final String prefix = blockFile.getName() + "_";
  final File parent = blockFile.getParentFile();
  File[] matches = parent.listFiles(new FilenameFilter() {
    public boolean accept(File dir, String name) {
      return dir.equals(parent) && name.startsWith(prefix) && name.endsWith(METADATA_EXTENSION);
    }
  });

  if (matches == null || matches.length == 0) {
    throw new IOException("Meta file not found, blockFile=" + blockFile);
  }
  else if (matches.length > 1) {
    throw new IOException("Found more than one meta files: "
        + Arrays.asList(matches));
  }
  return matches[0];
}

/** parse a block's generation stamp out of its metadata file name */
private static long parseGenerationStamp(File blockFile, File metaFile) throws IOException {
  String metaname = metaFile.getName();
  String gs = metaname.substring(blockFile.getName().length() + 1,
      metaname.length() - METADATA_EXTENSION.length());
  try {
    return Long.parseLong(gs);
  } catch (NumberFormatException nfe) {
    throw (IOException)new IOException("blockFile=" + blockFile
        + ", metaFile=" + metaFile).initCause(nfe);
  }
}

/** find the data file for a block */
public File findBlockFile(long blockId) {
  final Block b = new Block(blockId);
  File blockfile = null;
  ActiveFile activefile = ongoingCreates.get(b);
  if (activefile != null) {
    blockfile = activefile.file;
  }
  if (blockfile == null) {
    blockfile = getFile(b);
  }
  if (blockfile == null) {
    if (DataNode.LOG.isDebugEnabled()) {
      DataNode.LOG.debug("ongoingCreates=" + ongoingCreates);
      DataNode.LOG.debug("volumeMap=" + volumeMap);
    }
  }
  return blockfile;
}

/** {@inheritDoc} */
public synchronized Block getStoredBlock(long blkid) throws IOException {
  File blockfile = findBlockFile(blkid);
  if (blockfile == null) {
    return null;
  }
  File metafile = findMetaFile(blockfile);
  // a block's basic metadata: id, length, generation stamp
  return new Block(blkid, blockfile.length(), parseGenerationStamp(blockfile, metafile));
}
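To make the substring arithmetic in parseGenerationStamp() concrete: a data file named blk_&lt;id&gt; is paired with a metadata file named blk_&lt;id&gt;_&lt;generationStamp&gt;.meta, so the generation stamp sits between the data file name plus an underscore and the ".meta" suffix. A minimal, self-contained sketch (the file names below are made up for illustration):

```java
public class GenStampSketch {
  static final String METADATA_EXTENSION = ".meta";

  // Mirrors parseGenerationStamp(): strip the "<blockName>_" prefix
  // and the ".meta" suffix, and parse what is left as a long.
  static long parseGenerationStamp(String blockName, String metaName) {
    String gs = metaName.substring(blockName.length() + 1,
        metaName.length() - METADATA_EXTENSION.length());
    return Long.parseLong(gs);
  }

  public static void main(String[] args) {
    // e.g. data file blk_8374652 paired with metadata file blk_8374652_1004.meta
    System.out.println(parseGenerationStamp("blk_8374652", "blk_8374652_1004.meta")); // 1004
  }
}
```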


6. getDfsUsed()/getCapacity()/getRemaining()

Every heartbeat the DataNode sends to the NameNode carries the node's basic status, which includes the current state of its storage space: total capacity, space used, and space remaining.

public long getDfsUsed() throws IOException {
  return volumes.getDfsUsed();
}

public long getCapacity() throws IOException {
  return volumes.getCapacity();
}

public long getRemaining() throws IOException {
  return volumes.getRemaining();
}
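All three getters simply delegate to FSVolumeSet, which sums the answer over its volumes. A hypothetical sketch of that aggregation, using java.io.File's space queries in place of Hadoop's DF/DU helpers (the real FSVolume also subtracts a configurable reserved amount, which is omitted here):

```java
import java.io.File;

public class VolumeSpaceSketch {
  // Simplified stand-in for FSVolumeSet: each volume is one storage directory.
  static long getCapacity(File[] volumes) {
    long sum = 0;
    for (File v : volumes) {
      sum += v.getTotalSpace(); // total size of the partition holding this directory
    }
    return sum;
  }

  static long getRemaining(File[] volumes) {
    long sum = 0;
    for (File v : volumes) {
      sum += v.getUsableSpace(); // space still available to this JVM
    }
    return sum;
  }

  public static void main(String[] args) {
    File[] volumes = { new File(System.getProperty("java.io.tmpdir")) };
    System.out.println("capacity=" + getCapacity(volumes)
        + " remaining=" + getRemaining(volumes));
  }
}
```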


7. writeToBlock(Block, boolean)

When a DataNode starts receiving a block from another DataNode or from a client, it must first create write streams for that block. The write streams produce two files: a data file, and a metadata file that mainly holds the checksums for the data file.

public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException {
  // check whether the block already exists
  if (isValidBlock(b)) {
    if (!isRecovery) { // the block already exists and this is not an append, so fail
      throw new BlockAlreadyExistsException("Block " + b + " is valid, and cannot be written to.");
    }
    // The other reason is that an "append" is occurring to this block.
    detachBlock(b, 1);
  }

  long blockSize = b.getNumBytes();

  //
  File f = null;
  List<Thread> threads = null;
  synchronized (this) {
    // check whether the block is already being received
    ActiveFile activeFile = ongoingCreates.get(b);
    if (activeFile != null) {
      f = activeFile.file;
      threads = activeFile.threads;

      if (!isRecovery) { // the DataNode is already receiving this block and this is not an append, so fail
        throw new BlockAlreadyExistsException("Block " + b + " has already been started (though not completed), and thus cannot be created.");
      } else {
        // interrupt every thread currently receiving this block
        for (Thread thread : threads) {
          thread.interrupt();
        }
      }
      ongoingCreates.remove(b);
    }

    FSVolume v = null;
    if (!isRecovery) {
      v = volumes.getNextVolume(blockSize);
      // create a temporary file to receive the block's data
      f = createTmpFile(v, b);
      volumeMap.put(b, new DatanodeBlockInfo(v));
    } else if (f != null) {
      DataNode.LOG.info("Reopen already-open Block for append " + b);
      // create or reuse temporary file to hold block in the designated volume
      v = volumeMap.get(b).getVolume();
      volumeMap.put(b, new DatanodeBlockInfo(v));
    } else {
      // reopening block for appending to it.
      DataNode.LOG.info("Reopen Block for append " + b);
      v = volumeMap.get(b).getVolume();
      f = createTmpFile(v, b); // create a temporary data file for the block
      File blkfile = getBlockFile(b); // the block's current data file
      File oldmeta = getMetaFile(b); // the block's current metadata file
      File newmeta = getMetaFile(f, b); // the block's new temporary metadata file

      // rename meta file to tmp directory
      DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
      if (!oldmeta.renameTo(newmeta)) {
        throw new IOException("Block " + b + " reopen failed. " + " Unable to move meta file  " + oldmeta + " to tmp dir " + newmeta);
      }

      // rename block file to tmp directory
      DataNode.LOG.debug("Renaming " + blkfile + " to " + f);
      if (!blkfile.renameTo(f)) {
        if (!f.delete()) {
          throw new IOException("Block " + b + " reopen failed. " + " Unable to remove file " + f);
        }
        if (!blkfile.renameTo(f)) {
          throw new IOException("Block " + b + " reopen failed. " + " Unable to move block file " + blkfile + " to tmp dir " + f);
        }
      }
      volumeMap.put(b, new DatanodeBlockInfo(v));
    }

    if (f == null) {
      DataNode.LOG.warn("Block " + b + " reopen failed " + " Unable to locate tmp file.");
      throw new IOException("Block " + b + " reopen failed " + " Unable to locate tmp file.");
    }
    ongoingCreates.put(b, new ActiveFile(f, threads)); // record that this DataNode is now receiving the block
  }

  // wait for every thread that was receiving the block to finish
  try {
    if (threads != null) {
      for (Thread thread : threads) {
        thread.join();
      }
    }
  } catch (InterruptedException e) {
    throw new IOException("Recovery waiting for thread interrupted.");
  }

  // the block's metadata file
  File metafile = getMetaFile(f, b);
  DataNode.LOG.debug("writeTo blockfile is " + f + " of size " + f.length());
  DataNode.LOG.debug("writeTo metafile is " + metafile + " of size " + metafile.length());
  // create write streams over the block's temporary data file and metadata file
  return createBlockWriteStreams(f, metafile);
}

public boolean detachBlock(Block block, int numLinks) throws IOException {
  DatanodeBlockInfo info = null;

  synchronized (this) {
    info = volumeMap.get(block);
  }
  return info.detachBlock(block, numLinks);
}
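The metadata file written alongside each block holds per-chunk checksums of the block's data. A rough sketch of that checksumming (Hadoop uses CRC32 over fixed-size chunks, 512 bytes by default; the framing below is simplified and hypothetical, omitting the version header the real meta file carries):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.CRC32;

public class ChecksumSketch {
  static final int BYTES_PER_CHECKSUM = 512; // Hadoop's default chunk size

  // Compute one 4-byte CRC32 per 512-byte chunk of the block data.
  static byte[] checksums(byte[] data) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    for (int off = 0; off < data.length; off += BYTES_PER_CHECKSUM) {
      int len = Math.min(BYTES_PER_CHECKSUM, data.length - off);
      CRC32 crc = new CRC32();
      crc.update(data, off, len);
      out.writeInt((int) crc.getValue()); // 4 bytes of checksum per chunk
    }
    out.flush();
    return buf.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] block = new byte[1300]; // 3 chunks: 512 + 512 + 276
    System.out.println(checksums(block).length); // 12 = 3 chunks * 4 bytes
  }
}
```

On a read, the DataNode recomputes each chunk's CRC and compares it against the stored value, which is how corrupt replicas are detected.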


8. finalizeBlock(Block)

Once a DataNode has successfully received a block's data, it must move the block's data file and metadata file to their permanent location. (If the data originally came from a client and the client failed partway through the transfer, the received block is still considered "successful", because after recovering, the client can resume the transfer with an append operation.)

public synchronized void finalizeBlock(Block b) throws IOException {
  ActiveFile activeFile = ongoingCreates.get(b);
  if (activeFile == null) {
    throw new IOException("Block " + b + " is already finalized.");
  }
  File f = activeFile.file;
  if (f == null || !f.exists()) {
    throw new IOException("No temporary file " + f + " for block " + b);
  }
  FSVolume v = volumeMap.get(b).getVolume();
  if (v == null) {
    throw new IOException("No volume for temporary file " + f + " for block " + b);
  }

  File dest = null;
  dest = v.addBlock(b, f); // move the block's data file and metadata file into the assigned volume
  volumeMap.put(b, new DatanodeBlockInfo(v, dest));
  ongoingCreates.remove(b); // clear the record that the block was being created
}
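The move performed by v.addBlock() boils down to renaming the temporary data file and its companion metadata file into the volume's final block directory. A minimal, hypothetical sketch of that step using java.io.File (the real FSVolume also picks a subdirectory to avoid overly large directories):

```java
import java.io.File;
import java.io.IOException;

public class FinalizeSketch {
  // Move a block's tmp data file and its companion meta file into destDir.
  static File moveToFinal(File tmpBlock, File tmpMeta, File destDir) throws IOException {
    File dest = new File(destDir, tmpBlock.getName());
    File destMeta = new File(destDir, tmpMeta.getName());
    if (!tmpMeta.renameTo(destMeta) || !tmpBlock.renameTo(dest)) {
      throw new IOException("Failed to finalize " + tmpBlock + " into " + destDir);
    }
    return dest; // the block's permanent data file
  }

  public static void main(String[] args) throws IOException {
    // Set up a made-up tmp layout: a "current" dir plus a tmp block + meta pair.
    File base = new File(System.getProperty("java.io.tmpdir"), "fsdataset-sketch-" + System.nanoTime());
    File cur = new File(base, "current");
    cur.mkdirs();
    File blk = new File(base, "blk_1");
    File meta = new File(base, "blk_1_1001.meta");
    blk.createNewFile();
    meta.createNewFile();
    File dest = moveToFinal(blk, meta, cur);
    System.out.println(dest.exists()); // true
  }
}
```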


9. unfinalizeBlock(Block)

When the replica count of a block needs to be increased, the NameNode picks a DataNode that already holds a replica as the source, and several other DataNodes as targets. If a target DataNode hits an I/O error while receiving the block, it deletes all information associated with that block.

public synchronized void unfinalizeBlock(Block b) throws IOException {
  // clear the record that the block was being created
  ActiveFile activefile = ongoingCreates.remove(b);
  if (activefile == null) {
    return;
  }
  volumeMap.remove(b);

  // delete the block's temporary data file and metadata file from disk
  if (delBlockFromDisk(activefile.file, getMetaFile(activefile.file, b), b)) {
    DataNode.LOG.warn("Block " + b + " unfinalized and removed. ");
  }
}

private boolean delBlockFromDisk(File blockFile, File metaFile, Block b) {
  if (blockFile == null) {
    DataNode.LOG.warn("No file exists for block: " + b);
    return true;
  }

  if (!blockFile.delete()) {
    DataNode.LOG.warn("Not able to delete the block file: " + blockFile);
    return false;
  } else { // remove the meta file
    if (metaFile != null && !metaFile.delete()) {
      DataNode.LOG.warn("Not able to delete the meta block file: " + metaFile);
      return false;
    }
  }

  return true;
}


10. getBlockInputStream(Block, long)

HDFS supports random reads within a file. When a client wants to read data starting at some position in a file, it asks the NameNode which block contains that position and which DataNodes hold that block; the NameNode also sorts those DataNodes by their distance to the client, nearest first. With that answer, the client tries the DataNodes in order, and the chosen DataNode streams the data starting from the requested position.

public synchronized InputStream getBlockInputStream(Block b, long seekOffset) throws IOException {
  File blockFile = getBlockFile(b);
  RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
  if (seekOffset > 0) {
    blockInFile.seek(seekOffset);
  }

  return new FileInputStream(blockInFile.getFD());
}
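The trick here is that a FileInputStream built from a RandomAccessFile's file descriptor shares its file position, so subsequent reads begin at the seeked offset. A self-contained demonstration (the temporary file and its contents are made up for the example):

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;

public class SeekedStreamSketch {
  // Open an InputStream over file, positioned at seekOffset.
  static InputStream openAt(File file, long seekOffset) throws IOException {
    RandomAccessFile raf = new RandomAccessFile(file, "r");
    if (seekOffset > 0) {
      raf.seek(seekOffset); // moves the file-descriptor position shared with the stream below
    }
    return new FileInputStream(raf.getFD());
  }

  public static void main(String[] args) throws IOException {
    File f = File.createTempFile("blk_sketch", null);
    f.deleteOnExit();
    try (FileOutputStream out = new FileOutputStream(f)) {
      out.write("0123456789".getBytes("US-ASCII"));
    }
    try (InputStream in = openAt(f, 4)) {
      System.out.println((char) in.read()); // '4': the read starts at offset 4
    }
  }
}
```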


I will discuss the other important methods of this interface in detail in future posts, in the context of concrete scenarios.


                                            