利用线程本地存储降低服务器处理延时
2016-02-08 17:17
507 查看
上一篇博客我们介绍了使用shared_ptr来提高群发聊天服务器的并发性能,然而群发服务器不仅面临并发性能瓶颈,由于用户对于实时性要求较高。群发服务器还需要具备低延时的能力,才可以处理存在大量连接时的转发。上一篇博客中展示的群发服务器,采用multip Reactor模式,消息的转发统一的在main reactor所属线程中执行,因为连接队列只有一份,所以转发的效率并不高,从第一个连接到最后一个连接存在较高的延时。画一个不太恰当的示意图:
为了降低服务器的延时,我们可以借助线程本地存储数据,在每一个sub reactor所属的线程中创建一个用于存储连接对象的数据结构,有连接请求到来时,将其插入当前处理连接的线程本地连接队列中。这样再有转发业务需要处理的时候,所有线程可以并发的执行转发业务,并且不需要使用锁,大大降低了服务器的延迟时间。kimgbo网络库中直接采用了muduo对于线程本地数据存储的封装,可以利用kimgbo网络库方便的实现此功能。经过改进后的程序,所有的sub
reactor线程中都存储了一份连接队列,随时可以执行消息转发工作,而main reactor所在的主线程还需要负责调度、分发每个sub reactor在不同时刻的行为。示意图如下:
当然,主线程中也会存在一块线程本地存储数据。项目全部的代码参见kimgbo网络库example/chat目录下,核心的实现代码如下:
为了降低服务器的延时,我们可以借助线程本地存储数据,在每一个sub reactor所属的线程中创建一个用于存储连接对象的数据结构,有连接请求到来时,将其插入当前处理连接的线程本地连接队列中。这样再有转发业务需要处理的时候,所有线程可以并发的执行转发业务,并且不需要使用锁,大大降低了服务器的延迟时间。kimgbo网络库中直接采用了muduo对于线程本地数据存储的封装,可以利用kimgbo网络库方便的实现此功能。经过改进后的程序,所有的sub
reactor线程中都存储了一份连接队列,随时可以执行消息转发工作,而main reactor所在的主线程还需要负责调度、分发每个sub reactor在不同时刻的行为。示意图如下:
当然,主线程中也会存在一块线程本地存储数据。项目全部的代码参见kimgbo网络库example/chat目录下,核心的实现代码如下:
#include <stdio.h> #include <set> #include <functional> #include <memory> #include "Logging.h" #include "Mutex.h" #include "EventLoop.h" #include "TcpServer.h" #include "codec.h" #include "ThreadLocalSingleton.h" using namespace kimgbo; using namespace kimgbo::net; class ChatServer { public: ChatServer(EventLoop* loop, const InetAddress& listenAddr) : m_loop(loop), m_server(m_loop, listenAddr, "ChatServer"), m_codec(std::bind(&ChatServer::onStringMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)) { m_server.setConnectionCallback( std::bind(&ChatServer::onConnection, this, std::placeholders::_1)); m_server.setMessageCallback( std::bind(&LengthHeaderCodec::onMessage, &m_codec, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); } void setThreadNum(int numThreads) { m_server.setThreadNum(numThreads); } void start() { m_server.setThreadInitCallback(std::bind(&ChatServer::threadInit, this, std::placeholders::_1)); m_server.start(); } private: typedef std::set<TcpConnectionPtr> ConnectionList; void onConnection(const TcpConnectionPtr& conn) { LOG_INFO << conn->localAddress().toIpPort() << " -> " << conn->peerAddress().toIpPort() << " is " << (conn->connected() ? "UP" : "DOWN"); /*启动线程之前执行的初始化操作*/ if (conn->connected()) { m_connections.instance().insert(conn); } else { m_connections.instance().erase(conn); } } /*初始化线程之前*/ void threadInit(EventLoop* loop) { assert(m_connections.pointer() == NULL); m_connections.instance(); assert(m_connections.pointer() != NULL); MutexLockGuard lock(m_mutex); //有可能同时创建的多个I/O线程会同时执行threadInit函数,所以次处需要加锁保护数据 m_loops.insert(loop); } /*业务调度分发*/ void onStringMessage(const TcpConnectionPtr&, const kimgbo::string& message, Timestamp) { EventLoop::Functor f = std::bind(&ChatServer::distributeMessage, this, message); LOG_DEBUG; MutexLockGuard lock(m_mutex); for (std::set<EventLoop*>::iterator it = m_loops.begin(); it != m_loops.end(); ++it) { (*it)->queueInLoop(f); //queueInLoop将其转到EventLoop所属线程中执行 } LOG_DEBUG; } /*各个线程具体的消息转发处理函数*/ void distributeMessage(const kimgbo::string& message) { LOG_DEBUG << "begin"; for (ConnectionList::iterator it = m_connections.instance().begin(); it != m_connections.instance().end(); ++it) { m_codec.send((*it).get(), message); } LOG_DEBUG << "end"; } private: EventLoop* m_loop; TcpServer m_server; LengthHeaderCodec m_codec; MutexLock m_mutex; ThreadLocalSingleton<ConnectionList> m_connections; //定义线程本地存储数据 std::set<EventLoop*> m_loops; //存储各个线程的事件循环对象,方便调度 }; int main(int argc, char* argv[]) { LOG_INFO << "pid = " << getpid(); if (argc > 1) { EventLoop loop; uint16_t port = static_cast<uint16_t>(atoi(argv[1])); InetAddress serverAddr(port); ChatServer server(&loop, serverAddr); if (argc > 2) { server.setThreadNum(atoi(argv[2])); } server.start(); loop.loop(); } else { printf("Usage: %s port [thread_num]\n", argv[0]); } }kimgbo开源网络I/O库见:https://github.com/kimg-bo/kimgbo
相关文章推荐
- C++自制Redis数据库(七) 决战架构设计--从数据的角度贯通始终,表白我的心。
- adb启动失效
- 总结2015,启航2016
- BZOJ 2096: [Poi2010]Pilots
- 【深度】20年,中国互联网主流产品的演变和逻辑
- eclipse 交叉编译unresolved inclusion:<stdio.h>问题
- NYOJ 题目68 三点顺序
- 有意思的数学题:Trapping Rain Water
- 建筑图纸符号大全
- 2015年终总结
- Caffe使用教程
- BZOJ 1305: [CQOI2009]dance跳舞
- 怎么读Man Page和BNF
- abstract 抽象类
- ADO.NET之存储图片路径
- 两种不同刷新和显示,有待研究
- 【翻译自mos文章】在windows平台上怎么启用Oracle database 企业版的partition?
- VS2013 更改MFC标题栏图标和生成的执行文件图标
- restlet 2.3.5 org.restlet包导入eclipse出现的com.sun.net.httpserver类包找不到问题
- C printf() 详解——printf('%08x',number); 程序员面试宝典中的一个错误 char * b=(char *)&a