您的位置:首页 > 理论基础 > 数据结构算法

zeromq源代码分析5-3------管道相关的数据结构yqueue, ypipe, pipe等

2011-07-21 17:35 555 查看
这篇文章我们讲一下pipe, 从前面的博文中你了解了zeromq数据读写是异步的,主要与管道交互。先说一下pipe的基本功能吧:1. 流量控制: 有一个HWM(最高水位), LWM(最低水位)和active标志。2. 通过发送终结符类型的消息来销毁管道。3. swap模式,当管道被写满的时候,也就是到达最高水位的时候,如果配置了swap file这时可以将消息写到swap文件中,等到日后reader将其消息读到LWM时,再将消息从swap文件中读出来重新写入管道中。
一个pipe有两个部分组成,一个是reader,一个是writer。我们先看看reader:先看read(1)函数:
bool zmq::reader_t::read (zmq_msg_t *msg_)
{
if (!active)
return false;

if (!pipe->read (msg_)) {
active = false;
return false;
}

//  If delimiter was read, start termination process of the pipe. 处理终结消息
unsigned char *offset = 0;
if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) {
if (sink)
sink->delimited (this);
terminate ();
return false;
}

if (!(msg_->flags & ZMQ_MSG_MORE))
msgs_read++;

if (lwm > 0 && msgs_read % lwm == 0) // 当读到的消息数累积到lwm时我们发送激活writer的command消息。
send_activate_writer (writer, msgs_read);

return true;
}

该函数主要是利用上一篇博客中讲的ypipe队列,从中读取消息,如果消息是终结类型的消息,则销毁管道,这个我们后面会讲。
还要注意这边对multipart message消息计数的处理。
而check_read()函数主要是检查ypipe队列中有木有消息。
bool zmq::reader_t::check_read ()
{
if (!active)
return false;

//  Check if there's an item in the pipe.
if (!pipe->check_read ()) {    // 如果没有消息,那么将active设置成false,使得后面的检测更快,需要writer端新写消息并且flush()的时候来激活reader
active = false;
return false;
}

//  If the next item in the pipe is message delimiter,
//  initiate its termination.
if (pipe->probe (is_delimiter)) {  // 这边是侦测pipe中下一条消息是否是终结符类型的消息, 如果是的话就读出来销毁管道
zmq_msg_t msg;
bool ok = pipe->read (&msg);
zmq_assert (ok);
if (sink)
sink->delimited (this);
terminate ();
return false;
}

return true;
}
1. 如果管道中没有消息,那么将active设置成false,使得后面的检测更快,需要writer端新写消息并且flush()的时候来激活reader。
2.如果管道中还有消息,侦测pipe中下一条消息是否是终结符类型的消息, 如果是的话就读出来销毁管道。
接下来我们来看看writer:先看write(1)函数:
bool zmq::writer_t::write (zmq_msg_t *msg_)
{
if (unlikely (!check_write (msg_)))
return false;

if (unlikely (swapping)) {
bool stored = swap->store (msg_);
zmq_assert (stored);
if (!(msg_->flags & ZMQ_MSG_MORE))
swap->commit ();
return true;
}

pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE);
if (!(msg_->flags & ZMQ_MSG_MORE))
msgs_written++;

return true;
}
1. 会先调用check_write(1)来看看管道是否满了,即是否达到HWM。2. swap模式: 在check_write(1)的时候,如果管道满了的话,如果有swap file配置的话就会暂时写消息到swap文件,没有或者swap文件也容纳不下该消息的时候会将activate设为false,说明无法写入管道了,要等到读端接收到LWM的时候才唤醒激活写端。
3. 最后调用ypipe::write(2)完成对消息的写入到管道中的操作。4. 和读端一样,同样注意这边对已multipart的消息的计数。
消息写到管道中并不意味着reader就能读出消息,这点上篇博客中的ypipe大家应该都了解了,因此这边还需要调用flush()函数来刷新。
void zmq::writer_t::flush ()
{
//  In the swapping mode, flushing is automatically handled by swap object.
if (!swapping && !pipe->flush ())
send_activate_reader (reader);
}
1. 如果是swap模式下,flushing是自动完成的,swap模式下,消息存在swap文件中可以commit/rollback。

2. 当flush失败的时候说明管道中读者先去读了,然后发觉木有消息,于是active设成了false,因此这边要对这种情况进行处理,发送给reader激活的消息。
再看rollback():rollback()函数能够让我们移除掉未完成的消息。未完成主要是针对multipart message来说的,假如一个multipart message有三个子消息组成,结果我向管道中写了其中2个,那么我可以调用rollback()函数将这两个移除掉。
void zmq::writer_t::rollback ()
{
//  Remove incomplete message from the swap.
if (unlikely (swapping)) {
swap->rollback ();
return;
}

//  Remove incomplete message from the pipe.
zmq_msg_t msg;
while (pipe->unwrite (&msg)) {
zmq_assert (msg.flags & ZMQ_MSG_MORE);
zmq_msg_close (&msg);
}
}

刚才我们在上面讲了在reader从管道中读取消息时会发送激活writer的command消息:
if (lwm > 0 && msgs_read % lwm == 0) // 当读到的消息数累积到lwm时我们发送激活writer的command消息。
send_activate_writer (writer, msgs_read);

因此我们来看writer处理改command消息的函数process_activate_writer(): PS: 如果不清楚这边为啥会跳到这个函数的话,请回过头去看《zeromq源代码分析2------线/进程间通信方式》。
void zmq::writer_t::process_activate_writer (uint64_t msgs_read_)
{
//  Store the reader's message sequence number.
msgs_read = msgs_read_;

//  If we are in the swapping mode, we have some messages in the swap.
//  Given that pipe is now ready for writing we can move part of the
//  swap into the pipe.
if (swapping) {
zmq_msg_t msg;
while (!pipe_full () && !swap->empty ()) { // 管道还未满或者swap中还存有消息
swap->fetch(&msg);
pipe->write (msg, msg.flags & ZMQ_MSG_MORE);
if (!(msg.flags & ZMQ_MSG_MORE)) // multipart的消息计数处理
msgs_written++;
}
if (!pipe->flush ())  // flush管道,如果reader因为曾经读管道发觉木有消息的情况,就会active = false,因此得激活它
send_activate_reader (reader); // 发送激活reader的command消息

//  There are no more messages in the swap. We can switch into
//  standard in-memory mode.
if (swap->empty ()) { // 如果swap中无消息了,切换回in-memory模式
swapping = false;

//  Push delimiter into the pipe. Trick the compiler to belive that
//  the tag is a valid pointer. Note that watermarks are not checked
//  thus the delimiter can be written even though the pipe is full.
if (pending_delimiter) { // 因为管道满开启swap模式的时候发送的terminate命令,现在可以通过写入终结类型的消息来进行销毁管道的过程。
zmq_msg_t msg;
const unsigned char *offset = 0;
msg.content = (void*) (offset + ZMQ_DELIMITER);
msg.flags = 0;
pipe->write (msg, false);
flush ();
return;
}
}
}

//  If the writer was non-active before, let's make it active
//  (available for writing messages to).
if (!active && !terminating) {
active = true;
zmq_assert (sink);
sink->activated (this);
}
}

1. 这边如果先不看swap模式的话,就是将active设成false,并且将此事件传递给相应的sink(socket / session)。
2. 而如果是swap模式的话,我们就从swap文件中fetch消息写入管道中。细节请看注释。

最后我们看一下terminate的基本过程。
1. writer先发起writer_t::terminate():
a. writer rollback所有未完成的消息,发送终结消息给管道,如果管道已经满了进入swap模式了,那么就设置pending_delimiter = true,日后等到上面介绍的被唤醒的时候再处理:
void zmq::writer_t::terminate ()
{
//  Prevent double termination.
if (terminating)
return;
terminating = true;

//  Mark the pipe as not available for writing.
active = false;

//  Rollback any unfinished messages.
rollback ();

if (swapping) {
pending_delimiter = true;
return;
}

//  Push delimiter into the pipe. Trick the compiler to belive that
//  the tag is a valid pointer. Note that watermarks are not checked
//  thus the delimiter can be written even though the pipe is full.
zmq_msg_t msg;
const unsigned char *offset = 0;
msg.content = (void*) (offset + ZMQ_DELIMITER);
msg.flags = 0;
pipe->write (msg, false);
flush ();
}

b. 然后reader当收到终结消息的时候,就调用reader_t::terminate()发送pipe_term的command消息给writer:
void zmq::reader_t::terminate ()
{
//  If termination was already started by the peer, do nothing.
if (terminating)
return;

active = false;
terminating = true;
send_pipe_term (writer);
}

c. writer发送pipe_term_ack的command消息给reader,调用钩子函数(sink->terminated(this)),然后自毁。
void zmq::writer_t::process_pipe_term ()
{
send_pipe_term_ack (reader);

//  The above command allows reader to deallocate itself and the pipe.
//  For safety's sake we'll drop the pointers here.
reader = NULL;
pipe = NULL;

//  Notify owner about the termination.
zmq_assert (sink);
sink->terminated (this);

//  Deallocate the resources.
delete this;
}

d. reader收到pipe_term_ack的command消息后,调用钩子函数(sink->terminated(this)),自毁。
void zmq::reader_t::process_pipe_term_ack ()
{
//  At this point writer may already be deallocated.
//  For safety's sake drop the reference to it.
writer = NULL;

//  Notify owner about the termination.
zmq_assert (sink);
sink->terminated (this);

//  Deallocate resources.
delete this;
}

2. reader先发起reader_t::terminate():
那么就b, c, d几步了。

总结:
1. 流量控制。
2. 当管道满时能够开启swap模式。
3. 通过active flag来控制reader的sleep和被激活唤醒。
4. 能够terminate管道。

下一篇文章会介绍zeromq各种socket的实现,可能要分个几小节,敬请期待!希望有兴趣的朋友可以和我联系,一起学习。 kaka11.chen@gmail.com
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: