您的位置:首页 > 编程语言

leveldb代码阅读(14)——Level和Compaction

2016-01-15 15:18 357 查看
原文地址:http://www.blogjava.net/sandy/archive/2012/03/15/leveldb6.html

leveldb之所以使用level作为数据库名称,精华就在于level的设计。

compaction本质上是一种多路归并排序:将多个有序的SST文件合并成新的有序文件。

这样设计的好处主要是可以减少compaction的次数和每次的文件个数。

Compaction

为什么要compaction?

compaction可以提高数据的查询效率,没有经过compaction,需要从很多SST file去查找,而做过compaction后,只需要从有限的SST文件去查找,大大的提高了随机查询的效率,另外也可以删除过期数据。

什么时候可能进行compaction?

1. database open的时候

2. write的时候

3. read的时候(某个文件被seek的次数过多时,会触发基于seek的compaction)

<db/db_impl.cc>

// Decide whether a background compaction should be scheduled, and if so
// hand BGWork off to the background thread via env_->Schedule().
// NOTE(review): requires mutex_ to be held by the caller (asserted below).

void DBImpl::MaybeScheduleCompaction() {

mutex_.AssertHeld();

if (bg_compaction_scheduled_) { // a background compaction is already scheduled

} else if (shutting_down_.Acquire_Load()) {

// DB is being shut down; do not start new background work

} else if (imm_ == NULL &&

manual_compaction_ == NULL &&

!versions_->NeedsCompaction()) {

// imm_ == NULL: no immutable memtable waiting to be flushed

// manual_compaction_ == NULL: no user-requested (manual) compaction pending

// !NeedsCompaction(): no level has hit its size or seek trigger

} else {

bg_compaction_scheduled_ = true;

env_->Schedule(&DBImpl::BGWork, this);

}

}

<db/version_set.h>

// Returns true iff some compaction is warranted for the current version:
// either a level's size-based score has reached the trigger threshold
// (compaction_score_ >= 1), or a file has accumulated enough seeks to be
// marked for a seek-triggered compaction (file_to_compact_ != NULL).
bool NeedsCompaction() const {
  return current_->compaction_score_ >= 1 ||
         current_->file_to_compact_ != NULL;
}

如何计算这个compaction_score呢?看下面的代码:

<db/version_set.cc>

// Precompute the level most in need of a size-based compaction for
// version *v, storing the winner in v->compaction_level_ and its score in
// v->compaction_score_.  A score >= 1 means that level is over its limit.
void VersionSet::Finalize(Version* v) {

int best_level = -1;

double best_score = -1;

// Scan every level that can act as a compaction source (the last level
// has nowhere to push data, so it is skipped).

for (int level = 0; level < config::kNumLevels-1; level++) {

double score;

if (level == 0) {

// Level 0 is scored by file count, not bytes: ratio of the current
// number of files to the predefined trigger (default: 4).

score = v->files_[level].size() /

static_cast<double>(config::kL0_CompactionTrigger);

} else {

// Other levels are scored by size: ratio of the level's total bytes
// to its target size (roughly 10^level MB).

const uint64_t level_bytes = TotalFileSize(v->files_[level]);

score = static_cast<double>(level_bytes) / MaxBytesForLevel(level);

}

// Keep track of the level most in need of compaction.

if (score > best_score) {

best_level = level;

best_score = score;

}

}

v->compaction_level_ = best_level;

v->compaction_score_ = best_score;

}

如何做compaction?

leveldb 运行会启动一个background thread,会执行一些background task,compaction就在这个线程中执行。

首先来看看compaction对象如何定义的

<db/version_set.h>

// A Compaction bundles everything needed to merge files from level_ into
// level_+1: the two sets of input files, the VersionEdit that records the
// result, and state used to limit overlap with level_+2 ("grandparents").

class Compaction {

public:

~Compaction();

// Source level: inputs from "level" and "level+1" are merged, producing
// new files in "level+1".

int level() const { return level_; }

// VersionEdit accumulating the result, to be logged to the manifest.

VersionEdit* edit() { return &edit_; }

// Number of input files on side "which": 0 -> level_, 1 -> level_+1.

int num_input_files(int which) const { return inputs_[which].size(); }

// Metadata of the i-th input file on side "which" (see num_input_files).

FileMetaData* input(int which, int i) const { return inputs_[which][i]; }

// Maximum size of a single output file produced by this compaction.

uint64_t MaxOutputFileSize() const { return max_output_file_size_; }

// True if this compaction can be implemented by just moving one input
// file to the next level, with no merging or splitting.

bool IsTrivialMove() const;

// Record every input file as a deletion in *edit.

void AddInputDeletions(VersionEdit* edit);

// Returns true if the information we have available guarantees that

// the compaction is producing data in "level+1" for which no data exists

// in levels greater than "level+1".

bool IsBaseLevelForKey(const Slice& user_key);

// Returns true iff we should stop building the current output

// before processing "internal_key".

bool ShouldStopBefore(const Slice& internal_key);

// Release the input version for the compaction, once the compaction

// is successful.

void ReleaseInputs();

private:

friend class Version;

friend class VersionSet;

explicit Compaction(int level);

int level_;

uint64_t max_output_file_size_;

Version* input_version_;

VersionEdit edit_;

// Each compaction reads inputs from "level_" and "level_+1"

std::vector<FileMetaData*> inputs_[2]; // The two sets of inputs

// State used to check for number of overlapping grandparent files

// (parent == level_ + 1, grandparent == level_ + 2)

std::vector<FileMetaData*> grandparents_;

size_t grandparent_index_; // Index in grandparent_starts_

bool seen_key_; // Some output key has been seen

int64_t overlapped_bytes_; // Bytes of overlap between current output

// and grandparent files

// State for implementing IsBaseLevelForKey

// level_ptrs_ holds indices into input_version_->levels_: our state

// is that we are positioned at one of the file ranges for each

// higher level than the ones involved in this compaction (i.e. for

// all L >= level_ + 2).

size_t level_ptrs_[config::kNumLevels];

};

Compaction Thread

<db/db_impl.cc>

// Body of the background compaction task.  Flushes the immutable memtable
// if one exists; otherwise picks (or uses the manual) compaction, performs
// it (either as a trivial file move or via DoCompactionWork), records any
// error, and updates manual-compaction bookkeeping.
// NOTE(review): requires mutex_ to be held on entry (asserted below).
void DBImpl::BackgroundCompaction() {

mutex_.AssertHeld();

// Flushing the immutable memtable to an sstable takes priority.

if (imm_ != NULL) {

CompactMemTable();

return;

}

Compaction* c;

bool is_manual = (manual_compaction_ != NULL);

InternalKey manual_end;

if (is_manual) { // user-requested (manual) compaction

ManualCompaction* m = manual_compaction_;

// Build the compaction from the requested key range.

c = versions_->CompactRange(m->level, m->begin, m->end);

m->done = (c == NULL);

if (c != NULL) {

manual_end = c->input(0, c->num_input_files(0) - 1)->largest;

}

Log(options_.info_log,

"Manual compaction at level-%d from %s .. %s; will stop at %s\n",

m->level,

(m->begin ? m->begin->DebugString().c_str() : "(begin)"),

(m->end ? m->end->DebugString().c_str() : "(end)"),

(m->done ? "(end)" : manual_end.DebugString().c_str()));

} else {

// Let the version set pick the level & files most in need of compaction.

c = versions_->PickCompaction();

}

Status status;

if (c == NULL) {

// Nothing to do

} else if (!is_manual && c->IsTrivialMove()) { // just move the sst file

// Move file to next level

assert(c->num_input_files(0) == 1);

FileMetaData* f = c->input(0, 0);

c->edit()->DeleteFile(c->level(), f->number);

c->edit()->AddFile(c->level() + 1, f->number, f->file_size,

f->smallest, f->largest);

status = versions_->LogAndApply(c->edit(), &mutex_);

VersionSet::LevelSummaryStorage tmp;

Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",

static_cast<unsigned long long>(f->number),

c->level() + 1,

static_cast<unsigned long long>(f->file_size),

status.ToString().c_str(),

versions_->LevelSummary(&tmp));

} else { // full merge compaction

CompactionState* compact = new CompactionState(c);

status = DoCompactionWork(compact);

CleanupCompaction(compact);

c->ReleaseInputs();

DeleteObsoleteFiles();

}

delete c;

if (status.ok()) {

// Done

} else if (shutting_down_.Acquire_Load()) {

// Ignore compaction errors found during shutting down

} else {

Log(options_.info_log,

"Compaction error: %s", status.ToString().c_str());

if (options_.paranoid_checks && bg_error_.ok()) {

bg_error_ = status;

}

}

if (is_manual) {

ManualCompaction* m = manual_compaction_;

if (!status.ok()) {

m->done = true;

}

if (!m->done) {

// We only compacted part of the requested range. Update *m

// to the range that is left to be compacted.

m->tmp_storage = manual_end;

m->begin = &m->tmp_storage;

}

manual_compaction_ = NULL;

}

}

compact memtable:将immutable memtable写成一个level 0的sst文件,并把变更记入manifest log

// Flush the immutable memtable (imm_) to a level-0 sstable, record the
// change in the manifest, and on success drop imm_ and clean up obsolete
// files.  Returns the status of the flush/manifest write.
// NOTE(review): requires mutex_ held and imm_ != NULL (asserted below).
Status DBImpl::CompactMemTable() {

mutex_.AssertHeld();

assert(imm_ != NULL);

VersionEdit edit;

Version* base = versions_->current();

base->Ref();

// Write imm_ out as a level-0 sstable, recording the new file in edit.

Status s = WriteLevel0Table(imm_, &edit, base);

base->Unref();

if (s.ok() && shutting_down_.Acquire_Load()) {

s = Status::IOError("Deleting DB during memtable compaction");

}

// Replace immutable memtable with the generated Table

if (s.ok()) {

edit.SetPrevLogNumber(0);

edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed

// Apply the edit and persist it to the manifest log.

s = versions_->LogAndApply(&edit, &mutex_);

}

if (s.ok()) {

// Commit to the new state

imm_->Unref();

imm_ = NULL;

has_imm_.Release_Store(NULL);

DeleteObsoleteFiles();

}

return s;

}

下面来看看compaction已有文件:

找出要compaction的文件:

<db/version_set.cc>

// Choose the level and initial file(s) for the next compaction, or return
// NULL if nothing needs compacting.  Size-triggered compactions take
// precedence over seek-triggered ones.  The returned Compaction holds a
// reference on the current version; callers own the returned object.
Compaction* VersionSet::PickCompaction() {

Compaction* c;

int level;

// Two possible triggers: size-based (a level is over its byte/file limit)
// and seek-based (a file has been seeked too many times).

const bool size_compaction = (current_->compaction_score_ >= 1);

const bool seek_compaction = (current_->file_to_compact_ != NULL);

if (size_compaction) {

level = current_->compaction_level_;

assert(level >= 0);

assert(level+1 < config::kNumLevels);

c = new Compaction(level);

// Each level keeps a compact_pointer_ recording where the previous
// compaction of that level ended, so successive compactions rotate
// through the level's key space.

for (size_t i = 0; i < current_->files_[level].size(); i++) {

FileMetaData* f = current_->files_[level][i];

if (compact_pointer_[level].empty() ||

icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {

// One file past the pointer is enough to start from.

c->inputs_[0].push_back(f);

break;

}

}

if (c->inputs_[0].empty()) {

// Wrap-around to the beginning of the key space

c->inputs_[0].push_back(current_->files_[level][0]);

}

} else if (seek_compaction) {

level = current_->file_to_compact_level_;

c = new Compaction(level);

c->inputs_[0].push_back(current_->file_to_compact_);

} else {

return NULL;

}

c->input_version_ = current_;

c->input_version_->Ref();

// Level 0 is special: its files may have overlapping key ranges, so pull
// in every level-0 file that overlaps and compact them together.

if (level == 0) {

InternalKey smallest, largest;

GetRange(c->inputs_[0], &smallest, &largest);

// Note that the next call will discard the file we placed in

// c->inputs_[0] earlier and replace it with an overlapping set

// which will include the picked file.

current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);

assert(!c->inputs_[0].empty());

}

// Fill in the files from level N+1 (and grandparent bookkeeping).

SetupOtherInputs(c);

return c;

}

<db/version_set.cc>

// Given c->inputs_[0] (the level-N files), fill in c->inputs_[1] with the
// overlapping level-N+1 files, optionally grow the level-N set when that
// is free (does not pull in extra level-N+1 files and stays under the
// expansion byte limit), collect overlapping grandparent files, and
// advance the level's compact_pointer_.
void VersionSet::SetupOtherInputs(Compaction* c) {

const int level = c->level();

InternalKey smallest, largest;

GetRange(c->inputs_[0], &smallest, &largest);

// All files in level N+1 that overlap the level-N input range.

current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);

// Key range covered by both input sets together.

InternalKey all_start, all_limit;

GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);

// See whether we can grow the level-N input set without changing the
// level-N+1 set (a "free" expansion).

if (!c->inputs_[1].empty()) {

std::vector<FileMetaData*> expanded0;

current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);

const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);

const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);

const int64_t expanded0_size = TotalFileSize(expanded0);

if (expanded0.size() > c->inputs_[0].size() &&

inputs1_size + expanded0_size < kExpandedCompactionByteSizeLimit) {

InternalKey new_start, new_limit;

GetRange(expanded0, &new_start, &new_limit);

std::vector<FileMetaData*> expanded1;

current_->GetOverlappingInputs(level+1, &new_start, &new_limit,

&expanded1);

// Only accept the expansion if the level-N+1 set is unchanged.

if (expanded1.size() == c->inputs_[1].size()) {

Log(options_->info_log,

"Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",

level,

int(c->inputs_[0].size()),

int(c->inputs_[1].size()),

long(inputs0_size), long(inputs1_size),

int(expanded0.size()),

int(expanded1.size()),

long(expanded0_size), long(inputs1_size));

smallest = new_start;

largest = new_limit;

c->inputs_[0] = expanded0;

c->inputs_[1] = expanded1;

GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);

}

}

}

// Compute the set of grandparent files that overlap this compaction

// (parent == level+1; grandparent == level+2)

if (level + 2 < config::kNumLevels) {

current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,

&c->grandparents_);

}

if (false) {

Log(options_->info_log, "Compacting %d '%s' .. '%s'",

level,

smallest.DebugString().c_str(),

largest.DebugString().c_str());

}

// Advance the compact pointer so the next compaction of this level
// resumes after the range we just consumed.

compact_pointer_[level] = largest.Encode().ToString();

c->edit_.SetCompactPointer(level, largest);

}

do compaction task:

// Perform the actual merge: iterate over all input files in key order,
// drop shadowed/obsolete entries, and write the survivors into new
// sstables.  The mutex is released for the duration of the heavy work and
// re-acquired at the end to install the results and update statistics.
Status DBImpl::DoCompactionWork(CompactionState* compact) {

const uint64_t start_micros = env_->NowMicros();

int64_t imm_micros = 0; // Micros spent doing imm_ compactions

Log(options_.info_log, "Compacting %d@%d + %d@%d files",

compact->compaction->num_input_files(0),

compact->compaction->level(),

compact->compaction->num_input_files(1),

compact->compaction->level() + 1);

assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);

assert(compact->builder == NULL);

assert(compact->outfile == NULL);

// smallest_snapshot: oldest sequence number any snapshot may still read;
// entries older than this that are shadowed can be dropped.

if (snapshots_.empty()) {

compact->smallest_snapshot = versions_->LastSequence();

} else {

compact->smallest_snapshot = snapshots_.oldest()->number_;

}

// Release mutex while we're actually doing the compaction work

mutex_.Unlock();

// Merged iterator over every input file of this compaction.

Iterator* input = versions_->MakeInputIterator(compact->compaction);

input->SeekToFirst();

Status status;

ParsedInternalKey ikey;

std::string current_user_key;

bool has_current_user_key = false;

SequenceNumber last_sequence_for_key = kMaxSequenceNumber;

for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {

// If an immutable memtable appeared, flush it first — that work has
// priority over the ongoing compaction.

if (has_imm_.NoBarrier_Load() != NULL) {

const uint64_t imm_start = env_->NowMicros();

mutex_.Lock();

if (imm_ != NULL) {

CompactMemTable();

bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary

}

mutex_.Unlock();

imm_micros += (env_->NowMicros() - imm_start);

}

Slice key = input->key();

// Possibly close the current output early so the result does not
// overlap too many level N+2 files (would make future compactions big).

if (compact->compaction->ShouldStopBefore(key) &&

compact->builder != NULL) {

status = FinishCompactionOutputFile(compact, input);

if (!status.ok()) {

break;

}

}

// Handle key/value, add to state, etc.

bool drop = false;

if (!ParseInternalKey(key, &ikey)) {

// Do not hide error keys

current_user_key.clear();

has_current_user_key = false;

last_sequence_for_key = kMaxSequenceNumber;

} else {

if (!has_current_user_key ||

user_comparator()->Compare(ikey.user_key,

Slice(current_user_key)) != 0) {

// First occurrence of this user key

current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());

has_current_user_key = true;

last_sequence_for_key = kMaxSequenceNumber;

}

if (last_sequence_for_key <= compact->smallest_snapshot) {

// Hidden by a newer entry for same user key

drop = true; // (A)

} else if (ikey.type == kTypeDeletion &&

ikey.sequence <= compact->smallest_snapshot &&

compact->compaction->IsBaseLevelForKey(ikey.user_key)) {

// For this user key:

// (1) there is no data in higher levels

// (2) data in lower levels will have larger sequence numbers

// (3) data in layers that are being compacted here and have

// smaller sequence numbers will be dropped in the next

// few iterations of this loop (by rule (A) above).

// Therefore this deletion marker is obsolete and can be dropped.

drop = true;

}

last_sequence_for_key = ikey.sequence;

}

if (!drop) {

// Open output file if necessary

if (compact->builder == NULL) {

status = OpenCompactionOutputFile(compact);

if (!status.ok()) {

break;

}

}

if (compact->builder->NumEntries() == 0) {

compact->current_output()->smallest.DecodeFrom(key);

}

compact->current_output()->largest.DecodeFrom(key);

compact->builder->Add(key, input->value());

// Output file reached the sst size limit: close it and start a new one.

if (compact->builder->FileSize() >=

compact->compaction->MaxOutputFileSize()) {

status = FinishCompactionOutputFile(compact, input);

if (!status.ok()) {

break;

}

}

}

input->Next();

}

if (status.ok() && shutting_down_.Acquire_Load()) {

status = Status::IOError("Deleting DB during compaction");

}

if (status.ok() && compact->builder != NULL) {

status = FinishCompactionOutputFile(compact, input);

}

if (status.ok()) {

status = input->status();

}

delete input;

input = NULL;

// Update compaction statistics (bytes read/written, elapsed time).

CompactionStats stats;

stats.micros = env_->NowMicros() - start_micros - imm_micros;

for (int which = 0; which < 2; which++) {

for (int i = 0; i < compact->compaction->num_input_files(which); i++) {

stats.bytes_read += compact->compaction->input(which, i)->file_size;

}

}

for (size_t i = 0; i < compact->outputs.size(); i++) {

stats.bytes_written += compact->outputs[i].file_size;

}

mutex_.Lock();

stats_[compact->compaction->level() + 1].Add(stats);

if (status.ok()) {

status = InstallCompactionResults(compact);

}

VersionSet::LevelSummaryStorage tmp;

Log(options_.info_log,

"compacted to: %s", versions_->LevelSummary(&tmp));

return status;

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: