Nginx服务模块开发
2016-01-02 22:19
746 查看
纵观网上各种关于nginx的模块开发的资料都是基于HTTP的模块开发,这里就整理一篇是基于TCP协议的服务模块开发,并且给出一套自定义服务模块的源码,该模块主要实现的功能是监听某一端口,然后把接收到的客户端数据转发的上流服务器,再把上流服务器的返回数据回传给客户端。模块源码下载地址:https://code.csdn.net/gamer727/nginx_mypo_module
Nginx的服务模块一般都是以四个字母命名的,这里就命名为mypo模块(mypo取“my port”前四个字母)。本模块开发的nginx的版本为1.6.2
根据个人的研究整理,nginx的开发步骤大致归纳以下五步:
第一步,在配置文件中增加自定义配置;
第二步,根据配置文件的内容初始化各种参数;
第三步,开始初始化服务,也就是开始绑定端口,监听端口;
第四步,给各个连接(connection)设置业务处理,这里的业务处理的细节步骤就取决于需求的复杂度了;
第五步,把自定义模块编译进nginx。
http的,配置文件一般分成三个等级:main、server、location,由于mypo只实现基本的转发功能,所以这里就两个配置,一个listen监听本地端口的,和一个upstream配置上流服务器的。
ngx_mypo_config.h,定义mypo模块的主要结构体和模块标识;
ngx_mypo.c、ngx_mypo.h,mypo模块的入口,调用各种子模块;
ngx_mypo_core_module.c、ngx_mypo_core_module.h,子模块的核心,读取server级别的配置。
首先讲解一下全局配置头文件ngx_mypo_config.h的部分内容:
ngx_mypo.c:
ngx_mypo_core_module.c:
函数ngx_mypo_core_listen定义:
函数ngx_mypo_core_upstream的定义:
函数ngx_mypo_init_listening里调用ngx_mypo_add_listening最终调用nginx的API创建非阻塞sock:
接收数据的处理接口实现:
ngx_mypo_wait_request_handler:
ngx_mypo_upstream_init:
ngx_mypo_upstream_rec_data:
方法ngx_mypo_upstream_send_data的定义:
方法ngx_mypo_send:
第一种修改ojbs目录下的makefile和ngx_modules.c文件,以http模块为模板,修改成mypo。
ngx_modules.c:
makefile:
第二种,在mypo目录下添加config文件,编译nginx前configure的时候加入mypo模块的目录到–add-module选项,这种方法简单,但之前configure没有保存的话,之前的编译配置都会被清掉,所以不要轻易地configure。
configure文件的内容:
最后归纳一下调用到的nginx API:
代码看多了,就会觉得,再庞大的系统如Linux内核、rtos之类的,核心的就几个,其他的都是按照那几个核心模板复制出来的。不用自己想的,直接套模板,跟着代码节奏去走。就好像开车那样,开到一定境界,不用特意去想怎么加油、挂当,车自己就会走了,更高的境界就是自己去改装车子了,让车子变成自己的手和脚。
顺便也谈谈这些年心得吧,其实做网络开发,阅读服务端代码,不管是nginx还是Apache,还是其他的abcd系统,只要是网络服务器的,就一定不会逃过以下的几步:
1、创建socket;
2、绑定(bind)地址和端口号;
3、监听(listen)端口;(如果是udp,这步就跳过了)
4、接收(accept)客户端链接;(udp这步也跳过)
5、读写数据,业务处理。这里能够调用的函数种类就很多了,有read/write、rec/send、recfrom/sendto等等;
这几步说是各个网络服务类代码的主干,其他都是旁枝末节,如果看代码的时候在旁枝末节上迷了路,可以回到主干上寻找突破口。
Nginx的服务模块一般都是以四个字母命名的,这里就命名为mypo模块(mypo取“my port”前四个字母)。本模块开发的nginx的版本为1.6.2
根据个人的研究整理,nginx的开发步骤大致归纳以下五步:
第一步,在配置文件中增加自定义配置;
第二步,根据配置文件的内容初始化各种参数;
第三步,开始初始化服务,也就是开始绑定端口,监听端口;
第四步,给各个连接(connection)设置业务处理,这里的业务处理的细节步骤就取决于需求的复杂度了;
第五步,把自定义模块编译进nginx。
第一步,在配置文件nginx.conf中增加自定义配置
mypo { server { listen 1025; upstream 127.0.0.1:1027; } } 配置文件参考
http的,配置文件一般分成三个等级:main、server、location,由于mypo只实现基本的转发功能,所以这里就两个配置,一个listen监听本地端口的,和一个upstream配置上流服务器的。
第二步,根据配置文件的内容初始化各种参数
这里开始进入源码,首先参考http模块新建mypo模块的核心文件:ngx_mypo_config.h,定义mypo模块的主要结构体和模块标识;
ngx_mypo.c、ngx_mypo.h,mypo模块的入口,调用各种子模块;
ngx_mypo_core_module.c、ngx_mypo_core_module.h,子模块的核心,读取server级别的配置。
首先讲解一下全局配置头文件ngx_mypo_config.h的部分内容:
typedef struct { void **main_conf; void **srv_conf; void **loc_conf; } ngx_mypo_conf_ctx_t; typedef struct { ngx_int_t (*preconfiguration)(ngx_conf_t *cf); ngx_int_t (*postconfiguration)(ngx_conf_t *cf); void *(*create_main_conf)(ngx_conf_t *cf); char *(*init_main_conf)(ngx_conf_t *cf, void *conf); void *(*create_srv_conf)(ngx_conf_t *cf); char *(*merge_srv_conf)(ngx_conf_t *cf, void *prev, void *conf); void *(*create_loc_conf)(ngx_conf_t *cf); char *(*merge_loc_conf)(ngx_conf_t *cf, void *prev, void *conf); } ngx_mypo_module_t; //定义mypo模块的标识,为了避免数值冲突和统一风格,数值是模块名的ASCII码 #define NGX_MYPO_MODULE 0x4D59504F /* "MYPO" */
ngx_mypo.c:
//读取完配置文件后,如果匹配到了mypo{}就会调用这个结构体里赋值的 //ngx_mypo_block函数,可以说ngx_mypo_block是mypo的整体框架。 static ngx_command_t ngx_mypo_commands[] = { { ngx_string("mypo"), NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS, ngx_mypo_block, 0, 0, NULL }, ngx_null_command }; static ngx_core_module_t ngx_mypo_module_ctx = { ngx_string("mypo"), NULL, NULL }; //服务模块入口的结构体,所有的服务模块入口结构体的ngx_module_t.type //都是设置为NGX_CORE_MODULE的,子模块就是自定义的,如NGX_MYPO_MODULE。 ngx_module_t ngx_mypo_module = { NGX_MODULE_V1, &ngx_mypo_module_ctx, /* module context */ ngx_mypo_commands, /* module directives */ NGX_CORE_MODULE, /* module type */ NULL, /* init master */ NULL, /* init module */ NULL, /* init process */ NULL, /* init thread */ NULL, /* exit thread */ NULL, /* exit process */ NULL, /* exit master */ NGX_MODULE_V1_PADDING }; static char * ngx_mypo_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { char *rv; ngx_uint_t mi, m, s; ngx_conf_t pcf; ngx_mypo_module_t *module; ngx_mypo_conf_ctx_t *ctx; ngx_mypo_core_loc_conf_t *clcf; ngx_mypo_core_srv_conf_t **cscfp; ngx_mypo_core_main_conf_t *cmcf; /* the main mypo context */ ctx = ngx_pcalloc(cf->pool, sizeof(ngx_mypo_conf_ctx_t)); if (ctx == NULL) { return NGX_CONF_ERROR; } *(ngx_mypo_conf_ctx_t **) conf = ctx; /* count the number of the mypo modules and set up their indices */ ngx_mypo_max_module = 0; for (m = 0; ngx_modules[m]; m++) { if (ngx_modules[m]->type != NGX_MYPO_MODULE) { continue; } ngx_modules[m]->ctx_index = ngx_mypo_max_module++; } /* the mypo main_conf context, it is the same in the all mypo contexts */ ctx->main_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_mypo_max_module); if (ctx->main_conf == NULL) { return NGX_CONF_ERROR; } /* * the mypo null srv_conf context, it is used to merge * the server{}s' srv_conf's */ ctx->srv_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_mypo_max_module); if (ctx->srv_conf == NULL) { return NGX_CONF_ERROR; } /* * the mypo null loc_conf context, it is used to merge * the server{}s' loc_conf's */ ctx->loc_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_mypo_max_module); if (ctx->loc_conf == NULL) { return NGX_CONF_ERROR; } /* * create the main_conf's, the null srv_conf's, and the null loc_conf's * of the all mypo modules */ for (m = 0; ngx_modules[m]; m++) { if (ngx_modules[m]->type != NGX_MYPO_MODULE) { continue; } module = ngx_modules[m]->ctx; mi = ngx_modules[m]->ctx_index; if (module->create_main_conf) { ctx->main_conf[mi] = module->create_main_conf(cf); if (ctx->main_conf[mi] == NULL) { return NGX_CONF_ERROR; } } if (module->create_srv_conf) { ctx->srv_conf[mi] = module->create_srv_conf(cf); if (ctx->srv_conf[mi] == NULL) { return NGX_CONF_ERROR; } } if (module->create_loc_conf) { ctx->loc_conf[mi] = module->create_loc_conf(cf); if (ctx->loc_conf[mi] == NULL) { return NGX_CONF_ERROR; } } } pcf = *cf; cf->ctx = ctx; for (m = 0; ngx_modules[m]; m++) { if (ngx_modules[m]->type != NGX_MYPO_MODULE) { continue; } module = ngx_modules[m]->ctx; if (module->preconfiguration) { if (module->preconfiguration(cf) != NGX_OK) { return NGX_CONF_ERROR; } } } /* parse inside the mypo{} block */ cf->module_type = NGX_MYPO_MODULE; cf->cmd_type = NGX_MYPO_MAIN_CONF; rv = ngx_conf_parse(cf, NULL); if (rv != NGX_CONF_OK) { goto failed; } /* * init mypo{} main_conf's, merge the server{}s' srv_conf's * and its location{}s' loc_conf's */ cmcf = ctx->main_conf[ngx_mypo_core_module.ctx_index]; cscfp = cmcf->servers.elts; for (m = 0; ngx_modules[m]; m++) { if (ngx_modules[m]->type != NGX_MYPO_MODULE) { continue; } module = ngx_modules[m]->ctx; mi = ngx_modules[m]->ctx_index; /* init mypo{} main_conf's */ if (module->init_main_conf) { rv = module->init_main_conf(cf, ctx->main_conf[mi]); if (rv != NGX_CONF_OK) { goto failed; } } rv = ngx_mypo_merge_servers(cf, cmcf, module, mi); if (rv != NGX_CONF_OK) { goto failed; } } /* create location trees */ for (s = 0; s < cmcf->servers.nelts; s++) { clcf = cscfp[s]->ctx->loc_conf[ngx_mypo_core_module.ctx_index]; if (ngx_mypo_init_locations(cf, cscfp[s], clcf) != NGX_OK) { return NGX_CONF_ERROR; } if (ngx_mypo_init_static_location_trees(cf, clcf) != NGX_OK) { return NGX_CONF_ERROR; } } for (m = 0; ngx_modules[m]; m++) { if (ngx_modules[m]->type != NGX_MYPO_MODULE) { continue; } module = ngx_modules[m]->ctx; if (module->postconfiguration) { if (module->postconfiguration(cf) != NGX_OK) { return NGX_CONF_ERROR; } } } *cf = pcf; /* optimize the lists of ports, addresses and server names */ if (ngx_mypo_optimize_servers(cf, cmcf, cmcf->ports) != NGX_OK) { return NGX_CONF_ERROR; } return NGX_CONF_OK; failed: *cf = pcf; return rv; }
ngx_mypo_core_module.c:
//server级别的配置 static ngx_command_t ngx_mypo_core_commands[] = { { ngx_string("server"), NGX_MYPO_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS, ngx_mypo_core_server, 0, 0, NULL }, //代理监听的IP端口,调用ngx_mypo_core_listen函数解析listen行的数据, //NGX_CONF_1MORE标识标识可以带多个参数 { ngx_string("listen"), NGX_MYPO_SRV_CONF|NGX_CONF_1MORE, ngx_mypo_core_listen, NGX_MYPO_SRV_CONF_OFFSET, 0, NULL }, //upstream配置的是上流服务器的IP端口,调用ngx_mypo_core_upstream函数解析数据。 { ngx_string("upstream"), NGX_MYPO_SRV_CONF|NGX_CONF_1MORE, ngx_mypo_core_upstream, NGX_MYPO_SRV_CONF_OFFSET, 0, NULL }, ngx_null_command }; //子模块的一下初始化,合并配置的函数声明 static ngx_mypo_module_t ngx_mypo_core_module_ctx = { ngx_mypo_core_preconfiguration, /* preconfiguration */ NULL, /* postconfiguration */ ngx_mypo_core_create_main_conf, /* create main configuration */ ngx_mypo_core_init_main_conf, /* init main configuration */ ngx_mypo_core_create_srv_conf, /* create server configuration */ ngx_mypo_core_merge_srv_conf, /* merge server configuration */ ngx_mypo_core_create_loc_conf, /* create location configuration */ ngx_mypo_core_merge_loc_conf /* merge location configuration */ }; //mypo子模块核心模块的入口结构体 ngx_module_t ngx_mypo_core_module = { NGX_MODULE_V1, &ngx_mypo_core_module_ctx, /* module context */ ngx_mypo_core_commands, /* module directives */ NGX_MYPO_MODULE, /* module type */ NULL, /* init master */ NULL, /* init module */ NULL, /* init process */ NULL, /* init thread */ NULL, /* exit thread */ NULL, /* exit process */ NULL, /* exit master */ NGX_MODULE_V1_PADDING }; static char * ngx_mypo_core_server(ngx_conf_t *cf, ngx_command_t *cmd, void *dummy) { char *rv; void *mconf; ngx_uint_t i; ngx_conf_t pcf; ngx_mypo_module_t *module; struct sockaddr_in *sin; ngx_mypo_conf_ctx_t *ctx, *mypo_ctx; ngx_mypo_listen_opt_t lsopt; ngx_mypo_core_srv_conf_t *cscf, **cscfp; ngx_mypo_core_main_conf_t *cmcf; ctx = ngx_pcalloc(cf->pool, sizeof(ngx_mypo_conf_ctx_t)); if (ctx == NULL) { return NGX_CONF_ERROR; } mypo_ctx = cf->ctx; ctx->main_conf = mypo_ctx->main_conf; /* the server{}'s srv_conf */ ctx->srv_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_mypo_max_module); if (ctx->srv_conf == NULL) { return NGX_CONF_ERROR; } /* the server{}'s loc_conf */ ctx->loc_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_mypo_max_module); if (ctx->loc_conf == NULL) { return NGX_CONF_ERROR; } for (i = 0; ngx_modules[i]; i++) { if (ngx_modules[i]->type != NGX_MYPO_MODULE) { continue; } module = ngx_modules[i]->ctx; if (module->create_srv_conf) { mconf = module->create_srv_conf(cf); if (mconf == NULL) { return NGX_CONF_ERROR; } ctx->srv_conf[ngx_modules[i]->ctx_index] = mconf; } if (module->create_loc_conf) { mconf = module->create_loc_conf(cf); if (mconf == NULL) { return NGX_CONF_ERROR; } ctx->loc_conf[ngx_modules[i]->ctx_index] = mconf; } } /* the server configuration context */ cscf = ctx->srv_conf[ngx_mypo_core_module.ctx_index]; cscf->ctx = ctx; cmcf = ctx->main_conf[ngx_mypo_core_module.ctx_index]; cscfp = ngx_array_push(&cmcf->servers); if (cscfp == NULL) { return NGX_CONF_ERROR; } *cscfp = cscf; /* parse inside server{} */ pcf = *cf; cf->ctx = ctx; cf->cmd_type = NGX_MYPO_SRV_CONF; rv = ngx_conf_parse(cf, NULL); *cf = pcf; if (rv == NGX_CONF_OK && !cscf->listen) { ngx_memzero(&lsopt, sizeof(ngx_mypo_listen_opt_t)); sin = &lsopt.u.sockaddr_in; sin->sin_family = AF_INET; sin->sin_port = htons((getuid() == 0) ? 80 : 8000); sin->sin_addr.s_addr = INADDR_ANY; lsopt.socklen = sizeof(struct sockaddr_in); lsopt.backlog = NGX_LISTEN_BACKLOG; lsopt.rcvbuf = -1; lsopt.sndbuf = -1; lsopt.wildcard = 1; (void) ngx_sock_ntop(&lsopt.u.sockaddr, lsopt.socklen, lsopt.addr, NGX_SOCKADDR_STRLEN, 1); if (ngx_mypo_add_listen(cf, cscf, &lsopt) != NGX_OK) { return NGX_CONF_ERROR; } } return rv; }
函数ngx_mypo_core_listen定义:
static char * ngx_mypo_core_listen(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { ngx_mypo_core_srv_conf_t *cscf = conf; ngx_str_t *value /*, size*/; ngx_url_t u; //ngx_uint_t n; ngx_mypo_listen_opt_t lsopt; cscf->listen = 1; value = cf->args->elts; ngx_memzero(&u, sizeof(ngx_url_t)); u.url = value[1]; u.listen = 1; u.default_port = 80; //解析IP端口主要是调用nginx里的内部函数ngx_parse_url进行解析。 if (ngx_parse_url(cf->pool, &u) != NGX_OK) { if (u.err) { ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "%s in \"%V\" of the \"listen\" directive", u.err, &u.url); } return NGX_CONF_ERROR; } ngx_memzero(&lsopt, sizeof(ngx_mypo_listen_opt_t)); ngx_memcpy(&lsopt.u.sockaddr, u.sockaddr, u.socklen); lsopt.socklen = u.socklen; lsopt.backlog = NGX_LISTEN_BACKLOG; lsopt.rcvbuf = -1; lsopt.sndbuf = -1; lsopt.wildcard = u.wildcard; #if (NGX_HAVE_INET6 && defined IPV6_V6ONLY) lsopt.ipv6only = 1; #endif (void) ngx_sock_ntop(&lsopt.u.sockaddr, lsopt.socklen, lsopt.addr, NGX_SOCKADDR_STRLEN, 1); //保存监听IP端口的列表到conf中,conf是底层调用时传过来的全局自定义变量, //全局配置数据都保存到这里 if (ngx_mypo_add_listen(cf, cscf, &lsopt) == NGX_OK) { return NGX_CONF_OK; } return NGX_CONF_ERROR; }
函数ngx_mypo_core_upstream的定义:
static char * ngx_mypo_core_upstream(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { ngx_mypo_core_srv_conf_t *cscf = conf; ngx_str_t *value /*, size*/; ngx_url_t u; value = cf->args->elts; ngx_memzero(&u, sizeof(ngx_url_t)); u.url = value[1]; u.listen = 1; u.default_port = 80; if (ngx_parse_url(cf->pool, &u) != NGX_OK) { if (u.err) { ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "%s in \"%V\" of the \"upstream\" directive", u.err, &u.url); } return NGX_CONF_ERROR; } ngx_memzero(&(cscf->upstream_addr), sizeof(cscf->upstream_addr)); //解析创建出来的上流服务器IP端口保存到全局的配置 ngx_memcpy(&(cscf->upstream_addr), u.sockaddr, u.socklen); return NGX_CONF_OK; }
第三步,开始初始化服务,也就是开始绑定端口,监听端口
从函数ngx_mypo_optimize_servers开始:static ngx_int_t ngx_mypo_optimize_servers(ngx_conf_t *cf, ngx_mypo_core_main_conf_t *cmcf, ngx_array_t *ports) { ngx_uint_t p, a; ngx_mypo_conf_port_t *port; ngx_mypo_conf_addr_t *addr; if (ports == NULL) { return NGX_OK; } //这里的架构基本都是全套用http的,然后删掉一下http业务和mypo业务不同的部分。 port = ports->elts; for (p = 0; p < ports->nelts; p++) { ngx_sort(port[p].addrs.elts, (size_t) port[p].addrs.nelts, sizeof(ngx_mypo_conf_addr_t), ngx_mypo_cmp_conf_addrs); /* * check whether all name-based servers have the same * configuration as a default server for given address:port */ addr = port[p].addrs.elts; for (a = 0; a < port[p].addrs.nelts; a++) { if (addr[a].servers.nelts > 1 ) { if (ngx_mypo_server_names(cf, cmcf, &addr[a]) != NGX_OK) { return NGX_ERROR; } } } if (ngx_mypo_init_listening(cf, &port[p]) != NGX_OK) { return NGX_ERROR; } } return NGX_OK; }
函数ngx_mypo_init_listening里调用ngx_mypo_add_listening最终调用nginx的API创建非阻塞sock:
static ngx_listening_t * ngx_mypo_add_listening(ngx_conf_t *cf, ngx_mypo_conf_addr_t *addr) { ngx_listening_t *ls; ngx_mypo_core_loc_conf_t *clcf; ngx_mypo_core_srv_conf_t *cscf; ls = ngx_create_listening(cf, &addr->opt.u.sockaddr, addr->opt.socklen); if (ls == NULL) { return NULL; } ls->addr_ntop = 1; //开发服务器时的基本步骤就是bind->listen->accept,bind和listen都是按照流程 //走下来不会有阻塞的了,而accept什么时候调用到,取决于客户端何时有连接才会返回socket //进行读写的,这一步是交给nginx底层的event来处理的才能发挥出nginx的高效性,上层要做的 //就是把处理socket的方法传给nginx底层,传递步骤就是给ngx_create_listening返回 //结构体的handler成员赋值: ls->handler = ngx_mypo_init_connection; cscf = addr->default_server; cscf->connection_pool_size = 256; cscf->client_header_timeout = 60000; ls->pool_size = cscf->connection_pool_size; ls->post_accept_timeout = cscf->client_header_timeout; clcf = cscf->ctx->loc_conf[ngx_mypo_core_module.ctx_index]; //这里的error_log是不能为空的,否则在nginx底层调用初始化的时候就会crash掉了 if (clcf->error_log == NULL) { clcf->error_log = &cf->cycle->new_log; } ls->logp = clcf->error_log; ls->log.data = &ls->addr_text; //ls->log.handler = ngx_accept_log_error; ls->log.handler = NULL; ls->backlog = addr->opt.backlog; ls->rcvbuf = addr->opt.rcvbuf; ls->sndbuf = addr->opt.sndbuf; ls->keepalive = addr->opt.so_keepalive; #if (NGX_HAVE_KEEPALIVE_TUNABLE) ls->keepidle = addr->opt.tcp_keepidle; ls->keepintvl = addr->opt.tcp_keepintvl; ls->keepcnt = addr->opt.tcp_keepcnt; #endif return ls; }
第四步,给各个连接(connection)设置业务处理
ngx_create_listening返回的参数里的handler成员赋值ngx_mypo_init_connection:void ngx_mypo_init_connection(ngx_connection_t *c) { ngx_uint_t i; ngx_event_t *rev; struct sockaddr_in *sin; ngx_mypo_port_t *port; ngx_mypo_in_addr_t *addr; ngx_mypo_log_ctx_t *ctx; #if (NGX_HAVE_INET6) struct sockaddr_in6 *sin6; ngx_mypo_in6_addr_t *addr6; #endif port = c->listening->servers; if (port->naddrs > 1) { /* * there are several addresses on this port and one of them * is an "*:port" wildcard so getsockname() in ngx_mypo_server_addr() * is required to determine a server address */ if (ngx_connection_local_sockaddr(c, NULL, 0) != NGX_OK) { ngx_mypo_close_connection(c); return; } switch (c->local_sockaddr->sa_family) { #if (NGX_HAVE_INET6) case AF_INET6: sin6 = (struct sockaddr_in6 *) c->local_sockaddr; addr6 = port->addrs; /* the last address is "*" */ for (i = 0; i < port->naddrs - 1; i++) { if (ngx_memcmp(&addr6[i].addr6, &sin6->sin6_addr, 16) == 0) { break; } } break; #endif default: /* AF_INET */ sin = (struct sockaddr_in *) c->local_sockaddr; addr = port->addrs; /* the last address is "*" */ for (i = 0; i < port->naddrs - 1; i++) { if (addr[i].addr == sin->sin_addr.s_addr) { break; } } // add core module configure to the connection c->data = &addr[i].conf; break; } } else { switch (c->local_sockaddr->sa_family) { #if (NGX_HAVE_INET6) case AF_INET6: addr6 = port->addrs; //hc->addr_conf = &addr6[0].conf; break; #endif default: /* AF_INET */ addr = port->addrs; //hc->addr_conf = &addr[0].conf; c->data = &addr[0].conf; break; } } /* the default server configuration for the address:port */ //hc->conf_ctx = hc->addr_conf->default_server->ctx; ctx = ngx_palloc(c->pool, sizeof(ngx_mypo_log_ctx_t)); if (ctx == NULL) { ngx_mypo_close_connection(c); return; } ctx->connection = c; ctx->request = NULL; ctx->current_request = NULL; c->log->connection = c->number; c->log->handler = NULL/*ngx_mypo_log_error*/; c->log->data = ctx; c->log->action = "waiting for request"; c->log_error = NGX_ERROR_INFO; rev = c->read; //accept到客户端的连接后就是数据的读写了,这又是一步阻塞的过程,需要让nginx底层 //把它变成非阻塞,基本不用自己处理的了,同样只需要把读写数据的处理方法传递给nginx //底层,让nginx底层来调用: rev->handler = ngx_mypo_wait_request_handler; c->write->handler = ngx_mypo_empty_handler; if (rev->ready) { /* the deferred accept(), rtsig, aio, iocp */ if (ngx_use_accept_mutex) { ngx_post_event(rev, &ngx_posted_events); return; } rev->handler(rev); return; } ngx_add_timer(rev, c->listening->post_accept_timeout); ngx_reusable_connection(c, 1); if (ngx_handle_read_event(rev, 0) != NGX_OK) { ngx_mypo_close_connection(c); return; } }
接收数据的处理接口实现:
rev->handler = ngx_mypo_wait_request_handler; c->write->handler = ngx_mypo_empty_handler;
ngx_mypo_wait_request_handler:
static void ngx_mypo_wait_request_handler(ngx_event_t *rev) { //u_char *p; size_t size; ssize_t n; ngx_buf_t *b; ngx_connection_t *c; //接收到来自底层的网络数据消息,Nginx是事件驱动的,所以一切信息都封装到ngx_event_t里去。 //想要读取数据就先获取到connection: c = rev->data; if (rev->timedout) { ngx_mypo_close_connection(c); return; } if (c->close) { ngx_mypo_close_connection(c); return; } //这里接收数据的大小应该根据协议的规定,解析和读取,这里只是为了方便固定设置成12而已。 size = 12; b = c->buffer; if (b == NULL) { b = ngx_create_temp_buf(c->pool, size); if (b == NULL) { ngx_mypo_close_connection(c); return; } c->buffer = b; } else if (b->start == NULL) { b->start = ngx_palloc(c->pool, size); if (b->start == NULL) { ngx_mypo_close_connection(c); return; } b->pos = b->start; b->last = b->start; b->end = b->last + size; } //从这里就开始读取客户端的请求数据了 n = c->recv(c, b->last, size); if (n == NGX_AGAIN) { if (!rev->timer_set) { ngx_add_timer(rev, c->listening->post_accept_timeout); ngx_reusable_connection(c, 1); } if (ngx_handle_read_event(rev, 0) != NGX_OK) { ngx_mypo_close_connection(c); return; } /* * We are trying to not hold c->buffer's memory for an idle connection. */ if (ngx_pfree(c->pool, b->start) == NGX_OK) { b->start = NULL; } return; } if (n == NGX_ERROR) { ngx_mypo_close_connection(c); return; } if (n == 0) { ngx_log_error(NGX_LOG_INFO, c->log, 0, "client closed connection"); ngx_mypo_close_connection(c); return; } b->last += n; c->log->action = "reading client request line"; ngx_reusable_connection(c, 0); //create the upstream and send data to the upstream server //保存客户端的数据,然后创建上流服务器的连接,把数据转发给上流服务器。 ngx_mypo_upstream_init(rev); //send data to client /* //如果服务器不做代理转发,而是直接返回数据到客户端,可以在这里进行处理。 ngx_chain_t out[1]; b = ngx_pcalloc(c->pool, sizeof(ngx_buf_t)); out[0].buf = b; out[0].next = NULL; b->pos = (u_char*)"hello_world, from Nginx"; b->last = b->pos + sizeof("hello_world, from Nginx") - 1; b->memory = 1; ngx_mypo_send(rev, out, 0); */ }
ngx_mypo_upstream_init:
static int ngx_mypo_upstream_init(ngx_event_t *rev) { size_t size; ngx_buf_t *b; ngx_connection_t *c; ngx_int_t rc; ngx_mypo_addr_conf_t *addr_conf; ngx_mypo_core_srv_conf_t *core_conf; c = rev->data; addr_conf = c->data; core_conf = addr_conf->default_server; if (rev->timedout) { ngx_mypo_close_connection(c); return -1; } if (c->close) { ngx_mypo_close_connection(c); return -1; } //size = cscf->client_header_buffer_size; size = 16; b = c->buffer; ngx_peer_connection_t *peer = ngx_palloc(c->pool, sizeof(ngx_peer_connection_t)); if (NULL == peer){ ngx_mypo_close_connection(c); return -1; } //赋值上流服务器的IP和端口,然后连接上流服务器。 peer->sockaddr = &(core_conf->upstream_addr); peer->socklen = sizeof(core_conf->upstream_addr); peer->name = NULL/*&ahcf->peer->name*/; peer->get = ngx_event_get_peer; peer->log = c->log; peer->log_error = NGX_ERROR_ERR; rc = ngx_event_connect_peer(peer); if (rc == NGX_ERROR || rc == NGX_BUSY || rc == NGX_DECLINED) { if (peer->connection) { ngx_close_connection(peer->connection); } ngx_close_connection(c); return -1; } //接收到上流服务器返回的数据,需要把数据回传给客户端,但是不管上流 //服务器的连接还是客户端的连接,都是底层传过来的,如果自定义一个 //全局变量来保存这些连接并且进行管理,也是可以,但维护起来就会很 //繁琐,这里可以使用ngx_connection_t结构体里的data字段,它是 //void*类型的,专门用来保存自定义类型的变量,自己可以把所需要的 //业务信息封装成一个结构体,然后分配内存赋值给data,由于这里业务 //简单,直接使用Nginx本身的ngx_event_t就足够了。 peer->connection->data = rev; peer->connection->pool = c->pool; //指定与上流服务器读写数据事件的方法 peer->connection->read->handler = ngx_mypo_upstream_rec_data; peer->connection->write->handler = ngx_mypo_upstream_send_data; return 0; }
ngx_mypo_upstream_rec_data:
static void ngx_mypo_upstream_rec_data(ngx_event_t *rev) { size_t size; ssize_t n; ngx_buf_t *b; ngx_connection_t *sc, *cc; ngx_event_t *cevent; //首先获取服务器和客户端的connection: sc = rev->data; cevent = sc->data; cc = cevent->data; if (rev->timedout) { ngx_mypo_close_connection(sc); return ; } if (sc->close) { ngx_mypo_close_connection(sc); return ; } size = 16; b = sc->buffer; if (b == NULL) { b = ngx_create_temp_buf(sc->pool, size); if (b == NULL) { ngx_mypo_close_connection(sc); return; } sc->buffer = b; } else if (b->start == NULL) { b->start = ngx_palloc(sc->pool, size); if (b->start == NULL) { ngx_mypo_close_connection(sc); return ; } b->pos = b->start; b->last = b->start; b->end = b->last + size; } n = sc->recv(sc, b->last, size); if (n == NGX_AGAIN) { if (!rev->timer_set) { ngx_add_timer(rev, sc->listening->post_accept_timeout); ngx_reusable_connection(sc, 1); } if (ngx_handle_read_event(rev, 0) != NGX_OK) { ngx_mypo_close_connection(sc); return ; } /* * We are trying to not hold c->buffer's memory for an idle connection. */ if (ngx_pfree(sc->pool, b->start) == NGX_OK) { b->start = NULL; } return; } if (n == NGX_ERROR) { ngx_mypo_close_connection(sc); return; } if (n == 0) { ngx_mypo_close_connection(sc); return ; } b->last += n; sc->log->action = "reading client request line"; ngx_reusable_connection(sc, 0); ngx_chain_t out[1]; //b = ngx_pcalloc(sc->pool, sizeof(ngx_buf_t)); out[0].buf = b; out[0].next = NULL; //b->pos = (u_char*)"hello_world, from Nginx"; //b->last = b->pos + sizeof("hello_world, from Nginx") - 1; //b->memory = 1; ngx_mypo_send(cevent, out, 0); ngx_mypo_close_connection(cc); return; }
方法ngx_mypo_upstream_send_data的定义:
static void ngx_mypo_upstream_send_data(ngx_event_t *rev) { ngx_buf_t *b; ngx_connection_t *sc, *cc; ngx_event_t *cevent; //一旦connect上流服务器成功,可以发送数据的时候就会调用到这个方法。 sc = rev->data; cevent = sc->data; cc = cevent->data; b = cc->buffer; //客户端接收到数据以后已经封装到buffer中了,现在只需要把它读出来,转发给上流服务器。 ngx_chain_t out[1]; out[0].buf = b; out[0].next = NULL; ngx_mypo_send(rev, out, 0); return; }
方法ngx_mypo_send:
static void ngx_mypo_send(ngx_event_t *wev, ngx_chain_t *in, off_t limit) { ngx_connection_t *cc; ngx_chain_t *cl; cc = wev->data; if (cc->destroyed) { return; } if (wev->timedout) { ngx_log_error(NGX_LOG_INFO, cc->log, NGX_ETIMEDOUT, "netcall: client send timed out"); cc->timedout = 1; ngx_mypo_close_connection(cc); return; } if (wev->timer_set) { ngx_del_timer(wev); } cl = cc->send_chain(cc, in, limit); if (cl == NGX_CHAIN_ERROR) { ngx_mypo_close_connection(cc); return; } /* more data to send? */ if (cl) { ngx_add_timer(wev, 10000); if (ngx_handle_write_event(wev, 0) != NGX_OK) { ngx_mypo_close_connection(cc); } return; } /* we've sent everything we had. * now receive reply */ ngx_del_event(wev, NGX_WRITE_EVENT, 0); }
第五步,把自定义模块编译进nginx
有两种方法:第一种修改ojbs目录下的makefile和ngx_modules.c文件,以http模块为模板,修改成mypo。
ngx_modules.c:
//声明外部变量 extern ngx_module_t ngx_mypo_module; extern ngx_module_t ngx_mypo_core_module; ngx_module_t *ngx_modules[] = { ...... &ngx_mypo_module, &ngx_mypo_core_module, NULL };
makefile:
#"..."是Makefile原来的配置,由于mypo模块是放到nginx目录里的src下, #所以路径是"./src/mypo/",如果是其他模块,可以把该 模块的绝对路径替换"./src/mypo/"即可。 CFLAGS = ... -I./src/mypo/ ADDON_DEPS = ... ./src/mypo/ngx_mypo_core_module.h ./src/mypo/ngx_mypo_config.h ./src/mypo/ngx_mypo.h objs/nginx: ... \ objs/addon/mypo/ngx_mypo.o \ objs/addon/mypo/ngx_mypo_core_module.o \ ... $(LINK) -o ... \ objs/addon/mypo/ngx_mypo.o \ objs/addon/mypo/ngx_mypo_core_module.o \ ... objs/addon/mypo/ngx_mypo.o: $(ADDON_DEPS) \ ./src/mypo//ngx_mypo.c $(CC) -c $(CFLAGS) $(ALL_INCS) \ -o objs/addon/mypo/ngx_mypo.o \ ./src/mypo/ngx_mypo.c objs/addon/mypo/ngx_mypo_core_module.o: $(ADDON_DEPS) \ ./src/mypo//ngx_mypo_core_module.c $(CC) -c $(CFLAGS) $(ALL_INCS) \ -o objs/addon/mypo/ngx_mypo_core_module.o \ ./src/mypo/ngx_mypo_core_module.c
第二种,在mypo目录下添加config文件,编译nginx前configure的时候加入mypo模块的目录到–add-module选项,这种方法简单,但之前configure没有保存的话,之前的编译配置都会被清掉,所以不要轻易地configure。
configure文件的内容:
ngx_addon_name="ngx_mypo_module" CORE_MODULES="$CORE_MODULES ngx_mypo_module \ ngx_mypo_core_module \ " NGX_ADDON_DEPS="$NGX_ADDON_DEPS \ $ngx_addon_dir/ngx_mypo_core_module.h \ $ngx_addon_dir/ngx_mypo_config.h \ $ngx_addon_dir/ngx_mypo.h \ " NGX_ADDON_SRCS="$NGX_ADDON_SRCS \ $ngx_addon_dir/ngx_mypo.c \ $ngx_addon_dir/ngx_mypo_core_module.c \ " CFLAGS="$CFLAGS -I$ngx_addon_dir"
最后归纳一下调用到的nginx API:
void *ngx_pcalloc(ngx_pool_t *pool, size_t size); char *ngx_conf_parse(ngx_conf_t *cf, ngx_str_t *filename); void *ngx_array_push(ngx_array_t *a); size_t ngx_sock_ntop(struct sockaddr *sa, socklen_t socklen, u_char *text, size_t len, ngx_uint_t port); #define ngx_memzero(buf, n) (void) memset(buf, 0, n) ngx_array_t * ngx_array_create(ngx_pool_t *p, ngx_uint_t n, size_t size); static ngx_inline ngx_int_t ngx_array_init(ngx_array_t *array, ngx_pool_t *pool, ngx_uint_t n, size_t size); ngx_int_t ngx_parse_url(ngx_pool_t *pool, ngx_url_t *u); ngx_listening_t * ngx_create_listening(ngx_conf_t *cf, void *sockaddr, socklen_t socklen); ngx_int_t ngx_connection_local_sockaddr(ngx_connection_t *c, ngx_str_t *s, ngx_uint_t port); void ngx_close_connection(ngx_connection_t *c); ngx_buf_t * ngx_create_temp_buf(ngx_pool_t *pool, size_t size); void ngx_reusable_connection(ngx_connection_t *c, ngx_uint_t reusable); ngx_int_t ngx_handle_read_event(ngx_event_t *rev, ngx_uint_t flags); ngx_int_t ngx_event_connect_peer(ngx_peer_connection_t *pc);
代码看多了,就会觉得,再庞大的系统如Linux内核、rtos之类的,核心的就几个,其他的都是按照那几个核心模板复制出来的。不用自己想的,直接套模板,跟着代码节奏去走。就好像开车那样,开到一定境界,不用特意去想怎么加油、挂当,车自己就会走了,更高的境界就是自己去改装车子了,让车子变成自己的手和脚。
顺便也谈谈这些年心得吧,其实做网络开发,阅读服务端代码,不管是nginx还是Apache,还是其他的abcd系统,只要是网络服务器的,就一定不会逃过以下的几步:
1、创建socket;
2、绑定(bind)地址和端口号;
3、监听(listen)端口;(如果是udp,这步就跳过了)
4、接收(accept)客户端链接;(udp这步也跳过)
5、读写数据,业务处理。这里能够调用的函数种类就很多了,有read/write、rec/send、recfrom/sendto等等;
这几步说是各个网络服务类代码的主干,其他都是旁枝末节,如果看代码的时候在旁枝末节上迷了路,可以回到主干上寻找突破口。
相关文章推荐
- Nginx的启动、停止与重启
- Nginx中文手冊
- nginx模块开发-在handle函数中获取页面请求的参数
- Nginx配置-伪静态,隐藏index.php大入口
- Kali Linux配置nginx支持php
- nginx启动脚本
- Nginx配置文件nginx.conf中文详解
- nginx + uwsgi + django 配置与安装
- nginx_lua vs nginx+php 应用场景
- CentOS 6.7配置Nginx 1.8负载均衡
- python+django+gunicorn+nginx的配置
- Nginx安装与使用
- 【Linux运维-集群技术进阶】Nginx的安装配置
- 【Linux运维-集群技术进阶】一分钟认识Nginx
- Ubuntu 安装Nginx
- nginx-1.8.0源码包编译安装
- nginx的部署与发布
- nginx安装
- Nginx优秀设计--ngx_tolower相关宏
- Nginx Location配置总结