您的位置:首页 > 理论基础 > 计算机网络

完善TcpConnection(38)

2013-11-08 10:55 183 查看
完善TcpConnection

WriteCompleteCallback含义

HighWaterMarkCallback含义

boost::any context_

signal(SIGPIPE, SIG_IGN) // 请看channel class

可变类型解决方案

void*:这种方法不是类型安全的

boost::any:

任意类型的类型安全存储以及安全的取回

可以在标准库容器中存放不同类型的对象,比如说 vector<boost::any>


TcpConnection 完整的头文件

TcpConnection.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/ //
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.

#ifndef MUDUO_NET_TCPCONNECTION_H
#define MUDUO_NET_TCPCONNECTION_H

#include <muduo/base/Mutex.h>
#include <muduo/base/StringPiece.h>
#include <muduo/base/Types.h>
#include <muduo/net/Callbacks.h>
#include <muduo/net/Buffer.h>
#include <muduo/net/InetAddress.h>

#include <boost/any.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>

namespace muduo
{
namespace net
{

class Channel;
class EventLoop;
class Socket;

///
/// TCP connection, for both client and server usage.
///
/// This is an interface class, so don't expose too much details.
class TcpConnection : boost::noncopyable,
                      public boost::enable_shared_from_this<TcpConnection>
{
 public:
  /// Constructs a TcpConnection with a connected sockfd
  ///
  /// User should not create this object.
  TcpConnection(EventLoop* loop,
                const string& name,
                int sockfd,
                const InetAddress& localAddr,
                const InetAddress& peerAddr);
  ~TcpConnection();

  EventLoop* getLoop() const { return loop_; }
  const string& name() const { return name_; }
  // Const-qualified: these only hand out references to members, so they
  // must be callable through a const TcpConnection as well.
  const InetAddress& localAddress() const { return localAddr_; }
  const InetAddress& peerAddress() const { return peerAddr_; }
  bool connected() const { return state_ == kConnected; }

  // The send() overloads do nothing unless the connection is in state
  // kConnected. When called off the loop thread the data is copied and the
  // actual write is forwarded to the loop thread.
  // void send(string&& message); // C++11
  void send(const void* message, size_t len);
  void send(const StringPiece& message);
  // void send(Buffer&& message); // C++11
  void send(Buffer* message);  // drains *message: all readable bytes are consumed
  void shutdown(); // NOT thread safe, no simultaneous calling
  void setTcpNoDelay(bool on);

  /// Stores a copy of an arbitrary, user-defined context object.
  void setContext(const boost::any& context)
  { context_ = context; }

  /// Read-only access to the stored context.
  const boost::any& getContext() const
  { return context_; }

  /// Mutable access to the stored context (never null).
  boost::any* getMutableContext()
  { return &context_; }

  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }

  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }

  void setWriteCompleteCallback(const WriteCompleteCallback& cb)
  { writeCompleteCallback_ = cb; }

  /// Installs the high-water-mark callback together with its threshold
  /// (in bytes of pending output).
  void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
  { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }

  Buffer* inputBuffer()
  { return &inputBuffer_; }

  /// Internal use only.
  void setCloseCallback(const CloseCallback& cb)
  { closeCallback_ = cb; }

  // called when TcpServer accepts a new connection
  void connectEstablished();   // should be called only once
  // called when TcpServer has removed me from its map
  void connectDestroyed();  // should be called only once

 private:
  enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
  void handleRead(Timestamp receiveTime);
  void handleWrite();
  void handleClose();
  void handleError();
  void sendInLoop(const StringPiece& message);
  void sendInLoop(const void* message, size_t len);
  void shutdownInLoop();
  void setState(StateE s) { state_ = s; }

  // NOTE: the constructor's initializer list relies on this declaration
  // order; do not reorder the members.
  EventLoop* loop_;         // the EventLoop this connection belongs to
  string name_;             // connection name
  StateE state_;  // FIXME: use atomic variable
  // we don't expose those classes to client.
  boost::scoped_ptr<Socket> socket_;
  boost::scoped_ptr<Channel> channel_;
  InetAddress localAddr_;
  InetAddress peerAddr_;
  ConnectionCallback connectionCallback_;
  MessageCallback messageCallback_;
  // Invoked once all user data has been handed to the kernel, i.e. either a
  // direct write sent everything or outputBuffer_ has been drained. It can
  // be thought of as a low-water-mark callback.
  WriteCompleteCallback writeCompleteCallback_;
  // Invoked when the pending output crosses highWaterMark_ from below.
  HighWaterMarkCallback highWaterMarkCallback_;
  CloseCallback closeCallback_;
  size_t highWaterMark_;        // high-water-mark threshold
  Buffer inputBuffer_;          // application-level receive buffer
  Buffer outputBuffer_;         // application-level send buffer
  boost::any context_;          // user-supplied context object of arbitrary type
};

/// Shared-ownership handle to a TcpConnection.
typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;

}
}

#endif  // MUDUO_NET_TCPCONNECTION_H

[/code]

TcpConnection 完整的源文件

TcpConnection.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/ //
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include <muduo/net/TcpConnection.h>

#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/Socket.h>
#include <muduo/net/SocketsOps.h>

#include <boost/bind.hpp>

#include <errno.h>
#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

// Default connection callback: traces "<local> -> <peer> is UP|DOWN".
void muduo::net::defaultConnectionCallback(const TcpConnectionPtr& conn)
{
  const char* const status = conn->connected() ? "UP" : "DOWN";
  LOG_TRACE << conn->localAddress().toIpPort() << " -> "
            << conn->peerAddress().toIpPort() << " is "
            << status;
}

// Default message callback: discards everything currently readable in buf.
void muduo::net::defaultMessageCallback(const TcpConnectionPtr& /*conn*/,
                                        Buffer* buf,
                                        Timestamp /*receiveTime*/)
{
  buf->retrieveAll();
}

// Builds a connection around an already-connected sockfd.
// The connection starts in state kConnecting, wraps sockfd in a Socket and a
// Channel, wires the channel's event callbacks to this object's handlers, and
// enables keep-alive on the socket. The high-water mark defaults to 64 MiB.
// The initializer list follows the member declaration order of the class.
TcpConnection::TcpConnection(EventLoop* loop,
const string& nameArg,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr)
: loop_(CHECK_NOTNULL(loop)),
name_(nameArg),
state_(kConnecting),
socket_(new Socket(sockfd)),
channel_(new Channel(loop, sockfd)),
localAddr_(localAddr),
peerAddr_(peerAddr),
highWaterMark_(64*1024*1024)
{
// Readable event on the channel -> TcpConnection::handleRead; _1 is the
// time at which the event occurred.
channel_->setReadCallback(
boost::bind(&TcpConnection::handleRead, this, _1));
// Writable event on the channel -> TcpConnection::handleWrite.
channel_->setWriteCallback(
boost::bind(&TcpConnection::handleWrite, this));
// Connection closed -> TcpConnection::handleClose.
channel_->setCloseCallback(
boost::bind(&TcpConnection::handleClose, this));
// Error on the channel -> TcpConnection::handleError.
channel_->setErrorCallback(
boost::bind(&TcpConnection::handleError, this));
LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
<< " fd=" << sockfd;
socket_->setKeepAlive(true);
}

// Only logs the destruction; socket_ and channel_ are scoped_ptr members and
// are released automatically.
TcpConnection::~TcpConnection()
{
LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
<< " fd=" << channel_->fd();
}

// 线程安全,可以跨线程调用
void TcpConnection::send(const void* data, size_t len)
{
if (state_ == kConnected)
{
if (loop_->isInLoopThread())
{
sendInLoop(data, len);
}
else
{
string message(static_cast<const char*>(data), len);
loop_->runInLoop(
boost::bind(&TcpConnection::sendInLoop,
this,
message));
}
}
}

// 线程安全,可以跨线程调用
void TcpConnection::send(const StringPiece& message)
{
if (state_ == kConnected)
{
if (loop_->isInLoopThread())
{
sendInLoop(message);
}
else
{
loop_->runInLoop(
boost::bind(&TcpConnection::sendInLoop,
this,
message.as_string()));
//std::forward<string>(message)));
}
}
}

// Thread safe: may be called from any thread.
//
// Sends all readable bytes of *buf and consumes them from buf. The call is
// ignored (and buf left untouched) unless the connection is in state
// kConnected.
void TcpConnection::send(Buffer* buf)
{
  if (state_ != kConnected)
  {
    return;
  }

  if (loop_->isInLoopThread())
  {
    sendInLoop(buf->peek(), buf->readableBytes());
    buf->retrieveAll();
    return;
  }

  // Bind a shared_ptr instead of raw `this`: the functor is executed later on
  // another thread, and the connection must stay alive until then.
  // retrieveAllAsString() moves the payload out of buf into an owned string.
  loop_->runInLoop(
      boost::bind(&TcpConnection::sendInLoop,
                  shared_from_this(),
                  buf->retrieveAllAsString()));
}

// Convenience overload: forwards the viewed bytes to the
// (const void*, size_t) overload, which does the actual work.
void TcpConnection::sendInLoop(const StringPiece& message)
{
sendInLoop(message.data(), message.size());
}

// Writes len bytes from data on the loop thread.
//
// If nothing is pending, the data is written to the socket directly; whatever
// the kernel does not accept is appended to outputBuffer_ and the channel
// starts watching for writability. writeCompleteCallback_ is queued when a
// direct write sends everything; highWaterMarkCallback_ is queued when the
// pending output crosses highWaterMark_ from below.
void TcpConnection::sendInLoop(const void* data, size_t len)
{
  loop_->assertInLoopThread();
  ssize_t nwrote = 0;
  size_t remaining = len;
  bool faultError = false;
  if (state_ == kDisconnected)
  {
    LOG_WARN << "disconnected, give up writing";
    return;
  }

  // If nothing is in the output queue and the channel is not waiting for
  // writability, try writing directly.
  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
  {
    nwrote = sockets::write(channel_->fd(), data, len);
    if (nwrote >= 0)
    {
      remaining = len - static_cast<size_t>(nwrote);
      // Everything went out in one go: notify the user.
      if (remaining == 0 && writeCompleteCallback_)
      {
        loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
      }
    }
    else // nwrote < 0
    {
      // Capture errno before logging; the logging call may overwrite it.
      const int savedErrno = errno;
      nwrote = 0;
      if (savedErrno != EWOULDBLOCK)
      {
        LOG_SYSERR << "TcpConnection::sendInLoop";
        // The peer is gone; queuing the data would be pointless.
        if (savedErrno == EPIPE || savedErrno == ECONNRESET) // FIXME: any others?
        {
          faultError = true;
        }
      }
    }
  }

  assert(remaining <= len);
  // No fatal error and some data is still unsent (the kernel send buffer is
  // full): queue the rest in outputBuffer_.
  if (!faultError && remaining > 0)
  {
    LOG_TRACE << "I am going to write more data";
    size_t oldLen = outputBuffer_.readableBytes();
    // Fire the high-water-mark callback only when the threshold is crossed
    // from below. The data is appended regardless, because the buffer grows
    // as needed.
    if (oldLen + remaining >= highWaterMark_
        && oldLen < highWaterMark_
        && highWaterMarkCallback_)
    {
      loop_->queueInLoop(boost::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
    }
    outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
    if (!channel_->isWriting())
    {
      channel_->enableWriting();     // start watching for POLLOUT
    }
  }
}

void TcpConnection::shutdown()
{
// FIXME: use compare and swap
if (state_ == kConnected)
{
setState(kDisconnecting);
// FIXME: shared_from_this()?
loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
}
}

// Shuts down the write side of the socket, unless output is still pending.
// In the pending case handleWrite() calls this again once outputBuffer_ has
// been drained and the state is kDisconnecting.
void TcpConnection::shutdownInLoop()
{
  loop_->assertInLoopThread();
  if (channel_->isWriting())
  {
    // still flushing queued data; do not cut the write side yet
    return;
  }
  socket_->shutdownWrite();
}

// Forwards the no-delay setting to the underlying Socket.
void TcpConnection::setTcpNoDelay(bool on)
{
socket_->setTcpNoDelay(on);
}

void TcpConnection::connectEstablished()
{
loop_->assertInLoopThread();
assert(state_ == kConnecting);
setState(kConnected);
LOG_TRACE << "[3] usecount=" << shared_from_this().use_count();
channel_->tie(shared_from_this());
channel_->enableReading(); // TcpConnection所对应的通道加入到Poller关注

connectionCallback_(shared_from_this());
LOG_TRACE << "[4] usecount=" << shared_from_this().use_count();
}

void TcpConnection::connectDestroyed()
{
loop_->assertInLoopThread();
if (state_ == kConnected)
{
setState(kDisconnected);
channel_->disableAll();

connectionCallback_(shared_from_this());
}
channel_->remove();
}

// Readable-event handler: pulls data from the socket into inputBuffer_ and
// dispatches on the result.
//   > 0  : data arrived, hand the buffer to the message callback
//   == 0 : end of file, i.e. the peer closed the connection
//   < 0  : read error, log it and run the error handler
void TcpConnection::handleRead(Timestamp receiveTime)
{
  loop_->assertInLoopThread();
  int savedErrno = 0;
  // Read whatever the kernel has buffered in one call so that the event does
  // not keep firing (avoids a busy loop).
  const ssize_t bytesRead = inputBuffer_.readFd(channel_->fd(), &savedErrno);

  if (bytesRead == 0)
  {
    handleClose();
    return;
  }

  if (bytesRead < 0)
  {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
    return;
  }

  messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}

// Writable-event handler: called when the kernel send buffer has room again.
// Flushes as much of outputBuffer_ as the socket accepts.
void TcpConnection::handleWrite()
{
  loop_->assertInLoopThread();

  // The channel is no longer interested in write events.
  if (!channel_->isWriting())
  {
    LOG_TRACE << "Connection fd = " << channel_->fd()
              << " is down, no more writing";
    return;
  }

  const ssize_t written = sockets::write(channel_->fd(),
                                         outputBuffer_.peek(),
                                         outputBuffer_.readableBytes());
  if (written <= 0)
  {
    LOG_SYSERR << "TcpConnection::handleWrite";
    // if (state_ == kDisconnecting)
    // {
    //   shutdownInLoop();
    // }
    return;
  }

  outputBuffer_.retrieve(written);
  if (outputBuffer_.readableBytes() != 0)
  {
    LOG_TRACE << "I am going to write more data";
    return;
  }

  // The application-level send buffer is now empty.
  channel_->disableWriting();      // stop watching POLLOUT to avoid a busy loop
  if (writeCompleteCallback_)
  {
    loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
  }
  // kDisconnecting means the application already asked for shutdown() while
  // data was pending; that data has now been sent, so finish the shutdown.
  if (state_ == kDisconnecting)
  {
    shutdownInLoop();
  }
}

// Close handler: marks the connection kDisconnected, disables all channel
// events, then notifies the user and the close callback.
// The fd itself is not closed here.
void TcpConnection::handleClose()
{
loop_->assertInLoopThread();
LOG_TRACE << "fd = " << channel_->fd() << " state = " << state_;
assert(state_ == kConnected || state_ == kDisconnecting);
// we don't close fd, leave it to dtor, so we can find leaks easily.
setState(kDisconnected);
channel_->disableAll();

// Keep this object alive for the duration of the callbacks below.
TcpConnectionPtr guardThis(shared_from_this());
connectionCallback_(guardThis);       // this call is optional (original author's note)
LOG_TRACE << "[7] usecount=" << guardThis.use_count();
// must be the last line
closeCallback_(guardThis);    // ends up in TcpServer::removeConnection (original author's note)
LOG_TRACE << "[11] usecount=" << guardThis.use_count();
}

// Error handler: fetches the pending socket error for this connection's fd
// and logs it; no other action is taken.
void TcpConnection::handleError()
{
  const int soError = sockets::getSocketError(channel_->fd());
  LOG_ERROR << "TcpConnection::handleError [" << name_
            << "] - SO_ERROR = " << soError << " " << strerror_tl(soError);
}

[/code]

测试程序

#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>

#include <boost/bind.hpp>

#include <stdio.h>
#include <unistd.h>
/*
程序说明:
客户端请求连接 -> 服务器马上发送一堆数据

*/
using namespace muduo;
using namespace muduo::net;

// Test server: as soon as a client connects it starts sending a block of
// generated text, and sends it again every time the previous send has been
// completely written. Anything received is echoed back.
class TestServer
{
 public:
  TestServer(EventLoop* loop,
             const InetAddress& listenAddr)
    : loop_(loop),
      server_(loop, listenAddr, "TestServer")
  {
    server_.setConnectionCallback(
        boost::bind(&TestServer::onConnection, this, _1));
    server_.setMessageCallback(
        boost::bind(&TestServer::onMessage, this, _1, _2, _3));
    server_.setWriteCompleteCallback(
        boost::bind(&TestServer::onWriteComplete, this, _1));

    // Build the payload: every printable ASCII character (33..126) twice in
    // a row, then one 72-character window per starting offset, each
    // terminated by a newline.
    string line;
    for (int i = 33; i < 127; ++i)
    {
      line.push_back(char(i));
    }
    line += line;

    const size_t kLineCount = 127 - 33;
    for (size_t i = 0; i < kLineCount; ++i)
    {
      message_ += line.substr(i, 72) + '\n';
    }
  }

  void start()
  {
    server_.start();
  }

 private:
  // Connection up: enable TCP_NODELAY and send the payload right away.
  // Connection down: just report it.
  void onConnection(const TcpConnectionPtr& conn)
  {
    if (conn->connected())
    {
      printf("onConnection(): new connection [%s] from %s\n",
             conn->name().c_str(),
             conn->peerAddress().toIpPort().c_str());

      conn->setTcpNoDelay(true);
      conn->send(message_);
    }
    else
    {
      printf("onConnection(): connection [%s] is down\n",
             conn->name().c_str());
    }
  }

  // Echoes every received message back to the sender.
  void onMessage(const TcpConnectionPtr& conn,
                 Buffer* buf,
                 Timestamp receiveTime)
  {
    muduo::string msg(buf->retrieveAllAsString());
    // msg.size() is a size_t, so the matching conversion is %zu (not %zd).
    printf("onMessage(): received %zu bytes from connection [%s] at %s\n",
           msg.size(),
           conn->name().c_str(),
           receiveTime.toFormattedString().c_str());

    conn->send(msg);
  }

  // The previous payload has been fully written: send the next one.
  void onWriteComplete(const TcpConnectionPtr& conn)
  {
    conn->send(message_);
  }

  EventLoop* loop_;
  TcpServer server_;

  muduo::string message_;
};

// Entry point: prints the process id, starts a TestServer listening on port
// 8888 and runs the event loop.
int main()
{
  // pid_t is not guaranteed to be int; cast explicitly to match %d.
  printf("main(): pid = %d\n", static_cast<int>(::getpid()));

  InetAddress listenAddr(8888);
  EventLoop loop;

  TestServer server(&loop, listenAddr);
  server.start();

  loop.loop();
  return 0;
}

[/code]
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: