您的位置:首页 > 运维架构 > Linux

epoll和poll剖析

2015-05-15 09:25 267 查看

1. 概述

       poll、select、epoll是linux常用的多路IO复用的方法。其中poll和select的原理相似,本文以poll为例,而epoll在处理大量fd的时候,其效率和性能比poll和select好很多。本文将从源码的角度上上来分析poll和epoll的实现,从而说明epoll和poll性能差距的主要原因。
      从个人观点来看,觉得poll和epoll的效率和性能上的差距,主要有以下两点:
      1. 支持的fd的数量。poll监听的fd占用的是当前进程打开的fd,因此支持的fd的数量最大只能是FD_SETSIZE,一般为1024。epoll监听的fd是添加到epoll自己的红黑树上,能支持较多的fd,当然fd的数量和内存是相关的,比如1G的内存大概能支持10W个fd。
      2. 效率。每次调用poll时,都需要将fd从用户空间copy到内核空间以及对每个fd调用filp->f_op->poll()函数,这样的代价比较大。而epoll在epoll_ctl()时,将fd从用户空间copy到内核空间,后续调用epoll_wait()时,不用再copy,另外,epoll也只是在epoll_ctl()增加fd的时候,需要调用filp->f_op->poll()一次。因此对比来讲,epoll()的效率比较高,特别是在监听大量fd的时候。

2. poll实现

       接下来直接从poll()的系统调用开始,分析代码。分析的目的是说清楚poll的实现,因此下面的代码是精简过的,判断和检查逻辑都删掉了,如果大家有兴趣,可以下载完整的kernel源码,然后根据本文进行查阅。
       fs/select.c
SYSCALL_DEFINE3(poll, struct pollfd __user *, ufds, unsigned int, nfds,
long, timeout_msecs)
{
struct timespec end_time, *to = NULL;
int ret;

//计算超时时间
if (timeout_msecs >= 0) {
to = &end_time;
poll_select_set_timeout(to, timeout_msecs / MSEC_PER_SEC,
NSEC_PER_MSEC * (timeout_msecs % MSEC_PER_SEC));
}
//主要函数
ret = do_sys_poll(ufds, nfds, to);
return ret;
}

        poll() --> do_sys_poll():

int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds,
struct timespec *end_time)
{
struct poll_wqueues table;
<span style="white-space:pre">	</span>//以下循环精简了
for (;;) {
//循环将每个pollfd从用户空间copy到内核空间,效率低
if (copy_from_user(walk->entries, ufds + nfds-todo,
sizeof(struct pollfd) * walk->len));
}
//初始化poll_wqueues:
//	1.设置poll_table的回调函数__pollwait()
//   2.记录当前进程current到poll_wqueues->polling_task。这样后面唤醒的时候,能找到该进程
poll_initwait(&table);
//对每个fd调用filp->f_op->poll()函数,将current添加到每个fd的等待队列上
fdcount = do_poll(nfds, head, &table, end_time);
//将current进程从每个fd的等待队列上移除
poll_freewait(&table);

<span style="white-space:pre">	</span>//此处精简了
return err;
}

      do_sys_poll() --> poll_initwait():

void poll_initwait(struct poll_wqueues *pwq)
{
//给pwq->pt 设置回调函数__pollwait(),__pollwait()会在filp->f_op->poll()中被调用
init_poll_funcptr(&pwq->pt, __pollwait);
//将当前进程current记录到pwq->polling_task上,这样唤醒的时候,才能找到对应的进程
pwq->polling_task = current;
}

     do_sys_poll() --> do_poll():

static int do_poll(unsigned int nfds,  struct poll_list *list,
struct poll_wqueues *wait, struct timespec *end_time)
{
poll_table* pt = &wait->pt;

for (;;) {
struct poll_list *walk;

for (walk = list; walk != NULL; walk = walk->next) {
struct pollfd * pfd, * pfd_end;

pfd = walk->entries;
pfd_end = pfd + walk->len;
for (; pfd != pfd_end; pfd++) {
//循环对每个fd调用do_pollfd(),实际就是filp->f_op->poll()函数
if (do_pollfd(pfd, pt)) {
count++;
pt = NULL;
}
}
}

//如果有监听的时间发生或超时
if (count || timed_out)
break;
//设置超时时间,设置当前进程状态为TASK_INTERRUPTIBLE,然后让出cpu,等待唤醒或者超时。
if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack))
timed_out = 1;
}
return count;
}
      do_poll() --> do_pollfd():

static inline unsigned int do_pollfd(struct pollfd *pollfd, poll_table *pwait)
{
unsigned int mask;
int fd;

mask = 0;
fd = pollfd->fd;
if (fd >= 0) {
int fput_needed;
struct file * file;
//根据fd找到对应的file结构体,关于file结构体可以查看"linux VFS"中的描述
file = fget_light(fd, &fput_needed);
mask = POLLNVAL;
if (file != NULL) {
mask = DEFAULT_POLLMASK;
if (file->f_op && file->f_op->poll) {
if (pwait)
pwait->key = pollfd->events |
POLLERR | POLLHUP;
//就是这里了~,对每个fd调用file->f_op->poll()函数
//file是fd对应的文件结构体,file->f_op是文件的操作函数集,该调用关系在"linux VFS"中有讲述
mask = file->f_op->poll(file, pwait);
}
/* Mask out unneeded events. */
mask &= pollfd->events | POLLERR | POLLHUP;
fput_light(file, fput_needed);
}
}
pollfd->revents = mask;

return mask;
}
       do_pollfd()中有关于VFS的一些知识,大家有兴趣可以参考《linux VFS》

       f_op->poll()函数是每个支持poll()操作的文件系统或者驱动需要实现的,网络协议栈的传输层TCP/UDP也是实现了poll()函数,其实来说socket也是fd,那么当然也会实现f_op函数集。关于网络协议栈中中的流程分析以及函数调用,大家可以参考《linux 网络协议栈流程》

      接下来将以udp的poll()函数udp_poll()继续分析。

       net/ipv4/udp.c

unsigned int udp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
unsigned int mask = datagram_poll(file, sock, wait);
struct sock *sk = sock->sk;
return mask;

}
      udp_poll() --> datagram_poll():

unsigned int datagram_poll(struct file *file, struct socket *sock,
poll_table *wait)
{
struct sock *sk = sock->sk;
unsigned int mask;

//sk->sk_sleep: sock的等待队列头wait_queue_head_t;
//wait:在poll_initwait()中初始化,回调函数是__pollwait(),该函数马上就会用到~
sock_poll_wait(file, sk->sk_sleep, wait);
mask = 0;

//精简了很多判断逻辑

return mask;
}
      datagram_poll() --> sock_poll_wait():

static inline void sock_poll_wait(struct file *filp,
wait_queue_head_t *wait_address, poll_table *p)
{
if (p && wait_address) {
poll_wait(filp, wait_address, p);
/*
* We need to be sure we are in sync with the
* socket flags modification.
*
* This memory barrier is paired in the sk_has_sleeper.
*/
smp_mb();
}
}
      sock_poll_wait() --> poll_wait():

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && wait_address)
//p->qproc就是__pollwait(),是在前面的poll_initwait()中设置的~
p->qproc(filp, wait_address, p);
}
      poll_wait() --> __pollwait():

static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
poll_table *p)
{
//使用container_of()获取包含该poll_table p的poll_wqueues,这层关系也是在poll_initwait()中设置的。
struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
struct poll_table_entry *entry = poll_get_entry(pwq);
if (!entry)
return;
get_file(filp);
entry->filp = filp;
entry->wait_address = wait_address;
entry->key = p->key;
//entry->wait: 是struct __wait_queue,该结构体是等待队列的一个基本结构体,代表一个等待对象。
//设置entry->wait的回调函数为pollwake(),该函数也是唤醒时调用的函数。
init_waitqueue_func_entry(&entry->wait, pollwake);
//设置pwq到entry->wait的private上,在poll_initwait()中pwq->pollingtask记录的是进程current
entry->wait.private = pwq;
//wait_address: 就是sk->sk_sleep
//entry->wait: 该结构体中记录了回调函数pollwake(),以及进程current~
add_wait_queue(wait_address, &entry->wait);
}
      到了add_wait_queue()这里,大家应该比较清楚了~,接下来应该就是在合适的时机调用pollwake()唤醒__wait_queue_t中记录的current进程。那么对于udp套接字来说,这个合适的时机就是收到数据包的 时候,为了说明这个情况,接下来我们从udp的网络数据包的接收开始分析。

      网卡收到数据后,经过中断、软中断、IP层等处理后,upd的包会调用udp_rcv()接受。这个调用过程可以参考《linux 网络协议栈流程》

int udp_rcv(struct sk_buff *skb)
{
return __udp4_lib_rcv(skb, &udp_table, IPPROTO_UDP);
}
int __udp4_lib_rcv(struct sk_buff *skb, struct udp_table *udptable,
int proto)
{
struct sock *sk;
struct udphdr *uh;
unsigned short ulen;
struct rtable *rt = skb_rtable(skb);
__be32 saddr, daddr;
struct net *net = dev_net(skb->dev);

uh   = udp_hdr(skb);
ulen = ntohs(uh->len);
saddr = ip_hdr(skb)->saddr;
daddr = ip_hdr(skb)->daddr;

//根据skb的udp头信息(源端口、目的端口)找到对应的sock s
df5d
k;
sk = __udp4_lib_lookup_skb(skb, uh->source, uh->dest, udptable);

if (sk != NULL) {
//sk接受数据包skb,即将skb添加到sk的接受队列上,然后唤醒等待该sk的进程
int ret = udp_queue_rcv_skb(sk, skb);
sock_put(sk);
}

return 0;
}
      接下来是udp_queue_rcv_skb() --> __udp_queue_rcv_skb() --> sock_queue_rcv_skb() :

int sock_queue_rcv_skb(struct sock *sk, struct sk_buff *skb)
{
skb_len = skb->len;
//将skb添加到sk的接受队列 sk_receive_queue上
skb_queue_tail(&sk->sk_receive_queue, skb);

if (!sock_flag(sk, SOCK_DEAD))
//调用sk_data_ready()通知sk等待队列上的等待对象们,即唤醒
sk->sk_data_ready(sk, skb_len);
return err;
}
      sk->sk_data_ready()是在sock初始化调用net/core/sock.c:sock_init_data()时赋值为sock_def_readable();
static void sock_def_readable(struct sock *sk, int len)
{
read_lock(&sk->sk_callback_lock);
if (sk_has_sleeper(sk))
//唤醒等待在sk->sk_sleep上的进程
wake_up_interruptible_sync_poll(sk->sk_sleep, POLLIN |
POLLRDNORM | POLLRDBAND);
sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
read_unlock(&sk->sk_callback_lock);
}
      wake_up_interruptible_sync_poll() --> __wake_up_sync_key() --> __wake_up_common():

static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, int wake_flags, void *key)
{
wait_queue_t *curr, *next;
//对于udp socket来说 wait_queue_head_t *q就是sk->sk_sleep
//遍历等待队列上的wait_queue_t,调用wait_queue_t的回调函数
list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
unsigned flags = curr->flags;
//对于本文分析中,curr->func就是在__pollwait()中设置的pollwake()
if (curr->func(curr, mode, wake_flags, key) &&
(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
}
}
      终于到了调用pollwake()的时候了~

static int __pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
//wait->private 是在前面的__pollwait()中设置的,wait->private->polling_task就是之前的current
struct poll_wqueues *pwq = wait->private;
//pwd->polling_task就是之前的current
DECLARE_WAITQUEUE(dummy_wait, pwq->polling_task);

return default_wake_function(&dummy_wait, mode, sync, key);
}
       接下来 default_wake_function() --> try_to_wake_up(), try_to_wake_up()就会将之前的current的state修改为TASK_RUNNING,然后schedule()。

       好吧~ poll()大概就是如此,可能有点长,接下来 挑重点的重点简化下:

        1. poll() 设置poll_wqueues pwd->pt->qproc = __pollwait(), pwq->polling_task = current;

        2. 对每个fd调用file->f_op->poll(),在f_op->poll()中会调用__pollwait();

        3. __pollwait() 设置wait_queue_t wait->func = pollwake() wait->private = pwq,然后将wait添加到fd的等待队列上。

        4. fd变为可用之后,会循环调用其等待队列上的回调函数,这时poll_wake()就会唤醒进程current。

3. epoll分析

      首先是epoll_create()创建epoll fd:
      epoll_create() --> epoll_create1()

SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error, fd;
struct eventpoll *ep = NULL;
struct file *file;

/* Check the EPOLL_* constant for consistency.  */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);

if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
/*
* Create the internal data structure ("struct eventpoll").
*/
//创建eventpoll, struct eventpoll 有以下重要的元素:
// 1. wait_queue_head_t wq;调用epoll_wait()时,当前进程就是挂到该等待队列上。
//相比poll()是将当前进程挂到每个fd的等待队列上,且每次调用poll()都要挂一次。
// 2. wait_queue_head_t poll_wait();用于fd为epoll的fd时的等待队列,
//也就是说poll()监听的fd也可以是一个epoll的fd。
// 3. struct list_head rdlist;就绪链表,epoll中监听的fd如果就绪了会添加到该链表中。
// 4. struct rb_root rbr; 用红黑树存放监听的fd,poll()中监听的fd是占用进程的fd项。
error = ep_alloc(&ep);
if (error < 0)
return error;
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
//获取一个可用的fd
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
if (fd < 0) {
error = fd;
goto out_free_ep;
}
//创建一个epoll fs的file结构体,即inode。epoll fs 也即实现了f_op函数集 eventpoll_fops,
//尽管eventpoll_fops很简单,没有实现read/write等通用接口,因为不会对epoll fd进行读写
//当然也可以实现read/write函数,read()用来读取可用的fd,write()用来修改监听的fd~ 这个建议怎么样~
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (IS_ERR(file)) {
error = PTR_ERR(file);
goto out_free_fd;
}
//fd和file建立关系
fd_install(fd, file);
ep->file = file;
return fd;

out_free_fd:
put_unused_fd(fd);
out_free_ep:
ep_free(ep);
return error;
}
      epoll_ctl() 添加、删除、修改监听的fd:

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
int error;
int did_lock_epmutex = 0;
struct file *file, *tfile;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
ep = file->private_data;
epi = ep_find(ep, tfile, fd);

error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
epds.events |= POLLERR | POLLHUP;
//添加fd到epoll的监听中
error = ep_insert(ep, &epds, tfile, fd);
} else
error = -EEXIST;
clear_tfile_check_list();
break;
case EPOLL_CTL_DEL:
if (epi)
//ep_remove()将之前通过ep_insert()添加到fd等待队列上的wait_queue_t从fd的等待队列上删除。
//相比poll, poll是在每次调用完poll()时,就会将每个fd的等待队列上移除wait_queue_t。
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
} else
error = -ENOENT;
break;
}
mutex_unlock(&ep->mtx);

error_tgt_fput:
if (did_lock_epmutex)
mutex_unlock(&epmutex);

fput(tfile);
error_fput:
fput(file);
error_return:

return error;
}
      ep_insert():

static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
int error, revents, pwake = 0;
unsigned long flags;
struct epitem *epi;
struct ep_pqueue epq;

if (unlikely(atomic_read(&ep->user->epoll_watches) >=
max_user_watches))
return -ENOSPC;
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM;

/* Item initialization follow here ... */
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;

/* Initialize the poll table using the queue callback */
epq.epi = epi;
//设置回调函数 ep_ptable_queue_proc(),相比poll的此处的回调函数是__pollwait();
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

/*
* Attach the item to the poll hooks and get current event bits.
* We can safely use the file* here because its usage count has
* been increased by the caller of this function. Note that after
* this operation completes, the poll callback can start hitting
* the new item.
*/
//调用file->f_op->poll() 和 之间分析poll一样,对于udp来说,这里会走到udp_poll()
revents = tfile->f_op->poll(tfile, &epq.pt);

/*
* We have to check if something went wrong during the poll wait queue
* install process. Namely an allocation for a wait queue failed due
* high memory pressure.
*/
error = -ENOMEM;
if (epi->nwait < 0)
goto error_unregister;

/* Add the current item to the list of active epoll hook for this file */
spin_lock(&tfile->f_lock);
list_add_tail(&epi->fllink, &tfile->f_ep_links);
spin_unlock(&tfile->f_lock);

/*
* Add the current item to the RB tree. All RB tree operations are
* protected by "mtx", and ep_insert() is called with "mtx" held.
*/
ep_rbtree_insert(ep, epi);

/* now check if we've created too many backpaths */
error = -EINVAL;
if (reverse_path_check())
goto error_remove_epi;

/* We have to drop the new item inside our item list to keep track of it */
spin_lock_irqsave(&ep->lock, flags);

/* If the file is already "ready" we drop it inside the ready list */
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);

/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}

spin_unlock_irqrestore(&ep->lock, flags);

atomic_inc(&ep->user->epoll_watches);

/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);

return 0;

error_remove_epi:
spin_lock(&tfile->f_lock);
if (ep_is_linked(&epi->fllink))
list_del_init(&epi->fllink);
spin_unlock(&tfile->f_lock);

rb_erase(&epi->rbn, &ep->rbr);

error_unregister:
ep_unregister_pollwait(ep, epi);

/*
* We need to do this because an event could have been arrived on some
* allocated wait queue. Note that we don't care about the ep->ovflist
* list, since that is used/cleaned only inside a section bound by "mtx".
* And ep_insert() is called with "mtx" held.
*/
spin_lock_irqsave(&ep->lock, flags);
if (ep_is_linked(&epi->rdllink))
list_del_init(&epi->rdllink);
spin_unlock_irqrestore(&ep->lock, flags);

kmem_cache_free(epi_cache, epi);

return error;
}
      类似poll流程,udp_poll()最后会调用回调函数,对于epoll来说,回调函数为
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;

if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
//设置回调函数为ep_poll_callback(),相比poll()中是pollwake()
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
//whead为sk->sk_sleep, pwq->wait的回调函数为ep_poll_callback()
//pollwake()主要任务是唤醒pwq中记录的进程current
//ep_poll_callback()主要任务是:1. 将fd添加到epoll的rdlist中
// 2. 唤醒eventpoll->wq上等待的进程,该进程正是调用epoll_wait()的进程~
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
/* We have to signal that an error occurred */
epi->nwait = -1;
}
}
       ep_ptable_queue_proc()向fd的等待队列上添加了一个wait_queue_t wait,该wait的回调函数为ep_poll_callback()。类似poll分析一样,在网卡收到属于sock sk的网络数据包,并且最终数据包到达sock sk时,会调用等待队列上的wait的回调函数,对于epoll来说就是ep_poll_callback()。

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
unsigned long flags;
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;

/* If this file is already in the ready list we exit soon */
if (!ep_is_linked(&epi->rdllink))
//1. 将epi添加到eventpoll的rdllist上,epi中记录了fd以及事件
list_add_tail(&epi->rdllink, &ep->rdllist);

/*
* Wake up ( if active ) both the eventpoll wait list and the ->poll()
* wait list.
*/
if (waitqueue_active(&ep->wq))
//2. 唤醒eq->wq上的进程,也即调用epoll_wait()的进程,这个逻辑在epoll_wait()还会有体现
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
return 1;
}
      ep_poll_callback()主要做了两件事:1. 将epi添加到eventpoll的rdllist上,epi总记录了fd以及事件 2. 唤醒ep->wq上的进程,其实该进程就是调用epoll_wait()的进程,下面分析epoll_wait()可以看出来:

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
int error;
struct file *file;
struct eventpoll *ep;

/*
* At this point it is safe to assume that the "private_data" contains
* our own data structure.
*/
ep = file->private_data;

/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
return error;
}
       ep_poll()中即是当前进程挂到ep->wait上的过程:

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res, eavail;
unsigned long flags;
long jtimeout;
wait_queue_t wait;

/*
* Calculate the timeout by checking for the "infinite" value (-1)
* and the overflow condition. The passed timeout is in milliseconds,
* that why (t * HZ) / 1000.
*/
jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;

retry:
spin_lock_irqsave(&ep->lock, flags);

res = 0;
if (list_empty(&ep->rdllist)) {
/*
* We don't have any available event to return to the caller.
* We need to sleep here, and we will be wake up by
* ep_poll_callback() when events will become available.
*/
//将当前进程current挂到ep->wq上。
init_waitqueue_entry(&wait, current);
wait.flags |= WQ_FLAG_EXCLUSIVE;
__add_wait_queue(&ep->wq, &wait);

for (;;) {
/*
* We don't want to sleep if the ep_poll_callback() sends us
* a wakeup in between. That's why we set the task state
* to TASK_INTERRUPTIBLE before doing the checks.
*/
//设置进程状态为TASK_INTERRUPTIBLE
set_current_state(TASK_INTERRUPTIBLE);
if (!list_empty(&ep->rdllist) || !jtimeout)
break;
if (signal_pending(current)) {
res = -EINTR;
break;
}

spin_unlock_irqrestore(&ep->lock, flags);
//切换,让出cpu,等待唤醒或者超时
jtimeout = schedule_timeout(jtimeout);
spin_lock_irqsave(&ep->lock, flags);
}
//将current从ep->wq上移除
__remove_wait_queue(&ep->wq, &wait);
//设置进程为TASK_RUNNING状态
set_current_state(TASK_RUNNING);
}
/* Is it worth to try to dig for events ? */
eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;

spin_unlock_irqrestore(&ep->lock, flags);

/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && jtimeout)
goto retry;

return res;
}
       至此,epoll的流程基本上分析完毕。

4. 总结

       从上面分析epoll和poll的代码可以看出来。epoll的两个回调函数 和 poll 不一样,epoll() 自己实现了两个回调函数,poll使用的回调函数可以说是kernel通用的。epoll()巧妙的利用ep_call_back()将fd添加到eventpoll的rdllist中,避免了传统的项poll、select每次调用需要重新将当前进程挂到fd的 等待队列上。

       另外 epoll()常用的就是监听网络套接字,其实除了网络套接字外,也可以监听其他文件描述符。比如管道,监听管道可以也做可以做进程间唤醒等待,其实android的handler message消息处理机制底层就是用的epoll + pipe来实现时等待唤醒。

        也可以通过 epoll的EPOLLOUT事件来做事件通知机制,因为在fd可写的情况下,往epoll中添加一个fd的EPOLLOUT事件,就会立即触发这个事件,从而可以在epoll_wait()返回时做相应的处理。比如 twmproxy就是这么使用的:

        src/event/nc_epoll.c:

int
event_add_out(struct evbase *evb, struct conn *c)
{
int status;
struct epoll_event event;
int ep = evb->ep;

ASSERT(ep > 0);
ASSERT(c != NULL);
ASSERT(c->sd > 0);
ASSERT(c->recv_active);

if (c->send_active) {
return 0;
}

event.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
event.data.ptr = c;

status = epoll_ctl(ep, EPOLL_CTL_MOD, c->sd, &event);
if (status < 0) {
log_error("epoll ctl on e %d sd %d failed: %s", ep, c->sd,
strerror(errno));
} else {
c->send_active = 1;
}

}
       后续再总结下twmproxy的zero copy和事件驱动的机制。

       另外吐槽下 写blog真心 太费事~
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  linux epoll poll