您的位置:首页 > 编程语言

windows Socket编程之完成端口模型

2016-08-17 15:09 274 查看
上篇介绍了重叠IO模型,它已经把我们的等待数据到来和拷贝数据到我们程序的缓冲区这个时间全部交给了操作系统去完成了,它已经很完善了。但是,如果我们想要把服务端的性能做的更好一点的话,它还是有点不足的,比如说,重叠IO它只有一个工作者线程在工作,要想获得更高的效率,我们应该增加一些线程。而且,重叠IO它只能同时进行64个客户端的数据收发,这也是一个很大的问题。为此,就产生了完成端口模型,它就解决了这些问题。可以说,在windows下做大型服务端的开发,完成端口模型是不二之选。

前面几种模型呢,我们都是在main函数里面直接写的,由于这个完成端口比较重要,我们就给它稍微封装一下。首先,新建一个完成端口的类,在构造函数里面我们对它的一些数据成员进行初始化和对网络环境的初始化。其次它有几个公有成员函数,第一个是Initialize初始化函数,来看下它的声明:

bool Initialize(
NOTIFYPROC pNotifyProc, //回调函数地址
int nPort               //端口
);
第一个参数的的类型是我们自定义的一个函数指针,它的格式如下:

typedef void (CALLBACK* NOTIFYPROC)(LPVOID pContext,LPBYTE lpBuf, DWORD dwSize);
在Initialize这个函数里面我们做了哪些事情呢,首先,我们调用WSASocket来建立一个监听的udp的socket,然后进行绑定,因为是udp所以不用进行监听了。接下来,我们调用CreateIoCompletionPort创建一个完成端口对象,其声明如下:

HANDLE CreateIoCompletionPort (
HANDLE FileHandle,              // 文件的句柄
HANDLE ExistingCompletionPort,  // 已存在的完成端口对象,为NULL表示新建一个完成端口
ULONG_PTR CompletionKey,        // 传递给处理函数的参数
DWORD NumberOfConcurrentThreads // 最大访问该IO操作的线程数
);
我们给这些参数赋值为无效值或空,这样我们就新建了一个完成端口了。

接下来,我们再次调用CreateIoCompletionPort函数,这是我们给的参数与上面的不一样了,第一个参数我们把监听的那个socket传进去,因为socket也是一个特殊的文件句柄。第二个参数我们把刚创建的完成端口传进去。第三个参数我们也把监听的socket传过去,第四个参数我们一般把它赋值为CPU核心数的2倍左右的个数。调用完该函数之后,我们就已经将一个socket与我们的完成端口进行了一个绑定。

与完成端口绑定好了之后,我们就创建一些工作者线程,创建线程的数量与上面那个函数的第四个参数的个数就行了。创建好线程之后我们会调用另外一个成员函数PostRecv函数,来投递一个接收请求。至此,Initialize成员函数就到此为止了。

接下来看下工作者线程是如何工作的。在工作者线程函数里,有一个循环,我们调用了GetQueuedCompletionStatus这么一个函数,它和重叠IO模型l里面的WSAWaitForMultipleEvents函数的功能是相似的,这个函数呢是用于监控一个完成端口,看这个完成端口上所投递的任务是否完成,如果完成就返回,否则继续监控。其声明如下:

BOOL GetQueuedCompletionStatus(
HANDLE CompletionPort,       // 指定的完成端口
LPDWORD lpNumberOfBytes,     // 传输的字节数
PULONG_PTR lpCompletionKey,  // 用于接收IO操作完成后与文件句柄相关联的Completion Key
LPOVERLAPPED *lpOverlapped,  // overlapped结构
DWORD dwMilliseconds         // 等待时间
);
我们在最后一个参数传了一个INFINITE,表示无限的等待。第三个参数overlapped结构我们也和重叠IO的单IO结构一样,对OVERLAPPED结构进行了扩展。该类声明如下:

class OVERLAPPEDPLUS
{
public:
OVERLAPPED m_ol;<span style="white-space:pre">		</span>//overlapped结构
WSABUF DataBuf;    <span style="white-space:pre">		</span>//WSABUF结构,里面保存了大小和指针
CHAR   Buffer[BUFSIZE];   <span style="white-space:pre">	</span>//缓冲区
unsigned long recvBytes;     <span style="white-space:pre">	</span>//存储接收到的字节数
<span style="white-space:pre">	</span>SOCKADDR_IN remoteAddr; <span style="white-space:pre">	</span>//存储数据来源IP地址
<span style="white-space:pre">	</span>int remoteAddrLen;              //存储数据来源IP地址长度

OVERLAPPEDPLUS() {
ZeroMemory(this, sizeof(OVERLAPPEDPLUS));
}
};

当GetQueuedCompletionStatus成功返回的时候,我们可以来调用成员函数OnRecv来处理接收到的数据。至此,我们的工作者线程函数就完成了。

我们在Initialize函数最后投递了一个接收请求,我们来看下这个成员函数的实现。在这个函数的开始处,我们调用AllocateContext成员函数为OVERLAPPEDPLUS申请了一段内存,然后,我们调用WSARecvFrom来接收数据,由于该函数是异步的,它会立即返回,其声明如下:

int WSARecvFrom(
SOCKET s,                     <span style="white-space:pre">			</span>  //监听的socket
LPWSABUF lpBuffers,                                     //WSABUFFER指针
DWORD dwBufferCount,                                    //buf的大小
LPDWORD lpNumberOfBytesRecvd,                           //接收的字节数
LPDWORD lpFlags,                                        //标志位
struct sockaddr FAR *lpFrom,                            //接收原端口地址
LPINT lpFromlen,                                        //接收原端口地址的大小
LPWSAOVERLAPPED lpOverlapped,                           //overlapped结构
LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine  //指向接收操作完成后调用的例程的指针,这里给NULL就行了
);
当这个函数成功返回的时候,我们的工作者线程函数里面的GetQueuedCompletionStatus就会返回,我们给GetQueuedCompletionStatus里面传了一个overlapped结构,返回时它也会返回overlapped结构的地址,找到了地址,我们自然也知道了里面的一些信息了。

PostRecv完了之后,我们来看下OnRecv成员函数,在这个函数里面我们定义的一个回调函数,在里面我们来真正的处理接收到的数据,然后调用成员函数MoveToFreePool来释放内存,最后再调用PostRecv来继续投递一个接收请求。我们的OnRecv就做这么多。

在PostRecv开始处我们为OVERLAPPED结构申请了一段内存,调用AllocateContext来实现,这个函数里,我们调用Interlocked系列来做了一个自旋锁,因为有多个线程访问关键数据,我们必须做线程同步操作。然后我们又做了一个最简单的内存池。我们用两个链表来保存
4000
OVERLAPPED结构的指针,一个是保存正在使用的,一个是空闲的,当我们需要申请内存的时候,我们就查看空闲链表里面是否为空,不为空我们就直接拿来用,否则,就申请。然后初始化一下OVERLAPPEDPLUS的成员。

在MoveToFreePool里面,我们也会做个自旋锁来进行线程同步,然后遍历正在使用的链表,找到要释放的指针,将其移除链表,然后加到空闲链表里边去。

我们来看下资源释放的成员函数CloseCompletionPort,我们在Initialize函数里创建了一些线程,我们需要调用PostQueuedCompletionStatus这个函数将其释放,其声明如下:

BOOL PostQueuedCompletionStatus(
HANDLE CompletionPort,            // 完成端口句柄
DWORD dwNumberOfBytesTransferred, // 传输的字节数,这里给0
ULONG_PTR dwCompletionKey,        // 这里给0
LPOVERLAPPED lpOverlapped         // overlapped结构,给0
);
我们将后三个参数全部给0,特别是第三个,我们会在工作者线程函数调用GetQueuedCompletionStatus之后,检查一下第三个参数是否为0,若是则表示要关闭了,就退出该工作者线程,线程就自然结束了。至于剩下的操作就是删除两个链表里面的指针了。

至此,完成端口模型就结束了。以下是其示例代码:

#ifndef AFX_IOCPSERVER_H__75B80E90_FD25_4FFB_B273_0090AA43BBDF__INCLUDED_
#define AFX_IOCPSERVER_H__75B80E90_FD25_4FFB_B273_0090AA43BBDF__INCLUDED_

#include <WINSOCK2.H>
#include <windows.h>
#include <algorithm>
#include <list>

using namespace std;
#pragma comment(lib,"ws2_32.lib")
#define BUFSIZE     512

class OVERLAPPEDPLUS
{
public:
OVERLAPPED			m_ol;	//overlapped结构
WSABUF DataBuf;    			//WSABUF指针
CHAR   Buffer[BUFSIZE];    //缓冲区
unsigned long recvBytes;      //存储接收到的字节数
SOCKADDR_IN remoteAddr; //存储数据来源IP地址
int remoteAddrLen;              //存储数据来源IP地址长度

OVERLAPPEDPLUS() {
ZeroMemory(this, sizeof(OVERLAPPEDPLUS));
}
};

//函数指针
typedef void (CALLBACK* NOTIFYPROC)(LPVOID pContext,LPBYTE lpBuf, DWORD dwSize);
typedef list<OVERLAPPEDPLUS* > ContextList;

class CIOCPServer
{
public:
CIOCPServer();
virtual ~CIOCPServer();

NOTIFYPROC					m_pNotifyProc;

bool Initialize(NOTIFYPROC pNotifyProc, int nPort);

static unsigned __stdcall WorkerThreadFunc(LPVOID WorkContext);

bool PostSend(OVERLAPPEDPLUS* pContext, LPBYTE lpData, UINT nSize);
bool PostRecv();
<span style="white-space:pre">	volatile LONG<span style="white-space:pre">			</span>g_fResourceInUse ;<span style="white-space:pre">	</span>//实现自旋锁进行内存池管理</span>
void Shutdown();

LONG					m_nCurrentThreads;	//当前开启的线程数
LONG					m_nBusyThreads;		//正在工作的线程数

ContextList				m_listContexts;		//正在使用的链表
ContextList				m_listFreePool;		//空闲链表

protected:

void MoveToFreePool(OVERLAPPEDPLUS *pContext);
OVERLAPPEDPLUS*  AllocateContext();

bool				m_bInit;				//是否完成初始化并运行

void CloseCompletionPort();
void Stop();

SOCKET					m_socListen;

HANDLE					m_hCompletionPort;
bool					m_bTimeToKill;

string GetHostName(SOCKET socket);

bool OnRecv		(OVERLAPPEDPLUS* pContext, DWORD dwSize = 0);

};

#endif

#include "IOCPServer.h"

CIOCPServer::CIOCPServer()
{
printf("CIOCPServer=%p\n",this);

WSADATA wsaData;
WSAStartup(MAKEWORD(2,2), &wsaData);

m_socListen		= NULL;

m_bTimeToKill		= false;

m_hCompletionPort= NULL;

m_bInit = false;
m_nCurrentThreads	= 0;
m_nBusyThreads		= 0;

m_nSendKbps = 0;
m_nRecvKbps = 0;

g_fResourceInUse = FALSE;
}

CIOCPServer::~CIOCPServer()
{

printf("IOCPServer[%p] is shuting down\n",this);
Shutdown();
WSACleanup();
}

bool CIOCPServer::Initialize(NOTIFYPROC pNotifyProc,int nPort)
{
m_pNotifyProc	= pNotifyProc;
DWORD Flags = 0;
//创建监听的socket
m_socListen = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED);

if (m_socListen == INVALID_SOCKET)
{
printf("Could not create listen socket %ld\n",WSAGetLastError());
return false;
}
//绑定
SOCKADDR_IN		saServer;
saServer.sin_port = htons(nPort);
saServer.sin_family = AF_INET;
saServer.sin_addr.s_addr = INADDR_ANY;

nRet = bind(m_socListen, (LPSOCKADDR)&saServer, sizeof(struct sockaddr));

if (nRet == SOCKET_ERROR)
{
printf("bind() error %ld\n",WSAGetLastError());
closesocket(m_socListen);
return false;
}
printf("CIOCPServer[%p] is binded on port %d now\n",this,nPort);
//创建完成端口
m_hCompletionPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0 );
if ( m_hCompletionPort == NULL )
{
printf("CIOCPServer[%p] CreateIoCompletionPort failed\n",this);
closesocket( m_socListen );
return false;
}
SYSTEM_INFO systemInfo;
GetSystemInfo( &systemInfo );
// 每个CPU核心建立2个工作者线程
UINT nWorkerCnt = systemInfo.dwNumberOfProcessors * 2;
//将完成端口与监听的socket进行绑定
CreateIoCompletionPort ((HANDLE)m_socListen,m_hCompletionPort,(DWORD)m_socListen,nWorkerCnt);

// 创建一些线程
HANDLE hWorker;
for ( UINT i = 0; i < nWorkerCnt; i++ )
{
UINT tmp=i+1;
hWorker=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)WorkerThreadFunc,this,0,(LPDWORD)&tmp);
if (hWorker == NULL )
{
CloseHandle( m_hCompletionPort );
return false;
}
//投递一个接收请求
PostRecv();
m_nCurrentThreads++;
//关闭句柄,那么线程就没人管的运行了
CloseHandle(hWorker);
}
m_bInit = true;
return true;
}

unsigned CIOCPServer::WorkerThreadFunc (LPVOID thisContext)
{
CIOCPServer* pThis = reinterpret_cast<CIOCPServer*>(thisContext);

HANDLE hCompletionPort = pThis->m_hCompletionPort;
DWORD dwIoSize;
OVERLAPPEDPLUS*	pOverlapPlus;
bool			bError;
DWORD nSocket;
InterlockedIncrement(&pThis->m_nBusyThreads);

while(true)
{
pOverlapPlus	= NULL;
bError			= false;

InterlockedDecrement(&pThis->m_nBusyThreads);
//无线等待是否有任务完成了
BOOL bIORet = GetQueuedCompletionStatus(
hCompletionPort,
&dwIoSize,
(LPDWORD) &nSocket,
(LPOVERLAPPED *)&pOverlapPlus, INFINITE);
//nSocket=0表示要退出线程了
if (nSocket == 0)
{
break;
}

int nBusyThreads = InterlockedIncrement(&pThis->m_nBusyThreads);
DWORD dwIOError = GetLastError();
//若接收出错了
if (dwIoSize == 0 || dwIOError == SOCKET_ERROR || bIORet == 0)
{
pThis->MoveToFreePool(pOverlapPlus);
continue;
}
//处理接收的数据
pThis->OnRecv(pOverlapPlus,dwIoSize);

}

InterlockedDecrement(&pThis->m_nCurrentThreads);
InterlockedDecrement(&pThis->m_nBusyThreads);
return 0;
}

string CIOCPServer::GetHostName(SOCKET socket)
{
sockaddr_in  sockAddr;
memset(&sockAddr, 0, sizeof(sockAddr));
int nSockAddrLen = sizeof(sockAddr);
BOOL bResult = getpeername(socket,(SOCKADDR*)&sockAddr, &nSockAddrLen);
return bResult != INVALID_SOCKET ? inet_ntoa(sockAddr.sin_addr) : "";
}

bool CIOCPServer::PostRecv()
{
// 分配内存
OVERLAPPEDPLUS * PerIoData =AllocateContext();
if (PerIoData == NULL)
{
printf("GlobalAlloc() failed with error %d\n", GetLastError());
return false;
}

DWORD RecvBytes=0;
DWORD Flags=0;
//接收数据
if (WSARecvFrom(m_socListen, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags,
(sockaddr*)&(PerIoData->remoteAddr),&PerIoData->remoteAddrLen,
&(PerIoData->m_ol), NULL) == SOCKET_ERROR)
{
if (WSAGetLastError() != ERROR_IO_PENDING)
{
printf("WSARecvFrom() failed with error %d\n", WSAGetLastError());
return false;
}
}
return true;
}

bool CIOCPServer::PostSend(OVERLAPPEDPLUS* pContext, LPBYTE lpData, UINT nSize)
{
if (pContext == NULL)
return false;
//发送数据
sendto( m_socListen, (char *)lpData,nSize, 0 , (sockaddr*)&(pContext->remoteAddr), sizeof(pContext->remoteAddr));

return true;
}

bool CIOCPServer::OnRecv(OVERLAPPEDPLUS* pContext, DWORD dwIoSize)
{
if (dwIoSize == 0)
{
return false;
}

printf("BusyThread: %d\t- %s\n",m_nBusyThreads,pContext->Buffer);
//回调函数,处理数据
m_pNotifyProc(pContext,(LPBYTE)pContext->Buffer, dwIoSize);
//释放内存
MoveToFreePool(pContext);
PostRecv();

return true;
}

void CIOCPServer::CloseCompletionPort()
{
while (m_nCurrentThreads)
{
PostQueuedCompletionStatus(m_hCompletionPort, 0, (DWORD) NULL, NULL);
Sleep(100);
}
// 关闭完成端口句柄
CloseHandle(m_hCompletionPort);
//清除指针操作
OVERLAPPEDPLUS* pContext = NULL;
list<OVERLAPPEDPLUS* >::iterator iter;
for(iter=m_listContexts.begin();iter!=m_listContexts.end();)
{
pContext=*iter++;
delete pContext;
}
m_listContexts.clear();
for(iter=m_listFreePool.begin();iter!=m_listFreePool.end();)
{
pContext=*iter++;
delete pContext;
}
m_listFreePool.clear();

}

void CIOCPServer::Shutdown()
{
if (m_bInit == false)
return;

m_bInit = false;
m_bTimeToKill = true;

CloseCompletionPort();
closesocket(m_socListen);

}

void CIOCPServer::MoveToFreePool(OVERLAPPEDPLUS *pContext)
{
//线程同步,自旋锁
while(InterlockedExchange(&g_fResourceInUse,TRUE) == TRUE)Sleep(1);
//将要释放的指针添加到空闲链表中
ContextList::iterator iter;
iter=find(m_listContexts.begin(),m_listContexts.end(),pContext);
if (iter!=m_listContexts.end())
{
m_listContexts.remove(pContext);
m_listFreePool.push_back(pContext);
}
InterlockedExchange(&g_fResourceInUse,FALSE);
}

OVERLAPPEDPLUS*  CIOCPServer::AllocateContext()
{
OVERLAPPEDPLUS* pContext = NULL;
//自旋锁,线程同步
while(InterlockedExchange(&g_fResourceInUse,TRUE) == TRUE)Sleep(1);
//若空闲链表不为空
if (!m_listFreePool.empty())
{
pContext = m_listFreePool.front();
m_listFreePool.remove(pContext);
}
else
{
pContext = new OVERLAPPEDPLUS();
}
//初始化赋值操作
ZeroMemory(pContext, sizeof(OVERLAPPEDPLUS));
pContext->DataBuf.len = BUFSIZE;
pContext->DataBuf.buf = pContext->Buffer;
pContext->recvBytes = 24;
pContext->remoteAddrLen = sizeof(pContext->remoteAddr);
m_listContexts.push_back(pContext);
InterlockedExchange(&g_fResourceInUse,FALSE);
return pContext;
}
#include <conio.h>
#include "IOCPServer.h"

void CALLBACK cb(LPVOID pContext,LPBYTE lpBuf, DWORD dwSize)
{
printf("%s\n",lpBuf);
return;
}
int main(int argc, char * argv[])
{
CIOCPServer * server=new CIOCPServer();
server->Initialize(cb,6000);
_getch();
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息