您的位置:首页 > 其它

libevent源码学习-----时间管理

2017-10-12 16:28 459 查看
libevent监听的event有以下几种

文件描述符/套接字,没有设定超时时长

信号

文件描述符/套接字,设定超时时长

对于时间,libevent内部的时间管理是通过最小堆实现的,原因如下

既然某些fd有规定的超时时长,那么io多路复用函数就不能永久阻塞,需要设定一个超时时长(最后一个参数)

用户在使用event_add设定的时间是相对于event_add调用的相对时间,这就导致所有具有超时时长的event什么时候超时是杂乱无章的,没办法为io函数设定某个时长

针对以上原因,libevent内部将所有的event超时时长全部转化为绝对时间,有以下几点好处

可以对所有的超时时间进行排序,获得最早超时的那个event

判断是否超时时只需和现有时间比较大小,不需要每次都进行相对时间的判断

可以采用最小堆存储所有具有超时时间的event,如果堆顶event未超时,那么所有的event都不会超时,可以选择这个event的超时时长最为io函数的阻塞时间

使用绝对时间带来的麻烦是如果event是一个永久事件,那么当event被激活后仍然需要重新注册到base中,此时因为event的时间是绝对时间的缘故,不能够直接调用event_add_internal添加event,而是需要重新计算超时时间再添加,这就导致仍然需要在event中存储用户提供的超时时长,在重新添加之前计算绝对时间的超时时间

/*
* event_add调用的内部函数,用于将event添加到base的注册队列中
* 同时添加到相应的map中
*
* 注意:这个函数不仅仅只由event_add调用,还有event_persist_closure调用
* 由这个函数调用是因为当具有超时时间的event被激活后,需要先从base中的所有队列中删除
* 然后重新计算超时时间,再重新添加到base中,所以又重新调用了这个函数
*
* 注意:event不仅代表文件描述符,还有可能是信号的event,当是信号时,会递归
* 调用两遍这个函数,第一遍调用时判断是信号则调用evsig_map_add函数,在这个函数中
* 进行两步
*   将信号event添加到base的信号map中
*   调用evsigops的add函数,即调用evsig_add,这个函数中绑定内部信号处理函数,同时将socketpair的event
*   添加到base中,使用event_add,也就是调用event_add_internal
* 不过只会执行两遍,因为在evsig_add中会进行判断,只有第一次添加socketpair的event时才会执行第二次调用
*
* 见evsig_add
*/
static inline int
event_add_internal(struct event *ev, const struct timeval *tv,
int tv_is_absolute)
{
struct event_base *base = ev->ev_base;
int res = 0;
int notify = 0;

EVENT_BASE_ASSERT_LOCKED(base);
/* ... */

/*
* prepare for timeout insertion further below, if we get a
* failure on any step, we should not change any state.
*/
/*
* 这一步主要是用来让最小堆增加一个位置,并没有实际添加到最小堆上
* 判断条件是这是一个具有超时时间的event,同时在最小堆中没有这个event
* 这样就需要在最小堆上留出一个位置来存放这个event
* 因为用户可以对同一个event调用event_add多次,这就可能两次event_add除了超时时间不同
* 其他的都相同,这样就不需要在留出一个位置,直接替换以前的就可以
*
* 如果已经在最小堆中,ev_flags将是EVLIST_TIMEOUT
*/
if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
if (min_heap_reserve(&base->timeheap,
1 + min_heap_size(&base->timeheap)) == -1)
return (-1);  /* ENOMEM == errno */
}

/*
* we should change the timeout state only if the previous event
* addition succeeded.
*/
/* 这一步开始处理具有超时时间的event */
if (res != -1 && tv != NULL) {
struct timeval now;
int common_timeout;

/*
* for persistent timeout events, we remember the
* timeout value and re-add the event.
*
* If tv_is_absolute, this was already set.
*/
/*
* event分为永久的和一次的,是用户在调用event_new时传入的参数
* 对于永久的event,在被激活一次之后还需要继续监听,
* 而对于有超时时间的event,需要对event的超时时间进行更新
*
* 为什么:因为base在进行超时判断时是通过绝对时间进行判断的,也就是说在添加event的时候
* 将当前时间+时间间隔获得的绝对时间作为判断超时的依据
* 这样做的原因是不需要在判断超时时比较时间差,只需要比较当前时间和超时时间即可
*
* 所以,如果event是永久的,那么再处理过一次之后需要更新超时绝对时间,方法就是保存用户
* 传入的时间间隔,再下一次添加时使用
*
* tv_is_absolute是传入的参数,event_add传入时设为0,表示传入的时间是时间间隔,不是绝对时间
*/
if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute)
ev->ev_io_timeout = *tv;

/*
* we already reserved memory above for the case where we
* are not replacing an existing timeout.
*/

/*
* 对于用户对同一个event调用event_add多次的情况,先将以前的从最小堆
* 中删除,再添加更新的这个
*/
if (ev->ev_flags & EVLIST_TIMEOUT) {
/* XXX I believe this is needless. */
if (min_heap_elt_is_top(ev))
notify = 1;
event_queue_remove(base, ev, EVLIST_TIMEOUT);
}

/* Check if it is active due to a timeout.  Rescheduling
* this timeout before the callback can be executed
* removes it from the active list. */
/*
* 如果此时event正处于激活队列中,从激活队列删了
* 如果是信号,将其发生次数设为0,就不会调用信号处理函数了
*/
if ((ev->ev_flags & EVLIST_ACTIVE) &&
(ev->ev_res & EV_TIMEOUT)) {
if (ev->ev_events & EV_SIGNAL) {
/* See if we are just active executing
* this event in a loop
*/
if (ev->ev_ncalls && ev->ev_pncalls) {
/* Abort loop */
*ev->ev_pncalls = 0;
}
}

event_queue_remove(base, ev, EVLIST_ACTIVE);
}

/* 计算超时绝对事件 */
gettime(base, &now);

common_timeout = is_common_timeout(tv, base);
if (tv_is_absolute) {
ev->ev_timeout = *tv;
} else if (common_timeout) {
struct timeval tmp = *tv;
tmp.tv_usec &= MICROSECONDS_MASK;
evutil_timeradd(&now, &tmp, &ev->ev_timeout);
ev->ev_timeout.tv_usec |=
(tv->tv_usec & ~MICROSECONDS_MASK);
} else {
evutil_timeradd(&now, tv, &ev->ev_timeout);
}

event_debug((
"event_add: timeout in %d seconds, call %p",
(int)tv->tv_sec, ev->ev_callback));

/* 调用event_queue_insert()将具有超时时间的event添加到base最小堆中 */
event_queue_insert(base, ev, EVLIST_TIMEOUT);

/* See if the earliest timeout is now earlier than it
* was before: if so, we will need to tell the main
* thread to wake up earlier than it would
* otherwise. */
if (min_heap_elt_is_top(ev))
notify = 1;

}
return (res);
}


这个函数被event_add调用,用于添加所有的event到base中,很明显函数内部是调用event_queue_insert添加event的,event_add_internal主要用于分配event,判断它应该添加到base的哪个地方。

下面是event_queue_insert添加到最小堆的部分

/* 这个函数用于将event根据queue的不同添加到不同的base队列中/或者最小堆中 */
static void
event_queue_insert(struct event_base *base, struct event *ev, int queue)
{
EVENT_BASE_ASSERT_LOCKED(base);

/*
* ev_flags作用如上,
* 注意是或运算,以前的状态仍然保留,其实就是可能同时存在多个队列
*/
ev->ev_flags |= queue;
/*
* 根据queue的不同值插入到不同的队列中
* event_add添加event时为EVLIST_INSERTED,添加到注册队列中
* 激活event时为EVLIST_ACTIVE,添加到激活队列中
* 添加具有超时时间的event时为EVLIST_TIMEOUT,添加到最小堆中
* 注意,对于有超时时间的event,会调用这个函数两次,先注册到队列,再加入到堆中
*/
switch (queue) {
/* ... */
case EVLIST_TIMEOUT: {

min_heap_push(&base->timeheap, ev);
break;
}
default:
event_errx(1, "%s: unknown queue %x", __func__, queue);
}
}


上述这些都是为了能够处理超时event做铺垫,现在转到event_base_loop中

/*
* 实际的事件驱动循环,其实就是一个while循环,每次调用io复用函数进行事件监听
* 监听返回之前将活跃的event都按优先级添加到base的激活队列中
* 回到循环后对base的激活队列中的event按照优先级顺序调用回调函数
* 再根据是否是永久event决定要不要从base的所有队列中删除event
* 对于具有超时时间的event则需要特殊处理,见timeout_process
*/
int
event_base_loop(struct event_base *base, int flags)
{
const struct eventop *evsel = base->evsel;
struct timeval tv;
struct timeval *tv_p;
int res, done, retval = 0;

done = 0;

while (!done) {

tv_p = &tv;

/*
* 这一步是用来获取io复用函数的阻塞时间,此时有两种情况
* 当此时没有处于激活状态的event,就从最小堆中取得堆顶event的超时时间
* 如果有已经激活的event,则阻塞时间为0,直接处理,原因可能是因为其他线程激活了某个时间造成的
* timeout_next函数取得最小堆堆顶元素的超时时间,并与当前时间做差计算时间间隔
* evutil_timeclear直接清零时间
* 这么取阻塞时间的原因见下面
*/
if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
timeout_next(base, &tv_p);
} else {
/*
* if we have active events, we just poll new events
* without waiting.
*/
evutil_timerclear(&tv);
}

/*
* 调用Io复用函数的监听函数,开始阻塞/非阻塞的监听
* 超时时间设置为最小堆中堆顶event的超时时间,原因如下
*
* 此时监听的有三种event
* 第一种是没有设置超时时间的,包括信号,所以什么时候返回都不影响
* 第二种是取得最小超时时间的堆顶event,此时可以满足在超时时间返回
* 第三种是最小堆中的其他event,这些event的超时时间在堆顶event之后,因为超时时间是绝对时间
*      也就是说如果堆顶event没有超时,那么其它的event将不可能超时
*      而当最小超时时间后返回处理超时之后重新开始监听,
*      因为是绝对时间,所以不会影响最小堆的其他event的超时
*
* 在返回之间,将活跃的event添加到base的激活队列中
*
* 注意:不处理具有超时时间的event,因为这些event根本就没有添加到io函数中
* 处理这些是在timeout_process函数中
*/
res = evsel->dispatch(base, tv_p);

/*
* 专门处理超时的event
* 注意为什么可以单独处理超时event,因为具有超时时间的event都被添加到最小堆里面了
* 这样,只需要遍历最小堆,用堆元素的超时时间和当前时间比较,就可以判断是否超时
* 其实不需要全部遍历,而是当遇到第一个没有超时的event就可以退出遍历
* 因为最小堆的当前节点永远比两个子节点小,所以子节点的超时时间会更长,不可能超时
*
* 需要遍历而不是只取堆顶的原因是在从io函数返回到执行timeout_process的过程中
* 其他线程可能又向最小堆中添加了超时时间更小的event,这就导致了事先使用的
* 时间的那个event已经不是堆顶元素了
*/
timeout_process(base);
}
return (retval);
}


下面是timeout_process函数

/*
* 将最小堆中超时的event添加到激活队列中
* 此函数由event_base_loop调用
*/
static void
timeout_process(struct event_base *base)
{
/* Caller must hold lock. */
struct timeval now;
struct event *ev;

if (min_heap_empty(&base->timeheap)) {
return;
}

//获取当前时间,用于判断是否超时
gettime(base, &now);

/*
* 需要循环判断而不是只取堆顶的原因
* 因为其它线程有可能添加了更小的超时event
* 见event_base_loop中timeout_process的调用
*/
while ((ev = min_heap_top(&base->timeheap))) {
//当遇到第一个没有超时的event就可以退出了,原因见base的主循环
if (evutil_timercmp(&ev->ev_timeout, &now, >))
break;

/* delete this event from the I/O queues */
/*
* 取出一个就将这个event从所有队列中删除
* 然后再添加到激活队列中
* 原因:因为要重新计算超时时间,需要从所有队列中删除
*/
event_del_internal(ev);

event_debug(("timeout_process: call %p",
ev->ev_callback));
/*
* 将event添加到超时队列中
* 此处需要设置event被激活的原因,EV_TIMEOUT表示由于超时被激活
*/
event_active_nolock(ev, EV_TIMEOUT, 1);
}
}


总结

至此就完成了对具有超时时长的event的监控和处理,libevent的时间管理可以学习的地方在于将相对时间转换成绝对时间,不要一根劲,以为用户提供相对时间程序设计的时候就必须使用相对时间,反而可以变相思考,进行适当转换,便于程序设计。libevent在转换后既解决了io复用函数阻塞时间问题,又提高了时间管理的效率(使用最小堆)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息