您的位置:首页 > 其它

详谈高性能UDP服务器的开发[转]

2010-12-23 22:12 239 查看
详谈高性能UDP服务器的开发


上一篇文章我详细介绍了如何开发一款高性能的TCP服务器的网络传输层.本章我将谈谈如何开发一个高性能的UDP服务器的网络
层.UDP服务器的网络层开发相对与TCP服务器来说要容易和简单的多,UDP服务器的大致流程为创建一个socket然后将其绑定到完成端口上并投递一
定数量的recv操作.当有数据到来时从完成队列中取出数据发送到接收队列中即可。

测试结果如下:

WindowsXP Professional,Intel Core Duo E4600 双核2.4G ,
2G内存。同时30K个用户和该UDP服务器进行交互其CPU使用率为10%左右,内存占用7M左右。

下面详细介绍该服务器的架构及流程:

下面将主要介绍UdpSer类, 该类主要用来管理UDP服务.其定义如下:

1 class DLLENTRY UdpSer

2 {

3 public:

4 UdpSer();

5 ~UdpSer();

6

7
/**//************************************************************************

8 * Desc : 初始化静态资源,在申请UDP实例对象之前应先调用该函数, 否则程序无法正常运行

9
************************************************************************/

10 static void InitReource();

11

12
/**//************************************************************************

13 * Desc : 在释放UDP实例以后, 掉用该函数释放相关静态资源

14
************************************************************************/

15 static void ReleaseReource();

16

17 //用指定本地地址和端口进行初始化

18 BOOL StartServer(const CHAR* szIp = "0.0.0.0", INT nPort =
0);

19

20 //从数据队列的头部获取一个接收数据, pCount不为null时返回队列的长度

21 UDP_RCV_DATA* GetRcvData(DWORD* pCount);

22

23 //向对端发送数据

24 BOOL SendData(const IP_ADDR& PeerAddr, const CHAR*
szData, INT nLen);

25

26 /**//****************************************************

27 * Name : CloseServer()

28 * Desc : 关闭服务器

29 ****************************************************/

30 void CloseServer();

31

32 protected:

33 SOCKET m_hSock;

34 vector<UDP_RCV_DATA* > m_RcvDataQue;
//接收数据队列

35 CRITICAL_SECTION m_RcvDataLock;
//访问m_RcvDataQue的互斥锁

36 long volatile m_bThreadRun;
//是否允许后台线程继续运行

37 BOOL m_bSerRun;
//服务器是否正在运行

38

39 HANDLE *m_pThreads; //线程数组

40 HANDLE m_hCompletion; //完成端口句柄

41

42 void ReadCompletion(BOOL bSuccess, DWORD
dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped);

43

44 /**//****************************************************

45 * Name : WorkThread()

46 * Desc : I/O 后台管理线程

47 ****************************************************/

48 static UINT WINAPI WorkThread(LPVOID lpParam);

49 };

1. InitReource()
主要对相关的静态资源进行初始化.其实大致和TcpServer::InitReource()大致相同.在UdpSer实例使用之前必须调用该函数进行
静态资源的初始化, 否则服务器无法正常使用.

2.ReleaseReource() 主要对相关静态资源进行释放.只有在应用程序结束时才能调用该函数进行静态资源的释放.

3. StartServer()

该函数的主要功能启动一个UDP服务.其大致流程为先创建服务器UDP socket,
将其绑定到完成端口上然后投递一定数量的recv操作以接收客户端的数据.其实现如下:

1 BOOL UdpSer::StartServer(const CHAR* szIp /**//* = */, INT nPort
/**//* = 0 */)

2 {

3 BOOL bRet = TRUE;

4 const int RECV_COUNT = 500;

5 WSABUF RcvBuf = { NULL, 0 };

6 DWORD dwBytes = 0;

7 DWORD dwFlag = 0;

8 INT nAddrLen = sizeof(IP_ADDR);

9 INT iErrCode = 0;

10

11 try

12 {

13 if (m_bSerRun)

14 {

15 THROW_LINE;

16 }

17

18 m_bSerRun = TRUE;

19 m_hSock = WSASocket(AF_INET, SOCK_DGRAM, 0, NULL, 0,
WSA_FLAG_OVERLAPPED);

20 if (INVALID_SOCKET == m_hSock)

21 {

22 THROW_LINE;

23 }

24 ULONG ul = 1;

25 ioctlsocket(m_hSock, FIONBIO, &ul);

26

27 //设置为地址重用,优点在于服务器关闭后可以立即启用

28 int nOpt = 1;

29 setsockopt(m_hSock, SOL_SOCKET, SO_REUSEADDR,
(char*)&nOpt, sizeof(nOpt));

30

31 //关闭系统缓存,使用自己的缓存以防止数据的复制操作

32 INT nZero = 0;

33 setsockopt(m_hSock, SOL_SOCKET, SO_SNDBUF,
(char*)&nZero, sizeof(nZero));

34 setsockopt(m_hSock, SOL_SOCKET, SO_RCVBUF,
(CHAR*)&nZero, sizeof(nZero));

35

36 IP_ADDR addr(szIp, nPort);

37 if (SOCKET_ERROR == bind(m_hSock, (sockaddr*)&addr,
sizeof(addr)))

38 {

39 closesocket(m_hSock);

40 THROW_LINE;

41 }

42

43 //将SOCKET绑定到完成端口上

44 CreateIoCompletionPort((HANDLE)m_hSock, m_hCompletion, 0,
0);

45

46 //投递读操作

47 for (int nIndex = 0; nIndex < RECV_COUNT; nIndex++)

48 {

49 UDP_CONTEXT* pRcvContext = new UDP_CONTEXT();

50 if (pRcvContext && pRcvContext->m_pBuf)

51 {

52 dwFlag = 0;

53 dwBytes = 0;

54 nAddrLen = sizeof(IP_ADDR);

55 RcvBuf.buf = pRcvContext->m_pBuf;

56 RcvBuf.len = UDP_CONTEXT::S_PAGE_SIZE;

57

58 pRcvContext->m_hSock = m_hSock;

59 pRcvContext->m_nOperation =
OP_READ;

60 iErrCode = WSARecvFrom(pRcvContext->m_hSock,
&RcvBuf, 1, &dwBytes, &dwFlag,
(sockaddr*)(&pRcvContext->m_RemoteAddr)

61 , &nAddrLen, &(pRcvContext->m_ol),
NULL);

62 if (SOCKET_ERROR == iErrCode &&
ERROR_IO_PENDING != WSAGetLastError())

63 {

64 delete pRcvContext;

65 pRcvContext = NULL;

66 }

67 }

68 else

69 {

70 delete pRcvContext;

71 }

72 }

73 }

74 catch (const long &lErrLine)

75 {

76 bRet = FALSE;

77 _TRACE("Exp : %s -- %ld ", __FILE__,
lErrLine);

78 }

79

80 return bRet;

81 }4. GetRcvData(), 从接收队列中取出一个数据包.

1 UDP_RCV_DATA *UdpSer::GetRcvData(DWORD* pCount)

2 {

3 UDP_RCV_DATA* pRcvData = NULL;

4

5 EnterCriticalSection(&m_RcvDataLock);

6 vector<UDP_RCV_DATA* >::iterator iterRcv =
m_RcvDataQue.begin();

7 if (iterRcv != m_RcvDataQue.end())

8 {

9 pRcvData = *iterRcv;

10 m_RcvDataQue.erase(iterRcv);

11 }

12

13 if (pCount)

14 {

15 *pCount = (DWORD)(m_RcvDataQue.size());

16 }

17 LeaveCriticalSection(&m_RcvDataLock);

18

19 return pRcvData;

20 }

5. SendData() 发送指定长度的数据包.

1 BOOL UdpSer::SendData(const IP_ADDR& PeerAddr, const CHAR*
szData, INT nLen)

2 {

3 BOOL bRet = TRUE;

4 try

5 {

6 if (nLen >= 1500)

7 {

8 THROW_LINE;

9 }

10

11 UDP_CONTEXT* pSendContext = new UDP_CONTEXT();

12 if (pSendContext && pSendContext->m_pBuf)

13 {

14 pSendContext->m_nOperation = OP_WRITE;

15 pSendContext->m_RemoteAddr = PeerAddr;

16

17 memcpy(pSendContext->m_pBuf, szData, nLen);

18

19 WSABUF SendBuf = { NULL, 0 };

20 DWORD dwBytes = 0;

21 SendBuf.buf = pSendContext->m_pBuf;

22 SendBuf.len = nLen;

23

24 INT iErrCode = WSASendTo(m_hSock, &SendBuf, 1,
&dwBytes, 0, (sockaddr*)&PeerAddr, sizeof(PeerAddr),
&(pSendContext->m_ol), NULL);

25 if (SOCKET_ERROR == iErrCode &&
ERROR_IO_PENDING != WSAGetLastError())

26 {

27 delete pSendContext;

28 THROW_LINE;

29 }

30 }

31 else

32 {

33 delete pSendContext;

34 THROW_LINE;

35 }

36 }

37 catch (const long &lErrLine)

38 {

39 bRet = FALSE;

40 _TRACE("Exp : %s -- %ld ", __FILE__,
lErrLine);

41 }

42

43 return bRet;

44 }

6. CloseServer() 关闭服务

1 void UdpSer::CloseServer()

2 {

3 m_bSerRun = FALSE;

4 closesocket(m_hSock);

5 }

7. WorkThread() 在完成端口上工作的后台线程

1 UINT WINAPI UdpSer::WorkThread(LPVOID lpParam)

2 {

3 UdpSer *pThis = (UdpSer *)lpParam;

4 DWORD dwTrans = 0, dwKey = 0;

5 LPOVERLAPPED pOl = NULL;

6 UDP_CONTEXT *pContext = NULL;

7

8 while (TRUE)

9 {

10 BOOL bOk =
GetQueuedCompletionStatus(pThis->m_hCompletion, &dwTrans,
&dwKey, (LPOVERLAPPED *)&pOl, WSA_INFINITE);

11

12 pContext = CONTAINING_RECORD(pOl, UDP_CONTEXT, m_ol);

13 if (pContext)

14 {

15 switch (pContext->m_nOperation)

16 {

17 case OP_READ:

18 pThis->ReadCompletion(bOk, dwTrans, pOl);

19 break;

20 case OP_WRITE:

21 delete pContext;

22 pContext = NULL;

23 break;

24 }

25 }

26

27 if (FALSE ==
InterlockedExchangeAdd(&(pThis->m_bThreadRun), 0))

28 {

29 break;

30 }

31 }

32

33 return 0;

34 }

8.ReadCompletion(), 接收操作完成后的回调函数

1 void UdpSer::ReadCompletion(BOOL bSuccess, DWORD
dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped)

2 {

3 UDP_CONTEXT* pRcvContext = CONTAINING_RECORD(lpOverlapped,
UDP_CONTEXT, m_ol);

4 WSABUF RcvBuf = { NULL, 0 };

5 DWORD dwBytes = 0;

6 DWORD dwFlag = 0;

7 INT nAddrLen = sizeof(IP_ADDR);

8 INT iErrCode = 0;

9

10 if (TRUE == bSuccess && dwNumberOfBytesTransfered
<= UDP_CONTEXT::S_PAGE_SIZE)

11 {

12#ifdef _XML_NET_

13 EnterCriticalSection(&m_RcvDataLock);

14

15 UDP_RCV_DATA* pRcvData = new
UDP_RCV_DATA(pRcvContext->m_pBuf, dwNumberOfBytesTransfered,
pRcvContext->m_RemoteAddr);

16 if (pRcvData && pRcvData->m_pData)

17 {

18 m_RcvDataQue.push_back(pRcvData);

19 }

20 else

21 {

22 delete pRcvData;

23 }

24

25 LeaveCriticalSection(&m_RcvDataLock);

26#else

27 if (dwNumberOfBytesTransfered >= sizeof(PACKET_HEAD))

28 {

29 EnterCriticalSection(&m_RcvDataLock);

30

31 UDP_RCV_DATA* pRcvData = new
UDP_RCV_DATA(pRcvContext->m_pBuf, dwNumberOfBytesTransfered,
pRcvContext->m_RemoteAddr);

32 if (pRcvData && pRcvData->m_pData)

33 {

34 m_RcvDataQue.push_back(pRcvData);

35 }

36 else

37 {

38 delete pRcvData;

39 }

40

41 LeaveCriticalSection(&m_RcvDataLock);

42 }

43#endif

44

45 //投递下一个接收操作

46 RcvBuf.buf = pRcvContext->m_pBuf;

47 RcvBuf.len = UDP_CONTEXT::S_PAGE_SIZE;

48

49 iErrCode = WSARecvFrom(pRcvContext->m_hSock,
&RcvBuf, 1, &dwBytes, &dwFlag,
(sockaddr*)(&pRcvContext->m_RemoteAddr)

50 , &nAddrLen, &(pRcvContext->m_ol), NULL);

51 if (SOCKET_ERROR == iErrCode && ERROR_IO_PENDING
!= WSAGetLastError())

52 {

53 ATLTRACE("/r/n%s -- %ld dwNumberOfBytesTransfered =
%ld, LAST_ERR = %ld"

54 , __FILE__, __LINE__, dwNumberOfBytesTransfered,
WSAGetLastError());

55 delete pRcvContext;

56 pRcvContext = NULL;

57 }

58 }

59 else

60 {

61 delete pRcvContext;

62 }

63 }
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: