完善TcpConnection(38)
2013-11-08 10:55
183 查看
完善TcpConnection
WriteCompleteCallback含义
HighWaterMarkCallback含义
boost::any context_
signal(SIGPIPE, SIG_IGN) // 请看channel class
可变类型解决方案
void*. 这种方法不是类型安全的
boost::any
boost::any
任意类型的类型安全存储以及安全的取回
在标准库容器中存放不同类型的方法,比如说vector<boost::any>
[/code]
[/code]
[/code]
WriteCompleteCallback含义
HighWaterMarkCallback含义
boost::any context_
signal(SIGPIPE, SIG_IGN) // 请看channel class
可变类型解决方案
void*. 这种方法不是类型安全的
boost::any
boost::any
任意类型的类型安全存储以及安全的取回
在标准库容器中存放不同类型的方法,比如说vector<boost::any>
TcpConnection 完整的头文件
TcpConnection.h// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) // // This is a public header file, it must only include public header files. #ifndef MUDUO_NET_TCPCONNECTION_H #define MUDUO_NET_TCPCONNECTION_H #include <muduo/base/Mutex.h> #include <muduo/base/StringPiece.h> #include <muduo/base/Types.h> #include <muduo/net/Callbacks.h> #include <muduo/net/Buffer.h> #include <muduo/net/InetAddress.h> #include <boost/any.hpp> #include <boost/enable_shared_from_this.hpp> #include <boost/noncopyable.hpp> #include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> namespace muduo { namespace net { class Channel; class EventLoop; class Socket; /// /// TCP connection, for both client and server usage. /// /// This is an interface class, so don't expose too much details. class TcpConnection : boost::noncopyable, public boost::enable_shared_from_this<TcpConnection> { public: /// Constructs a TcpConnection with a connected sockfd /// /// User should not create this object. TcpConnection(EventLoop* loop, const string& name, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr); ~TcpConnection(); EventLoop* getLoop() const { return loop_; } const string& name() const { return name_; } const InetAddress& localAddress() { return localAddr_; } const InetAddress& peerAddress() { return peerAddr_; } bool connected() const { return state_ == kConnected; } // void send(string&& message); // C++11 void send(const void* message, size_t len); void send(const StringPiece& message); // void send(Buffer&& message); // C++11 void send(Buffer* message); // this one will swap data void shutdown(); // NOT thread safe, no simultaneous calling void setTcpNoDelay(bool on); void setContext(const boost::any& context) { context_ = context; } const boost::any& getContext() const { return context_; } boost::any* getMutableContext() { return &context_; } void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = cb; } void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; } void setWriteCompleteCallback(const WriteCompleteCallback& cb) { writeCompleteCallback_ = cb; } void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark) { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; } Buffer* inputBuffer() { return &inputBuffer_; } /// Internal use only. void setCloseCallback(const CloseCallback& cb) { closeCallback_ = cb; } // called when TcpServer accepts a new connection void connectEstablished(); // should be called only once // called when TcpServer has removed me from its map void connectDestroyed(); // should be called only once private: enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting }; void handleRead(Timestamp receiveTime); void handleWrite(); void handleClose(); void handleError(); void sendInLoop(const StringPiece& message); void sendInLoop(const void* message, size_t len); void shutdownInLoop(); void setState(StateE s) { state_ = s; } EventLoop* loop_; // 所属EventLoop string name_; // 连接名 StateE state_; // FIXME: use atomic variable // we don't expose those classes to client. boost::scoped_ptr<Socket> socket_; boost::scoped_ptr<Channel> channel_; InetAddress localAddr_; InetAddress peerAddr_; ConnectionCallback connectionCallback_; MessageCallback messageCallback_; WriteCompleteCallback writeCompleteCallback_; // 数据发送完毕回调函数,即所有的用户数据都已拷贝到内核缓冲区时回调该函数 // outputBuffer_被清空也会回调该函数,可以理解为低水位标回调函数 HighWaterMarkCallback highWaterMarkCallback_; // 高水位标回调函数 CloseCallback closeCallback_; size_t highWaterMark_; // 高水位标 Buffer inputBuffer_; // 应用层接收缓冲区 Buffer outputBuffer_; // 应用层发送缓冲区 boost::any context_; // 绑定一个未知类型的上下文对象 }; typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr; } } #endif // MUDUO_NET_TCPCONNECTION_H
[/code]
TcpConnection 完整的源文件
TcpConnection.cc// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) #include <muduo/net/TcpConnection.h> #include <muduo/base/Logging.h> #include <muduo/net/Channel.h> #include <muduo/net/EventLoop.h> #include <muduo/net/Socket.h> #include <muduo/net/SocketsOps.h> #include <boost/bind.hpp> #include <errno.h> #include <stdio.h> using namespace muduo; using namespace muduo::net; void muduo::net::defaultConnectionCallback(const TcpConnectionPtr& conn) { LOG_TRACE << conn->localAddress().toIpPort() << " -> " << conn->peerAddress().toIpPort() << " is " << (conn->connected() ? "UP" : "DOWN"); } void muduo::net::defaultMessageCallback(const TcpConnectionPtr&, Buffer* buf, Timestamp) { buf->retrieveAll(); } TcpConnection::TcpConnection(EventLoop* loop, const string& nameArg, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr) : loop_(CHECK_NOTNULL(loop)), name_(nameArg), state_(kConnecting), socket_(new Socket(sockfd)), channel_(new Channel(loop, sockfd)), localAddr_(localAddr), peerAddr_(peerAddr), highWaterMark_(64*1024*1024) { // 通道可读事件到来的时候,回调TcpConnection::handleRead,_1是事件发生时间 channel_->setReadCallback( boost::bind(&TcpConnection::handleRead, this, _1)); // 通道可写事件到来的时候,回调TcpConnection::handleWrite channel_->setWriteCallback( boost::bind(&TcpConnection::handleWrite, this)); // 连接关闭,回调TcpConnection::handleClose channel_->setCloseCallback( boost::bind(&TcpConnection::handleClose, this)); // 发生错误,回调TcpConnection::handleError channel_->setErrorCallback( boost::bind(&TcpConnection::handleError, this)); LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this << " fd=" << sockfd; socket_->setKeepAlive(true); } TcpConnection::~TcpConnection() { LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this << " fd=" << channel_->fd(); } // 线程安全,可以跨线程调用 void TcpConnection::send(const void* data, size_t len) { if (state_ == kConnected) { if (loop_->isInLoopThread()) { sendInLoop(data, len); } else { string message(static_cast<const char*>(data), len); loop_->runInLoop( boost::bind(&TcpConnection::sendInLoop, this, message)); } } } // 线程安全,可以跨线程调用 void TcpConnection::send(const StringPiece& message) { if (state_ == kConnected) { if (loop_->isInLoopThread()) { sendInLoop(message); } else { loop_->runInLoop( boost::bind(&TcpConnection::sendInLoop, this, message.as_string())); //std::forward<string>(message))); } } } // 线程安全,可以跨线程调用 void TcpConnection::send(Buffer* buf) { if (state_ == kConnected) { if (loop_->isInLoopThread()) { sendInLoop(buf->peek(), buf->readableBytes()); buf->retrieveAll(); } else { loop_->runInLoop( boost::bind(&TcpConnection::sendInLoop, this, buf->retrieveAllAsString())); //std::forward<string>(message))); } } } void TcpConnection::sendInLoop(const StringPiece& message) { sendInLoop(message.data(), message.size()); } void TcpConnection::sendInLoop(const void* data, size_t len) { /* loop_->assertInLoopThread(); sockets::write(channel_->fd(), data, len); */ loop_->assertInLoopThread(); ssize_t nwrote = 0; size_t remaining = len; bool error = false; if (state_ == kDisconnected) { LOG_WARN << "disconnected, give up writing"; return; } // if no thing in output queue, try writing directly // 通道没有关注可写事件并且发送缓冲区没有数据,直接write if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) { nwrote = sockets::write(channel_->fd(), data, len); if (nwrote >= 0) { remaining = len - nwrote; // 写完了,回调writeCompleteCallback_ if (remaining == 0 && writeCompleteCallback_) { loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this())); } } else // nwrote < 0 { nwrote = 0; if (errno != EWOULDBLOCK) { LOG_SYSERR << "TcpConnection::sendInLoop"; if (errno == EPIPE) // FIXME: any others? { error = true; } } } } assert(remaining <= len); // 没有错误,并且还有未写完的数据(说明内核发送缓冲区满,要将未写完的数据添加到output buffer中) if (!error && remaining > 0) { LOG_TRACE << "I am going to write more data"; size_t oldLen = outputBuffer_.readableBytes(); // 如果超过highWaterMark_(高水位标),回调highWaterMarkCallback_ //即使oldLen + remaining >= highWaterMark_ ,remain的数据也是可以存放到buffer中的,因为buffer //自动伸缩的 if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_) { loop_->queueInLoop(boost::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining)); } outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining); if (!channel_->isWriting()) { channel_->enableWriting(); // 关注POLLOUT事件 } } } void TcpConnection::shutdown() { // FIXME: use compare and swap if (state_ == kConnected) { setState(kDisconnecting); // FIXME: shared_from_this()? loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this)); } } void TcpConnection::shutdownInLoop() { loop_->assertInLoopThread(); if (!channel_->isWriting()) { // we are not writing socket_->shutdownWrite(); } } void TcpConnection::setTcpNoDelay(bool on) { socket_->setTcpNoDelay(on); } void TcpConnection::connectEstablished() { loop_->assertInLoopThread(); assert(state_ == kConnecting); setState(kConnected); LOG_TRACE << "[3] usecount=" << shared_from_this().use_count(); channel_->tie(shared_from_this()); channel_->enableReading(); // TcpConnection所对应的通道加入到Poller关注 connectionCallback_(shared_from_this()); LOG_TRACE << "[4] usecount=" << shared_from_this().use_count(); } void TcpConnection::connectDestroyed() { loop_->assertInLoopThread(); if (state_ == kConnected) { setState(kDisconnected); channel_->disableAll(); connectionCallback_(shared_from_this()); } channel_->remove(); } void TcpConnection::handleRead(Timestamp receiveTime) { /* loop_->assertInLoopThread(); int savedErrno = 0; ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); if (n > 0) { messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); } else if (n == 0) { handleClose(); } else { errno = savedErrno; LOG_SYSERR << "TcpConnection::handleRead"; handleError(); } */ /* loop_->assertInLoopThread(); int savedErrno = 0; char buf[65536]; ssize_t n = ::read(channel_->fd(), buf, sizeof buf); if (n > 0) { messageCallback_(shared_from_this(), buf, n); } else if (n == 0) { handleClose(); } else { errno = savedErrno; LOG_SYSERR << "TcpConnection::handleRead"; handleError(); } */ loop_->assertInLoopThread(); int savedErrno = 0; /*一次性读完内核缓冲区的数据,这可以防止busy loop*/ ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); if (n > 0) { messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); } else if (n == 0) { //读到文件eof ,就是对等方已经关闭连接 handleClose(); } else { errno = savedErrno; LOG_SYSERR << "TcpConnection::handleRead"; handleError(); } } // 内核发送缓冲区有空间了,回调该函数 void TcpConnection::handleWrite() { loop_->assertInLoopThread(); /*通道有关注write事件 */ if (channel_->isWriting()) { /*写数据到内核中*/ ssize_t n = sockets::write(channel_->fd(), outputBuffer_.peek(), outputBuffer_.readableBytes()); if (n > 0) { outputBuffer_.retrieve(n); if (outputBuffer_.readableBytes() == 0) // 发送缓冲区已清空 { channel_->disableWriting(); // 停止关注POLLOUT事件,以免出现busy loop if (writeCompleteCallback_) // 回调writeCompleteCallback_ { // 应用层发送缓冲区被清空,就回调用writeCompleteCallback_ loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this())); } /*如果状态码是kDisconnecting ,说明应用层已经发起shutdown命令了,这是最后一个数据包, 所以发完就可以关闭了。 请参照void TcpConnection::shutdownInLoop() */ if (state_ == kDisconnecting) // 发送缓冲区已清空并且连接状态是kDisconnecting, 要关闭连接 { shutdownInLoop(); // 关闭连接 } } else { LOG_TRACE << "I am going to write more data"; } } else { LOG_SYSERR << "TcpConnection::handleWrite"; // if (state_ == kDisconnecting) // { // shutdownInLoop(); // } } } else { LOG_TRACE << "Connection fd = " << channel_->fd() << " is down, no more writing"; } } void TcpConnection::handleClose() { loop_->assertInLoopThread(); LOG_TRACE << "fd = " << channel_->fd() << " state = " << state_; assert(state_ == kConnected || state_ == kDisconnecting); // we don't close fd, leave it to dtor, so we can find leaks easily. setState(kDisconnected); channel_->disableAll(); TcpConnectionPtr guardThis(shared_from_this()); connectionCallback_(guardThis); // 这一行,可以不调用 LOG_TRACE << "[7] usecount=" << guardThis.use_count(); // must be the last line closeCallback_(guardThis); // 调用TcpServer::removeConnection LOG_TRACE << "[11] usecount=" << guardThis.use_count(); } void TcpConnection::handleError() { int err = sockets::getSocketError(channel_->fd()); LOG_ERROR << "TcpConnection::handleError [" << name_ << "] - SO_ERROR = " << err << " " << strerror_tl(err); }
[/code]
测试程序
#include <muduo/net/TcpServer.h> #include <muduo/net/EventLoop.h> #include <muduo/net/InetAddress.h> #include <boost/bind.hpp> #include <stdio.h> /* 程序说明: 客户端请求连接-------》服务器马上发送一堆数据 */ using namespace muduo; using namespace muduo::net; class TestServer { public: TestServer(EventLoop* loop, const InetAddress& listenAddr) : loop_(loop), server_(loop, listenAddr, "TestServer") { server_.setConnectionCallback( boost::bind(&TestServer::onConnection, this, _1)); server_.setMessageCallback( boost::bind(&TestServer::onMessage, this, _1, _2, _3)); server_.setWriteCompleteCallback( boost::bind(&TestServer::onWriteComplete, this, _1)); // 这是一个数据生成协议 string line; for (int i = 33; i < 127; ++i) { line.push_back(char(i)); } line += line; for (size_t i = 0; i < 127-33; ++i) { message_ += line.substr(i, 72) + '\n'; } } void start() { server_.start(); } private: void onConnection(const TcpConnectionPtr& conn) { if (conn->connected()) { printf("onConnection(): new connection [%s] from %s\n", conn->name().c_str(), conn->peerAddress().toIpPort().c_str()); conn->setTcpNoDelay(true); /*连接到来直接发送数据*/ conn->send(message_); } else { printf("onConnection(): connection [%s] is down\n", conn->name().c_str()); } } void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp receiveTime) { muduo::string msg(buf->retrieveAllAsString()); printf("onMessage(): received %zd bytes from connection [%s] at %s\n", msg.size(), conn->name().c_str(), receiveTime.toFormattedString().c_str()); conn->send(msg); } void onWriteComplete(const TcpConnectionPtr& conn) { /*可写事件,在发送*/ conn->send(message_); } EventLoop* loop_; TcpServer server_; muduo::string message_; }; int main() { printf("main(): pid = %d\n", getpid()); InetAddress listenAddr(8888); EventLoop loop; TestServer server(&loop, listenAddr); server.start(); loop.loop(); }
[/code]
相关文章推荐
- 关于HttpURLConnection的TCP连接
- Android与服务器通信的方法之一(TCP)效率高安全性完善
- RT:How HTTP use TCP connection
- TCP连接复用(TCP Connection Reuse)
- 到主机 的 TCP/IP 连接失败。 java.net.ConnectException: Connection refused: connect
- WebSocket-a protocol providing full-duplex communications channels over a single TCP connection
- Go丨语言对数据库操作报错 panic: dial tcp 127.0.0.1:3306: connectex: No connection could be made because the target machine actively refused
- Summary_异常- Exception in thread "RMI TCP Connection(idle)" java.lang.OutOfMemoryError: PermGen space
- failed to open gcomm backend connection: 13: error while trying to listen 'tcp:/
- C#实现SMTP服务器,使用TCP命令实现,功能比较完善
- 使用pb的connection对象建立TCP连接
- TCP连接复用(TCP Connection Reuse)
- Muduo之TcpConnection源码分析笔记
- linux上TCP connection timeout的原因查找
- 通用通信模型的TCP服务端实现完善
- tomcat 运行异常Cannot create PoolableConnectionFactory (到主机 的 TCP/IP 联接失败)(用户sa登录失败)
- get value from agent failed: ZBX_TCP_READ() failed;[104] connection reset by peer
- tcp connection setup的实现(一)
- 严重 [RMI TCP Connection(3)-127.0.0.1]
- Exception in thread "RMI TCP Connection(idle)" java.lang.OutOfMemoryError: PermGen space