您的位置:首页 > 理论基础 > 计算机网络

IO多路复用 --select、poll、epoll模式

2016-09-27 11:27 756 查看

一:select模式

头文件:
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <assert.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/select.h>

/* Tunables shared by the select() server and client. */
#define MAX_LINE   1024          /* size of the per-message I/O buffers */
#define MAX_SIZE   10            /* number of slots in the server's client-fd table */
#define IP_ADDR    "127.0.0.1"   /* address the server binds to and the client connects to */
#define PORT       8787          /* TCP port */
#define MAX_LISTEN 5             /* backlog passed to listen() */

/*
 * ERR_EXIT is used by the select() server but was not defined in this header.
 * It reports the failing call through perror() and terminates the process.
 * The do/while(0) wrapper keeps it safe inside un-braced if/else chains.
 */
#ifndef ERR_EXIT
#define ERR_EXIT(msg)            \
    do {                         \
        perror(msg);             \
        exit(EXIT_FAILURE);      \
    } while (0)
#endif

#define _DEBUG_                  /* enables the printf tracing guarded by #ifdef _DEBUG_ */

服务器端:
#include "utili.h"

/*
 * Bookkeeping for the select() server: the table of client descriptors plus
 * the fd_set that is handed to select().
 */
typedef struct vector_fd{
/* Highest descriptor placed in the read set; select() is called with maxfd+1. */
int    maxfd;
/* Bound used by the loops that scan cli_fds; raised when a client is accepted,
 * never lowered in this file. */
int    cli_count;
/* Client descriptors; -1 marks a free slot. */
int    cli_fds[MAX_SIZE];
/* Read set, rebuilt before every select() call. */
fd_set allfds;
} vector_fd;

/* Single global instance: allocated by init_vector_fd(), released by destroy_vector_fd(). */
static vector_fd* vector_fd_ptr = NULL;

static void init_vector_fd()
{
vector_fd_ptr = (vector_fd *)malloc(sizeof(vector_fd));
assert(vector_fd_ptr != NULL);
memset(vector_fd_ptr, 0, sizeof(vector_fd));
for(int i=0; i<MAX_SIZE; ++i)
vector_fd_ptr->cli_fds[i] = -1;
}

/* Release the global client table (if any) and reset the pointer to NULL. */
static void destroy_vector_fd()
{
    if(vector_fd_ptr == NULL)
        return;

    free(vector_fd_ptr);
    vector_fd_ptr = NULL;
}

static int indexof(const int connfd)
{
for(int i=0; i<vector_fd_ptr->cli_count; ++i){
if(vector_fd_ptr->cli_fds[i] == connfd)
return i;
}
}

static void remove_vector_fd(const int connfd)
{
#ifdef _DEBUG_
printf("client : %d got died.\n", indexof(connfd));
#endif
close(connfd);
vector_fd_ptr->cli_fds[indexof(connfd)] = -1;
}

/*
 * Create a TCP socket, enable SO_REUSEADDR, bind it to ip:port and start
 * listening with a backlog of MAX_LISTEN.
 *
 * ip   - dotted-quad IPv4 address, must not be NULL
 * port - TCP port in host byte order
 *
 * Returns the listening descriptor. Every step is checked with assert().
 */
static int create_server_proc(const char *ip, const int port)
{
    assert(ip != NULL);

    int listenfd;
    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(listenfd != -1);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(port);
    servaddr.sin_addr.s_addr = inet_addr(ip);

    int ret;
    int on = 1;
    // Check the option call as well, as the poll() server does.
    ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
    assert(ret != -1);

    ret = bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr));
    assert(ret != -1);

    ret = listen(listenfd, MAX_LISTEN);
    assert(ret != -1);

    return listenfd;
}

/*
 * Accept one pending connection on listenfd and store it in the first free
 * slot of the client table.
 *
 * When the table is full the new connection is closed immediately instead of
 * being leaked. cli_count is kept as "highest used slot + 1" so that loops
 * bounded by it cover every client but never run past the MAX_SIZE slots,
 * even after slots have been reused many times.
 */
static void accept_event_handler(const int listenfd)
{
    int connfd;
    if( (connfd = accept(listenfd, NULL, NULL)) < 0)
        ERR_EXIT("accept error.");

    int i;
    for(i=0; i<MAX_SIZE && vector_fd_ptr->cli_fds[i]!=-1; ++i)
        ;

    if(i == MAX_SIZE){
        printf("too many client.\n");
        close(connfd);              // no slot available: do not leak the descriptor
        return;
    }

#ifdef _DEBUG_
    printf("client : %d connected.\n", i);
#endif
    vector_fd_ptr->cli_fds[i] = connfd;
    if(i + 1 > vector_fd_ptr->cli_count)
        vector_fd_ptr->cli_count = i + 1;
}

/*
 * Serve one readable client: receive a chunk, print it, and echo it back.
 *
 * A return of 0 from recv() (peer closed) or -1 (error, e.g. connection
 * reset) removes the client instead of aborting the whole server.
 * The received bytes are NUL-terminated before being printed, and exactly the
 * bytes that were received are echoed back.
 */
static void communication_detail(const int connfd)
{
    char buffer[MAX_LINE];

    // Keep one byte in reserve for the terminator.
    const ssize_t ret = recv(connfd, buffer, sizeof(buffer) - 1, 0);

    if(ret <= 0){
        if(ret < 0)
            perror("recv");
#ifdef _DEBUG_
        else
            printf("client : %d close.\n", indexof(connfd));
#endif
        remove_vector_fd(connfd);
        return;
    }
    buffer[ret] = '\0';

    fprintf(stdout, "%s", buffer);
    if(send(connfd, buffer, (size_t)ret, 0) == -1){
        perror("send");
        remove_vector_fd(connfd);
    }
}

static void connected_event_handler(fd_set *readfds)
{
int clifd;
for(int i=0; i<vector_fd_ptr->cli_count; ++i){
clifd = vector_fd_ptr->cli_fds[i];
if(clifd < 0)
continue;

if(FD_ISSET(clifd, readfds)){
communication_detail(clifd);
break;
}
}
}

static void manage_client_proc(const int listenfd)
{
fd_set* readfds = &vector_fd_ptr->allfds;
vector_fd_ptr->maxfd = listenfd;

for(; ;){
FD_ZERO(readfds);
FD_SET(listenfd, readfds);
for(int i=0; i<vector_fd_ptr->cli_count; ++i){
int clifd = vector_fd_ptr->cli_fds[i];
FD_SET(clifd, readfds);
vector_fd_ptr->maxfd = clifd > vector_fd_ptr->maxfd ?
clifd : vector_fd_ptr->maxfd;
}

int ret;
struct timeval tv = {5, 0};
if( (ret = select(vector_fd_ptr->maxfd+1, readfds, NULL, NULL, &tv)) < 0)
ERR_EXIT("select error.");
else if(ret == 0){          //when tv is set to {0, 0}, always timeout
printf("time out and reselect.\n");
continue;
}
else{
if(FD_ISSET(listenfd, readfds)){
#ifdef _DEBUG_
printf("the client connecting.\n");
#endif
accept_event_handler(listenfd);
}
else{
connected_event_handler(readfds);
}
}
}
}

/* select() server entry point: create the listener, set up the client table, run the loop. */
int main(void)
{
    const int listenfd = create_server_proc(IP_ADDR, PORT);

    init_vector_fd();
    manage_client_proc(listenfd);
    destroy_vector_fd();

    return 0;
}

客户端:
#include "utili.h"

/* Create an IPv4 TCP socket and return its descriptor; creation is checked with assert(). */
static int create_socket_proc()
{
    const int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(sockfd != -1);
    return sockfd;
}

/*
 * Connect sockfd to the IPv4 server at ip:port.
 * Arguments and the connect() result are checked with assert(); returns 0.
 */
static int connect_server_proc(int sockfd, const char* ip, const int port)
{
    assert(ip != NULL && port > 0);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_addr.s_addr = inet_addr(ip);
    servaddr.sin_port = htons(port);
    servaddr.sin_family = AF_INET;

    const int ret = connect(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr));
    assert(ret != -1);

    return 0;
}

/*
 * Interactive loop of the select() client: read a line from stdin, send it
 * (including the terminating NUL), wait for the reply with select(), print
 * it, and repeat.
 *
 * Returns 0 when stdin reaches end-of-file or the server closes the
 * connection. Socket errors are checked with assert().
 */
static int connected_event_handler(int sockfd)
{
    fd_set new_set;
    char buffer[MAX_LINE];

    if(fgets(buffer, sizeof(buffer), stdin) == NULL)
        return 0;                                   // nothing to send
    int ret = send(sockfd, buffer, strlen(buffer)+1, 0);
    assert(ret != -1);

    for(; ;){
        FD_ZERO(&new_set);
        FD_SET(sockfd, &new_set);

        ret = select(sockfd+1, &new_set, NULL, NULL, NULL);
        assert(ret != -1);

        if(!FD_ISSET(sockfd, &new_set))
            continue;

        // Keep one byte in reserve so the reply can be terminated for fputs().
        ret = recv(sockfd, buffer, sizeof(buffer)-1, 0);
        assert(ret != -1);
        if(ret == 0)
            return 0;                               // server closed the connection
        buffer[ret] = '\0';

        fputs(buffer, stdout);
        if(fgets(buffer, sizeof(buffer), stdin) == NULL)
            return 0;                               // end of input

        ret = send(sockfd, buffer, strlen(buffer)+1, 0);
        assert(ret != -1);
    }
}

/* select() client entry point: create a socket, connect to the server, run the chat loop. */
int main(void)
{
    const int sockfd = create_socket_proc();

    connect_server_proc(sockfd, IP_ADDR, PORT);
    connected_event_handler(sockfd);

    return 0;
}

二:poll模式

头文件:
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <assert.h>
#include <poll.h>

/* Tunables shared by the poll() server and client. */
/* Address the server binds to and the client connects to. */
#define IP_ADDR      "127.0.0.1"
/* TCP port. */
#define PORT         8787
/* Size of the server's pollfd array (slot 0 holds the listening socket). */
#define CLIENT_SIZE  10
/* Backlog passed to listen(). */
#define LISTEN_SIZE  5
/* Size of the per-message I/O buffers. */
#define BUFFER_SIZE  256

/* Enables the printf tracing guarded by #ifdef _DEBUG_. */
#define _DEBUG_

服务器端:
#include "utili.h"

/*
 * Create a TCP socket, enable SO_REUSEADDR, bind it to ip:port and start
 * listening with a backlog of LISTEN_SIZE.
 *
 * Returns the listening descriptor. Every step is checked with assert().
 */
static int create_socket_proc(const char *ip, const int port)
{
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(listenfd != -1);     // socket() reports failure with -1, not 0

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(port);
    servaddr.sin_addr.s_addr = inet_addr(ip);

    int on = 1;
    int ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
    assert(ret != -1);

    ret = bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr));
    assert(ret != -1);

    ret = listen(listenfd, LISTEN_SIZE);
    assert(ret != -1);

    return listenfd;
}

/*
 * Accept one pending connection and register it in the first free slot of
 * the pollfd array (slot 0 is reserved for the listening socket).
 *
 * clifds   - pointer to the server's pollfd array
 * listenfd - listening socket
 * maxi     - in/out: highest slot index in use; raised when needed
 *
 * When every slot is taken the connection is closed and *maxi is left
 * unchanged, so it can never reach CLIENT_SIZE and index past the array.
 */
static void accept_client_proc(struct pollfd (*clifds)[CLIENT_SIZE], const int listenfd, int* maxi)
{
    int connfd = accept(listenfd, NULL, NULL);
    assert(connfd != -1);

    int i;
    for(i=1; i<CLIENT_SIZE && (*clifds)[i].fd!=-1; ++i)
        ;

    if(i == CLIENT_SIZE){
        printf("too many clients.\n");
        close(connfd);              // no slot available: do not leak the descriptor
        return;
    }

    (*clifds)[i].fd = connfd;
    (*clifds)[i].events = POLLIN;
    (*clifds)[i].revents = 0;       // slot has not been polled yet; do not leave stale flags
#ifdef _DEBUG_
    printf("%d has connected.\n", i);
#endif

    if(i > *maxi)
        *maxi = i;
}

/* Close the descriptor stored in slot `index` and mark the slot free (-1). */
static void remove_client_fd(struct pollfd *clifds, const int index)
{
    struct pollfd *slot = clifds + index;

    close(slot->fd);
    slot->fd = -1;
}

/*
 * Serve every client slot (1..maxi) that poll() flagged readable: receive a
 * chunk, print it and echo it back.
 *
 * A recv() result of 0 (peer closed) or -1 (error) removes that client
 * instead of aborting the server. Received bytes are NUL-terminated before
 * printing, and exactly the received bytes are echoed.
 */
static void connected_client_proc(struct pollfd *clifds, const int maxi)
{
    char buffer[BUFFER_SIZE];
    for(int i=1; i<=maxi; ++i){        //maxi is the max index, not the max quantity
        if(clifds[i].fd < 0)
            continue;
        if(!(clifds[i].revents & POLLIN))
            continue;

        const int sockfd = clifds[i].fd;
#ifdef _DEBUG_
        printf("it is from %dth message.\n", i);
#endif
        // Keep one byte in reserve for the terminator.
        const ssize_t recv_sz = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
        if(recv_sz <= 0){
            if(recv_sz < 0)
                perror("recv");
            remove_client_fd(clifds, i);
            continue;
        }
        buffer[recv_sz] = '\0';

        fputs(buffer, stdout);
        if(send(sockfd, buffer, (size_t)recv_sz, 0) == -1){
            perror("send");
            remove_client_fd(clifds, i);
        }
    }
}

/*
 * Main loop of the poll() server. Never returns.
 *
 * Slot 0 of the pollfd array watches the listening socket; the remaining
 * slots hold clients (-1 = free). The whole array is initialised and the
 * whole array is passed to poll(): entries with a negative fd are ignored by
 * poll() and get revents cleared, so no slot is ever read with stale flags.
 */
static void do_poll(int listenfd)
{
    struct pollfd clifds[CLIENT_SIZE];
    clifds[0].fd = listenfd;
    clifds[0].events = POLLIN;
    clifds[0].revents = 0;
    for(int i=1; i<CLIENT_SIZE; ++i){
        clifds[i].fd = -1;
        clifds[i].events = 0;
        clifds[i].revents = 0;
    }

    int maxi = 0;
    for(; ;){
#ifdef _DEBUG_
        printf("before poll function.\n");
#endif
        int ret = poll(clifds, CLIENT_SIZE, -1);
        assert(ret != -1);
#ifdef _DEBUG_
        printf("after poll function.\n");
#endif
        if(clifds[0].revents & POLLIN){
            accept_client_proc(&clifds, listenfd, &maxi);
            // Defensive clamp: maxi is an index and must stay inside the array.
            if(maxi >= CLIENT_SIZE)
                maxi = CLIENT_SIZE - 1;
        }

        connected_client_proc(clifds, maxi);
    }
}

/* poll() server entry point: create the listening socket and run the poll loop. */
int main(void)
{
    const int listenfd = create_socket_proc(IP_ADDR, PORT);

    do_poll(listenfd);

    return 0;
}

客户端:
#include "utili.h"

/* Create an IPv4 TCP socket and return its descriptor; creation is checked with assert(). */
static int create_socket_proc()
{
    const int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(sockfd != -1);
    return sockfd;
}

/*
 * Connect sockfd to the IPv4 server at ip:port.
 * Arguments and the connect() result are checked with assert(); returns 0.
 */
static int connect_server_proc(int sockfd, const char* ip, const int port)
{
    assert(ip != NULL && port > 0);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_addr.s_addr = inet_addr(ip);
    servaddr.sin_port = htons(port);
    servaddr.sin_family = AF_INET;

    const int ret = connect(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr));
    assert(ret != -1);

    return 0;
}

/*
 * Interactive loop of the poll() client: waits on the socket and on stdin.
 * Data from the server is printed; a line typed by the user is sent to the
 * server together with its terminating NUL.
 *
 * Returns after closing sockfd when the server closes the connection or
 * stdin reaches end-of-file. Other failures are checked with assert().
 */
static void connected_event_handler(const int sockfd)
{
    struct pollfd fds[2];
    fds[0].fd = sockfd;
    fds[0].events = POLLIN;
    fds[0].revents = 0;
    fds[1].fd = STDIN_FILENO;
    fds[1].events = POLLIN;
    fds[1].revents = 0;

    char buffer[BUFFER_SIZE];
    for(; ;){
        int ret = poll(fds, 2, -1);   //2 for sockfd or STDIN_FILENO
        assert(ret != -1);

        if(fds[0].revents & POLLIN){
            // One byte is kept in reserve so the data can be terminated for fputs().
            int recv_sz = recv(sockfd, buffer, sizeof(buffer)-1, 0);
            assert(recv_sz != -1);

            if(recv_sz == 0){
                close(sockfd);
                return;
            }
            buffer[recv_sz] = '\0';
            fputs(buffer, stdout);
        }
        else if(fds[1].revents & (POLLIN | POLLHUP)){
            // POLLHUP alone is reported for a closed pipe; read() then returns 0.
            int read_sz = read(STDIN_FILENO, buffer, sizeof(buffer)-1);
            assert(read_sz != -1);

            if(read_sz == 0){         // end of input: stop instead of spinning
                close(sockfd);
                return;
            }
            buffer[read_sz] = '\0';

            ret = send(sockfd, buffer, strlen(buffer)+1, 0);
            assert(ret != -1);
        }
    }
}

/* poll() client entry point: create a socket, connect to the server, run the chat loop. */
int main(void)
{
    const int sockfd = create_socket_proc();

    connect_server_proc(sockfd, IP_ADDR, PORT);
    connected_event_handler(sockfd);

    return 0;
}

三:epoll模式

头文件:
#include<unistd.h>
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<assert.h>
#include<sys/epoll.h>

/* Tunables shared by the epoll server and client.
 * Note: as shown, this header does not define _DEBUG_, so the tracing
 * guarded by #ifdef _DEBUG_ is compiled out. */
/* Address the server binds to and the client connects to. */
#define IP_ADDR      "127.0.0.1"
/* TCP port. */
#define PORT         8787
/* Backlog passed to listen(). */
#define LISTEN_SIZE  5
/* Size of the shared I/O buffer. */
#define BUFFER_SIZE  1024
/* Capacity of the event array handed to epoll_wait(). */
#define EPOLL_EVENTS 100
/* Size hint passed to epoll_create(). */
#define FD_SIZE      1000

服务器端:
#include "utili.h"

/* Forward declarations for the epoll server. */

/* Creates, binds and listens on a TCP socket; returns the listening descriptor. */
static int  create_listen_proc(const char* ip, const int port);
/* Runs the epoll event loop on listenfd. */
static void do_epoll(const int listenfd);
/* Dispatches the `num` ready events to do_accept / do_read / do_write. */
static void events_handler(const int epollfd, struct epoll_event *events,
const int num, const int listenfd, char* buffer);
/* Accepts one connection and registers it for EPOLLIN. */
static void do_accept(const int epollfd, const int listenfd);
/* Reads from fd into buffer; on data, switches fd to EPOLLOUT. */
static void do_read(const int epollfd, const int fd, char* buffer);
/* Writes buffer to fd, then switches fd back to EPOLLIN. */
static void do_write(const int epollfd, const int fd, char *buffer);
/* Thin wrappers around epoll_ctl() with EPOLL_CTL_ADD / _DEL / _MOD. */
static void add_event(const int epollfd, const int fd, const int state);
static void delete_event(const int epollfd, const int fd, const int state);
static void modify_event(const int epollfd, const int fd, const int state);

/* epoll server entry point: create the listening socket and run the event loop. */
int main(void)
{
    const int listenfd = create_listen_proc(IP_ADDR, PORT);

    do_epoll(listenfd);

    return 0;
}

/* Register fd with the epoll instance for the event mask `state` (EPOLL_CTL_ADD).
 * The result of epoll_ctl() is not checked. */
static void add_event(const int epollfd, const int fd, const int state)
{
    struct epoll_event ev = { .events = state, .data = { .fd = fd } };

    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}
/* Remove fd from the epoll instance (EPOLL_CTL_DEL); `state` is only copied
 * into the event structure. The result of epoll_ctl() is not checked. */
static void delete_event(const int epollfd, const int fd, const int state)
{
    struct epoll_event ev = { .events = state, .data = { .fd = fd } };

    epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev);
}
/* Replace the event mask registered for fd with `state` (EPOLL_CTL_MOD).
 * The result of epoll_ctl() is not checked. */
static void modify_event(const int epollfd, const int fd, const int state)
{
    struct epoll_event ev = { .events = state, .data = { .fd = fd } };

    epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev);
}

/*
 * Accept one pending connection on listenfd and register it for EPOLLIN.
 *
 * A failed accept() (for example a connection that was aborted before it
 * could be accepted) is reported and skipped instead of aborting the server.
 */
static void do_accept(const int epollfd, const int listenfd)
{
    int connfd = accept(listenfd, NULL, NULL);
    if(connfd == -1){
        perror("accept");
        return;
    }
    add_event(epollfd, connfd, EPOLLIN);    //add connfd to epollfd
}

/*
 * Read a chunk from client fd into buffer, print it, and switch fd to
 * EPOLLOUT so the chunk is echoed by do_write().
 *
 * buffer is expected to hold BUFFER_SIZE bytes; at most BUFFER_SIZE-1 bytes
 * are read so the terminator always fits. On end-of-file or a read error the
 * client is removed from the epoll set and then closed (removal must come
 * first: epoll_ctl() cannot operate on a descriptor that is already closed).
 *
 * NOTE(review): the same buffer is shared by all clients, so with several
 * clients a pending echo may be overwritten by another client's data.
 */
static void do_read(const int epollfd, const int fd, char* buffer)
{
    const ssize_t read_sz = read(fd, buffer, BUFFER_SIZE - 1);

    if(read_sz <= 0){
        if(read_sz < 0)
            perror("read");
        delete_event(epollfd, fd, EPOLLIN);
        close(fd);
        return;
    }
    buffer[read_sz] = '\0';

    fputs(buffer, stdout);
    modify_event(epollfd, fd, EPOLLOUT);
}

/*
 * Send the NUL-terminated contents of buffer to client fd, then switch fd
 * back to EPOLLIN.
 *
 * Short writes are continued until the whole string is sent. If write()
 * fails (for example because the peer has gone away) the client is removed
 * from the epoll set and closed instead of aborting the server.
 */
static void do_write(const int epollfd, const int fd, char *buffer)
{
    const size_t total = strlen(buffer);
    size_t done = 0;

    while(done < total){
        const ssize_t ret = write(fd, buffer + done, total - done);
        if(ret == -1){
            perror("write");
            delete_event(epollfd, fd, EPOLLOUT);
            close(fd);
            return;
        }
        done += (size_t)ret;
    }

    modify_event(epollfd, fd, EPOLLIN);
}

/*
 * Dispatch the first `num` entries of `events`:
 *   - EPOLLIN on the listening socket -> do_accept()
 *   - EPOLLIN on any other descriptor -> do_read()
 *   - otherwise EPOLLOUT              -> do_write()
 */
static void events_handler(const int epollfd, struct epoll_event *events,
                           const int num, const int listenfd, char* buffer)
{
    for(int i=0; i<num; ++i){
        const int fd = events[i].data.fd;
        const unsigned int flags = events[i].events;

        if(flags & EPOLLIN){
            if(fd == listenfd){
#ifdef _DEBUG_
                printf("accept client.\n");
#endif
                do_accept(epollfd, listenfd);
            }
            else{
#ifdef _DEBUG_
                printf("read from client.\n");
#endif
                do_read(epollfd, fd, buffer);
            }
            continue;
        }

        if(flags & EPOLLOUT){
#ifdef _DEBUG_
            printf("write to client.\n");
#endif
            do_write(epollfd, fd, buffer);
        }
    }
}

/*
 * Event loop of the epoll server. Never returns.
 *
 * Creates the epoll instance, registers the listening socket for EPOLLIN,
 * then waits for events and hands them to events_handler(). A single buffer
 * of BUFFER_SIZE bytes is shared by all reads and writes.
 */
static void do_epoll(const int listenfd)
{
    int epollfd;
    struct epoll_event events[EPOLL_EVENTS];
    char buffer[BUFFER_SIZE];

    memset(buffer, 0, sizeof(buffer));
    epollfd = epoll_create(FD_SIZE);
    // Without a valid epoll instance every epoll_wait() below would fail and the loop would spin.
    assert(epollfd != -1);
    add_event(epollfd, listenfd, EPOLLIN);
    for(; ;){
#ifdef _DEBUG_
        printf("before epoll\n");
#endif
        int ret = epoll_wait(epollfd, events, EPOLL_EVENTS, -1);
#ifdef _DEBUG_
        printf("after epoll\n");
#endif
        // ret is -1 on failure; events_handler() then has nothing to iterate over.
        events_handler(epollfd, events, ret, listenfd, buffer);
    }
}

/*
 * Create a TCP socket, enable SO_REUSEADDR, bind it to ip:port and start
 * listening with a backlog of LISTEN_SIZE.
 *
 * Returns the listening descriptor. Every step is checked with assert().
 */
static int create_listen_proc(const char* ip, const int port)
{
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(listenfd != -1);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(port);
    servaddr.sin_addr.s_addr = inet_addr(ip);

    int ret;
    int on = 1;
    // Check the option call as well, as the poll() server does.
    ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
    assert(ret != -1);

    ret = bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr));
    assert(ret != -1);

    ret = listen(listenfd, LISTEN_SIZE);
    assert(ret != -1);

    return listenfd;
}

客户端:
#include "utili.h"

/* Forward declarations for the epoll client. */

/* Creates a TCP socket and connects it to ip:port; returns the descriptor. */
static int create_connection_proc(const char* ip, const int port);
/* Runs the client's epoll event loop for the connected socket. */
static void connection_handler(const int sockfd);
/* Reads from fd (stdin or the socket) into buffer and updates the epoll registrations. */
static void do_read(const int epollfd, const int fd, const int sockfd, char* buffer);
/* Writes buffer to fd and updates the epoll registrations. */
static void do_write(const int epollfd, const int fd, char* buffer);
/* Thin wrappers around epoll_ctl() with EPOLL_CTL_ADD / _DEL / _MOD. */
static void add_event(const int epollfd, const int fd, const int state);
static void delete_event(const int epollfd, const int fd, const int state);
static void modify_event(const int epollfd, const int fd, const int state);

/* epoll client entry point: connect to the server and run the event loop. */
int main(void)
{
    const int sockfd = create_connection_proc(IP_ADDR, PORT);

    connection_handler(sockfd);

    return 0;
}

/* Register fd with the epoll instance for the event mask `state` (EPOLL_CTL_ADD).
 * The result of epoll_ctl() is not checked. */
static void add_event(const int epollfd, const int fd, const int state)
{
    struct epoll_event ev = { .events = state, .data = { .fd = fd } };

    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}

/* Remove fd from the epoll instance (EPOLL_CTL_DEL); `state` is only copied
 * into the event structure. The result of epoll_ctl() is not checked. */
static void delete_event(const int epollfd, const int fd, const int state)
{
    struct epoll_event ev = { .events = state, .data = { .fd = fd } };

    epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev);
}

/* Replace the event mask registered for fd with `state` (EPOLL_CTL_MOD).
 * The result of epoll_ctl() is not checked. */
static void modify_event(const int epollfd, const int fd, const int state)
{
    struct epoll_event ev = { .events = state, .data = { .fd = fd } };

    epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev);
}

/*
 * Handle a readable descriptor in the epoll client.
 *
 * fd is either STDIN_FILENO (user input to be sent) or sockfd (server reply).
 *   - stdin data : sockfd is registered for EPOLLOUT so do_write() sends buffer
 *   - socket data: printed to stdout and sockfd is removed from the epoll set
 *                  until the next line of user input registers it again
 *   - end of file: fd is removed from the epoll set and closed
 *
 * buffer is expected to hold BUFFER_SIZE bytes; at most BUFFER_SIZE-1 bytes
 * are read so the terminator always fits.
 *
 * NOTE(review): the client keeps running after the server closes the socket;
 * confirm whether it should exit in that case.
 */
static void do_read(const int epollfd, const int fd, const int sockfd, char* buffer)
{
    int read_sz = read(fd, buffer, BUFFER_SIZE - 1);
    assert(read_sz != -1);
    buffer[read_sz] = '\0';

    if(read_sz == 0){
        // Deregister before closing: epoll_ctl() fails on a closed descriptor.
        delete_event(epollfd, fd, EPOLLIN);
        close(fd);
    }
    else if(fd == STDIN_FILENO){
        add_event(epollfd, sockfd, EPOLLOUT);
    }
    else{
        fputs(buffer, stdout);
        delete_event(epollfd, fd, EPOLLIN);    //fd = sockfd
        // The former add_event(STDIN_FILENO, EPOLLOUT) was dropped: stdin is
        // already registered, so that call could only fail, and the reply has
        // already been printed by fputs() above.
    }
}

/*
 * Write the NUL-terminated contents of buffer to fd (checked with assert()).
 * Afterwards a socket is switched back to EPOLLIN, while STDIN_FILENO is
 * removed from the epoll set.
 */
static void do_write(const int epollfd, const int fd, char* buffer)
{
    const int ret = write(fd, buffer, strlen(buffer));
    assert(ret != -1);

    if(fd != STDIN_FILENO){
        modify_event(epollfd, fd, EPOLLIN);       //fd = sockfd
        return;
    }
    delete_event(epollfd, STDIN_FILENO, EPOLLOUT);
}

/*
 * Dispatch the first `num` entries of `events`: a readable descriptor goes to
 * do_read(); otherwise a writable one goes to do_write().
 */
static void events_handler(const int epollfd, struct epoll_event *events,
                           const int num, const int sockfd, char* buffer)
{
    int i = 0;
    while(i < num){
        const struct epoll_event *ev = &events[i];
        ++i;

        if(ev->events & EPOLLIN){
#ifdef _DEBUG_
            printf("read\n");
#endif
            do_read(epollfd, ev->data.fd, sockfd, buffer);
            continue;
        }

        if(ev->events & EPOLLOUT){
#ifdef _DEBUG_
            printf("write\n");
#endif
            do_write(epollfd, ev->data.fd, buffer);
        }
    }
}

/*
 * Event loop of the epoll client. Never returns.
 *
 * Creates the epoll instance and registers stdin for EPOLLIN; sockfd is
 * registered later, by the read/write handlers, when there is something to
 * send. Ready events are handed to events_handler().
 */
static void connection_handler(const int sockfd)
{
    int epollfd;
    struct epoll_event events[EPOLL_EVENTS];
    char buffer[BUFFER_SIZE];

    memset(buffer, 0, sizeof(buffer));
    epollfd = epoll_create(FD_SIZE);
    // Without a valid epoll instance every epoll_wait() below would fail and the loop would spin.
    assert(epollfd != -1);
    add_event(epollfd, STDIN_FILENO, EPOLLIN);
    for(; ;){
#ifdef _DEBUG_
        printf("before epoll\n");
#endif
        int ret = epoll_wait(epollfd, events, EPOLL_EVENTS, -1);
#ifdef _DEBUG_
        printf("after epoll\n");
#endif
        // ret is -1 on failure; events_handler() then has nothing to iterate over.
        events_handler(epollfd, events, ret, sockfd, buffer);
    }
}

/*
 * Create an IPv4 TCP socket and connect it to ip:port.
 * Returns the connected descriptor; both steps are checked with assert().
 */
static int create_connection_proc(const char* ip, const int port)
{
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_addr.s_addr = inet_addr(ip);
    servaddr.sin_port = htons(port);
    servaddr.sin_family = AF_INET;

    const int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(sockfd != -1);

    const int ret = connect(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr));
    assert(ret != -1);

    return sockfd;
}

*以上代码均经过测试。

ET模式:

因为ET模式只有从unavailable到available才会触发,所以

1、读事件:需要使用while循环读取完,一般是读到EAGAIN,也可以读到返回值小于缓冲区大小;

如果应用层读缓冲区满:那就需要应用层自行标记,解决OS不再通知可读的问题

2、写事件:需要使用while循环写到EAGAIN,也可以写到返回值小于缓冲区大小

如果应用层写缓冲区空(无内容可写):那就需要应用层自行标记,解决OS不再通知可写的问题。

LT模式:

因为LT模式只要available就会触发,所以:

1、读事件:因为一般应用层的逻辑是“来了就能读”,所以一般没有问题,无需while循环读取到EAGAIN;

如果应用层读缓冲区满:就会经常触发,解决方式如下;

2、写事件:如果没有内容要写,就会经常触发,解决方式如下。

LT经常触发读写事件的解决办法:修改fd的注册事件,或者把fd移出epollfd。

总结:

目前好像还是LT方式应用较多,包括redis、libuv等。(nginx使用ET)

LT模式的优点在于:事件循环处理比较简单,无需关注应用层是否有缓冲或缓冲区是否满,只管上报事件。缺点是:可能经常上报,可能影响性能。

ET本身并不会造成饥饿。由于事件只通知一次,开发者一不小心就容易遗漏待处理的数据,看起来像是饥饿,实质上是程序的bug。

使用ET模式,在特定场景下会比LT更快,因为它可以便捷地处理EPOLLOUT事件,省去打开与关闭EPOLLOUT的epoll_ctl(EPOLL_CTL_MOD)调用,从而有可能让性能得到一定的提升。

例如你需要写出1M的数据,写出到socket 256k时,返回了EAGAIN,ET模式下,当再次epoll返回EPOLLOUT事件时,继续写出待写出的数据,当没有数据需要写出时,不处理直接略过即可。而LT模式则需要先打开EPOLLOUT,当没有数据需要写出时,再关闭EPOLLOUT(否则会一直返回EPOLLOUT事件)

总体来说,ET处理EPOLLOUT方便高效些,LT不容易遗漏事件、不易产生bug

如果server的响应通常较小,不会触发EPOLLOUT,那么适合使用LT,例如redis等。而nginx作为高性能的通用服务器,网络流量可以跑满达到1G,这种情况下很容易触发EPOLLOUT,则使用ET。

以上说明了为什么在某些场景下ET模式比LT模式效率更高。



更新:

(1)ET模式需要循环读取,直到返回EAGAIN或EWOULDBLOCK,所以文件描述符connfd必须设置为非阻塞。否则,当缓冲区被读空后,读操作会阻塞,线程就无法处理后续的其他事件(比如其他用户的连接):本线程一直阻塞在当前connfd的读取上,epoll也就不起作用了。

(2)由于LT会多次触发,所以不需while循环。而ET只触发一次,所以需要。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息