您的位置:首页 > 编程语言 > C语言/C++

WsaEventSelect封装的类,多线程

2014-03-08 20:51 495 查看
   最近工作上需要一个小的tcp服务器,就想到了用WsaEventSelect去管理socket, 遂在网上找了下资料,有个多线程的Wsaeventselect的C源码,就想将其封装为类,用了两天(好吧,水平有限)。其中有个问题,是客户端如果closesocket太快的话,服务端这边会收不到FD_CLOSE消息,在客户端[b]closesocket前加了个sleep就正常了,查了资料貌似是和tcp关闭时候的四次握手有关。上代码[/b]

#pragma once

// WSAEventSelect IO模型服务器实现
//多线程共享同一个监听套接字socketListen
//各个线程各自管理自己的socket客户连接

#include <WinSock2.h>
#include <windows.h>
#include <list>
#pragma comment(lib, "ws2_32.lib")

using std::list;

#define DELPOINT(p) if ((p) != NULL) delete (p)
#define CLOSESOCKET(S) if ((S) != INVALID_SOCKET) ::closesocket((s))
#define ENTERLOCK(l) ::EnterCriticalSection((l))
#define LEAVELOCK(l) ::LeaveCriticalSection((l))
#define PORT 9324

class CWsaEventSelect;

//套接字对象
typedef struct ST_SOCKET_OBJ
{
SOCKET s;
HANDLE hEvent;
SOCKADDR_IN addrClient;
~ST_SOCKET_OBJ()
{
::CloseHandle(hEvent);
CLOSESOCKET(s);
}

}SOCKETOBJ, *PSOCKETOBJ;

//线程对象
typedef struct ST_THREAD_OBJ
{
HANDLE hEvents[WSA_MAXIMUM_WAIT_EVENTS];
int nSocketCount;//当前线程管理的socket数量
list<PSOCKETOBJ> lstPSocket; //当前线程处理的socket列表
CRITICAL_SECTION cs; //用于本结构的同步访问

ST_THREAD_OBJ()
{
nSocketCount = 0;
::InitializeCriticalSection(&cs);
hEvents[0] = ::WSACreateEvent();
}

~ST_THREAD_OBJ()
{
nSocketCount = 0;
CloseHandle(hEvents[0]);

//释放socket对象
ENTERLOCK(&cs);
for (list<PSOCKETOBJ>::iterator it = lstPSocket.begin();\
it != lstPSocket.end(); it++)
{
DELPOINT(*it);
}
lstPSocket.clear();
LEAVELOCK(&cs);
::DeleteCriticalSection(&cs);
}

//重建线程对象的events数组,将psocket中的event和pthread中的events关联起来
void RebulidArray()
{
list<PSOCKETOBJ>::iterator it;
int iIndex = 1; //下标从1开始
ENTERLOCK(&cs);
for (it = lstPSocket.begin(); it != lstPSocket.end();\
it++)
{
hEvents[iIndex++] = (*it)->hEvent;
}
LEAVELOCK(&cs);
}

//查找相应序号的socket对象
PSOCKETOBJ FindSocketObj(int iIndex)
{
ENTERLOCK(&cs);
list<PSOCKETOBJ>::iterator it = lstPSocket.begin();
for (; it != lstPSocket.end()\
&& iIndex > 0; it++, iIndex--);
if (it == lstPSocket.end())
{
LEAVELOCK(&cs);
return NULL;
}
LEAVELOCK(&cs);
return *it;
}

//删除指定的socket对象
void RemoveSocketObj(PSOCKETOBJ pSocket)
{
ENTERLOCK(&cs);
list<PSOCKETOBJ>::iterator it = lstPSocket.begin();
for (; it != lstPSocket.end(); it++)
{
if (*it == pSocket)
{
DELPOINT(*it);
lstPSocket.erase(it);
nSocketCount--;
break;
}
}
LEAVELOCK(&cs);
}

}THREADOBJ, *PTHREADOBJ;

//用于向线程传递参数
typedef struct Thread_Param
{
CWsaEventSelect *pMain;
PTHREADOBJ pThread;
}THREADPARAM, *PTHREADPARAM;

class CWsaEventSelect
{
public:
CWsaEventSelect();
~CWsaEventSelect();

//每个线程管理的最大socket数量
enum {MAX_SOCKET_COUNT_PER_THREAD = WSA_MAXIMUM_WAIT_EVENTS - 1};

public:
//初始化socket
BOOL InitSocket();

//等待客户端连接
BOOL Start();

//删除线程对象
void RemoveThreadObj(PTHREADOBJ pThread);

private:

//创建一个socket对象
PSOCKETOBJ GetSocketObj(SOCKET s);

//创建一个ThreadObj对象
PTHREADOBJ GetThreadObj();

//重建线程对象的events数组,将psocket中的event和pthread中的events关联起来
void RebulidArray(PTHREADOBJ pThread);

//简单的线程管理
void AssignToThread(PSOCKETOBJ pSock);

//处理业务逻辑
BOOL HandleIO(PTHREADOBJ pThread, PSOCKETOBJ pSocket);

//工作线程
static DWORD WINAPI WorkThread(LPVOID lParam);
private:

//用于同步线程对象表
CRITICAL_SECTION m_csLock;

//线程对象列表
list<PTHREADOBJ> m_lstPThread;

//当前连接数
int m_iCurConnections;

//当前线程数
int m_iCurThreadNum;

//监听socket
SOCKET m_sckListener;

//初始化是否成功
BOOL m_bInit;
};
#include "WsaEventSelectObj.h"
#include <iostream>
using std::cout;
using std::endl;

CWsaEventSelect::CWsaEventSelect():
m_iCurConnections(0),
m_sckListener(INVALID_SOCKET),
m_bInit(FALSE),
m_iCurThreadNum(0)
{
::InitializeCriticalSection(&m_csLock);
}

CWsaEventSelect::~CWsaEventSelect()
{

}

//初始化socket
BOOL CWsaEventSelect::InitSocket()
{
WSADATA wsaData;

int err = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (err != 0)
{
cout << "WSAStartup 失败, 错误" << err << endl;
return FALSE;
}
if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2)
{
WSACleanup();
return FALSE;
}
m_bInit = TRUE;
return TRUE;
}

PSOCKETOBJ CWsaEventSelect::GetSocketObj(SOCKET s)
{
PSOCKETOBJ pSocket = new SOCKETOBJ;
if (pSocket != NULL)
{
pSocket->s = s;
pSocket->hEvent = ::WSACreateEvent();
}
return pSocket;
}

//创建一个ThreadObj对象
PTHREADOBJ CWsaEventSelect::GetThreadObj()
{
PTHREADOBJ pThread = new THREADOBJ;
if (pThread != NULL)
{
m_lstPThread.push_back(pThread);
}

return pThread;
}

//等待客户端连接
BOOL CWsaEventSelect::Start()
{
if (!m_bInit)
{
return FALSE;
}
m_sckListener = socket(AF_INET, SOCK_STREAM, 0);
SOCKADDR_IN addr;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);

if (SOCKET_ERROR == bind(m_sckListener, (SOCKADDR*)&addr, sizeof(SOCKADDR)))
{
//绑定失败
DWORD dwErr = ::WSAGetLastError();
return FALSE;
}

listen(m_sckListener, SOMAXCONN);

WSAEVENT wsaEvent = ::WSACreateEvent();
::WSAEventSelect(m_sckListener, wsaEvent, FD_ACCEPT);
while (TRUE)
{
int iRet = ::WSAWaitForMultipleEvents(1, &wsaEvent, FALSE, 10000, FALSE);
if (iRet == WSA_WAIT_FAILED)
{
//失败退出监听
break;
}
else if (iRet == WSA_WAIT_TIMEOUT)
{
cout << "当前连接数 " << m_iCurConnections << endl;
cout << "当前线程数 " << m_iCurThreadNum << endl;
cout << endl;
}
else
{
WSANETWORKEVENTS wsaNetEvent;
::WSAEnumNetworkEvents(m_sckListener, wsaEvent, &wsaNetEvent);
if (wsaNetEvent.lNetworkEvents & FD_ACCEPT)
{
if (wsaNetEvent.iErrorCode[FD_ACCEPT_BIT] == 0)
{
while(TRUE)
{
//把等待连接队列中的用户都接受建立请求,建立后立即派送给相关的服务线程处理
SOCKADDR_IN addrClient;
int iLen = sizeof(SOCKADDR);
SOCKET s = accept(m_sckListener, (SOCKADDR*)&addrClient,\
&iLen);
if (s == SOCKET_ERROR)
{
//没有连接请求了
break;
}
PSOCKETOBJ pSock = GetSocketObj(s);
if (pSock == NULL)
{
continue;
}
memcpy_s(&pSock->addrClient, sizeof(SOCKADDR_IN), &addrClient,\
sizeof(SOCKADDR_IN));
cout << "客户端连接 " << inet_ntoa(addrClient.sin_addr) << ":"\
<< ntohs(addrClient.sin_port) << endl;

//设置客户端socket关心的事件
::WSAEventSelect(pSock->s, pSock->hEvent, FD_READ | FD_CLOSE);
//交给线程管理
AssignToThread(pSock);
}
}
}
}
}
return TRUE;
}

//简单的线程管理
void CWsaEventSelect::AssignToThread(PSOCKETOBJ pSock)
{
ENTERLOCK(&m_csLock);//进入类成员锁
list<PTHREADOBJ>::iterator it = m_lstPThread.begin();

//查找一个管理的socket不是满的线程,插入这个线程
for (;it != m_lstPThread.end(); it++)
{
ENTERLOCK(&(*it)->cs); //进入线程对象锁
if ((*it)->nSocketCount < MAX_SOCKET_COUNT_PER_THREAD)
{
(*it)->lstPSocket.push_back(pSock);
(*it)->nSocketCount++;
//通知线程重建数组
::WSASetEvent((*it)->hEvents[0]);
LEAVELOCK(&(*it)->cs);
break;
}
LEAVELOCK(&(*it)->cs);
}
//没有空闲的线程
if (it == m_lstPThread.end())
{
//新建一个空闲线程对象,将pSock放入新线程对象中

//线程参数由线程去释放
PTHREADPARAM pParam = new THREADPARAM;

//线程对象
PTHREADOBJ pThread = new THREADOBJ;
ENTERLOCK(&pThread->cs);//进行线程对象锁
pThread->lstPSocket.push_back(pSock);
pThread->nSocketCount++;
LEAVELOCK(&pThread->cs);
m_lstPThread.push_front(pThread);

pParam->pMain = this;
pParam->pThread = pThread;
HANDLE hT = ::CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)WorkThread, (LPVOID)pParam, 0, NULL);
::InterlockedIncrement((LONG*)&m_iCurThreadNum);
::CloseHandle(hT);
//通知线程重建数组
::WSASetEvent(pThread->hEvents[0]);
}
LEAVELOCK(&m_csLock);
//当前连接数加1
::InterlockedIncrement((LONG*)&m_iCurConnections);
}

//工作线程
DWORD WINAPI CWsaEventSelect::WorkThread(LPVOID lParam)
{
PTHREADPARAM pParam = (PTHREADPARAM)lParam;
PTHREADOBJ pThread = pParam->pThread;
CWsaEventSelect *pMain = pParam->pMain;
//释放参数
DELPOINT(pParam);
while(TRUE)
{
HANDLE *pEvents;
int nCount;
//这里直接访问可能会有同步问题
ENTERLOCK(&pThread->cs);
pEvents = new HANDLE[pThread->nSocketCount + 1];
nCount = pThread->nSocketCount;
memcpy_s(pEvents, (nCount + 1) * sizeof(HANDLE), pThread->hEvents,\
(nCount + 1) * sizeof(HANDLE));
LEAVELOCK(&pThread->cs);

int iIndex = ::WSAWaitForMultipleEvents(nCount + 1, pEvents,\
FALSE, WSA_INFINITE, FALSE);
iIndex = iIndex - WSA_WAIT_EVENT_0;

//由于多个对象触发时只返回最小的那个索引,所以在这里循环查看所有可能受信的对象
for (int i = iIndex; i < nCount + 1; i++)
{
//如果对象是受信状态会立即返回
iIndex = ::WSAWaitForMultipleEvents(1, &pEvents[i], TRUE,\
0, FALSE);

if (iIndex == WSA_WAIT_FAILED || iIndex == WSA_WAIT_TIMEOUT)
{
//如果对象未受信则继续向后找
continue;
}
else
{
//第0个对象用于重建对应数组
if (0 == i)
{
pThread->RebulidArray();

ENTERLOCK(&pThread->cs);
if (pThread->nSocketCount <= 0)
{
pMain->RemoveThreadObj(pThread);
::InterlockedDecrement((LONG*)&pMain->m_iCurThreadNum);
//线程退出
//cout << "线程退出:" << ::GetCurrentThreadId() << endl;
delete []pEvents;
return 0;
}
LEAVELOCK(&pThread->cs);
::WSAResetEvent(pThread->hEvents[0]);
}
else
{
//得到socket对象
//如果是socket事件,则最小的序号为1,对应列表中的0索引
PSOCKETOBJ pSocket = pThread->FindSocketObj(i - 1);
if (pSocket != NULL)
{
if (!pMain->HandleIO(pThread, pSocket))//如果有客户端掉线
{
//重建句柄数组
pThread->RebulidArray();

ENTERLOCK(&pThread->cs);
if (pThread->nSocketCount <= 0)
{
pMain->RemoveThreadObj(pThread);
::InterlockedDecrement((LONG*)&pMain->m_iCurThreadNum);
//线程退出
//cout << "线程退出:" << ::GetCurrentThreadId() << endl;
delete []pEvents;
return 0;
}
LEAVELOCK(&pThread->cs);
}
}
else
{
//没找到序号为iIndex的socket对象
}
}
}
}
delete []pEvents;
}
}

//删除线程对象
void CWsaEventSelect::RemoveThreadObj(PTHREADOBJ pThread)
{
list<PTHREADOBJ>::iterator it;
ENTERLOCK(&m_csLock);
for (it = m_lstPThread.begin(); it != m_lstPThread.end(); it++)
{
if (*it == pThread)
{
DELPOINT(*it);
m_lstPThread.erase(it);
break;
}
}
LEAVELOCK(&m_csLock);
}

//处理业务逻辑
BOOL CWsaEventSelect::HandleIO(PTHREADOBJ pThread, PSOCKETOBJ pSocket)
{
WSANETWORKEVENTS wsaEvent;
::WSAEnumNetworkEvents(pSocket->s, pSocket->hEvent, &wsaEvent);

if (wsaEvent.lNetworkEvents & FD_READ)
{
if (wsaEvent.iErrorCode[FD_READ_BIT] == 0)
{
//业务处理
char szBuff[1024] = {0};
int iRecv = recv(pSocket->s, szBuff, 1024, 0);
if (iRecv > 0)
{
szBuff[iRecv] = '\0';
cout << szBuff << endl;
}
}
}
else if (wsaEvent.lNetworkEvents & FD_CLOSE)
{
if (wsaEvent.iErrorCode[FD_CLOSE_BIT] == 0)
{
//连接断开
//cout << "客户端退出 " << inet_ntoa(pSocket->addrClient.sin_addr) << endl;

pThread->RemoveSocketObj(pSocket);
::InterlockedDecrement((LONG*)&m_iCurConnections);
}
return FALSE;
}

return TRUE;
}

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息