您的位置:首页 > 理论基础 > 计算机网络

网络编程之 Socket的模式(四) --- “Window网络I/O模型” 续

2014-03-03 23:06 507 查看

1. Windows下的异步I/O

在接下来讨论"Overlapped I/O 事件通知模型"、"Overlapped I/O 完成例程模型"、"IOCP模型"之前,先来看一下Windows的Overlapped I/O,它实际上对应于"网络编程之
Socket的模式(二) --- Linux网络I/O模型"里的异步I/O(asynchronous I/O或者nonblocking I/O)。通过使用overlapped I/O,开发者可以要求操作系统主动传送数据,并且在传送完毕时通知开发者。这使得开发者的程序在I/O进行过程中,仍然可以处理其他事务。事实上,操作系统内部正是以线程来完成overlapped I/O的。
关于这一点,插上两句。我们自己在写程序的时候,一般很少会把需要循环读写的I/O操作放在主线程中,通常情况下会对此类I/O操作单独开线程处理,并通过消息队列来进行设计上的解耦,当然这需要一点点设计的技巧。如果需要操作的I/O数目很多,程序结构上设计不好的话,这通常会成为开发者的一个负担。而这正是Window的Overlapped I/O想解决的问题,在Windows的Overlapped
I/O机制中,I/O的操作线程被封装在了操作系统内核中,API只暴露出有限的接口,Overlapped I/O的API并不需要用户对线程进行管理,用户对内部实现完全不可见。当然这对于想要盘根问底的程序员来说,并不是什么好事。何况在Window的异步I/O模型中,系统内部对I/O的处理在性能上有很大的优化。

让我们先来看一下,异步I/O中关键数据结构OVERLAPPED。其定义如下:

[cpp] view
plaincopy

typedef struct _OVERLAPPED {

DWORD Internal; // 通常被保留,当GetOverlappedResult()传回False并且GatLastError()并非传回ERROR_IO_PENDINO时,该状态置为系统定的状态。

DWORD InternalHigh; // 通常被保留,当GetOverlappedResult()传回False时,为被传输数据的长度。

DWORD Offset; // 指定文件的位置,从该位置传送数据,文件位置是相对文件开始处的字节偏移量。调用 ReadFile或WriteFile函数之前调用进

// 程设置这个成员,读写命名管道及通信设备时调用进程忽略这个成员;

DWORD OffsetHigh; // 64位的文件偏移位置中,较高的32位,读写命名管道及通信设备时调用进程忽略这个成员(因为流式的I/O不支持文件位置);

HANDLE hEvent; // 一个手动重置的event事件,当overlapped I/O完成时被激发。ReadFileEx()和WriteFileEx()会忽略这个栏位,此时这个地方

// 可能被传递一个用户自定义的指针

} OVERLAPPED, *LPOVERLAPPED;

Windows的Overlapped I/O只是一种模型,它可以由内核对象(handle),事件内核对象(hEvent), 异步过程调用(apcs) 和完成端口(I/O completion)来实现。所有的I/O设备都可以使用这套机制,包括文件、管道、Socket、串口等。

OVERLAPPED定义了windows上异步I/O所需要的数据结构。在网络编程之
Socket的模式(一) --- “阻塞/非阻塞” 与 “同步/异步”中,我们也讨论了模块之间获取异步结果的方式,有两种:

第一,调用方在异步操作后,不断向被调用方轮询调用结果。

第二,调用方在异步操作后,被调用方主动通知调用方结果。

Windows也异步操作也为上述两种方式提供了相应的接口。

1. 主动查询异步调用结果

[cpp] view
plaincopy

BOOL GetOverlappedResult(

HANDLE hFile, // 串口的句柄

LPOVERLAPPED lpOverlapped, // 指向重叠操作开始时指定的OVERLAPPED结构

LPDWORD lpNumberOfBytesTransferred, // 指向一个32位变量,该变量的值返回实际读写操作传输的字节数。

BOOL bWait // 该参数用于指定函数是否一直等到重叠操作结束。

// 如果该参数为TRUE,函数直到操作结束才返回。

// 如果该参数为FALSE,函数直接返回。

);

返回值:

如果overlapped操作成功,此函数返回TRUE,失败则返回FALSE.GetLastError()函数可以获得更详细的失败信息。如果bWait为FALSE而overlapped还是没有完成,GetLastError()函数会返回ERROR_IO_INCOMPLETE。

2. 被调用方给出通知

下面两个函数都很熟悉了,不再介绍。

[cpp] view
plaincopy

DWORD WINAPI WaitForSingleObject(

HANDLE hHandle,

DWORD dwMilliseconds

);

DWORD WINAPI WaitForMultipleObject(

DWORD dwCount,

CONST HANDLE* phObject,

BOOL fWaitAll,

DWORD dwMillisecinds);

如果被调用方向调用方给出通知,调用方应该向被调方设置回调函数,或者至少留下一个关联信息,在这里就是核心对象的句柄。而OVERLAPPED结构体把需要操作I/O同核心对象相绑定,如此一来通过上述函数就可以对I/O实现异步操作了。

2. Overlapped I/O 事件通知模型

2.1 函数

回到Socket模式上,Socket同样可以被看成为一个I/O设备。为了Socket编程方便,Windows定义了一个同OVERLAPPED相似的数据结构WSAOVERLAPPED,基本上换汤不换药。定义如下:

[cpp] view
plaincopy

typedef struct _WSAOVERLAPPED {

DWORD Internal;

DWORD InternalHigh;

DWORD Offset;

DWORD OffsetHigh;

WSAEVENT hEvent; // 唯一需要关注的参数,用来关联WSAEvent对象

} WSAOVERLAPPED, *LPWSAOVERLAPPED;

相似的两个函数还包括:

[cpp] view
plaincopy

BOOL WSAGetOverlappedResult(

SOCKET s, // SOCKET

LPWSAOVERLAPPED lpOverlapped, // 这里是想要查询结果的那个重叠结构的指针

LPDWORD lpcbTransfer, // 本次重叠操作的实际接收(或发送)的字节数

BOOL fWait, // 设置为TRUE,除非重叠操作完成,否则函数不会返回

// 设置FALSE,而且操作仍处于挂起状态,那么函数就会返回FALSE,错误为WSA_IO_INCOMPLETE

LPDWORD lpdwFlags // 指向DWORD的指针,负责接收结果标志

);

[cpp] view
plaincopy

DWORD WSAWaitForMultipleEvents(

DWORD cEvents, // 等候事件的总数量

const WSAEVENT* lphEvents, // 事件数组的指针

BOOL fWaitAll, // 这个要多说两句: 如果设置为 TRUE,则事件数组中所有事件被传信的时候函数才会返回,

// FALSE则任何一个事件被传信函数都要返回,我们这里肯定是要设置为FALSE的

DWORD dwTimeout, // 超时时间,如果超时,函数会返回WSA_WAIT_TIMEOUT,如果设置为0,函数会立即返回,

// 如果设置为 WSA_INFINITE只有在某一个事件被传信后才会返回,在这里不建议设置为WSA_INFINITE

BOOL fAlertable // 在完成例程中会用到这个参数,这里我们先设置为FALSE

);

返回值:

WSA_WAIT_TIMEOUT: 最常见的返回值,需要做的就是继续Wait

WSA_WAIT_FAILED: 出现了错误,请检查cEvents和lphEvents两个参数是否有效

如果事件数组中有某一个事件被传信了,函数会返回这个事件的索引值,但是这个索引值需要减去预定义值 WSA_WAIT_EVENT_0才是这个事件在事件数组中的位置。

同Socket相关,特有的函数包括:

[cpp] view
plaincopy

int WSARecv(  

SOCKET s, // 当然是投递这个操作的套接字  

LPWSABUF lpBuffers, // 接收缓冲区,与Recv函数不同  

// 这里需要一个由WSABUF结构构成的数组  

DWORD dwBufferCount, // 数组中WSABUF结构的数量  

LPDWORD lpNumberOfBytesRecvd, // 如果接收操作立即完成,这里会返回函数调用所接收到的字节数  

LPDWORD lpFlags, //一个指向标志位的指针。 

LPWSAOVERLAPPED lpOverlapped, // “绑定”的重叠结构  

LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine // 完成例程中将会用到的参数,我们这里设置为 NULL

);

返回值:

若无错误发生且接收操作立即完成,则WSARecv()函数返回0。否则的话,将返回SOCKET_ERROR错误,应用程序可通过WSAGetLastError()来获取相应的错误代码。错误代码WSA_IO_PENDING表示重叠操作成功启动,但是I/O操作还没有完成,所以我们就需要绑定一个事件来通知我们操作何时完成。任何其他的错误表示重叠操作未能成功地启动,以后也不会有完成指示。

[cpp] view
plaincopy

int WSASend (

SOCKET s, // s:标识一个已连接套接口的描述字。

LPWSABUF lpBuffers, // 一个指向WSABUF结构数组的指针。每个WSABUF结构包含缓冲区的指针和缓冲区的大小。

DWORD dwBufferCount, // lpBuffers数组中WSABUF结构的数目。

LPDWORD lpNumberOfBytesSent, // 如果发送操作立即完成,则为一个指向所发送数据字节数的指针。

DWORD dwFlags, // 标志位。

LPWSAOVERLAPPED lpOverlapped, // 指向WSAOVERLAPPED结构的指针(对于非重叠套接口则忽略)。

LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine // 一个指向发送操作完成后调用的完成例程的指针。(对于非重叠套接口则忽略)。

);

返回值:

若无错误发生且发送操作立即完成,则WSASend()函数返回0。这时,完成例程(Completion Routine)应该已经被调度,一旦调用线程处于alertable状态时就会调用它。否则,返回SOCKET_ERROR 。通过WSAGetLastError获得详细的错误代码。WSA_IO_PENDING 这个错误码(其实表示没有错误)表示重叠操作已经提交成功(就是异步IO的意思了),稍后会提示完成(这个完成可不一定是发送成功,没准出问题也不一定)。其他的错误代码都代表重叠操作没有正确开始,也不会有完成标志出现。

2.2 概述

Overlapped I/O 事件通知模型是个异步模型,和"网络编程之
Socket的模式(二) --- Linux网络I/O模型"中的异步模型类似,操作系统负责数据缓冲区数据的拷贝。而WSAAsyncSelect以及WSAEventSelect模型中,数据的拷贝则是在应用层。

下面是一个例子:

[cpp] view
plaincopy

#include <winsock2.h>

#include <stdio.h>

#pragma comment(lib,"ws2_32.lib")

#define DATA_BUF_LEN 1024 // 接收缓冲区大小

SOCKET SLisent;

SOCKET SWorker[DATA_BUF_LEN] = {0};

WSABUF RecvBuffer[DATA_BUF_LEN];

WSAOVERLAPPED Overlapped[DATA_BUF_LEN]; // 重叠结构

WSAEVENT EventArray[WSA_MAXIMUM_WAIT_EVENTS]; // 用来通知重叠操作完成的事件句柄数组

DWORD dwRecvBytes = 0, // 接收到的字符长度

DWORD Flags = 0; // WSARecv的参数

DWORD volatile dwEventTotal = 0; // 程序中事件的总数

//由于EVENT数量限制,目前最多只能支持64个连接

DWORD WINAPI AcceptThread(LPVOID lpParameter)

{

WSADATA wsaData;

WSAStartup(MAKEWORD(2,2),&wsaData);

// 使用Overlapped I/O模型必须设置WSA_FLAG_OVERLAPPED参数

SLisent = WSASocket(AF_INET,SOCK_STREAM,IPPROTO_TCP,NULL,NULL,WSA_FLAG_OVERLAPPED);

// 创建Socket

SOCKADDR_IN ServerAddr;

ServerAddr.sin_family = AF_INET;

ServerAddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);

ServerAddr.sin_port = htons(1234);

// 监听

bind(SLisent,(LPSOCKADDR)&ServerAddr,sizeof(ServerAddr));

listen(SLisent,100);

int i = 0;

SOCKADDR_IN ClientAddr;

int addr_length=sizeof(ClientAddr);

while (TRUE)

{

while((SWorker[i] == 0) && (SWorker[i] = accept(SLisent,(SOCKADDR*)&ClientAddr, &addr_length)) != INVALID_SOCKET)

{

printf("accept %d ip:%s port:%dn",i+1,inet_ntoa(ClientAddr.sin_addr),ClientAddr.sin_port);

// 创建触发事件

EventArray[i] = WSACreateEvent();

dwEventTotal++;

memset(&Overlapped[i],0,sizeof(WSAOVERLAPPED));

// 绑定overlapped i/o和事件

Overlapped[i].hEvent = EventArray[i];

char * buffer = new char[DATA_BUF_LEN];

memset(buffer,0,DATA_BUF_LEN);

RecvBuffer[i].buf = buffer;

RecvBuffer[i].len = DATA_BUF_LEN;

if(WSARecv(SWorker[i], &RecvBuffer[i], dwEventTotal, &dwRecvBytes, &Flags, &Overlapped[i], NULL) == SOCKET_ERROR)

{

int err = WSAGetLastError();

if(WSAGetLastError() != WSA_IO_PENDING)

{

printf("disconnect: %dn",i+1);

closesocket(SWorker[i]);

SWorker[i] = 0;

WSACloseEvent(EventArray[i]); // 关闭事件

RecvBuffer[i].buf = NULL;

RecvBuffer[i].len = NULL;

continue;

}

}

i = (i+1)%WSA_MAXIMUM_WAIT_EVENTS;

}

}

return FALSE;

}

DWORD WINAPI ReceiveThread(LPVOID lpParameter)

{

DWORD dwIndex = 0;

while (true)

{

dwIndex = WSAWaitForMultipleEvents(dwEventTotal, EventArray, FALSE, 1000, FALSE);

if (dwIndex == WSA_WAIT_FAILED || dwIndex == WSA_WAIT_TIMEOUT)

continue;

dwIndex = dwIndex - WSA_WAIT_EVENT_0;

WSAResetEvent(EventArray[dwIndex]);

DWORD dwBytesTransferred;

WSAGetOverlappedResult( SWorker[dwIndex], &Overlapped[dwIndex], &dwBytesTransferred, FALSE, &Flags);

if(dwBytesTransferred == 0)

{

printf("disconnect: %dn",dwIndex+1);

closesocket(SWorker[dwIndex]);

SWorker[dwIndex] = 0;

WSACloseEvent(EventArray[dwIndex]); // 关闭事件

RecvBuffer[dwIndex].buf = NULL;

RecvBuffer[dwIndex].len = NULL;

continue;

}

//使用数据

printf("%sn",RecvBuffer[dwIndex].buf);

memset(RecvBuffer[dwIndex].buf,0,DATA_BUF_LEN);

if(WSARecv(SWorker[dwInde x], &RecvBuffer[dwIndex], dwEventTotal, &dwRecvBytes, &Flags, &Overlapped[dwIndex], NULL) == SOCKET_ERROR)

{

if(WSAGetLastError() != WSA_IO_PENDING)

{

printf("disconnect: %dn",dwIndex+1);

closesocket(SWorker[dwIndex]);

SWorker[dwIndex] = 0;

WSACloseEvent(EventArray[dwIndex]); // 关闭事件

RecvBuffer[dwIndex].buf = NULL;

RecvBuffer[dwIndex].len = NULL;

continue;

}

}

}

return FALSE;

}

void main()

{

HANDLE hThreads[2];

hThreads[0] = CreateThread(NULL, 0, AcceptThread, NULL, NULL, NULL);

hThreads[1] = CreateThread(NULL, 0, ReceiveThread, NULL, NULL, NULL);

WaitForMultipleObjects(2,hThreads,TRUE,INFINITE);

printf("exitn");

CloseHandle(hThreads[0]);

CloseHandle(hThreads[1]);

}

Overlapped I/O 事件通知模型的缺点是,使用了WSAWaitForMultipleEvents函数,而该函数最多只能同时等待64个消息。所以如果想要支持更多的连接,就必须自己开辟线程去管理.该模型的所接入连接数同线程数成线性关系。在线程过多情况下,线程上下文之间的切换,将会影响程序的性能,

3. Overlapped I/O 完成例程模型

完成例程模型相比与事件通知模型有个很大的优点就是不再受64个消息的限制,一个线程可以同时管理成百上千个socket连接,且保持较高的性能,使用上则类似。而效率上大幅提升的原因在于在完成例程模型中使用了回调函数。在Overlapped I/O 事件通知模型中WSAWaitForMultipleEvents,用来捕获缓冲区满时的信号,当收到信号后,开发者使用接受到的数据进行相关的业务操作。而在完成例程模型中,用户处理相关业务的函数被当作回调函数,直接设置进入WSARecv中,在接受数据后操作系统会自动调用用户的回调函数完成工作。

在这种模式下,本来接受数据,并对数据进行分析的工作,在Overlapped I/O 事件通知模型中需要用户线程参与的事情(操作系统系统内部,接受完数据,通知用户线程,用户线程接管数据并处理),只需在操作系统内部处理即可(回调函数已经告知操作系统该如何处理数据)。这实际上是一个异步过程调用(Asynchronous Procedure Calls, APCs)的用例。

下面是一个例子:

[cpp] view
plaincopy

#include <winsock2.h>

#include <stdio.h>

#pragma comment(lib,"ws2_32.lib")

#define DATA_BUF_LEN 1024 // 接收缓冲区大小

#define MAXSESSION 10000 // 最大连接数

typedef struct _SOCKET_INFORMATION {

OVERLAPPED Overlapped;

SOCKET Socket;

WSABUF DataBuf;

DWORD BytesSEND;

DWORD BytesRECV;

} SOCKET_INFORMATION, * LPSOCKET_INFORMATION;

SOCKET ListenSocket = INVALID_SOCKET;

DWORD Flags = 0;

void CALLBACK WorkerRoutine(DWORD Error, DWORD BytesTransferred,LPWSAOVERLAPPED Overlapped, DWORD InFlags);

DWORD WINAPI AcceptThread(LPVOID lpParameter)

{

WSADATA wsaData;

WSAStartup(MAKEWORD(2,2),&wsaData);

// 创建Socket,设置异步I/O标志WSA_FLAG_OVERLAPPED

ListenSocket = WSASocket(AF_INET,SOCK_STREAM,IPPROTO_TCP,NULL,NULL,WSA_FLAG_OVERLAPPED);

SOCKADDR_IN ServerAddr;

ServerAddr.sin_family = AF_INET;

ServerAddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);

ServerAddr.sin_port = htons(1234);

// 绑定

bind(ListenSocket,(LPSOCKADDR)&ServerAddr,sizeof(ServerAddr));

// 监听

listen(ListenSocket,MAXSESSION);

printf("listenning.../n");

SOCKADDR_IN ClientAddr;

int addr_length=sizeof(ClientAddr);

while (TRUE)

{

LPSOCKET_INFORMATION SI = new SOCKET_INFORMATION;

// 接受新连接

if ((SI->Socket = accept(ListenSocket,(SOCKADDR*)&ClientAddr, &addr_length)) != INVALID_SOCKET)

{

printf("accept ip:%s port:%d/n",inet_ntoa(ClientAddr.sin_addr),ClientAddr.sin_port);

memset(&SI->Overlapped,0,sizeof(WSAOVERLAPPED));

SI->DataBuf.buf = new char[DATA_BUF_LEN];

SI->DataBuf.len = DATA_BUF_LEN;

memset(SI->DataBuf.buf,0,DATA_BUF_LEN);

// 设置回调函数

if(WSARecv(SI->Socket, &SI->DataBuf, 1, &SI->BytesRECV, &Flags, &SI->Overlapped, WorkerRoutine) == SOCKET_ERROR)

{

int err = WSAGetLastError();

if(WSAGetLastError() != WSA_IO_PENDING)

{

printf("disconnect/n");

closesocket(SI->Socket);

delete [] SI->DataBuf.buf;

delete SI;

continue;

}

}

}

}

return FALSE;

}

void CALLBACK WorkerRoutine(DWORD Error, DWORD BytesTransferred, LPWSAOVERLAPPED Overlapped, DWORD InFlags)

{

LPSOCKET_INFORMATION SI = (LPSOCKET_INFORMATION)Overlapped;

if (Error != 0 || BytesTransferred == 0)

{

printf("disconnect/n");

closesocket(SI->Socket);

delete [] SI->DataBuf.buf;

delete SI;

return;

}

// 使用数据

printf("call back:%s/n",SI->DataBuf.buf);

memset(SI->DataBuf.buf,0,DATA_BUF_LEN);

// 设置回调函数

if(WSARecv(SI->Socket, &SI->DataBuf, 1, &SI->BytesRECV, &Flags, &SI->Overlapped, WorkerRoutine) == SOCKET_ERROR)

{

int err = WSAGetLastError();

if(WSAGetLastError() != WSA_IO_PENDING)

{

printf("disconnect/n");

closesocket(SI->Socket);

delete [] SI->DataBuf.buf;

delete SI;

return;

}

}

}

void main()

{

HANDLE hThreads = CreateThread(NULL, 0, AcceptThread, NULL, NULL, NULL);

WaitForSingleObject(hThreads,INFINITE);

printf("exit/n");

CloseHandle(hThreads);

}

4. IOCP模型

IOCP(I/O Completion Port,I/O完成端口)是Window下性能最好的一种I/O模型。同Overlapped I/O 完成例程模型相比,IOCP模型具有更好的伸缩性,也就是说当CPU或内存增加时,性能能够成线性的同步增加。

让我们来看一下"Overlapped I/O 完成例程模型"(APCs调用方式)的特点,可以发现只有发出"overlapped请求"的线程才能够提供callback函数,而IOCP模型则不需要。使用IOCP模型具有以下优点:

1. I/O Completion Port允许一个线程将一个请求暂时保存下来,而由另外一个线程为它做实际服务。

2. I/O Completion Port支持scalable架构。

Overlapped I/O 完成例程模型可以被这样描述:它是用来管理一堆线程如何为Completion overlapped I/O request服务的机制,在这种机制下,一个或多个CPU将尽可能的保持忙碌,但也不会被太多的线程淹没。

IOCP模型内部实现复杂,但使用却很简单。步骤如下:

1. 产生一个I/O completion port。

2. 让它和一个I/O句柄产生关联

3. 产生一堆线程

4. 让每一个线程都在completion port上等待。

5. 开始对着I/O句柄发出一些overlapped I/O请求。

涉及到的函数如下:

创建一个Completion Port核心对象 或 关联I/O与Completion Port核心对象

[cpp] view
plaincopy

HANDLE CreateIoCompletionPort(

HANDLE FileHandle, // 文件或设备(device)的handle。在Windows NT3.51之后,此栏位可设定为INVALID_HANDLE_VALUE,于是产生一个没有和

// 任何文件handle有关系的port

HANDLE ExistingCompletionPort, // 如果此栏位被指定,那么上一栏位FileHandle就会被加到此port之上,而不会产生新的port。指定NULL可以产生一个新

// 的port

ULONG_PTR CompletionKey, // 用户自定义的一个数值,将被交给提供服务的线程。此值和FileHandle有关联。

DWORD NumberOfConcurrentThreads // 与此I/O completion port有关联的线程个数。

);

返回值:

如果函数成功,则传回一个I/O completion port的handle。如果函数失败,则传回FALSE。GetLastError()可以获得更详细的失败原因。

CreateIoCompletionPort()通常要被调用两次。第一次先指定FileHandle为INVALID_HANDLE_VALUE,并设定ExistingCompletionPort为NULL,用以产生一个核心对象port。然后再为每一个欲附着上去的I/O调用一次CreateIoCompletionPort().后续的调用应该将ExistingCompletionPort设定为第一次调用所传回的handle。

在I/O Completion Port上等待

[cpp] view
plaincopy

BOOL GetQueuedCompletionStatus(

HANDLE CompletionPort, // CompletionPort参数对应于要在上面等待的完成端口

LPDWORD lpNumberOfBytes, // 参数负责在完成了一次I/O操作后(如WSASend或WSARecv),接收实际传输的字节数

PULONG_PTR lpCompletionKey, // 为原先传递进入CreateIoCompletionPort函数的套接字返回“单句柄数据”,最好将套接字句柄保存在这个“键”(Key)中。

LPOVERLAPPED* lpOverlapped, // 用于接收完成的I/O操作的重叠结果

DWORD dwMilliseconds // 用于指定调用者希望等待一个完成数据包在完成端口上出现的时间

);

返回值:

如果函数成功的将一个completion packet从队列中取出,并完成一个成功的操作,函数将传回TRUE,并填写由lpNumberOfBytes、lpCompletionKey、lpOverlapped所指向的变量内容。

如果操作失败,但completion packet已经从队列中取出,则函数传回FALSE,并将lpOverlapped设置为NULL,调用GetLastError()可以获得更详细的失败原因。同其他核心对象不同,在completion port上等待的线程是以先进后出的次序提供服务,这是因为所有的线程提供的服务都一样。

强行释放完成端口上的所有工作者线程

[cpp] view
plaincopy

BOOL PostQueuedCompletionStatus(

HANDLE CompletionPort, // 指定想向其发送一个完成数据包的完成端口对象

DWORD dwNumberOfBytesTransferred, // 参数负责在完成了一次I/O操作后(如WSASend或WSARecv),接收实际传输的字节数。设置为0,完成端口上的工作线程被强制释放

ULONG_PTR dwCompletionKey, // 为原先传递进入CreateIoCompletionPort函数的套接字返回“单句柄数据”,最好将套接字句柄保存在这个“键”(Key)中。

LPOVERLAPPED lpOverlapped // 用于接收完成的I/O操作的重叠结果

);

下面是Win32 程序设计上的一个例子:

客户端:

[cpp] view
plaincopy

/*

* EchoCli.c

*

* Sample code for "Multithreading Applications in Win32"

* This is from Chapter 6.

*

* This is a command line client to drive the ECHO server.

* Run the server in one Commmand Prompt Window,

* then run this program in one or more other windows.

* EchoCli will wait for you to type in some text when

* it starts up. Each line of text will be sent to the

* echo server on TCP port 5554.

*/

#include <windows.h>

#include <tchar.h>

#include <stdio.h>

#include <stdlib.h>

#include <string.h>

#include <winsock.h>

/* Function Prototypes */

void FatalError(char *s);

int writen(SOCKET sock, char *ptr, int nBytes);

int readline(SOCKET sock, char *ptr, int maxlen);

void DoClientLoop(FILE *fp, SOCKET sock);

/* Constants */

#define MAXLINE 512

#define SERVER_TCP_PORT 5554

#define SERVER_ADDRESS "127.0.0.1"

/*

* Error handler

*/

void FatalError(char *s)

{

fprintf(stdout, "%s\n", s);

exit(EXIT_FAILURE);

}

/*

* Write bytes to the port with proper block size handling.

*/

int writen(SOCKET sock, char *ptr, int nBytes)

{

int nleft;

int nwritten;

nleft = nBytes;

while (nleft > 0)

{

nwritten = send(sock,

ptr,

nBytes,

0

);

if (nwritten == SOCKET_ERROR)

{

fprintf(stdout, "Send Failed\n");

exit(1);

}

nleft -= nwritten;

ptr += nwritten;

}

return nBytes - nleft;

}

/*

* Read a line of text of the port. This version

* is very inefficient, but it's simple.

*/

int readline(SOCKET sock, char *ptr, int maxlen)

{

int n;

int rc;

char c;

for (n=1; n<maxlen; n++)

{

if ( ( rc= recv(sock, &c, 1, 0)) == 1)

{

*ptr++ = c;

if (c=='\n')

break;

}

else if (rc == 0)

{

if (n == 1)

return 0;

else

break;

}

else

return -1; /* Error */

}

*ptr = '\0';

return n;

}

int main(int argc, char *argv[])

{

WSADATA WsaData;

SOCKET sock;

struct sockaddr_in serv_addr;

int err;

puts("EchoCli - client for echo server sample program\n");

puts("Type a line of text followed by Return.");

puts("Exit this program by typing Ctrl+Z followed by Return.");

err = WSAStartup(0x0101, &WsaData);

if (err == SOCKET_ERROR)

FatalError("WSAStartup Failed");

/*

* Bind our local address

*/

memset(&serv_addr, 0, sizeof(serv_addr));

serv_addr.sin_family = AF_INET;

// Use the local host

serv_addr.sin_addr.s_addr = inet_addr(SERVER_ADDRESS);

serv_addr.sin_port = htons(SERVER_TCP_PORT);

/*

* Open a TCP socket (an Internet stream socket)

*/

sock = socket(AF_INET, SOCK_STREAM, 0);

if (sock < 0)

FatalError("socket() failed -- do you have TCP/IP networking installed?");

if (connect(sock, (struct sockaddr *) &serv_addr,

sizeof(serv_addr)) < 0)

FatalError("connect() failed -- is the server running?");

DoClientLoop(stdin, sock);

closesocket(sock);

return EXIT_SUCCESS;

}

/*

* As long as there is input from "fp", copy it to

* the server, read what the server gives back,

* and print it.

*/

void DoClientLoop(FILE *fp, SOCKET sock)

{

int n;

char sendline[MAXLINE];

char recvline[MAXLINE+1];

while (fgets(sendline, MAXLINE, fp) != NULL)

{

n = strlen(sendline);

if (writen(sock, sendline, n) != n)

FatalError("DoClientLoop: writen() error");

n = readline(sock, recvline, MAXLINE);

if (n < 0)

FatalError("DoClientLoop: readline() error");

recvline
= '\0';

fputs(recvline, stdout);

}

if (ferror(fp))

FatalError("DoClientLoop: error reading file");

}

服务器端:

[cpp] view
plaincopy

/*

* EchoCli.c

*

* Sample code for "Multithreading Applications in Win32"

* This is from Chapter 6.

*

* This is a command line client to drive the ECHO server.

* Run the server in one Commmand Prompt Window,

* then run this program in one or more other windows.

* EchoCli will wait for you to type in some text when

* it starts up. Each line of text will be sent to the

* echo server on TCP port 5554.

*/

#include <windows.h>

#include <tchar.h>

#include <stdio.h>

#include <stdlib.h>

#include <string.h>

#include <winsock.h>

/* Function Prototypes */

void FatalError(char *s);

int writen(SOCKET sock, char *ptr, int nBytes);

int readline(SOCKET sock, char *ptr, int maxlen);

void DoClientLoop(FILE *fp, SOCKET sock);

/* Constants */

#define MAXLINE 512

#define SERVER_TCP_PORT 5554

#define SERVER_ADDRESS "127.0.0.1"

/*

* Error handler

*/

void FatalError(char *s)

{

fprintf(stdout, "%s\n", s);

exit(EXIT_FAILURE);

}

/*

* Write bytes to the port with proper block size handling.

*/

int writen(SOCKET sock, char *ptr, int nBytes)

{

int nleft;

int nwritten;

nleft = nBytes;

while (nleft > 0)

{

nwritten = send(sock,

ptr,

nBytes,

0

);

if (nwritten == SOCKET_ERROR)

{

fprintf(stdout, "Send Failed\n");

exit(1);

}

nleft -= nwritten;

ptr += nwritten;

}

return nBytes - nleft;

}

/*

* Read a line of text of the port. This version

* is very inefficient, but it's simple.

*/

int readline(SOCKET sock, char *ptr, int maxlen)

{

int n;

int rc;

char c;

for (n=1; n<maxlen; n++)

{

if ( ( rc= recv(sock, &c, 1, 0)) == 1)

{

*ptr++ = c;

if (c=='\n')

break;

}

else if (rc == 0)

{

if (n == 1)

return 0;

else

break;

}

else

return -1; /* Error */

}

*ptr = '\0';

return n;

}

int main(int argc, char *argv[])

{

WSADATA WsaData;

SOCKET sock;

struct sockaddr_in serv_addr;

int err;

puts("EchoCli - client for echo server sample program\n");

puts("Type a line of text followed by Return.");

puts("Exit this program by typing Ctrl+Z followed by Return.");

err = WSAStartup(0x0101, &WsaData);

if (err == SOCKET_ERROR)

FatalError("WSAStartup Failed");

/*

* Bind our local address

*/

memset(&serv_addr, 0, sizeof(serv_addr));

serv_addr.sin_family = AF_INET;

// Use the local host

serv_addr.sin_addr.s_addr = inet_addr(SERVER_ADDRESS);

serv_addr.sin_port = htons(SERVER_TCP_PORT);

/*

* Open a TCP socket (an Internet stream socket)

*/

sock = socket(AF_INET, SOCK_STREAM, 0);

if (sock < 0)

FatalError("socket() failed -- do you have TCP/IP networking installed?");

if (connect(sock, (struct sockaddr *) &serv_addr,

sizeof(serv_addr)) < 0)

FatalError("connect() failed -- is the server running?");

DoClientLoop(stdin, sock);

closesocket(sock);

return EXIT_SUCCESS;

}

/*

* As long as there is input from "fp", copy it to

* the server, read what the server gives back,

* and print it.

*/

void DoClientLoop(FILE *fp, SOCKET sock)

{

int n;

char sendline[MAXLINE];

char recvline[MAXLINE+1];

while (fgets(sendline, MAXLINE, fp) != NULL)

{

n = strlen(sendline);

if (writen(sock, sendline, n) != n)

FatalError("DoClientLoop: writen() error");

n = readline(sock, recvline, MAXLINE);

if (n < 0)

FatalError("DoClientLoop: readline() error");

recvline
= '\0';

fputs(recvline, stdout);

}

if (ferror(fp))

FatalError("DoClientLoop: error reading file");

}

对于完成端口的更多内容可以参考下面一些资料:

完成端口(CompletionPort)详解 - 手把手教你玩转网络编程系列之三

完成端口通讯服务器设计
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: