C++ Socket Communication Models (7): Asynchronous epoll
2017-09-05 10:32
Note: this post has problems. Using libaio together with epoll is nowhere near this simple; handling it properly is extremely troublesome, and some parts need to be handled quite cleverly to work at all. Since the details are part of my graduation thesis, I'm not publishing them for now. I hope you understand.
I have to say this asynchronous epoll model really gave me a headache. The main trouble is libaio's kernel notification: the interface is very low-level and awkward to use. To avoid spawning extra threads and to get something close in spirit to IOCP, I use a second epoll inside Proc, so that both the kernel notifications and the user notifications are handled in the same thread. A map associates each eventfd with its type (receive or send), so when an event arrives, the eventfd alone tells the thread what to do next. Also, because libaio requires the fd to be opened with O_DIRECT, both sending and receiving have to go through our own (page-aligned) buffers, otherwise performance drops badly. If you are interested, study the code yourself; I won't go into more detail. It has only just been written and is untested, so I don't know how it behaves in a real project; experts are welcome to benchmark it themselves.
Install the libaio headers first: yum install libaio-devel
Build and link with: -lpthread -laio -std=c++11
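Before the full listing, here is a minimal, self-contained sketch of the core mechanism described above: a single libaio request whose completion is delivered through an eventfd that epoll waits on. This is an illustrative sketch only (the file path, buffer size and minimal error handling are my own assumptions), not part of the server code that follows.

// Build: g++ -std=c++11 aio_eventfd_sketch.cpp -laio
#include <libaio.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstdio>
#include <cstdlib>
#include <ctime>
int main()
{
    io_context_t ctx = 0;
    if (io_queue_init(32, &ctx) != 0) return 1;           // set up the AIO context
    int efd = eventfd(0, 0);                              // completion counter fd
    int epfd = epoll_create(32);
    epoll_event ev = {};
    ev.events = EPOLLIN;
    ev.data.fd = efd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &ev);             // epoll watches the eventfd
    int fd = open("/etc/hostname", O_RDONLY | O_DIRECT);  // any O_DIRECT-capable file
    if (fd < 0) { perror("open"); return 1; }
    char* buf = nullptr;
    posix_memalign((void**)&buf, getpagesize(), 4096);    // O_DIRECT needs aligned I/O
    iocb cb;
    iocb* cbs[1] = { &cb };
    io_prep_pread(&cb, fd, buf, 4096, 0);                 // asynchronous read of one block
    io_set_eventfd(&cb, efd);                             // completion bumps the eventfd
    io_submit(ctx, 1, cbs);
    epoll_event out;
    epoll_wait(epfd, &out, 1, -1);                        // wakes when the read finishes
    unsigned long long n = 0;
    read(efd, &n, sizeof(n));                             // how many completions arrived
    io_event done[1];
    timespec ts = { 0, 0 };
    int r = io_getevents(ctx, 1, 1, done, &ts);           // harvest without blocking
    if (r > 0)
        printf("read %ld bytes, %llu completion(s)\n", (long)done[0].res, n);
    free(buf);
    io_queue_release(ctx);
    close(fd); close(efd); close(epfd);
    return 0;
}

The server below wires the same pieces per connection: io_set_eventfd ties each submitted read to the thread's eventfd, and the Proc loop treats readiness of that eventfd as "completions to harvest".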
#include <iostream>
#include <list>
#include <map>
#include <deque>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <cstring>   // memset
#include <cstdio>    // sprintf
#include <cstdlib>   // posix_memalign
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <pthread.h>
#include <unistd.h>
#include <fcntl.h>
#include <signal.h>
#include <libaio.h>
#include <sys/eventfd.h>
#define RECV_EVENTS 1024
#define SEND_EVENTS 1024
#define PROC_EVENTS 1024
#define BUF_SEND_LEN 1024
#define BUF_RECV_LEN 1024
#define RECV_LEN 128
#define SEND_LEN 128
using namespace std;
typedef unsigned long long ull;
struct Client
{
iocb cb;
int id;
int fd;
char* recvbuf;
char* sendbuf;
};
void* CreateServ(void* args);
void* Proc(void* args);
const int _thread_count = 8;
int _epfd;
int sockSrv;
int id = 0;
epoll_event _events[RECV_EVENTS] = { 0 };
epoll_event ev[_thread_count];
mutex m;
deque<epoll_event*> _deque;
map<int, Client*> _sockMap;
list<int> _removeList;
mutex lock4cv2;
condition_variable cv2;
int _thread_unfinish;
int _eventfd4send[_thread_count];
int _eventfd4recv[_thread_count];
int _proc_epfd[_thread_count];
map<int, int> _eventfd_type_map;
enum
{
RECV, SEND
};
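// Each Proc thread owns its own epoll instance plus two eventfds:
// _eventfd4recv[i] carries the "user" notification (CreateServ has queued new
// socket events in _deque), while _eventfd4send[i] carries the "kernel"
// notification (libaio signals completed reads on it via io_set_eventfd).
// _eventfd_type_map lets Proc tell the two apart from the fd alone.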
int main()
{
pthread_t tid;
for (int i = 0; i < _thread_count; i++)
{
_proc_epfd[i] = epoll_create(PROC_EVENTS);
_eventfd4send[i] = eventfd(0, 0);
ev[0].data.fd = _eventfd4send[i];
ev[0].events = EPOLLIN | EPOLLET;
epoll_ctl(_proc_epfd[i], EPOLL_CTL_ADD, _eventfd4send[i], &ev[0]);
_eventfd_type_map.insert(pair<int, int>(_eventfd4send[i], SEND));
_eventfd4recv[i] = eventfd(0, 0);
ev[0].data.fd = _eventfd4recv[i];
ev[0].events = EPOLLIN | EPOLLET;
epoll_ctl(_proc_epfd[i], EPOLL_CTL_ADD, _eventfd4recv[i], &ev[0]);
_eventfd_type_map.insert(pair<int, int>(_eventfd4recv[i], RECV));
}
pthread_create(&tid, 0, CreateServ, 0);
for (int i = 0; i < _thread_count; i++)
{
int* temp = new int(i);
pthread_create(&tid, 0, Proc, temp);
}
cin.get();
cin.get();
return 0;
}
bool _isFinish()
{
return _thread_unfinish <= 0;
}
void release(int fd)
{
cout << "release:" << fd << endl;
ev[0].data.fd = fd;
epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, &ev[0]);
for (auto iter = _sockMap.begin(); iter != _sockMap.end(); ++iter)
{
if (iter->first == fd)
{
free(iter->second->recvbuf);
free(iter->second->sendbuf);
delete iter->second;
_sockMap.erase(iter);
break;
}
}
close(fd);
}
static void recv_callback(io_context_t ctx, struct iocb *iocb, long res, long res2)
{
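// cb is the first member of Client, so the completed iocb pointer doubles as a
// pointer to its owning Client. The callback clears the receive buffer and
// prepares (but does not submit) the reply; the caller submits it to ioctx4write.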
Client* c = (Client*)iocb;
//cout << c->recvbuf << endl;
memset(c->recvbuf, 0, sizeof(char)* BUF_RECV_LEN);
sprintf(c->sendbuf, "hello client");
io_prep_pwrite(&c->cb, c->fd, c->sendbuf, SEND_LEN, 0);
}
void* Proc(void* args)
{
int* I = (int*)args;
int events_num = 0;
epoll_event threadEvents[PROC_EVENTS];
io_context_t ioctx;
io_event events[RECV_EVENTS];
iocb* io_submit_arr[RECV_EVENTS];
io_queue_init(RECV_EVENTS, &ioctx);
io_context_t ioctx4write;
iocb* io_submit_arr2[SEND_EVENTS];
io_queue_init(SEND_EVENTS, &ioctx4write);
while (true)
{
events_num = epoll_wait(_proc_epfd[*I], threadEvents, PROC_EVENTS, -1);
for (int i = 0; i < events_num; ++i) {
if (threadEvents[i].events == EPOLLIN)
{
int t = _eventfd_type_map[threadEvents[i].data.fd];
if (t == RECV)
{
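// User-notification path: drain the events CreateServ queued in _deque and
// batch asynchronous reads for them. The `i` declared below shadows the outer
// loop index; it counts the iocbs gathered for the batched io_submit.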
int i = 0;
while (true)
{
m.lock();
if (_deque.size() == 0)
{
m.unlock();
break;
}
epoll_event* e = _deque.front();
_deque.pop_front();
m.unlock();
if (e->data.fd == -1)
{
continue;
}
if (e->data.fd == sockSrv)
{
}
else if (e->events == EPOLLIN)
{
Client* c = _sockMap[e->data.fd];
io_prep_pread(&c->cb, e->data.fd, c->recvbuf, RECV_LEN, 0);
io_set_eventfd(&c->cb, _eventfd4send[*I]);
io_set_callback(&c->cb, recv_callback);
io_submit_arr[i] = &c->cb;
i++;
e->events = 0;
}
else
{
Client* c = _sockMap[e->data.fd];
_removeList.push_back(c->fd);
io_cancel(ioctx4write, &c->cb, nullptr);
io_cancel(ioctx, &c->cb, nullptr);
}
}
io_submit(ioctx, i, io_submit_arr);
//Locking here is mandatory. If the waiter has just evaluated _isFinish but is not yet blocked in wait(), and _thread_unfinish -= 1; cv2.notify_all(); happen to run right then, the notification is lost and both sides end up waiting on each other forever.
lock4cv2.lock();
_thread_unfinish -= 1;
cv2.notify_all();
lock4cv2.unlock();
}
else
{
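// Kernel-notification path: libaio bumped this eventfd as reads completed.
// Read the counter, run each finished iocb's callback (which prepares the
// reply), then submit the prepared writes in one batch.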
ull num = 0;
read(threadEvents[i].data.fd, &num, sizeof(ull));
while (num > 0)
{
timespec tms;
tms.tv_sec = 0;
tms.tv_nsec = 0;
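// Reap any finished writes first (results are ignored), then harvest finished
// reads; the zero timeout means neither call blocks.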
io_getevents(ioctx4write, 1, SEND_EVENTS, events, &tms);
int r = io_getevents(ioctx, 1, RECV_EVENTS, events, &tms);
if (r > 0)
{
int i = 0;
for (; i < r; ++i)
{
((io_callback_t)(events[i].data))(ioctx, events[i].obj, events[i].res, events[i].res2);
Client* c = (Client*)events[i].obj;
io_submit_arr2[i] = &c->cb;
}
io_submit(ioctx4write, i, io_submit_arr2);
}
num -= r;
}
}
}
threadEvents[i].events = 0;
}
}
}
void* CreateServ(void* args) {
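// Accept/dispatch thread: collect a batch of socket events, hand them to the
// Proc threads through _deque plus an eventfd wake-up, wait on cv2 until every
// thread reports the batch done, then release any sockets queued for removal.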
ull num = 1;
int events_num = 0;
sockSrv = socket(AF_INET, SOCK_STREAM, 0);
// int nSendBufLen = 16 * 1024 * 1024;
// setsockopt(sockSrv, SOL_SOCKET, SO_SNDBUF, (const char*)&nSendBufLen, sizeof(int));
fcntl(sockSrv, F_SETFL, O_NONBLOCK);
// struct in_addr s;
// inet_pton(AF_INET, "127.0.0.1",(void*)&s);
sockaddr_in addrSrv;
addrSrv.sin_addr.s_addr = htonl(INADDR_ANY);
addrSrv.sin_family = AF_INET;
addrSrv.sin_port = htons(6001);
::bind(sockSrv, (sockaddr*)&addrSrv, sizeof(sockaddr));
int err = listen(sockSrv, 1024);
if (err == -1) {
cout << "listen failed" << endl;
return 0;
}
_epfd = epoll_create(RECV_EVENTS);
ev[0].data.fd = sockSrv;
ev[0].events = EPOLLIN | EPOLLET;
epoll_ctl(_epfd, EPOLL_CTL_ADD, sockSrv, &ev[0]);
//accept loop
while (true) {
events_num = epoll_wait(_epfd, _events, RECV_EVENTS, -1);
if (events_num<0)
{
continue;
}
while (true) {
int s = accept(sockSrv, 0, 0);
if (s <= 0)
{
break;
}
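// libaio is used for the socket I/O, so O_DIRECT is set here and the buffers
// below are allocated page-aligned with posix_memalign.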
fcntl(s, F_SETFL, O_NONBLOCK | O_DIRECT);
Client* c = new Client;
posix_memalign((void**)&c->recvbuf, getpagesize(), BUF_RECV_LEN);
posix_memalign((void**)&c->sendbuf, getpagesize(), BUF_SEND_LEN);
c->id = id;
c->fd = s;
id += 1;
_sockMap.insert(pair<int, Client*>(s, c));
ev[0].data.fd = s;
ev[0].events = EPOLLIN | EPOLLET;
epoll_ctl(_epfd, EPOLL_CTL_ADD, s, &ev[0]);
}
_thread_unfinish = _thread_count;
for (int i = 0; i<events_num; i++)
{
_deque.push_back(&_events[i]);
}
for (int i = 0; i<_thread_count; i++)
{
write(_eventfd4recv[i], &num, sizeof(ull));
}
unique_lock<mutex> l(lock4cv2);
cv2.wait(l, _isFinish);
for (auto iter = _removeList.begin(); iter != _removeList.end(); ++iter)
{
release(*iter);
id -= 1;
}
_removeList.clear();
}
}