
node-haystack Episode 9: Manipulate Volume

2016-09-10 14:59
class Volume is the class for manipulating volumes. Only some crucial parts of class Volume are shown here.

A brief overview of Volume

/*!
\brief Class for manipulating a Volume.
*/
class Volume {
public:
    /** Callback for the close event. */
    DECL_CB(close, int /*!< Error code */)
    /** Callback for appending data. */
    DECL_CB(append, int /*!< Error code */)
    /** Callback for recovery. */
    DECL_CB(recover, int /*!< Error code */, const std::string& /*!< Status message */, const u32 /*!< Percentage of progress */)

    const u64 BLOCK_NUM_MAX         = 4 * Mega;     //!< Max blocks a Volume can hold.
    const u64 FILE_SIZE_MAX         = 2 * Tera;     //!< Max size of the Volume file.
    const u32 WRITE_QUEUE_SIZE      = 512;          //!< The length of the writing queue.
    const u64 DEF_WRITE_INTERVAL    = 2 * Second;   /*!< Default interval for writing new blocks: 2 seconds. */
    const u64 RECOVER_BUFFER_SIZE   = 128 * Mega;   //!< Buffer size for recovering.

    ....

private:
    bool m_has_limit;                       //!< Whether the block-count/file-size limits are enforced.
    Random m_rand;                          //!< The random number/uuid generator.
    AsyncFile m_afs;                        //!< The async file object.
    Block m_block;                          //!< The block object.
    bst::atomic<u64> m_volume_size;         //!< The size of the Volume file, which grows as new blocks are appended.
    bst::atomic<u64> m_writed_block_num;    //!< The number of blocks that have been written.
    AsyncTimer m_write_timer;               //!< The timer for write scheduling.

    block_index_map_t m_index;              //!< The block index map.
    bst::shared_mutex m_index_mtx;          //!< The mutex for the block index map.

    Block::write_queue_t* m_write_queue;    //!< The queue for block writing.
    bst::shared_mutex m_write_queue_mtx;    //!< The mutex for the writing queue.

    AsyncWorker<Block::write_queue_t> m_write_worker; //!< The async worker for block writing.
}; // class Volume


Batched writing

Batched writing has four steps. First, the data is put into the queue. Second, the timer is started if it is not already running. Third, if there are enough items in the queue, the writing process starts immediately. Fourth, when the timer event arrives and the queue is not empty, writing starts anyway.
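The four steps above can be sketched with the standard library alone. This is a minimal, hypothetical model (the `BatchWriter` name, `int` items, and the `flush` callback are illustrative, not the article's real types): items are queued; a flush runs synchronously when the queue reaches a threshold, and a background timer flushes whatever is left over.

```cpp
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

class BatchWriter {
public:
    BatchWriter(std::size_t threshold, std::chrono::milliseconds interval,
                std::function<void(std::vector<int>&&)> flush)
        : threshold_(threshold), interval_(interval), flush_(std::move(flush)),
          timer_([this] { TimerLoop(); }) {}

    ~BatchWriter() {
        { std::lock_guard<std::mutex> lk(mtx_); stop_ = true; }
        cv_.notify_all();
        timer_.join();
    }

    // Step 1: put the data into the queue; step 3: flush once it is full enough.
    void Append(int item) {
        std::vector<int> batch;
        {
            std::lock_guard<std::mutex> lk(mtx_);
            queue_.push_back(item);
            if (queue_.size() >= threshold_) batch.swap(queue_);
        }
        // Flush outside the lock so producers are not blocked by slow I/O.
        if (!batch.empty()) flush_(std::move(batch));
    }

private:
    // Steps 2 and 4: a periodic timer that flushes a non-empty queue anyway.
    void TimerLoop() {
        std::unique_lock<std::mutex> lk(mtx_);
        while (!stop_) {
            cv_.wait_for(lk, interval_);
            if (stop_) break;
            if (!queue_.empty()) {
                std::vector<int> batch;
                batch.swap(queue_);
                lk.unlock();
                flush_(std::move(batch));
                lk.lock();
            }
        }
    }

    std::size_t threshold_;
    std::chrono::milliseconds interval_;
    std::function<void(std::vector<int>&&)> flush_;
    std::vector<int> queue_;
    std::mutex mtx_;
    std::condition_variable cv_;
    bool stop_ = false;
    std::thread timer_;  // initialized last, after all state it touches
};
```

The real code uses libuv's event loop instead of a dedicated thread, but the threshold-or-timer flushing logic is the same.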

Pushing a write request into the queue

/*!
\brief Append a block to the end of the Volume, and assign a new key and cookie to it.
\note When appending a new block, if there are already sufficient entries (as many as
WRITE_QUEUE_SIZE) in the writing queue, the writing worker starts the operation
immediately. Otherwise, the new blocks are stored in the queue and wait for the next
scheduled time. When that time comes, all blocks in the queue are written, no matter
how many there are.
*/
void AppendBlock(const u128& key, const u32& cookie, const u16 tag, const u16 flag, shared_t<vec_t<char>>& data, const cb_append_t& cb) {
    if (m_has_limit && BlockNum() >= BLOCK_NUM_MAX) {
        return cb(_ERR(VOL_EXCEED_BLOCKS));
    }

    if (m_has_limit && FileSize() + data->size() + Block::BLOCK_SIZE_MIN >= FILE_SIZE_MAX) {
        return cb(_ERR(VOL_EXCEED_SIZE));
    }

    auto on_append =
        [&, cb](int err, const u128& key, const u32& cookie, const u16& tag, const u16& flag, const u32& size, const u64& pos) mutable {
            if (!err) {
                this->AddIndex(key, cookie, tag, flag, size, pos);
            }

            cb(err);
        };

    // Build the queue value.
    Block::write_queue_value_t qval{
        key, cookie, tag, flag, (u32)data->size(), (u64)0,
        data,
        on_append
    };

    StartWriteTimer();

    u64 queue_size = 0;
    {
        slock_t sl(m_write_queue_mtx);

        {
            ulock_t ul(sl);

            m_write_queue->push_back(qval);
        }

        queue_size = m_write_queue->size();
    }

    if (queue_size >= WRITE_QUEUE_SIZE)
        ScheduleWrite();
}


Start timer

/*!
\brief Start the timer for write scheduling.
*/
inline void StartWriteTimer() {
    if (!m_write_timer.IsStarted()) {
        m_write_timer.SetInterval(DEF_WRITE_INTERVAL);
        m_write_timer.Start(std::bind(&Volume::OnWriteTimer, this));
    }
}


Start writing

/*!
\brief Schedule the writing work.
*/
inline void ScheduleWrite() {
    Block::write_queue_t* ptr_1 = nullptr;

    {
        slock_t sl(m_write_queue_mtx);

        if (!m_write_queue->size()) {
            LOG("Nop");
            return;
        }

        {
            ulock_t ul(sl);

            ptr_1 = m_write_queue;
            m_write_queue = new Block::write_queue_t;
        }
    }

    // Post the writing operation.
    m_write_worker.post(ptr_1, [&](Block::write_queue_t* ptr_2) mutable {
        // for (auto& it : *ptr_2) {
        //     it.key = m_rand.uuid_to_u128(m_rand.NextUuid());
        //     it.cookie = m_rand.Next();
        // }

        std::size_t qsize = ptr_2->size();

        // Batch writing.
        // FIXME: Update the volume size before posting the real writing work,
        // using the calculated size of the data to write.
        m_block.Append(m_afs, m_volume_size, ptr_2, [&](int err, u64 num, void* user_data) mutable {
            Block::write_queue_t* ptr_3 = reinterpret_cast<Block::write_queue_t*>(user_data);

            if (err) {
                LOG("%s", ErrorToString(err));
            } else {
                m_volume_size += num;
                m_writed_block_num += ptr_3->size();
                LOG("Written blocks: %llu Volume size: %llu MB",
                    (unsigned long long)m_writed_block_num, (unsigned long long)(m_volume_size / (1024 * 1024)));
            }

            // NOTE: Release the queue.
            delete ptr_3;
        });
    }, [&](Block::write_queue_t*, bool) {
        // FIXME: after_work is too difficult to fire.
        // LOG("After work");
    });
}
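The key trick in ScheduleWrite() is that the consumer takes ownership of the whole queue under a short exclusive lock and replaces it with a fresh one, so the slow disk writes run without blocking producers. A minimal sketch of that grab-and-swap pattern with a standard `std::shared_mutex` (the names `Produce`/`GrabBatch` and the `int` queue are illustrative, standing in for the article's `slock_t`/`ulock_t` and `Block::write_queue_t`):

```cpp
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <vector>

using WriteQueue = std::vector<int>;

std::shared_mutex g_queue_mtx;
std::unique_ptr<WriteQueue> g_queue = std::make_unique<WriteQueue>();

// Producer side: append under an exclusive lock (it mutates the queue).
void Produce(int item) {
    std::unique_lock<std::shared_mutex> lk(g_queue_mtx);
    g_queue->push_back(item);
}

// Consumer side: swap the whole queue out, returning the batch to write.
// Returns nullptr when there is nothing to do.
std::unique_ptr<WriteQueue> GrabBatch() {
    {
        std::shared_lock<std::shared_mutex> lk(g_queue_mtx);  // cheap check
        if (g_queue->empty()) return nullptr;
    }
    // Note: another consumer could empty the queue between the two locks, so
    // callers should tolerate an empty batch; the swap itself is harmless.
    std::unique_lock<std::shared_mutex> lk(g_queue_mtx);
    auto batch = std::move(g_queue);
    g_queue = std::make_unique<WriteQueue>();
    return batch;  // the caller writes this batch with no lock held
}
```

Using `unique_ptr` instead of the raw `new`/`delete` in the original also removes the risk of leaking the queue on an error path.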


Periodic writing

/*!
\brief Handle the write timer event.
*/
void OnWriteTimer() {
    ScheduleWrite();
}


Recovery

As a previous episode mentioned, class Volume also does the recovery. Apologies again: the callback hell makes the code hard to read.

/*!
\brief Recover a volume from another specified volume.
This method works asynchronously and provides progress monitoring.
\param src The source volume.
\param dst The destination volume.
\param cb Recovering callback.
*/
void Recover(const std::string& src, const std::string& dst, const cb_recover_t& cb) {
    LOG("Recover to: %s", dst.c_str());

    AsyncFile afs_src;
    AsyncFile afs_dst;
    u64 off = 0;
    u64 size = 0;

    bst::function<void()> copy;

    copy = [&, cb]() {
        afs_src.Read(off, RECOVER_BUFFER_SIZE, [&, cb](int err, shared_t<vec_t<char>>& buf, void*) {
            if (err) {
                afs_src.Close();
                afs_dst.Close();
                return cb(err, "", 0);
            }

            if (!buf->size()) {
                afs_src.Close();
                afs_dst.Close();

                return cb(_ERR(NOERROR), "done", 100);
            }

            afs_dst.Write(off, buf, [&, cb](int err, u64 num, void*) {
                if (err) {
                    afs_src.Close();
                    afs_dst.Close();

                    return cb(err, "", 0);
                }

                off += num; // Advance the offset so the next read starts after the written chunk.
                cb(_ERR(NOERROR), "progress", (off * 100) / size);

                copy();
            });
        });
    };

    // Check source existence.
    afs_src.Stat(src, [&, cb](int err, const AsyncFile::file_stat_t& stat, void*) mutable {
        if (err) {
            return cb(err, "", 0);
        }

        size = stat.size;

        // Open the source and write to the destination.
        afs_src.Open(src, [&, cb](int err, void*) mutable {
            if (err) {
                return cb(err, "", 0);
            }

            // Remove the destination if it exists.
            if (bfs::exists(dst)) {
                if (!bfs::remove(dst)) {
                    afs_src.Close();
                    return cb(_ERR(VOL_FAILED_TO_UNLINK), "", 0);
                }
            }

            // Open the destination for writing.
            afs_dst.Open(dst, [&, cb](int err, void*) mutable {
                if (err) {
                    afs_src.Close();
                    return cb(err, "", 0);
                }

                cb(_ERR(NOERROR), "preparing", 0);

                copy();
            }); // dst open
        }); // src open
    }); // stat
} // fn Recover
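For contrast with the callback chain above, here is the same copy loop written synchronously: read the source in fixed-size chunks, write each chunk at the same offset, advance the offset, and report percentage progress. This is a sketch using plain `fstream` rather than the article's AsyncFile; the function name and the progress-callback shape are illustrative.

```cpp
#include <cstdint>
#include <fstream>
#include <functional>
#include <string>
#include <vector>

bool CopyWithProgress(const std::string& src, const std::string& dst,
                      std::size_t buf_size,
                      const std::function<void(int /*percent*/)>& on_progress) {
    // Open the source and learn its size (the async version uses Stat()).
    std::ifstream in(src, std::ios::binary | std::ios::ate);
    if (!in) return false;
    const std::uint64_t size = static_cast<std::uint64_t>(in.tellg());
    in.seekg(0);

    // Truncate/create the destination (the async version removes it first).
    std::ofstream out(dst, std::ios::binary | std::ios::trunc);
    if (!out) return false;

    std::vector<char> buf(buf_size);
    std::uint64_t off = 0;
    while (off < size) {
        in.read(buf.data(), static_cast<std::streamsize>(buf.size()));
        const std::streamsize n = in.gcount();
        if (n <= 0) return false;
        out.write(buf.data(), n);
        if (!out) return false;
        off += static_cast<std::uint64_t>(n);  // advance, as Recover() must
        on_progress(static_cast<int>((off * 100) / size));
    }
    return true;
}
```

The async version exists so that a multi-gigabyte recovery does not block the event loop; the control flow, however, is exactly this loop unrolled into nested callbacks.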


Runtime performance

In my environment, testing read/write on a single 2 TB volume file, the average read/write speed is 175-180 MB/s: very close to the disk's I/O limit.

Type              Size/Speed  Unit
File Size         2           TByte
Average Speed     175-180     MB/s
Time consumption  ~3.30       Hour
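These numbers are self-consistent: 2 TB is 2,097,152 MB, and at the midpoint speed of 177.5 MB/s that is about 11,815 s, i.e. roughly 3.28 hours, matching the measured "~3.30 Hour". A trivial helper (my own arithmetic, not from the article) makes the check explicit:

```cpp
// Expected transfer time in hours for a given size (MB) and speed (MB/s).
double ExpectedHours(double size_mb, double speed_mb_per_s) {
    return size_mb / speed_mb_per_s / 3600.0;
}
```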

Yet another performance test

Today, I ran a simple test to estimate how fast C++ can be, by loading the indexes of 8 million blocks (1 KB of data per block):

...

std::string vol_path = "/run/media/igame/HUGE_STORAGE/test.vol";
...
void test_read(Volume& vol) {
    cout << "Load volume:" << vol_path << endl;

    vol.Load(vol_path, true, true, [&](int err) {
        if (err) {
            cout << "load error:" << ErrorToString(err) << endl;
            return;
        }

        TRACE("Done loading");

        cout << "Done read" << endl;
        cout << "File size:" << vol.FileSize() << endl;
        cout << "Found index:" << vol.BlockNum() << endl;

        cout << "Read all blocks' data one by one" << endl;
        int err_num = 0;
        int prog_inv = 0xFFFF;
        TRACE("Export index");
        block_index_map_t idx_map = vol.ExportIndex();
        TRACE("Read all blocks' data");
        bst::progress_display* show_progress = new bst::progress_display(vol.BlockNum() / 0xFFFF);

        for (const auto& v : idx_map) {
            block_index_value_t idx = v.second;

            // Capture err_num and prog_inv by reference so the counters
            // actually accumulate across callbacks.
            vol.ReadBlockData(v.first, [&, idx](int err, shared_t<vec_t<char>>& data) mutable {
                if (err) {
                    cout << "Error:" << ErrorToString(err) << endl;
                } else if (data->size() != idx.size) {
                    cout << "Invalid data: expected " << idx.size << " but got " << data->size() << endl;
                    err_num++;
                }

                if (prog_inv-- == 0) {
                    ++(*show_progress);
                    prog_inv = 0xFFFF;
                }
            });
        }
    }); // load vol
}

int main() {
    Volume vol(false);

    test_read(vol);
    uv_run(uv_default_loop(), UV_RUN_DEFAULT);
    cout << "All done" << endl;
    return 0;
}


With libuv's default 4-thread pool (the size can be changed via the UV_THREADPOOL_SIZE environment variable), the results look like the following:

[igame@igame-dev2 haystack++]$ ./a.out
[2016-09-13 23:14:58.260158] [0x00007ff728c1f740] [trace]   [hhcloud::haystack::Volume::Volume()]:
Load volume:/run/media/igame/HUGE_STORAGE/test.vol
[2016-09-13 23:15:46.957700] [0x00007ff728c1f740] [trace]   [test_read()]:Done loading
Done read
File size:8925478920
Found index:8388608
Read all block's data each by each
[2016-09-13 23:15:46.957744] [0x00007ff728c1f740] [trace]   [test_read()]:Export index
[2016-09-13 23:15:49.022758] [0x00007ff728c1f740] [trace]   [test_read()]:Read all blocks' data

0%   10   20   30   40   50   60   70   80   90   100%
|----|----|----|----|----|----|----|----|----|----|
***************************************************
All done
[2016-09-13 23:16:13.015957] [0x00007ff728c1f740] [trace]   [hhcloud::haystack::Volume::~Volume()]:


Gathering the results of several runs, the performance works out to:

Name                                Value       Unit
File Size                           8.312500    GB
Blocks                              8388608     N/A
Average volume loading time         41.966909   second
Average block data reading time     24.125541   second
Speed of volume loading             202.826470  MB/s
Speed of block indexing             0.190626    M blocks/s
Speed of single block data loading  339.557152  MB/s
Speed of block data loading         0.33        M blocks/s
Indexing single block               0.005003    ms
Loading data of single block        0.002876    ms
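The headline row can be cross-checked from the other rows: the volume loading speed should equal the file size divided by the average loading time, and 8.3125 GB / 41.966909 s does come out at about 202.8 MB/s. A small helper (my own arithmetic, not from the article) to make the check explicit:

```cpp
// Loading speed in MB/s from a file size in GB and a duration in seconds.
double LoadingSpeedMBPerS(double size_gb, double seconds) {
    return size_gb * 1024.0 / seconds;
}
```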
NOTE

The speed of single block data loading looks odd; I think it is probably an effect of the memory cache and the disk cache.

The speed of volume loading is almost the full speed of the disk I/O.

This can be confirmed by running
dd if=/dev/sdd of=/dev/null bs=128M count=1024


[igame@igame-dev2 haystack++]$ sudo dd if=/dev/sdd of=/dev/null bs=128M count=1024
[sudo] password for igame:

1024+0 records in
1024+0 records out
137438953472 bytes (137 GB) copied, 678.914 s, 202 MB/s


Fix Log

Timestamp                     Description
Tue Sep 13 23:56:48 PDT 2016  1. Removed the Volume::ForEach() method.
                              2. Added ExportIndex().