
Hadoop DataNode Startup: asyncBlockReport

2013-10-01 22:09
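A DataNode (DN) periodically sends block reports to the NameNode (NN) so that the NN knows which blocks the DN stores and can serve clients accordingly. Hadoop typically holds a very large amount of data, so scanning every block synchronously each time a report is due is not practical. Instead, a service prepares the block report ahead of time to make reporting more efficient. That service is asyncBlockReport, a background daemon thread that is started when the DN creates its FSDataset.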



public FSDataset(DataStorage storage, Configuration conf) throws IOException {
  .....
  .....
  FSVolume[] volArray = new FSVolume[storage.getNumStorageDirs()];
  for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
    volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
  }
  // Build the volume set
  volumes = new FSVolumeSet(volArray);
  // Populate volumeMap (a HashMap) with the block-to-file mapping
  volumes.getVolumeMap(volumeMap);
  // Create the asynchronous block report instance and start it
  asyncBlockReport = new AsyncBlockReport(this);
  asyncBlockReport.start();
  File[] roots = new File[storage.getNumStorageDirs()];
  for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
    roots[idx] = storage.getStorageDir(idx).getCurrentDir();
  }
  asyncDiskService = new FSDatasetAsyncDiskService(roots);
  registerMBean(storage.getStorageID());
}

Here is the body of the thread:

public void run() {
  while (shouldRun) {
    try {
      // Wait for a scan request; one scan is requested during DN startup
      waitForReportRequest();
      assert requested && scan == null;
      // Log the start and record the start time
      DataNode.LOG.info("Starting asynchronous block report scan");
      long st = System.currentTimeMillis();
      // Run the scan and build the block report
      HashMap<Block, File> result = fsd.roughBlockScan();
      DataNode.LOG.info("Finished asynchronous block report scan in "
          + (System.currentTimeMillis() - st) + "ms");
      // Publish the result as the pending block report
      synchronized (this) {
        assert scan == null;
        this.scan = result;
      }
    } catch (InterruptedException ie) {
      // interrupted to end scanner
    } catch (Throwable t) {
      DataNode.LOG.error("Async Block Report thread caught exception", t);
      try {
        // Avoid busy-looping in the case that we have entered some invalid
        // state -- don't want to flood the error log with exceptions.
        Thread.sleep(2000);
      } catch (InterruptedException e) {
      }
    }
  }
}
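
The request-and-collect handshake around this loop can be sketched in isolation. The following is a minimal sketch, not Hadoop's AsyncBlockReport: the class name AsyncScanner, the Supplier-based scan function, and the request(), isReady(), getAndReset() and shutdown() methods are assumptions made for illustration. Only waitForReportRequest, requested, scan and shouldRun are names taken from the code above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for AsyncBlockReport: one scan per request, run off-thread. */
class AsyncScanner extends Thread {
    private final Supplier<Map<String, Long>> scanFn; // the slow scan (assumed)
    private boolean requested = false;                // a scan has been asked for
    private Map<String, Long> scan = null;            // finished result, if any
    private volatile boolean shouldRun = true;

    AsyncScanner(Supplier<Map<String, Long>> scanFn) {
        this.scanFn = scanFn;
        setDaemon(true); // background thread, does not keep the JVM alive
    }

    /** Ask for a scan; returns immediately. */
    synchronized void request() {
        requested = true;
        notifyAll();
    }

    /** True once a finished scan is waiting to be collected. */
    synchronized boolean isReady() {
        return scan != null;
    }

    /** Hand over the finished scan and reset state for the next request. */
    synchronized Map<String, Long> getAndReset() {
        Map<String, Long> result = scan;
        scan = null;
        requested = false;
        return result;
    }

    void shutdown() {
        shouldRun = false;
        interrupt();
    }

    /** Block until a scan is requested and no uncollected result is pending. */
    private synchronized void waitForReportRequest() throws InterruptedException {
        while (!(requested && scan == null)) {
            wait(5000);
        }
    }

    @Override
    public void run() {
        while (shouldRun) {
            try {
                waitForReportRequest();
                Map<String, Long> result = scanFn.get(); // slow work, no lock held
                synchronized (this) {
                    scan = result;
                }
            } catch (InterruptedException ie) {
                // interrupted to end the scanner; the loop re-checks shouldRun
            }
        }
    }
}
```

In this sketch the caller invokes request() ahead of time, polls isReady() when a report is due, and collects the map with getAndReset(). The scan runs without holding the object's lock, so callers never block on the slow part.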

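How the scan works is what interests us, so look at roughBlockScan. It does not lock the directories while scanning, so updates may be in progress at the same time. The resulting block report is therefore rough, but producing it performs much better.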

HashMap<Block, File> roughBlockScan() {
  int expectedNumBlocks;
  synchronized (this) {
    expectedNumBlocks = volumeMap.size();
  }
  HashMap<Block, File> seenOnDisk =
      new HashMap<Block, File>(expectedNumBlocks, 1.1f);
  // Start the scan
  volumes.scanBlockFilesInconsistent(seenOnDisk);
  return seenOnDisk;
}


Next comes scanBlockFilesInconsistent, as called on volumes:

void scanBlockFilesInconsistent(Map<Block, File> results) {
  // Take a snapshot of the volume array in case it changes during the scan
  FSVolume volumesCopy[];
  synchronized (this) {
    volumesCopy = Arrays.copyOf(volumes, volumes.length);
  }
  for (FSVolume vol : volumesCopy) {
    vol.scanBlockFilesInconsistent(results); // note this call
  }
}
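
The snapshot step above is a general pattern: copy the shared array while holding the lock, then iterate over the copy with the lock released. The sketch below shows it on its own. VolumeSetSketch, addVolume and scanSnapshot are hypothetical names, and plain strings stand in for FSVolume objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical volume set: copy the array under the lock, iterate outside it. */
class VolumeSetSketch {
    private String[] volumes; // stand-in for FSVolume[]

    VolumeSetSketch(String... initial) {
        volumes = initial.clone();
    }

    /** Writers replace the array reference while holding the lock. */
    synchronized void addVolume(String v) {
        String[] grown = Arrays.copyOf(volumes, volumes.length + 1);
        grown[volumes.length] = v;
        volumes = grown;
    }

    /** Visit a point-in-time snapshot; the lock is held only for the copy. */
    List<String> scanSnapshot(Runnable duringScan) {
        String[] copy;
        synchronized (this) {
            copy = Arrays.copyOf(volumes, volumes.length);
        }
        List<String> visited = new ArrayList<>();
        for (String v : copy) {
            visited.add(v);
            duringScan.run(); // may modify the volume set; the snapshot is unaffected
        }
        return visited;
    }
}
```

The trade-off is the same as in the original: a volume added during the scan is simply not visited until the next scan, which is acceptable because the report is reconciled later anyway.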

The call is delegated down another layer:

void scanBlockFilesInconsistent(Map<Block, File> results) {
  scanBlockFilesInconsistent(dataDir.dir, results);
}

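Now for the function that does the real work, the private scanBlockFilesInconsistent. Note that the block report it produces is not synchronized with the dataset: blocks may be added or deleted while the scan is running, so before the report is sent to the NN it is checked again by reconcileRoughBlockScan.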

private void scanBlockFilesInconsistent(
    File dir, Map<Block, File> results) {
  // List every file in the data directory
  File filesInDir[] = dir.listFiles();
  if (filesInDir != null) {
    for (File f : filesInDir) {
      // Is this a block file?
      if (Block.isBlockFilename(f)) {
        long blockLen = f.length();
        // Check that the file still exists, since it may be deleted during the scan
        if (blockLen == 0 && !f.exists()) {
          // length 0 could indicate a race where this file was removed
          // while we were in the middle of generating the report.
          continue;
        }
        // Look up the block's generation stamp among the files in this
        // directory and create the Block instance with it
        long genStamp = FSDataset.getGenerationStampFromFile(filesInDir, f);
        Block b = new Block(f, blockLen, genStamp);
        // Add a block report entry to the map
        results.put(b, f);
      } else if (f.getName().startsWith("subdir")) {
        // Recurse into subdirectories
        scanBlockFilesInconsistent(f, results);
      }
    }
  }
}
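
The same lock-free recursive walk can be reproduced with nothing but java.io.File. This is a simplified sketch under stated assumptions: the pattern blk_<id> for block file names is an assumption standing in for Block.isBlockFilename, generation stamps are left out, and RoughScanSketch is a hypothetical class name.

```java
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

class RoughScanSketch {
    // Assumed naming: block files look like blk_<id>; anything else is skipped.
    private static final Pattern BLOCK_FILE = Pattern.compile("blk_-?\\d+");

    /** Collect block name -> length for dir and its subdir* children, without locking. */
    static void scan(File dir, Map<String, Long> results) {
        File[] files = dir.listFiles();
        if (files == null) {
            return; // not a directory, or it vanished mid-scan
        }
        for (File f : files) {
            String name = f.getName();
            if (f.isFile() && BLOCK_FILE.matcher(name).matches()) {
                long len = f.length();
                // length() returns 0 for a missing file, so re-check existence
                if (len == 0 && !f.exists()) {
                    continue;
                }
                results.put(name, len);
            } else if (f.isDirectory() && name.startsWith("subdir")) {
                scan(f, results);
            }
        }
    }
}
```

Calling RoughScanSketch.scan(new File(dataDir), new HashMap<>()) on a directory fills the map with whatever block files were visible at the moment each directory was listed, which is exactly why the result has to be treated as rough.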

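Every data directory goes through the same procedure. When the function returns, a possibly inconsistent block report has been produced. Once it has been reconciled, the report is sent to the NN.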
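
The article does not show reconcileRoughBlockScan, so the following is only a hypothetical sketch of how such a reconciliation could work, not Hadoop's implementation. It assumes the rough scan and the in-memory block map are both available as name-to-length maps, and that an existsOnDisk check (an assumed parameter) can re-test a single block cheaply. In a real dataset this step would presumably run while holding the dataset lock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

class ReconcileSketch {
    /**
     * Hypothetical reconciliation: trust entries both sides agree on, and
     * re-check only the disagreements against the disk via existsOnDisk.
     */
    static Map<String, Long> reconcile(Map<String, Long> roughScan,
                                       Map<String, Long> inMemory,
                                       Predicate<String> existsOnDisk) {
        Map<String, Long> report = new HashMap<>(roughScan);
        // Seen by the scan but unknown in memory: possibly deleted mid-scan.
        for (String name : roughScan.keySet()) {
            if (!inMemory.containsKey(name) && !existsOnDisk.test(name)) {
                report.remove(name);
            }
        }
        // Known in memory but missed by the scan: possibly added mid-scan.
        for (Map.Entry<String, Long> e : inMemory.entrySet()) {
            if (!roughScan.containsKey(e.getKey()) && existsOnDisk.test(e.getKey())) {
                report.put(e.getKey(), e.getValue());
            }
        }
        return report;
    }
}
```

The point of the design is that only the differences between the two maps need a second look at the disk, so the expensive full scan stays outside the lock and the locked section stays short.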

 

 