您的位置:首页 > 其它

Libevent学习----信号事件

2017-06-09 16:00 387 查看
信号是一种异步事件:信号处理函数和程序的主循环是两条不同的执行路线。很明显,信号处理函数需要尽可能快地执行完毕,以确保该信号不被屏蔽太久(信号在处理期间,系统不会再次触发它)。



上面这幅图是网上找到的,很好地从整体上表述了原理,摘抄到最前面

下图是本人自己总结的信号事件代码级原理图





下面是展开分析代码实现

一、初始化信号事件

调用base->evbase = base->evsel->init(base); 实际就是调用epoll_init, base->evbase最终指向了epollop结构,

在epoll_init的最后,调用了evsig_init.

/*设置观察ev_signal_pair[1]的event事件ev_signal, 事件类型为EV_READ|EV_PERSIST,

* 这个event将加入到evmap_io数组中

并且设置信号的统一处理函数evsig_cb/

/* Data structure for the default signal-handling implementation in signal.c
*/
struct evsig_info {
/* Event watching ev_signal_pair[1] */
struct event ev_signal;/*观察ev_signal_pair[1]的event事件*/
/* Socketpair used to send notifications from the signal handler */
evutil_socket_t ev_signal_pair[2];
/* True iff we've added the ev_signal event yet. */
int ev_signal_added;
/* Count of the number of signals we're currently watching. */
int ev_n_signals_added;

/* Array of previous signal handler objects before Libevent started
* messing with them.  Used to restore old signal handlers. */
#ifdef _EVENT_HAVE_SIGACTION
struct sigaction **sh_old;
#else
ev_sighandler_t **sh_old;
#endif
/* Size of sh_old. */
int sh_old_max;
};

/*此函数主要用于设置event base结构里面的struct evsig_info sig*/
*struct evsig_info主要用于实现通用的信号处理
*创建unix socket一端fd对应的event,并将其保存在base->sig.ev_signal事件
*中,在第一次调用event_add添加信号的时候,将ev_signal事件加入I/O事件处理 */
int
evsig_init(struct event_base *base)
{
/*
* Our signal handler is going to write to one end of the socket
* pair to wake up our event loop.  The event loop then scans for
* signals that got delivered.
*/
/*创建一对unix套接字,用于传递信号值*/
if (evutil_socketpair(
AF_UNIX, SOCK_STREAM, 0, base->sig.ev_signal_pair) == -1) {
#ifdef WIN32
/* Make this nonfatal on win32, where sometimes people
have localhost firewalled. */
event_sock_warn(-1, "%s: socketpair", __func__);
#else
event_sock_err(1, -1, "%s: socketpair", __func__);
#endif
return -1;
}
/*设置unix套接字的exec执行时关闭*/
evutil_make_socket_closeonexec(base->sig.ev_signal_pair[0]);
evutil_make_socket_closeonexec(base->sig.ev_signal_pair[1]);
base->sig.sh_old = NULL;
base->sig.sh_old_max = 0;

/*设置unix套接字为非阻塞套接字*/
evutil_make_socket_nonblocking(base->sig.ev_signal_pair[0]);
evutil_make_socket_nonblocking(base->sig.ev_signal_pair[1]);
/*设置观察ev_signal_pair[1]的event事件ev_signal, 事件类型为EV_READ|EV_PERSIST,
* 这个event将加入到evmap_io数组中
*并且设置信号的统一处理函数evsig_cb*/
event_assign(&base->sig.ev_signal, base, base->sig.ev_signal_pair[1],
EV_READ | EV_PERSIST, evsig_cb, base);

base->sig.ev_signal.ev_flags |= EVLIST_INTERNAL;
/*保证信号的处理进入第一优先级队列*/
event_priority_set(&base->sig.ev_signal, 0);
/*设置evsigsel*/
base->evsigsel = &evsigops;

return 0;
}


二、创建信号事件

#define evsignal_new(b, x, cb, arg)
event_new((b), (x), EV_SIGNAL|EV_PERSIST, (cb), (arg))


主要的区别在于此处的x(fd)为signal值, events类型为EV_SIGNAL | EV_PERSIST.

/*
* 该函数完成了 1、event内存的分配; 2、设置可所属的event_base;
* 3、初始化了event结构; 4、设置了fd, events, cb和arg等参数
*/
struct event *
event_new(struct event_base *base, evutil_socket_t fd, short events,
void (*cb)(evutil_socket_t, short, void *), void *arg)
{
struct event *ev;
ev = mm_malloc(sizeof(struct event));             //分配内存
if (ev == NULL)
return (NULL);
//将base,fd,events,cb和arg分别传入对应的结构字段中
if (event_assign(ev, base, fd, events, cb, arg) < 0) {
mm_free(ev);
return (NULL);
}

return (ev);
}


int
event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, short events, void (*callback)(evutil_socket_t, short, void *), void *arg)
{
if (!base)
base = current_base;

_event_debug_assert_not_added(ev);
/*初始化event结构中的元素,如base, fd, callback, callback arg,event等*/
ev->ev_base = base;

ev->ev_callback = callback;
ev->ev_arg = arg;
ev->ev_fd = fd;
ev->ev_events = events;
ev->ev_res = 0;
ev->ev_flags = EVLIST_INIT;
ev->ev_ncalls = 0;
ev->ev_pncalls = NULL;

/* 根据不同的events位值,设置ev_closure, ev_closure指定了event_base执行事件处理器的回调函数时的行为。
* #define EV_CLOSURE_NONE 0 ---默认行为,直接执行回调函数
* #define EV_CLOSURE_SIGNAL 1 ------执行ev.ev_signal.ev_ncalls次回调函数
* #define EV_CLOSURE_PERSIST 2 -----执行完回调函数后,再次将事件处理器加入注册事件队列中*/
if (events & EV_SIGNAL) {
if ((events & (EV_READ|EV_WRITE)) != 0) {
event_warnx("%s: EV_SIGNAL is not compatible with "
"EV_READ or EV_WRITE", __func__);
return -1;
}
ev->ev_closure = EV_CLOSURE_SIGNAL;
} else {
if (events & EV_PERSIST) {
evutil_timerclear(&ev->ev_io_timeout);
ev->ev_closure = EV_CLOSURE_PERSIST;
} else {
ev->ev_closure = EV_CLOSURE_NONE;
}
}

min_heap_elem_init(ev);

if (base != NULL) {
/* 默认情况下只有1个优先级队列, 在创建base(event_base_new_with_config)时,
* 调用event_base_priority_init(base, 1) */
/* by default, we put new events into the middle priority */
ev->ev_pri = base->nactivequeues / 2;
}

_event_debug_note_setup(ev);

return 0;
}


三、通过event_add添加信号和对应的处理函数

event_add调用event_add_internal,然后调用evmap_signal_add

int
evmap_signal_add(struct event_base *base, int sig, struct event *ev)
{
/*此处的evsigsel在evsig_init 中已经赋值为evsigops*/
const struct eventop *evsel = base->evsigsel;
struct event_signal_map *map = &base->sigmap;
struct evmap_signal *ctx = NULL;
/*如果信号值sig大于当前hash组数的最大值,重新分配一下哈希数组的内存*/
if (sig >= map->nentries) {
if (evmap_make_space(
map, sig, sizeof(struct evmap_signal *)) == -1)
return (-1);
}
/*根据sig值获取对应的hash数组*/
GET_SIGNAL_SLOT_AND_CTOR(ctx, map, sig, evmap_signal, evmap_signal_init,
base->evsigsel->fdinfo_len);

/*如果hash数组对应的event队列为空,表明是第一次添加该信号,
*则调用base->evsigsel->add函数来添加该信号的信号处理函数,
*如果ev_signal 还没有加入epoll,添加ev_signal事件到evmap_io数组中,并unix socket值注册到epoll中*/
if (TAILQ_EMPTY(&ctx->events)) {
if (evsel->add(base, ev->ev_fd, 0, EV_SIGNAL, NULL)
== -1)
return (-1);
}
/*将event加入到sigmap队列中*/
TAILQ_INSERT_TAIL(&ctx->events, ev, ev_signal_next);

return (1);
}


如果hash数组对应的event队列为空,则调用base->evsigsel->add函数来初始化队列, 即evsig_add

static const struct eventop evsigops = {
"signal",
NULL,
evsig_add,
evsig_del,
NULL,
NULL,
0, 0, 0
};


static int
evsig_add(struct event_base *base, evutil_socket_t evsignal, short old, short events, void *p)
{
struct evsig_info *sig = &base->sig;
(void)p;

EVUTIL_ASSERT(evsignal >= 0 && evsignal < NSIG);

/* catch signals if they happen quickly */
EVSIGBASE_LOCK();
if (evsig_base != base && evsig_base_n_signals_added) {
event_warnx("Added a signal to event base %p with signals "
"already added to event_base %p.  Only one can have "
"signals at a time with the %s backend.  The base with "
"the most recently added signal or the most recent "
"event_base_loop() call gets preference; do "
"not rely on this behavior in future Libevent versions.",
base, evsig_base, base->evsel->name);
}
evsig_base = base;
evsig_base_n_signals_added = ++sig->ev_n_signals_added;
evsig_base_fd = base->sig.ev_signal_pair[0];
EVSIGBASE_UNLOCK();

event_debug(("%s: %d: changing signal handler", __func__, (int)evsignal));
/*为信号evsignal注册信号处理函数evsig_handler,其实这里每一个信号对应的信号处理函数都是evsig_handler*/
if (_evsig_set_handler(base, (int)evsignal, evsig_handler) == -1) {
goto err;
}

/*如果ev_signal事件(即unix socket事件ev_signal_pair[1])还没有添加,则添加ev_signal事件到evmap_io数组中,
*并unix socket值注册到epoll中*/
if (!sig->ev_signal_added) {
if (event_add(&sig->ev_signal, NULL))
goto err;
sig->ev_signal_added = 1;
}

return (0);

err:
EVSIGBASE_LOCK();
--evsig_base_n_signals_added;
--sig->ev_n_signals_added;
EVSIGBASE_UNLOCK();
return (-1);
}


让我们看看信号处理函数evsig_handler做些什么动作?

动作: 将信号值作为数据在unix socket ev_signal_pair[0]上发送,让ev_signal_pair[1]接收,而ev_signal_pair[1]已经注册到epoll上

所以信号处理函数将触发epoll EPOLLIN 有数据发生, 触发ev_signal事件的回调函数evsig_cb

static void __cdecl
evsig_handler(int sig)
{
int save_errno = errno;
#ifdef WIN32
int socket_errno = EVUTIL_SOCKET_ERROR();
#endif
ev_uint8_t msg;

if (evsig_base == NULL) {
event_warnx(
"%s: received signal %d, but have no base configured",
__func__, sig);
return;
}

#ifndef _EVENT_HAVE_SIGACTION
signal(sig, evsig_handler);
#endif

/* Wake up our notification mechanism */
/*将信号值作为数据在unix socket ev_signal_pair[0]上发送,让ev_signal_pair[1]接收,而ev_signal_pair[1]已经注册到epoll上,所以信号处理函数将触发epoll EPOLLIN 有数据发生*/
msg = sig;
send(evsig_base_fd, (char*)&msg, 1, 0);
errno = save_errno;
#ifdef WIN32
EVUTIL_SET_SOCKET_ERROR(socket_errno);
#endif
}


接下来,我们看看evsig_cb做了什么?

epoll 返回有数据可读, 循环读取数据, 并将数据一个字节一个字节记录到ncaught数组中,因为信号一般是在32以内的整数值

将信号值对应的event添加到激活队列中

最后依旧是处理激活队列, 信号激活队列处理函数event_signal_closure,然后调用对应的event处理函数

/* Callback for when the signal handler write a byte to our signaling socket */
static void
evsig_cb(evutil_socket_t fd, short what, void *arg)
{
static char signals[1024];
ev_ssize_t n;
int i;
//NSIG是信号的个数。定义在系统头文件中
int ncaught[NSIG];
struct event_base *base;

base = arg;

memset(&ncaught, 0, sizeof(ncaught));
/*epoll 返回有数据可读, 循环读取数据, 并将数据一个字节一个字节记录到ncaught数组中,因为信号一般是在32以内的整数值*/
while (1) {
n = recv(fd, signals, sizeof(signals), 0);
if (n == -1) {
int err = evutil_socket_geterror(fd);
if (! EVUTIL_ERR_RW_RETRIABLE(err))
event_sock_err(1, fd, "%s: recv", __func__);
break;
} else if (n == 0) {
/* XXX warn? */
break;
}
for (i = 0; i < n; ++i) {
ev_uint8_t sig = signals[i];
if (sig < NSIG)
ncaught[sig]++;
}
}
/*将信号值对应的event添加到激活队列中*/
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
for (i = 0; i < NSIG; ++i) {
if (ncaught[i])
evmap_signal_active(base, i, ncaught[i]);
}
EVBASE_RELEASE_LOCK(base, th_base_lock);
}


evmap_signal_active的实现,以信号值为key,查找信号hash表,找到对应的hash链中的信号事件,依次加入激活优先队列

void
evmap_signal_active(struct event_base *base, evutil_socket_t sig, int ncalls)
{
struct event_signal_map *map = &base->sigmap;
struct evmap_signal *ctx;
struct event *ev;

EVUTIL_ASSERT(sig < map->nentries);
GET_SIGNAL_SLOT(ctx, map, sig, evmap_signal);

TAILQ_FOREACH(ev, &ctx->events, ev_signal_next)
event_active_nolock(ev, EV_SIGNAL, ncalls);
}


event_active_nolock中res参数为EV_SIGNAL, 所以会在事件中记录ev_ncalls,即信号被调用的次数

void
event_active_nolock(struct event *ev, int res, short ncalls)
{
struct event_base *base;

event_debug(("event_active: %p (fd "EV_SOCK_FMT"), res %d, callback %p",
ev, EV_SOCK_ARG(ev->ev_fd), (int)res, ev->ev_callback));

/* We get different kinds of events, add them together */
if (ev->ev_flags & EVLIST_ACTIVE) {
ev->ev_res |= res;
return;
}

base = ev->ev_base;

EVENT_BASE_ASSERT_LOCKED(base);

ev->ev_res = res;

if (ev->ev_pri < base->event_running_priority)
base->event_continue = 1;

if (ev->ev_events & EV_SIGNAL) {
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
if (base->current_event == ev && !EVBASE_IN_THREAD(base)) {
++base->current_event_waiters;
EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
}
#endif
ev->ev_ncalls = ncalls;
ev->ev_pncalls = NULL;
}

event_queue_insert(base, ev, EVLIST_ACTIVE);

if (EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);
}


调用event_queue_insert将事件加入激活优先队列中

static void
event_queue_insert(struct event_base *base, struct event *ev, int queue)
{
EVENT_BASE_ASSERT_LOCKED(base);

if (ev->ev_flags & queue) {
/* Double insertion is possible for active events */
if (queue & EVLIST_ACTIVE)
return;

event_errx(1, "%s: %p(fd "EV_SOCK_FMT") already on queue %x", __func__,
ev, EV_SOCK_ARG(ev->ev_fd), queue);
return;
}

if (~ev->ev_flags & EVLIST_INTERNAL)
base->event_count++;

ev->ev_flags |= queue;
switch (queue) {
case EVLIST_INSERTED:
TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);
break;
case EVLIST_ACTIVE:
base->event_count_active++;
TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri],
ev,ev_active_next);
break;
case EVLIST_TIMEOUT: {
if (is_common_timeout(&ev->ev_timeout, base)) {
struct common_timeout_list *ctl =
get_common_timeout_list(base, &ev->ev_timeout);
insert_common_timeout_inorder(ctl, ev);
} else
min_heap_push(&base->timeheap, ev);
break;
}
default:
event_errx(1, "%s: unknown queue %x", __func__, queue);
}
}


处理激活队列的函数,在执行ev_signal事件的回调函数时(注意ev_signal事件的优先级必须是0),将信号事件根据优先级添加入激活优先队列,然后还是继续调用event_process_active, 处理其它优先队列中的回调函数。

static int
event_process_active(struct event_base *base)
{
/* Caller must hold th_base_lock */
struct event_list *activeq = NULL;
int i, c = 0;

for (i = 0; i < base->nactivequeues; ++i) {
if (TAILQ_FIRST(&base->activequeues[i]) != NULL) {
base->event_running_priority = i;
activeq = &base->activequeues[i];
c = event_process_active_single_queue(base, activeq);
if (c < 0) {
base->event_running_priority = -1;
return -1;
} else if (c > 0)
break; /* Processed a real event; do not
* consider lower-priority events */
/* If we get here, all of the events we processed
* were internal.  Continue. */
}
}

event_process_deferred_callbacks(&base->defer_queue,&base->event_break);
base->event_running_priority = -1;
return c;
}


信号激活队列处理函数, 对于一个信号,发生了几次,就调用几次回调函数

static inline void
event_signal_closure(struct event_base *base, struct event *ev)
{
short ncalls;
int should_break;

/* Allows deletes to work */
ncalls = ev->ev_ncalls;
if (ncalls != 0)
ev->ev_pncalls = &ncalls;
EVBASE_RELEASE_LOCK(base, th_base_lock);
while (ncalls) {
ncalls--;
ev->ev_ncalls = ncalls;
if (ncalls == 0)
ev->ev_pncalls = NULL;
(*ev->ev_callback)(ev->ev_fd, ev->ev_res, ev->ev_arg);

EVBASE_ACQUIRE_LOCK(base, th_base_lock);
should_break = base->event_break;
EVBASE_RELEASE_LOCK(base, th_base_lock);

if (should_break) {
if (ncalls != 0)
ev->ev_pncalls = NULL;
return;
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  libevent