node-haystack Episode 7: Asynchronously manipulate blocks
2016-09-10 14:31
399 查看
Asynchronous operations can drive the hardware to work at full speed, or almost.
Let’s take a look at the member variables of class Block:
Because any step of reading blocks can fail, there is a lot of error-probing and recovering-from-corruption code.
Class Block
Exactly: manipulating the blocks happens in class Block.
Let’s take a look at the member variables of class Block:
/*! \brief Owns all block-level I/O for a volume: async load/parse, data read,
    batched append, remove and verify.
    NOTE(review): this is a blog excerpt — the literal "...." below marks
    members elided by the author.
*/
class Block {
public:
    // Callback type declarations. NOTE(review): DECL_CB presumably expands to
    // a `cb_<name>_t` typedef (see the later use of cb_append_t/cb_read_info_t);
    // confirm against the DECL_CB macro definition.
    DECL_CB(read_info, int /*!< Error code. */, u64& /*!< Position of block. */, const block_info_t& /*!< Block index. */)
    DECL_CB(read_data, int /*!< Error code. */, shared_t<vec_t<char>>& /*!< Block data. */)
    DECL_CB(append, int /*!< Error code. */, const u128& /*!< Key of block. */, const u32& /*!< Cookie of block. */, const u16& /*!< Tag data of block. */, const u16& /*!< Flag of block. */, const u32& /*!< Size of block data. */, const u64& /*!< Position of block in file. */)
    DECL_CB(load_complete, int /*!< Error code. */)
    DECL_CB(remove, int /*!< Error code. */)
    DECL_CB(verify, int /*!< Error code. */, const std::string& /*!< Status message. */, u32 /*!< Percentage of progress. */)

    /*! \brief One pending block in the batched write queue. */
    typedef struct {
        u128 key;                       //!< Key of the block.
        u32 cookie;                     //!< Cookie of the block.
        u16 tag;                        //!< Tag data of the block.
        u16 flag;                       //!< Flag of the block.
        u32 size;                       //!< Size of the block data.
        u64 pos;                        //!< Position of the block in the file (set when serialized).
        shared_t<vec_t<char>> data;     //!< Payload to write.
        cb_append_t cb_append;          //!< Per-item completion callback.
    } write_queue_value_t;
    using wq_val_t = write_queue_value_t;
    using write_queue_t = std::list<wq_val_t>;

private:
    /*! \brief One block handed to the parse worker during async load. */
    typedef struct {
        Block* sender;                  //!< Owning Block (parse runs off-thread).
        vec_t<char> data;               //!< Raw copy of the on-disk block bytes.
        block_info_t info;              //!< Parsed block header (filled by the worker).
        u64 pos;                        //!< File position of the block.
        bool validated;                 //!< True when the parity check passed.
        cb_read_info_t cb_read;         //!< Callback fired once parsed.
    } read_queue_value_t;
    using rq_val_t = read_queue_value_t;
    using parse_worker_t = AsyncWorker<rq_val_t>;

    .... // (members elided in the original article)

private:
    shared_t<vec_t<char>> m_array_of_removal_flag;
    // Throttling state for the async load: the loader blocks on m_cv_parse
    // when m_posted_parse_work_num reaches the queue limit.
    bst::mutex m_parse_mtx;
    bst::condition_variable m_cv_parse;
    bst::atomic<u64> m_posted_parse_work_num;
    parse_worker_t m_parser;
    parse_worker_t::cb_work_t m_cb_parse;
    parse_worker_t::cb_after_work_t m_cb_after_parse;
}; // class block
Async load
To load the volume as fast as possible, the loading operation has to be separated into two parts: one part reads the block wholly; the other works in a work queue, parsing the block and firing the callback. Because any step of reading blocks can fail, there is a lot of error-probing and recovering-from-corruption code.
/*! \brief Sequentially scan the volume stream, slicing it into blocks and
    posting each one to the parse worker; parsing and the read callback then
    happen off-thread.
    \param fs      Stream positioned at the first block to scan.
    \param cb_read Fired once per parsed block (from the parse worker).
    \param cb_done Fired exactly once: 0 on success, error code on failure.
*/
void LoadBlockInfo(bfs::fstream& fs, const cb_read_info_t& cb_read, const cb_load_complete_t& cb_done) {
    vec_t<char> buffer;             // Read-ahead buffer over the file.
    u64 blk_pos, buf_cur, buf_beg;  // Block start / cursor / buffer base — all file offsets.
    char* ptr;                      // Current block header inside `buffer`.
    char* pfooter;                  // Expected footer position of the current block.
    size_t data_size;               // Payload size read from the header.
    size_t blk_size = BLOCK_SIZE_MIN; // On-disk size of the current block (incl. padding).
    size_t padding;
    bool data_drained;
    u32 err_count = 0;              // Header-corruption recoveries performed so far.
    buf_cur = buf_beg = fs.tellg();
    do {
        // Throttle: once the worker queue is full, block until it is fully
        // drained (OnParse notifies when the posted count drops to 0).
        if (m_posted_parse_work_num >= PARSE_QUEUE_LEN) {
            bst::mutex::scoped_lock lock(m_parse_mtx);
            // LOG("Waiting...");
            m_cv_parse.wait(lock, [&]() mutable {
                // LOG("Check posted works: %u", (u64)m_posted_parse_work_num);
                return m_posted_parse_work_num == 0;
            });
        }
        if (!buffer.size() || buffer.size() - (buf_cur - buf_beg) < blk_size) {
            // Refill buffer: reposition the stream at the cursor and read a
            // fresh window of READ_BUFFER_SIZE bytes.
            fs.seekg(buf_cur - fs.tellg(), fs.cur);
            if (buffer.size() < READ_BUFFER_SIZE)
                buffer.resize(READ_BUFFER_SIZE);
            fs.read(&buffer[0], READ_BUFFER_SIZE);
            if (fs.eof() && fs.gcount() == 0) {
                // Clean end of file: loading finished successfully.
                cb_done(0);
                break;
            }
            if (fs.gcount() < BLOCK_SIZE_MIN) {
                // Less than a minimal block left — truncated volume.
                LOG("Failed to read block: expect %u bytes but get %u at position 0x%.8x", BLOCK_SIZE_MIN, fs.gcount(), buf_cur);
                cb_done(_ERR(VOL_READ_BLOCK));
                break;
            }
            // Resize buffer to fit
            if (fs.gcount() != READ_BUFFER_SIZE) {
                buffer.resize(fs.gcount());
            }
            buf_beg = buf_cur;
        } // if need to refill buffer
        blk_pos = buf_cur;
        assert(buf_cur >= buf_beg);
        ptr = &buffer[buf_cur - buf_beg];
        if (*(u32 *)ptr != MAGIC_NUM_HEADER) {
            LOG("Invalid header magic(%u): 0x%.8x/%.8x at position 0x%.8x", err_count, *(u32 *)ptr, MAGIC_NUM_HEADER, buf_cur);
            // Attempt to recover from failure.
            char* pend = &buffer[0] + buffer.size();
            if (err_count < MAX_ERROR && ptr < pend) {
                err_count++;
                // Find next MAGIC_NUM_HEADER
                // All data must align at 4 bytes.
                ptr += 4;
                bool found = false; // NOTE(review): never written or read — vestigial.
                while(ptr < pend) {
                    if (*(u32 *)ptr == MAGIC_NUM_HEADER) {
                        break;
                    }
                    ptr += 4;
                }
                // Force refill: no magic left in this window, skip past it.
                if (ptr >= pend) {
                    buf_cur = buf_beg + buffer.size();
                    continue;
                }
                // Can not recover from error.
                // NOTE(review): unreachable in practice — the loop above only
                // exits early when the magic was found; kept as a guard.
                if (*(u32 *)ptr != MAGIC_NUM_HEADER) {
                    cb_done(_ERR(VOL_INVALID_BLOCK_HEADER));
                    break;
                }
                // NOTE(review): this assert fires if the magic sits in the
                // last 4 bytes of the buffer (ptr == pend - 4) — confirm intended.
                assert(ptr < pend - 4);
                assert(*(u32 *)ptr == MAGIC_NUM_HEADER);
                // Advance the file-offset cursor to the recovered header.
                buf_cur += ptr - &buffer[buf_cur - buf_beg];
                LOG("Recovered from error at position: 0x%.8x", buf_cur);
                blk_pos = buf_cur;
                assert(buf_cur >= buf_beg);
                assert(ptr == &buffer[buf_cur - buf_beg]);
            } else {
                // Too many errors.
                cb_done(_ERR(VOL_INVALID_BLOCK_HEADER));
                break;
            }
        } // if wrong header magic number found
        // Full block size = payload + fixed envelope, rounded up to 8 bytes.
        data_size = *(u32 *)(ptr + FIELD_OFF_SIZE);
        blk_size = data_size + BLOCK_SIZE_MIN;
        padding = blk_size & 7 ? 8 - (blk_size & 7) : 0;
        blk_size += padding;
        if (buffer.size() - (buf_cur - buf_beg) < blk_size) {
            // Block extends past the window — refill buffer.
            continue ;
        }
        pfooter = ptr + FIELD_OFF_DATA + data_size;
        if (*(u32 *)pfooter != MAGIC_NUM_FOOTER) {
            LOG("Invalid footer magic: 0x%.8x/0x%.8x at position 0x%.8x", *(u32 *)pfooter, MAGIC_NUM_FOOTER, buf_cur + FIELD_OFF_DATA + data_size);
            cb_done(_ERR(VOL_INVALID_BLOCK_FOOTER));
            break;
        }
        // Continue Next block
        buf_cur += blk_size;
        // NOTE(review): the subtraction is unsigned, so "<= 0" is effectively
        // "== 0" — drained only when the window is exactly consumed at EOF.
        data_drained = fs.eof() && buffer.size() - (buf_cur - buf_beg) <= 0;
        auto qval = createReadQueueValue(blk_pos, blk_size, cb_read, ptr);
        m_posted_parse_work_num++;
        // LOG("%u works have been posted", (u64)m_posted_parse_work_num);
        m_parser.post(qval, m_cb_parse, m_cb_after_parse);
        // Has more data
        if (data_drained) {
            LOG("No more data");
            cb_done(0);
            break;
        }
    } while(true);
}
/*! \brief Create a read-queue item: copies the raw block bytes out of the
    scan buffer so the parse worker owns its own data.
    \param pos     File position of the block.
    \param size    On-disk size of the block in bytes.
    \param cb_read Callback to fire once the block is parsed.
    \param pdata   Pointer to the block's first byte inside the scan buffer.
    \return rq_val_t* heap-allocated item; ownership passes to the worker
            (freed in OnParse).
*/
inline rq_val_t* createReadQueueValue(u64 pos, u32 size, const cb_read_info_t& cb_read, char* pdata) {
    rq_val_t* val = new rq_val_t();
    val->sender = this;
    val->data.resize(size);
    val->pos = pos;
    val->validated = false;
    val->cb_read = cb_read;
    std::copy(pdata, pdata + size, &val->data[0]);
    return val;
}
/*! \brief Parse a read-queue item (runs on the parse worker): extracts the
    block info, verifies the parity, and fires cb_read with 0 on a good
    checksum or -1 on mismatch. Blocks flagged FLAG_REMOVE are skipped and
    their callback is never fired.
*/
inline void parseReadQueueValue(rq_val_t* qval) {
    char* ptr = &qval->data[0] + sizeof(MAGIC_NUM_HEADER);
    char* pcnt = &qval->data[0] + FIELD_OFF_DATA;
    qval->info = *(block_info_t *)ptr;
    if (qval->info.flag != FLAG_REMOVE) {
        u32 res = qval->sender->computeParity(pcnt, qval->info.size);
        qval->validated = qval->info.checkSum == res;
        qval->cb_read(qval->validated ? 0 : -1, qval->pos, qval->info);
    }
}
.... // (members elided in the original article)
private:
/*! \brief Worker-thread handler: parses and frees the queue item, then wakes
    the loader once the posted-work counter drains to zero.
*/
inline void OnParse(rq_val_t* qval) {
    parseReadQueueValue(qval);
    delete qval;
    if (--m_posted_parse_work_num == 0) {
        m_cv_parse.notify_one();
    }
}
/*! \brief After-work hook (intentionally empty). */
inline void OnAfterParse(rq_val_t* qval, bool canceled) {
    // FIXME: the after_work won't be fired if there are too many works in queue
}
Async read block data
Asynchronously reading data is the simplest part:
/*! \brief Read the data content of one block.
    \param afs  Async file wrapper that performs the read.
    \param pos  File position of the block header.
    \param size Number of payload bytes to read.
    \param cb   Completion callback forwarded to AsyncFile::Read.
    \return None.
*/
void ReadData(AsyncFile& afs, u64& pos, size_t size, const AsyncFile::cb_read_t& cb) {
    // The payload starts FIELD_OFF_DATA bytes past the block header.
    const u64 data_pos = pos + FIELD_OFF_DATA;
    afs.Read(data_pos, size, cb);
}
Async write block
Blocks will not be covered. Any new blocks will be append to the end of volume. Here we use batching writing mode./*! \brief Prepare writing buffer for batch appending blocks. */ void PrepareWriteBuffer(u64 offset, wq_val_t& qval, shared_t<vec_t<char>>& buffer, u32* blockSize) { u32 prev_buf_size = buffer->size(); *blockSize = BLOCK_SIZE_MIN + qval.data->size(); u32 padding = *blockSize & 7 ? 8 - (*blockSize & 7) : 0; *blockSize += padding; qval.pos = offset; buffer->resize(prev_buf_size + *blockSize); char* ptr = &(*buffer)[prev_buf_size]; // Write header magic num *(u32 *)ptr = MAGIC_NUM_HEADER; ptr += sizeof(MAGIC_NUM_HEADER); // Add block info *(u128 *)ptr = qval.key; ptr += sizeof(u128); *(u32 *)ptr = qval.cookie; ptr += sizeof(u32); *(u16 *)ptr = qval.tag; ptr += sizeof(u16); *(u16 *)ptr = qval.flag; ptr += sizeof(u16); *(u32 *)ptr = (u32)qval.data->size(); ptr += sizeof(u32); *(u32 *)ptr = computeParity(*qval.data); ptr += sizeof(u32); std::copy(qval.data->begin(), qval.data->end(), ptr); ptr += qval.data->size(); // Add footer magic *(u32 *)ptr = MAGIC_NUM_FOOTER; ptr += sizeof(MAGIC_NUM_FOOTER); // Add padding if (padding > 0) std::memcpy(ptr, PADDING_VALUE, padding); } /*! \brief Append new block to the Volume file. \return None. */ void Append(AsyncFile& afs, u64 offset, write_queue_t* ptr_queue, const AsyncFile::cb_write_t& cb_write) { shared_t<vec_t<char>> buffer = mk_shared<vec_t<char>>(); u32 size = 0; /** \note size is a u32 type, which means the maximum size of data block must less than 4GB. 
*/ u64 pos = offset; for(auto& it : *ptr_queue) { PrepareWriteBuffer(pos, it, buffer, &size); pos += size; } afs.Write(offset, buffer, [&, cb_write](int err, u64 num, void* userdata) mutable { write_queue_t* ptr = reinterpret_cast<write_queue_t *>(userdata); for(auto& it : *ptr) { it.cb_append(err, it.key, it.cookie, it.tag, it.flag, it.size, it.pos); } if(!err && num != buffer->size()) { err = _ERR(VOL_FAILED_TO_WRITE); } cb_write(err, num, ptr); }, ptr_queue); }
相关文章推荐
- node-haystack Episode 6: Data Structure And Constants
- node-haystack Episode 5: Volume
- Node.js 学习笔记 (一)
- [LeetCode]382. Linked List Random Node
- nodejs学习笔记(二)——javascript的同步异步行为和多线程
- 【LeetCode-19】Remove Nth Node From End of List(C++)
- 关于如何上手NodeMCU(ESP8266开发板)
- node和angular交互的两种方式
- 安装nodejs+ionic+cordova环境心得
- [LeetCode]25. Reverse Nodes in k-Group
- CDH集群中maptask的日志文件的位置整理
- NodeJS无所不能:细数10个令人惊讶的NodeJS开源项目
- LeetCode 19. Remove Nth Node From End of List
- 为什么 Node.js 这么火,而同样异步模式 Python 框架 Twisted 却十几年一直不温不火?
- pomelo 框架一个session bug的修复
- 关于LeetCode中Remove Nth Node From End of List一题的理解
- node-haystack Episode 3: Callback model in C++
- 完整的node项目
- Reverse Nodes in k-Group
- Node+Socketio实现消息群发功能