leveldb代码阅读(5)——恢复数据库的状态
2016-01-14 16:42
573 查看
1、创建一个以数据库名字命名的目录dbname
2、锁住文件锁
3、判断Current文件是否存在,Current文件指向当前使用Manifest文件
4、如果Current已经存在了,那么表示出错(啥都没干就存在这个文件,当然出错!)
5、如果Current文件不存在,而且指定了create_if_missing标志,那么就调用NewDB函数创建一个数据库;如果没有指定create_if_missing标志那么出错。
6、恢复当前版本
7、获取dbname目录下的所有文件(一般都是日志文件)
8、保留生效的文件,删除过期的文件
9、根据日志文件依次恢复数据
2、锁住文件锁
3、判断Current文件是否存在,Current文件指向当前使用Manifest文件
4、如果Current已经存在了,那么表示出错(啥都没干就存在这个文件,当然出错!)
5、如果Current文件不存在,而且指定了create_if_missing标志,那么就调用NewDB函数创建一个数据库;如果没有指定create_if_missing标志那么出错。
6、恢复当前版本
7、获取dbname目录下的所有文件(一般都是日志文件)
8、保留生效的文件,删除过期的文件
9、根据日志文件依次恢复数据
// 把数据库恢复到上次退出的状态 Status DBImpl:: Recover(VersionEdit* edit, bool *save_manifest) { // 断言是否持有锁 mutex_.AssertHeld(); // Ignore error from CreateDir since the creation of the DB is // committed only when the descriptor is created, and this directory // may already exist from a previous failed creation attempt. // 创建文件夹 env_->CreateDir(dbname_); assert(db_lock_ == NULL); // 锁住文件 Status s = env_->LockFile(LockFileName(dbname_), &db_lock_); if (!s.ok()) { return s; } // 判断Current文件是否存在 if (!env_->FileExists(CurrentFileName(dbname_))) { // 如果指定了创建标志 if (options_.create_if_missing) { // 创建新的数据库 s = NewDB(); if (!s.ok()) { return s; } } else { // 无效参数 return Status::InvalidArgument( dbname_, "does not exist (create_if_missing is false)"); } } else { // 如果Current文件已经存在了那么表示出错 if (options_.error_if_exists) { return Status::InvalidArgument( dbname_, "exists (error_if_exists is true)"); } } // 恢复 s = versions_->Recover(save_manifest); if (!s.ok()) { return s; } // 序号 SequenceNumber max_sequence(0); // Recover from all newer log files than the ones named in the // descriptor (new log files may have been added by the previous // incarnation without registering them in the descriptor). // // Note that PrevLogNumber() is no longer used, but we pay // attention to it in case we are recovering a database // produced by an older version of leveldb. const uint64_t min_log = versions_->LogNumber(); const uint64_t prev_log = versions_->PrevLogNumber(); std::vector<std::string> filenames; // 有一个以数据库名字命名的文件夹 // 这个函数是获取该目录下所有的文件名 s = env_->GetChildren(dbname_, &filenames); if (!s.ok()) { return s; } std::set<uint64_t> expected; // 这个函数实质是获取仍然存活(仍然有效)的文件 versions_->AddLiveFiles(&expected); uint64_t number; FileType type; std::vector<uint64_t> logs; // 对于dbname_文件夹下的文件,有的还有效,有的已经失效了 for (size_t i = 0; i < filenames.size(); i++) { // 解析文件名的文件编号和文件类型 if (ParseFileName(filenames[i], &number, &type)) { // 从这里删除的目的是为了最后看看还有哪些文件名是不能够解析的 expected.erase(number); // 如果这个文件还有效 if (type == kLogFile && ((number >= min_log) || (number == prev_log))) logs.push_back(number); } } // 如果这个数组不为空,那么表示有的文件名解析不了,出错! if (!expected.empty()) { char buf[50]; snprintf(buf, sizeof(buf), "%d missing files; e.g.", static_cast<int>(expected.size())); return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin()))); } // Recover in the order in which the logs were generated // 对文件名编号进行排序 std::sort(logs.begin(), logs.end()); // 恢复每一个日志文件 for (size_t i = 0; i < logs.size(); i++) { s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,&max_sequence); if (!s.ok()) { return s; } // The previous incarnation may not have written any MANIFEST // records after allocating this log number. So we manually // update the file number allocation counter in VersionSet. // 记录哪些文件编号已经被使用 versions_->MarkFileNumberUsed(logs[i]); } // 最后使用的序号更新 if (versions_->LastSequence() < max_sequence) { versions_->SetLastSequence(max_sequence); } return Status::OK(); }
// 把内存table转储到硬盘上 Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base) { mutex_.AssertHeld(); // 当前的时间 const uint64_t start_micros = env_->NowMicros(); // 定义一个文件元数据 FileMetaData meta; // 设置一个编号 meta.number = versions_->NewFileNumber(); // 把元数据的编号插入列表中以防止被意外的删掉 pending_outputs_.insert(meta.number); // 创建一个内存迭代器 Iterator* iter = mem->NewIterator(); Log(options_.info_log, "Level-0 table #%llu: started",(unsigned long long) meta.number); Status s; { mutex_.Unlock(); // 构建一张表(SSTable),并创建一个文件元数据(主要记录了这个table的大小、最大/最小键值等信息) s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); mutex_.Lock(); } Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s", (unsigned long long) meta.number, (unsigned long long) meta.file_size, s.ToString().c_str()); delete iter; // 从保护列表中删除这个元数据的编号 pending_outputs_.erase(meta.number); // Note that if file_size is zero, the file has been deleted and // should not be added to the manifest. int level = 0; if (s.ok() && meta.file_size > 0) { // 获取这个文件元数据的最小/最大键值 const Slice min_user_key = meta.smallest.user_key(); const Slice max_user_key = meta.largest.user_key(); // 如果版本不为空 if (base != NULL) { // 为内存table的转储选择一个level(后台进程会把内存表格转储到这个level上) level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } // 对level的文件进行版本变更,记录新增了哪些文件,留待后续处理 edit->AddFile(level, meta.number, meta.file_size, meta.smallest, meta.largest); } CompactionStats stats; stats.micros = env_->NowMicros() - start_micros; stats.bytes_written = meta.file_size; // 为第level层增加新状态 stats_[level].Add(stats); return s; }
// 构建SSTable Status BuildTable(const std::string& dbname, // 数据库的名字 Env* env, // 平台环境 const Options& options, // 选项 TableCache* table_cache, // 表的缓存 Iterator* iter, // 迭代器 FileMetaData* meta) // 文件元数据 { Status s; meta->file_size = 0; iter->SeekToFirst(); // 构建一个表格的名字 std::string fname = TableFileName(dbname, meta->number); if (iter->Valid()) { WritableFile* file; // 创建一个可写文件 s = env->NewWritableFile(fname, &file); if (!s.ok()) { return s; } // 根据选项和刚刚创建的文件,新建一个table构建器 TableBuilder* builder = new TableBuilder(options, file); // 设置文件元数据的内容 // 遍历迭代器,把每一个迭代器指向的内存存储到table构建器中,顺便在文件元数据中记录最大最小的键值 meta->smallest.DecodeFrom(iter->key()); // iter指向内存表 for (; iter->Valid(); iter->Next()) { Slice key = iter->key(); meta->largest.DecodeFrom(key); // 把内存表的数据添加到SStable构建器中 builder->Add(key, iter->value()); } // Finish and check for builder errors if (s.ok()) { // 构建SSTable s = builder->Finish(); if (s.ok()) { meta->file_size = builder->FileSize(); assert(meta->file_size > 0); } } else { builder->Abandon(); } delete builder; // Finish and check for file errors if (s.ok()) { // 同步处理 s = file->Sync(); } if (s.ok()) { s = file->Close(); } delete file; file = NULL; if (s.ok()) { // Verify that the table is usable // 创建一个表格cache迭代器,返回它的状态 Iterator* it = table_cache->NewIterator(ReadOptions(), meta->number, meta->file_size); s = it->status(); delete it; } } // Check for input iterator errors if (!iter->status().ok()) { s = iter->status(); } if (s.ok() && meta->file_size > 0) { // Keep it } else { env->DeleteFile(fname); } return s; }
// 挑选一个层用于内存数据输出 int Version::PickLevelForMemTableOutput( const Slice& smallest_user_key, const Slice& largest_user_key) { int level = 0; // 如果第0层没有重叠的数据,那么继续往下进行查找,直到遇到一个有重复数据的层 if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) { // Push to next level if there is no overlap in next level, // and the #bytes overlapping in the level after that are limited. InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek); InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0)); // 记录有重叠记录的文件 std::vector<FileMetaData*> overlaps; while (level < config::kMaxMemCompactLevel) { // 递归 if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) { break; } if (level + 2 < config::kNumLevels) { // Check that file does not overlap too many grandparent bytes. // 记录有重叠数据的文件 GetOverlappingInputs(level + 2, &start, &limit, &overlaps); // 统计产生的重复数据 const int64_t sum = TotalFileSize(overlaps); // 如果上层的重复数据已经达到阈值,那么可以退出循环 if (sum > kMaxGrandParentOverlapBytes) { break; } } level++; } } return level; }
相关文章推荐
- redis实现分布式锁
- mysql 源码安装
- MySQL与NoSQL——SQL与NoSQL的融合
- mysql事务知识分享
- mysqlmap-config.xml
- 统计数据库表大小
- nosql
- 【SQL Server学习笔记】表和列增加注释
- SQL SERVER更改表架构
- FMDB(用SQLite存数据)
- SQL Server 全文搜索 配置、查询初体验
- SQL Server中SET赋值和SELECT赋值的区别
- sqlzoo练习答案--Self join
- 关于oracle死锁问题——ORA-01436: 用户数据中的 CONNECT BY 循环
- 如何在Linux下安装php-memcached扩展
- SQL Server数据库备份的镜像
- Mysql 关键字-保留字
- 一个防止误删MSSQL数据库的方法
- 基于sys.fn_dblog()的SQL Server日志分析过程
- SQL Where特殊的三个条件(between,in,like(字符串匹配,模糊查寻))