您的位置:首页 > 其它

完成端口I/O模型编写心得!

2009-08-13 11:09 375 查看
完成端口学习心得:
废话少说,说说思路,再说说步骤,
思路:
完成端口是创建高性能I/O处理服务器的利器,在Windows平台上,性能最好的I/O模型就是它。完成端口模型主要利用了重叠I/O的异步套接字,套接字函数都是异步执行的。
而且现在自己面临的是一个TCP服务器,所以用它。
步骤:
1.       创建一个完成端口。
2.       创建与CPU数量相当的线程(用于处理I/O请求)。
3.       创建一个监听套接字,用于监听客户端的连接请求
4.       在主线程的循环中不断等待客户请求(Accept)
5.       当有客户来连接成功后,将新的连接套接字和完成端口相关联
6.       在新的套接字上投递相关数据和接收请求
7.       当再有新的连接的时候重复 4~6
 
服务器端代码如下:
 
#include <assert.h>

#include <process.h>

#include <Winsock2.h>

#include "GlobalData.h"

#include <iostream>

using namespace std;

#pragma comment(lib, "Ws2_32.lib")

#define SOCK_QUIT -1

#define SOCK_READ 1

/**
 * Creates a fresh I/O completion port that is not yet tied to any file handle.
 *
 * @param dwNumberOfConcurrentThreads  Passed through unchanged as the
 *        NumberOfConcurrentThreads argument of CreateIoCompletionPort.
 * @return Whatever CreateIoCompletionPort returns (the caller checks it
 *         against NULL).
 */
HANDLE CreateNewCompletionPort(DWORD dwNumberOfConcurrentThreads )
{
    // INVALID_HANDLE_VALUE together with a NULL existing port asks the API
    // for a brand-new port instead of an association.
    HANDLE hNewPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, dwNumberOfConcurrentThreads);
    return hNewPort;
}

/**
 * Associates a handle (here: an accepted socket) with an existing I/O
 * completion port.
 *
 * @param hFileHandle              Handle whose overlapped I/O completions
 *                                 should be delivered to the port.
 * @param hExistingCompletionPort  Port created by CreateNewCompletionPort.
 * @param uCompletionKey           Value handed back by
 *                                 GetQueuedCompletionStatus for every
 *                                 completion on hFileHandle.
 *
 * The function keeps its void signature for existing callers; a failed
 * association is reported through the debugger output instead of being
 * silently ignored.
 */
void AssociateCompletionPort(HANDLE hFileHandle, HANDLE hExistingCompletionPort, ULONG_PTR uCompletionKey)
{
    // CreateIoCompletionPort returns NULL when the association fails.
    if(CreateIoCompletionPort(hFileHandle, hExistingCompletionPort, uCompletionKey, 0) == NULL)
    {
        OutputDebugString(TEXT("associate completion port failed!"));
    }
}

void InitWinSockLib()

{

WSADATA wsadata;

int nRet(0);

nRet = WSAStartup(MAKEWORD(2,2), &wsadata);

if(nRet != 0)

{

OutputDebugString(TEXT("init err!"));

return;

}

if(LOWORD(wsadata.wVersion) != 2 || HIWORD(wsadata.wVersion) != 2)

{

OutputDebugString(TEXT("wrong version"));

return;

}

return;

}

// Calls WSACleanup(); the counterpart of the WSAStartup call made in
// InitWinSockLib. The return value of WSACleanup is not checked.
void UnloadSockLib()

{

WSACleanup();

}

DWORD dwRecvCount(0);

unsigned int __stdcall WorkerThread(PVOID pArg)

{

HANDLE hCompletionPort = *(HANDLE *)pArg;

assert(hCompletionPort != NULL);

DWORD dwBytesOfTransfer(0);

//ULONG_PTR uCompletionKey(0);

//OVERLAPPED *pOverlapped = NULL;

PER_IO_DATA *pPerIoData(NULL) , *pKey(NULL);

ULONG_PTR uPortData(0);

BOOL bRet(FALSE);

DWORD dwFlags(0);

DWORD dwReceiveBytes(0);

int nRet(0);

__try

{

while(1)

{

bRet = GetQueuedCompletionStatus(hCompletionPort, &dwBytesOfTransfer, (PULONG_PTR)&uPortData, (LPOVERLAPPED*)(&pPerIoData), INFINITE);

cout<<"/r/nbRet :"<<bRet<<"Current Thread Id:"<<GetCurrentThreadId()<<endl;

if(bRet)

{

pKey = (PER_IO_DATA*)uPortData;

if(pKey->bOperationType == SOCK_READ && dwBytesOfTransfer > 0)//收到数据

{

cout<<"reiceive msg size :"<<dwBytesOfTransfer<<endl;

cout<<"receive times :"<< ++dwRecvCount<<endl;

}

else if(pKey->bOperationType == SOCK_QUIT || dwBytesOfTransfer == 0)

{

//cout<<"status quit /r/n"<<endl;

//OutputDebugString(TEXT("status quit /r/n"));

cout<<"error num :"<<pPerIoData->overlapped.Internal<<endl;

shutdown(pKey->sockNew, SD_BOTH);

closesocket(pKey->sockNew);

if(pKey != NULL)

{

delete pKey;

pKey = NULL;

}

continue;

//break;

}

ZeroMemory(&(pPerIoData->overlapped), sizeof(pPerIoData->overlapped));

pPerIoData->dataBuf.buf = pPerIoData->szBuffer;

pPerIoData->dataBuf.len = DATA_BUFFER_SIZE;

pPerIoData->bOperationType = SOCK_READ;

pPerIoData->sockNew = pPerIoData->sockNew;

nRet = WSARecv(pKey->sockNew, &(pKey->dataBuf), 1, &dwReceiveBytes, &dwFlags, &(pPerIoData->overlapped), NULL);

if(WSAGetLastError() != WSA_IO_PENDING && nRet != 0)

{

cout<<"in Worker thread WSARecv error"<<endl;

cout<<"errorCode: "<<WSAGetLastError()<<endl;

return 0;

//break;

}

}

}

}

__except(EXCEPTION_EXECUTE_HANDLER)

{

cout<<"Exception accurred! /r/n"<<endl;

return 1;

}

//OutputDebugString(TEXT("quit /r/n"));

if(pKey->bOperationType != SOCK_QUIT)

{

shutdown(pKey->sockNew, SD_BOTH);

closesocket(pKey->sockNew);

}

assert(pKey != NULL);

delete pKey;

pKey = NULL;

//OutputDebugString(TEXT("quit from thread /r/n"));

cout<<"quit from thread in WorkThread/r/n"<<endl;

return 0;

}

int _tmain(int argc, _TCHAR* argv[])

{

InitWinSockLib();

//创建完成端口

HANDLE hCompletionPort = CreateNewCompletionPort(0);

assert(hCompletionPort != NULL);

//查询机器的CPU个数

SYSTEM_INFO  sysInfo;

GetSystemInfo(&sysInfo);

int nRet(0);

HANDLE hThread[2] = {NULL};

//创建工作者线程

for(DWORD i = 0; i < sysInfo.dwNumberOfProcessors; i++)

{

hThread[i] = (HANDLE)_beginthreadex(NULL, 0, WorkerThread, (void*)&hCompletionPort, 0, NULL);

assert(hThread[i] != NULL);

//CloseHandle(hThread);

}

//创建监听套接字

SOCKET sockListen = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);

if(INVALID_SOCKET == sockListen)

{

OutputDebugString(TEXT("create socket listen failed!"));

return 0;

}

//绑定

SOCKADDR_IN addrListen;

addrListen.sin_family = AF_INET;

addrListen.sin_addr.S_un.S_addr = inet_addr("192.168.100.94");

addrListen.sin_port = htons(3535);

nRet = bind(sockListen, (SOCKADDR*)&addrListen, sizeof(SOCKADDR));

if(nRet == SOCKET_ERROR)

{

OutputDebugString(TEXT("bind error!"));

return 1;

}

//设置为监听模式

nRet = listen(sockListen, 0);

if(SOCKET_ERROR == nRet)

{

OutputDebugString(TEXT("listen error!"));

return 1;

}

SOCKADDR_IN addrFrom;

int nLen = sizeof(addrFrom);

DWORD dwFlags(0);

int nWaitCount(100);

while(nWaitCount > 0)

{

SOCKET sockNew = accept(sockListen, (SOCKADDR*)&addrFrom, &nLen);

if( INVALID_SOCKET == sockNew)

{

OutputDebugString(TEXT("accept failed! /r/n"));

break;

}

PER_IO_DATA *pPerIoData = new PER_IO_DATA();

assert(pPerIoData != NULL);

ZeroMemory(pPerIoData, sizeof(PER_IO_DATA));

pPerIoData->bOperationType = SOCK_READ;

pPerIoData->sockNew = sockNew;

pPerIoData->dataBuf.buf = pPerIoData->szBuffer;

pPerIoData->dataBuf.len = DATA_BUFFER_SIZE;

//将新的套接字和完成端口关联起来

AssociateCompletionPort((HANDLE)sockNew, hCompletionPort, (ULONG_PTR)pPerIoData);

WSARecv(sockNew, &(pPerIoData->dataBuf), 1, NULL, &dwFlags, &(pPerIoData->overlapped), NULL);

if(WSAGetLastError() != WSA_IO_PENDING)

{

OutputDebugString(TEXT("WSARecv failed /r/n"));

break;

}

nWaitCount--;

//让完成端口管理新建立的链接套接字I/O处理

}

OutputDebugString(TEXT("will quit /r/n"));

PER_IO_DATA *pPerIoDataTmp = new PER_IO_DATA();

pPerIoDataTmp->bOperationType = SOCK_QUIT;

pPerIoDataTmp->dataBuf.buf = pPerIoDataTmp->szBuffer;

PostQueuedCompletionStatus(hCompletionPort, 0, (DWORD)pPerIoDataTmp, NULL);

WaitForMultipleObjects(2, hThread, TRUE, INFINITE);

//delete pPerIoDataTmp;

pPerIoDataTmp = NULL;

for(int i = 0; i < 2; i++)

{

CloseHandle(hThread[i]);

}

CloseHandle(hCompletionPort);

closesocket(sockListen);

UnloadSockLib();

return 0;

}
  
 
2、测试客户端代码
 
#include <WinSock2.h>

#include <iostream>

using namespace std;

#pragma comment(lib, "Ws2_32.lib")

#define BUF_SIZE 100

void InitWinSockLib()

{

WSADATA wsadata;

int nRet(0);

nRet = WSAStartup(MAKEWORD(2,2), &wsadata);

if(nRet != 0)

{

OutputDebugString(TEXT("init err!"));

return;

}

if(LOWORD(wsadata.wVersion) != 2 || HIWORD(wsadata.wVersion) != 2)

{

OutputDebugString(TEXT("wrong version"));

return;

}

return;

}

// Calls WSACleanup(); the counterpart of the WSAStartup call made in
// InitWinSockLib. The return value of WSACleanup is not checked.
void UnloadSockLib()

{

WSACleanup();

}

int _tmain(int argc, _TCHAR* argv[])

{

InitWinSockLib();

int nRet(0);

SOCKET sockConn = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);

if(INVALID_SOCKET == sockConn)

{

OutputDebugString(TEXT("create socket listen failed!"));

return 0;

}

//绑定

SOCKADDR_IN addrConn;

addrConn.sin_family = AF_INET;

addrConn.sin_addr.S_un.S_addr = inet_addr("192.168.100.94");

addrConn.sin_port = htons(3536);

nRet = bind(sockConn, (SOCKADDR*)&addrConn, sizeof(SOCKADDR));

if(nRet == SOCKET_ERROR)

{

OutputDebugString(TEXT("bind error!"));

return 1;

}

SOCKADDR_IN addrTo;

addrTo.sin_family = AF_INET;

addrTo.sin_addr.S_un.S_addr = inet_addr("192.168.100.94");

addrTo.sin_port = htons(3535);

nRet = connect(sockConn, (SOCKADDR*)&addrTo, sizeof(SOCKADDR));

if(SOCKET_ERROR == nRet)

{

OutputDebugString(TEXT("connect server failed"));

return 1;

}

char szSendBuf[BUF_SIZE] = {0};

int nSend(0);

int nLeft(BUF_SIZE);

int nSendCout(0);

while(nSendCout < 50)

{

while(nLeft != 0)

{

nSend += send(sockConn, &szSendBuf[nSend], BUF_SIZE, 0);

nLeft = nLeft - nSend;

}

cout<<"send msg to server"<<endl;

nLeft = BUF_SIZE;

nSend = 0;

::Sleep(1);

nSendCout++;

cout<<"send times: "<<nSendCout<<endl;

}

shutdown(sockConn,  SD_SEND);

closesocket(sockConn);

cout <<"Stop client"<<endl;

//system("pause");

UnloadSockLib();

return 0;

}

都是编译通过,简单测试过,可以拿来用,但是没经过很好的全面的测试。
 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息