您的位置:首页 > 其它

利用线程本地存储降低服务器处理延时

2016-02-08 17:17 507 查看
上一篇博客我们介绍了使用shared_ptr来提高群发聊天服务器的并发性能,然而群发服务器不仅面临并发性能瓶颈,由于用户对于实时性要求较高。群发服务器还需要具备低延时的能力,才可以处理存在大量连接时的转发。上一篇博客中展示的群发服务器,采用multip Reactor模式,消息的转发统一的在main reactor所属线程中执行,因为连接队列只有一份,所以转发的效率并不高,从第一个连接到最后一个连接存在较高的延时。画一个不太恰当的示意图:



为了降低服务器的延时,我们可以借助线程本地存储数据,在每一个sub reactor所属的线程中创建一个用于存储连接对象的数据结构,有连接请求到来时,将其插入当前处理连接的线程本地连接队列中。这样再有转发业务需要处理的时候,所有线程可以并发的执行转发业务,并且不需要使用锁,大大降低了服务器的延迟时间。kimgbo网络库中直接采用了muduo对于线程本地数据存储的封装,可以利用kimgbo网络库方便的实现此功能。经过改进后的程序,所有的sub
reactor线程中都存储了一份连接队列,随时可以执行消息转发工作,而main reactor所在的主线程还需要负责调度、分发每个sub reactor在不同时刻的行为。示意图如下:



当然,主线程中也会存在一块线程本地存储数据。项目全部的代码参见kimgbo网络库example/chat目录下,核心的实现代码如下:

#include <stdio.h>
#include <set>
#include <functional>
#include <memory>
#include "Logging.h"
#include "Mutex.h"
#include "EventLoop.h"
#include "TcpServer.h"
#include "codec.h"
#include "ThreadLocalSingleton.h"

using namespace kimgbo;
using namespace kimgbo::net;

class ChatServer
{
public:
ChatServer(EventLoop* loop,
const InetAddress& listenAddr)
: m_loop(loop),
m_server(m_loop, listenAddr, "ChatServer"),
m_codec(std::bind(&ChatServer::onStringMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))
{
m_server.setConnectionCallback(
std::bind(&ChatServer::onConnection, this, std::placeholders::_1));
m_server.setMessageCallback(
std::bind(&LengthHeaderCodec::onMessage, &m_codec, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
}

void setThreadNum(int numThreads)
{
m_server.setThreadNum(numThreads);
}

void start()
{
m_server.setThreadInitCallback(std::bind(&ChatServer::threadInit, this, std::placeholders::_1));
m_server.start();
}

private:
typedef std::set<TcpConnectionPtr> ConnectionList;

void onConnection(const TcpConnectionPtr& conn)
{
LOG_INFO << conn->localAddress().toIpPort() << " -> "
<< conn->peerAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");

/*启动线程之前执行的初始化操作*/
if (conn->connected())
{
m_connections.instance().insert(conn);
}
else
{
m_connections.instance().erase(conn);
}
}

/*初始化线程之前*/
void threadInit(EventLoop* loop)
{
assert(m_connections.pointer() == NULL);
m_connections.instance();
assert(m_connections.pointer() != NULL);

MutexLockGuard lock(m_mutex); //有可能同时创建的多个I/O线程会同时执行threadInit函数,所以次处需要加锁保护数据
m_loops.insert(loop);
}

/*业务调度分发*/
void onStringMessage(const TcpConnectionPtr&, const kimgbo::string& message, Timestamp)
{
EventLoop::Functor f = std::bind(&ChatServer::distributeMessage, this, message);
LOG_DEBUG;

MutexLockGuard lock(m_mutex);
for (std::set<EventLoop*>::iterator it = m_loops.begin(); it != m_loops.end(); ++it)
{
(*it)->queueInLoop(f); //queueInLoop将其转到EventLoop所属线程中执行
}
LOG_DEBUG;
}

/*各个线程具体的消息转发处理函数*/
void distributeMessage(const kimgbo::string& message)
{
LOG_DEBUG << "begin";
for (ConnectionList::iterator it = m_connections.instance().begin(); it != m_connections.instance().end(); ++it)
{
m_codec.send((*it).get(), message);
}
LOG_DEBUG << "end";
}

private:
EventLoop* m_loop;
TcpServer m_server;
LengthHeaderCodec m_codec;
MutexLock m_mutex;
ThreadLocalSingleton<ConnectionList> m_connections; //定义线程本地存储数据
std::set<EventLoop*> m_loops; //存储各个线程的事件循环对象,方便调度
};

int main(int argc, char* argv[])
{
LOG_INFO << "pid = " << getpid();
if (argc > 1)
{
EventLoop loop;
uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
InetAddress serverAddr(port);
ChatServer server(&loop, serverAddr);
if (argc > 2)
{
server.setThreadNum(atoi(argv[2]));
}
server.start();
loop.loop();
}
else
{
printf("Usage: %s port [thread_num]\n", argv[0]);
}
}
kimgbo开源网络I/O库见:https://github.com/kimg-bo/kimgbo
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: