[置顶] Muduo网络库源码分析之TcpConnection Class
2018-01-17 15:04
411 查看
用于管理一个具体的 TCP 连接,比如消息的接收与发送,完成用户指定的连接回调
TcpConnection 有四个状态,简单的状态图:
![](https://img-blog.csdn.net/20180117143240602?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvVGFuc3dlcl8=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)
TcpConnection 有一系列用户指定的事件回调函数,比如
与上面对应,TcpConnection 有一系列 用于 TcpServer 为连接指定事件 callback 的函数,比如
TcpConncetion 中还有一系列函数是处理 sockfd 上的事件回调函数,比如
TcpConnection 中还有两个函数是给 TcpServer 使用的。
send 一系列函数是可以用户或者其他线程调用,用于发送信息。如果不是在IO线程,它会把实际工作转移到IO线程调用。首先检查 TcpConnection 对应的 Socket 是否注册了可写事件,若注册了可写事件表明输出缓冲区 outputBuffer_中已经有数据等待发送,为了保证不乱序,这次的数据只要追加到输出缓冲区中,通过
使用 epoll 的 LT 模式,当 socket 可写时,会不停的触发 socket 的可写事件,这个时候如何解决?第一种方式:需要向 socket 写数据时,注册此 socket 的可写事件,接收到可写事件后,然后调用 write/send 写数据,当所有数据都写完后,立刻停止观察 writable 事件,避免 busy loop。。第二种,需要发送数据时,直接调用 write/send 写,如果没有发送完,那么开是监听此 socket 的 writable 事件,然后接收到可写事件后,调用 write/send 发送数据,当所有数据都写完后,立刻停止观察 writable 事件,避免 busy loop。 muduo 采用的 LT 模式,就是用的第二种解决这个问题。
此外 TcpConnection 还有几个小功能,比如
Buffer 类的设计后面会分析。
TcpConnection.h
TcpConnection.cc
connectionCallback。
TcpConnection构造时接收参数有 TCP 连接的 sockfd,服务端地址
localAddr,客户端地址
peerAddr,并通过 Socket 封装
sockfd。并用 Channel 管理该 sockfd,向 Channel 注册可读、可写、关闭、出错回调函数,用于 Poller 返回就绪事件后
Channel::handleEvent()执行相应事件的回调。
TcpConnection 有四个状态,简单的状态图:
TcpConnection 有一系列用户指定的事件回调函数,比如
TcpConnection::connectionCallback、
messageCallback、
writeCompleteCallback,这些是用户通过 TcpServer 传给 TcpConnection。当 Poller 返回 TcpConnection 对应的 Socket 就绪事件时,
Channel::handleEvent()->
TcpConnection::handle*系列函数-> 这些事件回调函数(如 connectionCallback)。
与上面对应,TcpConnection 有一系列 用于 TcpServer 为连接指定事件 callback 的函数,比如
TcpConnection::setConnectionCallback、
setCloseCallback等等,是在
TcpServer::newConncetion()中创建一个 TcpConnection 对象并将用户指定的回调函数(上段说的)通过
TcpConnection::set*Callback函数传递给 TcpConnection。
TcpConncetion 中还有一系列函数是处理 sockfd 上的事件回调函数,比如
TcpConnection::handleRead()是在Poller 返回可读事件时由
Channel::handleEvent()调用的,类似的还有
handleWrite()、
handleClose()、
handleError(),这些函数会调用用户指定的事件回调函数(上上段说的),比如
TcpConnection::handleRead() -> messageCallback()。这些事件回调函数是在 TcpConnection 构造时创建完 sockfd 对应的 Channel 后通过
Channel::set*Callback系列函数注册的。
TcpConnection::handleRead():当连接对应的 sockfd 有可读事件发生时调用,主要是将数据读到 Buffer 中,执行消息回调函数
messageCallback_()。
TcpConnection::handleWrite():当连接对应的 sockfd 有可写事件发生时调用,主要是将 Buffer 中的数据发送出去,如果一次性发送完毕,则执行用户指定的回调
writeCompleteCallback_(),若一次没有发送完, muduo 采用 LT 模式, 会反复触发可写事件,下次还有机会发送剩下的数据。
TcpConnection::handleClose():主要执行
Channel::disableAll()和
TcpConnection::closeCallback()。
TcpConnection::handleError():主要是在日志中输出错误信息。
TcpConnection::closeCallback()不是给普通用户用的,这个是给 TcpServer 和 TcpClient 用的,用于通知它们移除所持有的 TcpConnectionPtr。它绑定的是
TcpServer::removeConnection(),通过下面关系:Accetptor 接受一个 Tcp 连接时
Channel::handleEvent() -> Acceptor::handleRead() -> TcpServer::newConnection()中新建一个 TcpConnection 并通过
TcpConnection::setCloseCallback(bind(&TcpServer::removeConnectionCallback, this, _1))。 这个回调什么时候被调用呢?当
TcpConnection::handleRead()中 read 返回0,或者 sockfd 发生 POLLHUP 事件就绪时,都会调用到
TcpConnection::handleClose() -> TcpConnection::closeCallback_() -> TcpServer::removeConnection()。
TcpConnection 中还有两个函数是给 TcpServer 使用的。
TcpConnection::connectEstablishd(),是
TcpServer::newConnection()创建完 TcpConnection 对象,设置好回调函数之后调用的,主要是调用
Channel::enableReading()将 TcpConnection 对应的 sockfd 注册读事件,然后执行用户指定的
connectionCallback_(),并将 TcpConnection 状态置为 kConnected。这个函数如何被执行呢? Acceptor 持有的 lfd 有可读事件发生,即有连接请求,此时
Channel::handleEvent() -> Acceptor::handleRead() -> TcpServer::newConnection() -> ……->TcpConnection::connectEstablished()。中间省略的部分是线程转移,转移到TcpConnection 所在的 IO 线程执行,TcpServer 和 TcpConnection 可能不在同一线程。
TcpConnection::connectDestroyed(),这是 TcpConnection 析构前最后调用的一个成员函数,它通知用户连接已断开。主要功能是设置 TcpConnection 的状态为 kDisconnected;停止监听所有事件;调用
connectionCallback_()执行用户指定的回调;从 epoll 监听的 fd 中移除 TcpConnection 对应的 sockfd。那什么时候调用到这个函数呢?一个是 TcpServer 析构时,一般情况下是经由
TcpConnection::handleClose() -> TcpConnection::closeCallback_() -> TcpServer::removeConnection() -> ……->TcpConnection::connectDestroyed()。
send 一系列函数是可以用户或者其他线程调用,用于发送信息。如果不是在IO线程,它会把实际工作转移到IO线程调用。首先检查 TcpConnection 对应的 Socket 是否注册了可写事件,若注册了可写事件表明输出缓冲区 outputBuffer_中已经有数据等待发送,为了保证不乱序,这次的数据只要追加到输出缓冲区中,通过
Channel::handleEvent() -> TcpConnection::handleWrite()来发送。如果Socket 没有注册可写事件,输出缓冲区没有数据,那么这次的消息可以直接通过 write 发送,如果没有一次性发送完毕,那么 message 剩余的数据仍然要 append 到 outputBuffer 中,并向 Poller 注册可写事件,当 socket 变得可写时,Channel 会调用
TcpConnection::handleWrite()来发送 outputBuffer_ 中堆积的数据,发送完毕后立刻停止监听可写事件,避免 busy loop。无论是
sendInLoop() -> write()还是
Channel::handleEvent() -> handleWrite(),只要确定发送完 message 或者 outputBuffer_ 中的数据,那么都要调用用户指定的回调
writeCompleteCallback()。
使用 epoll 的 LT 模式,当 socket 可写时,会不停的触发 socket 的可写事件,这个时候如何解决?第一种方式:需要向 socket 写数据时,注册此 socket 的可写事件,接收到可写事件后,然后调用 write/send 写数据,当所有数据都写完后,立刻停止观察 writable 事件,避免 busy loop。。第二种,需要发送数据时,直接调用 write/send 写,如果没有发送完,那么开是监听此 socket 的 writable 事件,然后接收到可写事件后,调用 write/send 发送数据,当所有数据都写完后,立刻停止观察 writable 事件,避免 busy loop。 muduo 采用的 LT 模式,就是用的第二种解决这个问题。
此外 TcpConnection 还有几个小功能,比如
TcpConnection::setTcpNoDelay()和
TcpConnection::setKeepAlive()。TCP No Delay 和 TCP keepalive 都是常用的 TCP 选项,前者的作用是禁止 Nagle 算法,避免连续发包出现延迟,这对编写低延迟网络服务很重要。后者的作用是定期探查 TCP 连接是否还存在,一般来说如果有应用层心跳的话,TCP keepalive 不是必须的。
Buffer 类的设计后面会分析。
TcpConnection.h
#ifndef MUDUO_NET_TCPCONNECTION_H #define MUDUO_NET_TCPCONNECTION_H #include <muduo/base/StringPiece.h> #include <muduo/base/Types.h> #include <muduo/net/Callbacks.h> #include <muduo/net/Buffer.h> #include <muduo/net/InetAddress.h> #include <boost/any.hpp> #include <boost/enable_shared_from_this.hpp> #include <boost/noncopyable.hpp> #include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> // struct tcp_info is in <netinet/tcp.h> struct tcp_info; namespace muduo { namespace net { class Channel; class EventLoop; class Socket; /// /// TCP connection, for both client and server usage. /// /// This is an interface class, so don't expose too much details. class TcpConnection : boost::noncopyable, public boost::enable_shared_from_this<TcpConnection> { public: /// Constructs a TcpConnection with a connected sockfd /// /// User should not create this object. TcpConnection(EventLoop* loop, const string& name, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr); ~TcpConnection(); EventLoop* getLoop() const { return loop_; } const string& name() const { return name_; } const InetAddress& localAddress() const { return localAddr_; } const InetAddress& peerAddress() const { return peerAddr_; } bool connected() const { return state_ == kConnected; } bool disconnected() const { return state_ == kDisconnected; } // return true if success. bool getTcpInfo(struct tcp_info*) const; string getTcpInfoString() const; // void send(string&& message); // C++11 void send(const void* message, int len); void send(const StringPiece& message); // void send(Buffer&& message); // C++11 void send(Buffer* message); // this one will swap data void shutdown(); // NOT thread safe, no simultaneous calling // void shutdownAndForceCloseAfter(double seconds); // NOT thread safe, no simultaneous calling void forceClose(); void forceCloseWithDelay(double seconds); void setTcpNoDelay(bool on); // reading or not void startRead(); void stopRead(); bool isReading() const { return reading_; }; // NOT thread safe, may race with start/stopReadInLoop /* 设置TCP上下文 */ void setContext(const boost::any& context) { context_ = context; } /* 获取TCP上下文 */ const boost::any& getContext() const { return context_; } boost::any* getMutableContext() { return &context_; } /* 设置连接建立时的回调函数 */ void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = cb; } /* 设置消息到来的回调函数 */ void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; } /* 设置成功将数据写入对方时的回调函数 */ void setWriteCompleteCallback(const WriteCompleteCallback& cb) { writeCompleteCallback_ = cb; } /* 设置高水位回调函数和高水位值,当缓冲区的size达到highWaterMark时触发此请求 */ void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark) { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; } /// Advanced interface /* 返回输入缓冲区和输出缓冲区指针 */ Buffer* inputBuffer() { return &inputBuffer_; } Buffer* outputBuffer() { return &outputBuffer_; } /// Internal use only. /* 设置TCP关闭的回调函数,仅在内部使用,用于移除持有的TcpConnectionPtr */ void setCloseCallback(const CloseCallback& cb) { closeCallback_ = cb; } // called when TcpServer accepts a new connection /* TcpServer使用,创建完一个新的连接后调用 */ void connectEstablished(); // should be called only once // called when TcpServer has removed me from its map /* TcpServer使用,从 map 中删除掉时调用 */ void connectDestroyed(); // should be called only once private: /* TcpConnection 有四种状态,从左到右依次是:已断开 正在连接 已连接 正在断开 */ enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting }; /* 处理 connfd 上的事件回调函数 */ void handleRead(Timestamp receiveTime); // 处理可读事件 void handleWrite(); // 处理可写事件 void handleClose(); // 处理关闭事件 void handleError(); // 处理错误事件 /* 通过线程转移操作实现安全发送消息 */ // void sendInLoop(string&& message); void sendInLoop(const StringPiece& message); void sendInLoop(const void* message, size_t len); /* 通过线程转移操作实现安全关闭TCP连接 */ void shutdownInLoop(); // void shutdownAndForceCloseInLoop(double seconds); /* 主动关闭连接 */ void forceCloseInLoop(); /* 设置TCP连接的状态 */ void setState(StateE s) { state_ = s; } const char* stateToString() const; void startReadInLoop(); void stopReadInLoop(); EventLoop* loop_; // 处理该TCP连接的EventLoop,该EventLoop内部的epoll监听TCP连接对应的fd const string name_; // TCP连接的名字 StateE state_; // FIXME: use atomic variable // 本条TCP连接的状态 bool reading_; // // we don't expose those classes to client. boost::scoped_ptr<Socket> socket_; // TCP连接的fd所在的socket对象,fd的关闭由它决定 boost::scoped_ptr<Channel> channel_; // TCP连接的fd 对应的 Channel const InetAddress localAddr_; // TCP连接本地的地址:ip port const InetAddress peerAddr_; // TCP连接对方的地址:ip port ConnectionCallback connectionCallback_; // 连接建立时的回调函数 MessageCallback messageCallback_; // 收到消息时的回调函数 WriteCompleteCallback writeCompleteCallback_; // 消息写入对方缓冲区时的回调函数 HighWaterMarkCallback highWaterMarkCallback_; // 高水位回调函数 CloseCallback closeCallback_; // 关闭TCP连接的回调函数 size_t highWaterMark_; // 高水位标记 Buffer inputBuffer_; // TCP连接的输入缓冲区,从连接中读取输入然后存入这里 Buffer outputBuffer_; // TCP连接的输出缓冲区,要发送的数据保存在这里 // FIXME: use list<Buffer> as output buffer. boost::any context_; // TCP连接的上下文,一般用于处理多次消息相互存在关联的情形,例如文件发送 // FIXME: creationTime_, lastReceiveTime_ // bytesReceived_, bytesSent_ }; /* 使用 shared_ptr 来管理 TCP 连接对象的生存周期 */ typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr; } } #endif // MUDUO_NET_TCPCONNECTION_H
TcpConnection.cc
#include <muduo/net/TcpConnection.h> #include <muduo/base/Logging.h> #include <muduo/base/WeakCallback.h> #include <muduo/net/Channel.h> #include <muduo/net/EventLoop.h> #include <muduo/net/Socket.h> #include <muduo/net/SocketsOps.h> #include <boost/bind.hpp> #include <errno.h> using namespace muduo; using namespace muduo::net; /* 默认的当连接建立和断开时的回调函数 */ void muduo::net::defaultConnectionCallback(const TcpConnectionPtr& conn) { LOG_TRACE << conn->localAddress().toIpPort() << " -> " << conn->peerAddress().toIpPort() << " is " << (conn->connected() ? "UP" : "DOWN"); // do not call conn->forceClose(), because some users want to register message callback only. } /* 默认的收消息的回调函数 */ void muduo::net::defaultMessageCallback(const TcpConnectionPtr&, Buffer* buf, Timestamp) { buf->retrieveAll(); // 默认取出缓冲区中所有数据 } TcpConnection::TcpConnection(EventLoop* loop, const string& nameArg, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr) : loop_(CHECK_NOTNULL(loop)), name_(nameArg), // connection 的名字 state_(kConnecting), // 初始状态是 正在连接 reading_(true), socket_(new Socket(sockfd)), // 创建sockfd 对应的Socket channel_(new Channel(loop, sockfd)), // 创建sockfd对应的Channel localAddr_(localAddr), peerAddr_(peerAddr), highWaterMark_(64*1024*1024) // 高水位默认是64K { /* 注册事件的回调函数,当对应事件发生时会调用 */ channel_->setReadCallback( boost::bind(&TcpConnection::handleRead, this, _1)); channel_->setWriteCallback( boost::bind(&TcpConnection::handleWrite, this)); channel_->setCloseCallback( boost::bind(&TcpConnection::handleClose, this)); channel_->setErrorCallback( boost::bind(&TcpConnection::handleError, this)); LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this << " fd=" << sockfd; socket_->setKeepAlive(true); } TcpConnection::~TcpConnection() { LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this << " fd=" << channel_->fd() << " state=" << stateToString(); assert(state_ == kDisconnected); } bool TcpConnection::getTcpInfo(struct tcp_info* tcpi) const { return socket_->getTcpInfo(tcpi); } string TcpConnection::getTcpInfoString() const { char buf[1024]; buf[0] = '\0'; socket_->getTcpInfoString(buf, sizeof buf); return buf; } void TcpConnection::send(const void* data, int len) { send(StringPiece(static_cast<const char*>(data), len)); } void TcpConnection::send(const StringPiece& message) { if (state_ == kConnected) { /* 如果是在loop线程,就直接发送数据 */ if (loop_->isInLoopThread()) { sendInLoop(message); } else { /* 否则,转移到loop线程执行 */ loop_->runInLoop( boost::bind(&TcpConnection::sendInLoop, this, // FIXME message.as_string())); //std::forward<string>(message))); } } } // FIXME efficiency!!! void TcpConnection::send(Buffer* buf) { if (state_ == kConnected) { if (loop_->isInLoopThread()) { sendInLoop(buf->peek(), buf->readableBytes()); buf->retrieveAll(); } else { loop_->runInLoop( boost::bind(&TcpConnection::sendInLoop, this, // FIXME buf->retrieveAllAsString())); //std::forward<string>(message))); } } } void TcpConnection::sendInLoop(const StringPiece& message) { sendInLoop(message.data(), message.size()); } /* * 先尝试直接发送数据,如果一次发送完毕就不会启用WriteCallback * * 如果只发送了部分数据,则把剩余数据放入输出缓冲区, * 并关注writable事件,以后在handleWrite()中发送剩余的数据 * * 如果当前输出缓冲区已经有待发送的数据,那么就不能先尝试发送了 * 会造成数据乱序 */ void TcpConnection::sendInLoop(const void* data, size_t len) { /* 断言确保是在loop线程 */ loop_->assertInLoopThread(); ssize_t nwrote = 0; size_t remaining = len; bool faultError = false; /* 如果连接关闭,那么就放弃写数据 */ if (state_ == kDisconnected) { LOG_WARN << "disconnected, give up writing"; return; } // if no thing in output queue, try writing directly /* 如果输出缓冲区中没有数据,可以直接往fd写数据,不会乱序 */ if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) { /* 往fd发送数据 */ nwrote = sockets::write(channel_->fd(), data, len); /* 发送成功,看是否还有剩余数据 */ if (nwrote >= 0) { /* 计算剩余量 */ remaining = len - nwrote; /* 全部发送完了,执行writeCompleteCallback_回调 */ if (remaining == 0 && writeCompleteCallback_) { loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this())); } } /* 发送 失败 */ else // nwrote < 0 { nwrote = 0; if (errno != EWOULDBLOCK) { LOG_SYSERR << "TcpConnection::sendInLoop"; if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others? { faultError = true; } } } } // end if assert(remaining <= len); if (!faultError && remaining > 0) { /* 输出缓冲区的剩余字节 */ size_t oldLen = outputBuffer_.readableBytes(); /* 如果现在需要发送的字节数达到高水位,之前没有达到 */ if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_) { /* 在 loop 线程中执行高水位回调函数 */ loop_->queueInLoop(boost::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining)); } /* 将未发送的data中的数据放入输出缓冲区 */ outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining); /* 如果对应的Channel没有在监听write事件 */ if (!channel_->isWriting()) { /* 注册可写事件 */ channel_->enableWriting(); } } } void TcpConnection::shutdown() { // FIXME: use compare and swap if (state_ == kConnected) // 执行shutdown必须是在连接正常的情况下 { setState(kDisconnecting); // FIXME: shared_from_this()? loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this)); } } void TcpConnection::shutdownInLoop() { loop_->assertInLoopThread(); // 在IO线程中执行 if (!channel_->isWriting()) // 当前没有监听写事件 { // we are not writing socket_->shutdownWrite(); // 则关闭写端 } } /* 主动关闭连接 */ void TcpConnection::forceClose() { // FIXME: use compare and swap if (state_ == kConnected || state_ == kDisconnecting) { setState(kDisconnecting); loop_->queueInLoop(boost::bind(&TcpConnection::forceCloseInLoop, shared_from_this())); } } void TcpConnection::forceCloseWithDelay(double seconds) { if (state_ == kConnected || state_ == kDisconnecting) { setState(kDisconnecting); loop_->runAfter( seconds, makeWeakCallback(shared_from_this(), &TcpConnection::forceClose)); // not forceCloseInLoop to avoid race condition } } void TcpConnection::forceCloseInLoop() { loop_->assertInLoopThread(); if (state_ == kConnected || state_ == kDisconnecting) { // as if we received 0 byte in handleRead(); handleClose(); // 同样调用 handleClose() } } const char* TcpConnection::stateToString() const { switch (state_) { case kDisconnected: return "kDisconnected"; case kConnecting: return "kConnecting"; case kConnected: return "kConnected"; case kDisconnecting: return "kDisconnecting"; default: return "unknown state"; } } /* 禁用nagle算法,降低网络延迟 */ void TcpConnection::setTcpNoDelay(bool on) { socket_->setTcpNoDelay(on); } void TcpConnection::startRead() { loop_->runInLoop(boost::bind(&TcpConnection::startReadInLoop, this)); } void TcpConnection::startReadInLoop() { loop_->assertInLoopThread(); if (!reading_ || !channel_->isReading()) { channel_->enableReading(); reading_ = true; } } void TcpConnection::stopRead() { loop_->runInLoop(boost::bind(&TcpConnection::stopReadInLoop, this)); } void TcpConnection::stopReadInLoop() { loop_->assertInLoopThread(); if (reading_ || channel_->isReading()) { channel_->disableReading(); reading_ = false; } } /* 连接建立,TcpServer::newconnection() 调用 */ void TcpConnection::connectEstablished() { loop_->assertInLoopThread(); assert(state_ == kConnecting); setState(kConnected); // 设置状态为已建立 channel_->tie(shared_from_this()); channel_->enableReading(); // 注册读事件 /* 执行用户建立连接时的逻辑 */ connectionCallback_(shared_from_this()); } /* 连接关闭,提供给TcpServer使用 TcpServer::removeConnectionInLoop() */ void TcpConnection::connectDestroyed() { loop_->assertInLoopThread(); if (state_ == kConnected) { setState(kDisconnected); channel_->disableAll(); // 停止监听所有事件 /* 执行用户的关闭逻辑 */ connectionCallback_(shared_from_this()); } channel_->remove(); // 从 epoll 中移除connfd } /* 处理读事件 */ void TcpConnection::handleRead(Timestamp receiveTime) { loop_->assertInLoopThread(); int savedErrno = 0; /* 从fd读取数据到输入缓冲区中,返回成功读取的字节数 */ ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); if (n > 0) { /* 执行消息回调函数 */ messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); } else if (n == 0) { /* 连接关闭,执行关闭回调函数 */ handleClose(); } else { errno = savedErrno; LOG_SYSERR << "TcpConnection::handleRead"; handleError(); } } /* 处理可写事件 往connfd中write */ void TcpConnection::handleWrite() { loop_->assertInLoopThread(); /* 如果Channel正在监听 write 事件 */ if (channel_->isWriting()) { ssize_t n = sockets::write(channel_->fd(), outputBuffer_.peek(), outputBuffer_.readableBytes()); if (n > 0) { /* 从输出缓冲区中将已经发送的数据删除 */ outputBuffer_.retrieve(n); /* 所有要发送的数据都发生完毕 */ if (outputBuffer_.readableBytes() == 0) { /* 停止监听fd的写事件*/ channel_->disableWriting(); /* 调用用户指定的回调 */ if (writeCompleteCallback_) { loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this())); } if (state_ == kDisconnecting) { shutdownInLoop(); } } } else { LOG_SYSERR << "TcpConnection::handleWrite"; // if (state_ == kDisconnecting) // { // shutdownInLoop(); // } } } else { LOG_TRACE << "Connection fd = " << channel_->fd() << " is down, no more writing"; } } void TcpConnection::handleClose() { loop_->assertInLoopThread(); LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString(); assert(state_ == kConnected || state_ == kDisconnecting); // we don't close fd, leave it to dtor, so we can find leaks easily. setState(kDisconnected); channel_->disableAll(); // Channel停止监听所有事件 TcpConnectionPtr guardThis(shared_from_this()); connectionCallback_(guardThis); // 执行用户的关闭连接逻辑 // must be the last line /* * closeCallback_,绑定到 TcpServer::removeConnection() * 在里面执行connectDestroyed() */ closeCallback_(guardThis); } /* 在日志中输出错误信息 */ void TcpConnection::handleError() { int err = sockets::getSocketError(channel_->fd()); LOG_ERROR << "TcpConnection::handleError [" << name_ << "] - SO_ERROR = " << err << " " << strerror_tl(err); }
相关文章推荐
- Muduo网络库源码分析(六)TcpConnection 的生存期管理
- Muduo网络库源码分析(六)TcpConnection 的生存期管理
- [置顶] Muduo网络库源码分析之Acceptor和TcpServer
- muduo源码分析之TcpConnection发送数据
- Muduo之TcpConnection源码分析笔记
- 开源框架源码分析:网速监听—facebook/network-connection-class
- [置顶] Muduo网络库源码分析之对socket及其相关操作的封装
- [置顶] Muduo网络库源码分析之定时器的实现
- muduo源码分析--事件回调层次是怎么传递的Tcpserver Channel TcpConnection
- [置顶] Muduo网络库源码分析之Reactor模式的关键结构
- [置顶] android源码分析——由SetContentView串起来的布局加载机制
- [置顶] Spark 2.1.0 大数据平台源码分析:章节序列
- jQuery源码解读之addClass()方法分析
- hadoop job.class 源码分析
- [置顶] 45-总结-【cartographer源码分析】系列的第五部分【kalman_filter】
- [Muduo网络库源码分析] (8) base/StringPiece.h_字符串参数传递类型
- 源码分析RocketMQ消息过滤机制下篇-FilterServer、ClassFilter模式详解
- [置顶] 使用java命令运行class文件提示“错误:找不到或无法加载主类“的问题分析
- MyBatis-3.4.2-源码分析18:XML解析之RoleMapper userMapper = sqlSession.getMapper(RoleMapper.class)
- 基于TCP网络通信的自动升级程序源码分析-客户端接收文件