node-haystack Episode 7: Asynchronously manipulate blocks

2016-09-10 14:31
Asynchronous operations can drive the hardware at full speed, or nearly so.

Class Block

To be exact, all block manipulation happens in class Block.

Let's take a look at the member variables of class Block:

class Block {
public:
    DECL_CB(read_info, int /*!< Error code. */, u64& /*!< Position of block. */, const block_info_t& /*!< Block index. */)
    DECL_CB(read_data, int /*!< Error code. */, shared_t<vec_t<char>>& /*!< Block data. */)
    DECL_CB(append, int /*!< Error code. */, const u128& /*!< Key of block. */, const u32& /*!< Cookie of block. */, const u16& /*!< Tag data of block. */, const u16& /*!< Flag of block. */, const u32& /*!< Size of block data. */, const u64& /*!< Position of block in file. */)
    DECL_CB(load_complete, int /*!< Error code. */)
    DECL_CB(remove, int /*!< Error code. */)
    DECL_CB(verify, int /*!< Error code. */, const std::string& /*!< Status message. */, u32 /*!< Percentage of progress. */)

    typedef struct {
        u128    key;
        u32     cookie;
        u16     tag;
        u16     flag;
        u32     size;
        u64     pos;
        shared_t<vec_t<char>>   data;
        cb_append_t     cb_append;
    } write_queue_value_t;

    using wq_val_t = write_queue_value_t;
    using write_queue_t = std::list<wq_val_t>;

private:
    typedef struct {
        Block* sender;
        vec_t<char> data;
        block_info_t info;
        u64 pos;
        bool validated;
        cb_read_info_t cb_read;
    } read_queue_value_t;

    using rq_val_t = read_queue_value_t;
    using parse_worker_t = AsyncWorker<rq_val_t>;

    ....

private:
    shared_t<vec_t<char>> m_array_of_removal_flag;

    bst::mutex m_parse_mtx;
    bst::condition_variable m_cv_parse;

    bst::atomic<u64> m_posted_parse_work_num;

    parse_worker_t m_parser;
    parse_worker_t::cb_work_t m_cb_parse;
    parse_worker_t::cb_after_work_t m_cb_after_parse;
}; // class Block
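
The DECL_CB macro declares a callback type for each event (the Doxygen comments document the parameters). Its definition is not shown in this episode; if it simply wraps std::function, a plausible expansion would look like the sketch below. This is an assumption, not the project's actual macro:

#include <functional>

// Hypothetical expansion of DECL_CB -- shown only to make the callback
// types readable. With this definition,
//     DECL_CB(read_info, int, u64&, const block_info_t&)
// would yield:
//     using cb_read_info_t = std::function<void(int, u64&, const block_info_t&)>;
#define DECL_CB(name, ...) \
    using cb_##name##_t = std::function<void(__VA_ARGS__)>;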


Async load

To load the volume as fast as possible, the loading operation is split into two parts: one part reads raw blocks from the file, while the other runs on a work queue, parsing each block and firing the callback.

Because every step of reading a block can fail, the code contains a fair amount of error-probing and corruption-recovery logic.

void LoadBlockInfo(bfs::fstream& fs, const cb_read_info_t& cb_read, const cb_load_complete_t& cb_done) {
    vec_t<char> buffer;
    u64 blk_pos, buf_cur, buf_beg;
    char* ptr;
    char* pfooter;
    size_t data_size;
    size_t blk_size = BLOCK_SIZE_MIN;
    size_t padding;
    bool data_drained;
    u32 err_count = 0;

    buf_cur = buf_beg = fs.tellg();

    do {
        // Back-pressure: stall until the parse queue drains completely.
        if (m_posted_parse_work_num >= PARSE_QUEUE_LEN) {
            bst::mutex::scoped_lock lock(m_parse_mtx);
            // LOG("Waiting...");
            m_cv_parse.wait(lock, [&]() mutable {
                // LOG("Check posted works: %u", (u64)m_posted_parse_work_num);
                return m_posted_parse_work_num == 0;
            });
        }

        if (!buffer.size() || buffer.size() - (buf_cur - buf_beg) < blk_size) {
            // Refill the buffer.
            fs.seekg(buf_cur - fs.tellg(), fs.cur);

            if (buffer.size() < READ_BUFFER_SIZE) buffer.resize(READ_BUFFER_SIZE);

            fs.read(&buffer[0], READ_BUFFER_SIZE);

            if (fs.eof() && fs.gcount() == 0) {
                cb_done(0);
                break;
            }

            if (fs.gcount() < BLOCK_SIZE_MIN) {
                LOG("Failed to read block: expect %u bytes but get %u at position 0x%.8x", BLOCK_SIZE_MIN, fs.gcount(), buf_cur);
                cb_done(_ERR(VOL_READ_BLOCK));
                break;
            }

            // Shrink the buffer to fit a short read.
            if (fs.gcount() != READ_BUFFER_SIZE) {
                buffer.resize(fs.gcount());
            }

            buf_beg = buf_cur;
        } // if need to refill buffer

        blk_pos = buf_cur;
        assert(buf_cur >= buf_beg);
        ptr = &buffer[buf_cur - buf_beg];

        if (*(u32 *)ptr != MAGIC_NUM_HEADER) {
            LOG("Invalid header magic(%u): 0x%.8x/0x%.8x at position 0x%.8x", err_count, *(u32 *)ptr, MAGIC_NUM_HEADER, buf_cur);
            // Attempt to recover from the corruption.
            char* pend = &buffer[0] + buffer.size();

            if (err_count < MAX_ERROR && ptr < pend) {
                err_count++;

                // Scan forward for the next MAGIC_NUM_HEADER.
                // All data is aligned at 4 bytes.
                ptr += 4;

                while (ptr < pend) {
                    if (*(u32 *)ptr == MAGIC_NUM_HEADER) {
                        break;
                    }

                    ptr += 4;
                }

                // No magic found in this buffer: force a refill.
                if (ptr >= pend) {
                    buf_cur = buf_beg + buffer.size();
                    continue;
                }

                assert(*(u32 *)ptr == MAGIC_NUM_HEADER);

                buf_cur += ptr - &buffer[buf_cur - buf_beg];

                LOG("Recovered from error at position: 0x%.8x", buf_cur);

                blk_pos = buf_cur;
                assert(buf_cur >= buf_beg);
                assert(ptr == &buffer[buf_cur - buf_beg]);
            } else {
                // Too many errors: give up.
                cb_done(_ERR(VOL_INVALID_BLOCK_HEADER));
                break;
            }
        } // if wrong header magic number found

        data_size = *(u32 *)(ptr + FIELD_OFF_SIZE);
        blk_size = data_size + BLOCK_SIZE_MIN;
        padding = blk_size & 7 ? 8 - (blk_size & 7) : 0;
        blk_size += padding;

        if (buffer.size() - (buf_cur - buf_beg) < blk_size) {
            // The whole block is not in memory yet: refill.
            continue;
        }

        pfooter = ptr + FIELD_OFF_DATA + data_size;

        if (*(u32 *)pfooter != MAGIC_NUM_FOOTER) {
            LOG("Invalid footer magic: 0x%.8x/0x%.8x at position 0x%.8x", *(u32 *)pfooter, MAGIC_NUM_FOOTER, buf_cur + FIELD_OFF_DATA + data_size);
            cb_done(_ERR(VOL_INVALID_BLOCK_FOOTER));
            break;
        }

        // Advance to the next block.
        buf_cur += blk_size;
        data_drained = fs.eof() && buf_cur - buf_beg >= buffer.size();

        auto qval = createReadQueueValue(blk_pos, blk_size, cb_read, ptr);

        m_posted_parse_work_num++;
        // LOG("%u works have been posted", (u64)m_posted_parse_work_num);
        m_parser.post(qval, m_cb_parse, m_cb_after_parse);

        // No more data to read: we are done.
        if (data_drained) {
            LOG("No more data");
            cb_done(0);
            break;
        }
    } while (true);
}
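
The m_posted_parse_work_num / m_cv_parse pair at the top of the loop implements simple back-pressure: the reader stalls once PARSE_QUEUE_LEN parse jobs are in flight and resumes only after the worker has drained them all. A minimal standalone sketch of the same pattern, using standard-library stand-ins for the project's bst:: types:

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Minimal back-pressure sketch: the producer stalls once kQueueLen
// jobs are in flight and resumes when the consumer drains them all.
std::mutex mtx;
std::condition_variable cv;
std::atomic<uint64_t> in_flight{0};
constexpr uint64_t kQueueLen = 64;   // plays the role of PARSE_QUEUE_LEN

void producer_post() {
    if (in_flight >= kQueueLen) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [] { return in_flight == 0; });
    }
    in_flight++;
    // ... post the job to the worker thread, as m_parser.post() does ...
}

void consumer_done() {
    // Runs after each job, mirroring Block::OnParse.
    if (--in_flight == 0) {
        cv.notify_one();
    }
}

Waiting for the queue to reach zero, rather than to merely dip below PARSE_QUEUE_LEN, trades a little latency for far fewer condition-variable wakeups.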

/*!
\brief Create read queue item.
\return rq_val_t* the read queue item.
*/
inline rq_val_t* createReadQueueValue(u64 pos, u32 size, const cb_read_info_t& cb_read, char* pdata) {
    rq_val_t* val = new rq_val_t();

    val->sender = this;
    val->data.resize(size);
    val->pos = pos;
    val->validated = false;
    val->cb_read = cb_read;

    std::copy(pdata, pdata + size, &val->data[0]);
    return val;
}

/*!
\brief Parse read queue item.
*/
inline void parseReadQueueValue(rq_val_t* qval) {
    char* ptr = &qval->data[0] + sizeof(MAGIC_NUM_HEADER);
    char* pcnt = &qval->data[0] + FIELD_OFF_DATA;

    qval->info = *(block_info_t *)ptr;

    if (qval->info.flag != FLAG_REMOVE) {
        u32 res = qval->sender->computeParity(pcnt, qval->info.size);

        qval->validated = qval->info.checkSum == res;
        qval->cb_read(qval->validated ? 0 : -1, qval->pos, qval->info);
    }
}

....
private:
/*!
\brief Event handler for parsing read block data.
\return None.
*/
inline void OnParse(rq_val_t* qval) {
    parseReadQueueValue(qval);

    delete qval;

    if (--m_posted_parse_work_num == 0) {
        m_cv_parse.notify_one();
    }
}

inline void OnAfterParse(rq_val_t* qval, bool canceled) {
    // FIXME: the after_work won't be fired if there are too many works in queue
}
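
When a block passes verification, parseReadQueueValue fires cb_read with the block's position and parsed info; a typical consumer builds the in-memory key-to-position index from these callbacks. A sketch against the project's types, assuming block_info_t exposes a key field and that u128 is ordered (both are assumptions; the real definitions live in the project's headers):

#include <map>

// Hypothetical cb_read_info consumer that builds the in-memory index.
std::map<u128, u64> index;  // key -> absolute block position in the volume

auto cb_read = [&](int err, u64& pos, const block_info_t& info) {
    if (err) {
        return;  // checksum mismatch: skip the corrupt block
    }
    index[info.key] = pos;
};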


Async read block data

Asynchronously reading data is the simplest part:

/*!
\brief Read block data content.
\return None.
*/
void ReadData(AsyncFile& afs, u64& pos, size_t size, const AsyncFile::cb_read_t& cb) {
    afs.Read(pos + FIELD_OFF_DATA, size, cb);
}
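
A hedged usage sketch, continuing the index example above. It assumes AsyncFile::cb_read_t has the same shape as the cb_read_data callback declared in class Block; the real AsyncFile API belongs to an earlier episode:

// Hypothetical call site: `blk` is a Block, `afs` an AsyncFile, and
// `pos`/`info` come from the index built while loading. The lambda's
// signature is an assumed shape for AsyncFile::cb_read_t.
u64 pos = index[key];

blk.ReadData(afs, pos, info.size, [](int err, shared_t<vec_t<char>>& data) {
    if (err) {
        LOG("Failed to read block data: %d", err);
        return;
    }
    // `data` holds exactly the payload: header, footer and padding excluded.
});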


Async write block

Blocks are never overwritten: every new block is appended to the end of the volume, and writes are batched.

/*!
\brief Prepare writing buffer for batch appending blocks.
*/
void PrepareWriteBuffer(u64 offset, wq_val_t& qval, shared_t<vec_t<char>>& buffer, u32* blockSize) {
    u32 prev_buf_size = buffer->size();
    *blockSize = BLOCK_SIZE_MIN + qval.data->size();
    u32 padding = *blockSize & 7 ? 8 - (*blockSize & 7) : 0;

    *blockSize += padding;
    qval.pos = offset;

    buffer->resize(prev_buf_size + *blockSize);

    char* ptr = &(*buffer)[prev_buf_size];

    // Write header magic num
    *(u32 *)ptr = MAGIC_NUM_HEADER; ptr += sizeof(MAGIC_NUM_HEADER);
    // Add block info
    *(u128 *)ptr = qval.key;        ptr += sizeof(u128);
    *(u32 *)ptr = qval.cookie;      ptr += sizeof(u32);
    *(u16 *)ptr = qval.tag;         ptr += sizeof(u16);
    *(u16 *)ptr = qval.flag;        ptr += sizeof(u16);
    *(u32 *)ptr = (u32)qval.data->size();    ptr += sizeof(u32);
    *(u32 *)ptr = computeParity(*qval.data); ptr += sizeof(u32);

    std::copy(qval.data->begin(), qval.data->end(), ptr);
    ptr += qval.data->size();

    // Add footer magic
    *(u32 *)ptr = MAGIC_NUM_FOOTER; ptr += sizeof(MAGIC_NUM_FOOTER);
    // Add padding
    if (padding > 0)
        std::memcpy(ptr, PADDING_VALUE, padding);
}
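
From the field writes above, the on-disk layout can be read off directly: a 4-byte header magic, 32 bytes of block info (key, cookie, tag, flag, size, checksum), the payload, a 4-byte footer magic, and padding to an 8-byte boundary. That puts BLOCK_SIZE_MIN at 40 bytes. A worked example of the padding arithmetic; the constant 40 is inferred from the code above, not quoted from the project:

#include <cassert>
#include <cstdint>

// Mirrors the padding arithmetic in PrepareWriteBuffer.
inline uint32_t padded_block_size(uint32_t data_size) {
    uint32_t blk = 40 + data_size;  // header(4) + info(32) + footer(4) + data
    uint32_t padding = (blk & 7) ? 8 - (blk & 7) : 0;
    return blk + padding;
}

int main() {
    assert(padded_block_size(0)  == 40);  // already 8-aligned, no padding
    assert(padded_block_size(1)  == 48);  // 41 -> 7 bytes of padding
    assert(padded_block_size(25) == 72);  // 65 -> 7 bytes of padding
}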

/*!
\brief Append new blocks to the volume file.
\return None.
*/
void Append(AsyncFile& afs, u64 offset, write_queue_t* ptr_queue, const AsyncFile::cb_write_t& cb_write) {
    shared_t<vec_t<char>> buffer = mk_shared<vec_t<char>>();

    u32 size = 0; /** \note size is a u32, which means the maximum size of a data block must be less than 4GB. */
    u64 pos = offset;

    for (auto& it : *ptr_queue) {
        PrepareWriteBuffer(pos, it, buffer, &size);
        pos += size;
    }

    // Capture `buffer` by value: the callback fires after Append has
    // returned, so a by-reference capture would dangle.
    afs.Write(offset, buffer, [buffer, cb_write](int err, u64 num, void* userdata) mutable {
        write_queue_t* ptr = reinterpret_cast<write_queue_t *>(userdata);

        // A short write is an error even if the OS reported success.
        if (!err && num != buffer->size()) {
            err = _ERR(VOL_FAILED_TO_WRITE);
        }

        for (auto& it : *ptr) {
            it.cb_append(err, it.key, it.cookie, it.tag, it.flag, it.size, it.pos);
        }

        cb_write(err, num, ptr);
    }, ptr_queue);
}
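
Putting it together, a caller fills a write_queue_t with one wq_val_t per block and hands the whole batch to Append; each block's cb_append fires with its final position. A hedged sketch against the project's types, where afs, vol_end, blk, key and photo are stand-ins supplied by the caller:

// Hypothetical batch append: queue one block, then write the batch
// in a single contiguous I/O at the end of the volume (`vol_end`).
Block::write_queue_t queue;

Block::wq_val_t item;
item.key    = key;                 // u128 key of the new needle
item.cookie = 0x1234;
item.tag    = 0;
item.flag   = 0;
item.data   = mk_shared<vec_t<char>>(photo.begin(), photo.end());
item.size   = (u32)item.data->size();
item.cb_append = [](int err, const u128& key, const u32& cookie,
                    const u16& tag, const u16& flag,
                    const u32& size, const u64& pos) {
    if (!err) {
        // The block is on disk: record key -> pos in the index.
    }
};
queue.push_back(item);

blk.Append(afs, vol_end, &queue, [](int err, u64 written, void* q) {
    // Fires after every per-block cb_append has been invoked;
    // `q` points back to the original write queue.
});

Note that the queue must stay alive until cb_write fires: Append stores only a raw pointer to it as userdata.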