您的位置:首页 > 运维架构 > Nginx

Nginx学习(16)—worker进程循环工作

2014-04-28 12:03 411 查看

worker进程循环工作

Nginx采用信号的IPC方式对worker进程进行控制,其中的ngx_terminate、ngx_quit、ngx_repon都将由在ngx_signal_handler方法根据收到的信号进行处理。而信号的初始化都是在main()中的ngx_init_signals()中完成的。下面看源码吧:

[cpp] view
plaincopy





static void

ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)

{

ngx_int_t worker = (intptr_t) data;

ngx_uint_t i;

ngx_connection_t *c;

/* 从master进程继承过来的全局变量,设置其为worker进程 */

ngx_process = NGX_PROCESS_WORKER;

/* Worker进程初始化,下面详述 */

ngx_worker_process_init(cycle, worker);

/* 更改进程名字,这个很有意思...详见《笔记十五》 */

ngx_setproctitle("worker process");

/* 针对worker子进程中的线程机制的,一般用不到,这里通过设置NGX_THREADS宏来开启 */

#if (NGX_THREADS)

{

ngx_int_t n;

ngx_err_t err;

ngx_core_conf_t *ccf;

ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);

if (ngx_threads_n) {

if (ngx_init_threads(ngx_threads_n, ccf->thread_stack_size, cycle)

== NGX_ERROR)

{

/* fatal */

exit(2);

}

err = ngx_thread_key_create(&ngx_core_tls_key);

if (err != 0) {

ngx_log_error(NGX_LOG_ALERT, cycle->log, err,

ngx_thread_key_create_n " failed");

/* fatal */

exit(2);

}

for (n = 0; n < ngx_threads_n; n++) {

ngx_threads
.cv = ngx_cond_init(cycle->log);

if (ngx_threads
.cv == NULL) {

/* fatal */

exit(2);

}

if (ngx_create_thread((ngx_tid_t *) &ngx_threads
.tid,

ngx_worker_thread_cycle,

(void *) &ngx_threads
, cycle->log)

!= 0)

{

/* fatal */

exit(2);

}

}

}

}

#endif

/* 这里Nginx是通过采用信号的方式,控制worker进程运行,具体的处理信号的方法

* 其实在main()函数中的ngx_init_signals()初始化工作中已经弄好,可以见signals[]数组中的

* ngx_signal_handler()处理函数,之后打算好好看看ngx_init_signals()这个函数

*/

for ( ;; ) {

/* ngx_exiting将在worker进程收到SIGQUIT信号后设置,具体设置就在下面的ngx_quit语句中 */

if (ngx_exiting) {

c = cycle->connections;

/* 这是在worker进程收到quit信号退出前,必须将connections上读到的事件处理完 */

for (i = 0; i < cycle->connection_n; i++) {

/* THREAD: lock */

if (c[i].fd != -1 && c[i].idle) {

c[i].close = 1;

c[i].read->handler(c[i].read);

}

}

/* 这里就可以调用ngx_worker_process_exit退出了,这个if条件用到红黑树。。怎么用到的,以后在细究 */

if (ngx_event_timer_rbtree.root == ngx_event_timer_rbtree.sentinel)

{

ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");

ngx_worker_process_exit(cycle);

}

}

ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");

/* 这个就是worker进程具体的时间和定时器核心处理函数!!!以后慢慢看! */

ngx_process_events_and_timers(cycle);

/* worker进程收到了SIGINT信号,直接强制退出 */

if (ngx_terminate) {

ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");

ngx_worker_process_exit(cycle);

}

/* worker进程收到了SIGQUIT信号,如果此时worker进程不是不是处于exiting状态,

* 就将设置ngx_exiting为1,让其进入exiting状态;同时关闭监听套接口,待看...

*/

if (ngx_quit) {

ngx_quit = 0;

ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,

"gracefully shutting down");

ngx_setproctitle("worker process is shutting down");

if (!ngx_exiting) {

ngx_close_listening_sockets(cycle);

ngx_exiting = 1;

}

}

/* worker进程收到了SIGUSR1信号,通知Nginx需要重新打开文件 */

if (ngx_reopen) {

ngx_reopen = 0;

ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");

ngx_reopen_files(cycle, -1);

}

}

}

下面是ngx_worker_process_init()的工作:

[cpp] view
plaincopy





static void

ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker)

{

sigset_t set;

uint64_t cpu_affinity;

ngx_int_t n;

ngx_uint_t i;

struct rlimit rlmt;

ngx_core_conf_t *ccf;

ngx_listening_t *ls;

/* 设置全局的环境变量environ,不细看了,细看暂时也弄不明白 */

if (ngx_set_environment(cycle, NULL) == NULL) {

/* fatal */

exit(2);

}

/* 获取核心模块配置 */

ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);

/* 设置worker进程优先级 */

if (worker >= 0 && ccf->priority != 0) {

if (setpriority(PRIO_PROCESS, 0, ccf->priority) == -1) {

ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,

"setpriority(%d) failed", ccf->priority);

}

}

/* 这里是给成员做资源使用的限制,可以查下setrlimit()系统调用 */

if (ccf->rlimit_nofile != NGX_CONF_UNSET) {

rlmt.rlim_cur = (rlim_t) ccf->rlimit_nofile;

rlmt.rlim_max = (rlim_t) ccf->rlimit_nofile;

if (setrlimit(RLIMIT_NOFILE, &rlmt) == -1) {

ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,

"setrlimit(RLIMIT_NOFILE, %i) failed",

ccf->rlimit_nofile);

}

}

if (ccf->rlimit_core != NGX_CONF_UNSET) {

rlmt.rlim_cur = (rlim_t) ccf->rlimit_core;

rlmt.rlim_max = (rlim_t) ccf->rlimit_core;

if (setrlimit(RLIMIT_CORE, &rlmt) == -1) {

ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,

"setrlimit(RLIMIT_CORE, %O) failed",

ccf->rlimit_core);

}

}

#ifdef RLIMIT_SIGPENDING

if (ccf->rlimit_sigpending != NGX_CONF_UNSET) {

rlmt.rlim_cur = (rlim_t) ccf->rlimit_sigpending;

rlmt.rlim_max = (rlim_t) ccf->rlimit_sigpending;

if (setrlimit(RLIMIT_SIGPENDING, &rlmt) == -1) {

ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,

"setrlimit(RLIMIT_SIGPENDING, %i) failed",

ccf->rlimit_sigpending);

}

}

#endif

/* geteuid()用于获取执行目前进程有效的用户识别码,root用户的uid为0,这里是说如果拥有root超级权限 */

if (geteuid() == 0) {

/* 设置组ID,目的猜测是多个worker进程共享同一个目录下的文件 */

if (setgid(ccf->group) == -1) {

ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,

"setgid(%d) failed", ccf->group);

/* fatal */

exit(2);

}

/* 在user中设置组ID,猜测 */

if (initgroups(ccf->username, ccf->group) == -1) {

ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,

"initgroups(%s, %d) failed",

ccf->username, ccf->group);

}

/* 将worker进程设置为拥有该文件所有者同样的权限 */

if (setuid(ccf->user) == -1) {

ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,

"setuid(%d) failed", ccf->user);

/* fatal */

exit(2);

}

}

/* 这里九分猜测是给worker进程分配CPU */

if (worker >= 0) {

cpu_affinity = ngx_get_cpu_affinity(worker);

if (cpu_affinity) {

ngx_setaffinity(cpu_affinity, cycle->log);

}

}

#if (NGX_HAVE_PR_SET_DUMPABLE)

/* allow coredump after setuid() in Linux 2.4.x */

if (prctl(PR_SET_DUMPABLE, 1, 0, 0, 0) == -1) {

ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,

"prctl(PR_SET_DUMPABLE) failed");

}

#endif

/* 这里是更改工作路径 */

if (ccf->working_directory.len) {

if (chdir((char *) ccf->working_directory.data) == -1) {

ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,

"chdir(\"%s\") failed", ccf->working_directory.data);

/* fatal */

exit(2);

}

}

/* 初始化信号集 */

sigemptyset(&set);

/* 将set集中的信号设为阻塞。

* 这里是让worker进程对set集中的信号延迟处理

*/

if (sigprocmask(SIG_SETMASK, &set, NULL) == -1) {

ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,

"sigprocmask() failed");

}

/*

* disable deleting previous events for the listening sockets because

* in the worker processes there are no events at all at this point

*/

/* worker子进程所有的监听端口初始化 */

ls = cycle->listening.elts;

for (i = 0; i < cycle->listening.nelts; i++) {

ls[i].previous = NULL;

}

/* worker子进程进行所有模块的自定义初始化 */

for (i = 0; ngx_modules[i]; i++) {

if (ngx_modules[i]->init_process) {

if (ngx_modules[i]->init_process(cycle) == NGX_ERROR) {

/* fatal */

exit(2);

}

}

}

/* ngx_processes[]是一个所有worker进程共享的全局变量,这里为了保证当前worker进程

* 禁止读取master进程给其他worker进程的消息,所以这里必须关闭所有其他worker进程的读端描述符。

* 然后,保留对其他worker进程的写端描述符和自身的读端描述符,目的是能够保持所有worker进程间的通信。

* 其他所有worker进程也都如此做。一句话就是,通过其他worker进程写端写入消息给别人,通过自身读端来接收来自master

* 和其他worker进程的消息

*/

for (n = 0; n < ngx_last_process; n++) {

/* 不存在的worker进程,跳过 */

if (ngx_processes
.pid == -1) {

continue;

}

/* 该worker进程本身,跳过 */

if (n == ngx_process_slot) {

continue;

}

/* channel[1]已关闭,跳过 */

if (ngx_processes
.channel[1] == -1) {

continue;

}

/* 对其他worker进程的读端文件描述符关闭,保留写端文件描述符保持worker间的通信之用 */

if (close(ngx_processes
.channel[1]) == -1) {

ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,

"close() channel failed");

}

}

/* 关闭自身的写端文件描述符,因为每个worker进程只需要从自己的读端读取消息,而不用给自己写消息。

* 需要用到自身的写端文件描述符的是master和其他worker进程。这也是上面为什么要关闭其他worker进程的读端,保留写端的原因。

if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) {

ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,

"close() channel failed");

}

#if 0

ngx_last_process = 0;

#endif

/* 这里是令Nginx开始关注当前worker进程中channel读端的读事件,下面稍微描述下,现在还不能一窥全景 */

if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT,

ngx_channel_handler)

== NGX_ERROR)

{

/* fatal */

exit(2);

}

}

ngx_int_t

ngx_add_channel_event(ngx_cycle_t *cycle, ngx_fd_t fd, ngx_int_t event,

ngx_event_handler_pt handler)

{

ngx_event_t *ev, *rev, *wev;

ngx_connection_t *c;

/* 这些个参数配置看是看了,但暂时不清楚怎么用,以后再来说 */

c = ngx_get_connection(fd, cycle->log);

if (c == NULL) {

return NGX_ERROR;

}

c->pool = cycle->pool;

rev = c->read;

wev = c->write;

rev->log = cycle->log;

wev->log = cycle->log;

#if (NGX_THREADS)

rev->lock = &c->lock;

wev->lock = &c->lock;

rev->own_lock = &c->lock;

wev->own_lock = &c->lock;

#endif

rev->channel = 1;

wev->channel = 1;

ev = (event == NGX_READ_EVENT) ? rev : wev;

ev->handler = handler;

if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {

if (ngx_add_conn(c) == NGX_ERROR) {

ngx_free_connection(c);

return NGX_ERROR;

}

} else {

/* 猜的不错,这个应该是重点,调用ngx_add_event宏,在这里如果是epoll的话,

* 应该是调用ngx_epoll_add_event函数,将该channel上的读事件加入到epoll

*/

if (ngx_add_event(ev, event, 0) == NGX_ERROR) {

ngx_free_connection(c);

return NGX_ERROR;

}

}

return NGX_OK;

}

总结

分析源码要考虑前因后果,单单停留在代码的表面表示会有很多难度。看完这个worker循环,对信号量的设置,以及进程初始化的工作有了个初步的认识。另外,就单独对于代码技巧方面,比如加入epoll事件等等。。。Nginx确实是一套很牛的代码。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  nginx