您的位置:首页 > 理论基础 > 计算机网络

muduo : TcpConnection's Write Buffer

2016-08-12 11:49 239 查看

引言

前面的文章分别分析到了
三个半事件
中的连接建立、连接断开、数据读入,这里分析最后
半个事件
,即发送数据。muduo作者将该事件称为是半个事件是有道理的,因为这里的发送是指将数据放到TCP协议栈的发送缓冲区,由TCP协议栈负责将数据发送到对端,因此称为半个事件。

发送数据

发送数据比接收数据更难,因为发送数据是主动的,接收读取数据是被动的。因为muduo采用的是LT模式,因为合适注册writeable事件需要好好考虑。muduo的send函数的策略是这样的,先判断outputBuffer中是否有数据,如果没有那么尝试直接发送数据,如果一次没有发送完就把剩余的数据放到outputBuffer中,并注册writeable事件,以后在handleWrite中发送剩余数据;如果outputBuffer中有数据就不能尝试先发送了,因为会造成数据乱序。

TcpConnection::send

send函数有3个重载,大体相同,这里只给出一个。

void TcpConnection::send(const void* data, size_t len)
{
if (state_ == kConnected)
{
// 在IO线程执行,保证线程安全
if (loop_->isInLoopThread())
{
sendInLoop(data, len);
}
else
{
string message(static_cast<const char*>(data), len);
loop_->runInLoop(
boost::bind(&TcpConnection::sendInLoop,
this,
message));
}
}
}

void TcpConnection::send(const StringPiece& message)
{
// ...
}

// FIXME efficiency!!!
void TcpConnection::send(Buffer* buf)
{
// ...
}

void TcpConnection::sendInLoop(const void* data, size_t len)
{
loop_->assertInLoopThread();
ssize_t nwrote = 0;

// 如果outputBuffer没有数据,就尝试直接发送
// if no thing in output queue, try writing directly
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
nwrote = ::write(channel_->fd(), data, len);
if (nwrote >= 0)
{
if (implicit_cast<size_t>(nwrote) < len)
{
LOG_TRACE << "I am going to write more data";
}
else if (writeCompleteCallback_)
{ // 一次就发送完了,那么可以调用`writeCompleteCallback`了,很好
loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
}
}
else // nwrote < 0
{
nwrote = 0;
if (errno != EWOULDBLOCK)
{
LOG_SYSERR << "TcpConnection::sendInLoop";
}
}
}

assert(nwrote >= 0);
if (implicit_cast<size_t>(nwrote) < len)
{ // 将剩余的数据添加到outputBuffer
outputBuffer_.append(static_cast<const char*>(data)+nwrote, len-nwrote);
// 开始关注writeable事件,剩余的数据在handleWrite中发送
if (!channel_->isWriting())
{
channel_->enableWriting();
}
}
}


TcpConnection::handleWrite

void TcpConnection::handleWrite()
{
loop_->assertInLoopThread();
if (channel_->isWriting())
{
ssize_t n = ::write(channel_->fd(), outputBuffer_.peek(), outputBuffer_.readableBytes());
if (n > 0)
{
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0)
{ // 发送完了,不再关注writeable事件,调用`writeCompleteCallback`
channel_->disableWriting();
if (writeCompleteCallback_)
{
loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
}
if (state_ == kDisconnecting)
{
shutdownInLoop();
}
}
else
{ // 使用的是LT模式,当可写时,handleWrite会被继续调用
LOG_TRACE << "I am going to write more data";
}
}
else
{
LOG_SYSERR << "TcpConnection::handleWrite";
abort();  // FIXME
}
}
else
{
LOG_TRACE << "Connection is down, no more writing";
}
}


经过muduo的封装,用户需要发送数据时直接把数据交给send就可以了,send没有返回值,muduo在底层会将数据发送完毕,最后调用用户设置好的回调,看来很方便。

可以想一想在发送数据时,client断开连接的情况,handleWrite发现后没有做过多处理(see the last else),因为handleRead会在read返回0时发现这一点,然后断开连接。

HighWaterMarkCallback

上面提到的
WriteCompleteCallback
可以称为低水位回调,那么
HighWaterMarkCallback
高水位回调


想想一下一个proxy,C和S通过该proxy连接,S的发送数据速度很快,C的读取数据的速度很慢,那么proxy的outputBuffer会暴涨。为了协调一下收发的速度,可以使用
高水位回调


当TcpConnection::outputBuffer的大小到达某个值时(hightWaterMark),HighWaterMarkCallback函数被调用。在该函数中可以先停止对读事件的关注,在outputBuffer发送完毕(即WirteCompleteCallback被调用时)再开启对读事件的关注。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: