您的位置:首页 > 其它

重叠IO与IOCP

2013-09-04 15:55 405 查看
(一) IO模型

I/O设备处理方式一般有两种 同步和异步

同步阻塞:这种方式就一直读写IO直道操作完成或者失败。

异步IO:使用overlapped I/O。overlapped I/O是WIN32的一项技术,你可以要求操作系统为你传送数据,并且在传送完毕时通知你。

(二)使用overlapped I/O: 

先来看看OVERLAPPED 结构体有两种定义:

typedef struct _OVERLAPPED
{
  DWORD Internal;
  DWORD InternalHigh;
  DWORD Offset;
  DWORD OffsetHigh;
  HANDLE hEvent;
}OVERLAPPED

typedef struct _OVERLAPPED
{
ULONG_PTR Internal; //操作系统保留,指出一个和系统相关的状态
ULONG_PTR InternalHigh; //指出发送或接收的数据长度
union
{
struct
{
DWORD Offset; //文件传送的字节偏移量的低位字
DWORD OffsetHigh; //文件传送的字节偏移量的高位字
};
PVOID Pointer; //指针,指向文件传送位置
};
HANDLE hEvent; //指定一个I/O操作完成后触发的事件
} OVERLAPPED, *LPOVERLAPPED;


  1> 进行I/O操作时,指定overlapped方式使用CreateFile (),将其第6个参数指定为FILE_FLAG_OVERLAPPED,就是准备使用overlapped的方式构造或打开文件;

  2> 如果采用 overlapped,那么ReadFile()、WriteFile()的第5个参数必须提供一个指针,指向一个OVERLAPPED结构。 OVERLAPPED用于记录了当前正在操作的文件一些相关信息。 详细的看下面代码例子:  

int main()
{
BOOL rc;
HANDLE hFile;
DWORD numread;
OVERLAPPED overlap;
char buf[512];
char szPath=”c:\\xxxx\xxxx”;
hFile = CreateFile( szPath,
GENERIC_READ,
FILE_SHARE_READ|FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED, // 以overlapped打开文件
NULL
);

// OVERLAPPED结构实始化为0
memset(&overlap, 0, sizeof(overlap));

//指定文件位置是1500;
overlap.Offset = 1500;
rc = ReadFile(hFile,buf,300,&numread,&overlap);
//因为是overlapped操作,ReadFile会将读文件请求放入读队列之后立即返回(false),而不会等到文件读完才返回(true)
if (rc)
{

//…………此处即得到数据了。
//文件真是被读完了,rc为true
// 或当数据被放入cache中,或操作系统认为它可以很快速地取得数据,rc为true
}
else
{
if (GetLastError() == ERROR_IO_PENDING)
{//当错误是ERROR_IO_PENDING,那意味着读文件的操作还在进行中
//等候,直到文件读完
WaitForSingleObject(hFile, INFINITE);
rc = GetOverlappedResult(hFile,&overlap,&numread,FALSE);
//上面二条语句完成的功能与下面一条语句的功能等价:一只阻塞等到得到数据才继续下面。
// GetOverlappedResult(hFile,&overlap,&numread,TRUE);
}
else
{
//出错了
}
}
CloseHandle(hFile);
return EXIT_SUCCESS;
}


注意:

  如果多个操作同时访问同一 操作i/o,这时我们可以使用OVERLAPPED结构体中的event字段。这个envent事件必须是手动方式。在转移完成时处理一个事件设置为有信号状态,调用进程集这个成员在调用ReadFile、 WriteFile、TransactNamedPipe、 ConnectNamedPipe函数之后事件会设置成武信号状态,当完成I/O操作之后信号有设置为由信号状态。

(三) socket IOCP模型

  socket通信也是一种I/O操作,既然文件I0有异步模型,那么socket也有异步模型,sokcet的异步操作模型由很多种,我们这里重点说明IOCP也叫完成端口。

使用普通socket通信和iocp通信流程图如下, 此图来自于博客/article/1644992.html

  


//filename Iocp.h
#pragma once
#include <WinSock2.h>
#include <stdio.h>
#include <process.h>

#define  IP_SIZE  32        //ip地址长度
#define  BUFFER_SIZE 1024

enum SOCKET_STATE
{
ACCEPT = 1,
SEND,
RECV
};
/*传送给处理函数的参数*/
typedef struct tagPleData
{
SOCKET sSocket;
CHAR szClientIP[IP_SIZE];
UINT  uiClientPort;
/*
其他信息
*/
}PLEDATA, * LPPLEDATA;

typedef struct tagIOData
{
OVERLAPPED oOverlapped;
WSABUF wsBuffer;
CHAR szBuffer[BUFFER_SIZE];
DWORD dSend;
DWORD dRecv;
SOCKET_STATE sState;
}IOData, *LPIOData;

typedef void (*ReadProc)(LPPLEDATA lpData,  CHAR * RecvData);

class Iocp
{
public:
Iocp(const CHAR * host, UINT port);
~Iocp(void);

static VOID ServerWorkThread( VOID * _this );   //监听完成端口线程
VOID SetReadProc(VOID * lprFun);                //设置读取回掉函数
bool ListenEx(UINT backlog);
static VOID AcceptEx(VOID  * _this);

public:

ReadProc lpFun;             //读取回调函数
HANDLE h_ComPlePort;        //完成端口句柄

bool bIsListen;                 //是否是服务端socket
SOCKADDR_IN m_SockAddr;         //socket地址
SOCKET  m_ListenSocketID;       //socket
CHAR m_Host[IP_SIZE];           //连接socketIp
UINT m_Port;                    //连接IP
};


//filenameIocp.cpp

#include "Iocp.h"

Iocp::Iocp(const CHAR * host, UINT port)
{
/*协商套接字版本*/
WSADATA wsaData;
DWORD dwRet = WSAStartup( 0x0202, &wsaData );
if (0 != dwRet )
{
WSACleanup();
throw 1;
}

m_ListenSocketID = INVALID_SOCKET ;
memset( &m_SockAddr, 0, sizeof(SOCKADDR_IN) ) ;
memset( m_Host, 0, IP_SIZE ) ;
m_Port = 0 ;
SYSTEM_INFO mySysInfo;
GetSystemInfo( &mySysInfo );
iThreadNums = mySysInfo.dwNumberOfProcessors * 2 + 1;

BOOL ret = FALSE ;
bIsListen = TRUE;
strncpy_s(m_Host,  host, IP_SIZE - 1);
m_SockAddr.sin_family = AF_INET;
m_SockAddr.sin_addr.s_addr =inet_addr(host);
m_SockAddr.sin_port = htons(port);

/*创建监听套接字*/

m_ListenSocketID = WSASocket( AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED );

if( m_ListenSocketID== INVALID_SOCKET )
{
throw 1;
}

/*设置套接字选项*/
CHAR opt = 1;
ret = setsockopt( m_ListenSocketID , SOL_SOCKET , SO_REUSEADDR , (const CHAR * )&opt , sizeof(opt) );
if( ret != 0 )
{
throw 1 ;
}

/*绑定套接字*/
if (SOCKET_ERROR == bind(m_ListenSocketID, (const struct sockaddr *)&m_SockAddr, sizeof(struct sockaddr)))
{
throw 1 ;
}

/*创建完成端口*/
h_ComPlePort  = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0 );
if ( h_ComPlePort == NULL )
{
throw 1 ;
}
for ( DWORD i = 0; i < ( mySysInfo.dwNumberOfProcessors * 2 + 1 ); ++i )
{
_beginthread(Iocp::ServerWorkThread,  0,  (VOID *)this);
}

}

Iocp::~Iocp(void)
{
WSACleanup();
}

/*************************************************
Function:AcceptEx
Description:接受套接字的线程函数
Input:
Output:
Others:
*************************************************/

VOID Iocp::AcceptEx(VOID  * _this)
{
SOCKET acSocket;
DWORD dwRecvBytes;
Iocp * pTemp = (Iocp *)_this;
SOCKADDR_IN sAddr;
INT uiClientSize = sizeof(sAddr);
//struct socketaddrin
while (TRUE)
{
int x = 6;
acSocket = WSAAccept( pTemp->m_ListenSocketID, (SOCKADDR *)&sAddr, &uiClientSize, NULL, 0 );
if ( acSocket == SOCKET_ERROR )
{
return;
}

LPPLEDATA lpSocketData = (LPPLEDATA)malloc(sizeof(PLEDATA));
if ( NULL == lpSocketData )
{
return;
}

lpSocketData->sSocket = acSocket;
sprintf(lpSocketData->szClientIP, inet_ntoa(sAddr.sin_addr));
lpSocketData->uiClientPort = sAddr.sin_port;

//将连接socket 加入完成端口中
if ( CreateIoCompletionPort( (HANDLE)acSocket, pTemp->h_ComPlePort, (ULONG_PTR)lpSocketData, 0 ) == NULL )
{
return;
}

/*这里停止监听会有问题*/

if (pTemp->bIsListen = FALSE)
{
break;
}
LPIOData lpIoData = (LPIOData )malloc(sizeof(IOData));
if ( lpIoData == NULL )
{
return;
}

#pragma region 投递线程事件

ZeroMemory( &( lpIoData->oOverlapped ), sizeof( lpIoData->oOverlapped) );
lpIoData->dSend = 0;
lpIoData->dRecv = 0;
lpIoData->wsBuffer.len = BUFFER_SIZE;
lpIoData->wsBuffer.buf = lpIoData->szBuffer;
lpIoData->sState = SEND;

DWORD flags = 0;
if ( WSARecv(acSocket, &(lpIoData->wsBuffer), 1, &dwRecvBytes, &flags, &(lpIoData->oOverlapped), NULL ) == SOCKET_ERROR )
{
if ( WSAGetLastError() != ERROR_IO_PENDING )
{
return;
}
else
{
//return;
printf("ERROR_IO_PENDING:ok\n");
}
}
#pragma endregion 投递线程事件
}
}

/*************************************************
Function:ListenEx
Description:监听函数
Input:
Output:
Others:
*************************************************/

BOOL Iocp::ListenEx(UINT backlog)
{
if (SOCKET_ERROR == listen(m_ListenSocketID, backlog))
{
return FALSE;
}
/*创建监听线程*/
if (-1 == _beginthread(Iocp::AcceptEx, 0, (VOID *)this))
{
return FALSE;
}
return TRUE;
}

/*************************************************
Function:ServerWorkThread
Description:端口上的工作线程
Input:
Output:
Others:
*************************************************/

VOID Iocp:: ServerWorkThread( VOID * _this )
{
Iocp * lpTemp = (Iocp *)_this;
HANDLE hPlePort  = (HANDLE)lpTemp->h_ComPlePort;
DWORD dwBytes;
LPPLEDATA lpPleData = NULL;
LPIOData lpIoData = NULL;
DWORD sendBytes = 0;
DWORD recvBytes = 0;
DWORD dwFlag = 0;
while (TRUE)
{
int x = 89;
if ( GetQueuedCompletionStatus( hPlePort, &dwBytes, (PULONG_PTR)&lpPleData, (LPOVERLAPPED *)&lpIoData, INFINITE ) == 0 )
{
return ;
}
if ( dwBytes == 0 || NULL == lpIoData)
{
printf("there is a socket away\n");
free( lpPleData );
free( lpIoData );
continue;
}
else
{

#pragma region 接受到数据

lpIoData->dRecv = dwBytes;
lpIoData->szBuffer[lpIoData->dRecv] = 0;
//printf("ServerWorkThread:R[%s]\n", lpIoData->szBuffer);
lpTemp->lpFun(lpPleData, lpIoData->szBuffer);

#pragma endregion 接受到数据

#pragma region 再次投递
lpIoData->dRecv = 0;
ZeroMemory( &(lpIoData->oOverlapped), sizeof( OVERLAPPED ) );
lpIoData->wsBuffer.len = BUFFER_SIZE;
lpIoData->wsBuffer.buf = lpIoData->szBuffer;

if ( WSARecv( lpPleData->sSocket, &(lpIoData->wsBuffer), 1, &recvBytes, &dwFlag, &(lpIoData->oOverlapped), NULL ) == SOCKET_ERROR )
{
if ( WSAGetLastError() != ERROR_IO_PENDING )
{
return ;
}
}
#pragma endregion 再次投递
}
}
}

VOID Iocp::SetReadProc(VOID * lprFun)
{
lpFun  = (ReadProc)lprFun;
}


//finename main.cpp
#include <iostream>
#include "Iocp.h"

using namespace std;

#pragma comment( lib, "Ws2_32.lib" )
//客户端的发送的数据会在这个函数通知
void OnRead(LPPLEDATA lpData, CHAR * lpRecvData)
{
SOCKET sSock = lpData->sSocket;
printf("socket:IP[%s:%d] send data[%s]\n",lpData->szClientIP, lpData->uiClientPort, lpRecvData);
}

void main()
{

Iocp server("127.0.0.1",  20000);
server.SetReadProc((VOID *)OnRead);
server.ListenEx(10);
}


  

    
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: