您的位置:首页 > 编程语言 > C语言/C++

C++服务器重叠I/O+事件通告模型

2016-06-22 21:14 656 查看
#include<iostream>
#include<WinSock2.h>
#include <mswsock.h>
#include<vector>
using namespace std;
#pragma comment(lib,"Ws2_32.lib")
#pragma comment (lib, "mswsock.lib")
const int nPort=10000;
const int buf_len=1024;
//对于同一个客户连接套接字,任意时刻只能有一个为完成的异步I/O操作,要么是
//WSASend(IoWrite),要么是WSARecv(IoRead)
typedef enum _IO_OPERATION{
IoRead,
IoWrite
}IO_OPERATION;
//每一个连接都有一个Connection对象Connection对象包含一个WSAOVERLAPPED结构
//同时由于一个Connection只有一个WSAOVERLAPPED结构,并且由于一个I/O异步请求
//必须有一个唯一的WSAOVERLAPPED结构,因此任意时刻对于一个连接只能有一个未完成的
//异步I/O操作
struct Connection{
SOCKET hSocket;
char Buffer[buf_len];
int nBytes;
//调用WSASend或者WSARecv是需要一个WSABUF结构的指针
WSABUF wsaBuffer;
WSAOVERLAPPED overlap;
IO_OPERATION op;
Connection(SOCKET socket):hSocket(socket),nBytes(0)
{
wsaBuffer.buf=Buffer;
wsaBuffer.len=buf_len;
ZeroMemory(&overlap,sizeof(WSAOVERLAPPED));
//由于程序使用事件完成通知,因此需要为WSAOVERLAPPED结构创建一个时间内核对象
overlap.hEvent=WSACreateEvent();
}
};
typedef vector<Connection*> ConnectionList;
// 重置conns,把其中无效的套接字移除
void ResetConns(ConnectionList& conns){
ConnectionList::iterator it = conns.begin();
while(it != conns.end()){
if((*it)->hSocket == INVALID_SOCKET){
delete (*it);
it = conns.erase(it);
}
else
++it;
}
}
// 为WSAWaitForMultipleEvents填充好需要等待的事件内核对象数组
int FillEventArray(HANDLE hEvents[], HANDLE hListenEvent,
ConnectionList& conns){
// 监听套接字的事件对象放在最前面,之后依次填入当前所有客户连接套接字
// 的事件对象
hEvents[0] = hListenEvent;
int nEvents = 1;
ConnectionList::iterator it = conns.begin();
while(it != conns.end()){
// 使用WSAOVERLAPPED结构中的hEvent填充数组
hEvents[nEvents] = (*it)->overlap.hEvent;
++nEvents;
++it;
}
return (int)(conns.size() + 1);
}
// 异步AcceptEx请求已完成,获取结果
bool HandleAccept(SOCKET hListenSocket, SOCKET hAcceptSocket, LPOVERLAPPED
lpOverlapListen, ConnectionList& conns) {
DWORD flags = 0;
DWORD bytes = 0;
// 获取异步I/O的结果
if(!WSAGetOverlappedResult(hListenSocket, lpOverlapListen, &bytes,
FALSE, &flags))
{
cout<<"WSAGetOverlappedResult error "<< WSAGetLastError() << endl;
return false;
}
// 超出单线程所能处理的连接数
if(conns.size() + 1 >= WSA_MAXIMUM_WAIT_EVENTS){
cout << "exceed connection limit" << endl;
// 关闭已接受的客户连接,即拒绝服务
closesocket(hAcceptSocket);
return true;
}
// 为新接受的客户连接创建一个Connection对象
conns.push_back(new Connection(hAcceptSocket));
Connection* pConn = conns.back();
// 第一次的异步I/O请求是IoRead,因为对于回显服务器来说,必须先接收到数据后
// 才能回显数据
pConn->op = IoRead;
flags = 0;
// 对这个新的客户连接发出第一个异步I/O请求
int nRet = WSARecv(pConn->hSocket, &(pConn->wsaBuffer), 1, NULL,
&flags, &pConn->overlap, NULL);
int lastErr = WSAGetLastError();
// 如果WSARecv失败并且错误代码不是ERROR_IO_PENDING
if(nRet == SOCKET_ERROR && WSA_IO_PENDING != lastErr){
cout<<"WSARecv error "<< lastErr << endl;
return false;
}
return true;
}

// 异步的WSASend或者WSARecv已完成,获取结果
bool HandleSendRecv(Connection* pConn){
DWORD flags = 0;
DWORD bytes = 0;
// 获取异步I/O的结果
if(!WSAGetOverlappedResult(pConn->hSocket, &pConn->overlap, &bytes,
FALSE, &flags)) {
int lastErr = WSAGetLastError();
cout<<"WSAGetOverlappedResult error "<< lastErr << endl;
// 连接被对方意外关闭
if(lastErr == WSAECONNRESET)
cout<<"Connection was reset."<<endl;
return false;
}
if(bytes == 0){
// 对方正常关闭了连接
cout << "Connection closed by peer." << endl;
return false;
}
// 如果当前已完成的异步I/O是WSARecv
if(pConn->op == IoRead){
// 更新可用数据的大小
pConn->nBytes += bytes;
// 为即将调用的WSASend准备好缓冲区参数
pConn->wsaBuffer.len = pConn->nBytes;
pConn->wsaBuffer.buf = pConn->Buffer;
flags = 0;
// 由于WSARecv已成功接收了数据,现在可以发出异步WSASend请求来回显数据
pConn->op = IoWrite;
int nRet = WSASend(pConn->hSocket, &(pConn->wsaBuffer), 1, NULL,
flags, &pConn->overlap, NULL);
int lastErr = WSAGetLastError();
if(nRet == SOCKET_ERROR && WSA_IO_PENDING != lastErr) {
cout<<"WSASend error "<< lastErr << endl;
return false;
}
}
// 如果当前已完成的异步I/O是WSASend
else if(pConn->op == IoWrite){
// 更新可用数据的大小
pConn->nBytes -= bytes;
// 计算缓冲区空闲空间的大小
pConn->wsaBuffer.len = nBuffSize - pConn->nBytes;
// 如果缓冲区还有剩余数据没有发送出去,则需要把它们移到缓冲区的头部
if(pConn->nBytes > 0) {
memmove(pConn->Buffer, pConn->Buffer + bytes, pConn->nBytes);
}
// 计算缓冲区空闲空间的偏移
pConn->wsaBuffer.buf = pConn->Buffer + pConn->nBytes;
flags = 0;
pConn->op = IoRead;
// 发出异步WSARecv请求
int nRet = WSARecv(pConn->hSocket, &(pConn->wsaBuffer), 1, NULL,
&flags, &pConn->overlap, NULL);
int lastErr = WSAGetLastError();
if(nRet == SOCKET_ERROR && WSA_IO_PENDING != lastErr) {
cout<<"WSARecv error "<< lastErr << endl;
return false;
}
}
return true;
}
//创建一个WSA_FLAG_OVERLAPPED套接字
SOCKET CreateOverlappedSocket()
{
SOCKET hSocket=WSASocket(AF_INET,SOCK_STREAM,IPPROTO_TCP,NULL,0,WSA_FLAG_OVERLAPPED);
if(hSocket==INVALID_SOCKET)
{
cout<<"WSASocket 错误"<<WSAGetLastError()<<endl;
}
return hSocket;
}
//返回一个用于异步I/O的监听套接字进入监听状态
SOCKET BindListenOverlapped()
{
//创建一个用于异步I/O的监听套接字
SOCKET sd=CreateOverlappedSocket();
if(sd==INVALID_SOCKET)
{
return INVALID_SOCKET;
}
//填充本地套接字地址
sockaddr_in saListen;
saListen.sin_family=AF_INET;
saListen.sin_addr.s_addr=htonl(INADDR_ANY);
saListen.sin_port=htons(nPort);
//调用bind把本地套接字地址绑定到监听套接字
if(bind(sd,(sockaddr*)&saListen,sizeof(sockaddr_in))==SOCKET_ERROR)
{
cout<<"绑定失败"<<WSAGetLastError()<<endl;
closesocket(sd);
return INVALID_SOCKET;
}
//开始监听
if(listen(sd,5)==SOCKET_ERROR)
{
cout<<"监听失败"<<WSAGetLastError()<<endl;
closesocket(sd);
return INVALID_SOCKET;
}
return sd;
}
//调用AcceptEx时需要用到的缓冲区,这个缓冲区用来保存本地和远程地址
char bAcceptBuffer[2*(sizeof(SOCKADDR_IN)+16)];
DWORD dwAcceptBytes=0;
//发出异步AcceptEx请求
SOCKET StartAccept(SOCKET hListenSocket,HANDLE hListenEvent,LPOVERLAPPED lpOverlapListen)
{
//创建一个异步套接字hAcceptSocket,并传给AcceptEx。当异步的AcceptEx完成时
//即当WSAWaitForMultipleEvents成功返回其返回值表示出现信号的事件是
//监听套接字的事件时,在此处创建的hAcceptSocket就代表成功接受的客户连接
SOCKET hAcceptSocket=CreateOverlappedSocket();
if(hAcceptSocket==INVALID_SOCKET)
{
return INVALID_SOCKET;
}
//初始化监听套接字的WSAOVERLAPPED结构
ZeroMemory(lpOverlapListen,sizeof(WSAOVERLAPPED));
lpOverlapListen->hEvent=hListenEvent;
//发出异步AcceptEx请求
if(!AcceptEx(hListenSocket,hAcceptSocket,bAcceptBuffer,0,sizeof(SOCKADDR_IN)+16,sizeof(SOCKADDR_IN)+16,&dwAcceptBytes,lpOverlapListen))
{
//如果AcceptEx失败并且错误代码不是ERROR_IO_PENDING
int lastErr=WSAGetLastError();
if(lastErr!=ERROR_IO_PENDING)
{
cout<<"AcceptEx 错误"<<lastErr<<endl;
closesocket(hAcceptSocket);
return INVALID_SOCKET;
}
}
return hAcceptSocket;
}
// OverlappedEventServer的主体函数
void DoWork() {
// 定义事件内核对象句柄数组
HANDLE hEvents[WSA_MAXIMUM_WAIT_EVENTS];
ConnectionList conns;
// 获取一个用于异步I/O的监听套接字
SOCKET hListenSocket = BindListenOverlapped();
if(hListenSocket == INVALID_SOCKET)
goto cleanup;
// 为监听套接字创建一个事件内核对象
HANDLE hListenEvent = WSACreateEvent();
// 用于监听套接字的WSAOVERLAPPED结构
WSAOVERLAPPED overlapListen;
// 开始监听套接字的异步AcceptEx请求
SOCKET hAcceptSocket = StartAccept(hListenSocket, hListenEvent,
&overlapListen);
if(hAcceptSocket == INVALID_SOCKET)
goto cleanup;
// 主循环
while(true){
// 从客户连接列表中去掉无效的连接,即那些已关闭或者发生了错误的连接
ResetConns(conns);
// 用监听套接字的事件和所有有效客户连接的事件填充一个事件数组
int nEvents = FillEventArray(hEvents, hListenEvent, conns);
// 等待任一(或一些)事件出现信号
int nRet = WSAWaitForMultipleEvents(nEvents, hEvents, FALSE,
WSA_INFINITE, FALSE);
if(nRet == WSA_WAIT_FAILED){
cout<<"WSAWaitForMultipleEvents "<< WSAGetLastError() << endl;
goto cleanup;
}
// 获取所有出现信号的事件中最小的索引值
nRet = nRet - WSA_WAIT_EVENT_0;
// 检查每一个可能的事件,看其有没有信号
for(int nIndex = nRet; nIndex < nEvents; ++nIndex) {
// 测试索引值为nIndex的事件是否出现信号
nRet = WSAWaitForMultipleEvents(1, &hEvents[nIndex], true, 0,
FALSE);
// 没有信号则继续主循环
if(nRet == WSA_WAIT_FAILED || nRet == WSA_WAIT_TIMEOUT)
continue;
// 重置出现信号的事件,以便下一次进入主循环等待时其状态为无信号
WSAResetEvent(hEvents[nIndex]);
// nIndex为0代表监听套接字的事件出现信号
if(nIndex == 0){
// 监听套接字的异步AcceptEx已经完成,新的客户连接套接字是
// hAcceptSocket。调用HandleAccept来执行异步I/O完成后的工作
if(!HandleAccept(hListenSocket, hAcceptSocket,
&overlapListen, conns))
goto cleanup;
// 开始监听套接字的下一个异步AcceptEx请求
hAcceptSocket = StartAccept(hListenSocket, hListenEvent,
&overlapListen);
if(hAcceptSocket == INVALID_SOCKET)
goto cleanup;
}
// nIndex大于0代表客户连接的套接字事件出现信号
else{
// 找到客户连接的Connection对象
Connection* pConn = conns[nIndex-1];
// 调用HandleSendRecv来执行异步I/O完成后的工作
if(!HandleSendRecv(pConn)){
closesocket(pConn->hSocket);
pConn->hSocket = INVALID_SOCKET;
WSACloseEvent(pConn->overlap.hEvent);
}
}
}
}
// 释放资源
cleanup:
ConnectionList::iterator it = conns.begin();
for(;it != conns.end();++it){
closesocket((*it)->hSocket);
WSACloseEvent((*it)->overlap.hEvent);
delete (*it);
}
if(hListenSocket != INVALID_SOCKET)
closesocket(hListenSocket);
WSACloseEvent(hListenEvent);
}
int main(int argc, char* argv[]){
WSAData wsaData;
int nCode;
if ((nCode = WSAStartup(MAKEWORD(2, 2), &wsaData)) != 0) {
cout << "WSAStartup error " << nCode << endl;
return -1;
}
DoWork();
WSACleanup();
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息