您的位置:首页 > Web前端 > Node.js

【框架解析】Hadoop系统分析(五)--namenode其他

2012-08-21 13:59 746 查看
upgrade/rollback/importCheckpoint

在FSImage.recoverTransitionRead方法中,针对upgrade/rollback/importCheckpoint参数,在启动前做了特殊的操作,代码如下:

// Dispatch on the startup option and run the matching pre-start transition.
// UPGRADE and IMPORT return straight from the enclosing method; ROLLBACK and
// REGULAR leave the switch and continue with whatever follows it (not shown
// in this excerpt).
// NOTE(review): the meaning of the returned boolean is not visible here; the
// inline comment on UPGRADE suggests it signals whether the image still needs
// to be saved by the caller -- confirm against the full method.
switch(startOpt) {
case UPGRADE:
doUpgrade();
return false; // upgrade saved image already
case IMPORT:
doImportCheckpoint();
return true;
case ROLLBACK:
doRollback();
break;
case REGULAR:
// just load the image
}


hadoop namenode -upgrade

进行hadoop namenode的升级操作。调用FSImage.doUpgrade方法,主要操作是将原来的current保存为previous,并将升级后的fsimage保存为current。

// Body of the upgrade transition: for every storage directory, keep the old
// "current" as "previous" and write the freshly stamped in-memory image as
// the new "current".
if(getDistributedUpgradeState()) {
// only distributed upgrade need to continue
// don't do version upgrade
this.loadFSImage();
initializeDistributedUpgrade();
return;
}
// Upgrade is allowed only if there are
// no previous fs states in any of the directories
for (Iterator<StorageDirectory> it =
dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
// If a previous directory already exists, a new upgrade cannot be started
if (sd.getPreviousDir().exists())
throw new InconsistentFSStateException(sd.getRoot(),
"previous fs state should not exist during upgrade. "
+ "Finalize or rollback first.");
}

// Load the most recent fsimage from disk into memory
this.loadFSImage();

// Do upgrade for each directory
// The old cTime / layout version are captured first so they can be logged
// next to the new values for each directory.
long oldCTime = this.getCTime();
this.cTime = FSNamesystem.now();  // generate new cTime for the state
int oldLV = this.getLayoutVersion();
this.layoutVersion = FSConstants.LAYOUT_VERSION;
this.checkpointTime = FSNamesystem.now();
for (Iterator<StorageDirectory> it =
dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
LOG.info("Upgrading image directory " + sd.getRoot()
+ ".\n   old LV = " + oldLV
+ "; old CTime = " + oldCTime
+ ".\n   new LV = " + this.getLayoutVersion()
+ "; new CTime = " + this.getCTime());
File curDir = sd.getCurrentDir();
File prevDir = sd.getPreviousDir();
File tmpDir = sd.getPreviousTmp();
// NOTE(review): these preconditions are plain asserts, so they are only
// checked when the JVM runs with assertions enabled.
assert curDir.exists() : "Current directory must exist.";
assert !prevDir.exists() : "prvious directory must not exist.";
assert !tmpDir.exists() : "prvious.tmp directory must not exist.";
// Rename the current directory to previous.tmp
rename(curDir, tmpDir);
// Save the in-memory fsimage into current
saveCurrent(sd);
// Rename previous.tmp to previous
rename(tmpDir, prevDir);
// Mark the upgrade as not yet finalized (a successful upgrade still has to
// be finalized; a failed one has to be rolled back)
isUpgradeFinalized = false;
LOG.info("Upgrade of " + sd.getRoot() + " is complete.");
}
// Post-upgrade initialization. The original note says this mainly computes
// the version and writes the VERSION file -- NOTE(review): the callee is not
// visible in this excerpt; confirm.
initializeDistributedUpgrade();
// Open the edit log
editLog.open();

hadoop namenode -rollback

升级hadoop失败后回滚,主要是把upgrade时变更的数据目录恢复原状,将current修改为removed.tmp,previous修改回current,最后删除removed.tmp

// Body of the rollback transition: first verify that at least one storage
// directory holds a previous state, then, for each such directory, swap
// "previous" back into "current" and discard the upgraded state.
// Rollback is allowed only if there is
// a previous fs states in at least one of the storage directories.
// Directories that don't have previous state do not rollback
boolean canRollback = false;
// Scratch FSImage used to hold the version info read from the previous dirs
FSImage prevState = new FSImage();
prevState.layoutVersion = FSConstants.LAYOUT_VERSION;
for (Iterator<StorageDirectory> it =
dirIterator(); it.hasNext();) {
// Read the version info of the previous directory; if there is no previous
// directory, read the current directory instead
StorageDirectory sd = it.next();
File prevDir = sd.getPreviousDir();
if (!prevDir.exists()) {  // use current directory then
LOG.info("Storage directory " + sd.getRoot()
+ " does not contain previous fs state.");
sd.read(); // read and verify consistency with other directories
continue;
}
StorageDirectory sdPrev = prevState.new StorageDirectory(sd.getRoot());
sdPrev.read(sdPrev.getPreviousVersionFile());  // read and verify consistency of the prev dir
canRollback = true;
}
if (!canRollback)
throw new IOException("Cannot rollback. "
+ "None of the storage directories contain previous fs state.");

