您的位置:首页 > 其它

January 20th Wednesday 2010

2010-01-20 15:43 281 查看
Nginx (十二) 事件

ngx_events_block()函数。这个函数在nginx系统起动后解析配置文件中的events{}块时调用的。是event模块的中第一个被调用的函数。

1. NGX_EVENT_MODULE类型的模块进行点数,并给这些模板建立上下文的访问索引;
2. 给event模块创建上下文数组空间;
3. 调用每个event模块中的create_conf()钩子函数,其返回值放在上下文数组中模块相对槽中;
4. 解析event{}块中的配置项参数;
cf->ctx = ctx;
cf->module_type = NGX_EVENT_MODULE;
cf->cmd_type = NGX_EVENT_CONF;

rv = ngx_conf_parse(cf, NULL);
5. 调用event模块中的init_conf()钩子函数。

typedef struct {
ngx_str_t *name;

void *(*create_conf)(ngx_cycle_t *cycle);
char *(*init_conf)(ngx_cycle_t *cycle, void *conf);

ngx_event_actions_t actions;
} ngx_event_module_t;

ngx_event_module_init()函数。在event模块初始化时调用,来做一些初始化操作。具体内容暂时没有搞清楚,先放一下。
ngx_event_process_init()函数。这个函数间接地在ngx_single_process_cycle()函数或ngx_worker_process_init()函数中调用;也就是在工作进程启动时调用。
1. 获取event模块中对应的存放配置信息的上下文内存;
2. ccf->master && ccf->worker_processes > 1 && ecf->accept_mutex时,
ngx_use_accept_mutex = 1;
ngx_accept_mutex_held = 0;
ngx_accept_mutex_delay = ecf->accept_mutex_delay;
否则,ngx_use_accept_mutex = 0;
3. 调用ngx_event_timer_init()函数,初始化event要用到的timer。
ngx_event_timer_init()函数中调用ngx_rbtree_init()这个宏来初始化ngx_event_timer_rbtree树。
ngx_rbtree_init(&ngx_event_timer_rbtree, &ngx_event_timer_sentinel,
ngx_rbtree_insert_timer_value);

ngx_rbtree_init()宏定义如下:
#define ngx_rbtree_init(tree, s, i) /
ngx_rbtree_sentinel_init(s); /
(tree)->root = s; /
(tree)->sentinel = s; /
(tree)->insert = i

ngx_event_timer_rbtree 是用来排序timer的红黑树的root;
ngx_event_timer_sentinel 是哨兵节点;
ngx_rbtree_insert_timer_value()是红黑树节点上的insert钩子函数。这个函数的逻辑是对timer值在红黑树上进行排序。
a. 将待插入节点值与红黑树上的节点进行对比。
若树上节点值比待插入节点值小,沿着其左子节点向下,否则沿着其右子节点向下;直至找到与哨兵节点相同值的节点;
b. 找到插入处后,插入该节点,同时设定其对应的父节点;
c. 将其节点的左右子节点都设为哨兵节点;
d. 最后将这个插入的节点标为红色。
在支持线程的情况下,暂不做说明。

4. 遍历event模块,调用其中的actions.init()钩子函数。
这里要说明的是在遍历时做了下面的检查。
if (ngx_modules[m]->ctx_index != ecf->use) {
continue;
}

下面是event{}配置块中use参数的说明。
use
Syntax: use [ kqueue | rtsig | epoll | /dev/poll | select | poll | eventport ]
Default:

If you have more than one event-model specified at the ./configure script, then you can tell nginx which one do you want to use. By default nginx looks for the most suitable method for your OS at ./configure time.

You can see the available event-models and how you can activate it at the ./configure state here.

也就是说,这个检查是过滤掉不使用的event模块。例如在linux下,不用kqueue等。

5. ngx_timer_resolution && !(ngx_event_flags &
NGX_USE_TIMER_EVENT)情况下,建立SIGALRM信号处理函数(ngx_timer_signal_handler()),并调用setitimer(ITIMER_REAL, &itv, NULL)来启动一个实时的计时器;

下面是ngx_timer_signal_handler()
SIGALRM信号处理函数的定义:
void ngx_timer_signal_handler(int
signo)
{
ngx_event_timer_alarm = 1;
ngx_time_update(0, 0);
}

1.
ngx_event_flags &
NGX_USE_FD_EVENT情况下,先取得系统中进程可打开文件数,根据这个最大打开文件数,在cycle中创建一个files数组。这个files数组是ngx_connection_t类型。
2.
在cycle中创建一个connections数组。connection_n表示connections数组中的元素数目。
3.
在cycle中创建read_events数组,并将其中每个read
event对象,按下面方式设置;
rev[i].closed = 1;
rev[i].instance = 1;
#if (NGX_THREADS)
rev[i].lock = &c[i].lock;
rev[i].own_lock = &c[i].lock;
#endif
9.在cycle中创建write_events数组,并将其中每个write
event对象,按下面方式设置;

wev[i].closed = 1;
#if (NGX_THREADS)

wev[i].lock = &c[i].lock;

wev[i].own_lock = &c[i].lock;
#endif
注:每个ngx_connection_t对象都对应有一个read和一个write事件对象。所以cycle中的read_evnets和write_events数组大小与connection_n一样。
10. 将cycle中的connection数组中每个ngx_connection_t对象的read、write对象设为对应与read_evnets和write_events数组中的成员;同时将这个ngx_connection_t对象用data指针串起来(参见图)。
11. 将free_connections指向刚创建的connections数组。
12. 为cycle中的listening数组中的ngx_listening_t对象创建对应的ngx_connection_t对象。
a) 在这个循环中,先调用ngx_get_connection()函数,从free_connections中取得一个空闲未用的ngx_connection_t对象,做一些相关初始化工作和设置工作,将connection对象的write事件对象的write标志设为1;同时ngx_cycle中的files存在时,ngx_cycle->files[s] = c;
s 为ngx_listening_t对象中的fd。
b) 然后将取得的ngx_connection_t对象与该ngx_listening_t对象相关联;
c->listening = &ls[i];

ls[i].connection = c;
c) 将该ngx_connection_t对象的read事件对象的accept标志设为1;
支持deferred_accept时,从对应的ngx_listening_t对象中获取deferred_accept设定值;
d) 非Win32系统下,将这个ngx_listening_t对象之前的对象中的ngx_connection_t对象中的read事件中删掉NGX_READ_EVENT类型事件,并将其fd设为-1。
e) 一个关键设置。
rev->handler = ngx_event_accept;
f) ngx_use_accept_mutex有效时,进行下一个ngx_listening_t对象的处理。
g) 否则接着是下面工作:
if
(ngx_event_flags & NGX_USE_RTSIG_EVENT) {

if (ngx_add_conn(c) == NGX_ERROR) {

return NGX_ERROR;
}

}
else {

if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {

return NGX_ERROR;
}
}

注:ngx_event_module_init()和ngx_event_process_init()函数设置在ngx_event_core_module模块中。如下:

ngx_module_t
ngx_event_core_module = {

NGX_MODULE_V1,

&ngx_event_core_module_ctx, /* module context */

ngx_event_core_commands,
/* module directives */

NGX_EVENT_MODULE,
/* module type */

NULL, /* init master */

ngx_event_module_init,
/* init module */

ngx_event_process_init,
/* init process */

NULL,
/* init thread */

NULL, /* exit thread */

NULL,
/* exit process */

NULL,
/* exit master */

NGX_MODULE_V1_PADDING
};

