您的位置:首页 > 理论基础 > 计算机网络

游戏服务器之网络收发线程处理详细分析

2014-08-17 14:48 330 查看
业务网络收发线程的循环部分,处理网络数据的接收和发送。

设计上:

(1)会话检查

1)先检查所有连接是否处于正常状态。回收标记关闭的会话。

例行检查: 遍历所有连接,发送测试信号(心跳)到客户端,回收标识关闭的会话连接(也要从读epoll描述符中注销),注册正常的连接会话到读epoll描述符

(2)网络的处理是读优先的,有读才检查写,写的检查是一段时间检查一次(目前是50ms)。通过两个epoll描述符来处理:

1)第一个epoll描述符监听所有连接的读事件,并处理网络数据的接收。

检查需要检查的连接的读事件,并处理epoll读事件(非阻塞检查)

2)第二个epoll描述符监听所有连接的读写事件,并处理网络数据的接收和发送(顺带会处理有读事件和写事件的连接的接收和发送)。

每隔一段时间并且在有读事件情况下,检查写缓存是否有数据,若有数据则发送数据(非阻塞处理)

(3)接收数据处理

1)网络接收数据到接收缓存

2)解压并拷贝到栈缓存

3)解析消息并派送消息到逻辑线程

(4)发送数据处理

1)拷贝数据到发送缓存1

2)拷贝发送缓存到栈缓存,先压缩,后加密(有标识才处理,在不同栈缓存处理)。拷贝到网络发送缓存2

3)发送网络发送缓存2

1、添加会话

添加会话到网络线程的会话管理器

注册会话到读写epoll,监听会话的读事件和写事件

void main_service_thread::_add(tcp_session *task)
{
task->addEpoll(kdpfd, EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLPRI, (void *)task);
tasks.push_back(task);
task_count = tasks.size();
if (task_count > epfds.size())
{
epfds.resize(task_count + 16);
}
。。。
}


2、网络线程循环

网络线程循环处理

void main_service_thread::run()
{
realtime currentTime;
realtime _write_time;
tcp_session_IT it;

int kdpfd_r;
vector_epoll epfds_r;
kdpfd_r = epoll_create(256);//创建读epoll描述符(监听队列大小)
assert(-1 != kdpfd_r);
epfds.resize(256);
uint32 fds_count_r = 0;
bool check=false;
while(!isFinal())
{
this->setRunning();
currentTime.now();
//检查没有关闭的连接,作为读事件的检查对象
if (check)
{
check_queue();
if (!tasks.empty())
{
//遍历所有连接,检查连接是否有读事件,发送测试信号(心跳)到客户端,断开需要断开的连接并回收
for(it = tasks.begin(); it != tasks.end();)//遍历所有连接
{
tcp_session *task = *it;

//检查测试信号指令(检查是否需要发送网络测试信号(只有简单包头的没有内容的消息)到客户端)
task->checkSignal(currentTime);
//回收需要断开的连接
if (task->isTerminateWait())//检查任务是否被中断,是的话就要设置中断任务标识(标识可以是其他线程标识的,但需要在本线程检查关闭和回收)
{
task->Terminate();//被动关闭(在其他线程中关闭的)
}
if (task->isTerminate())//如果连接被中断就从读epoll中删除(从epoll描述符中删除掉对该套接字的监听),并回收该连接
{
if (task->isFdsrAdd()) //如果被添加到监听读epoll的会话,需要从读epoll描述符移除该会话
{
task->delEpoll(kdpfd_r, EPOLLIN | EPOLLERR | EPOLLPRI);
fds_count_r --;//需要监听的读描述符数量减一
}
remove(it);//先从容器移除该会话
task->getNextState();//回收连接(先设置状态再添加容器)
pool->addRecycle(task);
}
else
{
//还没有加入监听读epoll描述符的连接会话,添加到读epoll描述符
if(!task->isFdsrAdd())
{
task->addEpoll(kdpfd_r, EPOLLIN | EPOLLERR | EPOLLPRI, (void *)task);
task->fdsrAdd();//标识该连接会话被添加到读事件监听列表
fds_count_r++;
if (fds_count_r > epfds_r.size())
{
epfds_r.resize(fds_count_r + 16);
}
}
++it;
}
}
}
check=false;
}
thread_base::msleep(2);//空闲时间

//检查需要检查的连接的读事件,并处理epoll读事件(非阻塞检查)
if(fds_count_r)//处理epoll读事件
{
int retcode = epoll_wait(kdpfd_r, &epfds_r[0], fds_count_r, 0);//监听所有的连接会话的读事件,fds_count_r 是需要监听的数量
if (retcode > 0)
{
for(int i = 0; i < retcode; i++)
{
tcp_session *task = (tcp_session *)epfds_r[i].data.ptr;
if (epfds_r[i].events & (EPOLLERR | EPOLLPRI))
{
//套接口出现错误(套接字出现错误就中断该任务)。(如果客户端关闭了就需要在这里检查错误)
if(task->TerminateError())
{
g_log->debug("%s: 套接口异常错误:%u", __PRETTY_FUNCTION__,epfds_r[i].events);
task->Terminate(tcp_session::terminate_active);//标识主动关闭会话
check=true;
}
else
{
if (task->isFdsrAdd())//从epoll描述符中删除该套接字
{
task->delEpoll(kdpfd_r, EPOLLIN | EPOLLERR | EPOLLPRI);//这里可能导致fds_count_r大于需要监听读的连接数
}
task->delEpoll(kdpfd, EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLPRI);
}
}
else
{
//接收有读事件的连接的数据
if (epfds_r[i].events & EPOLLIN)//处理读取事件
{
//套接口准备好了读取操作
if (!task->ListeningRecv(true))//处理网络读取事件,读取网络缓冲区的数据到指令,并分析指令
{
//读取接收缓冲区数据如果返回-1则是对方已经断开连接,则要中断任务
g_log->debug("%s: 套接口读操作错误 errno:%u, strerror:%s", __PRETTY_FUNCTION__, errno, strerror(errno));
task->Terminate(tcp_session::terminate_active);
check=true;
}
}
}
epfds_r[i].events=0;
}
}
}

//成功接收到数据才检查写缓存 (一直监听连接的写事件)
//套接字出错或者有读事件但还没到读缓冲区则继续检查读,否则开始检查写缓存
if(check)
{
continue;
}
//处理网络写
//每隔一段时间才查看检查是否有写事件需要处理
if (currentTime.msec() - _write_time.msec() >= (unsigned long)(pool->usleep_time/1000))
{
_write_time = currentTime;
if (!tasks.empty())
{
//检查epoll描述符(kdpfd)有事件的连接(写事件一直存在)
int retcode = epoll_wait(kdpfd, &epfds[0], task_count, 0);//处理其他服务器的连接
if (retcode > 0)
{
for(int i = 0; i < retcode; i++)
{
tcp_session *task = (tcp_session *)epfds[i].data.ptr;
if (epfds[i].events & (EPOLLERR | EPOLLPRI))//错误则中断任务
{
//套接口出现错误(需要主动标识回收)
if(task->TerminateError())
{
g_log->debug("%s: 套接口异常错误:%u", __PRETTY_FUNCTION__,epfds[i].events);
task->Terminate(tcp_session::terminate_active);
}
else
{
if (task->isFdsrAdd())//从读epoll描述符中移除
{
task->delEpoll(kdpfd_r, EPOLLIN | EPOLLERR | EPOLLPRI);
}
task->delEpoll(kdpfd, EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLPRI);
}
}
else//处理所有连接的读事件和写事件
{
//再次是否有读事件(epoll描述符kdpfd)
////这里又进行一次读事件检查,是为了尽量多处理事件,
//这样可以尽量减少系统调用从而提高效率(epoll使用的是水平触发,就算这里不处理读事件也是可以的,
//但是还是尽量多处理掉多些事件来减少epoll_wait的调用)
if (epfds[i].events & EPOLLIN)
{
//连接的接收数据
//套接口准备好了读取操作
if (!task->ListeningRecv(true))
{
g_log->debug("%s: 套接口读操作错误,errno:%u, strerror:%s", __PRETTY_FUNCTION__,errno, strerror(errno));
task->Terminate(tcp_session::terminate_active);
}
}
//检查连接的写事件(连接的写事件一直存在,目的检查的是写缓存)
if (epfds[i].events & EPOLLOUT)//处理写事件
{
//检查连接的写缓存并写数据
//套接口准备好了写入操作
if (!task->ListeningSend())
{
g_log->debug("%s: 套接口写操作错误errno:%u,strerrno:%s", __PRETTY_FUNCTION__,errno,strerror(errno));										task->Terminate(tcp_session::terminate_active);
}
}
}
epfds[i].events=0;//该事件处理完后手动去掉epoll事件标识
}
}
}
check=true;
}
}
//进程关闭前回收所有的会话
//回收所有的会话:
//线程关闭前就先要把所有的连接加入回收线程的队列,回收这些连接
for(it = tasks.begin(); it != tasks.end();)
{
tcp_session *task = *it;
remove(it);
// state_okay -> state_recy
/*
* cjy
* 先设置状态再添加容器,
*/
task->getNextState();
pool->addRecycle(task);
}
TEMP_FAILURE_RETRY(::close(kdpfd_r));//关闭epoll描述符
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