C++ SOCKET通信模型(三)IOApc
2017-08-28 11:55
323 查看
说IOCP之前,不得不说下基于Overlapped IO模型的另外一种设计思路,上篇说的是基于事件通知的,这篇就说下完成例程,也可以说是回调。基于事件通知,始终有每个线程最多只能监听64个事件的限制,就算经过我上篇写的多线程优化,但线程数始终是有限的,我8G 内存 大概就1500多个线程左右,那么 1500*64=9W6,看上去感觉也足够了,但过多的上下文切换不说,离单机可连接的socket最大值还差很远。一般我做服务器,要求线程数不多余单机线程数的两倍,所以不可能去开这么多线程。接下来就有了另外一种思路,也就是微软以前大力推广的APC回调队列。
先看看百科上的WSARecv,看过上篇文章的应该会发现,WSARecv里面最后一个参数我写的NULL
int WSARecv(
SOCKET s, // 当然是投递这个操作的套接字
,与Recv函数不同
// 这里需要一个由WSABUF结构构成的数组
DWORD dwBufferCount, // 数组中WSABUF结构的数量
LPDWORD lpNumberOfBytesRecvd, // 如果接收操作立即完成,这里会返回函数调用所接收到的字节数
LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine // 完成例程中将会用到的参数,我们这里设置为 NULL
);
在最后一个参数上绑定完成例程代替事件通知,即与IOEvent可达到一致的效果。维护也更简单,不再需要事件数组,所以client的存储也就不用再与事件保持对应关系,可以生成ID,放在map中,看起来更切合实际。至此,不得不说APC的缺陷:1、若线程被占用,APC队列上的回调将无法得到执行,必须SleepEx,不能阻塞,不能Sleep 2、APC回调执行后还是在执行线程,无法转接到其他线程,所以必须在最开始与accept分离 3、既然SleepEx 是休眠,那么无可避免的是,最开始的时候会有一定延迟,但服务器在热状态的时候几乎不造成影响
4、APC上回调唤醒SleepEx后 函数内部的语句也会执行,所以会有多余的重复检测,我这是(_acceptLock[I].lock();if(_acceptDeque[I].size()==0))5、负载均衡问题成为难题,由于线程里面都是回调,每个回调执行时间是未知的,可能造成一些线程很忙,一些线程比较闲的情况
那么还是来看代码吧:非调试 请关掉输出,不然单个Do 执行速度太慢了,线程内对应的回调都是顺序执行的,具体请看APC队列
server:
client:
与C++ SOCKET通信模型(一)相同,这里就不再重复粘贴出来
先看看百科上的WSARecv,看过上篇文章的应该会发现,WSARecv里面最后一个参数我写的NULL
int WSARecv(
SOCKET s, // 当然是投递这个操作的套接字
,与Recv函数不同
// 这里需要一个由WSABUF结构构成的数组
DWORD dwBufferCount, // 数组中WSABUF结构的数量
LPDWORD lpNumberOfBytesRecvd, // 如果接收操作立即完成,这里会返回函数调用所接收到的字节数
LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine // 完成例程中将会用到的参数,我们这里设置为 NULL
);
在最后一个参数上绑定完成例程代替事件通知,即与IOEvent可达到一致的效果。维护也更简单,不再需要事件数组,所以client的存储也就不用再与事件保持对应关系,可以生成ID,放在map中,看起来更切合实际。至此,不得不说APC的缺陷:1、若线程被占用,APC队列上的回调将无法得到执行,必须SleepEx,不能阻塞,不能Sleep 2、APC回调执行后还是在执行线程,无法转接到其他线程,所以必须在最开始与accept分离 3、既然SleepEx 是休眠,那么无可避免的是,最开始的时候会有一定延迟,但服务器在热状态的时候几乎不造成影响
4、APC上回调唤醒SleepEx后 函数内部的语句也会执行,所以会有多余的重复检测,我这是(_acceptLock[I].lock();if(_acceptDeque[I].size()==0))5、负载均衡问题成为难题,由于线程里面都是回调,每个回调执行时间是未知的,可能造成一些线程很忙,一些线程比较闲的情况
那么还是来看代码吧:非调试 请关掉输出,不然单个Do 执行速度太慢了,线程内对应的回调都是顺序执行的,具体请看APC队列
server:
// IOApc.cpp: 定义控制台应用程序的入口点。 // #include "stdafx.h" #include <iostream> #include<WinSock2.h> #include<process.h> #include<mutex> #include <deque> #include <map> #pragma comment(lib,"ws2_32.lib") unsigned int WINAPI CreateServ(LPVOID args); unsigned int WINAPI Proc(LPVOID args); void CALLBACK Do(IN DWORD dwError, IN DWORD cbTransferred, IN LPWSAOVERLAPPED lpOverlapped, IN DWORD dwFlags); using namespace std; const int _thread_count = 8; const int _bufLen = 1024; struct Client { WSAOVERLAPPED overlapped; SOCKET s; WSABUF buf; int procId; int id; }; DWORD dwRecvCount = 0; DWORD nFlag = 0; deque<Client*> _acceptDeque[_thread_count]; mutex _acceptLock[_thread_count]; map<int, Client*> _clients; mutex m; int main() { _beginthreadex(0, 0, CreateServ, 0, 0, 0); for (int i = 0; i < _thread_count; i++) { int* temp = new int(i); _beginthreadex(0, 0, Proc, temp, 0, 0); } cin.get(); cin.get(); return 0; } void release(Client* c) { m.lock(); _clients.erase(c->id); m.unlock(); cout << "release" << endl; closesocket(c->s); //关闭套接字 delete[] c->buf.buf; delete c; } unsigned int WINAPI Proc(LPVOID args) { int I = *(int*)args; while (true) { while (true) { _acceptLock[I].lock(); if(_acceptDeque[I].size()==0) { _acceptLock[I].unlock(); break; } Client* c= _acceptDeque[I].front(); _acceptDeque[I].pop_front(); _acceptLock[I].unlock(); c->procId = I; m.lock(); int i; do{ i = rand() % MAXINT32; } while (_clients.find(i) != _clients.end()); c->id = i; _clients.insert(pair<int, Client*>(i, c)); m.unlock(); if(WSARecv(c->s, &c->buf, 1, &dwRecvCount, &nFlag, &c->overlapped, Do)== SOCKET_ERROR) { int err = WSAGetLastError(); if(err!=WSA_IO_PENDING) { release(c); } } } SleepEx(1000, true); } } void CALLBACK Do(IN DWORD dwError, IN DWORD cbTransferred, IN LPWSAOVERLAPPED lpOverlapped, IN DWORD dwFlags) { Client* c = (Client*)lpOverlapped; if (dwError != 0 || cbTransferred == 0) //有错误发生或者对方断开连接 { release(c); return; } cout << "proc by:" << c->procId << endl; //cout << c->buf.buf << endl; memset(c->buf.buf, 0, _bufLen); char buf[128]; sprintf_s(buf, "hello client"); send(c->s, buf, 128, 0); if (WSARecv(c->s, &c->buf, 1, &dwRecvCount, &nFlag, &c->overlapped, Do) == SOCKET_ERROR) { int err = WSAGetLastError(); if (err != WSA_IO_PENDING) { release(c); } } } unsigned int WINAPI CreateServ(LPVOID args) { srand(time(0)); WORD wVersion; WSADATA wsaData; int err; wVersion = MAKEWORD(2, 1); err = WSAStartup(wVersion, &wsaData); if (err != 0) { return 0; } if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 1) { WSACleanup(); return 0; } SOCKET sockSrv = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, 0, 0, WSA_FLAG_OVERLAPPED); const char chOpt = 1; setsockopt(sockSrv, IPPROTO_TCP, TCP_NODELAY, &chOpt, sizeof(chOpt)); int nSendBufLen = 16 * 1024 * 1024; setsockopt(sockSrv, SOL_SOCKET, SO_SNDBUF, (const char*)&nSendBufLen, sizeof(int)); SOCKADDR_IN addrSrv; addrSrv.sin_addr.S_un.S_addr = htonl(ADDR_ANY); addrSrv.sin_family = AF_INET; addrSrv.sin_port = htons(6001); ::bind(sockSrv, (SOCKADDR*)&addrSrv, sizeof(SOCKADDR)); err = listen(sockSrv, SOMAXCONN); if (err == SOCKET_ERROR) { cout << "listen failed" << endl; WSACleanup(); return 0; } SOCKADDR_IN remoteAddr; int addrSize = sizeof(remoteAddr); //accept loop while (true) { SOCKET s = accept(sockSrv, (SOCKADDR*)&remoteAddr, &addrSize); Client* c = new Client; c->s = s; char* buf = new char[_bufLen]; memset(buf, 0, _bufLen); c->buf.buf = buf; c->buf.len = _bufLen; int min=0; for(int i=1;i<_thread_count;i++) { if(_acceptDeque[i].size()<_acceptDeque[min].size()) { min = i; } } _acceptLock[min].lock(); _acceptDeque[min].push_back(c); _acceptLock[min].unlock(); } return 0; }
client:
与C++ SOCKET通信模型(一)相同,这里就不再重复粘贴出来
相关文章推荐
- C++ SOCKET通信模型(二)IOEvent
- HTML5之WebSocket入门3 -通信模型socket.io
- 架构设计:系统间通信(3)——IO通信模型和JAVA实践 上篇
- Netty(一):IO通信模型
- C/S通信---服务器IO多路复用模型之poll的使用
- IO通信模型和Netty 上篇
- 架构设计:系统间通信(5)——IO通信模型和JAVA实践 下篇
- C++ SOCKET通信模型(一)select
- IO通信模型(三)多路复用IO
- IO通信模型和Netty 下篇
- 网络通信模型(IO模型)学习摘要
- 架构设计:系统间通信(4)——IO通信模型和JAVA实践 中篇
- 架构设计:系统间通信(7)——IO通信模型和Netty 下篇
- Linux网络通信编程(套接字模型TCP\UDP与IO多路复用模型select\poll\epoll)
- 突袭HTML5之WebSocket入门3 - 通信模型socket.io
- 架构设计:系统间通信(3)——IO通信模型和JAVA实践 上篇
- 架构设计:系统间通信(5)——IO通信模型和JAVA实践 下篇
- IO通信模型(一)同步阻塞模式BIO(Blocking IO)
- 系统间通信(4)——IO通信模型和JAVA实践 中篇
- 浅谈网络通信之IO模型