levelDB源码分析-SSTable:.sst文件构建与读取
2012-06-14 17:35
519 查看
.sst文件的构建是通过TableBuilder进行的,读取主要集中在TableBuilder操作(table_builder.cc)如下:
读取相关操作:
Table相当于.sst文件在内存中的映像,它保存了.sst文件的Index Block数据。
说明:
Table相当于.sst文件在内存中的映像,它保存了.sst文件的Index Block数据。
TableCache相当于所有打开的.sst文件在内存中的管理结构,内部采用LRUCache,每个打开的.sst文件在LRUCache中都有一项:map< file_number -> {file, table} >,详见《levelDB源码分析-TableCache》。
struct TableBuilder::Rep { // TableBuilder内部使用的结构,记录当前的一些状态等 Options options; Options index_block_options; WritableFile* file; // 对应的.sst文件 uint64_t offset; Status status; BlockBuilder data_block; // Data Block BlockBuilder index_block; // Index Block std::string last_key; // 添加的最后一个key,一方面用于key是否排序的判断,另一方面当写入一个Data //+ Block时记录index Block中索引项(last_key+offset+size) int64_t num_entries; // .sst文件中已经添加的key/value数量 bool closed; // Either Finish() or Abandon() has been called. // Add下一Block的第一个key/value时,才根据这个key构造一个FindShortSuccessor, // 写入Index Block中的一个entry(max_key+offset+size),是为了能够找到 // 一个更短的分割2个Block的key,从而减少存储容量; // 只有Finish中是根据最后一个Block的最后一个key构造的。 // We do not emit the index entry for a block until we have seen the // first key for the next data block. This allows us to use shorter // keys in the index block. For example, consider a block boundary // between the keys "the quick brown fox" and "the who". We can use // "the r" as the key for the index block entry since it is >= all // entries in the first block and < all entries in subsequent // blocks. // // Invariant: r->pending_index_entry is true only if data_block is empty. bool pending_index_entry; // 标识是否刚写入一个Data Block,控制在Index //+ Block中添加一项索引信息(last_key+offset+size) BlockHandle pending_handle; // Handle to add to index block std::string compressed_output; // 数据压缩 Rep(const Options& opt, WritableFile* f) // 构造函数 : options(opt), index_block_options(opt), file(f), offset(0), data_block(&options), index_block(&index_block_options), num_entries(0), closed(false), pending_index_entry(false) { index_block_options.block_restart_interval = 1; // Index Block中每个restart块只有一个record,查找方便 } };// struct TableBuilder::Rep ; TableBuilder::TableBuilder(const Options& options, WritableFile* file) : rep_(new Rep(options, file)) { } TableBuilder::~TableBuilder() { assert(rep_->closed); // Catch errors where caller forgot to call Finish() delete rep_; } Status TableBuilder::ChangeOptions(const Options& options) { // 改变配置选项 // Note: if more fields are added to Options, update // this function to catch changes that should not be allowed to // change in the middle of building a Table. if (options.comparator != rep_->options.comparator) { // 使用过程中,不能改变comparator,否则,顺序不能保证有序 return Status::InvalidArgument("changing comparator while building table"); } // Note that any live BlockBuilders point to rep_->options and therefore // will automatically pick up the updated options. rep_->options = options; rep_->index_block_options = options; rep_->index_block_options.block_restart_interval = 1; return Status::OK(); } void TableBuilder::Add(const Slice& key, const Slice& value) { // .sst文件添加一个key/value键值对 Rep* r = rep_; assert(!r->closed); if (!ok()) return; if (r->num_entries > 0) { assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0); } if (r->pending_index_entry) { // 一个旧block的结束和新的block开始 assert(r->data_block.empty()); // Add下一Data Block的第一个key/value时,才根据这个key构造一个FindShortSuccessor, // 写入Index Block中的一个entry(max_key+offset+size),是为了能够找到 // 一个更短的分割2个Block的key,从而减少存储容量; // 只有Finish中是根据最后一个Block的最后一个key构造的。 r->options.comparator->FindShortestSeparator(&r->last_key, key); // 计算max_key std::string handle_encoding; r->pending_handle.EncodeTo(&handle_encoding); r->index_block.Add(r->last_key, Slice(handle_encoding));// Index Block数据,添加刚写入.sst文件中的Data Block索引项(max_key、offset、size) r->pending_index_entry = false; } r->last_key.assign(key.data(), key.size()); // 当前最大key r->num_entries++; // 记录数量++ r->data_block.Add(key, value); // Data Block数据块添加一个key/value键值对 const size_t estimated_block_size = r->data_block.CurrentSizeEstimate(); if (estimated_block_size >= r->options.block_size) { // DataBlock容量大于设置的block size,则写入文件 Flush(); } } void TableBuilder::Flush() { // 将当前Data Block写入文件 Rep* r = rep_; assert(!r->closed); if (!ok()) return; if (r->data_block.empty()) return; assert(!r->pending_index_entry); // 向文件写入一个Block(数据及type和CRC),并设置index Block项(index Block在sst文件完毕阶段写入) // 在pending_handle中记录Index Block中对应此Block的索引项 WriteBlock(&r->data_block, &r->pending_handle); if (ok()) { r->pending_index_entry = true; // 设置标志: Add/Finish时,在Index Block中记录一项索引信息 r->status = r->file->Flush(); } } // 向文件写入一个Block(数据及type和CRC),并设置index Block项(index Block在sst文件完毕阶段写入) void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { // File format contains a sequence of blocks where each block has: // block_data: uint8 // type: uint8 // crc: uint32 assert(ok()); Rep* r = rep_; Slice raw = block->Finish(); // 添加restart信息,返回Block数据的起始位置 Slice block_contents; CompressionType type = r->options.compression; // TODO(postrelease): Support more compression options: zlib? switch (type) { case kNoCompression: block_contents = raw; break; case kSnappyCompression: { // 进行Snappy压缩 std::string* compressed = &r->compressed_output; if (port::Snappy_Compress(raw.data(), raw.size(), compressed) && compressed->size() < raw.size() - (raw.size() / 8u)) { block_contents = *compressed; } else { // Snappy not supported, or compressed less than 12.5%, so just // store uncompressed form block_contents = raw; type = kNoCompression; } break; } } handle->set_offset(r->offset); // 记录Block的索引信息-offset handle->set_size(block_contents.size()); // 记录Block的索引信息-size r->status = r->file->Append(block_contents); // Block数据写入文件 if (r->status.ok()) { char trailer[kBlockTrailerSize]; // type + crc trailer[0] = type; uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size()); crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type EncodeFixed32(trailer+1, crc32c::Mask(crc)); r->status = r->file->Append(Slice(trailer, kBlockTrailerSize)); // 写入trailer if (r->status.ok()) { r->offset += block_contents.size() + kBlockTrailerSize; } } r->compressed_output.clear(); block->Reset(); } Status TableBuilder::status() const { return rep_->status; } Status TableBuilder::Finish() { // .sst数据构造完毕,写入文件 Rep* r = rep_; Flush(); assert(!r->closed); r->closed = true; BlockHandle metaindex_block_handle; BlockHandle index_block_handle; if (ok()) { BlockBuilder meta_index_block(&r->options); // TODO(postrelease): Add stats and other meta blocks WriteBlock(&meta_index_block, &metaindex_block_handle); // 写入Meta Index Block } if (ok()) { if (r->pending_index_entry) { // Add下一Data Block的第一个key/value时,才根据这个key构造一个FindShortSuccessor, // 写入Index Block中的一个entry(max_key+offset+size),是为了能够找到 // 一个更短的分割2个Block的key,从而减少存储容量; // 只有Finish中是根据最后一个Block的最后一个key构造的。 r->options.comparator->FindShortSuccessor(&r->last_key); std::string handle_encoding; r->pending_handle.EncodeTo(&handle_encoding); r->index_block.Add(r->last_key, Slice(handle_encoding)); // 在Index Block中增加一个索引信息 r->pending_index_entry = false; } WriteBlock(&r->index_block, &index_block_handle); // 写入Index Block } if (ok()) { Footer footer; footer.set_metaindex_handle(metaindex_block_handle); footer.set_index_handle(index_block_handle); std::string footer_encoding; footer.EncodeTo(&footer_encoding); r->status = r->file->Append(footer_encoding); // 写入footer if (r->status.ok()) { r->offset += footer_encoding.size(); } } return r->status; }
读取相关操作:
Table相当于.sst文件在内存中的映像,它保存了.sst文件的Index Block数据。
struct Table::Rep { ~Rep() { delete index_block; } Options options; // 配置选项 Status status; RandomAccessFile* file; // 对应的.sst文件 uint64_t cache_id; BlockHandle metaindex_handle; // Handle to metaindex_block: saved from footer Block* index_block; // Index Block }; // 解析sstable文件(读取Footer,找到index_block_handle,然后读取Index Block数据) Status Table::Open(const Options& options, RandomAccessFile* file, uint64_t size, Table** table) { *table = NULL; if (size < Footer::kEncodedLength) { return Status::InvalidArgument("file is too short to be an sstable"); } char footer_space[Footer::kEncodedLength]; Slice footer_input; Status s = file->Read(size - Footer::kEncodedLength,// 读取Footer信息 Footer::kEncodedLength, &footer_input, footer_space); if (!s.ok()) return s; Footer footer; s = footer.DecodeFrom(&footer_input); // 解析 Footer if (!s.ok()) return s; // Read the index block Block* index_block = NULL; // index_block在ReadBlock内部分配 if (s.ok()) { s = ReadBlock(file, ReadOptions(), footer.index_handle(), &index_block); // 读取Index Block数据 } if (s.ok()) { // We've successfully read the footer and the index block: we're // ready to serve requests. Rep* rep = new Table::Rep; // 新建一个内部Rep结构 rep->options = options; rep->file = file; rep->metaindex_handle = footer.metaindex_handle(); rep->index_block = index_block; rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0); // 配置不使用BlockCache时,设置为0 *table = new Table(rep); // 新建一个Table,代表SSTable文件, // .sst文件不会全部读入,如果有需要只是根据index // block中的索引,每次读取一个Data Block } else { if (index_block) delete index_block; } return s; } Table::~Table() { delete rep_; } static void DeleteBlock(void* arg, void* ignored) { delete reinterpret_cast<Block*>(arg); } static void DeleteCachedBlock(const Slice& key, void* value) { Block* block = reinterpret_cast<Block*>(value); delete block; } static void ReleaseBlock(void* arg, void* h) { Cache* cache = reinterpret_cast<Cache*>(arg); // Cache为LRUCache接口 Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h); cache->Release(handle); } // Convert an index iterator value (i.e., an encoded BlockHandle) // into an iterator over the contents of the corresponding block. Iterator* Table::BlockReader(void* arg, // 读取指定的Block数据 const ReadOptions& options, const Slice& index_value) // index_value: BlockHandle的编码数据 { Table* table = reinterpret_cast<Table*>(arg); Cache* block_cache = table->rep_->options.block_cache; Block* block = NULL; Cache::Handle* cache_handle = NULL; BlockHandle handle; Slice input = index_value; Status s = handle.DecodeFrom(&input); // We intentionally allow extra stuff in index_value so that we // can add more features in the future. if (s.ok()) { if (block_cache != NULL) // Block Cache存在 { // 构造cache key: cache_id(8) + handle.offset(8) char cache_key_buffer[16]; EncodeFixed64(cache_key_buffer, table->rep_->cache_id); EncodeFixed64(cache_key_buffer+8, handle.offset()); Slice key(cache_key_buffer, sizeof(cache_key_buffer)); cache_handle = block_cache->Lookup(key); // LRUCache中查找 if (cache_handle != NULL) // Cache中找到 { block = reinterpret_cast<Block*>(block_cache->Value(cache_handle)); //Block数据 } else // Cache中不存在,则从.sst文件读取 { // 从.sst文件中,读取handle(offset_/size_)指定的Block数据到**block中 s = ReadBlock(table->rep_->file, options, handle, &block); if (s.ok() && options.fill_cache) { cache_handle = block_cache->Insert( // 插入LRUCache中 key, block, block->size(), &DeleteCachedBlock); } } } else // Block Cache不可用 { // 从.sst文件中,读取handle(offset_/size_)指定的Block数据到**block中 s = ReadBlock(table->rep_->file, options, handle, &block); } } Iterator* iter; if (block != NULL) { iter = block->NewIterator(table->rep_->options.comparator); // Index Block的迭代器 if (cache_handle == NULL) { // Cache中不存在 iter->RegisterCleanup(&DeleteBlock, block, NULL); } else { iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle); } } else { iter = NewErrorIterator(s); } return iter; } Iterator* Table::NewIterator(const ReadOptions& options) const { return NewTwoLevelIterator( rep_->index_block->NewIterator(rep_->options.comparator), &Table::BlockReader, const_cast<Table*>(this), options); } uint64_t Table::ApproximateOffsetOf(const Slice& key) const { // 返回key在文件中的位置(根据Index Block中的索引进行查找) Iterator* index_iter = rep_->index_block->NewIterator(rep_->options.comparator); index_iter->Seek(key); // Position at the first key in the source that at or past target uint64_t result; if (index_iter->Valid()) { BlockHandle handle; Slice input = index_iter->value(); // value为BlockHandle序列化信息 Status s = handle.DecodeFrom(&input); if (s.ok()) { result = handle.offset(); } else { // Strange: we can't decode the block handle in the index block. // We'll just return the offset of the metaindex block, which is // close to the whole file size for this case. result = rep_->metaindex_handle.offset(); } } else { // key is past the last key in the file. Approximate the offset // by returning the offset of the metaindex block (which is // right near the end of the file). result = rep_->metaindex_handle.offset(); } delete index_iter; return result; }
说明:
Table相当于.sst文件在内存中的映像,它保存了.sst文件的Index Block数据。
TableCache相当于所有打开的.sst文件在内存中的管理结构,内部采用LRUCache,每个打开的.sst文件在LRUCache中都有一项:map< file_number -> {file, table} >,详见《levelDB源码分析-TableCache》。
相关文章推荐
- leveldb源码分析之sst文件格式
- leveldb源码分析——SSTable构建
- mybatis底层源码分析之--配置文件读取和解析
- Hibernate源码分析之读取配置文件
- LevelDB源码剖析之SSTable_1(Block的创建与读取)
- MapReduce(十五): 从HDFS读取文件的源码分析
- log4j源码简要分析 | 读取配置文件
- Hadoop源码分析--HDFS读取文件
- 第二人生的源码分析(十五)Mesh文件的读取
- Paoding Rose源码分析1-读取Rose配置文件
- leveldb源码分析--SSTable之Compaction
- Java 工具(jmap,jstack)在linux上的源码分析(六) -F 参数 读取动态链接共享库文件中的符号表
- LevelDB源码分析之十六:.log文件
- Spring源码分析:Bean加载流程概览及配置文件读取
- Mybatis源码分析-读取非项目中的xml文件
- SpringBoot-Loader源码分析系列1:启动&读取MANIFEST.MF文件
- levelDB源码分析-SSTable
- leveldb源码分析——SSTable解析
- levelDB源码分析-SSTable:Block
- leveldb源码分析--SSTable之Compaction 详解