// Now that we know all directories are going to be consistent
// Do rollback for each directory containing previous state
for (Iterator<StorageDirectory> it =
dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
File prevDir = sd.getPreviousDir();
if (!prevDir.exists())
continue;

LOG.info("Rolling back storage directory " + sd.getRoot()
+ ".\n   new LV = " + prevState.getLayoutVersion()
+ "; new CTime = " + prevState.getCTime());
File tmpDir = sd.getRemovedTmp();
assert !tmpDir.exists() : "removed.tmp directory must not exist.";
// rename current to tmp
File curDir = sd.getCurrentDir();
assert curDir.exists() : "Current directory must exist.";
// Rename the current directory to removed.tmp
rename(curDir, tmpDir);
// rename previous to current
rename(prevDir, curDir);

// delete tmp dir
// (i.e. remove removed.tmp)
deleteDir(tmpDir);
LOG.info("Rollback of " + sd.getRoot()+ " is complete.");
}
// With the previous directories moved back, no upgrade is pending any more
isUpgradeFinalized = true;
// check whether name-node can start in regular mode
verifyDistributedUpgradeProgress(StartupOption.REGULAR);

hadoop namenode -importCheckpoint

使用备份的checkpoint启动namenode,之后将加载进来的checkpoint保存为current

// Body of the import-checkpoint transition: temporarily swap a scratch
// FSImage into the namesystem, load the checkpoint through it, copy its
// storage info onto the real image, restore the real image and save it.
FSImage ckptImage = new FSImage();
FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
// replace real image with the checkpoint image
FSImage realImage = fsNamesys.getFSImage();
assert realImage == this;
fsNamesys.dir.fsImage = ckptImage;
// load from the checkpoint dirs
try {
// Read the checkpointDirs and checkpointEditsDirs directories and merge
// them into memory.
// checkpointDirs comes from the fs.checkpoint.dir setting (null if unset);
// the article states the secondarynamenode default is
// /tmp/hadoop/dfs/namesecondary -- TODO confirm against the shipped defaults.
// checkpointEditsDirs comes from the fs.checkpoint.edits.dir setting (null
// if unset); the article states the same default path -- TODO confirm.
ckptImage.recoverTransitionRead(checkpointDirs, checkpointEditsDirs,
StartupOption.REGULAR);
} finally {
// The checkpoint image is closed whether or not loading succeeded
ckptImage.close();
}
// return back the real image
realImage.setStorageInfo(ckptImage);
fsNamesys.dir.fsImage = realImage;
// and save it (persist the fsimage)
saveNamespace(false);


hadoop namenode -finalize

升级完成后,使用hadoop namenode -finalize来进行确认,finalize之后就不能rollback了

回到NameNode.createNameNode方法中,进入FINALIZE分支

// FINALIZE startup option: run finalize and terminate the JVM right away,
// exiting with status 1 when finalize reports it was aborted and 0 otherwise.
case FINALIZE:
aborted = finalize(conf, true);
System.exit(aborted ? 1 : 0);
之后进入FSImage.doFinalize方法完成升级,主要工作就是删除previous目录,并标记升级结束

// Body of the finalize step for one storage directory (sd): discard the
// "previous" directory and mark the upgrade as finalized.
File prevDir = sd.getPreviousDir();
if (!prevDir.exists()) { // already discarded
LOG.info("Directory " + prevDir + " does not exist.");
LOG.info("Finalize upgrade for " + sd.getRoot()+ " is not required.");
return;
}
// The LV / CTime details are omitted from the message when the layout
// version is 0.
LOG.info("Finalizing upgrade for storage directory "
+ sd.getRoot() + "."
+ (getLayoutVersion()==0 ? "" :
"\n   cur LV = " + this.getLayoutVersion()
+ "; cur CTime = " + this.getCTime()));
assert sd.getCurrentDir().exists() : "Current directory must exist.";
final File tmpDir = sd.getFinalizedTmp();
// rename previous to tmp and remove
// Rename previous to finalized.tmp
rename(prevDir, tmpDir);
// Delete finalized.tmp
deleteDir(tmpDir);
isUpgradeFinalized = true;
LOG.info("Finalize upgrade for " + sd.getRoot()+ " is complete.");
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: