您的位置:首页 > 数据库

leveldb代码阅读(5)——恢复数据库的状态

2016-01-14 16:42 573 查看
1、创建一个以数据库名字命名的目录dbname

2、锁住文件锁

3、判断Current文件是否存在,Current文件指向当前使用Manifest文件

4、如果Current已经存在了,那么表示出错(啥都没干就存在这个文件,当然出错!)

5、如果Current文件不存在,而且指定了create_if_missing标志,那么就调用NewDB函数创建一个数据库;如果没有指定create_if_missing标志那么出错。

6、恢复当前版本

7、获取dbname目录下的所有文件(一般都是日志文件)

8、保留生效的文件,删除过期的文件

9、根据日志文件依次恢复数据

// 把数据库恢复到上次退出的状态
Status DBImpl:: Recover(VersionEdit* edit, bool *save_manifest)
{

// 断言是否持有锁
mutex_.AssertHeld();

// Ignore error from CreateDir since the creation of the DB is
// committed only when the descriptor is created, and this directory
// may already exist from a previous failed creation attempt.

// 创建文件夹
env_->CreateDir(dbname_);
assert(db_lock_ == NULL);

// 锁住文件
Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
if (!s.ok())
{
return s;
}

// 判断Current文件是否存在
if (!env_->FileExists(CurrentFileName(dbname_)))
{
// 如果指定了创建标志
if (options_.create_if_missing)
{
// 创建新的数据库
s = NewDB();
if (!s.ok())
{
return s;
}
} else
{
// 无效参数
return Status::InvalidArgument(
dbname_, "does not exist (create_if_missing is false)");
}
}
else
{
// 如果Current文件已经存在了那么表示出错
if (options_.error_if_exists) {
return Status::InvalidArgument(
dbname_, "exists (error_if_exists is true)");
}
}

// 恢复
s = versions_->Recover(save_manifest);
if (!s.ok())
{
return s;
}

// 序号
SequenceNumber max_sequence(0);

// Recover from all newer log files than the ones named in the
// descriptor (new log files may have been added by the previous
// incarnation without registering them in the descriptor).
//
// Note that PrevLogNumber() is no longer used, but we pay
// attention to it in case we are recovering a database
// produced by an older version of leveldb.
const uint64_t min_log = versions_->LogNumber();
const uint64_t prev_log = versions_->PrevLogNumber();
std::vector<std::string> filenames;

// 有一个以数据库名字命名的文件夹
// 这个函数是获取该目录下所有的文件名
s = env_->GetChildren(dbname_, &filenames);
if (!s.ok()) {
return s;
}
std::set<uint64_t> expected;

// 这个函数实质是获取仍然存活(仍然有效)的文件
versions_->AddLiveFiles(&expected);
uint64_t number;
FileType type;
std::vector<uint64_t> logs;

// 对于dbname_文件夹下的文件,有的还有效,有的已经失效了
for (size_t i = 0; i < filenames.size(); i++)
{
// 解析文件名的文件编号和文件类型
if (ParseFileName(filenames[i], &number, &type))
{
// 从这里删除的目的是为了最后看看还有哪些文件名是不能够解析的
expected.erase(number);

// 如果这个文件还有效
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
logs.push_back(number);
}
}

// 如果这个数组不为空,那么表示有的文件名解析不了,出错!
if (!expected.empty()) {
char buf[50];
snprintf(buf, sizeof(buf), "%d missing files; e.g.",
static_cast<int>(expected.size()));
return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
}

// Recover in the order in which the logs were generated
// 对文件名编号进行排序
std::sort(logs.begin(), logs.end());

// 恢复每一个日志文件
for (size_t i = 0; i < logs.size(); i++)
{
s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,&max_sequence);
if (!s.ok())
{
return s;
}

// The previous incarnation may not have written any MANIFEST
// records after allocating this log number.  So we manually
// update the file number allocation counter in VersionSet.
// 记录哪些文件编号已经被使用
versions_->MarkFileNumberUsed(logs[i]);
}

// 最后使用的序号更新
if (versions_->LastSequence() < max_sequence)
{
versions_->SetLastSequence(max_sequence);
}

return Status::OK();
}
// 把内存table转储到硬盘上
Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
Version* base) {
mutex_.AssertHeld();

// 当前的时间
const uint64_t start_micros = env_->NowMicros();

// 定义一个文件元数据
FileMetaData meta;

// 设置一个编号
meta.number = versions_->NewFileNumber();

// 把元数据的编号插入列表中以防止被意外的删掉
pending_outputs_.insert(meta.number);

// 创建一个内存迭代器
Iterator* iter = mem->NewIterator();

Log(options_.info_log, "Level-0 table #%llu: started",(unsigned long long) meta.number);

Status s;
{
mutex_.Unlock();
// 构建一张表(SSTable),并创建一个文件元数据(主要记录了这个table的大小、最大/最小键值等信息)
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
mutex_.Lock();
}

Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
(unsigned long long) meta.number,
(unsigned long long) meta.file_size,
s.ToString().c_str());

delete iter;

// 从保护列表中删除这个元数据的编号
pending_outputs_.erase(meta.number);

// Note that if file_size is zero, the file has been deleted and
// should not be added to the manifest.
int level = 0;
if (s.ok() && meta.file_size > 0) {

// 获取这个文件元数据的最小/最大键值
const Slice min_user_key = meta.smallest.user_key();
const Slice max_user_key = meta.largest.user_key();

// 如果版本不为空
if (base != NULL) {
// 为内存table的转储选择一个level(后台进程会把内存表格转储到这个level上)
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
}

// 对level的文件进行版本变更,记录新增了哪些文件,留待后续处理
edit->AddFile(level, meta.number, meta.file_size,
meta.smallest, meta.largest);
}

CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.file_size;

// 为第level层增加新状态
stats_[level].Add(stats);
return s;
}
// 构建SSTable
Status BuildTable(const std::string& dbname, // 数据库的名字
Env* env,     // 平台环境
const Options& options, // 选项
TableCache* table_cache, // 表的缓存
Iterator* iter,   // 迭代器
FileMetaData* meta) // 文件元数据
{
Status s;
meta->file_size = 0;
iter->SeekToFirst();

// 构建一个表格的名字
std::string fname = TableFileName(dbname, meta->number);
if (iter->Valid()) {
WritableFile* file;
// 创建一个可写文件
s = env->NewWritableFile(fname, &file);
if (!s.ok()) {
return s;
}

// 根据选项和刚刚创建的文件,新建一个table构建器
TableBuilder* builder = new TableBuilder(options, file);

// 设置文件元数据的内容

// 遍历迭代器,把每一个迭代器指向的内存存储到table构建器中,顺便在文件元数据中记录最大最小的键值
meta->smallest.DecodeFrom(iter->key());

// iter指向内存表
for (; iter->Valid(); iter->Next())
{
Slice key = iter->key();
meta->largest.DecodeFrom(key);
// 把内存表的数据添加到SStable构建器中
builder->Add(key, iter->value());
}

// Finish and check for builder errors
if (s.ok()) {
// 构建SSTable
s = builder->Finish();
if (s.ok()) {
meta->file_size = builder->FileSize();
assert(meta->file_size > 0);
}
} else {
builder->Abandon();
}
delete builder;

// Finish and check for file errors
if (s.ok()) {
// 同步处理
s = file->Sync();
}
if (s.ok()) {
s = file->Close();
}
delete file;
file = NULL;

if (s.ok()) {
// Verify that the table is usable
// 创建一个表格cache迭代器,返回它的状态
Iterator* it = table_cache->NewIterator(ReadOptions(),
meta->number,
meta->file_size);
s = it->status();
delete it;
}
}

// Check for input iterator errors
if (!iter->status().ok()) {
s = iter->status();
}

if (s.ok() && meta->file_size > 0) {
// Keep it
} else {
env->DeleteFile(fname);
}
return s;
}
// 挑选一个层用于内存数据输出
int Version::PickLevelForMemTableOutput(
const Slice& smallest_user_key,
const Slice& largest_user_key) {
int level = 0;

// 如果第0层没有重叠的数据,那么继续往下进行查找,直到遇到一个有重复数据的层
if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key))
{
// Push to next level if there is no overlap in next level,
// and the #bytes overlapping in the level after that are limited.
InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0));

// 记录有重叠记录的文件
std::vector<FileMetaData*> overlaps;
while (level < config::kMaxMemCompactLevel)
{
// 递归
if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
break;
}

if (level + 2 < config::kNumLevels)
{
// Check that file does not overlap too many grandparent bytes.
// 记录有重叠数据的文件
GetOverlappingInputs(level + 2, &start, &limit, &overlaps);

// 统计产生的重复数据
const int64_t sum = TotalFileSize(overlaps);

// 如果上层的重复数据已经达到阈值,那么可以退出循环
if (sum > kMaxGrandParentOverlapBytes) {
break;
}
}
level++;
}
}
return level;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: