node-haystack Episode 9: Manipulate Volume
2016-09-10 14:59
class Volume is the manipulation class for volumes. Only some crucial parts of class Volume are shown here.
A brief of Volume
```cpp
/*! \brief Class for manipulating Volume. */
class Volume {
public:
    /** Callback for closing event. */
    DECL_CB(close, int /*!< Error code */)
    /** Callback for appending data. */
    DECL_CB(append, int /*!< Error code */)
    /** Callback for recovering. */
    DECL_CB(recover, int /*!< Error code */,
            const std::string& /*!< Status message */,
            const u32 /*!< Percentage of progress */)

    const u64 BLOCK_NUM_MAX = 4 * Mega;         //!< Max blocks a Volume can hold.
    const u64 FILE_SIZE_MAX = 2 * Tera;         //!< Max size of the Volume file.
    const u32 WRITE_QUEUE_SIZE = 512;           //!< The length of the writing queue.
    const u64 DEF_WRITE_INTERVAL = 2 * Second;  //!< Default interval for writing new blocks: 2 seconds.
    const u64 RECOVER_BUFFER_SIZE = 128 * Mega; //!< Buffer size for recovering.

    // ....

private:
    bool m_has_limit;
    Random m_rand;                          //!< The random number/uuid generator.
    AsyncFile m_afs;                        //!< The async file object.
    Block m_block;                          //!< The block object.
    bst::atomic<u64> m_volume_size;         //!< The size of the Volume file; grows as new blocks are appended.
    bst::atomic<u64> m_writed_block_num;    //!< The number of blocks that have been written.
    AsyncTimer m_write_timer;               //!< The writing timer.
    block_index_map_t m_index;              //!< The block index map.
    bst::shared_mutex m_index_mtx;          //!< The mutex for the block index map.
    Block::write_queue_t* m_write_queue;    //!< The queue for block writing.
    bst::shared_mutex m_write_queue_mtx;    //!< The mutex for block writing.
    AsyncWorker<Block::write_queue_t> m_write_worker; //!< The async worker for block writing.
}; // class Volume
```
Batch writing
Batch writing has four steps. First, put the data into the queue. Second, start the timer if it has not been started. Third, check whether there are enough items in the queue; if so, start the writing process. Fourth, if the timer event fires and the queue is not empty, start the writing anyway.

Push a writing request into the queue
```cpp
/*! \brief Append a block to the end of the Volume, and assign a new key and cookie to it.
    \note When appending a new block, if there are already enough blocks
    (as many as WRITE_QUEUE_SIZE) in the writing queue, the writing worker
    starts the operation immediately. Otherwise, the new blocks are stored
    in the queue and wait for the next scheduled time. When the time comes,
    all blocks in the queue are written, no matter how many there are. */
void AppendBlock(const u128& key, const u32& cookie, const u16 tag, const u16 flag,
                 shared_t<vec_t<char>>& data, const cb_append_t& cb) {
    if (m_has_limit && BlockNum() >= BLOCK_NUM_MAX) {
        return cb(_ERR(VOL_EXCEED_BLOCKS));
    }
    if (m_has_limit && FileSize() + data->size() + Block::BLOCK_SIZE_MIN >= FILE_SIZE_MAX) {
        return cb(_ERR(VOL_EXCEED_SIZE));
    }

    auto on_append = [&, cb](int err, const u128& key, const u128& cookie,
                             const u16& tag, const u16& flag,
                             const u32& size, const u64& pos) mutable {
        if (!err) {
            this->AddIndex(key, cookie, tag, flag, size, pos);
        }
        cb(err);
    };

    // Build the queue value.
    Block::write_queue_value_t qval{
        key, cookie, tag, flag, (u32)data->size(), (u64)0, data, on_append
    };

    StartWriteTimer();

    u64 queue_size = 0;
    {
        slock_t sl(m_write_queue_mtx);
        {
            ulock_t ul(sl);
            m_write_queue->push_back(qval);
        }
        queue_size = m_write_queue->size();
    }
    if (queue_size >= WRITE_QUEUE_SIZE)
        ScheduleWrite();
}
```
Start timer
```cpp
/*! \brief Start the timer for write scheduling. */
inline void StartWriteTimer() {
    if (!m_write_timer.IsStarted()) {
        m_write_timer.SetInterval(DEF_WRITE_INTERVAL);
        m_write_timer.Start(std::bind(&Volume::OnWriteTimer, this));
    }
}
```
Start writing
```cpp
/*! \brief Schedule the writing work. */
inline void ScheduleWrite() {
    Block::write_queue_t* ptr_1 = nullptr;
    {
        slock_t sl(m_write_queue_mtx);
        if (!m_write_queue->size()) {
            LOG("Nop");
            return;
        }
        {
            ulock_t ul(sl);
            // Swap the full queue out so producers can keep appending.
            ptr_1 = m_write_queue;
            m_write_queue = new Block::write_queue_t;
        }
    }
    // Post the writing operation.
    m_write_worker.post(ptr_1, [&](Block::write_queue_t* ptr_2) mutable {
        // for (auto& it : *ptr_2) {
        //     it.key = m_rand.uuid_to_u128(m_rand.NextUuid());
        //     it.cookie = m_rand.Next();
        // }
        std::size_t qsize = ptr_2->size();
        // Batch writing.
        // FIXME: Update the volume size before posting the real writing work,
        // using the calculated size of the data to write.
        m_block.Append(m_afs, m_volume_size, ptr_2,
                       [&](int err, u64 num, void* user_data) mutable {
            Block::write_queue_t* ptr_3 = reinterpret_cast<Block::write_queue_t*>(user_data);
            if (err) {
                LOG("%s", ErrorToString(err));
            } else {
                m_volume_size += num;
                m_writed_block_num += ptr_3->size();
                LOG("Writed blocks: %u Volume size: %u MB",
                    (u64)m_writed_block_num, (u64)m_volume_size / (1024 * 1024));
            }
            // NOTE: Release the queue.
            delete ptr_3;
        });
    }, [&](Block::write_queue_t*, bool) {
        // FIXME: after_work is too difficult to fire.
        // LOG("After work");
    });
}
```
Periodic writing
```cpp
/*! \brief Handle the writing timer event. */
void OnWriteTimer() {
    ScheduleWrite();
}
```
Recovery
As the previous episode said, class Volume does the recovery. Sorry again: the callback hell makes the code hard to read.
```cpp
/*! \brief Recover a volume from another specified volume.
    This method works asynchronously and provides progress monitoring.
    \param const std::string& src The source volume.
    \param const std::string& dst The destination volume.
    \param const cb_recover_t& cb Recovering callback. */
void Recover(const std::string& src, const std::string& dst, const cb_recover_t& cb) {
    LOG("Recover to: %s", dst.c_str());
    AsyncFile afs_src;
    AsyncFile afs_dst;
    u64 off = 0;
    u64 size = 0;

    bst::function<void()> copy;
    copy = [&, cb]() {
        afs_src.Read(off, RECOVER_BUFFER_SIZE,
                     [&, cb](int err, shared_t<vec_t<char>>& buf, void*) {
            if (err) {
                afs_src.Close();
                afs_dst.Close();
                return cb(err, "", 0);
            }
            if (!buf->size()) {
                afs_src.Close();
                afs_dst.Close();
                return cb(_ERR(NOERROR), "done", 100);
            }
            afs_dst.Write(off, buf, [&, cb](int err, u64 num, void*) {
                if (err) {
                    afs_src.Close();
                    afs_dst.Close();
                    return cb(err, "", 0);
                }
                off += num;  // advance the offset before reporting progress
                cb(_ERR(NOERROR), "progress", (off * 100) / size);
                copy();
            });
        });
    };

    // Check source existence.
    afs_src.Stat(src, [&, cb](int err, const AsyncFile::file_stat_t& stat, void*) mutable {
        if (err) {
            return cb(err, "", 0);
        }
        size = stat.size;
        // Open the source and write to the destination.
        afs_src.Open(src, [&, cb](int err, void*) mutable {
            if (err) {
                return cb(err, "", 0);
            }
            // Remove the destination if it exists.
            if (bfs::exists(dst)) {
                if (!bfs::remove(dst)) {
                    afs_src.Close();
                    return cb(_ERR(VOL_FAILED_TO_UNLINK), "", 0);
                }
            }
            // Open the destination for writing.
            afs_dst.Open(dst, [&, cb](int err, void*) mutable {
                if (err) {
                    afs_src.Close();
                    return cb(err, "", 0);
                }
                cb(_ERR(NOERROR), "preparing", 0);
                copy();
            }); // dst open
        }); // src open
    }); // stat
} // fn Recover
```
Runtime performance
Under my environment, with a read/write test on a single 2 TB volume file, the average read/write speed is 175-180 MB/s: very close to the I/O limit.

Type | Size/Speed | Unit |
---|---|---|
File Size | 2 | TB |
Average Speed | 175-180 | MB/s |
Time consumption | ~3.30 | Hour |
Yet another performance test
Today, I did a simple test to estimate how fast C++ could be, by loading the indexes of 8 million blocks (1 KB of data for each block):

```cpp
// ...
std::string vol_path = "/run/media/igame/HUGE_STORAGE/test.vol";
// ...

void test_read(Volume& vol) {
    cout << "Load volume:" << vol_path << endl;
    vol.Load(vol_path, true, true, [&](int err) {
        if (err) {
            cout << "load error:" << ErrorToString(err) << endl;
            return;
        }
        TRACE("Done loading");
        cout << "Done read" << endl;
        cout << "File size:" << vol.FileSize() << endl;
        cout << "Found index:" << vol.BlockNum() << endl;
        cout << "Read all block's data each by each" << endl;

        int err_num = 0;
        int prog_inv = 0xFFFF;

        TRACE("Export index");
        block_index_map_t idx_map = vol.ExportIndex();

        TRACE("Read all blocks' data");
        bst::progress_display* show_progress =
            new bst::progress_display(vol.BlockNum() / 0xFFFF);
        for (const auto& v : idx_map) {
            block_index_value_t idx = v.second;
            vol.ReadBlockData(v.first,
                              [&, idx, err_num, show_progress](int err, shared_t<vec_t<char>>& data) mutable {
                if (err) {
                    cout << "Error:" << ErrorToString(err) << endl;
                } else if (data->size() != idx.size) {
                    cout << "Invalid data: expect " << idx.size
                         << " but get " << data->size() << endl;
                    err_num++;
                }
                if (prog_inv-- == 0) {
                    ++(*show_progress);
                    prog_inv = 0xFFFF;
                }
            });
        }
    }); // load vol
}

int main() {
    Volume vol(false);
    test_read(vol);
    uv_run(uv_default_loop(), UV_RUN_DEFAULT);
    cout << "All done" << endl;
    return 0;
}
```
And with the default 4-thread-pool libuv settings, I got results like the following:
```
[igame@igame-dev2 haystack++]$ ./a.out
[2016-09-13 23:14:58.260158] [0x00007ff728c1f740] [trace] [hhcloud::haystack::Volume::Volume()]:
Load volume:/run/media/igame/HUGE_STORAGE/test.vol
[2016-09-13 23:15:46.957700] [0x00007ff728c1f740] [trace] [test_read()]:Done loading
Done read
File size:8925478920
Found index:8388608
Read all block's data each by each
[2016-09-13 23:15:46.957744] [0x00007ff728c1f740] [trace] [test_read()]:Export index
[2016-09-13 23:15:49.022758] [0x00007ff728c1f740] [trace] [test_read()]:Read all blocks' data

0%   10   20   30   40   50   60   70   80   90   100%
|----|----|----|----|----|----|----|----|----|----|
***************************************************
All done
[2016-09-13 23:16:13.015957] [0x00007ff728c1f740] [trace] [hhcloud::haystack::Volume::~Volume()]:
```
Gathering the results of several runs, I concluded that the performance is:
Name | Value | Unit |
---|---|---|
File Size | 8.312500 | GB |
Blocks | 8388608 | N/A |
Average volume loading time | 41.966909 | second |
Average block data reading time | 24.125541 | second |
Speed of volume loading | 202.826470 | MB/s |
Speed of block indexing | 0.190626 | M blocks/s |
Speed of single block data loading | 339.557152 | MB/s |
Speed of block data loading | 0.33 | M blocks/s |
Indexing single block | 0.005003 | ms |
Loading data of single block | 0.002876 | ms |
The speed of single block data loading looks odd; I think it is probably caused by the memory cache and the disk cache.
The speed of volume loading is almost the full speed of disk I/O.
This can be verified by running:

```
dd if=/dev/sdd of=/dev/null bs=128M count=1024
```

```
[igame@igame-dev2 haystack++]$ sudo dd if=/dev/sdd of=/dev/null bs=128M count=1024
[sudo] password for igame:
1024+0 records in
1024+0 records out
137438953472 bytes (137 GB) copied, 678.914 s, 202 MB/s
```
Fix Log
Timestamp | Description |
---|---|
Tue Sep 13 23:56:48 PDT 2016 | 1. Removed the Volume::ForEach() method. 2. Added ExportIndex(). |