您的位置:首页 > Web前端 > Node.js

Hadoop DataNode启动之DataBlockScanner

2013-10-08 15:53 393 查看
  存储在HDFS中的数据块每隔一段时间就会进行校验。因为硬件故障、系统BUG、异常断电等问题都可能导致数据块损坏,此时HDFS需要利用该块的其他副本来修复。为了感知块的损坏,DataNode会启动一个后台线程,周期性地对数据块进行检测。其校验原理与客户端读取数据时的校验规则相同:以块为单位,每读取一个block,都要对其内的每一个chunk进行校验。默认情况下一个block大小为64MB,每个chunk大小为512字节。下面我们从启动开始分析这个过程。

 


 DataBlockScanner在DN启动时创建,在startDataNode函数中

/**
 * Excerpt of DataNode.startDataNode (code elided by the article is shown as "......").
 * Decides whether periodic block verification is enabled and, if so, constructs
 * the DataBlockScanner. The scanner is only created here, not started.
 */
void startDataNode(Configuration conf,
AbstractList<File> dataDirs, SecureResources resources
) throws IOException {
......
//initialize periodic block scanner
String reason = null;
// Scan period in hours; a negative value turns verification off.
// NOTE(review): the article states the default period is three weeks when this is 0 — confirm in DataBlockScanner.
if (conf.getInt("dfs.datanode.scan.period.hours", 0) < 0) {
reason = "verification is turned off by configuration";
} else if ( !(data instanceof FSDataset) ) {
// The scanner is constructed with a cast to FSDataset, so other dataset types are not supported.
reason = "verifcation is supported only with FSDataset";
}
if ( reason == null ) {
// Create the block scanner; it is started later from offerService().
blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);
} else {
LOG.info("Periodic Block Verification is disabled because " +
reason + ".");
}
......
}

块扫描器的启动在DN主循环的offerService中
/**
 * Excerpt of DataNode.offerService, the DataNode main loop. Only the part that
 * starts the block scanner is shown. Elided code is marked "....", and the
 * excerpt's closing braces are abbreviated.
 */
