IO多路复用 --select、poll、epoll模式
2016-09-27 11:27
756 查看
一:select模式
头文件:#include <stdio.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <assert.h> #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h> #include <sys/select.h> #define MAX_LINE 1024 #define MAX_SIZE 10 #define IP_ADDR "127.0.0.1" #define PORT 8787 #define MAX_LISTEN 5 #define _DEBUG_
服务器端:
#include "utili.h"

/*
 * Echo server built on select().
 *
 * The listening socket and up to MAX_SIZE client sockets are watched for
 * readability; whatever a client sends is printed and echoed back.
 */

/* ERR_EXIT is used below; provide a fallback in case the header does not
 * define it, so this file builds on its own. */
#ifndef ERR_EXIT
#define ERR_EXIT(msg)           \
    do {                        \
        perror(msg);            \
        exit(EXIT_FAILURE);     \
    } while (0)
#endif

/* Bookkeeping for the connected clients. */
typedef struct vector_fd {
    int maxfd;              /* highest descriptor passed to select() */
    int cli_count;          /* number of occupied slots in cli_fds */
    int cli_fds[MAX_SIZE];  /* client sockets; -1 marks a free slot */
    fd_set allfds;          /* read set, rebuilt before every select() */
} vector_fd;

static vector_fd *vector_fd_ptr = NULL;

/* Allocate the client table and mark every slot as free. */
static void init_vector_fd(void)
{
    vector_fd_ptr = malloc(sizeof *vector_fd_ptr);
    assert(vector_fd_ptr != NULL);
    memset(vector_fd_ptr, 0, sizeof *vector_fd_ptr);
    for (int i = 0; i < MAX_SIZE; ++i)
        vector_fd_ptr->cli_fds[i] = -1;
}

/* Release the client table. */
static void destroy_vector_fd(void)
{
    free(vector_fd_ptr);
    vector_fd_ptr = NULL;
}

/*
 * Return the slot index holding connfd, or -1 when it is not in the table.
 * Slots are sparse (freed slots are reused), so the whole table is scanned.
 */
static int indexof(const int connfd)
{
    for (int i = 0; i < MAX_SIZE; ++i) {
        if (vector_fd_ptr->cli_fds[i] == connfd)
            return i;
    }
    return -1;
}

/* Close connfd and free its slot in the client table. */
static void remove_vector_fd(const int connfd)
{
    const int idx = indexof(connfd);
#ifdef _DEBUG_
    printf("client : %d got died.\n", idx);
#endif
    close(connfd);
    if (idx >= 0) {
        vector_fd_ptr->cli_fds[idx] = -1;
        --vector_fd_ptr->cli_count;
    }
}

/*
 * Create a TCP socket bound to ip:port and put it into listening state.
 * Returns the listening descriptor; setup failures abort via assert().
 */
static int create_server_proc(const char *ip, const int port)
{
    assert(ip != NULL);

    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(listenfd != -1);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(port);
    servaddr.sin_addr.s_addr = inet_addr(ip);

    /* Allow quick restarts while old connections linger in TIME_WAIT. */
    int on = 1;
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

    int ret = bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    assert(ret != -1);

    ret = listen(listenfd, MAX_LISTEN);
    assert(ret != -1);

    return listenfd;
}

/*
 * Accept one pending connection and store it in the first free slot.
 * The connection is refused (closed) when the table is full or when the
 * descriptor is too large to be placed in an fd_set.
 */
static void accept_event_handler(const int listenfd)
{
    int connfd = accept(listenfd, NULL, NULL);
    if (connfd < 0)
        ERR_EXIT("accept error.");

    int slot = 0;
    while (slot < MAX_SIZE && vector_fd_ptr->cli_fds[slot] != -1)
        ++slot;

    if (slot == MAX_SIZE || connfd >= FD_SETSIZE) {
        printf("too many client.\n");
        close(connfd);  /* do not leak the descriptor we cannot track */
        return;
    }

#ifdef _DEBUG_
    printf("client : %d connected.\n", slot);
#endif
    vector_fd_ptr->cli_fds[slot] = connfd;
    ++vector_fd_ptr->cli_count;
}

/*
 * Read one chunk from connfd, print it and echo it back.
 * The client is dropped on orderly shutdown as well as on I/O errors, so a
 * single misbehaving peer cannot take the server down.
 */
static void communication_detail(const int connfd)
{
    char buffer[MAX_LINE];

    /* Leave room for the terminator added below. */
    ssize_t ret = recv(connfd, buffer, sizeof(buffer) - 1, 0);
    if (ret <= 0) {
#ifdef _DEBUG_
        printf("client : %d close.\n", indexof(connfd));
#endif
        remove_vector_fd(connfd);
        return;
    }
    buffer[ret] = '\0';

    fprintf(stdout, "%s", buffer);

    /* Echo exactly the bytes that were received. */
    if (send(connfd, buffer, (size_t)ret, 0) < 0)
        remove_vector_fd(connfd);
}

/* Serve every client whose descriptor is marked readable in readfds. */
static void connected_event_handler(fd_set *readfds)
{
    for (int i = 0; i < MAX_SIZE; ++i) {
        const int clifd = vector_fd_ptr->cli_fds[i];
        if (clifd < 0)
            continue;
        if (FD_ISSET(clifd, readfds))
            communication_detail(clifd);
    }
}

/*
 * Main event loop: rebuild the read set, wait up to five seconds for
 * activity, then accept new clients and serve the readable ones.
 * Never returns.
 */
static void manage_client_proc(const int listenfd)
{
    fd_set *readfds = &vector_fd_ptr->allfds;

    for (;;) {
        /* select() overwrites the set, so it is rebuilt on every pass. */
        FD_ZERO(readfds);
        FD_SET(listenfd, readfds);
        vector_fd_ptr->maxfd = listenfd;
        for (int i = 0; i < MAX_SIZE; ++i) {
            const int clifd = vector_fd_ptr->cli_fds[i];
            if (clifd < 0)
                continue;  /* free slot: FD_SET on -1 is undefined */
            FD_SET(clifd, readfds);
            if (clifd > vector_fd_ptr->maxfd)
                vector_fd_ptr->maxfd = clifd;
        }

        struct timeval tv = {5, 0};
        int ret = select(vector_fd_ptr->maxfd + 1, readfds, NULL, NULL, &tv);
        if (ret < 0)
            ERR_EXIT("select error.");
        if (ret == 0) {
            /* when tv is set to {0, 0}, always timeout */
            printf("time out and reselect.\n");
            continue;
        }

        /* Accept first: a freshly accepted descriptor cannot be set in
         * readfds, so the client pass below will not touch it. */
        if (FD_ISSET(listenfd, readfds)) {
#ifdef _DEBUG_
            printf("the client connecting.\n");
#endif
            accept_event_handler(listenfd);
        }
        connected_event_handler(readfds);
    }
}

int main()
{
    int listenfd = create_server_proc(IP_ADDR, PORT);
    init_vector_fd();
    manage_client_proc(listenfd);  /* loops forever */
    destroy_vector_fd();
    return 0;
}
客户端:
#include "utili.h"

/*
 * Interactive client for the select() echo server: reads a line from stdin,
 * sends it, waits for the echo, prints it, and repeats.
 */

/* Create a TCP socket; aborts via assert() on failure. */
static int create_socket_proc(void)
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(sockfd != -1);
    return sockfd;
}

/* Connect sockfd to ip:port; aborts via assert() on failure. Returns 0. */
static int connect_server_proc(int sockfd, const char *ip, const int port)
{
    assert(ip != NULL && port > 0);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(port);
    servaddr.sin_addr.s_addr = inet_addr(ip);

    int ret = connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    assert(ret != -1);
    return 0;
}

/*
 * Read one line from stdin and send it, including its terminating NUL
 * (the framing used by this example).
 * Returns 1 when a line was sent, 0 on end of input, -1 on send failure.
 */
static int send_next_line(int sockfd, char *buffer, int size)
{
    if (fgets(buffer, size, stdin) == NULL)
        return 0;
    if (send(sockfd, buffer, strlen(buffer) + 1, 0) == -1) {
        perror("send");
        return -1;
    }
    return 1;
}

/*
 * Run the request/echo loop on sockfd.
 * Returns 0 when stdin reaches end of file or the server closes the
 * connection, -1 on an I/O error.
 */
static int connected_event_handler(int sockfd)
{
    fd_set new_set;
    char buffer[MAX_LINE];

    int sent = send_next_line(sockfd, buffer, (int)sizeof(buffer));
    if (sent <= 0)
        return sent;

    for (;;) {
        /* select() modifies the set, so rebuild it each time. */
        FD_ZERO(&new_set);
        FD_SET(sockfd, &new_set);

        int ret = select(sockfd + 1, &new_set, NULL, NULL, NULL);
        if (ret == -1) {
            perror("select");
            return -1;
        }
        if (!FD_ISSET(sockfd, &new_set))
            continue;

        /* Keep one byte free for the terminator added below. */
        ssize_t got = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
        if (got == -1) {
            perror("recv");
            return -1;
        }
        if (got == 0)
            return 0;  /* server closed the connection */
        buffer[got] = '\0';
        fputs(buffer, stdout);

        sent = send_next_line(sockfd, buffer, (int)sizeof(buffer));
        if (sent <= 0)
            return sent;
    }
}

int main()
{
    int sockfd = create_socket_proc();
    connect_server_proc(sockfd, IP_ADDR, PORT);
    connected_event_handler(sockfd);
    close(sockfd);
    return 0;
}
二:poll模式
头文件:#include <unistd.h> #include <stdio.h> #include <string.h> #include <stdlib.h> #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h> #include <assert.h> #include <poll.h> #define IP_ADDR "127.0.0.1" #define PORT 8787 #define CLIENT_SIZE 10 #define LISTEN_SIZE 5 #define BUFFER_SIZE 256 #define _DEBUG_
服务器端:
#include "utili.h"

/*
 * Echo server built on poll().
 *
 * Entry 0 of the pollfd table is the listening socket; entries
 * 1..CLIENT_SIZE-1 hold client sockets, with fd == -1 marking a free entry
 * (poll() ignores negative descriptors).
 */

/* Conditions after which recv() will not block: data, hang-up or error. */
#define CLIENT_READY (POLLIN | POLLHUP | POLLERR)

/*
 * Create a TCP socket bound to ip:port and start listening.
 * Returns the listening descriptor; setup failures abort via assert().
 */
static int create_socket_proc(const char *ip, const int port)
{
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(listenfd != -1);  /* socket() reports failure with -1, not 0 */

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(port);
    servaddr.sin_addr.s_addr = inet_addr(ip);

    int on = 1;
    int ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
    assert(ret != -1);

    ret = bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    assert(ret != -1);

    ret = listen(listenfd, LISTEN_SIZE);
    assert(ret != -1);

    return listenfd;
}

/*
 * Accept one pending connection and place it in the first free entry.
 * *maxi (highest index in use) is raised when needed. When the table is
 * full the connection is closed and *maxi is left untouched, so poll() is
 * never asked to look past the end of the array.
 */
static void accept_client_proc(struct pollfd (*clifds)[CLIENT_SIZE], const int listenfd, int *maxi)
{
    int connfd = accept(listenfd, NULL, NULL);
    assert(connfd != -1);

    int i;
    for (i = 1; i < CLIENT_SIZE && (*clifds)[i].fd != -1; ++i)
        ;

    if (i == CLIENT_SIZE) {
        printf("too many clients.\n");
        close(connfd);
        return;
    }

    (*clifds)[i].fd = connfd;
    (*clifds)[i].events = POLLIN;
    /* The entry has not been polled yet; make sure no stale result bits
     * are seen by the client pass that follows the accept. */
    (*clifds)[i].revents = 0;
#ifdef _DEBUG_
    printf("%d has connected.\n", i);
#endif

    if (i > *maxi)
        *maxi = i;
}

/* Close the client at index and mark its entry as free. */
static void remove_client_fd(struct pollfd *clifds, const int index)
{
    close(clifds[index].fd);
    clifds[index].fd = -1;
    clifds[index].revents = 0;
}

/*
 * Serve every ready client in entries 1..maxi: print what it sent and echo
 * it back. Clients that closed the connection or hit an I/O error are
 * removed instead of aborting the server.
 */
static void connected_client_proc(struct pollfd *clifds, const int maxi)
{
    char buffer[BUFFER_SIZE];

    for (int i = 1; i <= maxi; ++i) {  /* maxi is the max index, not the max quantity */
        if (clifds[i].fd < 0)
            continue;
        if (!(clifds[i].revents & CLIENT_READY))
            continue;

        const int sockfd = clifds[i].fd;
#ifdef _DEBUG_
        printf("it is from %dth message.\n", i);
#endif
        /* Keep one byte free for the terminator added below. */
        ssize_t recv_sz = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
        if (recv_sz <= 0) {
            remove_client_fd(clifds, i);
            continue;
        }
        buffer[recv_sz] = '\0';
        fputs(buffer, stdout);

        /* Echo exactly the bytes that were received. */
        if (send(sockfd, buffer, (size_t)recv_sz, 0) == -1)
            remove_client_fd(clifds, i);
    }
}

/* Main event loop: wait in poll(), accept new clients, serve ready ones. */
static void do_poll(int listenfd)
{
    struct pollfd clifds[CLIENT_SIZE];

    clifds[0].fd = listenfd;
    clifds[0].events = POLLIN;
    clifds[0].revents = 0;
    for (int i = 1; i < CLIENT_SIZE; ++i) {
        clifds[i].fd = -1;
        clifds[i].events = 0;
        clifds[i].revents = 0;
    }

    int maxi = 0;
    for (;;) {
#ifdef _DEBUG_
        printf("before poll function.\n");
#endif
        int ret = poll(clifds, maxi + 1, -1);
        assert(ret != -1);
#ifdef _DEBUG_
        printf("after poll function.\n");
#endif
        if (clifds[0].revents & POLLIN)
            accept_client_proc(&clifds, listenfd, &maxi);

        connected_client_proc(clifds, maxi);
    }
}

int main()
{
    int listenfd = create_socket_proc(IP_ADDR, PORT);
    do_poll(listenfd);
    return 0;
}
客户端:
#include "utili.h"

/*
 * Interactive client for the poll() echo server: watches the socket and
 * stdin at the same time, printing server data and sending typed input.
 */

/* Create a TCP socket; aborts via assert() on failure. */
static int create_socket_proc(void)
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(sockfd != -1);
    return sockfd;
}

/* Connect sockfd to ip:port; aborts via assert() on failure. Returns 0. */
static int connect_server_proc(int sockfd, const char *ip, const int port)
{
    assert(ip != NULL && port > 0);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(port);
    servaddr.sin_addr.s_addr = inet_addr(ip);

    int ret = connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    assert(ret != -1);
    return 0;
}

/*
 * Relay data between stdin and sockfd until stdin reaches end of file, the
 * server closes the connection, or an I/O error occurs. sockfd is closed
 * before returning.
 */
static void connected_event_handler(const int sockfd)
{
    /* Hang-up and error are treated like readability so that the following
     * recv()/read() reports the condition instead of the loop spinning. */
    const short ready = POLLIN | POLLHUP | POLLERR;

    struct pollfd fds[2];
    fds[0].fd = sockfd;
    fds[0].events = POLLIN;
    fds[1].fd = STDIN_FILENO;
    fds[1].events = POLLIN;

    char buffer[BUFFER_SIZE];

    for (;;) {
        int ret = poll(fds, 2, -1);  /* 2 for sockfd or STDIN_FILENO */
        assert(ret != -1);

        if (fds[0].revents & ready) {
            /* Keep one byte free for the terminator added below. */
            ssize_t recv_sz = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
            if (recv_sz <= 0)
                break;  /* server closed the connection, or recv failed */
            buffer[recv_sz] = '\0';
            fputs(buffer, stdout);
        }

        if (fds[1].revents & ready) {
            ssize_t read_sz = read(STDIN_FILENO, buffer, sizeof(buffer) - 1);
            if (read_sz <= 0)
                break;  /* end of input: stop instead of sending empty data */
            buffer[read_sz] = '\0';

            /* The terminating NUL is part of the message, as in the
             * other examples. */
            if (send(sockfd, buffer, strlen(buffer) + 1, 0) == -1)
                break;
        }
    }

    close(sockfd);
}

int main()
{
    int sockfd = create_socket_proc();
    connect_server_proc(sockfd, IP_ADDR, PORT);
    connected_event_handler(sockfd);
    return 0;
}
三:epoll模式
头文件:#include<unistd.h> #include<stdio.h> #include<string.h> #include<stdlib.h> #include<sys/socket.h> #include<arpa/inet.h> #include<netinet/in.h> #include<assert.h> #include<sys/epoll.h> #define IP_ADDR "127.0.0.1" #define PORT 8787 #define LISTEN_SIZE 5 #define BUFFER_SIZE 1024 #define EPOLL_EVENTS 100 #define FD_SIZE 1000
服务器端:
#include "utili.h"

/*
 * Echo server built on epoll (level-triggered).
 *
 * Each client alternates between two states: registered for EPOLLIN while
 * the server waits for data, and registered for EPOLLOUT while the echo is
 * pending.
 *
 * NOTE(review): a single buffer is shared by all connections. If two
 * clients send data before either echo has been written, the later data
 * overwrites the earlier and both receive the same echo. A per-connection
 * buffer would be needed to lift that limitation.
 */

static int create_listen_proc(const char *ip, const int port);
static void do_epoll(const int listenfd);
static void events_handler(const int epollfd, struct epoll_event *events, const int num, const int listenfd, char *buffer);
static void do_accept(const int epollfd, const int listenfd);
static void do_read(const int epollfd, const int fd, char *buffer);
static void do_write(const int epollfd, const int fd, char *buffer);
static void add_event(const int epollfd, const int fd, const int state);
static void delete_event(const int epollfd, const int fd, const int state);
static void modify_event(const int epollfd, const int fd, const int state);

int main()
{
    int listenfd = create_listen_proc(IP_ADDR, PORT);
    do_epoll(listenfd);
    return 0;
}

/* Register fd with epollfd for the events in state. */
static void add_event(const int epollfd, const int fd, const int state)
{
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}

/* Remove fd from epollfd. Must be called while fd is still open. */
static void delete_event(const int epollfd, const int fd, const int state)
{
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev);
}

/* Replace the events fd is registered for with state. */
static void modify_event(const int epollfd, const int fd, const int state)
{
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev);
}

/* Unregister a client from epollfd and close it, in that order. */
static void drop_client(const int epollfd, const int fd, const int state)
{
    delete_event(epollfd, fd, state);
    close(fd);
}

/* Accept one pending connection and start watching it for input. */
static void do_accept(const int epollfd, const int listenfd)
{
    int connfd = accept(listenfd, NULL, NULL);
    if (connfd == -1) {
        /* A failed accept must not bring the server down. */
        perror("accept");
        return;
    }
    add_event(epollfd, connfd, EPOLLIN);  /* add connfd to epollfd */
}

/*
 * Read from a client into buffer (always NUL-terminated on success), print
 * it and switch the client to EPOLLOUT so the echo can be written.
 * The client is dropped on end of stream or read error.
 */
static void do_read(const int epollfd, const int fd, char *buffer)
{
    /* Read at most BUFFER_SIZE - 1 bytes so the terminator fits. */
    ssize_t read_sz = read(fd, buffer, BUFFER_SIZE - 1);
    if (read_sz <= 0) {
        drop_client(epollfd, fd, EPOLLIN);
        return;
    }
    buffer[read_sz] = '\0';

    fputs(buffer, stdout);
    modify_event(epollfd, fd, EPOLLOUT);
}

/*
 * Write the buffered text back to the client and switch it back to
 * EPOLLIN. The client is dropped when the write fails.
 */
static void do_write(const int epollfd, const int fd, char *buffer)
{
    ssize_t ret = write(fd, buffer, strlen(buffer));
    if (ret == -1) {
        drop_client(epollfd, fd, EPOLLOUT);
        return;
    }
    modify_event(epollfd, fd, EPOLLIN);
}

/* Dispatch the first num entries of events to accept/read/write. */
static void events_handler(const int epollfd, struct epoll_event *events, const int num, const int listenfd, char *buffer)
{
    for (int i = 0; i < num; ++i) {
        const int fd = events[i].data.fd;

        if (fd == listenfd) {
            if (events[i].events & EPOLLIN) {
#ifdef _DEBUG_
                printf("accept client.\n");
#endif
                do_accept(epollfd, listenfd);
            }
            continue;
        }

        if (events[i].events & EPOLLIN) {
#ifdef _DEBUG_
            printf("read from client.\n");
#endif
            do_read(epollfd, fd, buffer);
        } else if (events[i].events & EPOLLOUT) {
#ifdef _DEBUG_
            printf("write to client.\n");
#endif
            do_write(epollfd, fd, buffer);
        } else {
            /* Only EPOLLERR/EPOLLHUP is left; these are reported even when
             * not requested, so the client must be dropped or the loop
             * would spin on it. */
            drop_client(epollfd, fd, EPOLLIN);
        }
    }
}

/* Main event loop: create the epoll instance and serve events forever. */
static void do_epoll(const int listenfd)
{
    struct epoll_event events[EPOLL_EVENTS];
    char buffer[BUFFER_SIZE];
    memset(buffer, 0, sizeof(buffer));

    int epollfd = epoll_create(FD_SIZE);
    assert(epollfd != -1);
    add_event(epollfd, listenfd, EPOLLIN);

    for (;;) {
#ifdef _DEBUG_
        printf("before epoll\n");
#endif
        int ret = epoll_wait(epollfd, events, EPOLL_EVENTS, -1);
#ifdef _DEBUG_
        printf("after epoll\n");
#endif
        if (ret < 0)
            continue;  /* nothing to dispatch, e.g. interrupted by a signal */
        events_handler(epollfd, events, ret, listenfd, buffer);
    }
}

/*
 * Create a TCP socket bound to ip:port and start listening.
 * Returns the listening descriptor; setup failures abort via assert().
 */
static int create_listen_proc(const char *ip, const int port)
{
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(listenfd != -1);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(port);
    servaddr.sin_addr.s_addr = inet_addr(ip);

    int on = 1;
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

    int ret = bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    assert(ret != -1);

    ret = listen(listenfd, LISTEN_SIZE);
    assert(ret != -1);

    return listenfd;
}
客户端:
#include "utili.h"

/*
 * Interactive client for the epoll echo server.
 *
 * stdin stays registered for EPOLLIN for the whole session. The socket is
 * registered for EPOLLOUT after a line has been typed, switched to EPOLLIN
 * once the line has been sent, and unregistered again after the echo has
 * been printed.
 *
 * NOTE(review): the client is strictly request/response. A line typed
 * while an echo is still outstanding is not sent, because the socket is
 * already registered and the EPOLL_CTL_ADD for EPOLLOUT fails.
 *
 * The read/write/dispatch helpers return 0 to keep going and -1 when the
 * session is over (end of input, server closed, or I/O error).
 */

static int create_connection_proc(const char *ip, const int port);
static void connection_handler(const int sockfd);
static int do_read(const int epollfd, const int fd, const int sockfd, char *buffer);
static int do_write(const int epollfd, const int fd, char *buffer);
static void add_event(const int epollfd, const int fd, const int state);
static void delete_event(const int epollfd, const int fd, const int state);
static void modify_event(const int epollfd, const int fd, const int state);

int main()
{
    int sockfd = create_connection_proc(IP_ADDR, PORT);
    connection_handler(sockfd);
    return 0;
}

/* Register fd with epollfd for the events in state. */
static void add_event(const int epollfd, const int fd, const int state)
{
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}

/* Remove fd from epollfd. Must be called while fd is still open. */
static void delete_event(const int epollfd, const int fd, const int state)
{
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev);
}

/* Replace the events fd is registered for with state. */
static void modify_event(const int epollfd, const int fd, const int state)
{
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev);
}

/*
 * Read from fd (stdin or the socket) into buffer, NUL-terminating it.
 * Input from stdin arms the socket for writing; input from the socket is
 * printed and the socket is unregistered until the next line is typed.
 * Returns 0 to continue, -1 when the session should end.
 */
static int do_read(const int epollfd, const int fd, const int sockfd, char *buffer)
{
    /* Read at most BUFFER_SIZE - 1 bytes so the terminator fits. */
    ssize_t read_sz = read(fd, buffer, BUFFER_SIZE - 1);
    if (read_sz <= 0) {
        /* End of input, server closed the connection, or read error.
         * Unregister while the descriptor is still open; the caller
         * closes the socket. */
        delete_event(epollfd, fd, EPOLLIN);
        return -1;
    }
    buffer[read_sz] = '\0';

    if (fd == STDIN_FILENO) {
        add_event(epollfd, sockfd, EPOLLOUT);
    } else {
        fputs(buffer, stdout);
        delete_event(epollfd, fd, EPOLLIN);  /* fd = sockfd */
    }
    return 0;
}

/*
 * Send the buffered line over the socket, then wait for the echo.
 * Returns 0 to continue, -1 when the write failed.
 */
static int do_write(const int epollfd, const int fd, char *buffer)
{
    ssize_t ret = write(fd, buffer, strlen(buffer));
    if (ret == -1) {
        perror("write");
        return -1;
    }
    modify_event(epollfd, fd, EPOLLIN);  /* fd = sockfd */
    return 0;
}

/*
 * Dispatch the first num entries of events.
 * Returns 0 to continue, -1 when the session should end.
 */
static int events_handler(const int epollfd, struct epoll_event *events, const int num, const int sockfd, char *buffer)
{
    for (int i = 0; i < num; ++i) {
        const int fd = events[i].data.fd;
        int rc;

        if (events[i].events & EPOLLIN) {
#ifdef _DEBUG_
            printf("read\n");
#endif
            rc = do_read(epollfd, fd, sockfd, buffer);
        } else if (events[i].events & EPOLLOUT) {
#ifdef _DEBUG_
            printf("write\n");
#endif
            rc = do_write(epollfd, fd, buffer);
        } else {
            /* Only EPOLLERR/EPOLLHUP is left: the peer is gone. */
            rc = -1;
        }

        if (rc != 0)
            return -1;
    }
    return 0;
}

/*
 * Run the session on sockfd until end of input, server close or error,
 * then release the epoll instance and the socket.
 */
static void connection_handler(const int sockfd)
{
    struct epoll_event events[EPOLL_EVENTS];
    char buffer[BUFFER_SIZE];
    memset(buffer, 0, sizeof(buffer));

    int epollfd = epoll_create(FD_SIZE);
    assert(epollfd != -1);
    add_event(epollfd, STDIN_FILENO, EPOLLIN);

    for (;;) {
#ifdef _DEBUG_
        printf("before epoll\n");
#endif
        int ret = epoll_wait(epollfd, events, EPOLL_EVENTS, -1);
#ifdef _DEBUG_
        printf("after epoll\n");
#endif
        if (ret < 0)
            continue;  /* nothing to dispatch, e.g. interrupted by a signal */
        if (events_handler(epollfd, events, ret, sockfd, buffer) != 0)
            break;
    }

    close(epollfd);
    close(sockfd);
}

/*
 * Create a TCP socket and connect it to ip:port.
 * Returns the connected descriptor; failures abort via assert().
 */
static int create_connection_proc(const char *ip, const int port)
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(sockfd != -1);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(port);
    servaddr.sin_addr.s_addr = inet_addr(ip);

    int ret = connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    assert(ret != -1);

    return sockfd;
}
*以上代码均经过测试。
ET模式:
因为ET模式只有从unavailable到available才会触发,所以
1、读事件:需要使用while循环读取完,一般是读到EAGAIN,也可以读到返回值小于本次请求读取的字节数为止(说明内核缓冲区已被读空);
如果应用层读缓冲区满:那就需要应用层自行标记,解决OS不再通知可读的问题
2、写事件:需要使用while循环写到EAGAIN,也可以写到返回值小于缓冲区大小
如果应用层写缓冲区空(无内容可写):那就需要应用层自行标记,解决OS不再通知可写的问题。
LT模式:
因为LT模式只要available就会触发,所以:
1、读事件:因为一般应用层的逻辑是“来了就能读”,所以一般没有问题,无需while循环读取到EAGAIN;
如果应用层读缓冲区满:就会经常触发,解决方式如下;
2、写事件:如果没有内容要写,就会经常触发,解决方式如下。
LT经常触发读写事件的解决办法:修改fd的注册事件,或者把fd移出epollfd。
总结:
目前好像还是LT方式应用较多,包括redis、libuv等。(nginx使用ET)
LT模式的优点在于:事件循环处理比较简单,无需关注应用层是否有缓冲或缓冲区是否满,只管上报事件。缺点是:可能经常上报,可能影响性能。
ET本身并不会造成饥饿。由于事件只通知一次,开发者一不小心就容易遗漏待处理的数据,看起来像是饥饿,实质是bug。
使用ET模式,特定场景下会比LT更快,因为它可以便捷地处理EPOLLOUT事件,省去打开与关闭EPOLLOUT的epoll_ctl(EPOLL_CTL_MOD)调用,从而有可能让你的性能得到一定的提升。
例如你需要写出1M的数据,写出到socket 256k时,返回了EAGAIN,ET模式下,当再次epoll返回EPOLLOUT事件时,继续写出待写出的数据,当没有数据需要写出时,不处理直接略过即可。而LT模式则需要先打开EPOLLOUT,当没有数据需要写出时,再关闭EPOLLOUT(否则会一直返回EPOLLOUT事件)
总体来说,ET处理EPOLLOUT方便高效些,LT不容易遗漏事件、不易产生bug
如果server的响应通常较小,不会触发EPOLLOUT,那么适合使用LT,例如redis等。而nginx作为高性能的通用服务器,网络流量可以跑满达到1G,这种情况下很容易触发EPOLLOUT,则使用ET。
以上说明了为什么在某些场景下ET模式比LT模式效率更高。
更新:
(1)ET模式由于需要循环读取直到EAGAIN或EWOULDBLOCK,所以文件描述符connfd必须是非阻塞的。否则可能读到缓冲区为空阻塞住了,就无法处理后续其他事件。比如,其他用户连接。因为你还一直阻塞在当前connfd的读取,相当于本线程全部阻塞在了这里。epoll不起作用了。
(2)由于LT会多次触发,所以不需while循环。而ET只触发一次,所以需要。
相关文章推荐
- IO 多路复用之 select、poll、epoll 详解
- Linux下多路复用IO接口 epoll select poll 的区别
- [转载] Linux下多路复用IO接口 epoll select poll 的区别
- IO多路复用之select、poll、epoll详解
- 服务器基础:IO多路复用之select、poll、epoll详解
- 三种多路复用IO实现方式:select,poll,epoll的区别
- Linux下多路复用IO接口epoll/select/poll的区别
- Linux网络通信编程(套接字模型TCP\UDP与IO多路复用模型select\poll\epoll)
- 基于表格形式的select,poll,epoll对比-IO多路复用函数的应用场景
- 聊聊IO多路复用之select、poll、epoll详解
- IO多路复用之select、poll、epoll详解
- Linux下多路复用IO接口 epoll select poll 的区别
- IO复用模式--select、poll、epoll详解
- 三种多路复用IO实现方式:select,poll,epoll的区别
- io多路复用之select,poll,epoll总结
- IO多路复用之select、poll以及epoll
- IO多路复用--select、 poll、 epoll的区别
- python网络编程——IO多路复用总结(select/poll/epoll)
- [nginx] Linux下多路复用IO接口 epoll select poll 的区别
- IO多路复用select,poll,epoll的区别