ngx_event_accept()函数。
1. 先也是从ngx_cycle->conf_ctx中取得ngx_event_core_module对象对应的ngx_event_cont_t对象;
2. 检查ngx_event_flags标志中若设定了NGX_USE_RTSIG_EVENT,ev实参事件对象中的available设为1;若未设定NGX_USE_KQUEUE_EVENT标志,available标志取ngx_event_cont_t对象中multi_accept的值;
注:ngx_event_t对象中available成员是个计数器。个人猜测是用来表示一个ngx_event_t对象对应的有效的ngx_connection_t对象有多少。目前似乎只有在FreeBSD系统中的KQUEUE架构中才会使available的值大于1。
3. 取出ev对象关联的ngx_connection_t对象和ngx_listening_t对象,并将ev对象中ready设为0;
4. 使用do-while循环,处理ev对象中所有的有效的accept连接。
a) 调用accept()函数,接受一个新的连接,同时创建一个新的socket。
b) ngx_accept_disabled =
ngx_cycle->connection_n / 8
-
ngx_cycle->free_connection_n;
c) 调用ngx_get_connection()为新建的socket获取一个空闲的ngx_connection_t对象;同时新建的socket与获取的ngx_connection_t对象关联起来。
d) 给这个ngx_connection_t对象创建新的pool,创建sockaddr,并将接受到的地址拷贝到sockaddr中。
e) ngx_inherited_nonblocking有效时,且ngx_event_flags中设有NGX_USE_AIO_EVENT标志的情况下,调用ngx_blocking()函数将新建的socket设为阻塞模式;
ngx_inherited_nonblocking无效时,若ngx_event_flags标志没有NGX_USE_AIO_EVENT和NGX_USE_RTSIG_EVENT标志时,调用ngx_nonblocking()函数将新建的socket设为非阻塞模式;
f) 设置recv, send, recv_chain, send_chain钩子函数;
c->recv = ngx_recv;
c->send =
ngx_send;
c->recv_chain = ngx_recv_chain;
c->send_chain = ngx_send_chain;
ngx_recv和ngx_send,ngx_recv_chain和ngx_send_chain都是宏。下面是它们的定义:
#define
ngx_recv ngx_io.recv
#define
ngx_recv_chain ngx_io.recv_chain
#define
ngx_udp_recv ngx_io.udp_recv
#define
ngx_send ngx_io.send
#define
ngx_send_chain ngx_io.send_chain

还记得在epoll_init()函数中有下面这个语句吗?
ngx_io = ngx_os_io;
也就是说ngx_io作为nginx纯一的io接口对象,而ngx_os_io则是针对不同操作系统定制的特定的io对象。它们的结构如下:

typedef struct {
ngx_recv_pt recv;
ngx_recv_chain_pt recv_chain;
ngx_recv_pt udp_recv;
ngx_send_pt send;
ngx_send_chain_pt send_chain;
ngx_uint_t flags;
} ngx_os_io_t;

继续做ngx_os_io相关搜索会发现ngx_os_io
= ngx_linux_io; ngx_os_io = ngx_solaris_io; ngx_os_io = ngx_freebsd_io; ngx_os_io
= ngx_darwin_io; …

我们查找ngx_linux_io这个对象,最后在ngx_linux_init.c文件中有下面的定义:
static
ngx_os_io_t ngx_linux_io = {
ngx_unix_recv,
ngx_readv_chain,
ngx_udp_unix_recv,
ngx_unix_send,
#if
(NGX_HAVE_SENDFILE)
ngx_linux_sendfile_chain,
NGX_IO_SENDFILE
#else
ngx_writev_chain,
0
#endif
};

最后在ngx_recv.c文件中找到ngx_unix_recv()这个函数的实现。如下,其逻辑不难看懂。
ssize_t ngx_unix_recv(ngx_connection_t *c, u_char
*buf, size_t size)
{
ssize_t n;
ngx_err_t err;
ngx_event_t
*rev;

rev = c->read;

do {
n = recv(c->fd, buf, size, 0);

ngx_log_debug3(NGX_LOG_DEBUG_EVENT,
c->log, 0,
"recv: fd:%d %d of
%d", c->fd, n, size);

if (n == 0) {
rev->ready = 0;
rev->eof = 1;
return n;

} else if (n > 0) {

if ((size_t) n < size
&& !(ngx_event_flags
& NGX_USE_GREEDY_EVENT))
{
rev->ready = 0;
}

return n;
}

err = ngx_socket_errno;

if (err == NGX_EAGAIN || err ==
NGX_EINTR) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT,
c->log, err,
"recv() not ready");
n = NGX_AGAIN;

} else {
n = ngx_connection_error(c, err,
"recv() failed");
break;
}

} while (err == NGX_EINTR);

rev->ready = 0;

if (n == NGX_ERROR){
rev->error = 1;
}

return n;
}

注:在KQUEUE支持的情况下, 有另一个ngx_unix_recv()函数的实现。

ngx_unix_send()函数的实现如下。它只有一个版本。
ssize_t
ngx_unix_send(ngx_connection_t
*c, u_char *buf, size_t size)
{
ssize_t n;
ngx_err_t err;
ngx_event_t
*wev;

wev = c->write;

#if (NGX_HAVE_KQUEUE)

if ((ngx_event_flags &
NGX_USE_KQUEUE_EVENT) && wev->pending_eof) {
(void) ngx_connection_error(c,
wev->kq_errno,
"kevent()
reported about an closed connection");
wev->error = 1;
return NGX_ERROR;
}

#endif

for ( ;; ) {
n = send(c->fd, buf, size, 0);

ngx_log_debug3(NGX_LOG_DEBUG_EVENT,
c->log, 0,
"send: fd:%d %d of
%d", c->fd, n, size);

if (n > 0) {
if (n < (ssize_t) size) {
wev->ready = 0;
}

c->sent += n;

return n;
}

err = ngx_socket_errno;

if (n == 0) {
ngx_log_error(NGX_LOG_ALERT,
c->log, err, "send() returned zero");

wev->ready = 0;
return n;
}

if (err == NGX_EAGAIN || err ==
NGX_EINTR) {
wev->ready = 0;

ngx_log_debug0(NGX_LOG_DEBUG_EVENT,
c->log, err,
"send() not
ready");

if (err == NGX_EAGAIN) {
return NGX_AGAIN;
}

} else {
wev->error = 1;
(void) ngx_connection_error(c, err,
"send() failed");
return NGX_ERROR;
}
}
g) 接下来是一些设置;
c->socklen = socklen;
c->listening = ls;
c->local_sockaddr = ls->sockaddr;
c->local_socklen = ls->socklen;

c->unexpected_eof = 1;
h) 将c中的write事件的ready设为1;
i) ngx_event_flags标志中没有NGX_USE_AIO_EVENT,NGX_USE_RTSIG_EVENT时,将c中的read事件的ready也设为1;
j) ev对象中deferred_accept有效时,将c中的read事件的ready也设为1;在KQUEUE环境下,rev->available = 1;
k) 支持线程时,按下面在read, write事件对象中设定锁。
rev->lock = &c->lock;
wev->lock = &c->lock;
rev->own_lock = &c->lock;
wev->own_lock = &c->lock;
l) ev对象中的ngx_listening_t对象的addr_ntop标志有效时,为ev对象中ngx_connection_t对象中的addr_text分配空间,将接受地址转为文本格式。
m) ngx_add_conn && (ngx_event_flags &
NGX_USE_EPOLL_EVENT) == 0时,调用ngx_add_conn(c)函数,为这个ngx_connection_t对象激活read, write事件对象。
n) 调用ngx_listening_t 对象中的handler()钩子函数来处理ev对象中的ngx_connection_t对象。
注:ngx_listening_t对象中的handler()钩子函数在不同模块中有不同制定。如在http模块中ls->handler =
ngx_http_init_connection;, 在mail模块中是ls->handler =
ngx_mail_init_connection;。也就是说真正http或mail协议的逻辑处理从这儿开始。
如果在nginx中要开发另一种协议支持的模块时,在解析完自己的配置块时,在监听的ngx_listening_t对象中设置handler()钩子函数。(详情可以参考http或mail模块。)

o) 最后ngx_event_flags标志中NGX_USE_KQUEUE_EVENT有效时,将ev对象中的available计数器减一。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: