您的位置:首页 > 理论基础 > 计算机网络

完成端口封装(修复Windows 网络与通信程序设计 可伸缩IOCP模型的bug)

2016-08-26 11:06 423 查看


前言

看过《Windows网络与通信程序设计》的人都知道,里面有一段有关于IOCP的经典封装。大大方便了“伸手党”服务器端程序的开发(我也是其中之一)。但是应用到实际程序中你会发现经常出现一个莫名奇妙的问题:一旦客户端发送的字节数过多,服务器端接受其中几条后就“死掉”了,我也深受其害,于是乎今天花了2小时时间通读了代码,把其中的bug找到(PS:不敢保证是否还是其他bug,暂时还未发现)


问题

引起上述现象的是由于作者的小疏忽导致的,我们知道采用完成端口时要开辟多个线程(一般为CPU核心数)来监听请求,如果客户端在短时间内发送了一大堆字节,

这些字节在客户端上肯定是分多次顺序进行发送。服务端接受的时候却是多个线程(单线程CPU例外)各自接受各自的原来的顺序就被打乱了,如果只是简单拼凑起来肯定会出现问题,如何解决这个问题呢,作者已经帮我想好了。虽然各个线程处理数据的顺序可能不一直,但是投递读请求的顺序肯定是与客户端发送的顺序是一直的,于是作者CIOCPBuffer中添加了一个顺序标识nSequenceNumber用来标识当前读取数据的发送顺序,并建立了一个队列将当前接受CIOCPBuffer联系起来。

// 这是per-I/O数据。它包含了在套节字上处理I/O操作的必要信息

struct CIOCPBuffer

{

WSAOVERLAPPED ol;


SOCKET sClient;// AcceptEx接收的客户方套节字


char *buff;// I/O操作使用的缓冲区

int nLen;// buff缓冲区(使用的)大小


ULONG nSequenceNumber;// 此I/O的序列号


int nOperation;// 操作类型

#define OP_ACCEPT1

#define OP_WRITE2

#define OP_READ3


CIOCPBuffer *pNext;

};

[/code]

读取数据的时候先判断读取数据CIOCPBuffer的nSequenceNumber与当前队列头的CIOCPBuffer是否一致,一致则循环将队列中的所有Buffer按顺序传递给用户,不一致就将它按顺序加入队列。

这种设计是不是很巧妙。

作者的代码:

CIOCPBuffer *CIOCPServer::GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer)

{

if(pBuffer != NULL)

{

// 如果与要读的下一个序列号相等,则读这块缓冲区

if(pBuffer->nSequenceNumber == pContext->nCurrentReadSequence)

{

return pBuffer;

}


// 如果不相等,则说明没有按顺序接收数据,将这块缓冲区保存到连接的pOutOfOrderReads列表中


// 列表中的缓冲区是按照其序列号从小到大的顺序排列的


pBuffer->pNext = NULL;


CIOCPBuffer *ptr = pContext->pOutOfOrderReads;

CIOCPBuffer *pPre = NULL;

while(ptr != NULL)

{

if(pBuffer->nSequenceNumber < ptr->nSequenceNumber)

break;


pPre = ptr;

ptr = ptr->pNext;

}


if(pPre == NULL) // 应该插入到表头

{

pBuffer->pNext = pContext->pOutOfOrderReads;

pContext->pOutOfOrderReads = pBuffer;

}

else// 应该插入到表的中间

{

pBuffer->pNext = pPre->pNext;

pPre->pNext = pBuffer->pNext;

}

}


// 检查表头元素的序列号,如果与要读的序列号一致,就将它从表中移除,返回给用户

CIOCPBuffer *ptr = pContext->pOutOfOrderReads;

if(ptr != NULL && (ptr->nSequenceNumber == pContext->nCurrentReadSequence))

{

pContext->pOutOfOrderReads = ptr->pNext;

return ptr;

}

return NULL;

}

[/code]

我处理后的代码

CIOCPBuffer *CIOCPServer::GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer)

{

if(pBuffer != NULL)

{

// 如果与要读的下一个序列号相等,则读这块缓冲区

if(pBuffer->nSequenceNumber == pContext->nCurrentReadSequence)

{

return pBuffer;

}


// 如果不相等,则说明没有按顺序接收数据,将这块缓冲区保存到连接的pOutOfOrderReads列表中


// 列表中的缓冲区是按照其序列号从小到大的顺序排列的


pBuffer->pNext = NULL;


CIOCPBuffer *ptr = pContext->pOutOfOrderReads;

CIOCPBuffer *pPre = NULL;

while(ptr != NULL)

{

if(pBuffer->nSequenceNumber < ptr->nSequenceNumber)

break;


pPre = ptr;

ptr = ptr->pNext;

}


if(pPre == NULL) // 应该插入到表头

{

pBuffer->pNext = pContext->pOutOfOrderReads;

pContext->pOutOfOrderReads = pBuffer;

}

else// 应该插入到表的中间

{

pBuffer->pNext = pPre->pNext;

pPre->pNext = pBuffer;

}

}


// 检查表头元素的序列号,如果与要读的序列号一致,就将它从表中移除,返回给用户

CIOCPBuffer *ptr = pContext->pOutOfOrderReads;

if(ptr != NULL && (ptr->nSequenceNumber == pContext->nCurrentReadSequence))

{

pContext->pOutOfOrderReads = ptr->pNext;

return ptr;

}

return NULL;

}

[/code]

对,你没有看错,仅仅是这么简单一句代码,就造成了整个程序的错误。看不懂的同学自己去看看如何向链表中见插入一个元素吧。

废话不多说上一段完整程序:

////////////////////////////////////////

// IOCP.h文件


#ifndef __IOCP_H__

#define __IOCP_H__


#include <winsock2.h>

#include <windows.h>

#include <Mswsock.h>


#define BUFFER_SIZE 1024*2// I/O请求的缓冲区大小



// 这是per-I/O数据。它包含了在套节字上处理I/O操作的必要信息

struct CIOCPBuffer

{

WSAOVERLAPPED ol;


SOCKET sClient;// AcceptEx接收的客户方套节字


char *buff;// I/O操作使用的缓冲区

int nLen;// buff缓冲区(使用的)大小


ULONG nSequenceNumber;// 此I/O的序列号


int nOperation;// 操作类型

#define OP_ACCEPT1

#define OP_WRITE2

#define OP_READ3


CIOCPBuffer *pNext;

};


// 这是per-Handle数据。它包含了一个套节字的信息

struct CIOCPContext

{

SOCKET s;// 套节字句柄


SOCKADDR_IN addrLocal;// 连接的本地地址

SOCKADDR_IN addrRemote;// 连接的远程地址


BOOL bClosing;// 套节字是否关闭


int nOutstandingRecv;// 此套节字上抛出的重叠操作的数量

int nOutstandingSend;



ULONG nReadSequence;// 安排给接收的下一个序列号

ULONG nCurrentReadSequence;// 当前要读的序列号

CIOCPBuffer *pOutOfOrderReads;// 记录没有按顺序完成的读I/O


CRITICAL_SECTION Lock;// 保护这个结构


CIOCPContext *pNext;

};



class CIOCPServer   // 处理线程

{

public:

CIOCPServer();

~CIOCPServer();


// 开始服务

BOOL Start(int nPort = 4567, int nMaxConnections = 2000,

int nMaxFreeBuffers = 200, int nMaxFreeContexts = 100, int nInitialReads = 4);

// 停止服务

void Shutdown();


// 关闭一个连接和关闭所有连接

void CloseAConnection(CIOCPContext *pContext);

void CloseAllConnections();


// 取得当前的连接数量

ULONG GetCurrentConnection() { return m_nCurrentConnection; }


// 向指定客户发送文本

BOOL SendText(CIOCPContext *pContext, char *pszText, int nLen);


// 获得本机处理器的数量

static int _GetNoOfProcessors();


protected:


// 申请和释放缓冲区对象

CIOCPBuffer *AllocateBuffer(int nLen);

void ReleaseBuffer(CIOCPBuffer *pBuffer);


// 申请和释放套节字上下文

CIOCPContext *AllocateContext(SOCKET s);

void ReleaseContext(CIOCPContext *pContext);


// 释放空闲缓冲区对象列表和空闲上下文对象列表

void FreeBuffers();

void FreeContexts();


// 向连接列表中添加一个连接

BOOL AddAConnection(CIOCPContext *pContext);


// 插入和移除未决的接受请求

BOOL InsertPendingAccept(CIOCPBuffer *pBuffer);

BOOL RemovePendingAccept(CIOCPBuffer *pBuffer);


// 取得下一个要读取的

CIOCPBuffer *GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer);



// 投递接受I/O、发送I/O、接收I/O

BOOL PostAccept(CIOCPBuffer *pBuffer);

BOOL PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer);

BOOL PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer);


void HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError);



// 事件通知函数

// 建立了一个新的连接

virtual void OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer);

// 一个连接关闭

virtual void OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer);

// 在一个连接上发生了错误

virtual void OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError);

// 一个连接上的读操作完成

virtual void OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer);

// 一个连接上的写操作完成

virtual void OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer);


protected:


// 记录空闲结构信息

CIOCPBuffer *m_pFreeBufferList;

CIOCPContext *m_pFreeContextList;

int m_nFreeBufferCount;

int m_nFreeContextCount;

CRITICAL_SECTION m_FreeBufferListLock;

CRITICAL_SECTION m_FreeContextListLock;


// 记录抛出的Accept请求

CIOCPBuffer *m_pPendingAccepts;   // 抛出请求列表。

long m_nPendingAcceptCount;

CRITICAL_SECTION m_PendingAcceptsLock;


// 记录连接列表

CIOCPContext *m_pConnectionList;

int m_nCurrentConnection;

CRITICAL_SECTION m_ConnectionListLock;


// 用于投递Accept请求

HANDLE m_hAcceptEvent;

HANDLE m_hRepostEvent;

LONG m_nRepostCount;


intm_nThread;

int m_nPort;// 服务器监听的端口


int m_nInitialAccepts;

int m_nInitialReads;

int m_nMaxAccepts;

int m_nMaxSends;

int m_nMaxFreeBuffers;

int m_nMaxFreeContexts;

int m_nMaxConnections;


HANDLE m_hListenThread;// 监听线程

HANDLE m_hCompletion;// 完成端口句柄

SOCKET m_sListen;// 监听套节字句柄

LPFN_ACCEPTEX m_lpfnAcceptEx;// AcceptEx函数地址

LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockaddrs; // GetAcceptExSockaddrs函数地址


BOOL m_bShutDown;// 用于通知监听线程退出

BOOL m_bServerStarted;// 记录服务是否启动


HANDLEm_hMutex;


private:// 线程函数

static DWORD WINAPI _ListenThreadProc(LPVOID lpParam);

static DWORD WINAPI _WorkerThreadProc(LPVOID lpParam);

};



#endif // __IOCP_H__

[/code]

//////////////////////////////////////////////////

// IOCP.cpp文件

#include "StdAfx.h"

#include "iocp.h"

#pragma comment(lib, "WS2_32.lib")

#include <stdio.h>

CIOCPServer::CIOCPServer()

