您的位置:首页 > 其它

win32命名管道/完成端口用法练习

2015-10-18 22:05 573 查看
所谓完成端口,就是当完成事件触发时,对应的 I/O 操作已经执行完毕。

此时完成的是之前发起的异步操作,一般是 ReadFile / WriteFile / ConnectNamedPipe(等待客户端连接)。

所以一般的流程是:

发起异步等待连接(ConnectNamedPipe)-> 连接成功后调用 ReadFile 投递一次异步读(相当于注册回调)。

写动作单独处理。

windows完成端口跟linux的epoll不同。

epoll的特点是,事件触发时,只是通知你 I/O 端口已经就绪,真正的 I/O 操作要自己执行。

// cpio_server.cpp : 定义控制台应用程序的入口点。
//

#include "stdafx.h"
#include <vector>
#include <Windows.h>
#include <iostream>
#include <algorithm>
#include <tchar.h>
#include <process.h>

// Values stored in PipeNode::nOperationType to record which kind of request
// the node is waiting on.
// OP_ACCEPT: initial state set by every PipeNode constructor; the worker
// treats a completion in this state as "client connected".
#define OP_ACCEPT   1
// OP_READ: set by the worker thread right before it issues ReadFile.
#define OP_READ     2
// OP_WRITE: set by _tmain right before it issues WriteFile.
#define OP_WRITE    3
// Number of worker threads started by _tmain; also passed to
// CreateIoCompletionPort as its concurrency value.
#define THREAD_NO   4
// Size in bytes of PipeNode::buf; also used as the pipe's in/out buffer size.
#define SIZE        1024
// Name of the pipe created by the server.
#define PIPE_NAME "\\\\.\\pipe\\completeio_pipe"
using namespace std;

struct PipeNode
{
PipeNode():hPipe(NULL), nOperationType(OP_ACCEPT),nLen(0)
{
memset(buf, 0, SIZE);
memset(&hOver, 0, sizeof(hOver));
}
PipeNode(HANDLE p):hPipe(p), nOperationType(OP_ACCEPT),nLen(0)
{
memset(buf, 0, SIZE);
memset(&hOver, 0, sizeof(hOver));
}
PipeNode(HANDLE p, OVERLAPPED op):hPipe(p), hOver(op), nOperationType(OP_ACCEPT),nLen(0)
{
memset(buf, 0, SIZE);
}

HANDLE hPipe;
OVERLAPPED hOver;
volatile DWORD nOperationType;
DWORD nLen;
char buf[SIZE];
};
// Every pipe node created by the server. _tmain appends to it and reads its
// front element; the worker threads search it and erase from it when an I/O
// request fails.
// NOTE(review): no lock is visible around these accesses although they happen
// on different threads - confirm whether synchronization is required.
std::vector<PipeNode*> g_pNodeList;
// Worker thread of the completion port.
//
// lpParam is the completion port handle. The thread blocks in
// GetQueuedCompletionStatus and, for every dequeued packet:
//   - exits when the completion key is (ULONG_PTR)-1 (shutdown request);
//   - on a failed request removes the node from g_pNodeList, closes its pipe
//     and deletes it;
//   - on a finished write only switches the node back to OP_READ;
//   - on a finished connect or read logs it and issues the next ReadFile.
// Always returns 3.
//
// NOTE(review): each node has a single OVERLAPPED and a single
// nOperationType that are shared by the pending read and by writes started
// from _tmain, so a completion can be attributed to the wrong operation.
// g_pNodeList is also modified here without a lock. Both need a design
// change that spans more than this function.
UINT WINAPI pipeThreadFun(LPVOID lpParam)
{
    HANDLE hCompletion = (HANDLE)lpParam;
    // Completion key used by _tmain to ask a worker to terminate.
    const ULONG_PTR kExitKey = static_cast<ULONG_PTR>(-1);

    while (TRUE)
    {
        DWORD dwTrans = 0;
        ULONG_PTR completionKey = 0;
        OVERLAPPED *pOverlapped = NULL;

        const BOOL bOK = ::GetQueuedCompletionStatus(hCompletion, &dwTrans, &completionKey, &pOverlapped, INFINITE);
        // Read the error code at once; the logging below may overwrite it.
        DWORD dwErr = bOK ? ERROR_SUCCESS : GetLastError();

        // Shutdown request. It has to be tested before the NULL-OVERLAPPED
        // check because the shutdown packet is posted with a NULL OVERLAPPED.
        // The key is compared as ULONG_PTR so it is not truncated on x64.
        if (bOK && completionKey == kExitKey)
        {
            printf("工作线程退出信号: %lu\r\n", dwErr);
            break;
        }

        if (pOverlapped == NULL)
        {
            printf("*******pOverlapped nulll: %lu\r\n", dwErr);
            if (!bOK)
            {
                // Nothing was dequeued although the wait is INFINITE, so the
                // port itself is unusable; retrying would only spin.
                break;
            }
            continue;
        }

        PipeNode* pipeNode = reinterpret_cast<PipeNode*>(completionKey);

        if (!bOK)
        {
            if (dwErr == ERROR_INVALID_HANDLE)
            {
                printf("完成端口失效: %lu\r\n", dwErr);
                break;
            }
            cout << "有管道出错, err code: " << dwErr << ", threadid: "
                 << GetCurrentThreadId() << "node addr: " << static_cast<void*>(pipeNode) << endl;
            auto pos = find(g_pNodeList.begin(), g_pNodeList.end(), pipeNode);
            if (pos != g_pNodeList.end())
            {
                cout << "删除节点" << endl;
                cout.flush();
                g_pNodeList.erase(pos);
                CloseHandle(pipeNode->hPipe);
                delete pipeNode;
            }
            continue;
        }

        switch (pipeNode->nOperationType)
        {
        case OP_READ:
            cout << "读取:" << pipeNode->buf << ", thread id: " << GetCurrentThreadId() << endl;
            cout.flush();
            break;
        case OP_WRITE:
            printf("write pipe EVENT:len: %lu,  Thread id:%lu\r\n", dwTrans, GetCurrentThreadId());
            // The read issued earlier is still outstanding; do not start another one.
            pipeNode->nOperationType = OP_READ;
            continue;
        case OP_ACCEPT:
            printf("---------------客户端连接----------------%lu\r\n", GetCurrentThreadId());
            break;
        }

        // Arm the next read. The buffer is cleared first and one byte is kept
        // back so that the text printed on completion is always terminated.
        memset(pipeNode->buf, 0, sizeof(pipeNode->buf));
        pipeNode->nOperationType = OP_READ;
        const BOOL fSuccess = ReadFile(
            pipeNode->hPipe,
            pipeNode->buf,
            SIZE - 1,
            &pipeNode->nLen,
            &pipeNode->hOver);
        if (fSuccess)
        {
            // Data was already available, but it is not handled here: the
            // completion port still delivers a packet for this read, and the
            // data is processed when that packet is dequeued.
            printf("bSuccess-----iopending:len: %lu,  Thread id:%lu\r\n", dwTrans, GetCurrentThreadId());
            continue;
        }

        dwErr = GetLastError();
        if (dwErr == ERROR_IO_PENDING)
        {
            // The read operation is still pending.
            printf("-----iopending:len: %lu,  Thread id:%lu\r\n", dwTrans, GetCurrentThreadId());
            continue;
        }

        // TODO: the read could not be started, so no further completion will
        // arrive for this node; it is only reported here, not cleaned up.
        printf("other error, error code: %lu,  Thread id:%lu\r\n", dwErr, GetCurrentThreadId());
    }
    return 3;
}
int _tmain(int argc, _TCHAR* argv[])
{
HANDLE hCompletion = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, THREAD_NO);
if ( hCompletion == NULL)
{
cout << "完成端口创建失败" << endl;
return -1;
}

HANDLE hThreadArray[THREAD_NO] = {0};
for(int i = 1; i <= THREAD_NO; i++)
{
hThreadArray[i - 1] = (HANDLE)_beginthreadex(NULL,0,pipeThreadFun,(void*)hCompletion,0, NULL);
}
char c = 0;
while('q' != (c = getchar()))
{
if (c == '\n')
continue;
if(c == '1' && g_pNodeList.size() > 0)
{
char data[16] = "你好,管道.";
DWORD rLen = 0;
g_pNodeList.front()->nOperationType = OP_WRITE;
auto l = WriteFile(g_pNodeList.front()->hPipe,data,strlen(data),&rLen,&g_pNodeList.front()->hOver);
cout << "write pipe len: " << rLen << endl;
continue;
}
HANDLE pipe = CreateNamedPipe(
PIPE_NAME,
PIPE_ACCESS_DUPLEX|FILE_FLAG_OVERLAPPED,
PIPE_TYPE_MESSAGE|PIPE_READMODE_MESSAGE|PIPE_WAIT|PIPE_ACCEPT_REMOTE_CLIENTS,
PIPE_UNLIMITED_INSTANCES,
SIZE,SIZE,NMPWAIT_USE_DEFAULT_WAIT,NULL);
if(INVALID_HANDLE_VALUE == pipe)
{
cout << "创建命名管道失败了" << endl;
break;
}
else
cout << "创建管道成功" << endl;
auto node = new PipeNode(pipe);
auto bIn = hCompletion == CreateIoCompletionPort(pipe, hCompletion, (ULONG_PTR)node, 0);

auto b = ConnectNamedPipe(node->hPipe, &node->hOver)
? TRUE : (GetLastError() == ERROR_IO_PENDING || GetLastError() == ERROR_PIPE_CONNECTED);
auto ee = GetLastError();
if (!b)
{
cout << "ConnectNamedPipe失败了" << endl;
break;
}
g_pNodeList.push_back(node);
}

for(int i = 0; i < THREAD_NO; i++)
{
PostQueuedCompletionStatus(hCompletion,0, -1, NULL);
}
for(int i = 0; i < THREAD_NO; i++)
{
WaitForSingleObject(hThreadArray[i], INFINITE);
CloseHandle(hThreadArray[i]);
}
for(auto i = g_pNodeList.begin(); i != g_pNodeList.end();)
{
i = g_pNodeList.erase(i);
}
CloseHandle(hCompletion);
return 0;
}


客户端代码也贴出来吧:

// cpio_client.cpp : 定义控制台应用程序的入口点。
//
#include "stdafx.h"
#include <windows.h>
#include <stdio.h>
#include <conio.h>
#include <tchar.h>
#include <iostream>

// Size in bytes of the receive buffer used by pipeReadFun.
#define BUFSIZE     1024
// Name of the pipe the client opens.
#define PIPE_NAME "\\\\.\\pipe\\completeio_pipe"
using namespace std;

// Reader thread of the client.
//
// lpParam is the pipe handle. The thread polls the pipe with PeekNamedPipe
// every 10 ms and, when data is available, reads it and prints it as text.
// A read that reports ERROR_MORE_DATA is printed and the loop goes on to
// fetch the rest. The thread ends when peeking or reading fails.
// Always returns 2.
UINT WINAPI pipeReadFun(LPVOID lpParam)
{
    HANDLE hPipe = (HANDLE)lpParam;

    for (;;)
    {
        // Only ask how much is waiting; nothing is consumed here.
        DWORD cbAvailable = 0;
        if (!PeekNamedPipe(hPipe, NULL, 0, NULL, &cbAvailable, NULL))
        {
            // Take the error code before any stream output can change it.
            const DWORD dwPeekErr = GetLastError();
            cout << "读取出错,退出线程:" << dwPeekErr << endl;
            return 2;
        }
        if (cbAvailable == 0)
        {
            Sleep(10);
            continue;
        }

        // One byte is kept back so the data can always be terminated.
        char chBuf[BUFSIZE] = {0};
        DWORD cbRead = 0;
        const BOOL bRead = ReadFile(hPipe, chBuf, BUFSIZE - 1, &cbRead, NULL);
        const DWORD dwReadErr = bRead ? ERROR_SUCCESS : GetLastError();
        if (!bRead && dwReadErr != ERROR_MORE_DATA)
        {
            cout << "ReadFile from pipe failed. GLE=" << dwReadErr << endl;
            return 2;
        }

        chBuf[cbRead] = '\0';
        cout << "客户端管道读取:" << chBuf << endl;
    }
}
int _tmain(int argc, _TCHAR* argv[])
{
HANDLE hPipe =NULL;
LPTSTR lpvMessage=TEXT("Default message from client.");
TCHAR  chBuf[BUFSIZE] = {0};
BOOL   fSuccess = FALSE;
DWORD  cbRead = 0, cbToWrite = 0, cbWritten = 0, dwMode = 0;
LPTSTR lpszPipename = PIPE_NAME;
// Try to open a named pipe; wait for it, if necessary.
while (1)
{
hPipe = CreateFile(
lpszPipename,   // pipe name
GENERIC_READ |  // read and write access
GENERIC_WRITE,
0,              // no sharing
NULL,           // default security attributes
OPEN_EXISTING,  // opens existing pipe
0,              // default attributes
NULL);          // no template file
if (hPipe != INVALID_HANDLE_VALUE)
break;
// Exit if an error other than ERROR_PIPE_BUSY occurs.
if (GetLastError() != ERROR_PIPE_BUSY)
{
printf("Could not open pipe. GLE=%d\n", GetLastError() );
return -1;
}
printf("wait WaitNamedPipe .");
if ( ! WaitNamedPipe(lpszPipename, 20000))
{
printf("Could not open pipe: 20 second wait timed out.");
return -1;
}
}
auto h = (HANDLE)_beginthreadex(NULL,0,pipeReadFun,hPipe,0,NULL);
CloseHandle(h);
int c = 0, d = 0;
char data[50] = {0};
while('q' != (d = getchar()))
{
if (d == '\n')
continue;
sprintf(data, "%d", c++);
auto fSuccess = WriteFile(
hPipe,
data,
strlen(data),
&cbWritten,
NULL);
if(!fSuccess)
{
printf("write pipe failed:%d\r\n", GetLastError());
break;
}
else
{
printf("send pipe :%s\r\n", data);
}
}
printf("\n<End of message, press ENTER to terminate connection and exit>");
_getch();
CloseHandle(hPipe);
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: