您的位置:首页 > Web前端 > Node.js


2012-01-08 16:17 435 查看




public Block[] getBlockReport() {

TreeSet<Block> blockSet = new TreeSet<Block>();
Block blockTable[] = new Block[blockSet.size()];
int i = 0;

for (Iterator<Block> it = blockSet.iterator(); it.hasNext(); i++) {
blockTable[i] = it.next();

return blockTable;


public void invalidate(Block invalidBlks[]) throws IOException {
boolean error = false;
for (int i = 0; i < invalidBlks.length; i++) {
File f = null;
FSVolume v;
synchronized (this) {
f = getFile(invalidBlks[i]);//获取数据块对应的数据文件
DatanodeBlockInfo dinfo = volumeMap.get(invalidBlks[i]);//找到数据块的位置信息
if (dinfo == null) {
DataNode.LOG.warn("Unexpected error trying to delete block " + invalidBlks[i] +  ". BlockInfo not found in volumeMap.");
error = true;
v = dinfo.getVolume();
if (f == null) {
DataNode.LOG.warn("Unexpected error trying to delete block " + invalidBlks[i] + ". Block not found in blockMap." + ((v == null) ? " " : " Block found in volumeMap."));
error = true;
if (v == null) {
DataNode.LOG.warn("Unexpected error trying to delete block " + invalidBlks[i] + ". No volume for this block." + " Block found in blockMap. " + f + ".");
error = true;
File parent = f.getParentFile();
if (parent == null) {
DataNode.LOG.warn("Unexpected error trying to delete block " + invalidBlks[i] + ". Parent not found for file " + f + ".");
error = true;
volumeMap.remove(invalidBlks[i]);//清除数据块的位置映射信息      }

File metaFile = getMetaFile( f, invalidBlks[i] );//获取数据块对应的元数据文件
long blockSize = f.length()+metaFile.length();
if ( !f.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
DataNode.LOG.warn("Unexpected error trying to delete block " + invalidBlks[i] + " at file " + f);
error = true;
DataNode.LOG.info("Deleting block " + invalidBlks[i] + " file " + f);
if (f.exists()) {
// This is a temporary check especially for hadoop-1220.
// This will go away in the future.
DataNode.LOG.info("File " + f + " was deleted but still exists!");

if (error) {
throw new IOException("Error in deleting blocks.");




public boolean isValidBlock(Block b) {
return validateBlockFile(b) != null;

* Find the file corresponding to the block and return it if it exists.
File validateBlockFile(Block b) {
File f = getFile(b);

if(f != null && f.exists())
return f;

if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
InterDatanodeProtocol.LOG.debug("b=" + b + ", f=" + f);

return null;



public long getLength(Block b) throws IOException {
return getBlockFile(b).length();

* Get File name for a given block.
public synchronized File getBlockFile(Block b) throws IOException {
File f = validateBlockFile(b);
if(f == null) {
if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
InterDatanodeProtocol.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
throw new IOException("Block " + b + " is not valid.");

return f;



private static File findMetaFile(final File blockFile) throws IOException {
final String prefix = blockFile.getName() + "_";
final File parent = blockFile.getParentFile();
File[] matches = parent.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
return dir.equals(parent) && name.startsWith(prefix) && name.endsWith(METADATA_EXTENSION);

if (matches == null || matches.length == 0) {
throw new IOException("Meta file not found, blockFile=" + blockFile);
else if (matches.length > 1) {
throw new IOException("Found more than one meta files: "
+ Arrays.asList(matches));
return matches[0];

/** 通过解析数据块的元数据文件名来获取该数据块的版本时间戳 */
private static long parseGenerationStamp(File blockFile, File metaFile) throws IOException {
String metaname = metaFile.getName();
String gs = metaname.substring(blockFile.getName().length() + 1,
metaname.length() - METADATA_EXTENSION.length());
try {
return Long.parseLong(gs);
} catch(NumberFormatException nfe) {
throw (IOException)new IOException("blockFile=" + blockFile
+ ", metaFile=" + metaFile).initCause(nfe);

/** 获取数据块对应的数据文件 */
public File findBlockFile(long blockId) {
final Block b = new Block(blockId);
File blockfile = null;
ActiveFile activefile = ongoingCreates.get(b);
if (activefile != null) {
blockfile = activefile.file;
if (blockfile == null) {
blockfile = getFile(b);
if (blockfile == null) {
if (DataNode.LOG.isDebugEnabled()) {
DataNode.LOG.debug("ongoingCreates=" + ongoingCreates);
DataNode.LOG.debug("volumeMap=" + volumeMap);
return blockfile;

/** {@inheritDoc} */
public synchronized Block getStoredBlock(long blkid) throws IOException {
File blockfile = findBlockFile(blkid);
if (blockfile == null) {
return null;
File metafile = findMetaFile(blockfile);
return new Block(blkid, blockfile.length(), parseGenerationStamp(blockfile, metafile));



public long getDfsUsed() throws IOException {
return volumes.getDfsUsed();

public long getCapacity() throws IOException {
return volumes.getCapacity();

public long getRemaining() throws IOException {
return volumes.getRemaining();



public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException {
if (isValidBlock(b)) {
if (!isRecovery) {//如果Block已经存在,而又不是append则抛出异常
throw new BlockAlreadyExistsException("Block " + b + " is valid, and cannot be written to.");
// The other reason is that an "append" is occurring to this block.
detachBlock(b, 1);

long blockSize = b.getNumBytes();

File f = null;
List<Thread> threads = null;
synchronized (this) {
ActiveFile activeFile = ongoingCreates.get(b);
if (activeFile != null) {
f = activeFile.file;
threads = activeFile.threads;

if (!isRecovery) {//DataNode节点当前正在接收该Block,当此次又不是append所以出错
throw new BlockAlreadyExistsException("Block " + b + " has already been started (though not completed), and thus cannot be created.");
} else {
for (Thread thread:threads) {

FSVolume v = null;
if (!isRecovery) {
v = volumes.getNextVolume(blockSize);
// 为Block创建一个临时中间文件来接收它的数据
f = createTmpFile(v, b);
volumeMap.put(b, new DatanodeBlockInfo(v));
} else if (f != null) {
DataNode.LOG.info("Reopen already-open Block for append " + b);
// create or reuse temporary file to hold block in the designated volume
v = volumeMap.get(b).getVolume();
volumeMap.put(b, new DatanodeBlockInfo(v));
} else {
// reopening block for appending to it.
DataNode.LOG.info("Reopen Block for append " + b);
v = volumeMap.get(b).getVolume();
f = createTmpFile(v, b);//为Block创建一个临时数据文件
File blkfile = getBlockFile(b);//获取Block对应的数据文件
File oldmeta = getMetaFile(b);//获取Block对应的元数据文件
File newmeta = getMetaFile(f, b);//获取Block的新临时元数据文件

// rename meta file to tmp directory
DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
if (!oldmeta.renameTo(newmeta)) {
throw new IOException("Block " + b + " reopen failed. " + " Unable to move meta file  " + oldmeta + " to tmp dir " + newmeta);

// rename block file to tmp directory
DataNode.LOG.debug("Renaming " + blkfile + " to " + f);
if (!blkfile.renameTo(f)) {
if (!f.delete()) {
throw new IOException("Block " + b + " reopen failed. " + " Unable to remove file " + f);
if (!blkfile.renameTo(f)) {
throw new IOException("Block " + b + " reopen failed. " + " Unable to move block file " + blkfile + " to tmp dir " + f);
volumeMap.put(b, new DatanodeBlockInfo(v));

if (f == null) {
DataNode.LOG.warn("Block " + b + " reopen failed " + " Unable to locate tmp file.");
throw new IOException("Block " + b + " reopen failed " + " Unable to locate tmp file.");
ongoingCreates.put(b, new ActiveFile(f, threads));//记录DataNode节点正在接收该Block

try {
if (threads != null) {
for (Thread thread:threads) {
} catch (InterruptedException e) {
throw new IOException("Recovery waiting for thread interrupted.");

File metafile = getMetaFile(f, b);
DataNode.LOG.debug("writeTo blockfile is " + f + " of size " + f.length());
DataNode.LOG.debug("writeTo metafile is " + metafile + " of size " + metafile.length());
return createBlockWriteStreams( f , metafile);

public boolean detachBlock(Block block, int numLinks) throws IOException {
DatanodeBlockInfo info = null;

synchronized (this) {
info = volumeMap.get(block);
return info.detachBlock(block, numLinks);



public synchronized void finalizeBlock(Block b) throws IOException {
ActiveFile activeFile = ongoingCreates.get(b);
if (activeFile == null) {
throw new IOException("Block " + b + " is already finalized.");
File f = activeFile.file;
if (f == null || !f.exists()) {
throw new IOException("No temporary file " + f + " for block " + b);
FSVolume v = volumeMap.get(b).getVolume();
if (v == null) {
throw new IOException("No volume for temporary file " + f + " for block " + b);

File dest = null;
dest = v.addBlock(b, f);//将Block的数据文件和元数据文件移动到分配的“分区”中
volumeMap.put(b, new DatanodeBlockInfo(v, dest));



public synchronized void unfinalizeBlock(Block b) throws IOException {
// 消除Block正在创建记录
ActiveFile activefile = ongoingCreates.remove(b);
if (activefile == null) {

if (delBlockFromDisk(activefile.file, getMetaFile(activefile.file, b), b)) {
DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );

private boolean delBlockFromDisk(File blockFile, File metaFile, Block b) {
if (blockFile == null) {
DataNode.LOG.warn("No file exists for block: " + b);
return true;

if (!blockFile.delete()) {
DataNode.LOG.warn("Not able to delete the block file: " + blockFile);
return false;
} else { // remove the meta file
if (metaFile != null && !metaFile.delete()) {
DataNode.LOG.warn( "Not able to delete the meta block file: " + metaFile);
return false;

return true;



public synchronized InputStream getBlockInputStream(Block b, long seekOffset) throws IOException {

File blockFile = getBlockFile(b);
RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
if (seekOffset > 0) {

return new FileInputStream(blockInFile.getFD());


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息