您的位置:首页 > 其它

libevent总结(下)

2016-06-23 10:04 316 查看
本文转载自:(http://blog.csdn.net/qq_15457239/article/details/51320917)

八、统一定时器事件和I/O事件详解

和信号事件相比,把定时器事件和I/O事件统一起来就变得十分容易了,为什么?因为I/O复用机制如select(),poll(),epoll_wait()都允许设置一个最大等待时间^_^。So,让我们来看看libevent是怎样做的吧。PS:实际上很多事件驱动的软件都是这样做的。

1.实现方法

核心就是在每次事件循环中设置I/O复用的最大等待时间为定时器小顶堆中的顶节点的时间(即将要最早发送的定时器事件)。当然,如果进入一个事件循环时,激活事件队列不为空(即有就绪事件没有被处理),则设置最大等待时间为0。

具体的代码在源文件 event.c 的 event_base_loop()中:

/*event_dispatch调用了这个函数,事件驱动机制的核心循环,在这个事件循环中监听事件并处理就绪的事件*/
int
event_base_loop(struct event_base *base, int flags)
{
......
/*flags目前没有什么作用,传递的都是0,如果当前没有被激活的事件,从小顶堆中
取出时间,作为回调epoll_wait第三个参数*/
if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
timeout_next(base, &tv_p);//根据Timer事件计算IO复用的最大等待时间
} else {
/*如果仍然有激活的事件,并不是马上处理,而是将时间便为0,让epoll_wait立刻
返回,按照以前的流程继续走*/
evutil_timerclear(&tv);
}

......
/*这里调用了I/O复用的驱动器,在epoll中相当与是epoll_wait,如果这个函数返回,说明
存在事件需要处理   等待这I/O就绪*/
res = evsel->dispatch(base, evbase, tv_p);

......
/*使用小顶堆的堆顶作为循环的时间是将定时事件融合到I/O机制中的关键,
在这个函数中将合适的超时事件添加到激活队列中*/
timeout_process(base);
/*根据活跃事件的个数(event_count_active)来进行处理*/
if (base->event_count_active) {
/*处理事件的函数*/
event_process_active(base);
if (!base->event_count_active && (flags & EVLOOP_ONCE))
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
done = 1;
}
......
}


timeout_next()函数根据堆中具有最小超时值的事件和当前时间来计算等待时间

static int timeout_next(struct event_base *base, struct timeval **tv_p)
{
struct timeval now;
struct event *ev;
struct timeval *tv = *tv_p;
<pre name="code" class="cpp">


// 堆的首元素具有最小的超时值 if ((ev = min_heap_top(&base->timeheap)) == NULL) { // 如果没有定时事件,将等待时间设置为NULL,表示一直阻塞直到有I/O事件发生 *tv_p = NULL; return (0); } // 取得当前时间 gettime(base, &now); // 如果超时时间<=当前值,不能等待,需要立即返回 if (evutil_timercmp(&ev->ev_timeout, &now, <=)) { evutil_timerclear(tv); return (0); } // 计算等待的时间=当前时间-最小的超时时间 evutil_timersub(&ev->ev_timeout, &now, tv); return (0);}

2、基础数据结构

Libevent中管理定时事件的数据结构是小顶堆,源码位于文件min_heap.h中,向最小堆中插入、删除元素的时间复杂度是O(lgN),获取最小值得时间复杂度是O(1)。另外,堆是一个完全二叉树,基本存储方式是一个数组

下面是小顶堆的插入元素的典型代码逻辑:

Heap[size++]<-new; // 先放到数组末尾,元素个数+1
// 下面就是 shift_up()的代码逻辑,不断的将 new 向上调整
_child = size;
while(_child>0) // 循环
{
_parent<-(_child-1)/2; // 计算 parent
if(Heap[_parent].key < Heap[_child].key)
break; // 调整结束,跳出循环
swap(_parent, _child); // 交换 parent 和 child
}


Libevent对插入操作进行了优化:

// 下面就是 shift_up()的代码逻辑,不断的将 new 的“预留位置”向上调整
_hole = size; // _hole 就是为 new 预留的位置,但并不立刻将 new 放上
while(_hole>0) // 循环
{
_parent<-(_hole-1)/2; // 计算 parent
if(Heap[_parent].key < new.key)
break; // 调整结束,跳出循环
Heap[_hole] = Heap[_parent]; // 将 parent 向下调整
_hole = _parent; // 将_hole 调整到_parent
}
Heap[_hole] = new; // 调整结束,将 new 插入到_hole 指示的位置
size++; // 元素个数+1


注:以上伪代码来自libevent源码深度剖析

3、总结

Libevent实际上是利用最小堆去管理定时事件(当定时事件很少时,可以用链表,redis就是这么干的),然后用最小堆中时间最近的定时事件的时间去设置I/O复用的最大等待时间,从而实现了定时器事件和I/O事件的统一。从而,我们将三类事件全部统一到了事件主循环中去了。

注:libevent1.4.12中的epoll没有提供边沿触发,而是使用的默认的水平触发。另外,我想说的是libevent1.4.12不支持持久的定时器事件。我将在我的简化版的libevent(likevent)中增加这个功能。

九、选择最优的I/O复用

1、将I/O复用封装成事件多路分发器

前面我们说过,Libevet本身是一种典型的Reactor模式,Reactor模式中有一个组件叫做事件多路分发器,这个组件实际上就是对某一种I/O复用的封装。那么问题来了,每种系统下提供的I/O复用机制不全相同,即使是同一个操作系统中提供的接口也有多种,那么怎么统一这些I/O复用机制来提供一个标准的事件多路分发器给其他组件使用呢?Java中,我们可以采用接口;c++中,我们可以采用包含虚函数的类。Libevent展现了一种c中实现统一接口的方法,带有函数指针的结构体。

Libevent支持多种I/O复用技术的关键就在于结构体eventop:

struct event_op {
const char *name;//io复用的名字
void *(*init)(struct event_base *);//初始化
int (*add)(void *, struct event *);//注册事件
int (*del)(void *, struct event *);//删除事件
int (*dispatch)(struct event_base *, void *, struct timeval *);//事件分发
void (*dealloc)(struct event_base *, void *);//销毁资源
/* set if we need to reinitialize the event base */
int need_reinit;
};


每种I/O复用机制都必须提供这五个函数的实现,从而完成自身的初始化、销毁,对事件的注册、注销和分发,下面以epoll为例

static void *epoll_init (struct event_base *);
static int epoll_add    (void *, struct event *);
static int epoll_del    (void *, struct event *);
static int epoll_dispatch   (struct event_base *, void *, struct timeval *);
static void epoll_dealloc   (struct event_base *, void *);

const struct event_op epollops = {
"epoll",
epoll_init,
epoll_add,
epoll_del,
epoll_dispatch,
epoll_dealloc,
1 /* need reinit */
};


epollops和epoll对五个函数接口的实现定义在epoll.c文件中,对外是不可见的,从而实现了信息隐藏。

2、Libevent怎么选择最优的I/O复用机制

这里可以分解成两个问题:

(1)怎么知道有哪些I/O复用机制可用

Libevent的编译用的是autotools,她的编译脚本configure在执行时会检测系统中提供的api,并生成一个存放测试结果的头文件(例如:如果测试得知系统中有epoll,则在存放测试结果的头文件中加入一个宏,即#define HAVE_EPOLL)。同理,我们就可以知道哪些I/O复用机制可用。

(2)如何选择最优的I/O复用机制

Libevent中由base->evbase去存放唯一一个会使用的事件多路分发器实例,但是如果系统中有多个I/O多路复用机制,我们在初始化base->evbase前就有多个事件多路分发器实例,应该选哪个来初始化base->evbase呢?答案就是对所有的I/O复用机制按性能进行排序,然后按性能由低到高将对应的事件多路分发器实例放到一个数组里,将该数组中的所有事件多路分发器实例由前向后依次赋予base->evbase,这样就能保证最后base->evbase中存放的是最优的I/O复用机制实现的事件多路分发器实例。

下面是实现这一机制的核心代码:

static const struct event_op *eventops[] = {
#ifdef HAVE_EVENT_PORTS
&evportops,
#endif
#ifdef HAVE_SELECT
&selectops,
#endif
#ifdef HAVE_POLL
&pollops,
#endif
#ifdef HAVE_EPOLL
&epollops,
#endif
#ifdef HAVE_WORKING_KQUEUE
&event_op kqops,
#endif
#ifdef HAVE_DEVPOLL
&event_op devpollops,
#endif
#ifdef WIN32
&struct event_op win32ops,
#endif
NULL
};

const struct event_op epollops = {
"epoll",
epoll_init,
epoll_add,
epoll_del,
epoll_dispatch,
epoll_dealloc,
1 /* need reinit */
};

base->evbase = NULL;
/*寻找合适的I/O复用机制,在这里说明,libevent库使用的I/O机制是在编译的时候确定的
其实evbase和某一个复用机制的关系就像类和对象的关系一样,复用机制是evbase的具体实现
*/
for (i = 0; eventops[i] && !base->evbase; i++) {

/*eventops 是一个全局的结构体,结构体中都是不同内核所支持的几种I/O复用机制*/
base->evsel = eventops[i];
/*注意:在这里调用了一个非常重要的函数base->evsel->init(base),使用这个回调函数来初始
化套接着的信息,如果使用的是epoll_wait复用机制,这个回调函数中最重要的
就是epoll_create函数....
*/
/*在回调函数中的初始化就是做一些和系统调用相关的操作,注意回调函数的的返回值是一个void*
但是这个指针是和epoll_wait联通的桥梁,返回的就是eventops[i]这个结构体指针*/
base->evbase = base->evsel->init(base);
}


3、总结

Libevent通过函数指针实现了对多种I/O复用机制的支持,同时也展现了c语言中的条件编译的应用,实际上我们完全可以更改configure生成的头文件来手动选择用哪个I/O复用机制。后面,我将提供一份我针对linux简化了的libevent(likevent),供大家参考。

九、时间缓存和校正

1、原理

如果《libevent源码深度剖析》中所说,如果系统支持monotonic时间,该时间是系统从boot后到现在的时间,因此不需要执行校正(归根到底,是因为用户不能手动更改monotonic时间)。否则,就要在事件循环中执行时间校验。why?你想想啦,如果你加了个定时器事件,准备两个小时后处理(或许是放音乐叫你起床),结果有个就家伙恶作剧把系统时间往前调了一个小时,然后电脑放音乐的时候已经过了3个小时了,今天的工资估计就扣的差不多了,哈哈。这种情况,libevent是可以帮你处理的。但是,有一点必须注意,如果那个家伙也了解libevent的原理(而且你的系统不支持monotonic时间),他可能把系统时间往后调1个小时,这个时候libevent就帮不了你了,可能还没睡着,音乐就响了。那么,libevent是如何处理系统时间被往前调了这个情况的呢?(当然,如果你的系统支持monotonic时间,libevent就不会操这么多心了)

在回答上面那个问题之前,我们必须要回答的一个问题是服务器为什么一般都要做时间缓存,需要的时候直接从系统取不就可以了吗?效率。现在的服务器都要求有很高的效率,然后系统调用是一种非常消耗cpu资源的行为,它伴随着用户空间和内核空间的切换。所以我们就把时间缓存在用户空间,大部分时候需要时间的话,就直接读时间缓存啦,等适当的时间再更新时间缓存。libevent正是处于效率的考虑,也用了时间缓存机制。

libevent更新时间缓存的时机是事件循环中单次循环结束后。时间缓存tv_cache放在base对象中,在base对象中还有一个时间缓存的副本event_tv,跟新这个副本的时机是单次下次循环开始前(注:单次和下次只是表示紧挨着的两次循环)。在跟新时间缓存的副本event_tv(时间缓存tv_cache存放的已经是当前时间),如果一切正常,在更新event_tv前,是不是应该event_tv<=tv_cache(因为这时,event_tv表示过去,而tv_cache表示现在嘛),要是,event_tv>tv_cache(表示时间倒流了,一个美妙的幻想。),libevent就会明白,系统时间一定是被哪个家伙偷偷修改了。于是,libevent就根据event_tv和tv_cache的时间差来调整时间堆里面每个定时器事件的时间(libevent默认为系统时间没向前调了event_tv-tv_cache的时间)。这样以来,就再也不用担心系统时间没小伙伴恶意往前调了。但是,系统时间往后调了怎么办,libevent就只能表示很无奈了。

2、下面是时间校验的核心代码

static void
timeout_correct(struct event_base *base, struct timeval *tv)
{
struct event **pev;
unsigned int size;
struct timeval off;

if (use_monotonic)//有monotonic时间的支持,就是这么任性
return;

/* Check if time is running backwards */
/*tv <----- tv_cache*/
gettime(base, tv);
/*本来event_tv应该小于tv_cache
如果 tv< event_tv 表明用户向前调整了时间,需要校正*/
if (evutil_timercmp(tv, &base->event_tv, >=)) {
base->event_tv = *tv;
return;
}

event_debug(("%s: time is running backwards, corrected",
__func__));
/*计算时间差值*/
evutil_timersub(&base->event_tv, tv, &off);

/*
* We can modify the key element of the node without destroying
* the key, beause we apply it to all in the right order.
*/
/*调整定时事件的小顶堆*/
pev = base->timeheap.p;
size = base->timeheap.n;
for (; size-- > 0; ++pev) {
struct timeval *ev_tv = &(**pev).ev_timeout;
evutil_timersub(ev_tv, &off, ev_tv);
}
/* Now remember what the new time turned out to be. */
/*更新event_tv为tv_cache*/
base->event_tv = *tv;
}


总结:一般的服务器都会有时间缓存这种机制,libevent这样一个以高性能为目标的库当然也不例外。另外,libevent能够发现系统时间是否被往前调了,从而调整时间堆。

系统时间要是被往后调了,那么libevent就玩不动了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: