win32命名管道/完成端口用法练习
2015-10-18 22:05
573 查看
所谓完成端口,就是当完成事件触发时,对应的 I/O 操作已经执行完毕。
此时完成的是之前发起的异步操作,一般是 ReadFile/WriteFile/ConnectNamedPipe。
所以一般的流程是:
发起 ConnectNamedPipe 等待连接 -> 连接成功后调用 ReadFile(相当于注册回调)。
写动作单独处理。
windows完成端口跟linux的epoll不同。
epoll的特点是,事件触发时,只是通知你io端口操作就绪,要自己执行io操作
客户端代码也贴出来吧:
此时完成的是之前发起的异步操作,一般是 ReadFile/WriteFile/ConnectNamedPipe。
所以一般的流程是:
发起 ConnectNamedPipe 等待连接 -> 连接成功后调用 ReadFile(相当于注册回调)。
写动作单独处理。
windows完成端口跟linux的epoll不同。
epoll的特点是,事件触发时,只是通知你io端口操作就绪,要自己执行io操作
// cpio_server.cpp : 定义控制台应用程序的入口点。 // #include "stdafx.h" #include <vector> #include <Windows.h> #include <iostream> #include <algorithm> #include <tchar.h> #include <process.h> #define OP_ACCEPT 1 #define OP_READ 2 #define OP_WRITE 3 #define THREAD_NO 4 #define SIZE 1024 #define PIPE_NAME "\\\\.\\pipe\\completeio_pipe" using namespace std; struct PipeNode { PipeNode():hPipe(NULL), nOperationType(OP_ACCEPT),nLen(0) { memset(buf, 0, SIZE); memset(&hOver, 0, sizeof(hOver)); } PipeNode(HANDLE p):hPipe(p), nOperationType(OP_ACCEPT),nLen(0) { memset(buf, 0, SIZE); memset(&hOver, 0, sizeof(hOver)); } PipeNode(HANDLE p, OVERLAPPED op):hPipe(p), hOver(op), nOperationType(OP_ACCEPT),nLen(0) { memset(buf, 0, SIZE); } HANDLE hPipe; OVERLAPPED hOver; volatile DWORD nOperationType; DWORD nLen; char buf[SIZE]; }; std::vector<PipeNode*> g_pNodeList; UINT WINAPI pipeThreadFun(LPVOID lpParam) { HANDLE hCompletion = (HANDLE)lpParam; DWORD dwTrans = 0, dwErr = 0; PipeNode* pipeNode = NULL; OVERLAPPED *pOverlapped = NULL; while(TRUE) { BOOL bOK = ::GetQueuedCompletionStatus(hCompletion, &dwTrans, (PULONG_PTR)&pipeNode, &pOverlapped, INFINITE); dwErr = GetLastError(); if(pOverlapped == NULL) { printf("*******pOverlapped nulll: %d\r\n", GetLastError()); continue; } // 外部主动发起退出信号 if((int)pipeNode == -1) { printf("工作线程退出信号: %d\r\n", GetLastError()); break; } if(!bOK) { if (dwErr == ERROR_INVALID_HANDLE) { printf("完成端口失效: %d\r\n", GetLastError()); break; } cout<< "有管道出错, err code: " << dwErr << ", threadid: " << GetCurrentThreadId() << "node addr: " << (int)pipeNode << endl; auto f = find(g_pNodeList.begin(), g_pNodeList.end(), pipeNode); if( f != g_pNodeList.end()) { cout << "删除节点" <<endl; cout.flush(); g_pNodeList.erase(f); CloseHandle(pipeNode->hPipe); delete pipeNode; } continue; } auto len = pipeNode->hOver.InternalHigh; auto ilen = pOverlapped->InternalHigh; switch(pipeNode->nOperationType) { case OP_READ: cout << "读取:" << pipeNode->buf << ", thread id: " << GetCurrentThreadId() << 
endl; cout.flush(); break; case OP_WRITE: printf("write pipe EVENT:len: %d, Thread id:\r\n", dwTrans, GetCurrentThreadId()); pipeNode->nOperationType = OP_READ; continue; case OP_ACCEPT: printf("---------------客户端连接----------------%d\r\n", GetCurrentThreadId()); break; } memset(pipeNode->buf, 0 , sizeof(pipeNode->buf)); pipeNode->nOperationType = OP_READ; BOOL fSuccess = ReadFile( pipeNode->hPipe, pipeNode->buf, SIZE, &pipeNode->nLen, &pipeNode->hOver); if (fSuccess && pipeNode->nLen != 0) { //虽然有数据,但是不能在本次处理。 //因为完成端口会再下次触发事件,所以不再在这里处理数据,这是完成端口的关键点 printf("bSuccess-----iopending:len: %d, Thread id:%d\r\n", dwTrans, GetCurrentThreadId()); continue; } // The read operation is still pending. dwErr = GetLastError(); if (! fSuccess && (dwErr == ERROR_IO_PENDING)) { printf("-----iopending:len: %d, Thread id:%d\r\n", dwTrans, GetCurrentThreadId()); continue; } else { //TODO: printf("other error, error code: %d, Thread id:%d\r\n", GetLastError(), GetCurrentThreadId()); } } return 3; } int _tmain(int argc, _TCHAR* argv[]) { HANDLE hCompletion = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, THREAD_NO); if ( hCompletion == NULL) { cout << "完成端口创建失败" << endl; return -1; } HANDLE hThreadArray[THREAD_NO] = {0}; for(int i = 1; i <= THREAD_NO; i++) { hThreadArray[i - 1] = (HANDLE)_beginthreadex(NULL,0,pipeThreadFun,(void*)hCompletion,0, NULL); } char c = 0; while('q' != (c = getchar())) { if (c == '\n') continue; if(c == '1' && g_pNodeList.size() > 0) { char data[16] = "你好,管道."; DWORD rLen = 0; g_pNodeList.front()->nOperationType = OP_WRITE; auto l = WriteFile(g_pNodeList.front()->hPipe,data,strlen(data),&rLen,&g_pNodeList.front()->hOver); cout << "write pipe len: " << rLen << endl; continue; } HANDLE pipe = CreateNamedPipe( PIPE_NAME, PIPE_ACCESS_DUPLEX|FILE_FLAG_OVERLAPPED, PIPE_TYPE_MESSAGE|PIPE_READMODE_MESSAGE|PIPE_WAIT|PIPE_ACCEPT_REMOTE_CLIENTS, PIPE_UNLIMITED_INSTANCES, SIZE,SIZE,NMPWAIT_USE_DEFAULT_WAIT,NULL); if(INVALID_HANDLE_VALUE == pipe) { cout << 
"创建命名管道失败了" << endl; break; } else cout << "创建管道成功" << endl; auto node = new PipeNode(pipe); auto bIn = hCompletion == CreateIoCompletionPort(pipe, hCompletion, (ULONG_PTR)node, 0); auto b = ConnectNamedPipe(node->hPipe, &node->hOver) ? TRUE : (GetLastError() == ERROR_IO_PENDING || GetLastError() == ERROR_PIPE_CONNECTED); auto ee = GetLastError(); if (!b) { cout << "ConnectNamedPipe失败了" << endl; break; } g_pNodeList.push_back(node); } for(int i = 0; i < THREAD_NO; i++) { PostQueuedCompletionStatus(hCompletion,0, -1, NULL); } for(int i = 0; i < THREAD_NO; i++) { WaitForSingleObject(hThreadArray[i], INFINITE); CloseHandle(hThreadArray[i]); } for(auto i = g_pNodeList.begin(); i != g_pNodeList.end();) { i = g_pNodeList.erase(i); } CloseHandle(hCompletion); return 0; }
客户端代码也贴出来吧:
// cpio_client.cpp : 定义控制台应用程序的入口点。 // #include "stdafx.h" #include <windows.h> #include <stdio.h> #include <conio.h> #include <tchar.h> #include <iostream> #define BUFSIZE 1024 #define PIPE_NAME "\\\\.\\pipe\\completeio_pipe" using namespace std; UINT WINAPI pipeReadFun(LPVOID lpParam) { HANDLE hPipe = (HANDLE) lpParam; DWORD fSuccess = 0; do { DWORD cbRead = 0, b = 0, c = 0; char chBuf[BUFSIZE] = {0}; fSuccess = PeekNamedPipe(hPipe,chBuf,128, &cbRead,&b, &c); if (fSuccess == 0) { cout << "读取出错,退出线程:" << GetLastError() <<endl; break; } if (cbRead == 0) { Sleep(10); continue; } fSuccess = ReadFile(hPipe, chBuf, BUFSIZE, &cbRead, NULL); if ( ! fSuccess && GetLastError() != ERROR_MORE_DATA ) break; cout << "客户端管道读取:"<< chBuf <<endl; } while (fSuccess); if ( ! fSuccess) { cout << ("ReadFile from pipe failed. GLE=%d\n") << GetLastError()<< endl; return 2; } return 2; } int _tmain(int argc, _TCHAR* argv[]) { HANDLE hPipe =NULL; LPTSTR lpvMessage=TEXT("Default message from client."); TCHAR chBuf[BUFSIZE] = {0}; BOOL fSuccess = FALSE; DWORD cbRead = 0, cbToWrite = 0, cbWritten = 0, dwMode = 0; LPTSTR lpszPipename = PIPE_NAME; // Try to open a named pipe; wait for it, if necessary. while (1) { hPipe = CreateFile( lpszPipename, // pipe name GENERIC_READ | // read and write access GENERIC_WRITE, 0, // no sharing NULL, // default security attributes OPEN_EXISTING, // opens existing pipe 0, // default attributes NULL); // no template file if (hPipe != INVALID_HANDLE_VALUE) break; // Exit if an error other than ERROR_PIPE_BUSY occurs. if (GetLastError() != ERROR_PIPE_BUSY) { printf("Could not open pipe. GLE=%d\n", GetLastError() ); return -1; } printf("wait WaitNamedPipe ."); if ( ! 
WaitNamedPipe(lpszPipename, 20000)) { printf("Could not open pipe: 20 second wait timed out."); return -1; } } auto h = (HANDLE)_beginthreadex(NULL,0,pipeReadFun,hPipe,0,NULL); CloseHandle(h); int c = 0, d = 0; char data[50] = {0}; while('q' != (d = getchar())) { if (d == '\n') continue; sprintf(data, "%d", c++); auto fSuccess = WriteFile( hPipe, data, strlen(data), &cbWritten, NULL); if(!fSuccess) { printf("write pipe failed:%d\r\n", GetLastError()); break; } else { printf("send pipe :%s\r\n", data); } } printf("\n<End of message, press ENTER to terminate connection and exit>"); _getch(); CloseHandle(hPipe); return 0; }
相关文章推荐
- CentOS yum 升级Python2.6 到 2.7
- java冒泡排序法
- 九度OJ 1050:完数 (数字特性)
- asp.net 回发或回调参数无效的各种情况分析及解决办法
- 九度OJ 1050:完数 (数字特性)
- C# webrequest 抓取数据时,多个域Cookie的问题
- 使用Vim编辑、运行Processing程序
- 中文分词器IKAnalyzer——IKQueryParser主类分析
- 反转链表
- 织梦 dede:channelartlist标签获取项目总数的方法
- Android目录
- ASP.NET5 Beta8
- android.view.WindowLeaked解决办法
- 第六周--数据结构之自建算法库之迷宫问题(用栈结构)
- ZOJ 3202 (A)
- EffectiveC#16--垃圾最小化
- 单配送物流算法(简化为旅行商问题)
- 阿里RocketMQ Quick Start
- Java小陷阱
- CF Rating 2015.10