epoll原理及线程安全小结
2016-01-28 18:51
316 查看
epoll的红黑树由一个互斥量保护,ready
list是自旋锁保护的。
ready list涉及到add\mod\wait
红黑树涉及到add\mod\del
1 等待队列实现原理
1.1 功能介绍
进程有多种状态,当进程做好准备后,它就处于就绪状态(TASK_RUNNING),放入运行队列,等待内核调度器来调度。当然,同一时刻可能有多个进程进入就绪状态,但是却可能只有1个CPU是空闲的,所以最后能不能在CPU上运行,还要取决于优先级等多种因素。当进程进行外部设备的IO等待操作时,由于外部设备的操作速度一般是非常慢的,所以进程会从就绪状态变为等待状态(休眠),进入等待队列,把CPU让给其它进程。直到IO操作完成,内核“唤醒”等待的进程,于是进程再度从等待状态变为就绪状态。
在用户态,进程进行IO操作时,可以有多种处理方式,如阻塞式IO,非阻塞式IO,多路复用(select/poll/epoll),AIO(aio_read/aio_write)等等。这些操作在内核态都要用到等待队列。
1.2 相关的结构体
typedef struct __wait_queue wait_queue_t;
struct __wait_queue
{
unsigned int flags;
#define WQ_FLAG_EXCLUSIVE 0x01
struct task_struct * task; //等待队列节点对应的进程
wait_queue_func_t func; //等待队列的回调函数,在进程被唤醒
struct list_head task_list;
};
这个是等待队列的节点,在很多等待队列里,这个func函数指针默认为空函数。
但是,在select/poll/epoll函数中,这个func函数指针不为空,并且扮演着重要的角色。
struct __wait_queue_head
{
spinlock_t lock;
struct list_head task_list;
};
typedef struct __wait_queue_head wait_queue_head_t;
这个是等待队列的头部。其中task_list里有指向下一个节点的指针。为了保证对等待队列的操作是原子的,还需要一个自旋锁lock。
这里需要提一下内核队列中被广泛使用的结构体struct list_head。
struct list_head
{
struct list_head *next, *prev;
};
1.3 实现原理
可以看到,等待队列的核心是一个list_head组成的双向链表。
其中,第一个节点是队列的头,类型为wait_queue_head_t,里面包含了一个list_head类型的成员task_list。
接下去的每个节点类型为 wait_queue_t,里面也有一个list_head类型的成员task_list,并且有个指针指向等待的进程。通过这种方式,内核组织了一个等待队列。
那么,这个等待队列怎样与一个事件关联呢?
在内核中,进程在文件操作等事件上的等待,一定会有一个对应的等待队列的结构体与之对应。例如,等待管道的文件操作(在内核看来,管道也是一种文件)的进程都放在管道对应inode.i_pipe->wait这个等待队列中。这样,如果管道文件操作完成,就可以很方便地通过inode.i_pipe->wait唤醒等待的进程。
在大部分情况下(如系统调用read),当前进程等待IO操作的完成,只要在内核堆栈中分配一个wait_queue_t的结构体,然后初始化,把task指向当前进程的task_struct,然后调用add_wait_queue()放入等待队列即可。
但是,在select/poll中,由于系统调用要监视多个文件描述符的操作,因此要把当前进程放入多个文件的等待队列,并且要分配多个wait_queue_t结构体。这时候,在堆栈上分配是不合适的。因为内核堆栈很小。所以要通过动态分配的方式来分配wait_queue_t结构体。除了在一些结构体里直接定义等待队列的头部,内核的信号量机制也大量使用了等待队列。信号量是为了进行进程同步而引入的。与自旋锁不同的是,当一个进程无法获得信号量时,它会把自己放到这个信号量的等待队列中,转变为等待状态。当其它进程释放信号量时,会唤醒等待的进程。
epoll 关键结构体:
struct ep_pqueue
{
poll_table pt;
struct epitem *epi;
};
这个结构体类似于select/poll中的struct poll_wqueues。由于epoll需要在内核态保存大量信息,所以光光一个回调函数指针已经不能满足要求,所以在这里引入了一个新的结构体struct epitem。
struct epitem
{
struct rb_node rbn;
红黑树,用来保存eventpoll
struct list_head rdllink;
双向链表,用来保存已经完成的eventpoll
struct epoll_filefd ffd;
这个结构体对应的被监听的文件描述符信息
int nwait;
poll操作中事件的个数
struct list_head pwqlist;
双向链表,保存着被监视文件的等待队列,功能类似于select/poll中的poll_table
struct eventpoll *ep;
指向eventpoll,多个epitem对应一个eventpoll
struct epoll_event event;
记录发生的事件和对应的fd
atomic_t usecnt;
引用计数
struct list_head fllink;
双向链表,用来链接被监视的文件描述符对应的struct file。因为file里有f_ep_link,
用来保存所有监视这个文件的epoll节点
struct list_head txlink;
双向链表,用来保存传输队列
unsigned int revents;
文件描述符的状态,在收集和传输时用来锁住空的事件集合
};
该结构体用来保存与epoll节点关联的多个文件描述符,保存的方式是使用红黑树实现的hash表。至于为什么要保存,下文有详细解释。它与被监听的文件描述符一一对应。
struct eventpoll
{
spinlock_t lock;
读写锁
struct mutex mtx;
读写信号量
wait_queue_head_t wq;
wait_queue_head_t poll_wait;
struct list_head rdllist;
已经完成的操作事件的队列。
struct rb_root rbr;
保存epoll监视的文件描述符
struct epitem *ovflist;
struct user_struct *user;
};
这个结构体保存了epoll文件描述符的扩展信息,它被保存在file结构体的private_data中。它与epoll文件节点一一对应。通常一个epoll文件节点对应多个被监视的文件描述符。所以一个eventpoll结构体会对应多个epitem结构体。
那么,epoll中的等待事件放在哪里呢?见下面
struct eppoll_entry
{
struct list_head llink;
void *base;
wait_queue_t wait;
wait_queue_head_t *whead;
};
与select/poll的struct poll_table_entry相比,epoll的表示等待队列节点的结构体只是稍有不同,
与struct poll_table_entry比较一下。
struct poll_table_entry
{
struct file * filp;
wait_queue_t wait;
wait_queue_head_t * wait_address;
};
由于epitem对应一个被监视的文件,所以通过base可以方便地得到被监视的文件信息。又因为一个文件可能有多个事件发生,所以用llink链接这些事件。
=====================================================================
相关内核代码:
fs/eventpoll.c
判断一个tcp套接字上是否有激活事件:net/ipv4/tcp.c:tcp_poll函数
每个epollfd在内核中有一个对应的eventpoll结构对象.
其中关键的成员是一个readylist(eventpoll:rdllist)
和一棵红黑树(eventpoll:rbr).
eventpoll的红黑树中.红黑树的作用是使用者调用EPOLL_MOD的时候可以快速找到fd对应的epitem。
epoll_ctl的功能是实现一系列操作,如把文件与eventpollfs文件系统的inode节点关联起来。这里要介绍一下eventpoll结构体,它保存在file->f_private中,记录了eventpollfs文件系统的inode节点的重要信息,其中成员rbr保存了该epoll文件节点监视的所有文件描述符。组织的方式是一棵红黑树,这种结构体在查找节点时非常高效。首先它调用ep_find()从eventpoll中的红黑树获得epitem结构体。然后根据op参数的不同而选择不同的操作。如果op为EPOLL_CTL_ADD,那么正常情况下epitem是不可能在eventpoll的红黑树中找到的,所以调用ep_insert创建一个epitem结构体并插入到对应的红黑树中。
ep_insert()首先分配一个epitem对象,对它初始化后,把它放入对应的红黑树。此外,这个函数还要作一个操作,就是把当前进程放入对应文件操作的等待队列。这一步是由下面的代码完成的。
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
......
revents = tfile->f_op->poll(tfile, &epq.pt);
函数先调用init_poll_funcptr注册了一个回调函数ep_ptable_queue_proc,ep_ptable_queue_proc函数会在调用f_op->poll时被执行。
900 static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
901 poll_table *pt)
902 {
903 struct epitem *epi = ep_item_from_epqueue(pt);
904 struct eppoll_entry *pwq;
905
906 if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
907 init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
908 pwq->whead = whead;
909 pwq->base = epi;
910 add_wait_queue(whead, &pwq->wait);
911 list_add_tail(&pwq->llink, &epi->pwqlist);
912 epi->nwait++;
913 } else {
914
915 epi->nwait = -1;
916 }
917 }
该函数分配一个epoll等待队列结点eppoll_entry:一方面把它挂到文件操作的等待队列中,另一方面把它挂到epitem的队列中。此外,它还注册了一个等待队列的回调函数ep_poll_callback。当文件操作完成,唤醒当前进程之前,会调用ep_poll_callback(),把eventpoll放到epitem的完成队列中(注释:通过查看代码,此处应该是把epitem放到eventpoll的完成队列,只有这样才能在epoll_wait()中只要看eventpoll的完成队列即可得到所有的完成文件描述符),并唤醒等待进程。
如果在执行f_op->poll以后,发现被监视的文件操作已经完成了,那么把它放在完成队列中了,并立即把等待操作的那些进程唤醒。
919 if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
920 return -ENOMEM;
963 ep_rbtree_insert(ep, epi);
调用epoll_wait的时候,将readylist中的epitem出列,将触发的事件拷贝到用户空间.之后判断epitem是否需
要重新添加回readylist.
epitem重新添加到readylist必须满足下列条件:
1) epitem上有用户关注的事件触发.
2) epitem被设置为水平触发模式(如果一个epitem被设置为边界触发则这个epitem不会被重新添加到readylist
中,在什么时候重新添加到readylist请继续往下看).
注意,如果epitem被设置为EPOLLONESHOT模式,则当这个epitem上的事件拷贝到用户空间之后,会将
这个epitem上的关注事件清空(只是关注事件被清空,并没有从epoll中删除,要删除必须对那个描述符调用
EPOLL_DEL),也就是说即使这个epitem上有触发事件,但是因为没有用户关注的事件所以不会被重新添加到
readylist中.
epitem被添加到readylist中的各种情况(当一个epitem被添加到readylist如果有线程阻塞在epoll_wait中,那
个线程会被唤醒):
1)对一个fd调用EPOLL_ADD,如果这个fd上有用户关注的激活事件,则这个fd会被添加到readylist.
2)对一个fd调用EPOLL_MOD改变关注的事件,如果新增加了一个关注事件且对应的fd上有相应的事件激活,
则这个fd会被添加到readylist.
3)当一个fd上有事件触发时(例如一个socket上有外来的数据)会调用ep_poll_callback(见eventpoll::ep_ptable_queue_proc),
如果触发的事件是用户关注的事件,则这个fd会被添加到readylist中.
了解了epoll的执行过程之后,可以回答一个在使用边界触发时常见的疑问.在一个fd被设置为边界触发的情况下,
调用read/write,如何正确的判断那个fd已经没有数据可读/不再可写.epoll文档中的建议是直到触发EAGAIN
错误.而实际上只要你请求字节数小于read/write的返回值就可以确定那个fd上已经没有数据可读/不再可写.
最后用一个epollfd监听另一个epollfd也是合法的,epoll通过调用eventpoll::ep_eventpoll_poll来判断一个
epollfd上是否有触发的事件(只能是读事件).
以下是个人读代码总结:
1709 SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
1710 int, maxevents, int, timeout)
1588 SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
1589 struct epoll_event __user *, event)
epoll_ctl的机制大致如下:
1360 mutex_lock(&ep->mtx);
1361
1362
1367 epi = ep_find(ep, tfile, fd); //这里就是去ep->rbr 红黑树查找
1368
1369 error = -EINVAL;
1370 switch (op) {
1371 case EPOLL_CTL_ADD:
1372 if (!epi) {
1373 epds.events |= POLLERR | POLLHUP;
1374 error = ep_insert(ep, &epds, tfile, fd);
1375 } else
1376 error = -EEXIST;
1377 break;
1378 case EPOLL_CTL_DEL:
1379 if (epi)
1380 error = ep_remove(ep, epi);
1381 else
1382 error = -ENOENT;
1383 break;
1384 case EPOLL_CTL_MOD:
1385 if (epi) {
1386 epds.events |= POLLERR | POLLHUP;
1387 error = ep_modify(ep, epi, &epds);
1388 } else
1389 error = -ENOENT;
1390 break;
1391 }
1392 mutex_unlock(&ep->mtx);
http://blog.csdn.net/justlinux2010/article/details/8510507
====================================
源码分析
====================================
一、sys_epoll_wait()函数
源码及分析如下所示:
[cpp] view
plaincopy
/*
* Implement the event wait interface for the eventpoll file. It is the kernel
* part of the user space epoll_wait(2).
*/
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
int error;
struct file *file;
struct eventpoll *ep;
/* The maximum number of event must be greater than zero */
/*
* 检查maxevents参数。
*/
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;
/* Verify that the area passed by the user is writeable */
/*
* 检查用户空间传入的events指向的内存是否可写。参见__range_not_ok()。
*/
if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
error = -EFAULT;
goto error_return;
}
/* Get the "struct file *" for the eventpoll file */
/*
* 获取epfd对应的eventpoll文件的file实例,file结构是在epoll_create中创建
*/
error = -EBADF;
file = fget(epfd);
if (!file)
goto error_return;
/*
* We have to check that the file structure underneath the fd
* the user passed to us _is_ an eventpoll file.
*/
/*
* 通过检查epfd对应的文件操作是不是eventpoll_fops
* 来判断epfd是否是一个eventpoll文件。如果不是
* 则返回EINVAL错误。
*/
error = -EINVAL;
if (!is_file_epoll(file))
goto error_fput;
/*
* At this point it is safe to assume that the "private_data" contains
* our own data structure.
*/
ep = file->private_data;
/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
error_fput:
fput(file);
error_return:
return error;
}
sys_epoll_wait()是epoll_wait()对应的系统调用,主要用来获取文件状态已经就绪的事件,该函数检查参数、获取eventpoll文件后调用ep_poll()来完成主要的工作。在分析ep_poll()函数之前,先介绍一下使用epoll_wait()时可能犯的错误(接下来介绍的就是我犯过的错误):
1、返回EBADF错误
除非你故意指定一个不存在的文件描述符,否则几乎百分百肯定,你的程序有BUG了!从源码中可以看到调用fget()函数返回NULL时,会返回此错误。fget()源码如下:
[cpp] view
plaincopy
struct file *fget(unsigned int fd)
{
struct file *file;
struct files_struct *files = current->files;
rcu_read_lock();
file = fcheck_files(files, fd);
if (file) {
if (!atomic_long_inc_not_zero(&file->f_count)) {
/* File object ref couldn't be taken */
rcu_read_unlock();
return NULL;
}
}
rcu_read_unlock();
return file;
}
主要看这句(struct files_struct *files = current->files;),这条语句是获取描述当前进程已经打开的文件的files_struct结构,然后从这个结构中查找传入的fd对应的file实例,如果没有找到,说明当前进程中打开的文件不包括这个fd,所以几乎百分百肯定是程序设计的问题。我的程序出错,就是因为在父进程中创建了文件描述符,但是将子进程变为守护进程了,也就没有继承父进程中打开的文件。
2、死循环(一般不会犯,但是我是第一次用,犯了)
epoll_wait()中有一个设置超时时间的参数,所以我在循环中没有使用睡眠队列的操作,想依赖epoll的睡眠操作,所以在返回值小于等于0时,直接进行下一次循环,没有充分考虑epoll_wait()的返回值小于0时的不同情况,所以代码写成了下面的样子:
[cpp] view
plaincopy
for(;;) {
......
events = epoll_wait(fcluster_epfd, fcluster_wait_events,
fcluster_wait_size, 3000);
if (unlikely(events <= 0)) {
continue;
}
.......
}
当epoll_wait()返回EBADF或EFAULT时,就会陷入死循环,因此此时还没有进入睡眠的操作。
二、ep_poll()函数
下面来看获取事件的主要函数ep_poll(),源码及分析如下:
[cpp] view
plaincopy
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res, eavail;
unsigned long flags;
long jtimeout;
wait_queue_t wait;
/*
* Calculate the timeout by checking for the "infinite" value (-1)
* and the overflow condition. The passed timeout is in milliseconds,
* that why (t * HZ) / 1000.
*/
/*
* timeout是以毫秒为单位,这里是要转换为jiffies时间。
* 这里加上999(即1000-1),是为了向上取整。
*/
jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;
retry:
spin_lock_irqsave(&ep->lock, flags);
res = 0;
if (list_empty(&ep->rdllist)) {
/*
* We don't have any available event to return to the caller.
* We need to sleep here, and we will be wake up by
* ep_poll_callback() when events will become available.
*/
init_waitqueue_entry(&wait, current);
wait.flags |= WQ_FLAG_EXCLUSIVE;
/*
* 将当前进程加入到eventpoll的等待队列中,
* 等待文件状态就绪或直到超时,或被
* 信号中断。
*/
__add_wait_queue(&ep->wq, &wait);
for (;;) {
/*
* We don't want to sleep if the ep_poll_callback() sends us
* a wakeup in between. That's why we set the task state
* to TASK_INTERRUPTIBLE before doing the checks.
*/
set_current_state(TASK_INTERRUPTIBLE);
/*
* 如果就绪队列不为空,也就是说已经有文件的状态
* 就绪或者超时,则退出循环。
*/
if (!list_empty(&ep->rdllist) || !jtimeout)
break;
/*
* 如果当前进程接收到信号,则退出
* 循环,返回EINTR错误
*/
if (signal_pending(current)) {
res = -EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock, flags);
/*
* 主动让出处理器,等待ep_poll_callback()将当前进程
* 唤醒或者超时,返回值是剩余的时间。从这里开始
* 当前进程会进入睡眠状态,直到某些文件的状态
* 就绪或者超时。当文件状态就绪时,eventpoll的回调
* 函数ep_poll_callback()会唤醒在ep->wq指向的等待队列中的进程。
*/
jtimeout = schedule_timeout(jtimeout);
spin_lock_irqsave(&ep->lock, flags);
}
__remove_wait_queue(&ep->wq, &wait);
set_current_state(TASK_RUNNING);
}
/* Is it worth to try to dig for events ? */
/*
* ep->ovflist链表存储的向用户传递事件时暂存就绪的文件。
* 所以不管是就绪队列ep->rdllist不为空,或者ep->ovflist不等于
* EP_UNACTIVE_PTR,都有可能现在已经有文件的状态就绪。
* ep->ovflist不等于EP_UNACTIVE_PTR有两种情况,一种是NULL,此时
* 可能正在向用户传递事件,不一定就有文件状态就绪,
* 一种情况时不为NULL,此时可以肯定有文件状态就绪,
* 参见ep_send_events()。
*/
eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
spin_unlock_irqrestore(&ep->lock, flags);
/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/
/*
* 如果没有被信号中断,并且有事件就绪,
* 但是没有获取到事件(有可能被其他进程获取到了),
* 并且没有超时,则跳转到retry标签处,重新等待
* 文件状态就绪。
*/
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && jtimeout)
goto retry;
/*
* 返回获取到的事件的个数或者错误码
*/
return res;
}
ep_poll()的主要过程是:首先将超时时间(以毫秒为单位)转换为jiffies时间,然后检查是否有事件发生,如果没有事件发生,则将当前进程加入到eventpoll中的等待队列中,直到事件发生或者超时。如果有事件发生,则调用ep_send_events()将发生的事件传入用户空间的内存。ep_send_events()函数将用户传入的内存简单封装到ep_send_events_data结构中,然后调用ep_scan_ready_list()将就绪队列中的事件传入用户空间的内存。
三、ep_scan_ready_list()函数
源码及分析如下:
[cpp] view
plaincopy
/**
* ep_scan_ready_list - Scans the ready list in a way that makes possible for
* the scan code, to call f_op->poll(). Also allows for
* O(NumReady) performance.
*
* @ep: Pointer to the epoll private data structure.
* @sproc: Pointer to the scan callback.
* @priv: Private opaque data passed to the @sproc callback.
*
* Returns: The same integer error code returned by the @sproc callback.
*/
static int ep_scan_ready_list(struct eventpoll *ep,
int (*sproc)(struct eventpoll *,
struct list_head *, void *),
void *priv)
{
int error, pwake = 0;
unsigned long flags;
struct epitem *epi, *nepi;
LIST_HEAD(txlist);
/*
* We need to lock this because we could be hit by
* eventpoll_release_file() and epoll_ctl().
*/
/*
* 获取互斥锁,该互斥锁在移除eventpoll文件(eventpoll_release_file() )、
* 操作文件描述符(epoll_ctl())和向用户传递事件(epoll_wait())之间进行互斥
*/
mutex_lock(&ep->mtx);
/*
* Steal the ready list, and re-init the original one to the
* empty list. Also, set ep->ovflist to NULL so that events
* happening while looping w/out locks, are not lost. We cannot
* have the poll callback to queue directly on ep->rdllist,
* because we want the "sproc" callback to be able to do it
* in a lockless way.
*/
spin_lock_irqsave(&ep->lock, flags);
/*
* 将就绪队列中就绪的文件链表暂存在临时
* 表头txlist中,并且初始化就绪队列。
*/
list_splice_init(&ep->rdllist, &txlist);
/*
* 将ovflist置为NULL,表示此时正在向用户空间传递
* 事件。如果此时有文件状态就绪,不会放在
* 就绪队列中,而是放在ovflist链表中。
*/
ep->ovflist = NULL;
spin_unlock_irqrestore(&ep->lock, flags);
/*
* Now call the callback function.
*/
/*
* 调用ep_send_events_proc()将就绪队列中的事件
* 存入用户传入的内存中。
*/
error = (*sproc)(ep, &txlist, priv);
spin_lock_irqsave(&ep->lock, flags);
/*
* During the time we spent inside the "sproc" callback, some
* other events might have been queued by the poll callback.
* We re-insert them inside the main ready-list here.
*/
/*
* 在调用sproc指向的函数将就绪队列中的事件
* 传递到用户传入的内存的过程中,可能有文件
* 状态就绪,这些事件会暂存在ovflist链表中,
* 所以这里要将ovflist中的事件移到就绪队列中。
*/
for (nepi = ep->ovflist; (epi = nepi) != NULL;
nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
/*
* We need to check if the item is already in the list.
* During the "sproc" callback execution time, items are
* queued into ->ovflist but the "txlist" might already
* contain them, and the list_splice() below takes care of them.
*/
if (!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink, &ep->rdllist);
}
/*
* We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
* releasing the lock, events will be queued in the normal way inside
* ep->rdllist.
*/
/*
* 重新初始化ovflist,表示传递事件已经完成,
* 之后再有文件状态就绪,这些事件会直接
* 放在就绪队列中。
*/
ep->ovflist = EP_UNACTIVE_PTR;
/*
* Quickly re-inject items left on "txlist".
*/
/*
* 如果sproc指向的函数ep_send_events_proc()中处理出错或者某些文件的
* 触发方式设置为水平触发(Level Trigger),txlist中可能还有事件,需要
* 将这些就绪的事件重新添加回eventpoll文件的就绪队列中。
*/
list_splice(&txlist, &ep->rdllist);
if (!list_empty(&ep->rdllist)) {
/*
* Wake up (if active) both the eventpoll wait list and
* the ->poll() wait list (delayed after we release the lock).
*/
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
mutex_unlock(&ep->mtx);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return error;
}
ep_scan_ready_list()函数的参数sproc指向的函数是ep_send_events_proc(),参见ep_send_events()函数。
四、ep_send_events_proc()函数
[cpp] view
plaincopy
/*
* @head:已经就绪的文件列表
* @priv:用来存储已经就绪的文件
*/
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
void *priv)
{
struct ep_send_events_data *esed = priv;
int eventcnt;
unsigned int revents;
struct epitem *epi;
struct epoll_event __user *uevent;
/*
* We can loop without lock because we are passed a task private list.
* Items cannot vanish during the loop because ep_scan_ready_list() is
* holding "mtx" during this call.
*/
for (eventcnt = 0, uevent = esed->events;
!list_empty(head) && eventcnt < esed->maxevents;) {
epi = list_first_entry(head, struct epitem, rdllink);
list_del_init(&epi->rdllink);
/*
* 调用文件的poll函数有两个作用,一是在文件的唤醒
* 队列上注册回调函数,二是返回文件当前的事件状
* 态,如果第二个参数为NULL,则只是查看文件当前
* 状态。
*/
revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
epi->event.events;
/*
* If the event mask intersect the caller-requested one,
* deliver the event to userspace. Again, ep_scan_ready_list()
* is holding "mtx", so no operations coming from userspace
* can change the item.
*/
if (revents) {
/*
* 向用户内存传值失败时,将当前epitem实例重新放回
* 到链表中,从这里也可以看出,在处理失败后,head指向的
* 链表(对应ep_scan_ready_list()中的临时变量txlist)中
* 有可能会没有完全处理完,因此在ep_scan_ready_list()中
* 需要下面的语句
* list_splice(&txlist, &ep->rdllist);
* 来将未处理的事件重新放回到eventpoll文件的就绪队列中。
*/
if (__put_user(revents, &uevent->events) ||
__put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
/*
* 如果此时已经获取了部分事件,则返回已经获取的事件个数,
* 否则返回EFAULT错误。
*/
return eventcnt ? eventcnt : -EFAULT;
}
eventcnt++;
uevent++;
if (epi->event.events & EPOLLONESHOT)
epi->event.events &= EP_PRIVATE_BITS;
/*
* 如果是触发方式不是边缘触发(Edge Trigger),而是水平
* 触发(Level Trigger),需要将当前的epitem实例添加回
* 链表中,下次读取事件时会再次上报。
*/
else if (!(epi->event.events & EPOLLET)) {
/*
* If this file has been added with Level
* Trigger mode, we need to insert back inside
* the ready list, so that the next call to
* epoll_wait() will check again the events
* availability. At this point, noone can insert
* into ep->rdllist besides us. The epoll_ctl()
* callers are locked out by
* ep_scan_ready_list() holding "mtx" and the
* poll callback will queue them in ep->ovflist.
*/
list_add_tail(&epi->rdllink, &ep->rdllist);
}
}
}
return eventcnt;
}
list是自旋锁保护的。
ready list涉及到add\mod\wait
红黑树涉及到add\mod\del
1 等待队列实现原理
1.1 功能介绍
进程有多种状态,当进程做好准备后,它就处于就绪状态(TASK_RUNNING),放入运行队列,等待内核调度器来调度。当然,同一时刻可能有多个进程进入就绪状态,但是却可能只有1个CPU是空闲的,所以最后能不能在CPU上运行,还要取决于优先级等多种因素。当进程进行外部设备的IO等待操作时,由于外部设备的操作速度一般是非常慢的,所以进程会从就绪状态变为等待状态(休眠),进入等待队列,把CPU让给其它进程。直到IO操作完成,内核“唤醒”等待的进程,于是进程再度从等待状态变为就绪状态。
在用户态,进程进行IO操作时,可以有多种处理方式,如阻塞式IO,非阻塞式IO,多路复用(select/poll/epoll),AIO(aio_read/aio_write)等等。这些操作在内核态都要用到等待队列。
1.2 相关的结构体
typedef struct __wait_queue wait_queue_t;
struct __wait_queue
{
unsigned int flags;
#define WQ_FLAG_EXCLUSIVE 0x01
struct task_struct * task; //等待队列节点对应的进程
wait_queue_func_t func; //等待队列的回调函数,在进程被唤醒
struct list_head task_list;
};
这个是等待队列的节点,在很多等待队列里,这个func函数指针默认为空函数。
但是,在select/poll/epoll函数中,这个func函数指针不为空,并且扮演着重要的角色。
struct __wait_queue_head
{
spinlock_t lock;
struct list_head task_list;
};
typedef struct __wait_queue_head wait_queue_head_t;
这个是等待队列的头部。其中task_list里有指向下一个节点的指针。为了保证对等待队列的操作是原子的,还需要一个自旋锁lock。
这里需要提一下内核队列中被广泛使用的结构体struct list_head。
struct list_head
{
struct list_head *next, *prev;
};
1.3 实现原理
可以看到,等待队列的核心是一个list_head组成的双向链表。
其中,第一个节点是队列的头,类型为wait_queue_head_t,里面包含了一个list_head类型的成员task_list。
接下去的每个节点类型为 wait_queue_t,里面也有一个list_head类型的成员task_list,并且有个指针指向等待的进程。通过这种方式,内核组织了一个等待队列。
那么,这个等待队列怎样与一个事件关联呢?
在内核中,进程在文件操作等事件上的等待,一定会有一个对应的等待队列的结构体与之对应。例如,等待管道的文件操作(在内核看来,管道也是一种文件)的进程都放在管道对应inode.i_pipe->wait这个等待队列中。这样,如果管道文件操作完成,就可以很方便地通过inode.i_pipe->wait唤醒等待的进程。
在大部分情况下(如系统调用read),当前进程等待IO操作的完成,只要在内核堆栈中分配一个wait_queue_t的结构体,然后初始化,把task指向当前进程的task_struct,然后调用add_wait_queue()放入等待队列即可。
但是,在select/poll中,由于系统调用要监视多个文件描述符的操作,因此要把当前进程放入多个文件的等待队列,并且要分配多个wait_queue_t结构体。这时候,在堆栈上分配是不合适的。因为内核堆栈很小。所以要通过动态分配的方式来分配wait_queue_t结构体。除了在一些结构体里直接定义等待队列的头部,内核的信号量机制也大量使用了等待队列。信号量是为了进行进程同步而引入的。与自旋锁不同的是,当一个进程无法获得信号量时,它会把自己放到这个信号量的等待队列中,转变为等待状态。当其它进程释放信号量时,会唤醒等待的进程。
epoll 关键结构体:
struct ep_pqueue
{
poll_table pt;
struct epitem *epi;
};
这个结构体类似于select/poll中的struct poll_wqueues。由于epoll需要在内核态保存大量信息,所以光光一个回调函数指针已经不能满足要求,所以在这里引入了一个新的结构体struct epitem。
struct epitem
{
struct rb_node rbn;
红黑树,用来保存eventpoll
struct list_head rdllink;
双向链表,用来保存已经完成的eventpoll
struct epoll_filefd ffd;
这个结构体对应的被监听的文件描述符信息
int nwait;
poll操作中事件的个数
struct list_head pwqlist;
双向链表,保存着被监视文件的等待队列,功能类似于select/poll中的poll_table
struct eventpoll *ep;
指向eventpoll,多个epitem对应一个eventpoll
struct epoll_event event;
记录发生的事件和对应的fd
atomic_t usecnt;
引用计数
struct list_head fllink;
双向链表,用来链接被监视的文件描述符对应的struct file。因为file里有f_ep_link,
用来保存所有监视这个文件的epoll节点
struct list_head txlink;
双向链表,用来保存传输队列
unsigned int revents;
文件描述符的状态,在收集和传输时用来锁住空的事件集合
};
该结构体用来保存与epoll节点关联的多个文件描述符,保存的方式是使用红黑树实现的hash表。至于为什么要保存,下文有详细解释。它与被监听的文件描述符一一对应。
struct eventpoll
{
spinlock_t lock;
读写锁
struct mutex mtx;
读写信号量
wait_queue_head_t wq;
wait_queue_head_t poll_wait;
struct list_head rdllist;
已经完成的操作事件的队列。
struct rb_root rbr;
保存epoll监视的文件描述符
struct epitem *ovflist;
struct user_struct *user;
};
这个结构体保存了epoll文件描述符的扩展信息,它被保存在file结构体的private_data中。它与epoll文件节点一一对应。通常一个epoll文件节点对应多个被监视的文件描述符。所以一个eventpoll结构体会对应多个epitem结构体。
那么,epoll中的等待事件放在哪里呢?见下面
struct eppoll_entry
{
struct list_head llink;
void *base;
wait_queue_t wait;
wait_queue_head_t *whead;
};
与select/poll的struct poll_table_entry相比,epoll的表示等待队列节点的结构体只是稍有不同,
与struct poll_table_entry比较一下。
struct poll_table_entry
{
struct file * filp;
wait_queue_t wait;
wait_queue_head_t * wait_address;
};
由于epitem对应一个被监视的文件,所以通过base可以方便地得到被监视的文件信息。又因为一个文件可能有多个事件发生,所以用llink链接这些事件。
=====================================================================
相关内核代码:
fs/eventpoll.c
判断一个tcp套接字上是否有激活事件:net/ipv4/tcp.c:tcp_poll函数
每个epollfd在内核中有一个对应的eventpoll结构对象.
其中关键的成员是一个readylist(eventpoll:rdllist)
和一棵红黑树(eventpoll:rbr).
eventpoll的红黑树中.红黑树的作用是使用者调用EPOLL_MOD的时候可以快速找到fd对应的epitem。
epoll_ctl的功能是实现一系列操作,如把文件与eventpollfs文件系统的inode节点关联起来。这里要介绍一下eventpoll结构体,它保存在file->f_private中,记录了eventpollfs文件系统的inode节点的重要信息,其中成员rbr保存了该epoll文件节点监视的所有文件描述符。组织的方式是一棵红黑树,这种结构体在查找节点时非常高效。首先它调用ep_find()从eventpoll中的红黑树获得epitem结构体。然后根据op参数的不同而选择不同的操作。如果op为EPOLL_CTL_ADD,那么正常情况下epitem是不可能在eventpoll的红黑树中找到的,所以调用ep_insert创建一个epitem结构体并插入到对应的红黑树中。
ep_insert()首先分配一个epitem对象,对它初始化后,把它放入对应的红黑树。此外,这个函数还要作一个操作,就是把当前进程放入对应文件操作的等待队列。这一步是由下面的代码完成的。
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
......
revents = tfile->f_op->poll(tfile, &epq.pt);
函数先调用init_poll_funcptr注册了一个回调函数ep_ptable_queue_proc,ep_ptable_queue_proc函数会在调用f_op->poll时被执行。
900 static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
901 poll_table *pt)
902 {
903 struct epitem *epi = ep_item_from_epqueue(pt);
904 struct eppoll_entry *pwq;
905
906 if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
907 init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
908 pwq->whead = whead;
909 pwq->base = epi;
910 add_wait_queue(whead, &pwq->wait);
911 list_add_tail(&pwq->llink, &epi->pwqlist);
912 epi->nwait++;
913 } else {
914
915 epi->nwait = -1;
916 }
917 }
该函数分配一个epoll等待队列结点eppoll_entry:一方面把它挂到文件操作的等待队列中,另一方面把它挂到epitem的队列中。此外,它还注册了一个等待队列的回调函数ep_poll_callback。当文件操作完成,唤醒当前进程之前,会调用ep_poll_callback(),把eventpoll放到epitem的完成队列中(注释:通过查看代码,此处应该是把epitem放到eventpoll的完成队列,只有这样才能在epoll_wait()中只要看eventpoll的完成队列即可得到所有的完成文件描述符),并唤醒等待进程。
如果在执行f_op->poll以后,发现被监视的文件操作已经完成了,那么把它放在完成队列中了,并立即把等待操作的那些进程唤醒。
919 if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
920 return -ENOMEM;
963 ep_rbtree_insert(ep, epi);
调用epoll_wait的时候,将readylist中的epitem出列,将触发的事件拷贝到用户空间.之后判断epitem是否需
要重新添加回readylist.
epitem重新添加到readylist必须满足下列条件:
1) epitem上有用户关注的事件触发.
2) epitem被设置为水平触发模式(如果一个epitem被设置为边界触发则这个epitem不会被重新添加到readylist
中,在什么时候重新添加到readylist请继续往下看).
注意,如果epitem被设置为EPOLLONESHOT模式,则当这个epitem上的事件拷贝到用户空间之后,会将
这个epitem上的关注事件清空(只是关注事件被清空,并没有从epoll中删除,要删除必须对那个描述符调用
EPOLL_DEL),也就是说即使这个epitem上有触发事件,但是因为没有用户关注的事件所以不会被重新添加到
readylist中.
epitem被添加到readylist中的各种情况(当一个epitem被添加到readylist如果有线程阻塞在epoll_wait中,那
个线程会被唤醒):
1)对一个fd调用EPOLL_ADD,如果这个fd上有用户关注的激活事件,则这个fd会被添加到readylist.
2)对一个fd调用EPOLL_MOD改变关注的事件,如果新增加了一个关注事件且对应的fd上有相应的事件激活,
则这个fd会被添加到readylist.
3)当一个fd上有事件触发时(例如一个socket上有外来的数据)会调用ep_poll_callback(见eventpoll::ep_ptable_queue_proc),
如果触发的事件是用户关注的事件,则这个fd会被添加到readylist中.
了解了epoll的执行过程之后,可以回答一个在使用边界触发时常见的疑问.在一个fd被设置为边界触发的情况下,
调用read/write,如何正确的判断那个fd已经没有数据可读/不再可写.epoll文档中的建议是直到触发EAGAIN
错误.而实际上只要你请求字节数小于read/write的返回值就可以确定那个fd上已经没有数据可读/不再可写.
最后用一个epollfd监听另一个epollfd也是合法的,epoll通过调用eventpoll::ep_eventpoll_poll来判断一个
epollfd上是否有触发的事件(只能是读事件).
以下是个人读代码总结:
1709 SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
1710 int, maxevents, int, timeout)
1588 SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
1589 struct epoll_event __user *, event)
epoll_ctl的机制大致如下:
1360 mutex_lock(&ep->mtx);
1361
1362
1367 epi = ep_find(ep, tfile, fd); //这里就是去ep->rbr 红黑树查找
1368
1369 error = -EINVAL;
1370 switch (op) {
1371 case EPOLL_CTL_ADD:
1372 if (!epi) {
1373 epds.events |= POLLERR | POLLHUP;
1374 error = ep_insert(ep, &epds, tfile, fd);
1375 } else
1376 error = -EEXIST;
1377 break;
1378 case EPOLL_CTL_DEL:
1379 if (epi)
1380 error = ep_remove(ep, epi);
1381 else
1382 error = -ENOENT;
1383 break;
1384 case EPOLL_CTL_MOD:
1385 if (epi) {
1386 epds.events |= POLLERR | POLLHUP;
1387 error = ep_modify(ep, epi, &epds);
1388 } else
1389 error = -ENOENT;
1390 break;
1391 }
1392 mutex_unlock(&ep->mtx);
http://blog.csdn.net/justlinux2010/article/details/8510507
====================================
源码分析
====================================
一、sys_epoll_wait()函数
源码及分析如下所示:
[cpp] view
plaincopy
/*
* Implement the event wait interface for the eventpoll file. It is the kernel
* part of the user space epoll_wait(2).
*/
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
int error;
struct file *file;
struct eventpoll *ep;
/* The maximum number of event must be greater than zero */
/*
* 检查maxevents参数。
*/
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;
/* Verify that the area passed by the user is writeable */
/*
* 检查用户空间传入的events指向的内存是否可写。参见__range_not_ok()。
*/
if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
error = -EFAULT;
goto error_return;
}
/* Get the "struct file *" for the eventpoll file */
/*
* 获取epfd对应的eventpoll文件的file实例,file结构是在epoll_create中创建
*/
error = -EBADF;
file = fget(epfd);
if (!file)
goto error_return;
/*
* We have to check that the file structure underneath the fd
* the user passed to us _is_ an eventpoll file.
*/
/*
* 通过检查epfd对应的文件操作是不是eventpoll_fops
* 来判断epfd是否是一个eventpoll文件。如果不是
* 则返回EINVAL错误。
*/
error = -EINVAL;
if (!is_file_epoll(file))
goto error_fput;
/*
* At this point it is safe to assume that the "private_data" contains
* our own data structure.
*/
ep = file->private_data;
/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
error_fput:
fput(file);
error_return:
return error;
}
sys_epoll_wait()是epoll_wait()对应的系统调用,主要用来获取文件状态已经就绪的事件,该函数检查参数、获取eventpoll文件后调用ep_poll()来完成主要的工作。在分析ep_poll()函数之前,先介绍一下使用epoll_wait()时可能犯的错误(接下来介绍的就是我犯过的错误):
1、返回EBADF错误
除非你故意指定一个不存在的文件描述符,否则几乎百分百肯定,你的程序有BUG了!从源码中可以看到调用fget()函数返回NULL时,会返回此错误。fget()源码如下:
[cpp] view
plaincopy
struct file *fget(unsigned int fd)
{
struct file *file;
struct files_struct *files = current->files;
rcu_read_lock();
file = fcheck_files(files, fd);
if (file) {
if (!atomic_long_inc_not_zero(&file->f_count)) {
/* File object ref couldn't be taken */
rcu_read_unlock();
return NULL;
}
}
rcu_read_unlock();
return file;
}
主要看这句(struct files_struct *files = current->files;),这条语句是获取描述当前进程已经打开的文件的files_struct结构,然后从这个结构中查找传入的fd对应的file实例,如果没有找到,说明当前进程中打开的文件不包括这个fd,所以几乎百分百肯定是程序设计的问题。我的程序出错,就是因为在父进程中创建了文件描述符,但是将子进程变为守护进程了,也就没有继承父进程中打开的文件。
2、死循环(一般不会犯,但是我是第一次用,犯了)
epoll_wait()中有一个设置超时时间的参数,所以我在循环中没有使用睡眠队列的操作,想依赖epoll的睡眠操作,所以在返回值小于等于0时,直接进行下一次循环,没有充分考虑epoll_wait()的返回值小于0时的不同情况,所以代码写成了下面的样子:
[cpp] view
plaincopy
for(;;) {
......
events = epoll_wait(fcluster_epfd, fcluster_wait_events,
fcluster_wait_size, 3000);
if (unlikely(events <= 0)) {
continue;
}
.......
}
当epoll_wait()返回EBADF或EFAULT时,就会陷入死循环,因此此时还没有进入睡眠的操作。
二、ep_poll()函数
下面来看获取事件的主要函数ep_poll(),源码及分析如下:
[cpp] view
plaincopy
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res, eavail;
unsigned long flags;
long jtimeout;
wait_queue_t wait;
/*
* Calculate the timeout by checking for the "infinite" value (-1)
* and the overflow condition. The passed timeout is in milliseconds,
* that why (t * HZ) / 1000.
*/
/*
* timeout是以毫秒为单位,这里是要转换为jiffies时间。
* 这里加上999(即1000-1),是为了向上取整。
*/
jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;
retry:
spin_lock_irqsave(&ep->lock, flags);
res = 0;
if (list_empty(&ep->rdllist)) {
/*
* We don't have any available event to return to the caller.
* We need to sleep here, and we will be wake up by
* ep_poll_callback() when events will become available.
*/
init_waitqueue_entry(&wait, current);
wait.flags |= WQ_FLAG_EXCLUSIVE;
/*
* 将当前进程加入到eventpoll的等待队列中,
* 等待文件状态就绪或直到超时,或被
* 信号中断。
*/
__add_wait_queue(&ep->wq, &wait);
for (;;) {
/*
* We don't want to sleep if the ep_poll_callback() sends us
* a wakeup in between. That's why we set the task state
* to TASK_INTERRUPTIBLE before doing the checks.
*/
set_current_state(TASK_INTERRUPTIBLE);
/*
* 如果就绪队列不为空,也就是说已经有文件的状态
* 就绪或者超时,则退出循环。
*/
if (!list_empty(&ep->rdllist) || !jtimeout)
break;
/*
* 如果当前进程接收到信号,则退出
* 循环,返回EINTR错误
*/
if (signal_pending(current)) {
res = -EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock, flags);
/*
* 主动让出处理器,等待ep_poll_callback()将当前进程
* 唤醒或者超时,返回值是剩余的时间。从这里开始
* 当前进程会进入睡眠状态,直到某些文件的状态
* 就绪或者超时。当文件状态就绪时,eventpoll的回调
* 函数ep_poll_callback()会唤醒在ep->wq指向的等待队列中的进程。
*/
jtimeout = schedule_timeout(jtimeout);
spin_lock_irqsave(&ep->lock, flags);
}
__remove_wait_queue(&ep->wq, &wait);
set_current_state(TASK_RUNNING);
}
/* Is it worth to try to dig for events ? */
/*
* ep->ovflist链表存储的向用户传递事件时暂存就绪的文件。
* 所以不管是就绪队列ep->rdllist不为空,或者ep->ovflist不等于
* EP_UNACTIVE_PTR,都有可能现在已经有文件的状态就绪。
* ep->ovflist不等于EP_UNACTIVE_PTR有两种情况,一种是NULL,此时
* 可能正在向用户传递事件,不一定就有文件状态就绪,
* 一种情况时不为NULL,此时可以肯定有文件状态就绪,
* 参见ep_send_events()。
*/
eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
spin_unlock_irqrestore(&ep->lock, flags);
/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/
/*
* 如果没有被信号中断,并且有事件就绪,
* 但是没有获取到事件(有可能被其他进程获取到了),
* 并且没有超时,则跳转到retry标签处,重新等待
* 文件状态就绪。
*/
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && jtimeout)
goto retry;
/*
* 返回获取到的事件的个数或者错误码
*/
return res;
}
ep_poll()的主要过程是:首先将超时时间(以毫秒为单位)转换为jiffies时间,然后检查是否有事件发生,如果没有事件发生,则将当前进程加入到eventpoll中的等待队列中,直到事件发生或者超时。如果有事件发生,则调用ep_send_events()将发生的事件传入用户空间的内存。ep_send_events()函数将用户传入的内存简单封装到ep_send_events_data结构中,然后调用ep_scan_ready_list()将就绪队列中的事件传入用户空间的内存。
三、ep_scan_ready_list()函数
源码及分析如下:
[cpp] view
plaincopy
/**
* ep_scan_ready_list - Scans the ready list in a way that makes possible for
* the scan code, to call f_op->poll(). Also allows for
* O(NumReady) performance.
*
* @ep: Pointer to the epoll private data structure.
* @sproc: Pointer to the scan callback.
* @priv: Private opaque data passed to the @sproc callback.
*
* Returns: The same integer error code returned by the @sproc callback.
*/
static int ep_scan_ready_list(struct eventpoll *ep,
int (*sproc)(struct eventpoll *,
struct list_head *, void *),
void *priv)
{
int error, pwake = 0;
unsigned long flags;
struct epitem *epi, *nepi;
LIST_HEAD(txlist);
/*
* We need to lock this because we could be hit by
* eventpoll_release_file() and epoll_ctl().
*/
/*
* 获取互斥锁,该互斥锁在移除eventpoll文件(eventpoll_release_file() )、
* 操作文件描述符(epoll_ctl())和向用户传递事件(epoll_wait())之间进行互斥
*/
mutex_lock(&ep->mtx);
/*
* Steal the ready list, and re-init the original one to the
* empty list. Also, set ep->ovflist to NULL so that events
* happening while looping w/out locks, are not lost. We cannot
* have the poll callback to queue directly on ep->rdllist,
* because we want the "sproc" callback to be able to do it
* in a lockless way.
*/
spin_lock_irqsave(&ep->lock, flags);
/*
* 将就绪队列中就绪的文件链表暂存在临时
* 表头txlist中,并且初始化就绪队列。
*/
list_splice_init(&ep->rdllist, &txlist);
/*
* 将ovflist置为NULL,表示此时正在向用户空间传递
* 事件。如果此时有文件状态就绪,不会放在
* 就绪队列中,而是放在ovflist链表中。
*/
ep->ovflist = NULL;
spin_unlock_irqrestore(&ep->lock, flags);
/*
* Now call the callback function.
*/
/*
* 调用ep_send_events_proc()将就绪队列中的事件
* 存入用户传入的内存中。
*/
error = (*sproc)(ep, &txlist, priv);
spin_lock_irqsave(&ep->lock, flags);
/*
* During the time we spent inside the "sproc" callback, some
* other events might have been queued by the poll callback.
* We re-insert them inside the main ready-list here.
*/
/*
* 在调用sproc指向的函数将就绪队列中的事件
* 传递到用户传入的内存的过程中,可能有文件
* 状态就绪,这些事件会暂存在ovflist链表中,
* 所以这里要将ovflist中的事件移到就绪队列中。
*/
for (nepi = ep->ovflist; (epi = nepi) != NULL;
nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
/*
* We need to check if the item is already in the list.
* During the "sproc" callback execution time, items are
* queued into ->ovflist but the "txlist" might already
* contain them, and the list_splice() below takes care of them.
*/
if (!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink, &ep->rdllist);
}
/*
* We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
* releasing the lock, events will be queued in the normal way inside
* ep->rdllist.
*/
/*
* 重新初始化ovflist,表示传递事件已经完成,
* 之后再有文件状态就绪,这些事件会直接
* 放在就绪队列中。
*/
ep->ovflist = EP_UNACTIVE_PTR;
/*
* Quickly re-inject items left on "txlist".
*/
/*
* 如果sproc指向的函数ep_send_events_proc()中处理出错或者某些文件的
* 触发方式设置为水平触发(Level Trigger),txlist中可能还有事件,需要
* 将这些就绪的事件重新添加回eventpoll文件的就绪队列中。
*/
list_splice(&txlist, &ep->rdllist);
if (!list_empty(&ep->rdllist)) {
/*
* Wake up (if active) both the eventpoll wait list and
* the ->poll() wait list (delayed after we release the lock).
*/
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
mutex_unlock(&ep->mtx);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return error;
}
ep_scan_ready_list()函数的参数sproc指向的函数是ep_send_events_proc(),参见ep_send_events()函数。
四、ep_send_events_proc()函数
[cpp] view
plaincopy
/*
* @head:已经就绪的文件列表
* @priv:用来存储已经就绪的文件
*/
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
void *priv)
{
struct ep_send_events_data *esed = priv;
int eventcnt;
unsigned int revents;
struct epitem *epi;
struct epoll_event __user *uevent;
/*
* We can loop without lock because we are passed a task private list.
* Items cannot vanish during the loop because ep_scan_ready_list() is
* holding "mtx" during this call.
*/
for (eventcnt = 0, uevent = esed->events;
!list_empty(head) && eventcnt < esed->maxevents;) {
epi = list_first_entry(head, struct epitem, rdllink);
list_del_init(&epi->rdllink);
/*
* 调用文件的poll函数有两个作用,一是在文件的唤醒
* 队列上注册回调函数,二是返回文件当前的事件状
* 态,如果第二个参数为NULL,则只是查看文件当前
* 状态。
*/
revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
epi->event.events;
/*
* If the event mask intersect the caller-requested one,
* deliver the event to userspace. Again, ep_scan_ready_list()
* is holding "mtx", so no operations coming from userspace
* can change the item.
*/
if (revents) {
/*
* 向用户内存传值失败时,将当前epitem实例重新放回
* 到链表中,从这里也可以看出,在处理失败后,head指向的
* 链表(对应ep_scan_ready_list()中的临时变量txlist)中
* 有可能会没有完全处理完,因此在ep_scan_ready_list()中
* 需要下面的语句
* list_splice(&txlist, &ep->rdllist);
* 来将未处理的事件重新放回到eventpoll文件的就绪队列中。
*/
if (__put_user(revents, &uevent->events) ||
__put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
/*
* 如果此时已经获取了部分事件,则返回已经获取的事件个数,
* 否则返回EFAULT错误。
*/
return eventcnt ? eventcnt : -EFAULT;
}
eventcnt++;
uevent++;
if (epi->event.events & EPOLLONESHOT)
epi->event.events &= EP_PRIVATE_BITS;
/*
* 如果是触发方式不是边缘触发(Edge Trigger),而是水平
* 触发(Level Trigger),需要将当前的epitem实例添加回
* 链表中,下次读取事件时会再次上报。
*/
else if (!(epi->event.events & EPOLLET)) {
/*
* If this file has been added with Level
* Trigger mode, we need to insert back inside
* the ready list, so that the next call to
* epoll_wait() will check again the events
* availability. At this point, noone can insert
* into ep->rdllist besides us. The epoll_ctl()
* callers are locked out by
* ep_scan_ready_list() holding "mtx" and the
* poll callback will queue them in ep->ovflist.
*/
list_add_tail(&epi->rdllink, &ep->rdllist);
}
}
}
return eventcnt;
}
相关文章推荐
- hadoop 2.6.3 BlockPlacementPolicyDefault源代码分析
- Codeforces Round #324 (Div. 2) C. Marina and Vasya(贪心)
- 预装oemwin8/8.1升级win10ISO文件
- Excel访问方法论及开源库
- HDU 3091 Necklace (状态压缩dp)
- Python编辑器IDLE 简单操作
- linux bridge 的 vxlan 试验
- fuse 安装与使用
- Swift cell与Label的自适应高度
- HDU-ACM-2018(母牛的故事)
- Charles辅助调试接口(转)
- NSClassFromString那点事
- hdoj 1085 Holding Bin-Laden Captive!【母函数】
- 数组去重
- HDU-ACM-2019
- HDU-ACM-2020
- hdoj 1398 Square Coins【母函数】
- hdoj-1028 Ignatius and the Princess III【母函数】
- Unix 中进程间的通信
- foreach循环时动态往数组里添加数据