{

// 列表

m_pFreeBufferList = NULL;

m_pFreeContextList = NULL;

m_pPendingAccepts = NULL;

m_pConnectionList = NULL;


m_nFreeBufferCount = 0;

m_nFreeContextCount = 0;

m_nPendingAcceptCount = 0;

m_nCurrentConnection = 0;


::InitializeCriticalSection(&m_FreeBufferListLock);

::InitializeCriticalSection(&m_FreeContextListLock);

::InitializeCriticalSection(&m_PendingAcceptsLock);

::InitializeCriticalSection(&m_ConnectionListLock);


// Accept请求

m_hAcceptEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);

m_hRepostEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);

m_nRepostCount = 0;


m_nThread = 0;

m_nPort = 4567;


m_nInitialAccepts = 10;

m_nInitialReads = 4;

m_nMaxAccepts = 100;

m_nMaxSends = 20;

m_nMaxFreeBuffers = 200;

m_nMaxFreeContexts = 100;

m_nMaxConnections = 2000;


m_hListenThread = NULL;

m_hCompletion = NULL;

m_sListen = INVALID_SOCKET;

m_lpfnAcceptEx = NULL;

m_lpfnGetAcceptExSockaddrs = NULL;


m_bShutDown = FALSE;

m_bServerStarted = FALSE;


m_hMutex = CreateMutex(NULL,FALSE,"LOCK");


// 初始化WS2_32.dll

WSADATA wsaData;

WORD sockVersion = MAKEWORD(2, 2);

::WSAStartup(sockVersion, &wsaData);

}


CIOCPServer::~CIOCPServer()

{

Shutdown();


if(m_sListen != INVALID_SOCKET)

::closesocket(m_sListen);

if(m_hListenThread != NULL)

::CloseHandle(m_hListenThread);


::CloseHandle(m_hRepostEvent);

::CloseHandle(m_hAcceptEvent);


::DeleteCriticalSection(&m_FreeBufferListLock);

::DeleteCriticalSection(&m_FreeContextListLock);

::DeleteCriticalSection(&m_PendingAcceptsLock);

::DeleteCriticalSection(&m_ConnectionListLock);


::WSACleanup();

}



///////////////////////////////////

// 自定义帮助函数


CIOCPBuffer *CIOCPServer::AllocateBuffer(int nLen)

{

CIOCPBuffer *pBuffer = NULL;

if(nLen > BUFFER_SIZE)

return NULL;


// 为缓冲区对象申请内存

::EnterCriticalSection(&m_FreeBufferListLock);

if(m_pFreeBufferList == NULL)  // 内存池为空,申请新的内存

{

pBuffer = (CIOCPBuffer *)::HeapAlloc(GetProcessHeap(), 

HEAP_ZERO_MEMORY, sizeof(CIOCPBuffer) + BUFFER_SIZE);

}

else// 从内存池中取一块来使用

{

pBuffer = m_pFreeBufferList;

m_pFreeBufferList = m_pFreeBufferList->pNext;

pBuffer->pNext = NULL;

m_nFreeBufferCount --;

}

::LeaveCriticalSection(&m_FreeBufferListLock);


// 初始化新的缓冲区对象

if(pBuffer != NULL)

{

pBuffer->buff = (char*)(pBuffer + 1);

pBuffer->nLen = nLen;

}

return pBuffer;

}


void CIOCPServer::ReleaseBuffer(CIOCPBuffer *pBuffer)

{

::EnterCriticalSection(&m_FreeBufferListLock);


if(m_nFreeBufferCount <= m_nMaxFreeBuffers)// 将要释放的内存添加到空闲列表中

{

memset(pBuffer, 0, sizeof(CIOCPBuffer) + BUFFER_SIZE);

pBuffer->pNext = m_pFreeBufferList;

m_pFreeBufferList = pBuffer;


m_nFreeBufferCount ++ ;

}

else// 已经达到最大值,真正的释放内存

{

::HeapFree(::GetProcessHeap(), 0, pBuffer);

}


::LeaveCriticalSection(&m_FreeBufferListLock);

}



CIOCPContext *CIOCPServer::AllocateContext(SOCKET s)

{

CIOCPContext *pContext;


// 申请一个CIOCPContext对象

::EnterCriticalSection(&m_FreeContextListLock);

if(m_pFreeContextList == NULL)

{

pContext = (CIOCPContext *)

::HeapAlloc(::GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(CIOCPContext)); 


::InitializeCriticalSection(&pContext->Lock);

}

else

{

// 在空闲列表中申请

pContext = m_pFreeContextList;

m_pFreeContextList = m_pFreeContextList->pNext;

pContext->pNext = NULL;


m_nFreeBufferCount --;

}

::LeaveCriticalSection(&m_FreeContextListLock);


// 初始化对象成员

if(pContext != NULL)

{

pContext->s = s;

}

return pContext;

}


void CIOCPServer::ReleaseContext(CIOCPContext *pContext)

{

if(pContext->s != INVALID_SOCKET)

::closesocket(pContext->s);


// 首先释放(如果有的话)此套节字上的没有按顺序完成的读I/O的缓冲区

CIOCPBuffer *pNext;

while(pContext->pOutOfOrderReads != NULL)

{

pNext = pContext->pOutOfOrderReads->pNext;

ReleaseBuffer(pContext->pOutOfOrderReads);

pContext->pOutOfOrderReads = pNext;

}


::EnterCriticalSection(&m_FreeContextListLock);


if(m_nFreeContextCount <= m_nMaxFreeContexts) // 添加到空闲列表

{

// 先将关键代码段变量保存到一个临时变量中

CRITICAL_SECTION cstmp = pContext->Lock;

// 将要释放的上下文对象初始化为0

memset(pContext, 0, sizeof(CIOCPContext));


// 再放会关键代码段变量,将要释放的上下文对象添加到空闲列表的表头

pContext->Lock = cstmp;

pContext->pNext = m_pFreeContextList;

m_pFreeContextList = pContext;


// 更新计数

m_nFreeContextCount ++;

}

else

{

::DeleteCriticalSection(&pContext->Lock);

::HeapFree(::GetProcessHeap(), 0, pContext);

}


::LeaveCriticalSection(&m_FreeContextListLock);

}


void CIOCPServer::FreeBuffers()

{

// 遍历m_pFreeBufferList空闲列表,释放缓冲区池内存

::EnterCriticalSection(&m_FreeBufferListLock);


CIOCPBuffer *pFreeBuffer = m_pFreeBufferList;

CIOCPBuffer *pNextBuffer;

while(pFreeBuffer != NULL)

{

pNextBuffer = pFreeBuffer->pNext;

if(!::HeapFree(::GetProcessHeap(), 0, pFreeBuffer))

{

#ifdef _DEBUG

::OutputDebugString("  FreeBuffers释放内存出错!");

#endif // _DEBUG

break;

}

pFreeBuffer = pNextBuffer;

}

m_pFreeBufferList = NULL;

m_nFreeBufferCount = 0;


::LeaveCriticalSection(&m_FreeBufferListLock);

}


void CIOCPServer::FreeContexts()

{

// 遍历m_pFreeContextList空闲列表,释放缓冲区池内存

::EnterCriticalSection(&m_FreeContextListLock);


CIOCPContext *pFreeContext = m_pFreeContextList;

CIOCPContext *pNextContext;

while(pFreeContext != NULL)

{

pNextContext = pFreeContext->pNext;


::DeleteCriticalSection(&pFreeContext->Lock);

if(!::HeapFree(::GetProcessHeap(), 0, pFreeContext))

{

#ifdef _DEBUG

::OutputDebugString("  FreeBuffers释放内存出错!");

#endif // _DEBUG

break;

}

pFreeContext = pNextContext;

}

m_pFreeContextList = NULL;

m_nFreeContextCount = 0;


::LeaveCriticalSection(&m_FreeContextListLock);

}



BOOL CIOCPServer::AddAConnection(CIOCPContext *pContext)

{

// 向客户连接列表添加一个CIOCPContext对象


::EnterCriticalSection(&m_ConnectionListLock);

if(m_nCurrentConnection <= m_nMaxConnections)

{

// 添加到表头

pContext->pNext = m_pConnectionList;

m_pConnectionList = pContext;

// 更新计数

m_nCurrentConnection ++;


::LeaveCriticalSection(&m_ConnectionListLock);

return TRUE;

}

::LeaveCriticalSection(&m_ConnectionListLock);


return FALSE;

}


void CIOCPServer::CloseAConnection(CIOCPContext *pContext)

{

// 首先从列表中移除要关闭的连接

::EnterCriticalSection(&m_ConnectionListLock);


CIOCPContext* pTest = m_pConnectionList;

if(pTest == pContext)

{

m_pConnectionList =  pContext->pNext;

m_nCurrentConnection --;

}

else

{

while(pTest != NULL && pTest->pNext !=  pContext)

pTest = pTest->pNext;

if(pTest != NULL)

{

pTest->pNext =  pContext->pNext;

m_nCurrentConnection --;

}

}


::LeaveCriticalSection(&m_ConnectionListLock);


// 然后关闭客户套节字

::EnterCriticalSection(&pContext->Lock);


if(pContext->s != INVALID_SOCKET)

{

::closesocket(pContext->s);

pContext->s = INVALID_SOCKET;

}

pContext->bClosing = TRUE;


::LeaveCriticalSection(&pContext->Lock);

}


void CIOCPServer::CloseAllConnections()

{

// 遍历整个连接列表,关闭所有的客户套节字


::EnterCriticalSection(&m_ConnectionListLock);


CIOCPContext *pContext = m_pConnectionList;

while(pContext != NULL)

{

::EnterCriticalSection(&pContext->Lock);


if(pContext->s != INVALID_SOCKET)

{

::closesocket(pContext->s);

pContext->s = INVALID_SOCKET;

}


pContext->bClosing = TRUE;


::LeaveCriticalSection(&pContext->Lock);


pContext = pContext->pNext;

}


m_pConnectionList = NULL;

m_nCurrentConnection = 0;


::LeaveCriticalSection(&m_ConnectionListLock);

}



BOOL CIOCPServer::InsertPendingAccept(CIOCPBuffer *pBuffer)

{

// 将一个I/O缓冲区对象插入到m_pPendingAccepts表中


::EnterCriticalSection(&m_PendingAcceptsLock);


if(m_pPendingAccepts == NULL)

m_pPendingAccepts = pBuffer;

else

{

pBuffer->pNext = m_pPendingAccepts;

m_pPendingAccepts = pBuffer;

}

m_nPendingAcceptCount ++;


::LeaveCriticalSection(&m_PendingAcceptsLock);


return TRUE;

}


BOOL CIOCPServer::RemovePendingAccept(CIOCPBuffer *pBuffer)

{

BOOL bResult = FALSE;


// 遍历m_pPendingAccepts表,从中移除pBuffer所指向的缓冲区对象

::EnterCriticalSection(&m_PendingAcceptsLock);


CIOCPBuffer *pTest = m_pPendingAccepts;

if(pTest == pBuffer)// 如果是表头元素

{

m_pPendingAccepts = pBuffer->pNext;

bResult = TRUE;

}

else// 不是表头元素的话,就要遍历这个表来查找了

{

while(pTest != NULL && pTest->pNext != pBuffer)

pTest = pTest->pNext;

if(pTest != NULL)

{

pTest->pNext = pBuffer->pNext;

 bResult = TRUE;

}

}

// 更新计数

if(bResult)

m_nPendingAcceptCount --;


::LeaveCriticalSection(&m_PendingAcceptsLock);


return  bResult;

}



CIOCPBuffer *CIOCPServer::GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer)

{

if(pBuffer != NULL)

{

// 如果与要读的下一个序列号相等,则读这块缓冲区

if(pBuffer->nSequenceNumber == pContext->nCurrentReadSequence)

{

return pBuffer;

}


// 如果不相等,则说明没有按顺序接收数据,将这块缓冲区保存到连接的pOutOfOrderReads列表中


// 列表中的缓冲区是按照其序列号从小到大的顺序排列的


pBuffer->pNext = NULL;


CIOCPBuffer *ptr = pContext->pOutOfOrderReads;

CIOCPBuffer *pPre = NULL;

while(ptr != NULL)

{

if(pBuffer->nSequenceNumber < ptr->nSequenceNumber)

break;


pPre = ptr;

ptr = ptr->pNext;

}


if(pPre == NULL) // 应该插入到表头

{

pBuffer->pNext = pContext->pOutOfOrderReads;

pContext->pOutOfOrderReads = pBuffer;

}

else// 应该插入到表的中间

{

pBuffer->pNext = pPre->pNext;

pPre->pNext = pBuffer;

}

}


// 检查表头元素的序列号,如果与要读的序列号一致,就将它从表中移除,返回给用户

CIOCPBuffer *ptr = pContext->pOutOfOrderReads;

if(ptr != NULL && (ptr->nSequenceNumber == pContext->nCurrentReadSequence))

{

pContext->pOutOfOrderReads = ptr->pNext;

return ptr;

}

return NULL;

}



BOOL CIOCPServer::PostAccept(CIOCPBuffer *pBuffer)// 在监听套节字上投递Accept请求

{

// 设置I/O类型

pBuffer->nOperation = OP_ACCEPT;


// 投递此重叠I/O  

DWORD dwBytes;

pBuffer->sClient = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

BOOL b = m_lpfnAcceptEx(m_sListen, 

pBuffer->sClient,

pBuffer->buff, 

pBuffer->nLen - ((sizeof(sockaddr_in) + 16) * 2),

sizeof(sockaddr_in) + 16, 

sizeof(sockaddr_in) + 16, 

&dwBytes, 

&pBuffer->ol);

if(!b && ::WSAGetLastError() != WSA_IO_PENDING)

{

return FALSE;

}

return TRUE;

};


BOOL CIOCPServer::PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer)

{

// 设置I/O类型

pBuffer->nOperation = OP_READ;


::EnterCriticalSection(&pContext->Lock);


// 设置序列号

pBuffer->nSequenceNumber = pContext->nReadSequence;


// 投递此重叠I/O

DWORD dwBytes;

DWORD dwFlags = 0;

WSABUF buf;

buf.buf = pBuffer->buff;

buf.len = pBuffer->nLen;

if(::WSARecv(pContext->s, &buf, 1, &dwBytes, &dwFlags, &pBuffer->ol, NULL) != NO_ERROR)

{

if(::WSAGetLastError() != WSA_IO_PENDING)

{

::LeaveCriticalSection(&pContext->Lock);

return FALSE;

}

}


// 增加套节字上的重叠I/O计数和读序列号计数


pContext->nOutstandingRecv ++;

pContext->nReadSequence ++;


::LeaveCriticalSection(&pContext->Lock);


return TRUE;

}


BOOL CIOCPServer::PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer)

{

// 跟踪投递的发送的数量,防止用户仅发送数据而不接收,导致服务器抛出大量发送操作

if(pContext->nOutstandingSend > m_nMaxSends)

return FALSE;


// 设置I/O类型,增加套节字上的重叠I/O计数

pBuffer->nOperation = OP_WRITE;


// 投递此重叠I/O

DWORD dwBytes;

DWORD dwFlags = 0;

WSABUF buf;

buf.buf = pBuffer->buff;

buf.len = pBuffer->nLen;

if(::WSASend(pContext->s, 

&buf, 1, &dwBytes, dwFlags, &pBuffer->ol, NULL) != NO_ERROR)

{

if(::WSAGetLastError() != WSA_IO_PENDING)

return FALSE;

}


// 增加套节字上的重叠I/O计数

::EnterCriticalSection(&pContext->Lock);

pContext->nOutstandingSend ++;

::LeaveCriticalSection(&pContext->Lock);


return TRUE;

}



BOOL CIOCPServer::Start(int nPort, int nMaxConnections,

int nMaxFreeBuffers, int nMaxFreeContexts, int nInitialReads)

{

// 检查服务是否已经启动

if(m_bServerStarted)

return FALSE;

OutputDebugString("测试!/r\n");

// 保存用户参数

m_nPort = nPort;

m_nMaxConnections = nMaxConnections;

m_nMaxFreeBuffers = nMaxFreeBuffers;

m_nMaxFreeContexts = nMaxFreeContexts;

m_nInitialReads = nInitialReads;


// 初始化状态变量

m_bShutDown = FALSE;

m_bServerStarted = TRUE;



// 创建监听套节字,绑定到本地端口,进入监听模式

m_sListen = ::WSASocket(AF_INET, SOCK_STREAM,IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED);

SOCKADDR_IN si;

si.sin_family = AF_INET;

si.sin_port = ::ntohs(m_nPort);

si.sin_addr.S_un.S_addr = INADDR_ANY;

if(::bind(m_sListen, (sockaddr*)&si, sizeof(si)) == SOCKET_ERROR)

{

m_bServerStarted = FALSE;

return FALSE;

}

::listen(m_sListen, 200);


// 创建完成端口对象

m_hCompletion = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);


// 加载扩展函数AcceptEx

GUID GuidAcceptEx = WSAID_ACCEPTEX;

DWORD dwBytes;

::WSAIoctl(m_sListen, 

SIO_GET_EXTENSION_FUNCTION_POINTER, 

&GuidAcceptEx, 

sizeof(GuidAcceptEx),

&m_lpfnAcceptEx, 

sizeof(m_lpfnAcceptEx), 

&dwBytes, 

NULL, 

NULL);


// 加载扩展函数GetAcceptExSockaddrs

GUID GuidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;

::WSAIoctl(m_sListen,

SIO_GET_EXTENSION_FUNCTION_POINTER,

&GuidGetAcceptExSockaddrs,

sizeof(GuidGetAcceptExSockaddrs),

&m_lpfnGetAcceptExSockaddrs,

sizeof(m_lpfnGetAcceptExSockaddrs),

&dwBytes,

NULL,

NULL

);



// 将监听套节字关联到完成端口,注意,这里为它传递的CompletionKey为0

::CreateIoCompletionPort((HANDLE)m_sListen, m_hCompletion, (DWORD)0, 0);


// 注册FD_ACCEPT事件。

// 如果投递的AcceptEx I/O不够,线程会接收到FD_ACCEPT网络事件,说明应该投递更多的AcceptEx I/O

WSAEventSelect(m_sListen, m_hAcceptEvent, FD_ACCEPT);


// 创建监听线程

m_hListenThread = ::CreateThread(NULL, 0, _ListenThreadProc, this, 0, NULL);


return TRUE;

}


void CIOCPServer::Shutdown()

{

if(!m_bServerStarted)

return;


// 通知监听线程,马上停止服务

m_bShutDown = TRUE;

::SetEvent(m_hAcceptEvent);

// 等待监听线程退出

::WaitForSingleObject(m_hListenThread, INFINITE);

::CloseHandle(m_hListenThread);

m_hListenThread = NULL;


m_bServerStarted = FALSE;

}


DWORD WINAPI CIOCPServer::_ListenThreadProc(LPVOID lpParam)

{

CIOCPServer *pThis = (CIOCPServer*)lpParam;


// 先在监听套节字上投递几个Accept I/O

CIOCPBuffer *pBuffer;

for(int i=0; i<pThis->m_nInitialAccepts; i++)

{

pBuffer = pThis->AllocateBuffer(BUFFER_SIZE);

if(pBuffer == NULL)

return -1;

pThis->InsertPendingAccept(pBuffer);

pThis->PostAccept(pBuffer);

}


// 构建事件对象数组,以便在上面调用WSAWaitForMultipleEvents函数

pThis->m_nThread = _GetNoOfProcessors();

HANDLE *hWaitEvents = new HANDLE[2 + pThis->m_nThread ];

int nEventCount = 0;

hWaitEvents[nEventCount ++] = pThis->m_hAcceptEvent;

hWaitEvents[nEventCount ++] = pThis->m_hRepostEvent;


// 创建指定数量的工作线程在完成端口上处理I/O

for(int i=0; i<pThis->m_nThread ; i++)

{

hWaitEvents[nEventCount ++] = ::CreateThread(NULL, 0, _WorkerThreadProc, pThis, 0, NULL);

}


// 下面进入无限循环,处理事件对象数组中的事件

while(TRUE)

{

//::OutputDebugString("WSAWaitForMultipleEvents begin\n");

int nIndex = ::WSAWaitForMultipleEvents(nEventCount, hWaitEvents, FALSE,30*1000, FALSE);

//::OutputDebugString("WSAWaitForMultipleEvents end\n");

char temp[256];

//::OutputDebugString(itoa(nIndex,temp,10));

// 首先检查是否要停止服务

if(pThis->m_bShutDown || nIndex == WSA_WAIT_FAILED)

{::OutputDebugString("pThis->m_bShutDown\n");

// 关闭所有连接

pThis->CloseAllConnections();

::Sleep(0);// 给I/O工作线程一个执行的机会

// 关闭监听套节字

::closesocket(pThis->m_sListen);

pThis->m_sListen = INVALID_SOCKET;

::Sleep(0);// 给I/O工作线程一个执行的机会


// 通知所有I/O处理线程退出

for(int i=2; i<pThis->m_nThread + 2; i++)

{

::PostQueuedCompletionStatus(pThis->m_hCompletion, -1, 0, NULL);

}


// 等待I/O处理线程退出

::WaitForMultipleObjects(pThis->m_nThread , &hWaitEvents[2], TRUE, 5*1000);


for(int i=2; i<pThis->m_nThread  + 2; i++)

{

::CloseHandle(hWaitEvents[i]);

}


::CloseHandle(pThis->m_hCompletion);


pThis->FreeBuffers();

pThis->FreeContexts();

::ExitThread(0);

}


// 1)定时检查所有未返回的AcceptEx I/O的连接建立了多长时间

if(nIndex == WSA_WAIT_TIMEOUT)

{::OutputDebugString("WSA_WAIT_TIMEOUT\n");

pBuffer = pThis->m_pPendingAccepts;

while(pBuffer != NULL)

{

int nSeconds;

int nLen = sizeof(nSeconds);

// 取得连接建立的时间

::getsockopt(pBuffer->sClient, 

SOL_SOCKET, SO_CONNECT_TIME, (char *)&nSeconds, &nLen);

// 如果超过2分钟客户还不发送初始数据,就让这个客户go away

if(nSeconds != -1 && nSeconds > 2*60)

{   

closesocket(pBuffer->sClient);

pBuffer->sClient = INVALID_SOCKET;

}


pBuffer = pBuffer->pNext;

}

}

else

{

nIndex = nIndex - WAIT_OBJECT_0;

//int nRet = ::WSAWaitForMultipleEvents(1, &hWaitEvents[nIndex], TRUE, 0, FALSE);

WSANETWORKEVENTS ne;

int nLimit=0;

if(nIndex == 0)// 2)m_hAcceptEvent事件对象受信,说明投递的Accept请求不够,需要增加

{

::WSAEnumNetworkEvents(pThis->m_sListen, hWaitEvents[nIndex], &ne);

if(ne.lNetworkEvents & FD_ACCEPT)

{

nLimit = 50;  // 增加的个数,这里设为50个

}

}

else if(nIndex == 1)// 3)m_hRepostEvent事件对象受信,说明处理I/O的线程接受到新的客户

{

nLimit = InterlockedExchange(&pThis->m_nRepostCount, 0);

::OutputDebugString("处理I/O的线程接受到新的客户\n");

}

else if(nIndex > 1)// I/O服务线程退出,说明有错误发生,关闭服务器

{

pThis->m_bShutDown = TRUE;

continue;

}

::OutputDebugString("投递nLimit个AcceptEx I/O请求\n");

// 投递nLimit个AcceptEx I/O请求

int i = 0;

while(i++ < nLimit && pThis->m_nPendingAcceptCount < pThis->m_nMaxAccepts)

{

pBuffer = pThis->AllocateBuffer(BUFFER_SIZE);

if(pBuffer != NULL)

{

pThis->InsertPendingAccept(pBuffer);

pThis->PostAccept(pBuffer);

}

}

}

}


delete []hWaitEvents;

return 0;

}


DWORD WINAPI CIOCPServer::_WorkerThreadProc(LPVOID lpParam)

{

#ifdef _DEBUG

::OutputDebugString("WorkerThread 启动... \n");

#endif // _DEBUG


CIOCPServer *pThis = (CIOCPServer*)lpParam;


CIOCPBuffer *pBuffer;

DWORD dwKey;

DWORD dwTrans;

LPOVERLAPPED lpol;

while(TRUE)

{

::OutputDebugString("GetQueuedCompletionStatus begin\n");

// 在关联到此完成端口的所有套节字上等待I/O完成

BOOL bOK = ::GetQueuedCompletionStatus(pThis->m_hCompletion, 

&dwTrans, (LPDWORD)&dwKey, (LPOVERLAPPED*)&lpol, WSA_INFINITE);

::OutputDebugString("GetQueuedCompletionStatus completed\n");

if(dwTrans == -1) // 用户通知退出

{

#ifdef _DEBUG

::OutputDebugString("WorkerThread 退出 \n");

#endif // _DEBUG

::ExitThread(0);

}


pBuffer = CONTAINING_RECORD(lpol, CIOCPBuffer, ol);

int nError = NO_ERROR;

if(!bOK)// 在此套节字上有错误发生

{

SOCKET s;

if(pBuffer->nOperation == OP_ACCEPT)

{

s = pThis->m_sListen;

}

else

{

if(dwKey == 0)

break;

s = ((CIOCPContext*)dwKey)->s;

}

DWORD dwFlags = 0;

if(!::WSAGetOverlappedResult(s, &pBuffer->ol, &dwTrans, FALSE, &dwFlags))

{

nError = ::WSAGetLastError();

}

}

WaitForSingleObject(pThis->m_hMutex,INFINITE);

pThis->HandleIO(dwKey, pBuffer, dwTrans, nError);

ReleaseMutex(pThis->m_hMutex);

}


#ifdef _DEBUG

::OutputDebugString("WorkerThread 退出 \n");

#endif // _DEBUG

return 0;

}



void CIOCPServer::HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError)

{

CIOCPContext *pContext = (CIOCPContext *)dwKey;


#ifdef _DEBUG

::OutputDebugString("HandleIO... \n");

#endif // _DEBUG


// 1)首先减少套节字上的未决I/O计数

if(pContext != NULL)

{

::EnterCriticalSection(&pContext->Lock);


if(pBuffer->nOperation == OP_READ)

pContext->nOutstandingRecv --;

else if(pBuffer->nOperation == OP_WRITE)

pContext->nOutstandingSend --;


::LeaveCriticalSection(&pContext->Lock);


// 2)检查套节字是否已经被我们关闭

if(pContext->bClosing) 

{

#ifdef _DEBUG

::OutputDebugString("检查到套节字已经被我们关闭 \n");

#endif // _DEBUG

if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)

{

ReleaseContext(pContext);

}

// 释放已关闭套节字的未决I/O

ReleaseBuffer(pBuffer);

return;

}

}

else

{

RemovePendingAccept(pBuffer);

#ifdef _DEBUG

::OutputDebugString("RemovePendingAccept(pBuffer);\n");

#endif // _DEBUG

}


// 3)检查套节字上发生的错误,如果有的话,通知用户,然后关闭套节字

if(nError != NO_ERROR)

{

if(pBuffer->nOperation != OP_ACCEPT)

{

OnConnectionError(pContext, pBuffer, nError);

CloseAConnection(pContext);

if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)

{

ReleaseContext(pContext);

}

#ifdef _DEBUG

::OutputDebugString("检查到客户套节字上发生错误 \n");

#endif // _DEBUG

}

else // 在监听套节字上发生错误,也就是监听套节字处理的客户出错了

{

// 客户端出错,释放I/O缓冲区

if(pBuffer->sClient != INVALID_SOCKET)

{

::closesocket(pBuffer->sClient);

pBuffer->sClient = INVALID_SOCKET;

}

#ifdef _DEBUG

::OutputDebugString("检查到监听套节字上发生错误 \n");

#endif // _DEBUG

}


ReleaseBuffer(pBuffer);

return;

}

//printf("%d\n",pBuffer->nOperation);

//#ifdef _DEBUG

//::OutputDebugString((const char *)pBuffer->nOperation);

//#endif // _DEBUG


// 开始处理

if(pBuffer->nOperation == OP_ACCEPT)

{

if(dwTrans == 0)

{

#ifdef _DEBUG

::OutputDebugString("监听套节字上客户端关闭 \n");

#endif // _DEBUG


if(pBuffer->sClient != INVALID_SOCKET)

{

::closesocket(pBuffer->sClient);

pBuffer->sClient = INVALID_SOCKET;

}

}

else

{

// 为新接受的连接申请客户上下文对象

CIOCPContext *pClient = AllocateContext(pBuffer->sClient);

if(pClient != NULL)

{

if(AddAConnection(pClient))

{

// 取得客户地址

::OutputDebugString("AddAConnection(pClient)\n");

int nLocalLen, nRmoteLen;

LPSOCKADDR pLocalAddr, pRemoteAddr;

m_lpfnGetAcceptExSockaddrs(

pBuffer->buff,

pBuffer->nLen - ((sizeof(sockaddr_in) + 16) * 2),

sizeof(sockaddr_in) + 16,

sizeof(sockaddr_in) + 16,

(SOCKADDR **)&pLocalAddr,

&nLocalLen,

(SOCKADDR **)&pRemoteAddr,

&nRmoteLen);

memcpy(&pClient->addrLocal, pLocalAddr, nLocalLen);

memcpy(&pClient->addrRemote, pRemoteAddr, nRmoteLen);


// 关联新连接到完成端口对象

::CreateIoCompletionPort((HANDLE)pClient->s, m_hCompletion, (DWORD)pClient, 0);


// 通知用户

pBuffer->nLen = dwTrans;

OnConnectionEstablished(pClient, pBuffer);


// 向新连接投递几个Read请求,这些空间在套节字关闭或出错时释放

::OutputDebugString("New Connect Post 5 Recv Request\n");

for(int i=0; i<5; i++)

{

CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);

if(p != NULL)

{

if(!PostRecv(pClient, p))

{

CloseAConnection(pClient);

break;

}

}

}

}

else// 连接数量已满,关闭连接

{

CloseAConnection(pClient);

ReleaseContext(pClient);

}

}

else

{

// 资源不足,关闭与客户的连接即可

::closesocket(pBuffer->sClient);

pBuffer->sClient = INVALID_SOCKET;

}

}


// Accept请求完成,释放I/O缓冲区

ReleaseBuffer(pBuffer);


// 通知监听线程继续再投递一个Accept请求

::InterlockedIncrement(&m_nRepostCount);

::SetEvent(m_hRepostEvent);

}

else if(pBuffer->nOperation == OP_READ)

{

if(dwTrans == 0)// 对方关闭套节字

{

// 先通知用户

pBuffer->nLen = 0;

OnConnectionClosing(pContext, pBuffer);

// 再关闭连接

CloseAConnection(pContext);

// 释放客户上下文和缓冲区对象

if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)

{

ReleaseContext(pContext);

}

ReleaseBuffer(pBuffer);

}

else

{

pBuffer->nLen = dwTrans;

// 按照I/O投递的顺序读取接收到的数据

CIOCPBuffer *p = GetNextReadBuffer(pContext, pBuffer);

while(p != NULL)

{

// 通知用户

OnReadCompleted(pContext, p);

// 增加要读的序列号的值

::InterlockedIncrement((LONG*)&pContext->nCurrentReadSequence);

// 释放这个已完成的I/O

ReleaseBuffer(p);

p = GetNextReadBuffer(pContext, NULL);

}


// 继续投递一个新的接收请求

pBuffer = AllocateBuffer(BUFFER_SIZE);

if(pBuffer == NULL || !PostRecv(pContext, pBuffer))

{

CloseAConnection(pContext);

}

}

}

else if(pBuffer->nOperation == OP_WRITE)

{


if(dwTrans == 0)// 对方关闭套节字

{

// 先通知用户

pBuffer->nLen = 0;

OnConnectionClosing(pContext, pBuffer);


// 再关闭连接

CloseAConnection(pContext);


// 释放客户上下文和缓冲区对象

if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)

{

ReleaseContext(pContext);

}

ReleaseBuffer(pBuffer);

}

else

{

// 写操作完成,通知用户

pBuffer->nLen = dwTrans;

OnWriteCompleted(pContext, pBuffer);

// 释放SendText函数申请的缓冲区

ReleaseBuffer(pBuffer);

}

}

}



BOOL CIOCPServer::SendText(CIOCPContext *pContext, char *pszText, int nLen)

{

CIOCPBuffer *pBuffer = AllocateBuffer(nLen);

if(pBuffer != NULL)

{

memcpy(pBuffer->buff, pszText, nLen);

return PostSend(pContext, pBuffer);

}

return FALSE;

}


int CIOCPServer::_GetNoOfProcessors()

{

SYSTEM_INFO si;


GetSystemInfo(&si);


return si.dwNumberOfProcessors;

}


void CIOCPServer::OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer)

{

}


void CIOCPServer::OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer)

{

}



void CIOCPServer::OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)

{

}


void CIOCPServer::OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)

{

}


void CIOCPServer::OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError)

{

}

[/code]

PS:不排除其他bug,如有问题请指正,喷子滚一表去。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: