媒体转发服务器-TCP 在 EPOLL 模型中的注意细节
2013-12-13 14:47
573 查看
前段时间在公司开发了基于udp的流媒体转发服务器,在公网udp转发ts,花屏比较严重。课下之余写了epoll-tcp模型的转发服务器作为测试,比较一下效果,其间遇到不少问题,在此做个笔记。代码最后附上
一、业务需求:终端录制视频(android编码h264) 客户端请求视频 服务器负责转发
因为是测试用没有考虑配置文件,负载均衡,安全认证等
二、协议指定
1、音视频 协议定义:总长度不大于1500 bytes, 终端启动后就进行发送数据
类型标志:0xF0 视频
0xF1 音频
0x00 指令
2、指令协议 用于客户端请求视频
指令:0x0010 请求报活
0x0011 停止请求
三、数据结构关联
1、会话管理
struct Connection //终端、客户端 会话管理
{
int term; // 终端id, 用于客户端时表示请求目标的id
int sock; //tcp
time_t tm; //上次活动时间
int bufsize; //用于epoll 接收
int wantsize;
int recvsize;
char *recvbuf;
CBufQue bufque; //循环队列,客户端用于缓存要发送的数据
}
typedef map<int, Connection*> MAPConnection; // socket -Connection //存储会话
typedef set<int> SETSocket; //socket
typedef map<int, SETSocket*> MAPTermClient; // 一个终端可以转发到多个客户端,保存客户端的key
2、流程管理
线程1-terminal:接收终端视频 : epoll ET模式 非阻塞socket
线程2-media:将接受的视频数据分发到对应的客户端发送缓冲队列
线程3-client:接收客户端的请求, 分发视频数据,epoll LT模式 非阻塞 socket
线程4-cmd:处理客户端的指令,请求停止等
四、代码注意细节
1、socket 设置非堵塞
2、socket 设置SO_REUSEADDR
为什么要设置此项呢?参考 /article/11013798.html
3、accept
LT模式比较清晰,就不说了
ET模式下accept存在的问题
考虑这种情况:多个连接同时到达,服务器的TCP就绪队列瞬间积累多个就绪连接,由于是边缘触发模式,epoll只会通知一次,accept只处理一个连接,导致TCP就绪队列中剩下的连接都得不到处理。
解决办法是用while循环抱住accept调用,处理完TCP就绪队列中的所有连接后再退出循环。如何知道是否处理完就绪队列中的所有连接呢?accept返回-1并且errno设置为EAGAIN就表示所有连接都处理完。
4、recv
Epoll ET模式 非阻塞socket,必须独到没有数据可读,此处的设计是每个socket对应一个connection,connection中有一个数据包的长度wantsize,接收完一个数据包之后,会将此数据包放入队列等待处理,循环接收下一个数据包
此函数是在epoll接到事件后调用,协议约定数据包前俩字节是长度
epoll 循环,epoll添加socket时,epoll_eveny关联的是connection* ,在收到事件是,可以直接取来接收存放数据
5、send
这个问题比较谨慎处理,EPOLLOUT 到底在什么时候触发??
ET 模式下,转发一片文章 /article/4977346.html
ET模式称为边缘触发模式,顾名思义,不到边缘情况,是死都不会触发的。
EPOLLOUT事件:
EPOLLOUT事件只有在连接时触发一次,表示可写,其他时候想要触发,那你要先准备好下面条件:
1.某次write,写满了发送缓冲区,返回错误码为EAGAIN。
2.对端读取了一些数据,又重新可写了,此时会触发EPOLLOUT。
简单地说:EPOLLOUT事件只有在不可写到可写的转变时刻,才会触发一次,所以叫边缘触发,这叫法没错的!
其实,如果你真的想强制触发一次,也是有办法的,直接调用epoll_ctl重新设置一下event就可以了,event跟原来的设置一模一样都行(但必须包含EPOLLOUT),关键是重新设置,就会马上触发一次EPOLLOUT事件。
EPOLLIN事件:
EPOLLIN事件则只有当对端有数据写入时才会触发,所以触发一次后需要不断读取所有数据直到读完EAGAIN为止。否则剩下的数据只有在下次对端有写入时才能一起取出来了。
现在明白为什么说epoll必须要求异步socket了吧?如果同步socket,而且要求读完所有数据,那么最终就会在堵死在阻塞里。
LT模式 比较明了,只要可写就一直触发EPOLLOUT
在此测试项目,用了LT模式,但如何避免一直触发?
当有数据需要发送时,添加EPOLLOUT监听事件。当发送完成后移除此事件
附上代码:http://download.csdn.net/detail/liuhongxiangm/6709063
一、业务需求:终端录制视频(android编码h264) 客户端请求视频 服务器负责转发
因为是测试用没有考虑配置文件,负载均衡,安全认证等
二、协议指定
1、音视频 协议定义:总长度不大于1500 bytes, 终端启动后就进行发送数据
长度 short | 终端id int | 类型 byte | 包序号 short | 帧类型 byte | 数据 |
2 byte | 4 byte | 1 byte | 2 byte | 1 byte |
0xF1 音频
0x00 指令
2、指令协议 用于客户端请求视频
长度 short | 终端id int | 类型 byte | 指令 int |
2 byte | 4 byte | 1 byte | 4 byte |
0x0011 停止请求
三、数据结构关联
1、会话管理
struct Connection //终端、客户端 会话管理
{
int term; // 终端id, 用于客户端时表示请求目标的id
int sock; //tcp
time_t tm; //上次活动时间
int bufsize; //用于epoll 接收
int wantsize;
int recvsize;
char *recvbuf;
CBufQue bufque; //循环队列,客户端用于缓存要发送的数据
}
typedef map<int, Connection*> MAPConnection; // socket -Connection //存储会话
typedef set<int> SETSocket; //socket
typedef map<int, SETSocket*> MAPTermClient; // 一个终端可以转发到多个客户端,保存客户端的key
2、流程管理
线程1-terminal:接收终端视频 : epoll ET模式 非阻塞socket
线程2-media:将接受的视频数据分发到对应的客户端发送缓冲队列
线程3-client:接收客户端的请求, 分发视频数据,epoll LT模式 非阻塞 socket
线程4-cmd:处理客户端的指令,请求停止等
四、代码注意细节
1、socket 设置非堵塞
bool NetCommon::SetSockBlock(const int &fd, bool block) { if(block) { int flags = fcntl(fd, F_GETFL, 0); fcntl(fd, F_SETFL, flags&~O_NONBLOCK); } else { int flags = fcntl(fd, F_GETFL, 0); fcntl(fd, F_SETFL, flags|O_NONBLOCK); } return true; }
2、socket 设置SO_REUSEADDR
bool NetCommon::SetReuseAddr(const int &fd, bool reuse) { int opt = 0; if(reuse) { opt = 1; } else { opt = 0; } if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) { return false; } return true; }
为什么要设置此项呢?参考 /article/11013798.html
3、accept
LT模式比较清晰,就不说了
ET模式下accept存在的问题
考虑这种情况:多个连接同时到达,服务器的TCP就绪队列瞬间积累多个就绪连接,由于是边缘触发模式,epoll只会通知一次,accept只处理一个连接,导致TCP就绪队列中剩下的连接都得不到处理。
解决办法是用while循环抱住accept调用,处理完TCP就绪队列中的所有连接后再退出循环。如何知道是否处理完就绪队列中的所有连接呢?accept返回-1并且errno设置为EAGAIN就表示所有连接都处理完。
int ctcpserver::acceptterminal(Connection *pConn) { while(true) { int newsock = accept(pConn->sock,NULL, NULL); if(newsock < 0) { if(errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) { break; } return -1; } if( newsock > 0) { NetCommon::SetSockBlock(newsock,false); NetCommon::SetReuseAddr(newsock, true); if(m_mapConnTerminal.size() > 500) { close(newsock); return 0; } Connection *newcon = new Connection; newcon->sock = newsock; newcon->wantsize = 2; newcon->recvsize = 0; time( &(newcon->tm)); if(m_epollTerminal.EpollAdd(newsock, EPOLLIN|EPOLLET, newcon) < 0) { delete newcon; newcon = NULL; close(newsock); return -1; } m_mapConnTerminal.insert(make_pair<int, Connection*>(newsock,newcon)); } } return 0; }
4、recv
Epoll ET模式 非阻塞socket,必须独到没有数据可读,此处的设计是每个socket对应一个connection,connection中有一个数据包的长度wantsize,接收完一个数据包之后,会将此数据包放入队列等待处理,循环接收下一个数据包
int ctcpserver::recvn(Connection *pConn) { int iret = 0; //wantsize < 2048 while(pConn->recvsize < pConn->wantsize) { iret = recv(pConn->sock, pConn->buf+pConn->recvsize,pConn->wantsize-pConn->recvsize, 0); if(iret == -1) { if(errno == EINTR) { break; } else if(errno == EWOULDBLOCK || errno == EAGAIN) { break; } else { return -1; } } if(iret == 0) { return -1; } pConn->recvsize += iret; } time(&(pConn->tm)); return pConn->recvsize; }
此函数是在epoll接到事件后调用,协议约定数据包前俩字节是长度
int ctcpserver::recvtrminal(Connection *pConn) { int iret = 0; while(true) { iret = recvn(pConn); if(iret < 0) { close(pConn->sock); m_epollTerminal.EpollDel(pConn->sock,pConn); m_mapConnTerminal.erase(pConn->sock); delete pConn; pConn = NULL; return -1; //error } if(pConn->recvsize != pConn->wantsize) //no data recv { break; } else { if(pConn->wantsize == 2) //接收玩的是数据的长度信息,去设置数据的接收大小 { pConn->wantsize = *(short*)(pConn->buf); if(pConn->wantsize > 2048 || pConn->wantsize < 2) //设计是不大于1500,此处为了兼容其他测试 { close(pConn->sock); m_epollTerminal.EpollDel(pConn->sock,pConn); m_mapConnTerminal.erase(pConn->sock); delete pConn; pConn = NULL; return -1; //something error with data } } else //接收完一个数据包 { BufNode *pnode = m_bufQueMedia.AllocNode(); if(pnode != NULL ) { memset( pnode->pBuf,0,pnode->nMaxLen); pnode->nLen = pConn->recvsize; memcpy(pnode->pBuf, pConn->buf,pConn->recvsize); pnode->sock = pConn->sock; m_bufQueMedia.PushNode(pnode); //media 线程负责处理数据 m_semMediaTask.Post(); } pConn->recvsize = 0; pConn->wantsize = 2; } } } return 0; }
epoll 循环,epoll添加socket时,epoll_eveny关联的是connection* ,在收到事件是,可以直接取来接收存放数据
int ctcpserver::terminalloop() { int event_count = 0; while(true) { event_count = m_epollTerminal.EpollWait(); if(event_count < 0) { if(errno == EINTR) { continue; } return 0; } else if(event_count > 0) { m_mutexTerminal.Lock(); for(int index=0; index < event_count; index++) { Connection *pConn = (Connection*)(m_epollTerminal.GetEVPtr(index)); if(pConn->sock == m_terminalsock) { if(acceptterminal(pConn) < 0) { return 0; } } else { if( m_epollTerminal.GetEVEvents(index)&EPOLLIN ) { if(recvtrminal(pConn) < 0) { continue; } } else if(m_epollTerminal.GetEVEvents(index)&EPOLLOUT) { } } } m_mutexTerminal.UnLock(); } } return 0; }
5、send
这个问题比较谨慎处理,EPOLLOUT 到底在什么时候触发??
ET 模式下,转发一片文章 /article/4977346.html
ET模式称为边缘触发模式,顾名思义,不到边缘情况,是死都不会触发的。
EPOLLOUT事件:
EPOLLOUT事件只有在连接时触发一次,表示可写,其他时候想要触发,那你要先准备好下面条件:
1.某次write,写满了发送缓冲区,返回错误码为EAGAIN。
2.对端读取了一些数据,又重新可写了,此时会触发EPOLLOUT。
简单地说:EPOLLOUT事件只有在不可写到可写的转变时刻,才会触发一次,所以叫边缘触发,这叫法没错的!
其实,如果你真的想强制触发一次,也是有办法的,直接调用epoll_ctl重新设置一下event就可以了,event跟原来的设置一模一样都行(但必须包含EPOLLOUT),关键是重新设置,就会马上触发一次EPOLLOUT事件。
EPOLLIN事件:
EPOLLIN事件则只有当对端有数据写入时才会触发,所以触发一次后需要不断读取所有数据直到读完EAGAIN为止。否则剩下的数据只有在下次对端有写入时才能一起取出来了。
现在明白为什么说epoll必须要求异步socket了吧?如果同步socket,而且要求读完所有数据,那么最终就会在堵死在阻塞里。
LT模式 比较明了,只要可写就一直触发EPOLLOUT
在此测试项目,用了LT模式,但如何避免一直触发?
当有数据需要发送时,添加EPOLLOUT监听事件。当发送完成后移除此事件
int ctcpserver::sendclient(Connection *pConn) { // BufNode *pnode = pConn->bufque.FrontNode(); // if(pnode != NULL) { int iret = send(pConn->sock, pnode->pBuf,pnode->nLen,0); if(iret <= 0) { if(errno != EAGAIN || errno == EWOULDBLOCK || errno == EAGAIN) { return 0; } else { close(pConn->sock); m_epollClient.EpollDel(pConn->sock,pConn); m_mapConClient.erase(pConn->sock); delete pConn; pConn = NULL; // cout << "m_mapConClient count:" << m_mapConClient.size() << endl; return -1; } } else { pConn->bufque.PopNode(); pConn->bufque.FreeNode(pnode); } } if(pConn->bufque.GetUsedNodeCount() == 0) { m_epollClient.EpollMod(pConn->sock, EPOLLIN, pConn);//移除EPOLLOUT 监听 } return 0; }
int ctcpserver::mediataskloop() { while (true) { m_semMediaTask.Wait(); BufNode *pnode = m_bufQueMedia.PopNode(); if(pnode == NULL) { continue; } //id-client int id = *(int*)(pnode->pBuf+2); m_mutexTermClient.Lock(); MAPTermClientIt itTermClient = m_mapTermClient.find(id); if(itTermClient == m_mapTermClient.end()) { SETSocket *pSetSock = new SETSocket; pSetSock->clear(); m_mapTermClient.insert(make_pair<int, SETSocket*>(id,pSetSock)); } else { SETSocket *pSetSock= itTermClient->second; if(!pSetSock->empty()) { SETSocketIt itSock; m_mutexClient.Lock(); for(itSock=pSetSock->begin(); itSock != pSetSock->end(); ++ itSock) { int sock = *itSock; MAPConnectionIt itConnClient = m_mapConClient.find(sock); if(itConnClient != m_mapConClient.end()) { Connection *pConn = itConnClient->second; BufNode *pnodeClient = pConn->bufque.AllocNode(); if(pnodeClient == NULL) { continue; } memcpy(pnodeClient->pBuf, pnode->pBuf, pnode->nLen ); pnodeClient->nLen = pnode->nLen; pConn->bufque.PushNode(pnodeClient); m_epollClient.EpollMod(pConn->sock, EPOLLIN|EPOLLOUT, pConn); //添加EPOLLOUT 监听 } else { pSetSock->erase(itSock); } } m_mutexClient.UnLock(); } } m_mutexTermClient.UnLock(); m_bufQueMedia.FreeNode(pnode); } return 0; }
附上代码:http://download.csdn.net/detail/liuhongxiangm/6709063
相关文章推荐
- 服务器-TCP 在 EPOLL 模型中的注意细节
- UNIX TCP回射服务器/客户端之使用epoll模型的服务器
- Linux Socket 事件触发模型 epoll 示例 这里会写一个用C语言的TCP服务器的完全实现的简单程序
- UNIX TCP回射服务器/客户端(4):使用epoll模型的服务器
- Linux网络编程【六】:TCP协议高性能服务器(http)模型之I/O多路转接epoll
- TCP并发服务器模型(一)
- epoll模型设计海量级连接服务器
- 采用epoll模型服务器连接管理器实现
- 关于客户端发送给epoll模型的服务器,卡send时的一个问题
- Nginx网络epoll多进程系列:tcp服务器仿nginx多进程和多路IO的实现
- 关于TCP ,select,epoll服务器的区别与联系
- 向服务器请求数据(没有上拉和下拉刷新的情况)注意细节
- Linux网络编程【五】:TCP协议高性能服务器(http)模型之I/O多路转接select
- 基于EPOLL模型的局域网聊天室和Echo服务器
- Linux 服务器IO模型 epoll
- Select I/O模型来实现一个并发处理多个客户端的TCP服务器
- TCP服务器模型
- 在Linux上开发网络服务器的一些相关细节:poll与epoll
- Linux下TCP/UDP socket服务器模型
- C/S通信---服务器IO多路复用模型之epoll的使用