您的位置:首页 > 其它

levelDB源码分析-SSTable:.sst文件构建与读取

2012-06-14 17:35 519 查看
.sst文件的构建是通过TableBuilder进行的,读取主要集中在TableBuilder操作(table_builder.cc)如下:

struct TableBuilder::Rep {					   // TableBuilder内部使用的结构,记录当前的一些状态等
Options options;
Options index_block_options;
WritableFile* file;                             // 对应的.sst文件
uint64_t offset;
Status status;
BlockBuilder data_block;                        // Data Block
BlockBuilder index_block;                       // Index Block
std::string last_key;                           // 添加的最后一个key,一方面用于key是否排序的判断,另一方面当写入一个Data
//+ Block时记录index Block中索引项(last_key+offset+size)
int64_t num_entries;                            // .sst文件中已经添加的key/value数量
bool closed;          					   // Either Finish() or Abandon() has been called.

// Add下一Block的第一个key/value时,才根据这个key构造一个FindShortSuccessor,
// 写入Index Block中的一个entry(max_key+offset+size),是为了能够找到
// 一个更短的分割2个Block的key,从而减少存储容量;
// 只有Finish中是根据最后一个Block的最后一个key构造的。
// We do not emit the index entry for a block until we have seen the
// first key for the next data block.  This allows us to use shorter
// keys in the index block.  For example, consider a block boundary
// between the keys "the quick brown fox" and "the who".  We can use
// "the r" as the key for the index block entry since it is >= all
// entries in the first block and < all entries in subsequent
// blocks.
//
// Invariant: r->pending_index_entry is true only if data_block is empty.
bool pending_index_entry;                       // 标识是否刚写入一个Data Block,控制在Index
//+ Block中添加一项索引信息(last_key+offset+size)
BlockHandle pending_handle;  // Handle to add to index block

std::string compressed_output;                  // 数据压缩

Rep(const Options& opt, WritableFile* f)        // 构造函数
: options(opt),
index_block_options(opt),
file(f),
offset(0),
data_block(&options),
index_block(&index_block_options),
num_entries(0),
closed(false),
pending_index_entry(false)
{
index_block_options.block_restart_interval = 1; // Index Block中每个restart块只有一个record,查找方便
}
};// struct TableBuilder::Rep ;

TableBuilder::TableBuilder(const Options& options, WritableFile* file)
: rep_(new Rep(options, file)) {
}

TableBuilder::~TableBuilder() {
assert(rep_->closed);  // Catch errors where caller forgot to call Finish()
delete rep_;
}

Status TableBuilder::ChangeOptions(const Options& options) {    // 改变配置选项
// Note: if more fields are added to Options, update
// this function to catch changes that should not be allowed to
// change in the middle of building a Table.
if (options.comparator != rep_->options.comparator) {       // 使用过程中,不能改变comparator,否则,顺序不能保证有序
return Status::InvalidArgument("changing comparator while building table");
}

// Note that any live BlockBuilders point to rep_->options and therefore
// will automatically pick up the updated options.
rep_->options = options;
rep_->index_block_options = options;
rep_->index_block_options.block_restart_interval = 1;
return Status::OK();
}

void TableBuilder::Add(const Slice& key, const Slice& value) {  // .sst文件添加一个key/value键值对
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->num_entries > 0) {
assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
}

if (r->pending_index_entry) { // 一个旧block的结束和新的block开始
assert(r->data_block.empty());
// Add下一Data Block的第一个key/value时,才根据这个key构造一个FindShortSuccessor,
// 写入Index Block中的一个entry(max_key+offset+size),是为了能够找到
// 一个更短的分割2个Block的key,从而减少存储容量;
// 只有Finish中是根据最后一个Block的最后一个key构造的。
r->options.comparator->FindShortestSeparator(&r->last_key, key);	// 计算max_key
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));// Index Block数据,添加刚写入.sst文件中的Data Block索引项(max_key、offset、size)
r->pending_index_entry = false;
}

r->last_key.assign(key.data(), key.size());                 // 当前最大key
r->num_entries++;                                           // 记录数量++
r->data_block.Add(key, value);                              // Data Block数据块添加一个key/value键值对

const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
if (estimated_block_size >= r->options.block_size) {        // DataBlock容量大于设置的block size,则写入文件
Flush();
}
}

void TableBuilder::Flush() {                                    // 将当前Data Block写入文件
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->data_block.empty()) return;
assert(!r->pending_index_entry);
// 向文件写入一个Block(数据及type和CRC),并设置index Block项(index Block在sst文件完毕阶段写入)
// 在pending_handle中记录Index Block中对应此Block的索引项
WriteBlock(&r->data_block, &r->pending_handle);
if (ok()) {
r->pending_index_entry = true;                          // 设置标志: Add/Finish时,在Index Block中记录一项索引信息
r->status = r->file->Flush();
}
}
// 向文件写入一个Block(数据及type和CRC),并设置index Block项(index Block在sst文件完毕阶段写入)
void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
// File format contains a sequence of blocks where each block has:
//    block_data: uint8

//    type: uint8
//    crc: uint32
assert(ok());
Rep* r = rep_;
Slice raw = block->Finish();                                        // 添加restart信息,返回Block数据的起始位置

Slice block_contents;
CompressionType type = r->options.compression;
// TODO(postrelease): Support more compression options: zlib?
switch (type) {
case kNoCompression:
block_contents = raw;
break;

case kSnappyCompression: {                                      // 进行Snappy压缩
std::string* compressed = &r->compressed_output;
if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
compressed->size() < raw.size() - (raw.size() / 8u)) {
block_contents = *compressed;
} else {
// Snappy not supported, or compressed less than 12.5%, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
}
handle->set_offset(r->offset);                                      // 记录Block的索引信息-offset
handle->set_size(block_contents.size());                            // 记录Block的索引信息-size
r->status = r->file->Append(block_contents);                        // Block数据写入文件
if (r->status.ok()) {
char trailer[kBlockTrailerSize];                                // type + crc
trailer[0] = type;
uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
crc = crc32c::Extend(crc, trailer, 1);   // Extend crc to cover block type
EncodeFixed32(trailer+1, crc32c::Mask(crc));
r->status = r->file->Append(Slice(trailer, kBlockTrailerSize)); // 写入trailer
if (r->status.ok()) {
r->offset += block_contents.size() + kBlockTrailerSize;
}
}
r->compressed_output.clear();
block->Reset();
}

Status TableBuilder::status() const {
return rep_->status;
}

Status TableBuilder::Finish() {                                         // .sst数据构造完毕,写入文件
Rep* r = rep_;
Flush();
assert(!r->closed);
r->closed = true;
BlockHandle metaindex_block_handle;
BlockHandle index_block_handle;
if (ok()) {
BlockBuilder meta_index_block(&r->options);
// TODO(postrelease): Add stats and other meta blocks
WriteBlock(&meta_index_block, &metaindex_block_handle);         // 写入Meta Index Block
}
if (ok()) {
if (r->pending_index_entry) {
// Add下一Data Block的第一个key/value时,才根据这个key构造一个FindShortSuccessor,
// 写入Index Block中的一个entry(max_key+offset+size),是为了能够找到
// 一个更短的分割2个Block的key,从而减少存储容量;
// 只有Finish中是根据最后一个Block的最后一个key构造的。
r->options.comparator->FindShortSuccessor(&r->last_key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));    // 在Index Block中增加一个索引信息
r->pending_index_entry = false;
}
WriteBlock(&r->index_block, &index_block_handle);               // 写入Index Block
}
if (ok()) {
Footer footer;
footer.set_metaindex_handle(metaindex_block_handle);
footer.set_index_handle(index_block_handle);
std::string footer_encoding;
footer.EncodeTo(&footer_encoding);
r->status = r->file->Append(footer_encoding);                   // 写入footer
if (r->status.ok()) {
r->offset += footer_encoding.size();
}
}
return r->status;
}


读取相关操作:

Table相当于.sst文件在内存中的映像,它保存了.sst文件的Index Block数据。

struct Table::Rep {
~Rep() {
delete index_block;
}

Options options;                                    // 配置选项
Status status;
RandomAccessFile* file;                             // 对应的.sst文件
uint64_t cache_id;

BlockHandle metaindex_handle;                       // Handle to metaindex_block: saved from footer
Block* index_block;                                 // Index Block
};

// 解析sstable文件(读取Footer,找到index_block_handle,然后读取Index Block数据)
Status Table::Open(const Options& options,
RandomAccessFile* file,
uint64_t size,
Table** table)
{
*table = NULL;
if (size < Footer::kEncodedLength) {
return Status::InvalidArgument("file is too short to be an sstable");
}

char footer_space[Footer::kEncodedLength];
Slice footer_input;

Status s = file->Read(size - Footer::kEncodedLength,// 读取Footer信息
Footer::kEncodedLength, &footer_input, footer_space);
if (!s.ok()) return s;

Footer footer;
s = footer.DecodeFrom(&footer_input);               // 解析 Footer
if (!s.ok()) return s;

// Read the index block
Block* index_block = NULL;                          // index_block在ReadBlock内部分配
if (s.ok()) {
s = ReadBlock(file, ReadOptions(),
footer.index_handle(), &index_block);   // 读取Index Block数据
}

if (s.ok()) {
// We've successfully read the footer and the index block: we're
// ready to serve requests.
Rep* rep = new Table::Rep;                      // 新建一个内部Rep结构
rep->options = options;
rep->file = file;
rep->metaindex_handle = footer.metaindex_handle();
rep->index_block = index_block;
rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0); // 配置不使用BlockCache时,设置为0
*table = new Table(rep);                        // 新建一个Table,代表SSTable文件,
// .sst文件不会全部读入,如果有需要只是根据index
// block中的索引,每次读取一个Data Block
}
else
{
if (index_block) delete index_block;
}

return s;
}

Table::~Table() {
delete rep_;
}

static void DeleteBlock(void* arg, void* ignored) {
delete reinterpret_cast<Block*>(arg);
}

static void DeleteCachedBlock(const Slice& key, void* value) {
Block* block = reinterpret_cast<Block*>(value);
delete block;
}

static void ReleaseBlock(void* arg, void* h) {
Cache* cache = reinterpret_cast<Cache*>(arg);       					// Cache为LRUCache接口
Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h);
cache->Release(handle);
}

// Convert an index iterator value (i.e., an encoded BlockHandle)
// into an iterator over the contents of the corresponding block.
Iterator* Table::BlockReader(void* arg,									// 读取指定的Block数据
const ReadOptions& options,
const Slice& index_value)                  					// index_value: BlockHandle的编码数据
{
Table* table = reinterpret_cast<Table*>(arg);
Cache* block_cache = table->rep_->options.block_cache;
Block* block = NULL;
Cache::Handle* cache_handle = NULL;

BlockHandle handle;
Slice input = index_value;
Status s = handle.DecodeFrom(&input);
// We intentionally allow extra stuff in index_value so that we
// can add more features in the future.

if (s.ok())
{
if (block_cache != NULL)                        					// Block Cache存在
{
// 构造cache key: cache_id(8) + handle.offset(8)
char cache_key_buffer[16];
EncodeFixed64(cache_key_buffer, table->rep_->cache_id);
EncodeFixed64(cache_key_buffer+8, handle.offset());
Slice key(cache_key_buffer, sizeof(cache_key_buffer));
cache_handle = block_cache->Lookup(key);    					// LRUCache中查找
if (cache_handle != NULL)                   					// Cache中找到
{
block = reinterpret_cast<Block*>(block_cache->Value(cache_handle)); //Block数据
}
else                                        					// Cache中不存在,则从.sst文件读取
{
// 从.sst文件中,读取handle(offset_/size_)指定的Block数据到**block中
s = ReadBlock(table->rep_->file, options, handle, &block);
if (s.ok() && options.fill_cache)
{
cache_handle = block_cache->Insert( 					// 插入LRUCache中
key, block, block->size(), &DeleteCachedBlock);
}
}
}
else                                           					// Block Cache不可用
{
// 从.sst文件中,读取handle(offset_/size_)指定的Block数据到**block中
s = ReadBlock(table->rep_->file, options, handle, &block);
}
}

Iterator* iter;
if (block != NULL) {
iter = block->NewIterator(table->rep_->options.comparator); 		// Index Block的迭代器
if (cache_handle == NULL) {                     					// Cache中不存在
iter->RegisterCleanup(&DeleteBlock, block, NULL);
} else {
iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
}
} else {
iter = NewErrorIterator(s);
}
return iter;
}

Iterator* Table::NewIterator(const ReadOptions& options) const {
return NewTwoLevelIterator(
rep_->index_block->NewIterator(rep_->options.comparator),
&Table::BlockReader, const_cast<Table*>(this), options);
}

uint64_t Table::ApproximateOffsetOf(const Slice& key) const {   			// 返回key在文件中的位置(根据Index Block中的索引进行查找)
Iterator* index_iter =
rep_->index_block->NewIterator(rep_->options.comparator);
index_iter->Seek(key);      // Position at the first key in the source that at or past target
uint64_t result;
if (index_iter->Valid())
{
BlockHandle handle;
Slice input = index_iter->value();                      // value为BlockHandle序列化信息
Status s = handle.DecodeFrom(&input);
if (s.ok()) {
result = handle.offset();
} else {
// Strange: we can't decode the block handle in the index block.
// We'll just return the offset of the metaindex block, which is
// close to the whole file size for this case.
result = rep_->metaindex_handle.offset();
}
} else {
// key is past the last key in the file.  Approximate the offset
// by returning the offset of the metaindex block (which is
// right near the end of the file).
result = rep_->metaindex_handle.offset();
}
delete index_iter;
return result;
}


说明:

Table相当于.sst文件在内存中的映像,它保存了.sst文件的Index Block数据。

TableCache相当于所有打开的.sst文件在内存中的管理结构,内部采用LRUCache,每个打开的.sst文件在LRUCache中都有一项:map< file_number -> {file, table} >,详见《levelDB源码分析-TableCache》。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: