您的位置:首页 > 其它

第10章 同步设备I/O和异步设备I/O(4)_利用I/O完成端口实现Socket通信

2015-09-11 22:38 453 查看
I/O完成端口原理见上一篇(可点击这里

10.5.4.4 利用I/O完成端口实现Socket通信

(1)Accept和AcceptEx流程的比较

  ①采用accept方式的流程示意图如下(普通的阻塞函数



  ②采用AcceptEx方式的流程示意图如下(可工作在阻塞或非阻塞方式)、



【注意】:AcceptEx与Accept的不同

  ①AcceptEx是在客户端连入之前,就把客户端的Socket建立好了。也就是说,AcceptEx是先建立的Socket,然后才发出的AcceptEx调用。即在进行客户端的通信之前,无论是否有客户端连入,Socket都是提前建立好了;而不需要像accept是在客户端连入了之后,再临时去花费时间建立Socket。

  ②相比accept只能阻塞方式建立一个连接的入口,对于大量的并发客户端来讲,入口实在是有点挤;而AcceptEx可以同时在完成端口上投递多个请求,这样有客户端连入的时候,就非常优雅而且从容不迫的处理连入请求了。

  ③AcceptEx还有一个非常体贴的优点,就是在投递AcceptEx的时候,还可以顺便在AcceptEx的同时,收取客户端发来的第一组数据。这个是同时进行的,即在收到AcceptEx完成的通知的时候,就已经把这第一组数据接完毕了;但是这也意味着,如果客户端只是连入但是不发送数据的话,我们就不会收到这个AcceptEx完成的通知。

  ④Accept函数需要启动新的线程来连接客户端,而AcceptEx不需要!

(2)AcceptEx函数

参数

描述

sListenSocket

用来监听的Socket

sAcceptSocket

用于接受连接的socket,这个就是那个事先建好的,等客户端连接进来直接把这个Socket拿给它用的那个,是AcceptEx高性能的关键所在。

lpOutputBuffer

接收缓冲区。这也是AcceptEx比较有特色的地方,既然AcceptEx不是普通的accept函数,该缓冲区包含了三个信息:一是客户端发来的第一组数据,二是server的地址,三是client地址

dwReceiveDataLength

参数lpOutputBuffer中用于存放数据的空间大小。

如果此参数=0,则Accept时将不会待数据到来,而直接返回,如果此参数不为0,那么一定得等接收到数据了才会返回…… 所以通常当需要Accept接收数据时,就需要将该参数设成为:sizeof(lpOutputBuffer) - 2*(sizeof sockaddr_in +16),也就是说总长度减去两个地址空间的长度就是

dwLocalAddressLength

存放本地址地址信息的空间大小

dwRemoteAddressLength

存放本远端地址信息的空间大小

lpdwBytesReceived

一般没用,只用在同步时。

lpOverlapped

重叠I/O所要用到的重叠结构

备注:

①这是个扩展版本的函数,需要用WSAloctl()函数获取该函数的函数指针。

②此外,当客户端连接以后,可以通过GetAcceptExSockaddrs函数来获取地址信息,但该函数也要用WSAloctl函数来获取其指针。

(3)关于完成端口通知的次序问题

  调用GetQueuedCompletionStatus 获取I/O完成端口请求的时候,肯定是用先入先出的方式来进行的。但是,唤醒那些调用了GetQueuedCompletionStatus的线程是以后进先出的方式来进行的。如果出现了一个已经完成的I/O项,那么是最后一个调用GetQueuedCompletionStatus的线程会被唤醒。平常这个次序倒是不重要,但是在对数据包顺序有要求的时候。同时并发的线程可能不止一个,但他们处理I/O完成项的速度可能不一样。这样会导致读取到的数据的顺序可能跟收送过来的数据的顺序是不一样的。比如传送大块数据的时候,是需要注意下这个先后次序的。微软之所以这么做,那当然是有道理的,这样如果反复只有一个I/O操作而不是多个操作完成的话,内核就只需要唤醒同一个线程就可以了,而不需要轮着唤醒多个线程,节约了资源,而且可以把其他长时间睡眠的线程换出内存,提到资源利用率。

(4)文件传输问题

  如果各位需要使用完成端口来传送文件的话,这里有个非常需要注意的地方。因为发送文件的做法,一般都会是先打开一个文件,然后不断的循环调用ReadFile读取一块之后,然后再调用WSASend ()去发发送。但是ReadFile的时候,是需要操作系统通过磁盘的驱动程序,到实际的物理硬盘上去读取文件的,这就会使得操作系统从用户态转换到内核态去调用驱动程序,然后再把读取的结果返回至用户态;同样的道理,WSARecv也会涉及到从用户态到内核态切换的问题 --- 这样就使得我们不得不频繁的在用户态到内核态之间转换,效率低下……

  而一个非常好的解决方案是使用微软提供的扩展函数TransmitFile来传输文件,因为只需要传递给TransmitFile一个文件的句柄和需要传输的字节数,程序就会整个切换至内核态,无论是读取数据还是发送文件,都是直接在内核态中执行的,直到文件传输完毕才会返回至用户态给主进程发送通知,效率明显提高了许多。

(5)重叠结构数据释放的问题

  既然使用的是异步通讯的方式,就得要习惯一点,就是我们投递出去的完成请求,不知道什么时候我们才能收到操作完成的通知,而在这段等待通知的时间,我们就得要千万注意得保证我们投递请求的时候所使用的变量在此期间都得是有效的。

  例如发送WSARecv请求时候所使用的Overlapped变量,因为在操作完成的时候,这个结构里面会保存很多很重要的数据,对于设备驱动程序来讲,指示保存着我们这个Overlapped变量的指针,而在操作完成之后,驱动程序会将Buffer的指针、已经传输的字节数、错误码等等信息都写入到我们传递给它的那个Overlapped指针中去。如果我们已经不小心把Overlapped释放了,或者是又交给别的操作使用了的话,谁知道驱动程序会把这些东西写到哪里去呢?岂不是很崩溃?

【利用IOCP实现Socket通信程序】

效果图——可以打开任务管理器,查看CPU占用率!(比较低)



//服务器端

//main.c

/*************************************************************************
Moudule:  main.cpp
Notices:  Copyright(c) 2015 浅墨浓香
*************************************************************************/
#include "../../CommonFiles/CmnHdr.h"
#include "resource.h"
#include "IOCPModel.h"
#include <tchar.h>

//////////////////////////////////////////////////////////////////////////
TCHAR szApp[] = TEXT("服务端器");
CIOCPModel g_IocpModel;

//////////////////////////////////////////////////////////////////////////
//向列表示中加入信息
inline void AddInformation(HWND hwndLv, PCTSTR pszInfo)
{
LVITEM lv;
memset(&lv, 0, sizeof(lv));
lv.mask = LVIF_TEXT | LVIF_PARAM;
#ifdef UNICODE
lv.pszText = (PWSTR)pszInfo;
#else
USES_CONVERSION;
lv.pszText = A2W((PSTR)pszInfo);
#endif
lv.lParam =(LPARAM)ListView_GetItemCount(hwndLv);
ListView_InsertItem(hwndLv, &lv);
}

//////////////////////////////////////////////////////////////////////////
//初始化ListView
void InitListCtrl(HWND hwndLv){
ListView_SetExtendedListViewStyle(hwndLv, LVS_EX_FULLROWSELECT | LVS_EX_GRIDLINES);
LVCOLUMN lvCol;
//设置第1列为Information
lvCol.mask = LVCF_FMT | LVCF_ORDER | LVCF_TEXT | LVCF_WIDTH;
lvCol.fmt = LVCFMT_CENTER;
lvCol.cx = 490;
lvCol.pszText = TEXT("信息");
lvCol.cchTextMax = 4;
lvCol.iOrder = 0;
ListView_InsertColumn(hwndLv, 0, &lvCol);
}

//////////////////////////////////////////////////////////////////////////
BOOL Dlg_OnInitDialog(HWND hwnd, HWND hwndFocus, LPARAM lParam){
g_IocpModel.LoadSocketLib();

//设置本机IP
Static_SetText(GetDlgItem(hwnd, IDC_SERVERIP), g_IocpModel.GetLocalIP());
//设置默认端口
SetDlgItemInt(hwnd,IDC_PORT, DEFAULT_PORT,FALSE);

//初始化列表
InitListCtrl(GetDlgItem(hwnd, IDC_LIST_INFO));

//绑定主界面(为了方便在界面中显示信息 )
g_IocpModel.SetMainDlg(hwnd);

return TRUE;
}

//////////////////////////////////////////////////////////////////////////
void Dlg_OnCommand(HWND hwnd, int id, HWND hwndCtrl, UINT codeNotity){

switch (id)
{
case IDCANCEL:
g_IocpModel.Stop();
EndDialog(hwnd, id);
break;

case IDOK:
if (FALSE == g_IocpModel.Start()){
MessageBox(hwnd, TEXT("服务器启动失败!"), szApp, MB_OK | MB_ICONERROR);
return;
}
EnableWindow(GetDlgItem(hwnd, IDOK), FALSE);
EnableWindow(GetDlgItem(hwnd, IDC_STOP), TRUE);
break;

case IDC_STOP:
g_IocpModel.Stop();
EnableWindow(GetDlgItem(hwnd, IDC_STOP), FALSE);
EnableWindow(GetDlgItem(hwnd, IDOK), TRUE);
break;

}
}

//////////////////////////////////////////////////////////////////////////

//////////////////////////////////////////////////////////////////////////
INT_PTR WINAPI Dlg_Proc(HWND hwnd, UINT uMsg, WPARAM wParam, LPARAM lParam){
switch (uMsg)
{
chHANDLE_DLGMSG(hwnd, WM_INITDIALOG, Dlg_OnInitDialog);
chHANDLE_DLGMSG(hwnd, WM_COMMAND, Dlg_OnCommand);
}
return FALSE;
}

//////////////////////////////////////////////////////////////////////////
int WINAPI _tWinMain(HINSTANCE hInstExe, HINSTANCE, PTSTR pszCmdLine, int){
DialogBox(hInstExe, MAKEINTRESOURCE(IDD_IOCPSERVER), NULL, Dlg_Proc);
return 0;
}


//IOCPModel.h

/************************************************************************

************************************************************************/
#pragma once

#include "../../CommonFiles/CmnHdr.h"
#include <vector>
#include <atlconv.h>  //ATL库提供了更简便的字符集转换函数
#include <assert.h>

using namespace std;

#pragma comment(lib,"ws2_32.lib")
//////////////////////////////////////////////////////////////////////////
#define DEFAULT_IP  TEXT("127.0.0.1")
#define DEFAULT_PORT  12345
#define MAX_BUFFER_LEN  8192 //缓冲区大小8K

//////////////////////////////////////////////////////////////////////////
//向列表示中加入信息
extern inline void AddInformation(HWND hwndLv, PCTSTR pszInfo);

//////////////////////////////////////////////////////////////////////////
//在完成端口中上投递的I/O操作类型(完成键)
typedef enum _tag_OPERATION_TYPE
{
ACCEPT_POSTED, //Accept操作的标志
SEND_POSTED,   //发送操作的标志
RECV_POSTED,   //接收操作的标志
NULL_POSTED    //用于初始化,无意义
}OPERATION_TYPE;

//////////////////////////////////////////////////////////////////////////
//PER_IO_DATA(单I/O数据)封装了OVERLAPPED结构体
//单IO数据,封装了OVERLAPPED结构体,代表一次I/O重叠操作,一般与每次的操作WSARecv、
//WSASend等相对应,即进行一次读或写操作;
typedef struct _PER_IO_CONTEXT
{
OVERLAPPED m_Overlapped;  //每个异步调用Socket的函数都有一个这结构体,表示
//对该Socket的某种操作
SOCKET     m_sockAccpet;  //
WSABUF     m_wsaBuf;      //WSA类型的缓冲区,用于给重叠操作传参数

char       m_szBuffer[MAX_BUFFER_LEN];
OPERATION_TYPE  m_OpType; //标识网络操作的类型(对应上面的枚举)

//初始化
_PER_IO_CONTEXT(){
ZeroMemory(&m_Overlapped, sizeof(m_Overlapped));
ZeroMemory(m_szBuffer, MAX_BUFFER_LEN);
m_wsaBuf.buf = m_szBuffer;
m_wsaBuf.len = MAX_BUFFER_LEN;
m_OpType = NULL_POSTED;
}

//释放Socket
~_PER_IO_CONTEXT(){
if (m_sockAccpet!=INVALID_SOCKET){
closesocket(m_sockAccpet);
m_sockAccpet = INVALID_SOCKET;
}
}

//重置缓冲区内容
void ResetBuffer(){
ZeroMemory(m_szBuffer, MAX_BUFFER_LEN);
}

}PER_IO_CONTEXT,*PPER_IO_CONTEXT;

//////////////////////////////////////////////////////////////////////////
//完成键,传递的数据被称为单句柄结构PER_HANDLE_DATA,一般是与每个socket句柄对应
typedef struct _PER_SOCKET_CONTEXT
{
SOCKET  m_Socket;
SOCKADDR_IN  m_ClientAddr;  //客户端地址
vector<PPER_IO_CONTEXT> m_arrayIoContext;
//初始化
_PER_SOCKET_CONTEXT()
{
m_Socket = INVALID_SOCKET;
memset(&m_ClientAddr, 0, sizeof(m_ClientAddr));
}

//释放资源
~_PER_SOCKET_CONTEXT(){
if (m_Socket!=INVALID_SOCKET){
closesocket(m_Socket);
m_Socket = INVALID_SOCKET;
}

//释放掉所有的IO上下文数据
vector<PPER_IO_CONTEXT>::iterator it;
for (it = m_arrayIoContext.begin();it!=m_arrayIoContext.end();it++){
delete *it;
}
m_arrayIoContext.clear();
}

//获取一个新的IoContext
_PER_IO_CONTEXT* GetNewIoContext(){
_PER_IO_CONTEXT* p = new _PER_IO_CONTEXT;
m_arrayIoContext.push_back(p);
return p;
}

//从数组中移除一个指定的IoContext
void RemoveContext(_PER_IO_CONTEXT* pContext){
assert(pContext != NULL);

vector<PPER_IO_CONTEXT>::iterator it;
for (it = m_arrayIoContext.begin(); it != m_arrayIoContext.end(); it++){
if (*it == pContext){
m_arrayIoContext.erase(it);
delete pContext;
break;
}
}

}
}PER_SOCKET_CONTEXT,*PPER_SOCKET_CONTEXT;

//////////////////////////////////////////////////////////////////////////
class CIOCPModel;
typedef struct _tagThreadParams_WORKER
{
CIOCPModel* pIOCPModel;//类指类,用于调用类中的函数
int         nThreadNo;//线程编号
}THREADPARAMS_WORKER, *PTHREADPARAMS_WORKER;

//////////////////////////////////////////////////////////////////////////
//CIOCPModel类
class CIOCPModel
{
public:
CIOCPModel();
~CIOCPModel();
public:
//启动服务器
BOOL Start();

//停止服务器
void Stop();

//加载Socket库
BOOL LoadSocketLib();

//卸载Socket库
void UnloadSocketLib(){ WSACleanup(); }

//获得本机IP
TCHAR* GetLocalIP();

//设置监听端口
void SetPort(const int& nPort){m_nPort = nPort;}

//保存主对话框句柄
void SetMainDlg(HWND hwnd){ m_hWnd = hwnd; }

protected:
//线程函数,用来处理IOCP请求服务的
static DWORD WINAPI _WorkerThread(PVOID pParam);
//初始化IOCP
BOOL _InitializeIOCP();

//初始化Socket
BOOL _InitializeListenSocket();

//释放资源
BOOL _DeInitialize();

//投递Accept请求
BOOL _PostAccept(PER_IO_CONTEXT* pAcceptIoContext);

//投递接收数据请求
BOOL _PostRecv(PER_IO_CONTEXT* pIoContext);

//当客户端连入的时候,进行处理
BOOL _DoAccept(PER_SOCKET_CONTEXT* pSocketContext, PPER_IO_CONTEXT pIoContext);

//当有接收的数据到达的时候进行处理
BOOL _DoRecv(PER_SOCKET_CONTEXT* pSocketContext, PPER_IO_CONTEXT pIoContext);

//将设备绑定到完成端口中
BOOL _AssociateWithIOCP(PER_SOCKET_CONTEXT *pContext);

//获取本机的处理器数量
int _GetNumberOfProcessors();

//在主界面上显示信息
void _ShowMessage(PCTSTR pszFormat, ...) const;

//将客户端的相关信息存储到数组中
void _AddToContextList(PER_SOCKET_CONTEXT* pSocketContext);

//将客户端的信息从数组中移除
void _RemoveContext(PER_SOCKET_CONTEXT* pSocketContext);

//清空客户端信息
void _ClearContextList();

//获得AcceptEx函数的地址
BOOL _GetAcceptExPointer(SOCKET sock);

//获取GetAcceptExSockAddrs函数的指针
BOOL _GetAcceptExSockAddrsPointer(SOCKET sock);

//处理完成端口上的错误
BOOL _HandleError(PER_SOCKET_CONTEXT* pContext, const DWORD& dwErr);

//检测客户端Socket是否己断开
BOOL _IsSocketAlive(SOCKET s);

private:
TCHAR* m_szIP;  //服务器IP
int m_nPort;    //服务器监听端口
HWND m_hWnd;
int  m_nThreads;
HANDLE* m_phWorkerThreads; //所有工作线程(Worker)句柄数组
HANDLE m_hShutdownEvent;
HANDLE m_hIOCP;
CRITICAL_SECTION     m_csContextList;    // 用于Worker线程同步的互斥量
vector<PER_SOCKET_CONTEXT*> m_arrayClientContext;//客户端列表(Socket的Context信息)
PER_SOCKET_CONTEXT* m_pListenContext;   //用于监听Socket的Context信息
LPFN_ACCEPTEX m_lpfnAcceptEx; //AcceptEx函数指针
LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockAddrs;//GetAcceptExSockaddrs函数的指针
};


//IOCPModel.cpp

#include "IOCPModel.h"
#include "resource.h"
#include <tchar.h>
#include <ws2tcpip.h> //For getaddrinfo函数

//////////////////////////////////////////////////////////////////////////
#define WORKER_THREADS_PER_PROCESSOR    2  //每个处理器最大处理的线程数
#define EXIT_CODE                        NULL //传递给工作线程退出的信号
#define  MAX_POST_ACCEPT                10 //同时投递Accept请求的数量(可根据实际调整)

//////////////////////////////////////////////////////////////////////////
//释放指针宏
#define RELEASE(x)  {if(x!=NULL){delete x;x=NULL;}}

//释放句柄宏
#define RELEASE_HANDLE(x) \
{if(x!=NULL && x!=INVALID_HANDLE_VALUE) {CloseHandle(x);x = NULL;}}

//释放Socket宏
#define RELEASE_SOCKET(x) {if(x!=INVALID_SOCKET){closesocket(x);x=INVALID_SOCKET;}}
//////////////////////////////////////////////////////////////////////////
//工作者线程:为IOCP请求服务的线程函数,也就是当完成端口出现完成数据包里,
//就将之取走进行处理的线程
DWORD WINAPI CIOCPModel::_WorkerThread(PVOID pParam){
PTHREADPARAMS_WORKER pParams = (PTHREADPARAMS_WORKER)pParam;
CIOCPModel* pIOCPModel = (CIOCPModel*)pParams->pIOCPModel;
int nThreadNo = (int)pParams->nThreadNo;
pIOCPModel->_ShowMessage(_T("工作者线程启动,ID:%d"), nThreadNo);

OVERLAPPED* pOverlapped = NULL;
PER_SOCKET_CONTEXT *pSocketContext = NULL;
DWORD dwBytesTransfered = 0;
//循环处理请求,直接收到Shutdown信息为止
while (WAIT_OBJECT_0 != WaitForSingleObject(pIOCPModel->m_hShutdownEvent, 0)){
BOOL bRet = GetQueuedCompletionStatus(pIOCPModel->m_hIOCP,
&dwBytesTransfered,
(PULONG_PTR)&pSocketContext,
&pOverlapped,
INFINITE);
//退出时,程序要手动PostQueuedCompletionStatus(m_hIOCP, 0, (DWORD)EXIT_CODE, NULL);
if (EXIT_CODE ==(DWORD)pSocketContext){
break;
}

//判断是否出现了错误
if (!bRet){
DWORD dwErr = WSAGetLastError();

//显示提示信息
if (!pIOCPModel->_HandleError(pSocketContext, dwErr)){
break;
}
continue;;
} else{
//读取传入的参数
//CONTAINING_RECORD(pInt,A,a)展开来后:(A*)((char*)pInt - (unsigned long)(&((A*)0)->a))
/*
CONTAINING_RECORD 宏返回一个结构实例的基地址,该结构的类型和结构所包含的一个域(成员)地址已知。
PCHAR  CONTAINING_RECORD(IN PCHAR  Address,IN TYPE  Type,IN PCHAR  Field);
参数
Address 指向Type类型结构实例中某域(成员)的指针。
Type 需要得到基地址的结构实例的结构类型名。
Field Type类型结构包含的域(成员)的名称。
*/
PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped, PER_IO_CONTEXT, m_Overlapped); //(PER_IO_CONTEXT*)pOverlapped;

//判断是否客户端断开
if ((0==dwBytesTransfered)&& (RECV_POSTED==pIoContext->m_OpType ||
SEND_POSTED==pIoContext->m_OpType)){
TCHAR szIP[32];
InetNtop(AF_INET, &pSocketContext->m_ClientAddr.sin_addr, szIP, sizeof(szIP));
pIOCPModel->_ShowMessage(TEXT("客户端%s:%d断开连接."),
szIP, ntohs(pSocketContext->m_ClientAddr.sin_port));

//释放掉对应的资源
pIOCPModel->_RemoveContext(pSocketContext);
continue;
} else{
switch (pIoContext->m_OpType)
{
case ACCEPT_POSTED:  //Accept
pIOCPModel->_DoAccept(pSocketContext, pIoContext);
break;

case RECV_POSTED: //Recv
pIOCPModel->_DoRecv(pSocketContext, pIoContext);
break;

case SEND_POSTED: //Send
//这里略过
break;

default:
//不应该执行到这里
break;
}
}
}
} //end while
//TRACE(_T("工作者线程%d退出."), nThreadNo);
//释放线程参数
RELEASE(pParam);
return TRUE;
}

//////////////////////////////////////////////////////////////////////////
CIOCPModel::CIOCPModel() :
m_szIP(DEFAULT_IP), m_nPort(DEFAULT_PORT), m_hWnd(NULL),
m_nThreads(0), m_hIOCP(NULL), m_hShutdownEvent(NULL), m_phWorkerThreads(NULL),
m_pListenContext(NULL),m_lpfnGetAcceptExSockAddrs(NULL), m_lpfnAcceptEx(NULL)
{
//初始化线程互斥量
InitializeCriticalSection(&m_csContextList);
}

CIOCPModel::~CIOCPModel(){
this->Stop();
DeleteCriticalSection(&m_csContextList);
}

//初始化WinSock2.2
BOOL CIOCPModel::LoadSocketLib()
{
WSADATA wsaData;
int nRet;
nRet = WSAStartup(MAKEWORD(2, 2), &wsaData);

return (nRet);
}

//
BOOL CIOCPModel::Start()
{
//建立系统退出的事件通知
m_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);

//初始化IOCP
if (FALSE == _InitializeIOCP()){
_ShowMessage(_T("初始化IOCP失败!\n"));
return FALSE;
} else{
_ShowMessage(_T("初始化IOCP成功!\n"));
}

//初始化Socket
if (FALSE == _InitializeListenSocket()){
_ShowMessage(_T("Listen Socket初始化失败!\n"));
_DeInitialize();
return FALSE;
} else{
_ShowMessage(_T("Listen Socket初始化!\n"));
}

//
_ShowMessage(_T("系统准备就绪,等候连接...\n"));
return TRUE;
}

//发送系统退出消息,退出完成端口和线程资源
void CIOCPModel::Stop(){
//激活关闭消息
SetEvent(m_hShutdownEvent);
for (int i = 0; i < m_nThreads; i++)
{
//通知所有的完成端口操作退出
PostQueuedCompletionStatus(m_hIOCP, 0, (DWORD)EXIT_CODE, NULL);
}

//等待所有的客户端资源退出
if (NULL != m_phWorkerThreads)
WaitForMultipleObjects(m_nThreads, m_phWorkerThreads, TRUE, INFINITE);

//清除客户端列表信息
_ClearContextList();

//清除其他资源
_DeInitialize();

_ShowMessage(_T("停止监听!\n"));
}

//////////////////////////////////////////////////////////////////////////
//初始化IOCP
BOOL CIOCPModel::_InitializeIOCP(){
//建立一个完成端口
m_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (NULL == m_hIOCP)
return FALSE;

//根据本机的处理器数量,建立对应的线程数
m_nThreads = WORKER_THREADS_PER_PROCESSOR*_GetNumberOfProcessors();

//为工作者线程分配句柄数组
m_phWorkerThreads = new HANDLE[m_nThreads];

//根据计算出出的数量建立工作者(Worker)线程
DWORD nThreadID;
for (int i = 0; i < m_nThreads;i++){
THREADPARAMS_WORKER* pThreadParams = new THREADPARAMS_WORKER;
pThreadParams->pIOCPModel = this;
pThreadParams->nThreadNo = i + 1;
m_phWorkerThreads[i] = chBEGINTHREADEX(NULL, 0, _WorkerThread,
(PVOID)pThreadParams, 0, &nThreadID);
if (m_phWorkerThreads[i] == NULL)
return FALSE;
}

return TRUE;
}

//初始化Socket
BOOL CIOCPModel::_InitializeListenSocket(){

m_pListenContext = new PER_SOCKET_CONTEXT;

//用WSASocket创建监听Socket(支持重叠IO操作)
m_pListenContext->m_Socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if (INVALID_SOCKET==m_pListenContext->m_Socket){
_ShowMessage(TEXT("初始化Socket失败,错误代码:%d.\n"), WSAGetLastError());
return FALSE;
};

//将监听Socket绑定到完成端口
if (NULL == CreateIoCompletionPort((HANDLE)m_pListenContext->m_Socket,
m_hIOCP,(ULONG_PTR)m_pListenContext,0)){
_ShowMessage(TEXT("绑定Listen Socket到完成端口失败!错误代码:%d\n"),WSAGetLastError());
RELEASE_SOCKET(m_pListenContext->m_Socket);
return FALSE;
}

//填充地址信息
struct sockaddr_in ServerAddress = { 0 };
ServerAddress.sin_family = AF_INET;

//这里可以绑定到任何IP,但也可以指定一个IP
//ServerAddress.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
InetPton(AF_INET, m_szIP,(PVOID)&ServerAddress.sin_addr.S_un.S_addr);
ServerAddress.sin_port = htons(m_nPort);

//绑定地址和端口到监听Socket
if (SOCKET_ERROR == bind(m_pListenContext->m_Socket,(sockaddr*)&ServerAddress,
sizeof(ServerAddress))){
_ShowMessage(TEXT("bind()函数执行错误.错误代码:%d\n"), WSAGetLastError());
return FALSE;
}

//开始监听
if (SOCKET_ERROR == listen(m_pListenContext->m_Socket,SOMAXCONN)){
_ShowMessage(TEXT("Listen()函数执行出错,错误代码:%d\n"), WSAGetLastError());
return FALSE;
}

//获取AcceptEx函数地址
if (!_GetAcceptExPointer(m_pListenContext->m_Socket))
{
_ShowMessage(TEXT("WSAIoCtl未能获取AcceptEx函数指针,错误代码:%d\n"), WSAGetLastError());
_DeInitialize();
return FALSE;
}

//获取GetAcceptExSockAddrs函数指针
if (!_GetAcceptExSockAddrsPointer(m_pListenContext->m_Socket))
{
_ShowMessage(TEXT("WSAIoCtl未能获取GetAcceptExSockAddrs函数指针,错误代码:%d\n"), WSAGetLastError());
_DeInitialize();
return FALSE;
}

//为AcceptEx准备参数,然后投递I/O请求
for (int i = 0; i < MAX_POST_ACCEPT;i++){
//新建一个IO_CONTEXT
PER_IO_CONTEXT* pAcceptIoContext = m_pListenContext->GetNewIoContext();
if (FALSE == _PostAccept(pAcceptIoContext)){
m_pListenContext->RemoveContext(pAcceptIoContext);
return FALSE;
}
}

_ShowMessage(TEXT("投递%d个AcceptEx请求完毕"), MAX_POST_ACCEPT);
return TRUE;
}

//释放资源
BOOL CIOCPModel::_DeInitialize(){

//关闭系统退出事件句柄
RELEASE_HANDLE(m_hShutdownEvent);

//释放工作者线程句柄
if (m_phWorkerThreads!=NULL)
for (int i = 0; i < m_nThreads; i++)
{
RELEASE_HANDLE(m_phWorkerThreads[i]);
}
RELEASE(m_phWorkerThreads);

//关闭IOCP句柄
RELEASE_HANDLE(m_hIOCP);

//关闭监听Socket句柄
RELEASE(m_pListenContext);
_ShowMessage(TEXT("释放资源完毕!\n"));
return TRUE;
}

//将Socket设备绑定到完成端口中
BOOL CIOCPModel::_AssociateWithIOCP(PER_SOCKET_CONTEXT *pContext){
//将用于和客户端通信的SOCKET绑定到完成端口中
HANDLE hTemp = CreateIoCompletionPort((HANDLE)pContext->m_Socket,
m_hIOCP,(DWORD)pContext,0);
if (NULL ==hTemp){
_ShowMessage(TEXT("执行Socket绑定到完成端口失败!错误代码:%u"), GetLastError());
return FALSE;
}
return TRUE;
}

//投递Accept请求
BOOL CIOCPModel::_PostAccept(PER_IO_CONTEXT* pAcceptIoContext){
assert(INVALID_SOCKET != m_pListenContext->m_Socket);

//准备参数
DWORD dwBytes = 0;
pAcceptIoContext->m_OpType = ACCEPT_POSTED;
WSABUF* p_wbuf = &pAcceptIoContext->m_wsaBuf;
OVERLAPPED* pol = &pAcceptIoContext->m_Overlapped;

//为以后新连入的客户端先准备好Socket(这与传统的Accept最大的差别)
pAcceptIoContext->m_sockAccpet = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP,
NULL, 0, WSA_FLAG_OVERLAPPED);
if (INVALID_SOCKET == pAcceptIoContext->m_sockAccpet){
_ShowMessage(TEXT("创建用于Accept的Socket失败!错误代码:%d"), WSAGetLastError());
return FALSE;
}

//投递AcceptEx请求
if (FALSE == m_lpfnAcceptEx(m_pListenContext->m_Socket, pAcceptIoContext->m_sockAccpet,
p_wbuf->buf, p_wbuf->len - ((sizeof(SOCKADDR_IN) + 16) * 2),
sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16,
&dwBytes, pol))
{
if (WSA_IO_PENDING != WSAGetLastError()){
_ShowMessage(TEXT("投递AcceptEx请求失败,错误代码:%d"),WSAGetLastError());
return FALSE;
}
}
return TRUE;
}

//投递接收数据请求
BOOL CIOCPModel::_PostRecv(PER_IO_CONTEXT* pIoContext){
//初始化变量
DWORD dwFlags = 0;
DWORD dwBytes = 0;
WSABUF* p_wbuf = &pIoContext->m_wsaBuf;
OVERLAPPED* pol = &pIoContext->m_Overlapped;

pIoContext->ResetBuffer();
pIoContext->m_OpType = RECV_POSTED;

//投递WSARecv请求
int nBytesRecv = WSARecv(pIoContext->m_sockAccpet, p_wbuf, 1, &dwBytes,
&dwFlags, pol, NULL);

//如果返回错误,并且错误代码不是Pending,说明这个请求失败了
if (SOCKET_ERROR == nBytesRecv && (WSA_IO_PENDING != WSAGetLastError())){
_ShowMessage(TEXT("投递第一个WSARecv失败!"));
return FALSE;
}
return TRUE;
}

//当客户端连入的时候,进行处理
//传入监听端口的Context,
BOOL CIOCPModel::_DoAccept(PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext){
LPSOCKADDR_IN pLocalAddr, pRemoteAddr;

int remoteLen = sizeof(SOCKADDR_IN), localLen = sizeof(SOCKADDR_IN);

//1.取得连入客户端的地址信息(客户端发来的第1组数据,本地地址、客户端地址信息)
m_lpfnGetAcceptExSockAddrs(pIoContext->m_wsaBuf.buf,
pIoContext->m_wsaBuf.len -((sizeof(SOCKADDR_IN)+16)*2),
sizeof(SOCKADDR_IN)+16,sizeof(SOCKADDR_IN)+16,
(LPSOCKADDR *)&pLocalAddr,
&localLen,
(LPSOCKADDR *)&pRemoteAddr,
&remoteLen);
TCHAR szIP[32];

InetNtop(AF_INET, (char*)&(pRemoteAddr->sin_addr.S_un.S_addr), szIP, sizeof(szIP));
_ShowMessage(TEXT("客户端:%s:%d连入."), szIP, ntohs(pRemoteAddr->sin_port));
_ShowMessage(TEXT("收到%s:%d信息:%s"), szIP, ntohs(pRemoteAddr->sin_port), pIoContext->m_wsaBuf.buf);

//2.将AcceptSocket绑定到完成端口(但是要为这个Soket创建一个上下文,即完成键)
PER_SOCKET_CONTEXT* pNewSocketContext = new PER_SOCKET_CONTEXT;
pNewSocketContext->m_Socket = pIoContext->m_sockAccpet;
memcpy(&(pNewSocketContext->m_ClientAddr), pRemoteAddr, sizeof(SOCKADDR_IN));
if (FALSE == _AssociateWithIOCP(pNewSocketContext)){
RELEASE(pNewSocketContext);
return FALSE;
}

//3.继续建立与这个Socket中Io,用于在这个Socket上投递第1个Recv数据
PER_IO_CONTEXT* pNewIoContext = pNewSocketContext->GetNewIoContext();
pNewIoContext->m_OpType = RECV_POSTED;
pNewIoContext->m_sockAccpet = pNewSocketContext->m_Socket;
//如果Buffer需要保留,则Copy一份出来
//memcpy(pNewIoContext->m_szBuffer, pIoContext->m_szBuffer, MAX_BUFFER_LEN);

//开始这个Socket上投递请求
if (FALSE == _PostRecv(pNewIoContext)){
pNewSocketContext->RemoveContext(pNewIoContext);
return FALSE;
}

//4.如果投递成功,将这个有效的客户端信息加入ContextList(有利用统一管理)
_AddToContextList(pNewSocketContext);

//5.使用完毕后,将监听Socket的那个IO重置,然后准备投递新的AcceptEx
pIoContext->ResetBuffer();

return _PostAccept(pIoContext);
}

//当有接收的数据到达的时候进行处理
BOOL CIOCPModel::_DoRecv(PER_SOCKET_CONTEXT* pSocketContext, PPER_IO_CONTEXT pIoContext){

//先把收到的数据显示出来,然后重置状态,发送一个Recv请求
SOCKADDR_IN* ClientAddr = &pSocketContext->m_ClientAddr;
TCHAR szIP[32];
InetNtop(AF_INET, &ClientAddr->sin_addr, szIP, sizeof(szIP));
_ShowMessage(TEXT("收到%s:%d信息:%s"),szIP,ntohs(ClientAddr->sin_port),pIoContext->m_wsaBuf.buf);

//投递一下个WSARecv请求
return _PostRecv(pIoContext);
}

//=========================================================================
//                        ContextList相关操作
//=========================================================================
//将客户端的相关信息存储到数组中
void CIOCPModel::_AddToContextList(PER_SOCKET_CONTEXT* pSocketContext){
EnterCriticalSection(&m_csContextList);
m_arrayClientContext.push_back(pSocketContext);
LeaveCriticalSection(&m_csContextList);
}

//移除某个特定的Context
void CIOCPModel::_RemoveContext(PER_SOCKET_CONTEXT* pSocketContext){
EnterCriticalSection(&m_csContextList);
vector<PER_SOCKET_CONTEXT*>::iterator it;
for (it = m_arrayClientContext.begin(); it != m_arrayClientContext.end();it++){
if (*it==pSocketContext){
m_arrayClientContext.erase(it);
delete pSocketContext;
break;
}
}
LeaveCriticalSection(&m_csContextList);
}

//清空客户端列表
void CIOCPModel::_ClearContextList(){
EnterCriticalSection(&m_csContextList);
vector<PER_SOCKET_CONTEXT*>::iterator it;
for (it = m_arrayClientContext.begin();it!=m_arrayClientContext.end();it++){
delete *it;
}
m_arrayClientContext.clear();
LeaveCriticalSection(&m_csContextList);
}

//=========================================================================
//                        其他辅助函数
//=========================================================================
//获取本机的处理器数量
inline int CIOCPModel::_GetNumberOfProcessors(){
SYSTEM_INFO si;
GetSystemInfo(&si);
return si.dwNumberOfProcessors;
}

// 在主界面中显示提示信息
void  CIOCPModel::_ShowMessage(PCTSTR pszFormat, ...) const{
if (NULL == m_hWnd)
return;

//根据传入的参数格式化字符串
TCHAR szMsg[1024];

va_list arglist;
va_start(arglist, pszFormat);
_vstprintf_s(szMsg, _countof(szMsg), pszFormat, arglist);
va_end(arglist);

AddInformation(GetDlgItem(m_hWnd, IDC_LIST_INFO), szMsg);
}

//获取本机IP
TCHAR* CIOCPModel::GetLocalIP(){
////获得本机主机名
char hostname[MAX_PATH] = { 0 };
gethostname(hostname, MAX_PATH);

struct addrinfo *pRet = NULL;//通过result指针指向addrinfo结构链表的指针
struct addrinfo hints;//需要<ws2tcpip.h>
ZeroMemory(&hints, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
DWORD dwRetval = getaddrinfo(hostname, NULL, &hints, &pRet);//返回值0代表成功
if (dwRetval != 0)
return m_szIP;

SOCKADDR_IN  *sockaddr_ipv4 = (SOCKADDR_IN*)pRet->ai_addr;
//将IP地址转化成字符串形式
char inAddr[16];
inet_ntop(AF_INET, (void*)&sockaddr_ipv4->sin_addr, inAddr, sizeof(inAddr));

#ifdef UNICODE
USES_CONVERSION;
m_szIP = A2W(inAddr);
#else
m_szIP =(TCHAR*)inAddr;
#endif

return m_szIP;
}

//获得AcceptEx函数的地址
BOOL CIOCPModel::_GetAcceptExPointer(SOCKET sock){
//使用AcceptEx函数,因为属于WinSock2规范之外的扩展函数,所以
//需要通过SWAIoCtl函数来获取其函数指针

DWORD dwBytes = 0;
GUID GuidAcceptEx = WSAID_ACCEPTEX;

return (SOCKET_ERROR != WSAIoctl(
sock,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&m_lpfnAcceptEx,
sizeof(m_lpfnAcceptEx),
&dwBytes,
NULL,
NULL
));
}

//获取GetAcceptExSockAddrs函数的指针
BOOL CIOCPModel::_GetAcceptExSockAddrsPointer(SOCKET sock){
//使用AcceptEx函数,因为属于WinSock2规范之外的扩展函数,所以
//需要通过WSAIoCtl函数来获取其函数指针

DWORD dwBytes = 0;
GUID GuidAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;

return (SOCKET_ERROR != WSAIoctl(
sock,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptExSockAddrs,
sizeof(GuidAcceptExSockAddrs),
&m_lpfnGetAcceptExSockAddrs,
sizeof(m_lpfnGetAcceptExSockAddrs),
&dwBytes,
NULL,
NULL
));
}

//检测客户端Socket是否己断开
BOOL CIOCPModel::_IsSocketAlive(SOCKET s){
int nBytesSend = send(s, "", 0, 0);
if (-1 == nBytesSend)
return FALSE;
return TRUE;
}

//处理完成端口上的错误
BOOL CIOCPModel::_HandleError(PER_SOCKET_CONTEXT* pContext, const DWORD& dwErr){

//如果超时,就再继续等待
if (WAIT_TIMEOUT ==dwErr){
//确认客户端是否还活着
if (!_IsSocketAlive(pContext->m_Socket)){
_ShowMessage(TEXT("检测到客户端异常退出!"));
_RemoveContext(pContext);
return TRUE;
}else{
_ShowMessage(TEXT("网络操作超时!重试中..."));
return TRUE;
}
//可能客户端异常退出了
} else if (ERROR_NETNAME_DELETED == dwErr){
_ShowMessage(TEXT("检测到客户端异常退出!"));
_RemoveContext(pContext);
return TRUE;
} else{
_ShowMessage(TEXT("完成端口操作出现错误,线程退出。错误代码:%d"), dwErr);
return FALSE;
}
return TRUE;
}


//Resouce.h

//{{NO_DEPENDENCIES}}
// Microsoft Visual C++ 生成的包含文件。
// 供 10_IOCPServer.rc 使用
//
#define IDC_STOP                        3
#define IDD_IOCPSERVER                   101
#define IDC_SERVERIP                    1003
#define IDC_PORT                        1004
#define IDC_LIST_INFO                   1006

// Next default values for new objects
//
#ifdef APSTUDIO_INVOKED
#ifndef APSTUDIO_READONLY_SYMBOLS
#define _APS_NEXT_RESOURCE_VALUE        102
#define _APS_NEXT_COMMAND_VALUE         40001
#define _APS_NEXT_CONTROL_VALUE         1009
#define _APS_NEXT_SYMED_VALUE           101
#endif
#endif


//Serve.rc

// Microsoft Visual C++ generated resource script.
//
#include "resource.h"

#define APSTUDIO_READONLY_SYMBOLS
/////////////////////////////////////////////////////////////////////////////
//
// Generated from the TEXTINCLUDE 2 resource.
//
#include "winres.h"

/////////////////////////////////////////////////////////////////////////////
#undef APSTUDIO_READONLY_SYMBOLS

/////////////////////////////////////////////////////////////////////////////
// 中文(简体,中国) resources

#if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_CHS)
LANGUAGE LANG_CHINESE, SUBLANG_CHINESE_SIMPLIFIED

#ifdef APSTUDIO_INVOKED
/////////////////////////////////////////////////////////////////////////////
//
// TEXTINCLUDE
//

1 TEXTINCLUDE
BEGIN
"resource.h\0"
END

2 TEXTINCLUDE
BEGIN
"#include ""winres.h""\r\n"
"\0"
END

3 TEXTINCLUDE
BEGIN
"\r\n"
"\0"
END

#endif    // APSTUDIO_INVOKED

/////////////////////////////////////////////////////////////////////////////
//
// Dialog
//

IDD_IOCPSERVER DIALOGEX 0, 0, 309, 176
STYLE DS_SETFONT | DS_MODALFRAME | DS_CENTER | WS_MINIMIZEBOX | WS_POPUP | WS_CAPTION | WS_SYSMENU
CAPTION "IOCP服务器"
FONT 10, "宋体", 400, 0, 0x86
BEGIN
DEFPUSHBUTTON   "开始监听",IDOK,40,153,50,14
PUSHBUTTON      "退出",IDCANCEL,184,153,50,14
PUSHBUTTON      "停止监听",IDC_STOP,112,153,50,14
LTEXT           "服务器IP地址:",IDC_STATIC,25,137,56,8
LTEXT           "127.0.0.1",IDC_SERVERIP,85,138,61,8
LTEXT           "监听端口:",IDC_STATIC,186,137,36,8
EDITTEXT        IDC_PORT,227,135,40,14,ES_AUTOHSCROLL
CONTROL         "",IDC_LIST_INFO,"SysListView32",LVS_REPORT | LVS_SINGLESEL | LVS_ALIGNLEFT | WS_BORDER | WS_TABSTOP,5,7,297,125
END

/////////////////////////////////////////////////////////////////////////////
//
// DESIGNINFO
//

#ifdef APSTUDIO_INVOKED
GUIDELINES DESIGNINFO
BEGIN
IDD_IOCPSERVER, DIALOG
BEGIN
END
END
#endif    // APSTUDIO_INVOKED

#endif    // 中文(简体,中国) resources
/////////////////////////////////////////////////////////////////////////////

#ifndef APSTUDIO_INVOKED
/////////////////////////////////////////////////////////////////////////////
//
// Generated from the TEXTINCLUDE 3 resource.
//

/////////////////////////////////////////////////////////////////////////////
#endif    // not APSTUDIO_INVOKED


【客户端】

//main.c

/*************************************************************************
Moudule:  main.cpp
Notices:  Copyright(c) 2015 浅墨浓香
*************************************************************************/
#include "../../CommonFiles/CmnHdr.h"
#include "resource.h"
#include "IOCPClient.h"
#include <tchar.h>

//////////////////////////////////////////////////////////////////////////
CClient g_Client;

//////////////////////////////////////////////////////////////////////////
//向列表示中加入信息
inline void AddInformation(HWND hwndLv, PCTSTR pszInfo)
{
LVITEM lv;
memset(&lv, 0, sizeof(lv));
lv.mask = LVIF_TEXT | LVIF_PARAM;
#ifdef UNICODE
lv.pszText = (PWSTR)pszInfo;
#else
USES_CONVERSION;
lv.pszText = A2W((PSTR)pszInfo);
#endif
lv.lParam = (LPARAM)ListView_GetItemCount(hwndLv);
ListView_InsertItem(hwndLv, &lv);
}

//////////////////////////////////////////////////////////////////////////
//初始化ListView
void InitListCtrl(HWND hwndLv){
ListView_SetExtendedListViewStyle(hwndLv, LVS_EX_FULLROWSELECT | LVS_EX_GRIDLINES);
LVCOLUMN lvCol;
//设置第1列为Information
lvCol.mask = LVCF_FMT | LVCF_ORDER | LVCF_TEXT | LVCF_WIDTH;
lvCol.fmt = LVCFMT_CENTER;
lvCol.cx = 420;
lvCol.pszText = TEXT("信息");
lvCol.cchTextMax = 4;
lvCol.iOrder = 0;
ListView_InsertColumn(hwndLv, 0, &lvCol);
}

//////////////////////////////////////////////////////////////////////////
//初始化界面信息
void InitGUI(HWND hwnd){

//设置本机IP
SetDlgItemText(hwnd, IDC_IPADDRESS_SERVER, g_Client.GetLocalIP());

//设置默认端口
SetDlgItemInt(hwnd, IDC_EDIT_PORT, DEFAULT_PORT,FALSE);

//设置默认的并发线程数
SetDlgItemInt(hwnd, IDC_EDIT_THREADS, DEFAULT_THREADS, FALSE);

//设置默认发送信息
SetDlgItemText(hwnd, IDC_EDIT_MESSAGE, DEFAULT_MESSAGE);

//初始化列表
InitListCtrl(GetDlgItem(hwnd, IDC_LIST_INFO));
}

//////////////////////////////////////////////////////////////////////////
BOOL Dlg_OnInitDialog(HWND hwnd, HWND hwndFocus, LPARAM lParam){

g_Client.m_hwnd = hwnd;
InitGUI(hwnd);

return TRUE;
}

//
DWORD WINAPI _StopProc(PVOID pvParam){
g_Client.Stop();
return 0;
}

//////////////////////////////////////////////////////////////////////////
void Dlg_OnCommand(HWND hwnd, int id, HWND hwndCtrl, UINT codeNotity){

switch (id)
{
case IDCANCEL:
//停止监听
CloseHandle(chBEGINTHREADEX(NULL, 0, _StopProc, NULL, 0, NULL));

EndDialog(hwnd, id);
break;

case IDOK:
{
int nPort, nThreads;
TCHAR szIP[32], szMessage[MAX_BUFFER_LEN];
nPort = GetDlgItemInt(hwnd, IDC_EDIT_PORT, NULL, FALSE);
nThreads = GetDlgItemInt(hwnd, IDC_EDIT_THREADS, NULL, FALSE);
GetDlgItemText(hwnd, IDC_IPADDRESS_SERVER, szIP, _countof(szIP));
GetDlgItemText(hwnd, IDC_EDIT_MESSAGE, szMessage, _countof(szMessage));

if ((lstrlen(szIP) == 0) || (lstrlen(szMessage) == 0) || (nPort <= 0) || (nThreads <= 0)) {
MessageBox(hwnd, TEXT("请输入合法的参数!"), TEXT("错误"), MB_OK | MB_ICONERROR);
return;
}

//给CClient设置参数
g_Client.SetIP(szIP);
g_Client.SetPort(nPort);
g_Client.SetThreads(nThreads);
g_Client.SetMessage(szMessage);

//开始
if (!g_Client.Start()){
MessageBox(hwnd, TEXT("启动失败!"), TEXT("错误"), MB_OK | MB_ICONERROR);
return;
}

AddInformation(GetDlgItem(hwnd, IDC_LIST_INFO), TEXT("测试开始"));

EnableWindow(GetDlgItem(hwnd, IDOK), FALSE);
EnableWindow(GetDlgItem(hwnd, IDC_STOP), TRUE);
}

break;

case IDC_STOP:
{
HANDLE hThread = chBEGINTHREADEX(NULL, 0, _StopProc, NULL, 0, NULL);
CloseHandle(hThread);
AddInformation(GetDlgItem(hwnd, IDC_LIST_INFO), TEXT("测试结束"));

//g_Client.Stop();

EnableWindow(GetDlgItem(hwnd, IDC_STOP), FALSE);
EnableWindow(GetDlgItem(hwnd, IDOK), TRUE);
}

break;

}
}

//////////////////////////////////////////////////////////////////////////

//////////////////////////////////////////////////////////////////////////
INT_PTR WINAPI Dlg_Proc(HWND hwnd, UINT uMsg, WPARAM wParam, LPARAM lParam){
switch (uMsg)
{
chHANDLE_DLGMSG(hwnd, WM_INITDIALOG, Dlg_OnInitDialog);
chHANDLE_DLGMSG(hwnd, WM_COMMAND, Dlg_OnCommand);
}
return FALSE;
}

//////////////////////////////////////////////////////////////////////////
int WINAPI _tWinMain(HINSTANCE hInstExe, HINSTANCE, PTSTR pszCmdLine, int){
DialogBox(hInstExe, MAKEINTRESOURCE(IDD_IOCPCLIENT), NULL, Dlg_Proc);
return 0;
}


//IOCPClient.h

/*************************************************************************
多线程阻塞式Socket,每个线程只发送一次数据
*************************************************************************/
#pragma  once
#include "../../CommonFiles/CmnHdr.h"
#include <tchar.h>

//缓冲区长度
#define MAX_BUFFER_LEN 8196 //8*1024字节,即8K
#define DEFAULT_PORT      12345  //默认端口
#define DEFAULT_IP        TEXT("127.0.0.1")   //默认IP
#define DEFAULT_THREADS   100  //默认并发线程数
#define DEFAULT_MESSAGE   TEXT("Hello!") //默认的发送信息

//////////////////////////////////////////////////////////////////////////
//向列表示中加入信息
extern inline void AddInformation(HWND hwndLv, PCTSTR pszInfo);

//////////////////////////////////////////////////////////////////////////
class CClient;

//用于发送数据的线程参数
typedef struct _tagThreadParams_WORKER{
CClient* pClient;     //类指针,用于调用类中的函数
SOCKET   sock;        //每个线程使用的Socket
int nThreadNo;        //线程编号
TCHAR szBuffer[MAX_BUFFER_LEN];

~_tagThreadParams_WORKER(){
if (sock !=NULL){
closesocket(sock);
}
}
}THREADPARAMS_WORKER,*PTHREADPARAMS_WORKER;

//产生Socket连接的线程
typedef struct _tagThreadParams_CONNECTION{
CClient* pClient;
}THREADPARAMS_CONNECTION,*PTHREADPARAMS_CONNECTION;

class CClient
{
public:
CClient();
~CClient();

public:

// 加载Socket库
bool LoadSocketLib();
// 卸载Socket库,彻底完事
void UnloadSocketLib() { WSACleanup(); }

//开始测试
BOOL Start();

//停止测试
void Stop();

//获取本机IP
TCHAR* GetLocalIP();

//设置连接IP
void SetIP(TCHAR* szServerIP){ memcpy(m_szServerIP, szServerIP, sizeof(m_szServerIP)); }
//设置监听端口
void SetPort(const int& nPort){m_nPort = nPort;}
//设置并发线程数量
void SetThreads(const int& n){ m_nThreads = n; }

//设置要发送的消息
void SetMessage(PCTSTR strMessage){ memcpy(m_strMessage, strMessage, sizeof(m_strMessage)); }

//在主界面中显示信息
void ShowMessage(PCTSTR strInfo, ...) const;

private:
//建立连接
BOOL EstablishConnections();

//向服务器进行连接
BOOL ConnectToServer(SOCKET* pSocket, TCHAR* strServer, int nPort);

//用于建立连接的线程
static DWORD WINAPI _ConnectionThread(LPVOID lpParam);
//用于发送信息的线程
static DWORD WINAPI _WorkerThread(LPVOID lpParam);
//释放资源
void Cleanup();

//等待函数,超过64个对象
DWORD SyncWaitForMultipleObjs(HANDLE * handles, size_t count);
public:
HWND  m_hwnd;  //主窗口
private:
TCHAR m_szServerIP[32];  //服务器端IP地址
TCHAR m_szLocalIP[32];  //本机IP地址
TCHAR m_strMessage[MAX_BUFFER_LEN];  //发给服务器的信息
int   m_nPort;       //监听端口
int   m_nThreads;    //并发线程数量
int   m_nThreadsNum; //实际并发的线程数量

HANDLE* m_phWorkerThreads;
HANDLE m_hConnectionThread;  //接受连接的线程句柄
static HANDLE m_hShutdownEvent;     //用来通知线程退出的事件对象

THREADPARAMS_WORKER *m_pParamsWorker;  //线程参数

};


//IOCPClient.cpp

#include "IOCPClient.h"
#include <atlconv.h>  //ATL库提供了更简便的字符集转换函数
#include <ws2tcpip.h> //For getaddrinfo函数
#include <strsafe.h>
#include "resource.h"
#include <assert.h>

#pragma comment(lib,"ws2_32.lib")

//////////////////////////////////////////////////////////////////////////
#define RELEASE_HANDLE(x)               {if(x != NULL && x!=INVALID_HANDLE_VALUE){ CloseHandle(x);x = NULL;}}
#define RELEASE(x)                      {if(x != NULL ){delete x;x=NULL;}}
#define RELEASEX(x)                     {if(x != NULL ){delete[] x;x=NULL;}}
//////////////////////////////////////////////////////////////////////////
HANDLE CClient::m_hShutdownEvent = NULL;

//////////////////////////////////////////////////////////////////////////
//用于建立连接的线程
DWORD WINAPI CClient::_ConnectionThread(LPVOID lpParam){
THREADPARAMS_CONNECTION* pParams = (THREADPARAMS_CONNECTION*)lpParam;
CClient* pClient = (CClient*)pParams->pClient;

pClient->EstablishConnections();

pClient->SyncWaitForMultipleObjs(pClient->m_phWorkerThreads, pClient->m_nThreadsNum);

RELEASE(pParams);

return 0;
}

//用于发送信息的线程
DWORD WINAPI CClient::_WorkerThread(LPVOID lpParam){

THREADPARAMS_WORKER* pParams = (PTHREADPARAMS_WORKER)lpParam;
CClient* pClient = pParams->pClient;

TCHAR  szTemp[MAX_BUFFER_LEN];
memset(szTemp, 0, sizeof(szTemp));
TCHAR szRecv[MAX_BUFFER_LEN];
memset(szRecv, 0, sizeof(szRecv));

int nBytesSend = 0;

//向服务器发送信息,发送3次
for (int i = 0; i < 3;i++){

if (WAIT_OBJECT_0 == WaitForSingleObject(m_hShutdownEvent, 0))
return 0;
StringCchPrintf(szTemp, _countof(szTemp), TEXT("第%d条信息:%s\n"),i+1, pParams->szBuffer);
nBytesSend = send(pParams->sock, (char*)szTemp,lstrlen(szTemp)*sizeof(TCHAR), 0);
if (SOCKET_ERROR == nBytesSend){
return 1;  //错误!
}

pClient->ShowMessage(TEXT("向服务器发送信息成功:%s"), szTemp);
Sleep(3000);
}

if (pParams->nThreadNo == pClient->m_nThreads){
pClient->ShowMessage(TEXT("测试并发%d个线程完毕。"), pClient->m_nThreads);
}

return 0;
}

//////////////////////////////////////////////////////////////////////////
//建立连接
BOOL CClient::EstablishConnections(){
DWORD nThreadID;
m_nThreadsNum = 0;
m_phWorkerThreads = new HANDLE[m_nThreads];
m_pParamsWorker = new THREADPARAMS_WORKER[m_nThreads];

if ((m_phWorkerThreads == NULL) || (m_pParamsWorker == NULL))
return FALSE;

//根据用户设置的线程数量,生成每一个线程连接到服务器,并生成线程发送数据
for (int i = 0; i < m_nThreads;i++){

//监听用户停止事件
if (WAIT_OBJECT_0 == WaitForSingleObject(m_hShutdownEvent,0) || m_hShutdownEvent == NULL)
break;

//连接服务器
if (!ConnectToServer(&m_pParamsWorker[i].sock, m_szServerIP, m_nPort)){
ShowMessage(TEXT("连接服务器失败,错误代码:%d!"), WSAGetLastError());
Cleanup();
break;
}

m_pParamsWorker[i].nThreadNo = i + 1;
StringCchPrintf(m_pParamsWorker[i].szBuffer, _countof(m_pParamsWorker[i].szBuffer), TEXT("%d号线程,发送数据%s"),
i+1,m_strMessage);

Sleep(10);

//如果连接服务器成功,就开始建立工作者线程,向服务器发送指定的数据
m_pParamsWorker[i].pClient = this;
m_phWorkerThreads[i] = chBEGINTHREADEX(0, 0, _WorkerThread,
(PVOID)(&m_pParamsWorker[i]), 0, &nThreadID);
m_nThreadsNum++;
}
return TRUE;
}

//向服务器进行连接
BOOL CClient::ConnectToServer(SOCKET* pSocket, TCHAR* strServer, int nPort){
SOCKADDR_IN ServerAddress = {0};

//生成SOCKET
*pSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (INVALID_SOCKET == *pSocket)
return FALSE;

//生成地址信息
char* szServeName;
#ifdef UNICODE
USES_CONVERSION;
szServeName = W2A(strServer);
#else
szServeName = (char*)szServeName;
#endif

ServerAddress.sin_family = AF_INET;
inet_pton(AF_INET, szServeName, (PVOID)&ServerAddress.sin_addr.S_un.S_addr);
ServerAddress.sin_port = htons(m_nPort);

//开始连接服务器
if (SOCKET_ERROR == connect(*pSocket,(const struct sockaddr*)(&ServerAddress),
sizeof(ServerAddress))){
closesocket(*pSocket);
return FALSE;
}

return TRUE;
}

//////////////////////////////////////////////////////////////////////////
CClient::CClient():m_nThreads(DEFAULT_THREADS),m_nPort(DEFAULT_PORT),
m_phWorkerThreads(NULL), m_hConnectionThread(NULL), m_nThreadsNum(0)
{
StringCchCopy(m_szServerIP, _countof(m_szServerIP), DEFAULT_IP);
StringCchCopy(m_szLocalIP, _countof(m_szLocalIP), DEFAULT_IP);
StringCchCopy(m_strMessage, _countof(m_strMessage), DEFAULT_IP);

//创建退出的事件通知对象
m_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);

LoadSocketLib();
}

CClient::~CClient()
{
Stop();
if (m_hShutdownEvent != NULL)
RELEASE_HANDLE(m_hShutdownEvent);
UnloadSocketLib();
}

//////////////////////////////////////////////////////////////////////////
//开始监听
BOOL CClient::Start(){

ResetEvent(m_hShutdownEvent);

//启动连接线程
DWORD nThreadID;
THREADPARAMS_WORKER* pThreadParams = new THREADPARAMS_WORKER;
pThreadParams->pClient = this;
m_hConnectionThread = chBEGINTHREADEX(0, 0,
_ConnectionThread,
(PVOID)pThreadParams, 0,&nThreadID);
return TRUE;
}

//停止监听
void CClient::Stop(){

if (m_hShutdownEvent == NULL) return;

SetEvent(m_hShutdownEvent);

//等待Connection线程退出
WaitForSingleObject(m_hConnectionThread, INFINITE);

//关闭所有的Socket
for (int i = 0; i < m_nThreadsNum; i++){
closesocket(m_pParamsWorker[i].sock);
}

//等待所有的工作者线程退出
//SyncWaitForMultipleObjs(m_phWorkerThreads, m_nThreadsNum);
////WaitForMultipleObjects(m_nThreadsNum, m_phWorkerThreads, TRUE, INFINITE); //该函数最多只能等64个内核对象
//for (int i = 0; i < m_nThreadsNum;i++){
//    if (m_phWorkerThreads[i] != NULL)
//        CloseHandle(m_phWorkerThreads[i]);
//}

//清空资源
Cleanup();
}

//清空资源
void CClient::Cleanup(){

RELEASE(m_phWorkerThreads);
RELEASE_HANDLE(m_hConnectionThread);
RELEASEX(m_pParamsWorker);
RELEASE_HANDLE(m_hShutdownEvent);
m_nThreadsNum = 0;
}

//////////////////////////////////////////////////////////////////////////
//初始化WinSock2.2
bool CClient::LoadSocketLib()
{
WSADATA wsaData;
int nRet = WSAStartup(MAKEWORD(2, 2), &wsaData);

if (NO_ERROR !=nRet){
ShowMessage(TEXT("初始化WinSock2.2失败!\n"));
return FALSE;
}
return TRUE;
}

//////////////////////////////////////////////////////////////////////////
//获取本机IP
TCHAR* CClient::GetLocalIP(){
////获得本机主机名
char hostname[MAX_PATH] = { 0 };
gethostname(hostname, MAX_PATH);

struct addrinfo *pRet = NULL;//通过result指针指向addrinfo结构链表的指针
struct addrinfo hints;//需要<ws2tcpip.h>
ZeroMemory(&hints, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
DWORD dwRetval = getaddrinfo(hostname, NULL, &hints, &pRet);//返回值0代表成功
if (dwRetval != 0)
return m_szLocalIP;

SOCKADDR_IN  *sockaddr_ipv4 = (SOCKADDR_IN*)pRet->ai_addr;
//将IP地址转化成字符串形式
char inAddr[16];
inet_ntop(AF_INET, (void*)&sockaddr_ipv4->sin_addr, inAddr, sizeof(inAddr));

#ifdef UNICODE
USES_CONVERSION;
memcpy(m_szLocalIP,A2W(inAddr),sizeof(m_szLocalIP));
#else
memcpy(m_szLocalIP,inAddr,sizeof(m_szLocalIP));
#endif

return m_szLocalIP;
}

//在主界面中显示信息
void  CClient::ShowMessage(PCTSTR strInfo, ...) const{

if (NULL == m_hwnd)
return;

//根据传入的参数格式化字符串
TCHAR szMsg[1024];

va_list arglist;
va_start(arglist, strInfo);
_vstprintf_s(szMsg, _countof(szMsg),strInfo, arglist);
va_end(arglist);

AddInformation(GetDlgItem(m_hwnd, IDC_LIST_INFO), szMsg);
}

/*
* Synchronically waiting for all objects signaled.
* - handles : An array of object handles to wait.
* - count   : The count of handles.
* returns : Same as WaitForMultipleObjects.
*/
DWORD CClient::SyncWaitForMultipleObjs(HANDLE * handles, size_t count){

int waitingThreadsCount = count;
int index = 0;
DWORD res = 0;
while (waitingThreadsCount >= MAXIMUM_WAIT_OBJECTS)
{
res = WaitForMultipleObjects(MAXIMUM_WAIT_OBJECTS, &handles[index], TRUE, INFINITE);
assert(res == 0);

if (res == WAIT_TIMEOUT || res == WAIT_FAILED)
{
//puts("1. Wait Failed.");
return res;
}

waitingThreadsCount -= MAXIMUM_WAIT_OBJECTS;
index += MAXIMUM_WAIT_OBJECTS;
}

if (waitingThreadsCount > 0)
{
res = WaitForMultipleObjects(waitingThreadsCount, &handles[index], TRUE, INFINITE);
assert(res == 0);
if (res == WAIT_TIMEOUT || res == WAIT_FAILED)
{
//puts("2. Wait Failed.");
}
}

return res;

}


//resource.h

//{{NO_DEPENDENCIES}}
// Microsoft Visual C++ 生成的包含文件。
// 供 10_IOCPClient.rc 使用
//
#define IDD_IOCPCLIENT                  101
#define IDC_LIST_INFO                   1000
#define IDC_STOP                        1001
#define IDC_EDIT_PORT                   1002
#define IDC_IPADDRESS_SERVER            1003
#define IDC_EDIT_THREADS                1004
#define IDC_EDIT_MESSAGE                1005

// Next default values for new objects
//
#ifdef APSTUDIO_INVOKED
#ifndef APSTUDIO_READONLY_SYMBOLS
#define _APS_NEXT_RESOURCE_VALUE        102
#define _APS_NEXT_COMMAND_VALUE         40001
#define _APS_NEXT_CONTROL_VALUE         1002
#define _APS_NEXT_SYMED_VALUE           101
#endif
#endif


//Client.rc

// Microsoft Visual C++ generated resource script.
//
#include "resource.h"

#define APSTUDIO_READONLY_SYMBOLS
/////////////////////////////////////////////////////////////////////////////
//
// Generated from the TEXTINCLUDE 2 resource.
//
#include "winres.h"

/////////////////////////////////////////////////////////////////////////////
#undef APSTUDIO_READONLY_SYMBOLS

/////////////////////////////////////////////////////////////////////////////
// 中文(简体,中国) resources

#if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_CHS)
LANGUAGE LANG_CHINESE, SUBLANG_CHINESE_SIMPLIFIED

#ifdef APSTUDIO_INVOKED
/////////////////////////////////////////////////////////////////////////////
//
// TEXTINCLUDE
//

1 TEXTINCLUDE
BEGIN
"resource.h\0"
END

2 TEXTINCLUDE
BEGIN
"#include ""winres.h""\r\n"
"\0"
END

3 TEXTINCLUDE
BEGIN
"\r\n"
"\0"
END

#endif    // APSTUDIO_INVOKED

/////////////////////////////////////////////////////////////////////////////
//
// Dialog
//

IDD_IOCPCLIENT DIALOGEX 0, 0, 309, 220
STYLE DS_SETFONT | DS_MODALFRAME | DS_CENTER | WS_MINIMIZEBOX | WS_POPUP | WS_VISIBLE | WS_CAPTION | WS_SYSMENU
CAPTION "服务器压力测试工具"
FONT 9, "宋体", 400, 0, 0x86
BEGIN
LTEXT           "服务器IP地址:",IDC_STATIC,14,21,70,8
LTEXT           "连接端口:",IDC_STATIC,215,23,43,8
EDITTEXT        IDC_EDIT_PORT,255,19,40,14,ES_AUTOHSCROLL
CONTROL         "",IDC_IPADDRESS_SERVER,"SysIPAddress32",WS_TABSTOP,69,18,132,15
LTEXT           "并发线程数:",IDC_STATIC,207,42,51,8
LTEXT           "发送信息:",IDC_STATIC,16,42,47,8
EDITTEXT        IDC_EDIT_THREADS,255,39,40,14,ES_AUTOHSCROLL
EDITTEXT        IDC_EDIT_MESSAGE,60,39,141,14,ES_AUTOHSCROLL
GROUPBOX        "参数设置",IDC_STATIC,7,7,295,54
DEFPUSHBUTTON   "开始测试",IDOK,35,200,50,14
PUSHBUTTON      "退出",IDCANCEL,223,200,50,14
PUSHBUTTON      "停止测试",IDC_STOP,128,200,50,14,WS_DISABLED
CONTROL         "",IDC_LIST_INFO,"SysListView32",LVS_REPORT | LVS_ALIGNLEFT | WS_BORDER | WS_TABSTOP,9,65,294,130
END

/////////////////////////////////////////////////////////////////////////////
//
// DESIGNINFO
//

#ifdef APSTUDIO_INVOKED
GUIDELINES DESIGNINFO
BEGIN
IDD_IOCPCLIENT, DIALOG
BEGIN
BOTTOMMARGIN, 217
END
END
#endif    // APSTUDIO_INVOKED

#endif    // 中文(简体,中国) resources
/////////////////////////////////////////////////////////////////////////////

#ifndef APSTUDIO_INVOKED
/////////////////////////////////////////////////////////////////////////////
//
// Generated from the TEXTINCLUDE 3 resource.
//

/////////////////////////////////////////////////////////////////////////////
#endif    // not APSTUDIO_INVOKED
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: