您的位置:首页 > 运维架构 > Linux

浅谈几种服务器端模型——epoll

2017-11-10 16:54 267 查看
引言:上一篇说到线程池方式来处理服务器端的并发,并给出了一个线程池的方案(半同步,半异步方式)。再来说下同步异步,阻塞非阻塞区别:
同步和异步针对应用程序来,关注的是程序中间的协作关系;阻塞与非阻塞更关注的是单个进程的执行状态。
同步有阻塞和非阻塞之分,异步没有,它一定是非阻塞的。
阻塞、非阻塞、多路IO复用,都是同步IO,异步必定是非阻塞的,所以不存在异步阻塞和异步非阻塞的说法。真正的异步IO需要CPU的深度参与。换句话说,只有用户线程在操作IO的时候根本不去考虑IO的执行全部都交给CPU去完成,而自己只等待一个完成信号的时候,才是真正的异步IO。所以,拉一个子线程去轮询、去死循环,或者使用select、poll、epool,都不是异步。

同步:执行一个操作之后,进程触发IO操作并等待(也就是我们说的阻塞)或者轮询的去查看IO操作(也就是我们说的非阻塞)是否完成,等待结果,然后才继续执行后续的操作。
异步:执行一个操作后,可以去执行其他的操作,然后等待通知再回来执行刚才没执行完的操作。
阻塞:进程给CPU传达一个任务之后,一直等待CPU处理完成,然后才执行后面的操作。
非阻塞:进程给CPU传达任务后,继续处理后续的操作,隔断时间再来询问之前的操作是否完成。这样的过程其实也叫轮询。

言归正传,所谓多路IO转接服务器也叫多任务IO服务器。其主旨思想是:不再由应用程序自己监听客户端连接,取而代之由内核替应用程序监视文件。主要使用的方法有三种:select、poll、epoll,本章重点讲述epoll。
epoll是linux下多路复用IO接口select/poll的增强版本,内核采用红黑树机制,能显著提高了epoll 的性能。目前epoll是linux大规模并发网络c著名的 libevent Nginx等内部都采用这个机制。

基础API

创建一个epoll句柄,参数size用来告诉内核监听的文件描述符的个数,跟内存大小有关。
    #include <sys/epoll.h>
    int epoll_create(int size)     size:监听数目

控制某个epoll监控的文件描述符上的事件:注册、修改、删除。
    #include <sys/epoll.h>
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
        epfd:  为epoll_creat的句柄
        op:    表示动作,用3个宏来表示:
           EPOLL_CTL_ADD (注册新的fd到epfd),
           EPOLL_CTL_MOD (修改已经注册的fd的监听事件),
           EPOLL_CTL_DEL (从epfd删除一个fd);
        event: 告诉内核需要监听的事件

        struct epoll_event {
           __uint32_t events; /* Epoll events */
           epoll_data_t data; /* User data variable */
        };
        typedef union epoll_data {
           void *ptr;
           int fd;
           uint32_t u32;
           uint64_t u64;
        } epoll_data_t;

        EPOLLIN:  表示对应的文件描述符可以读(包括对端SOCKET正常关闭)
        EPOLLOUT:  表示对应的文件描述符可以写
        EPOLLPRI:  表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)
        EPOLLERR:  表示对应的文件描述符发生错误
        EPOLLHUP:  表示对应的文件描述符被挂断;
        EPOLLET:  将EPOLL设为边缘触发(Edge
Triggered)模式,这是相对于水平触发(Level
Triggered)而言的
        EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

等待所监控文件描述符上有事件的产生,类似于select()调用。
    #include <sys/epoll.h>
    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
        events:        用来存内核得到事件的集合,
        maxevents: 告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,
        timeout:   是超时时间
           -1:阻塞
           0: 立即返回,非阻塞
           >0:指定毫秒
        返回值: 成功返回有多少文件描述符就绪,时间到时返回0,出错返回-1

代码如下:
server
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
#include "wrap.h"

#define MAXLINE 80
#define SERV_PORT 6666
#define OPEN_MAX 1024

int main(int argc, char *argv[])
{
int i, j, maxi, listenfd, connfd, sockfd;
int nready, efd, res;
ssize_t n;
char buf[MAXLINE], str[INET_ADDRSTRLEN];
socklen_t clilen;
int client[OPEN_MAX];
struct sockaddr_in cliaddr, servaddr;
struct epoll_event tep, ep[OPEN_MAX];

listenfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);

Bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr));

Listen(listenfd, 20);

for (i = 0; i < OPEN_MAX; i++)
client[i] = -1;
maxi = -1;

efd = epoll_create(OPEN_MAX);
if (efd == -1)
perr_exit("epoll_create");

tep.events = EPOLLIN; tep.data.fd = listenfd;

res = epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &tep);
if (res == -1)
perr_exit("epoll_ctl");

while (1) {
nready = epoll_wait(efd, ep, OPEN_MAX, -1); /* 阻塞监听 */
if (nready == -1)
perr_exit("epoll_wait");

for (i = 0; i < nready; i++) {
if (!(ep[i].events & EPOLLIN))
continue;
if (ep[i].data.fd == listenfd) {
clilen = sizeof(cliaddr);
connfd = Accept(listenfd, (struct sockaddr *)&cliaddr, &clilen);
printf("received from %s at PORT %d\n",
inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),
ntohs(cliaddr.sin_port));
for (j = 0; j < OPEN_MAX; j++) {
if (client[j] < 0) {
client[j] = connfd; /* save descriptor */
break;
}
}

if (j == OPEN_MAX)
perr_exit("too many clients");
if (j > maxi)
maxi = j; 		/* max index in client[] array */

tep.events = EPOLLIN;
tep.data.fd = connfd;
res = epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &tep);
if (res == -1)
perr_exit("epoll_ctl");
} else {
sockfd = ep[i].data.fd;
n = Read(sockfd, buf, MAXLINE);
if (n == 0) {
for (j = 0; j <= maxi; j++) {
if (client[j] == sockfd) {
client[j] = -1;
break;
}
}
res = epoll_ctl(efd, EPOLL_CTL_DEL, sockfd, NULL);
if (res == -1)
perr_exit("epoll_ctl");

Close(sockfd);
printf("client[%d] closed connection\n", j);
} else {
for (j = 0; j < n; j++)
buf[j] = toupper(buf[j]);
Writen(sockfd, buf, n);
}
}
}
}
close(listenfd);
close(efd);
return 0;
}
/* client.c */
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>
#include "wrap.h"

#define MAXLINE 80
#define SERV_PORT 6666

int main(int argc, char *argv[])
{
struct sockaddr_in servaddr;
char buf[MAXLINE];
int sockfd, n;

sockfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);
servaddr.sin_port = htons(SERV_PORT);

Connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

while (fgets(buf, MAXLINE, stdin) != NULL) {
Write(sockfd, buf, strlen(buf));
n = Read(sockfd, buf, MAXLINE);
if (n == 0)
printf("the other side has been closed.\n");
else
Write(STDOUT_FILENO, buf, n);
}

Close(sockfd);
return 0;
}


epoll进阶

事件模型

EPOLL事件有两种模型:
Edge Triggered (ET)边缘触发只有数据到来才触发,不管缓存区中是否还有数据。
Level Triggered (LT)水平触发只要有数据都会触发。
思考如下步骤:

假定我们已经把一个用来从管道中读取数据的文件描述符(RFD)添加到epoll描述符。

管道的另一端写入了2KB的数据

调用epoll_wait,并且它会返回RFD,说明它已经准备好读取操作

读取1KB的数据

调用epoll_wait……
在这个过程中,有两种工作模式:

ET模式

ET模式即Edge
Triggered工作模式。
如果我们在第1步将RFD添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候ET工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。最好以下面的方式调用ET模式的epoll接口,在后面会介绍避免可能的缺陷。

基于非阻塞文件句柄

只有当read或者write返回EAGAIN(非阻塞读,暂时无数据)时才需要挂起、等待。但这并不是说每次read时都需要循环读,直到读到产生一个EAGAIN才认为此次事件处理完成,当read返回的读到的数据长度小于请求的数据长度时,就可以确定此时缓冲中已没有数据了,也就可以认为此事读事件已处理完成。
LT模式
LT模式即Level
Triggered工作模式。
与ET模式不同的是,以LT方式调用epoll接口的时候,它就相当于一个速度比较快的poll,无论后面的数据是否被使用。
LT(level triggered):LT是缺省的工作方式,并且同时支持block和no-block
socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。
ET(edge-triggered):ET是高速工作方式,只支持no-block
socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知。请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only
once).

实例一:

基于管道epoll ET触发模式
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <errno.h>
#include <unistd.h>

#define MAXLINE 10

int main(int argc, char *argv[])
{
int efd, i;
int pfd[2];
pid_t pid;
char buf[MAXLINE], ch = 'a';

pipe(pfd);
pid = fork();
if (pid == 0) {
close(pfd[0]);
while (1) {
for (i = 0; i < MAXLINE/2; i++)
buf[i] = ch;
buf[i-1] = '\n';
ch++;

for (; i < MAXLINE; i++)
buf[i] = ch;
buf[i-1] = '\n';
ch++;

write(pfd[1], buf, sizeof(buf));
sleep(2);
}
close(pfd[1]);
} else if (pid > 0) {
struct epoll_event event;
struct epoll_event resevent[10];
int res, len;
close(pfd[1]);

efd = epoll_create(10);
/* event.events = EPOLLIN; */
event.events = EPOLLIN | EPOLLET;		/* ET 边沿触发 ,默认是水平触发 */
event.data.fd = pfd[0];
epoll_ctl(efd, EPOLL_CTL_ADD, pfd[0], &event);

while (1) {
res = epoll_wait(efd, resevent, 10, -1);
printf("res %d\n", res);
if (resevent[0].data.fd == pfd[0]) {
len = read(pfd[0], buf, MAXLINE/2);
write(STDOUT_FILENO, buf, len);
}
}
close(pfd[0]);
close(efd);
} else {
perror("fork");
exit(-1);
}
return 0;
}


实例二:

基于网络C/S模型的epoll ET触发模式
/* server.c */
#include <stdio.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <unistd.h>

#define MAXLINE 10
#define SERV_PORT 8080

int main(void)
{
struct sockaddr_in servaddr, cliaddr;
socklen_t cliaddr_len;
int listenfd, connfd;
char buf[MAXLINE];
char str[INET_ADDRSTRLEN];
int i, efd;

listenfd = socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);

bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

listen(listenfd, 20);

struct epoll_event event;
struct epoll_event resevent[10];
int res, len;
efd = epoll_create(10);
event.events = EPOLLIN | EPOLLET;		/* ET 边沿触发 ,默认是水平触发 */

printf("Accepting connections ...\n");
cliaddr_len = sizeof(cliaddr);
connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len);
printf("received from %s at PORT %d\n",
inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),
ntohs(cliaddr.sin_port));

event.data.fd = connfd;
epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);

while (1) {
res = epoll_wait(efd, resevent, 10, -1);
printf("res %d\n", res);
if (resevent[0].data.fd == connfd) {
len = read(connfd, buf, MAXLINE/2);
write(STDOUT_FILENO, buf, len);
}
}
return 0;
}
/* client.c */
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>

#define MAXLINE 10
#define SERV_PORT 8080

int main(int argc, char *argv[])
{
struct sockaddr_in servaddr;
char buf[MAXLINE];
int sockfd, i;
char ch = 'a';

sockfd = socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);
servaddr.sin_port = htons(SERV_PORT);

connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

while (1) {
for (i = 0; i < MAXLINE/2; i++)
buf[i] = ch;
buf[i-1] = '\n';
ch++;

for (; i < MAXLINE; i++)
buf[i] = ch;
buf[i-1] = '\n';
ch++;

write(sockfd, buf, sizeof(buf));
sleep(10);
}
Close(sockfd);
return 0;
}


实例三:
基于网络C/S非阻塞模型的epoll ET触发模式
/* server.c */
#include <stdio.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>

#define MAXLINE 10
#define SERV_PORT 8080

int main(void)
{
struct sockaddr_in servaddr, cliaddr;
socklen_t cliaddr_len;
int listenfd, connfd;
char buf[MAXLINE];
char str[INET_ADDRSTRLEN];
int i, efd, flag;

listenfd = socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);

bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

listen(listenfd, 20);

struct epoll_event event;
struct epoll_event resevent[10];
int res, len;
efd = epoll_create(10);
/* event.events = EPOLLIN; */
event.events = EPOLLIN | EPOLLET;		/* ET 边沿触发 ,默认是水平触发 */

printf("Accepting connections ...\n");
cliaddr_len = sizeof(cliaddr);
connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len);
printf("received from %s at PORT %d\n",
inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),
ntohs(cliaddr.sin_port));

flag = fcntl(connfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(connfd, F_SETFL, flag);
event.data.fd = connfd;
epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);

while (1) {
printf("epoll_wait begin\n");
res = epoll_wait(efd, resevent, 10, -1);
printf("epoll_wait end res %d\n", res);

if (resevent[0].data.fd == connfd) {
while ((len = read(connfd, buf, MAXLINE/2)) > 0)
write(STDOUT_FILENO, buf, len);
}
}
return 0;
}
/* client.c */
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>

#define MAXLINE 10
#define SERV_PORT 8080

int main(int argc, char *argv[])
{
struct sockaddr_in servaddr;
char buf[MAXLINE];
int sockfd, i;
char ch = 'a';

sockfd = socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);
servaddr.sin_port = htons(SERV_PORT);

connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

while (1) {
for (i = 0; i < MAXLINE/2; i++)
buf[i] = ch;
buf[i-1] = '\n';
ch++;

for (; i < MAXLINE; i++)
buf[i] = ch;
buf[i-1] = '\n';
ch++;

write(sockfd, buf, sizeof(buf));
sleep(10);
}
Close(sockfd);
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  linux 并发