public void offerService() throws Exception {
LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec" +
" Initial delay: " + initialBlockReportDelay + "msec");
//
// Now loop for a long time....
//
while (shouldRun) {
try {
....
....
// start block scanner
// Started at most once (blockScannerThread == null), and only after
// upgradeManager reports the upgrade as completed.
if (blockScanner != null && blockScannerThread == null &&
upgradeManager.isUpgradeCompleted()) {
LOG.info("Starting Periodic block scanner.");
// Wrap the scanner (a Runnable) in a Daemon thread and start it in the background.
blockScannerThread = new Daemon(blockScanner);
blockScannerThread.start();
}
....
....
} // offerService

 块校验的具体流程包含在线程执行体(run方法)中,大致分为三个部分:1、块集合初始化;2、更新block校验时间;3、循环校验符合条件的block。其间还通过节流器(throttler)控制扫描速率。
/**
 * Scanner thread body.
 *
 * <p>Runs in three phases:
 * (1) init() builds the in-memory block set and opens the verification log;
 * (2) assignInitialVerificationTimes() seeds per-block scan times;
 * (3) a loop verifies the first block of blockInfoSet whenever
 *     (now - getEarliestScanTime()) >= scanPeriod, otherwise polls once a second.
 *
 * <p>The loop ends when datanode.shouldRun becomes false or this thread is
 * interrupted. shutdown() is always called on exit. RuntimeExceptions are
 * logged and rethrown.
 */
public void run() {
  try {
    // Phase 1: block set initialization.
    init();

    // Phase 2: assign initial verification times; false means we were asked to stop.
    if (!assignInitialVerificationTimes()) {
      return;
    }
    // Configure the throttler before scanning starts.
    adjustThrottler();

    // Phase 3: main scan loop.
    while (datanode.shouldRun && !Thread.interrupted()) {
      long now = System.currentTimeMillis();
      synchronized (this) {
        if (now >= (currentPeriodStart + scanPeriod)) {
          // The current scan period has elapsed: start a new one.
          startNewPeriod();
        }
      }
      if ((now - getEarliestScanTime()) >= scanPeriod) {
        // A block is due: verify one.
        verifyFirstBlock();
      } else {
        try {
          // Nothing due yet; poll again in one second.
          Thread.sleep(1000);
        } catch (InterruptedException ie) {
          // sleep() clears the interrupt status when it throws. Restore it so
          // the loop condition's Thread.interrupted() check observes it and the
          // thread exits, instead of silently continuing to scan.
          Thread.currentThread().interrupt();
        }
      }
    }
  } catch (RuntimeException e) {
    LOG.warn("RuntimeException during DataBlockScanner.run() : " +
        StringUtils.stringifyException(e));
    throw e;
  } finally {
    shutdown();
    LOG.info("Exiting DataBlockScanner thread.");
  }
}

  先看初始化部分:它会获取当前的块报告,构建扫描信息集合,并打开块校验日志文件。
/**
 * Builds the in-memory scan state, opens the verification log and creates the throttler.
 *
 * <p>Each block in the dataset's block report gets a BlockScanInfo whose
 * lastScanTime is a distinct negative placeholder (-1, -2, ...). These
 * placeholders are later replaced by assignInitialVerificationTimes().
 *
 * <p>The verification log is opened in the first volume directory that already
 * contains verificationLogFile, falling back to the first volume. If the log
 * cannot be opened, a warning (including the cause) is logged and scanning
 * continues without persisting verification times.
 */
private void init() {
  // Shuffle the block report so scanning is not concentrated on one volume.
  // Arrays.asList is a write-through view, so this shuffles arr in place.
  Block[] arr = dataset.getBlockReport();
  Collections.shuffle(Arrays.asList(arr));

  // Structures holding per-block scan info.
  blockInfoSet = new TreeSet<BlockScanInfo>();
  blockMap = new HashMap<Block, BlockScanInfo>();

  long scanTime = -1;
  for (Block block : arr) {
    BlockScanInfo info = new BlockScanInfo(block);
    // Negative placeholder marks "no recorded scan time yet".
    info.lastScanTime = scanTime--;
    //still keep 'info.lastScanType' to NONE.
    addBlockInfo(info);
  }

  // Prefer a volume that already has the verification log.
  File dir = null;
  FSDataset.FSVolume[] volumes = dataset.volumes.volumes;
  for (FSDataset.FSVolume vol : volumes) {
    if (LogFileHandler.isFilePresent(vol.getDir(), verificationLogFile)) {
      dir = vol.getDir();
      break;
    }
  }
  if (dir == null) {
    // NOTE(review): assumes at least one volume exists.
    dir = volumes[0].getDir();
  }

  try {
    verificationLog = new LogFileHandler(dir, verificationLogFile, 100);
  } catch (IOException e) {
    // Best effort: keep scanning without a log, but report why it failed.
    LOG.warn("Could not open verification log. " +
        "Verification times are not stored : " +
        StringUtils.stringifyException(e));
  }

  synchronized (this) {
    throttler = new BlockTransferThrottler(200, MAX_SCAN_RATE);
  }
}

  下面看更新block校验时间的部分assignInitialVerificationTimes
/**
 * Assigns every block an initial lastScanTime before the main scan loop starts.
 *
 * <p>First replays the verification log (if it could be opened), passing each
 * parsed entry to updateBlockInfo so blocks verified in a previous run keep
 * their recorded times. Blocks still holding the negative placeholder from
 * init() then get times starting one scanPeriod in the past, spaced
 * verifyInterval apart. As a result they become due one after another rather
 * than all at once.
 *
 * @return false if the DataNode is stopping or this thread was interrupted
 *     while replaying the log; true otherwise
 */
private boolean assignInitialVerificationTimes() {
  int numBlocks;
  synchronized (this) {
    // At least 1, so the interval computation below never divides by zero.
    numBlocks = Math.max(blockMap.size(), 1);
  }

  LogFileHandler.Reader logReader = null;
  try {
    if (verificationLog != null) {
      logReader = verificationLog.new Reader(false);
    }
  } catch (IOException e) {
    LOG.warn("Could not read previous verification times : " +
        StringUtils.stringifyException(e));
  }

  try {
    // Inside the try so logReader is closed even if this call throws.
    if (verificationLog != null) {
      verificationLog.updateCurNumLines();
    }
    // Update block scan times from the verification log.
    while (logReader != null && logReader.hasNext()) {
      if (!datanode.shouldRun || Thread.interrupted()) {
        return false;
      }
      LogEntry entry = LogEntry.parseEntry(logReader.next());
      if (entry != null) {
        updateBlockInfo(entry);
      }
    }
  } finally {
    IOUtils.closeStream(logReader);
  }

  // Spread unverified blocks over half a scan period, but never more than
  // 10 minutes apart.
  long verifyInterval = (long) (Math.min(scanPeriod / 2.0 / numBlocks,
      10 * 60 * 1000));
  long lastScanTime = System.currentTimeMillis() - scanPeriod;

  /* Entries not updated from the log above still have the negative
   * lastScanTime placeholder set by init(). Keep re-timing the first entry
   * until it has lastScanTime >= 0 (this assumes blockInfoSet orders entries
   * by lastScanTime, so placeholders sort first — confirm in BlockScanInfo).
   */
  synchronized (this) {
    if (blockInfoSet.size() > 0) {
      BlockScanInfo info;
      while ((info = blockInfoSet.first()).lastScanTime < 0) {
        // Remove, update, then re-add so the set reflects the new time.
        delBlockInfo(info);
        info.lastScanTime = lastScanTime;
        lastScanTime += verifyInterval;
        addBlockInfo(info);
      }
    }
  }
  return true;
}

下面看块校验部分,这也是DataBlockScanner的主体部分,包含了校验逻辑。其实这个逻辑和上一篇dataXceiverServer中块读取的逻辑一样,只不过这里读取的块数据并不发送给客户端,而是写入一个丢弃所有数据的空输出流(NullOutputStream)。我们先看如何开始一个新的扫描周期:
/**
 * Starts a new scan period.
 *
 * <p>Logs the percentage of the previous period's bytes that were left unscanned,
 * resets bytesLeft to totalBytesToScan and records the period's start time.
 */
private synchronized void startNewPeriod() {
  // Guard the 0/0 case (nothing to scan), which would otherwise log "NaN%".
  double percentLeft = (totalBytesToScan > 0)
      ? (bytesLeft * 100.0) / totalBytesToScan
      : 0.0;
  LOG.info("Starting a new period : work left in prev period : " +
      String.format("%.2f%%", percentLeft));
  // reset the byte counts :
  bytesLeft = totalBytesToScan;                  // bytes to scan in this period
  currentPeriodStart = System.currentTimeMillis(); // start time of this period
}

块校验函数如下
private void verifyFirstBlock() {
Block block = null;
synchronized (this) {
if ( blockInfoSet.size() > 0 ) {
//获取一个block,所有blockinfo放在一个TreeSet中,所里内部是排序过的
block = blockInfoSet.first().block;
}
}

if ( block != null ) {
verifyBlock(block);//开始校验
}
}  

blockSender.sendBlock函数走的是与上一篇dataXceiverServer中相同的块读取逻辑:
/**
 * Excerpt of verifyBlock (the IOException handling is elided as "........").
 * Verifies one block by streaming it through a BlockSender into a
 * NullOutputStream, so the data is read but discarded.
 * At most two attempts are made; a successful attempt returns immediately.
 */
private void verifyBlock(Block block) {
BlockSender blockSender = null;
// Up to two attempts, so that a failure on the first read gets one retry.
for (int i=0; i<2; i++) {
boolean second = (i > 0);
try {
adjustThrottler();
// Create the BlockSender for this block.
// NOTE(review): meaning of (0, -1, false, false, true) not visible here — confirm against BlockSender.
blockSender = new BlockSender(block, 0, -1, false,
false, true, datanode);
// Output stream that discards everything written to it.
DataOutputStream out =
new DataOutputStream(new IOUtils.NullOutputStream());
// Send the block through the throttler; the bytes go to the null sink.
blockSender.sendBlock(out, null, throttler);
// Reaching this point means the attempt succeeded; log it.
LOG.info((second ? "Second " : "") +
"Verification succeeded for " + block);

// Succeeding only on the retry is counted as a transient error.
if ( second ) {
totalTransientErrors++;
}
// Record a successful verification scan.
updateScanStatus(block, ScanType.VERIFICATION_SCAN, true);

return;
} catch (IOException e) {
........
} finally {
// Runs after every attempt, successful or not.
IOUtils.closeStream(blockSender);
datanode.getMetrics().incrBlocksVerified();
totalScans++;
totalVerifications++;
}
}
}

一旦块扫描完毕,就会更新扫描状态,通过函数updateScanStatus实现,内部调用updateScanStatusInternal
/**
 * Records the outcome of a scan of {@code block}.
 *
 * <p>Does nothing before initialization. If the block is unknown and
 * {@code updateOnly} is set, also does nothing; otherwise a fresh entry is
 * created. The entry's type, time and result are refreshed and the entry is
 * re-inserted. REMOTE_READ scans also bump totalVerifications.
 *
 * <p>A line is appended to the verification log only for successful scans.
 * For REMOTE_READ scans, it is skipped when the last logged time is both less
 * than scanPeriod/3 and less than ONE_DAY ago, to avoid rewriting the log too often.
 */
private synchronized void updateScanStatusInternal(Block block,
    ScanType type, boolean scanOk, boolean updateOnly) {
  if (!isInitialized()) {
    return;
  }

  BlockScanInfo entry = blockMap.get(block);
  if (entry == null) {
    if (updateOnly) {
      return;
    }
    // The block may already have been removed. That's ok, it will be
    // caught next time.
    entry = new BlockScanInfo(block);
  } else {
    // Drop the old entry; it is re-added below with fresh scan data.
    delBlockInfo(entry);
  }

  final long timestamp = System.currentTimeMillis();
  entry.lastScanType = type;
  entry.lastScanTime = timestamp;
  entry.lastScanOk = scanOk;
  addBlockInfo(entry);

  final boolean remoteRead = (type == ScanType.REMOTE_READ);
  if (remoteRead) {
    totalVerifications++;
  }

  // Don't update meta data too often in case of REMOTE_READ,
  // or if the verification failed.
  final long sinceLastLog = timestamp - entry.lastLogTime;
  final boolean loggedRecently =
      sinceLastLog < scanPeriod/3 && sinceLastLog < ONE_DAY;
  if (!scanOk || (remoteRead && loggedRecently)) {
    return;
  }

  entry.lastLogTime = timestamp;
  // Read the field once so a concurrent change cannot null it between check and use.
  LogFileHandler logHandler = verificationLog;
  if (logHandler == null) {
    return;
  }
  logHandler.appendLine(LogEntry.newEnry(block, timestamp));
}

校验日志位于current目录下,文件名为dncp_block_verification.log.curr,每条校验日志包含以下字段:
date="2013-09-23 16:49:22,824"   校验时间:日期时间格式,精确到毫秒

time="1379926162824"             与上面表示同一时间,此处为毫秒时间戳格式

genstamp="1002"                  块的生成时间戳(generation stamp,即块的版本号)

id="-8049298064173279059"        块ID

通过上面的信息就可以确定某个块精确到毫秒的校验时间,该时间由System.currentTimeMillis()获得。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: