您的位置:首页 > 编程语言 > C语言/C++

C++ SOCKET通信模型(三)IOApc

2017-08-28 11:55 323 查看
说IOCP之前,不得不说下基于Overlapped IO模型的另外一种设计思路,上篇说的是基于事件通知的,这篇就说下完成例程,也可以说是回调。基于事件通知,始终有每个线程最多只能监听64个事件的限制,就算经过我上篇写的多线程优化,但线程数始终是有限的,我8G 内存 大概就1500多个线程左右,那么 1500*64=9W6,看上去感觉也足够了,但过多的上下文切换不说,离单机可连接的socket最大值还差很远。一般我做服务器,要求线程数不多余单机线程数的两倍,所以不可能去开这么多线程。接下来就有了另外一种思路,也就是微软以前大力推广的APC回调队列。

先看看百科上的WSARecv,看过上篇文章的应该会发现,WSARecv里面最后一个参数我写的NULL

int WSARecv(

SOCKET s, // 当然是投递这个操作的套接字

,与Recv函数不同

// 这里需要一个由WSABUF结构构成的数组

DWORD dwBufferCount, // 数组中WSABUF结构的数量

LPDWORD lpNumberOfBytesRecvd, // 如果接收操作立即完成,这里会返回函数调用所接收到的字节数

LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine // 完成例程中将会用到的参数,我们这里设置为 NULL

);

在最后一个参数上绑定完成例程代替事件通知,即与IOEvent可达到一致的效果。维护也更简单,不再需要事件数组,所以client的存储也就不用再与事件保持对应关系,可以生成ID,放在map中,看起来更切合实际。至此,不得不说APC的缺陷:1、若线程被占用,APC队列上的回调将无法得到执行,必须SleepEx,不能阻塞,不能Sleep 2、APC回调执行后还是在执行线程,无法转接到其他线程,所以必须在最开始与accept分离 3、既然SleepEx 是休眠,那么无可避免的是,最开始的时候会有一定延迟,但服务器在热状态的时候几乎不造成影响
4、APC上回调唤醒SleepEx后 函数内部的语句也会执行,所以会有多余的重复检测,我这是(_acceptLock[I].lock();if(_acceptDeque[I].size()==0))5、负载均衡问题成为难题,由于线程里面都是回调,每个回调执行时间是未知的,可能造成一些线程很忙,一些线程比较闲的情况

那么还是来看代码吧:非调试 请关掉输出,不然单个Do 执行速度太慢了,线程内对应的回调都是顺序执行的,具体请看APC队列

server:

// IOApc.cpp: 定义控制台应用程序的入口点。
//

#include "stdafx.h"
#include <iostream>
#include<WinSock2.h>
#include<process.h>
#include<mutex>
#include <deque>
#include <map>
#pragma comment(lib,"ws2_32.lib")
unsigned int WINAPI CreateServ(LPVOID args);
unsigned int WINAPI Proc(LPVOID args);
void CALLBACK Do(IN DWORD dwError,
IN DWORD cbTransferred,
IN LPWSAOVERLAPPED lpOverlapped,
IN DWORD dwFlags);
using namespace std;
const int _thread_count = 8;
const int _bufLen = 1024;

struct Client
{
WSAOVERLAPPED overlapped;
SOCKET s;
WSABUF buf;
int procId;
int id;
};

DWORD dwRecvCount = 0;
DWORD nFlag = 0;
deque<Client*> _acceptDeque[_thread_count];
mutex _acceptLock[_thread_count];
map<int, Client*> _clients;
mutex m;
int main()
{
_beginthreadex(0, 0, CreateServ, 0, 0, 0);
for (int i = 0; i < _thread_count; i++) {
int* temp = new int(i);
_beginthreadex(0, 0, Proc, temp, 0, 0);
}
cin.get();
cin.get();
return 0;
}
void release(Client* c)
{
m.lock();
_clients.erase(c->id);
m.unlock();

cout << "release" << endl;
closesocket(c->s); //关闭套接字
delete[] c->buf.buf;
delete c;
}
unsigned int WINAPI Proc(LPVOID args)
{
int I = *(int*)args;
while (true)
{
while (true)
{
_acceptLock[I].lock();
if(_acceptDeque[I].size()==0)
{
_acceptLock[I].unlock();
break;
}
Client* c= _acceptDeque[I].front();
_acceptDeque[I].pop_front();
_acceptLock[I].unlock();

c->procId = I;

m.lock();
int i;
do{
i = rand() % MAXINT32;
} while (_clients.find(i) != _clients.end());
c->id = i;
_clients.insert(pair<int, Client*>(i, c));
m.unlock();
if(WSARecv(c->s, &c->buf, 1, &dwRecvCount, &nFlag, &c->overlapped, Do)== SOCKET_ERROR)
{
int err = WSAGetLastError();
if(err!=WSA_IO_PENDING)
{
release(c);
}
}
}
SleepEx(1000, true);
}
}

void CALLBACK Do(IN DWORD dwError,
IN DWORD cbTransferred,
IN LPWSAOVERLAPPED lpOverlapped,
IN DWORD dwFlags)
{
Client* c = (Client*)lpOverlapped;
if (dwError != 0 || cbTransferred == 0) //有错误发生或者对方断开连接
{
release(c);
return;
}
cout << "proc by:" << c->procId << endl;
//cout << c->buf.buf << endl;
memset(c->buf.buf, 0, _bufLen);
char buf[128];
sprintf_s(buf, "hello client");
send(c->s, buf, 128, 0);
if (WSARecv(c->s, &c->buf, 1, &dwRecvCount, &nFlag, &c->overlapped, Do) == SOCKET_ERROR)
{
int err = WSAGetLastError();
if (err != WSA_IO_PENDING)
{
release(c);
}
}
}
unsigned int WINAPI CreateServ(LPVOID args) {
srand(time(0));
WORD wVersion;
WSADATA wsaData;
int err;
wVersion = MAKEWORD(2, 1);
err = WSAStartup(wVersion, &wsaData);
if (err != 0) {
return 0;
}
if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 1) {
WSACleanup();
return 0;
}
SOCKET sockSrv = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, 0, 0, WSA_FLAG_OVERLAPPED);
const char chOpt = 1;
setsockopt(sockSrv, IPPROTO_TCP, TCP_NODELAY, &chOpt, sizeof(chOpt));

int nSendBufLen = 16 * 1024 * 1024;
setsockopt(sockSrv, SOL_SOCKET, SO_SNDBUF, (const char*)&nSendBufLen, sizeof(int));

SOCKADDR_IN addrSrv;
addrSrv.sin_addr.S_un.S_addr = htonl(ADDR_ANY);
addrSrv.sin_family = AF_INET;
addrSrv.sin_port = htons(6001);

::bind(sockSrv, (SOCKADDR*)&addrSrv, sizeof(SOCKADDR));

err = listen(sockSrv, SOMAXCONN);
if (err == SOCKET_ERROR) {
cout << "listen failed" << endl;
WSACleanup();
return 0;
}

SOCKADDR_IN remoteAddr;
int addrSize = sizeof(remoteAddr);
//accept loop

while (true) {
SOCKET s = accept(sockSrv, (SOCKADDR*)&remoteAddr, &addrSize);
Client* c = new Client;
c->s = s;
char*  buf = new char[_bufLen];
memset(buf, 0, _bufLen);
c->buf.buf = buf;
c->buf.len = _bufLen;
int min=0;
for(int i=1;i<_thread_count;i++)
{
if(_acceptDeque[i].size()<_acceptDeque[min].size())
{
min = i;
}
}
_acceptLock[min].lock();
_acceptDeque[min].push_back(c);
_acceptLock[min].unlock();
}
return 0;
}


client:

与C++ SOCKET通信模型(一)相同,这里就不再重复粘贴出来
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  C++ Socket