您的位置:首页 > 理论基础 > 计算机网络

[置顶] Muduo网络库源码分析之TcpConnection Class

2018-01-17 15:04 411 查看
用于管理一个具体的 TCP 连接,比如消息的接收与发送,完成用户指定的连接回调
connectionCallback


TcpConnection
构造时接收参数有 TCP 连接的 sockfd,服务端地址
localAddr
,客户端地址
peerAddr
,并通过 Socket 封装
sockfd
。并用 Channel 管理该 sockfd,向 Channel 注册可读、可写、关闭、出错回调函数,用于 Poller 返回就绪事件后
Channel::handleEvent()
执行相应事件的回调。

TcpConnection 有四个状态,简单的状态图:



TcpConnection 有一系列用户指定的事件回调函数,比如
TcpConnection::connectionCallback
messageCallback
writeCompleteCallback
,这些是用户通过 TcpServer 传给 TcpConnection。当 Poller 返回 TcpConnection 对应的 Socket 就绪事件时,
Channel::handleEvent()
->
TcpConnection::handle*系列函数
-> 这些事件回调函数(如 connectionCallback)。

与上面对应,TcpConnection 有一系列 用于 TcpServer 为连接指定事件 callback 的函数,比如
TcpConnection::setConnectionCallback
setCloseCallback
等等,是在
TcpServer::newConncetion()
中创建一个 TcpConnection 对象并将用户指定的回调函数(上段说的)通过
TcpConnection::set*Callback
函数传递给 TcpConnection。

TcpConncetion 中还有一系列函数是处理 sockfd 上的事件回调函数,比如
TcpConnection::handleRead()
是在Poller 返回可读事件时由
Channel::handleEvent()
调用的,类似的还有
handleWrite()
handleClose()
handleError()
,这些函数会调用用户指定的事件回调函数(上上段说的),比如
TcpConnection::handleRead() -> messageCallback()
。这些事件回调函数是在 TcpConnection 构造时创建完 sockfd 对应的 Channel 后通过
Channel::set*Callback
系列函数注册的。

TcpConnection::handleRead()
:当连接对应的 sockfd 有可读事件发生时调用,主要是将数据读到 Buffer 中,执行消息回调函数
messageCallback_()


TcpConnection::handleWrite()
:当连接对应的 sockfd 有可写事件发生时调用,主要是将 Buffer 中的数据发送出去,如果一次性发送完毕,则执行用户指定的回调
writeCompleteCallback_()
,若一次没有发送完, muduo 采用 LT 模式, 会反复触发可写事件,下次还有机会发送剩下的数据。

TcpConnection::handleClose()
:主要执行
Channel::disableAll()
TcpConnection::closeCallback()


TcpConnection::handleError()
:主要是在日志中输出错误信息。

TcpConnection::closeCallback()
不是给普通用户用的,这个是给 TcpServer 和 TcpClient 用的,用于通知它们移除所持有的 TcpConnectionPtr。它绑定的是
TcpServer::removeConnection()
,通过下面关系:Accetptor 接受一个 Tcp 连接时
Channel::handleEvent() -> Acceptor::handleRead() -> TcpServer::newConnection()
中新建一个 TcpConnection 并通过
TcpConnection::setCloseCallback(bind(&TcpServer::removeConnectionCallback, this, _1))
。 这个回调什么时候被调用呢?当
TcpConnection::handleRead()
中 read 返回0,或者 sockfd 发生 POLLHUP 事件就绪时,都会调用到
TcpConnection::handleClose() -> TcpConnection::closeCallback_() -> TcpServer::removeConnection()


TcpConnection 中还有两个函数是给 TcpServer 使用的。

TcpConnection::connectEstablishd()
,是
TcpServer::newConnection()
创建完 TcpConnection 对象,设置好回调函数之后调用的,主要是调用
Channel::enableReading()
将 TcpConnection 对应的 sockfd 注册读事件,然后执行用户指定的
connectionCallback_()
,并将 TcpConnection 状态置为 kConnected。这个函数如何被执行呢? Acceptor 持有的 lfd 有可读事件发生,即有连接请求,此时
Channel::handleEvent() -> Acceptor::handleRead() -> TcpServer::newConnection() -> ……->TcpConnection::connectEstablished()
。中间省略的部分是线程转移,转移到TcpConnection 所在的 IO 线程执行,TcpServer 和 TcpConnection 可能不在同一线程。

TcpConnection::connectDestroyed()
,这是 TcpConnection 析构前最后调用的一个成员函数,它通知用户连接已断开。主要功能是设置 TcpConnection 的状态为 kDisconnected;停止监听所有事件;调用
connectionCallback_()
执行用户指定的回调;从 epoll 监听的 fd 中移除 TcpConnection 对应的 sockfd。那什么时候调用到这个函数呢?一个是 TcpServer 析构时,一般情况下是经由
TcpConnection::handleClose() -> TcpConnection::closeCallback_() -> TcpServer::removeConnection() -> ……->TcpConnection::connectDestroyed()


send 一系列函数是可以用户或者其他线程调用,用于发送信息。如果不是在IO线程,它会把实际工作转移到IO线程调用。首先检查 TcpConnection 对应的 Socket 是否注册了可写事件,若注册了可写事件表明输出缓冲区 outputBuffer_中已经有数据等待发送,为了保证不乱序,这次的数据只要追加到输出缓冲区中,通过
Channel::handleEvent() -> TcpConnection::handleWrite()
来发送。如果Socket 没有注册可写事件,输出缓冲区没有数据,那么这次的消息可以直接通过 write 发送,如果没有一次性发送完毕,那么 message 剩余的数据仍然要 append 到 outputBuffer 中,并向 Poller 注册可写事件,当 socket 变得可写时,Channel 会调用
TcpConnection::handleWrite()
来发送 outputBuffer_ 中堆积的数据,发送完毕后立刻停止监听可写事件,避免 busy loop。无论是
sendInLoop() -> write()
还是
Channel::handleEvent() -> handleWrite()
,只要确定发送完 message 或者 outputBuffer_ 中的数据,那么都要调用用户指定的回调
writeCompleteCallback()


使用 epoll 的 LT 模式,当 socket 可写时,会不停的触发 socket 的可写事件,这个时候如何解决?第一种方式:需要向 socket 写数据时,注册此 socket 的可写事件,接收到可写事件后,然后调用 write/send 写数据,当所有数据都写完后,立刻停止观察 writable 事件,避免 busy loop。。第二种,需要发送数据时,直接调用 write/send 写,如果没有发送完,那么开是监听此 socket 的 writable 事件,然后接收到可写事件后,调用 write/send 发送数据,当所有数据都写完后,立刻停止观察 writable 事件,避免 busy loop。 muduo 采用的 LT 模式,就是用的第二种解决这个问题。

此外 TcpConnection 还有几个小功能,比如
TcpConnection::setTcpNoDelay()
TcpConnection::setKeepAlive()
。TCP No Delay 和 TCP keepalive 都是常用的 TCP 选项,前者的作用是禁止 Nagle 算法,避免连续发包出现延迟,这对编写低延迟网络服务很重要。后者的作用是定期探查 TCP 连接是否还存在,一般来说如果有应用层心跳的话,TCP keepalive 不是必须的。

Buffer 类的设计后面会分析。

TcpConnection.h

#ifndef MUDUO_NET_TCPCONNECTION_H
#define MUDUO_NET_TCPCONNECTION_H

#include <muduo/base/StringPiece.h>
#include <muduo/base/Types.h>
#include <muduo/net/Callbacks.h>
#include <muduo/net/Buffer.h>
#include <muduo/net/InetAddress.h>

#include <boost/any.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>

// struct tcp_info is in <netinet/tcp.h>
struct tcp_info;

namespace muduo
{
namespace net
{

class Channel;
class EventLoop;
class Socket;

///
/// TCP connection, for both client and server usage.
///
/// This is an interface class, so don't expose too much details.
class TcpConnection : boost::noncopyable,
public boost::enable_shared_from_this<TcpConnection>
{
public:
/// Constructs a TcpConnection with a connected sockfd
///
/// User should not create this object.
TcpConnection(EventLoop* loop,
const string& name,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr);
~TcpConnection();

EventLoop* getLoop() const { return loop_; }
const string& name() const { return name_; }
const InetAddress& localAddress() const { return localAddr_; }
const InetAddress& peerAddress() const { return peerAddr_; }
bool connected() const { return state_ == kConnected; }
bool disconnected() const { return state_ == kDisconnected; }
// return true if success.
bool getTcpInfo(struct tcp_info*) const;
string getTcpInfoString() const;

// void send(string&& message); // C++11
void send(const void* message, int len);
void send(const StringPiece& message);
// void send(Buffer&& message); // C++11
void send(Buffer* message);  // this one will swap data
void shutdown(); // NOT thread safe, no simultaneous calling
// void shutdownAndForceCloseAfter(double seconds); // NOT thread safe, no simultaneous calling
void forceClose();
void forceCloseWithDelay(double seconds);
void setTcpNoDelay(bool on);
// reading or not
void startRead();
void stopRead();
bool isReading() const { return reading_; }; // NOT thread safe, may race with start/stopReadInLoop

/* 设置TCP上下文 */
void setContext(const boost::any& context)
{ context_ = context; }

/* 获取TCP上下文 */
const boost::any& getContext() const
{ return context_; }

boost::any* getMutableContext()
{ return &context_; }

/* 设置连接建立时的回调函数 */
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }

/* 设置消息到来的回调函数 */
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }

/* 设置成功将数据写入对方时的回调函数 */
void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{ writeCompleteCallback_ = cb; }

/* 设置高水位回调函数和高水位值,当缓冲区的size达到highWaterMark时触发此请求 */
void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
{ highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }

/// Advanced interface
/* 返回输入缓冲区和输出缓冲区指针 */
Buffer* inputBuffer()
{ return &inputBuffer_; }

Buffer* outputBuffer()
{ return &outputBuffer_; }

/// Internal use only.
/* 设置TCP关闭的回调函数,仅在内部使用,用于移除持有的TcpConnectionPtr */
void setCloseCallback(const CloseCallback& cb)
{ closeCallback_ = cb; }

// called when TcpServer accepts a new connection
/* TcpServer使用,创建完一个新的连接后调用 */
void connectEstablished();   // should be called only once
// called when TcpServer has removed me from its map
/* TcpServer使用,从 map 中删除掉时调用 */
void connectDestroyed();  // should be called only once

private:
/* TcpConnection 有四种状态,从左到右依次是:已断开 正在连接 已连接 正在断开 */
enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };

/* 处理 connfd 上的事件回调函数 */
void handleRead(Timestamp receiveTime);   // 处理可读事件
void handleWrite();   // 处理可写事件
void handleClose();   // 处理关闭事件
void handleError();   // 处理错误事件

/* 通过线程转移操作实现安全发送消息 */
// void sendInLoop(string&& message);
void sendInLoop(const StringPiece& message);
void sendInLoop(const void* message, size_t len);
/* 通过线程转移操作实现安全关闭TCP连接 */
void shutdownInLoop();
// void shutdownAndForceCloseInLoop(double seconds);
/* 主动关闭连接 */
void forceCloseInLoop();
/* 设置TCP连接的状态 */
void setState(StateE s) { state_ = s; }
const char* stateToString() const;
void startReadInLoop();
void stopReadInLoop();

EventLoop* loop_;     // 处理该TCP连接的EventLoop,该EventLoop内部的epoll监听TCP连接对应的fd
const string name_;   // TCP连接的名字
StateE state_;  // FIXME: use atomic variable     // 本条TCP连接的状态
bool reading_;        //
// we don't expose those classes to client.
boost::scoped_ptr<Socket> socket_;    // TCP连接的fd所在的socket对象,fd的关闭由它决定
boost::scoped_ptr<Channel> channel_;  // TCP连接的fd 对应的 Channel
const InetAddress localAddr_;         // TCP连接本地的地址:ip port
const InetAddress peerAddr_;          // TCP连接对方的地址:ip port
ConnectionCallback connectionCallback_;   // 连接建立时的回调函数
MessageCallback messageCallback_;         // 收到消息时的回调函数
WriteCompleteCallback writeCompleteCallback_;     // 消息写入对方缓冲区时的回调函数
HighWaterMarkCallback highWaterMarkCallback_;     // 高水位回调函数
CloseCallback closeCallback_;         // 关闭TCP连接的回调函数
size_t highWaterMark_;    // 高水位标记
Buffer inputBuffer_;      // TCP连接的输入缓冲区,从连接中读取输入然后存入这里
Buffer outputBuffer_;     // TCP连接的输出缓冲区,要发送的数据保存在这里
// FIXME: use list<Buffer> as output buffer.
boost::any context_;      // TCP连接的上下文,一般用于处理多次消息相互存在关联的情形,例如文件发送
// FIXME: creationTime_, lastReceiveTime_
//        bytesReceived_, bytesSent_
};

/* 使用 shared_ptr 来管理 TCP 连接对象的生存周期 */
typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;

}
}

#endif  // MUDUO_NET_TCPCONNECTION_H


TcpConnection.cc

#include <muduo/net/TcpConnection.h>

#include <muduo/base/Logging.h>
#include <muduo/base/WeakCallback.h>
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/Socket.h>
#include <muduo/net/SocketsOps.h>

#include <boost/bind.hpp>

#include <errno.h>

using namespace muduo;
using namespace muduo::net;

/* 默认的当连接建立和断开时的回调函数 */
void muduo::net::defaultConnectionCallback(const TcpConnectionPtr& conn)
{
LOG_TRACE << conn->localAddress().toIpPort() << " -> "
<< conn->peerAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
// do not call conn->forceClose(), because some users want to register message callback only.
}

/* 默认的收消息的回调函数 */
void muduo::net::defaultMessageCallback(const TcpConnectionPtr&,
Buffer* buf,
Timestamp)
{
buf->retrieveAll();   // 默认取出缓冲区中所有数据
}

TcpConnection::TcpConnection(EventLoop* loop,
const string& nameArg,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr)
: loop_(CHECK_NOTNULL(loop)),
name_(nameArg),         // connection 的名字
state_(kConnecting),    // 初始状态是 正在连接
reading_(true),
socket_(new Socket(sockfd)),    // 创建sockfd 对应的Socket
channel_(new Channel(loop, sockfd)), // 创建sockfd对应的Channel
localAddr_(localAddr),
peerAddr_(peerAddr),
highWaterMark_(64*1024*1024)    // 高水位默认是64K
{
/* 注册事件的回调函数,当对应事件发生时会调用 */
channel_->setReadCallback(
boost::bind(&TcpConnection::handleRead, this, _1));
channel_->setWriteCallback(
boost::bind(&TcpConnection::handleWrite, this));
channel_->setCloseCallback(
boost::bind(&TcpConnection::handleClose, this));
channel_->setErrorCallback(
boost::bind(&TcpConnection::handleError, this));
LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
<< " fd=" << sockfd;
socket_->setKeepAlive(true);
}

TcpConnection::~TcpConnection()
{
LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
<< " fd=" << channel_->fd()
<< " state=" << stateToString();
assert(state_ == kDisconnected);
}

bool TcpConnection::getTcpInfo(struct tcp_info* tcpi) const
{
return socket_->getTcpInfo(tcpi);
}

string TcpConnection::getTcpInfoString() const
{
char buf[1024];
buf[0] = '\0';
socket_->getTcpInfoString(buf, sizeof buf);
return buf;
}

void TcpConnection::send(const void* data, int len)
{
send(StringPiece(static_cast<const char*>(data), len));
}

void TcpConnection::send(const StringPiece& message)
{
if (state_ == kConnected)
{
/* 如果是在loop线程,就直接发送数据 */
if (loop_->isInLoopThread())
{
sendInLoop(message);
}
else
{
/* 否则,转移到loop线程执行 */
loop_->runInLoop(
boost::bind(&TcpConnection::sendInLoop,
this,     // FIXME
message.as_string()));
//std::forward<string>(message)));
}
}
}

// FIXME efficiency!!!
void TcpConnection::send(Buffer* buf)
{
if (state_ == kConnected)
{
if (loop_->isInLoopThread())
{
sendInLoop(buf->peek(), buf->readableBytes());
buf->retrieveAll();
}
else
{
loop_->runInLoop(
boost::bind(&TcpConnection::sendInLoop,
this,     // FIXME
buf->retrieveAllAsString()));
//std::forward<string>(message)));
}
}
}

void TcpConnection::sendInLoop(const StringPiece& message)
{
sendInLoop(message.data(), message.size());
}

/*
* 先尝试直接发送数据,如果一次发送完毕就不会启用WriteCallback
*
* 如果只发送了部分数据,则把剩余数据放入输出缓冲区,
* 并关注writable事件,以后在handleWrite()中发送剩余的数据
*
* 如果当前输出缓冲区已经有待发送的数据,那么就不能先尝试发送了
* 会造成数据乱序
*/
void TcpConnection::sendInLoop(const void* data, size_t len)
{
/* 断言确保是在loop线程 */
loop_->assertInLoopThread();
ssize_t nwrote = 0;
size_t remaining = len;
bool faultError = false;
/* 如果连接关闭,那么就放弃写数据  */
if (state_ == kDisconnected)
{
LOG_WARN << "disconnected, give up writing";
return;
}
// if no thing in output queue, try writing directly
/* 如果输出缓冲区中没有数据,可以直接往fd写数据,不会乱序 */
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
/* 往fd发送数据 */
nwrote = sockets::write(channel_->fd(), data, len);
/* 发送成功,看是否还有剩余数据 */
if (nwrote >= 0)
{
/* 计算剩余量 */
remaining = len - nwrote;
/* 全部发送完了,执行writeCompleteCallback_回调 */
if (remaining == 0 && writeCompleteCallback_)
{
loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
}
}
/* 发送 失败 */
else // nwrote < 0
{
nwrote = 0;
if (errno != EWOULDBLOCK)
{
LOG_SYSERR << "TcpConnection::sendInLoop";
if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
{
faultError = true;
}
}
}
} // end if

assert(remaining <= len);
if (!faultError && remaining > 0)
{
/* 输出缓冲区的剩余字节 */
size_t oldLen = outputBuffer_.readableBytes();
/* 如果现在需要发送的字节数达到高水位,之前没有达到 */
if (oldLen + remaining >= highWaterMark_
&& oldLen < highWaterMark_
&& highWaterMarkCallback_)
{
/* 在 loop 线程中执行高水位回调函数 */
loop_->queueInLoop(boost::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
}

/* 将未发送的data中的数据放入输出缓冲区 */
outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
/* 如果对应的Channel没有在监听write事件 */
if (!channel_->isWriting())
{
/* 注册可写事件 */
channel_->enableWriting();
}
}
}

void TcpConnection::shutdown()
{
// FIXME: use compare and swap
if (state_ == kConnected)     // 执行shutdown必须是在连接正常的情况下
{
setState(kDisconnecting);
// FIXME: shared_from_this()?
loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
}
}

void TcpConnection::shutdownInLoop()
{
loop_->assertInLoopThread();  // 在IO线程中执行
if (!channel_->isWriting())   // 当前没有监听写事件
{
// we are not writing
socket_->shutdownWrite();   // 则关闭写端
}
}
/* 主动关闭连接 */
void TcpConnection::forceClose()
{
// FIXME: use compare and swap
if (state_ == kConnected || state_ == kDisconnecting)
{
setState(kDisconnecting);
loop_->queueInLoop(boost::bind(&TcpConnection::forceCloseInLoop, shared_from_this()));
}
}

void TcpConnection::forceCloseWithDelay(double seconds)
{
if (state_ == kConnected || state_ == kDisconnecting)
{
setState(kDisconnecting);
loop_->runAfter(
seconds,
makeWeakCallback(shared_from_this(),
&TcpConnection::forceClose));  // not forceCloseInLoop to avoid race condition
}
}

void TcpConnection::forceCloseInLoop()
{
loop_->assertInLoopThread();
if (state_ == kConnected || state_ == kDisconnecting)
{
// as if we received 0 byte in handleRead();
handleClose(); // 同样调用 handleClose()
}
}

const char* TcpConnection::stateToString() const
{
switch (state_)
{
case kDisconnected:
return "kDisconnected";
case kConnecting:
return "kConnecting";
case kConnected:
return "kConnected";
case kDisconnecting:
return "kDisconnecting";
default:
return "unknown state";
}
}

/* 禁用nagle算法,降低网络延迟 */
void TcpConnection::setTcpNoDelay(bool on)
{
socket_->setTcpNoDelay(on);
}

void TcpConnection::startRead()
{
loop_->runInLoop(boost::bind(&TcpConnection::startReadInLoop, this));
}

void TcpConnection::startReadInLoop()
{
loop_->assertInLoopThread();
if (!reading_ || !channel_->isReading())
{
channel_->enableReading();
reading_ = true;
}
}

void TcpConnection::stopRead()
{
loop_->runInLoop(boost::bind(&TcpConnection::stopReadInLoop, this));
}

void TcpConnection::stopReadInLoop()
{
loop_->assertInLoopThread();
if (reading_ || channel_->isReading())
{
channel_->disableReading();
reading_ = false;
}
}

/* 连接建立,TcpServer::newconnection() 调用 */
void TcpConnection::connectEstablished()
{
loop_->assertInLoopThread();
assert(state_ == kConnecting);
setState(kConnected);     // 设置状态为已建立
channel_->tie(shared_from_this());
channel_->enableReading();    // 注册读事件

/* 执行用户建立连接时的逻辑 */
connectionCallback_(shared_from_this());
}

/* 连接关闭,提供给TcpServer使用 TcpServer::removeConnectionInLoop() */
void TcpConnection::connectDestroyed()
{
loop_->assertInLoopThread();
if (state_ == kConnected)
{
setState(kDisconnected);
channel_->disableAll();     // 停止监听所有事件

/* 执行用户的关闭逻辑 */
connectionCallback_(shared_from_this());
}
channel_->remove();   // 从 epoll 中移除connfd
}

/* 处理读事件 */
void TcpConnection::handleRead(Timestamp receiveTime)
{
loop_->assertInLoopThread();
int savedErrno = 0;
/* 从fd读取数据到输入缓冲区中,返回成功读取的字节数 */
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0)
{
/* 执行消息回调函数 */
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0)
{
/* 连接关闭,执行关闭回调函数 */
handleClose();
}
else
{
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
}

/* 处理可写事件 往connfd中write */
void TcpConnection::handleWrite()
{
loop_->assertInLoopThread();
/* 如果Channel正在监听 write 事件 */
if (channel_->isWriting())
{
ssize_t n = sockets::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
if (n > 0)
{
/* 从输出缓冲区中将已经发送的数据删除 */
outputBuffer_.retrieve(n);
/* 所有要发送的数据都发生完毕 */
if (outputBuffer_.readableBytes() == 0)
{
/* 停止监听fd的写事件*/
channel_->disableWriting();
/* 调用用户指定的回调 */
if (writeCompleteCallback_)
{
loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
}
if (state_ == kDisconnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_SYSERR << "TcpConnection::handleWrite";
// if (state_ == kDisconnecting)
// {
//   shutdownInLoop();
// }
}
}
else
{
LOG_TRACE << "Connection fd = " << channel_->fd()
<< " is down, no more writing";
}
}

void TcpConnection::handleClose()
{
loop_->assertInLoopThread();
LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString();
assert(state_ == kConnected || state_ == kDisconnecting);
// we don't close fd, leave it to dtor, so we can find leaks easily.
setState(kDisconnected);
channel_->disableAll();   // Channel停止监听所有事件

TcpConnectionPtr guardThis(shared_from_this());
connectionCallback_(guardThis);   // 执行用户的关闭连接逻辑
// must be the last line
/*
* closeCallback_,绑定到 TcpServer::removeConnection()
* 在里面执行connectDestroyed()
*/
closeCallback_(guardThis);
}

/* 在日志中输出错误信息 */
void TcpConnection::handleError()
{
int err = sockets::getSocketError(channel_->fd());
LOG_ERROR << "TcpConnection::handleError [" << name_
<< "] - SO_ERROR = " << err << " " << strerror_tl(err);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: