您的位置:首页 > 其它

leveldb源码分析入手

2018-01-19 22:25 239 查看
上一篇文章介绍了leveldb整体架构,大致了解了下leveldb是什么。为了弄清leveldb的实现,当作学习Google开源项目的一个很好例子,决定先从leveldb基本使用方法入手。

#include <iostream>
#include "leveldb/db.h"
using namespace std;

int main()
{
leveldb::DB* db;
leveldb::Options options;
options.create_if_missing = true;
std::string dbpath = "testdb";
leveldb::Status status = leveldb::DB::Open(options, dbpath, &db);   //  1 创建一个数据库
assert(status.ok());
std::string key = "Hello";
std::string value = "world";

std::string v1 = "";
status = db->Put(leveldb::WriteOptions(), key, value);/* 2 插入key-value*/
status = db->Get(leveldb::ReadOptions(), key, &v1);/* 3 根据key查询value*/
cout<<v1<<endl;
delete db;/*删除数据库*/


从这个例子看到leveldb的三个基本使用接口,创建数据库,插入和查询。

我们先跟踪插入这个接口。

Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
return DB::Put(o, key, val);
}


WriteOptions相当于一个应用配置暂时忽悠,我们看到传入的是实参是string,而这里的Put形参是Slice。

Slice

class Slice {
public:
// Create an empty slice.
Slice() : data_(""), size_(0) { }

// Create a slice that refers to d[0,n-1].
Slice(const char* d, size_t n) : data_(d), size_(n) { }

// Create a slice that refers to the contents of "s"
Slice(const std::string& s) : data_(s.data()), size_(s.size()) { }

// Create a slice that refers to s[0,strlen(s)-1]
Slice(const char* s) : data_(s), size_(strlen(s)) { }

private:
const char* data_;
size_t size_;
};


我们先看看Slice的成员变量,data_和size_。看到这里就能想到类似string的实现,看看构造函数,就是指针的简单赋值,就是浅拷贝。所以Slice就是简单对指针一个包装,记录指针所指数据起始地址及长度。同时也避免数据拷贝。

接着跟踪DB::Put

Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}


WriteBatch 是什么东西

WriteBatch

看看WriteBatch类

class WriteBatch {
public:
WriteBatch();
~WriteBatch();

// Store the mapping "key->value" in the database.
void Put(const Slice& key, const Slice& value);
//...........其他成员函数省略
private:
friend class WriteBatchInternal;

std::string rep_;  // See comme
b71f
nt in write_batch.cc for the format of rep_

// Intentionally copyable
};


看到WriteBatch 就一个成员rep_, Put接口便是将key,value放入rep_。既然是放入肯定有一定组织结构,下面就看看WriteBatch 的结构。



先看record, 依次记录key长度 key 数据和value长度 value数据,通用存储二进制数据方法。

count: 记录key-value对数,WriteBatch可以由多个key-value组合而成,一次写入log文件提高效率。

sequence number :序列号是一个重要概念,当插入数据时,有一个全局变量SequenceNumber会随着插入key-value逐渐递增,可以看作key的计数。我们知道leveldb key-value数据覆盖时不会真正覆盖。如果先后插入key=”hello1” value=”world1” key=”hello1” value=”world2” 数据库中会有两条记录。并且可以根据sequence,查出”hello1”或者”hello2”,所以sequence 对同一key又有版本号的语义。

leveldb会将操作记录先写入log文件,再放入内存,而记录的基本方式就是WriteBatch。所以就必须通过ValueType 标识操作类型是删除还是插入,ValueType 就只有如下两种类型

enum ValueType {
kTypeDeletion = 0x0,//删除
kTypeValue = 0x1//插入
};


下面看看Put的具体实现,可以想象上面的结构图

void WriteBatch::Put(const Slice& key, const Slice& value) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);//设置count
rep_.push_back(static_cast<char>(kTypeValue));//设置类型
PutLengthPrefixedSlice(&rep_, key);//追加key的大小和数据
PutLengthPrefixedSlice(&rep_, value);//追加value的大小和数据
}


PutLengthPrefixedSlice 的实现重点在于int/int64整形编解码,详情可以看看这篇文章

key -value写入WriteBatch之后 ,就是将WriteBatch写入文件,然后以某种组织方式写入Memtable,顺着这个思路阅读代码。

再跟踪到Write

// 1 将队列中batch合并 一次写入log文件
//  2 写入memtable
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
Writer w(&mutex_);
w.batch = my_batch;
w.sync = options.sync;
w.done = false;

MutexLock l(&mutex_);
writers_.push_back(&w);
// 由writes_队列中首batch(线程)处理
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
if (w.done) {//batch已经被处理
return w.status;
}

// May temporarily unlock and wait.
Status status = MakeRoomForWrite(my_batch == NULL);//1 检测memtable空间,是否需要compaction
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && my_batch != NULL) {  // NULL batch is for compactions
WriteBatch* updates = BuildBatchGroup(&last_writer);//将writes_中batch合并
WriteBatchInternal::SetSequence(updates, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(updates);

// Add to log and apply to memtable.  We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock();//解锁后一个线程写入log和memtable过程中其他线程可以往writes_中写入batch
status = log_->AddRecord(WriteBatchInternal::Contents(updates));// 2 将合并后的batch写入log文件
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, mem_);//3 写入memtable
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (updates == tmp_batch_) tmp_batch_->Clear();

versions_->SetLastSequence(last_sequence);
}

//ready到 last_write间batch都已合并
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}
//last_write后如果还有batch,激活线程
// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}

return status;
}


这个函数其大意就是:假如应用层有多个线程同时写入,每个线程先将WriteBatch写入队列,应用线程都是生产者,然而没有单独的消费者,于是就在这些线程里找一个作为消费者将合并后的WriteBatch写入log及Memtable。

为什么不是每个线程都执行完整的逻辑呢?

可以合并patch,提高写入文件效率

找到关键三行代码

Status status = MakeRoomForWrite(my_batch == NULL);

status = log_->AddRecord(WriteBatchInternal::Contents(updates));

status = WriteBatchInternal::InsertInto(updates, mem_);

以后分别从这三条主线深入分析。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: