您的位置:首页 > 理论基础 > 计算机网络

muduo库阅读(40)——Net部分:TCP服务器TcpServer

2015-11-12 15:12 555 查看
TcpServer的工作流程如下:

1、根据构造函数传递进来的Acceptor的EventLoop和地址等信息进行初始化

2、创建Acceptor对象

3、把Accptor的EventLoop作为参数,创建EventLoop线程池,此时Acceptor的EventLoop就是线程池的基础EventLoop,就算线程池设置的线程数量为0,线程池中也还有一个基础的EventLoop。其中线程个数的意义如下:

4、为Acceptor设置新连接到来的回调函数

5、开始监听

6、新连接到来的时候,处理流程如下:

6.1、从线程池中取出一个EventLoop对象,如果线程池的的线程数量(每创建一个线程就意味着多创建一个EventLoop)为0,那么就返回基础的EventLoop(即Accptor的EventLoop)。新的连接就交给这个EventLoop来管理。

6.2、计算新连接的id和名字

6.3、利用步骤6.1返回的EventLoop对象创建一个TcpConnection对象

6.4、存储TcpConnection对象

6.5、为TcpConnection对象设置用户的回调函数:连接到来的回调函数、数据到来的回调函数、写完成的回调函数、关闭连接的回调函数

6.6、调用TcpConnection::connectEstablished表示链接建立完毕

/*
* TCP服务器
*/
namespace muduo
{
namespace net
{

class Acceptor;
class EventLoop;
class EventLoopThreadPool;

///
/// TCP server, supports single-threaded and thread-pool models.
///
/// This is an interface class, so don't expose too much details.
class TcpServer : boost::noncopyable
{
public:
typedef boost::function<void(EventLoop*)> ThreadInitCallback;
// 选项
enum Option
{
// 不复用端口
kNoReusePort,
// 复用端口
kReusePort,
};

// 构造函数
TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option = kNoReusePort);

// 析构函数
~TcpServer();

// 获取ip和端口
const string& hostport() const { return hostport_; }

// 获取名字
const string& name() const { return name_; }

// 获取Acceptor的EventLoop
EventLoop* getLoop() const { return loop_; }

// 设置线程的数量
/* numThreads的值:
* =0 表示所有的io操作都在Acceptor的EventLoop中进行
* =1 表示新的链接在Acceptor的EventLoop中进行,而其他的io操作在另一个线程中进行
* =N 表示新的链接Acceptor的EventLoop中进行,获取之后按照round-robin的方式分配给其中的一个线程处理其他的IO
*/
void setThreadNum(int numThreads);

// 设置线程初始化回调函数
void setThreadInitCallback(const ThreadInitCallback& cb)
{ threadInitCallback_ = cb; }

// 返回线程池
boost::shared_ptr<EventLoopThreadPool> threadPool()
{ return threadPool_; }

// 启动服务器
void start();

// 设置连接回调函数
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }

// 设置数据到来的回调函数
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }

// 设置写完成回调函数
void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{ writeCompleteCallback_ = cb; }

private:
/// 新连接到来回调函数
void newConnection(int sockfd, const InetAddress& peerAddr);
/// 删除连接
void removeConnection(const TcpConnectionPtr& conn);
/// 删除连接
void removeConnectionInLoop(const TcpConnectionPtr& conn);

typedef std::map<string, TcpConnectionPtr> ConnectionMap;

// 接收者的Reactor对象
EventLoop* loop_;  // the acceptor loop

// ip和端口号
const string hostport_;

// 名字
const string name_;

// 接收者
boost::scoped_ptr<Acceptor> acceptor_; // avoid revealing Acceptor

// Reactor线程池
boost::shared_ptr<EventLoopThreadPool> threadPool_;

// 连接建立回调函数
ConnectionCallback connectionCallback_;

// 数据到来回调函数
MessageCallback messageCallback_;

// 写完成回调函数
WriteCompleteCallback writeCompleteCallback_;

// 线程初始化回调函数
ThreadInitCallback threadInitCallback_;

// 服务器是否已经启动
AtomicInt32 started_;

// 下一个连接的id
int nextConnId_;

// 存放所有的连接
ConnectionMap connections_;
};

}
}


using namespace muduo;
using namespace muduo::net;

/*
* 构造函数
*/
TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option)
: loop_(CHECK_NOTNULL(loop)),
hostport_(listenAddr.toIpPort()),
name_(nameArg),

// 创建Acceptor
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),

// 创建EventLoop线程池,loop作为基础的EventLoop传进去
// 那么就算线程池设置的线程数量为0,EventLoopThreadPool中也还会有一个基础的EventLoop
threadPool_(new EventLoopThreadPool(loop, name_)),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
nextConnId_(1)
{
// 设置Acceptor新链接到来的回调函数
acceptor_->setNewConnectionCallback(boost::bind(&TcpServer::newConnection, this, _1, _2));
}

/*
* 析构函数
*/
TcpServer::~TcpServer()
{
loop_->assertInLoopThread();
LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";

// 断开每一个连接
for (ConnectionMap::iterator it(connections_.begin()); it != connections_.end(); ++it)
{
TcpConnectionPtr conn = it->second;
it->second.reset();

// 销毁连接
conn->getLoop()->runInLoop(boost::bind(&TcpConnection::connectDestroyed, conn));
conn.reset();
}
}

/*
* 设置EventLoop线程池中线程的数量
*/
void TcpServer::setThreadNum(int numThreads)
{
assert(0 <= numThreads);
threadPool_->setThreadNum(numThreads);
}

// 启动服务器
void TcpServer::start()
{
if (started_.getAndSet(1) == 0)
{
// 启动线程池中的线程
threadPool_->start(threadInitCallback_);

assert(!acceptor_->listenning());

// Acceptor开始监听
loop_->runInLoop(boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
}

/*
* 新链接到来的回调函数
*/
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();

// 从线程池中取出一个EventLoop对象,然后把该链接交给它管理
// 如果线程池的线程数量为0,那么将返回基础的EventLoop即loop(Acceptor的EventLoop)
EventLoop* ioLoop = threadPool_->getNextLoop();

// 计算连接的id
char buf[32];
snprintf(buf, sizeof buf, ":%s#%d", hostport_.c_str(), nextConnId_);
++nextConnId_;

// 构建链接的名字
string connName = name_ + buf;

LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toIpPort();

InetAddress localAddr(sockets::getLocalAddr(sockfd));

// 创建一个TcpConnection对象
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));

// 存储TcpConnection对象
connections_[connName] = conn;

// 为TcpConnection对象设置用户的回调函数:连接到来的回调函数、数据到来的回调函数、写完成的回调函数、关闭链接的回调函数
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe

// 调用TcpConnection::connectEstablished表示链接建立完毕
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
}

// 移除一个连接
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
// FIXME: unsafe
loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

// 移除一个链接
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
loop_->assertInLoopThread();
LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
<< "] - connection " << conn->name();

// 删除链接
size_t n = connections_.erase(conn->name());
(void)n;
assert(n == 1);

EventLoop* ioLoop = conn->getLoop();

// 调用TcpConnection::connectDestroyed销毁连接
ioLoop->queueInLoop(boost::bind(&TcpConnection::connectDestroyed, conn));
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: