
Nginx Service Module Development

2016-01-02 22:19
Most of the material about nginx module development found online covers HTTP modules, so this article walks through developing a service module based on plain TCP instead, and provides the full source of a custom service module. The module listens on a port, forwards the data received from clients to an upstream server, and relays the upstream server's response back to the client. Module source download: https://code.csdn.net/gamer727/nginx_mypo_module

Nginx service modules are usually given four-letter names, so this one is called the mypo module ("mypo" being the first four letters of "my port"). The module was developed against nginx 1.6.2.

Based on my own research, nginx module development roughly breaks down into the following five steps:

Step 1: add the custom configuration to the configuration file;

Step 2: initialize the parameters from the configuration file contents;

Step 3: initialize the service, i.e. bind and listen on the port;

Step 4: set up the business logic for each connection; how detailed this gets depends on the complexity of the requirements;

Step 5: compile the custom module into nginx.

Step 1: Add the custom configuration to nginx.conf

mypo {
    server {
        listen 1025;
        upstream 127.0.0.1:1027;
    }
}
The configuration follows the style of the http module, whose configuration is generally split into three levels: main, server and location. Since mypo only implements basic forwarding, there are just two directives here: listen, which sets the local port to listen on, and upstream, which sets the upstream server.

Step 2: Initialize the parameters from the configuration file contents

This is where the source code starts. Following the http module as a reference, first create the core files of the mypo module:

ngx_mypo_config.h, which defines the mypo module's main structures and the module type identifier;

ngx_mypo.c and ngx_mypo.h, the entry point of the mypo module, which invokes the various submodules;

ngx_mypo_core_module.c and ngx_mypo_core_module.h, the core of the submodules, which read the server-level configuration.

First, part of the global configuration header ngx_mypo_config.h:

typedef struct {
    void        **main_conf;
    void        **srv_conf;
    void        **loc_conf;
} ngx_mypo_conf_ctx_t;

typedef struct {
    ngx_int_t   (*preconfiguration)(ngx_conf_t *cf);
    ngx_int_t   (*postconfiguration)(ngx_conf_t *cf);

    void       *(*create_main_conf)(ngx_conf_t *cf);
    char       *(*init_main_conf)(ngx_conf_t *cf, void *conf);

    void       *(*create_srv_conf)(ngx_conf_t *cf);
    char       *(*merge_srv_conf)(ngx_conf_t *cf, void *prev, void *conf);

    void       *(*create_loc_conf)(ngx_conf_t *cf);
    char       *(*merge_loc_conf)(ngx_conf_t *cf, void *prev, void *conf);
} ngx_mypo_module_t;

//Define the mypo module type identifier; to avoid value collisions and keep the style consistent,
//the value is the ASCII code of the module name
#define NGX_MYPO_MODULE           0x4D59504F   /* "MYPO" */
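
The srv_conf and main_conf structures manipulated by these callbacks are not listed in this article. A minimal sketch of what ngx_mypo_core_srv_conf_t and ngx_mypo_core_main_conf_t might contain, inferred from the members referenced later in the code (listen, ctx, upstream_addr, connection_pool_size, client_header_timeout, servers, ports), is:

//A sketch only; the exact field list is an assumption based on the members
//referenced by the code below.
typedef struct {
    ngx_mypo_conf_ctx_t   *ctx;                    /* this server{}'s context */
    unsigned               listen:1;               /* a listen directive was seen */
    struct sockaddr        upstream_addr;          /* parsed "upstream" address (IPv4 assumed) */
    size_t                 connection_pool_size;
    ngx_msec_t             client_header_timeout;
} ngx_mypo_core_srv_conf_t;

typedef struct {
    ngx_array_t            servers;                /* of ngx_mypo_core_srv_conf_t * */
    ngx_array_t           *ports;                  /* of ngx_mypo_conf_port_t */
} ngx_mypo_core_main_conf_t;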


ngx_mypo.c:

//After the configuration file has been read, if a mypo{} block was matched, the ngx_mypo_block
//function assigned in this structure is called; ngx_mypo_block is essentially the overall framework of mypo.
static ngx_command_t  ngx_mypo_commands[] = {

    { ngx_string("mypo"),
      NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,
      ngx_mypo_block,
      0,
      0,
      NULL },

      ngx_null_command
};

static ngx_core_module_t  ngx_mypo_module_ctx = {
    ngx_string("mypo"),
    NULL,
    NULL
};

//Entry structure of the service module. Every top-level service module sets
//ngx_module_t.type to NGX_CORE_MODULE; its submodules use the custom type, here NGX_MYPO_MODULE.
ngx_module_t  ngx_mypo_module = {
    NGX_MODULE_V1,
    &ngx_mypo_module_ctx,                  /* module context */
    ngx_mypo_commands,                     /* module directives */
    NGX_CORE_MODULE,                       /* module type */
    NULL,                                  /* init master */
    NULL,                                  /* init module */
    NULL,                                  /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};

static char *
ngx_mypo_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
char                        *rv;
ngx_uint_t                   mi, m, s;
ngx_conf_t                   pcf;
ngx_mypo_module_t           *module;
ngx_mypo_conf_ctx_t         *ctx;
ngx_mypo_core_loc_conf_t    *clcf;
ngx_mypo_core_srv_conf_t   **cscfp;
ngx_mypo_core_main_conf_t   *cmcf;

/* the main mypo context */

ctx = ngx_pcalloc(cf->pool, sizeof(ngx_mypo_conf_ctx_t));
if (ctx == NULL) {
return NGX_CONF_ERROR;
}

*(ngx_mypo_conf_ctx_t **) conf = ctx;

/* count the number of the mypo modules and set up their indices */

ngx_mypo_max_module = 0;
for (m = 0; ngx_modules[m]; m++) {
if (ngx_modules[m]->type != NGX_MYPO_MODULE) {
continue;
}

ngx_modules[m]->ctx_index = ngx_mypo_max_module++;
}

/* the mypo main_conf context, it is the same in the all mypo contexts */

ctx->main_conf = ngx_pcalloc(cf->pool,
sizeof(void *) * ngx_mypo_max_module);
if (ctx->main_conf == NULL) {
return NGX_CONF_ERROR;
}

/*
* the mypo null srv_conf context, it is used to merge
* the server{}s' srv_conf's
*/

ctx->srv_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_mypo_max_module);
if (ctx->srv_conf == NULL) {
return NGX_CONF_ERROR;
}

/*
* the mypo null loc_conf context, it is used to merge
* the server{}s' loc_conf's
*/

ctx->loc_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_mypo_max_module);
if (ctx->loc_conf == NULL) {
return NGX_CONF_ERROR;
}

/*
* create the main_conf's, the null srv_conf's, and the null loc_conf's
* of the all mypo modules
*/

for (m = 0; ngx_modules[m]; m++) {
if (ngx_modules[m]->type != NGX_MYPO_MODULE) {
continue;
}

module = ngx_modules[m]->ctx;
mi = ngx_modules[m]->ctx_index;

if (module->create_main_conf) {
ctx->main_conf[mi] = module->create_main_conf(cf);
if (ctx->main_conf[mi] == NULL) {
return NGX_CONF_ERROR;
}
}

if (module->create_srv_conf) {
ctx->srv_conf[mi] = module->create_srv_conf(cf);
if (ctx->srv_conf[mi] == NULL) {
return NGX_CONF_ERROR;
}
}

if (module->create_loc_conf) {
ctx->loc_conf[mi] = module->create_loc_conf(cf);
if (ctx->loc_conf[mi] == NULL) {
return NGX_CONF_ERROR;
}
}
}

pcf = *cf;
cf->ctx = ctx;

for (m = 0; ngx_modules[m]; m++) {
if (ngx_modules[m]->type != NGX_MYPO_MODULE) {
continue;
}

module = ngx_modules[m]->ctx;

if (module->preconfiguration) {
if (module->preconfiguration(cf) != NGX_OK) {
return NGX_CONF_ERROR;
}
}
}

/* parse inside the mypo{} block */

cf->module_type = NGX_MYPO_MODULE;
cf->cmd_type = NGX_MYPO_MAIN_CONF;
rv = ngx_conf_parse(cf, NULL);

if (rv != NGX_CONF_OK) {
goto failed;
}

/*
* init mypo{} main_conf's, merge the server{}s' srv_conf's
* and its location{}s' loc_conf's
*/

cmcf = ctx->main_conf[ngx_mypo_core_module.ctx_index];
cscfp = cmcf->servers.elts;

for (m = 0; ngx_modules[m]; m++) {
if (ngx_modules[m]->type != NGX_MYPO_MODULE) {
continue;
}

module = ngx_modules[m]->ctx;
mi = ngx_modules[m]->ctx_index;

/* init mypo{} main_conf's */

if (module->init_main_conf) {
rv = module->init_main_conf(cf, ctx->main_conf[mi]);
if (rv != NGX_CONF_OK) {
goto failed;
}
}

rv = ngx_mypo_merge_servers(cf, cmcf, module, mi);
if (rv != NGX_CONF_OK) {
goto failed;
}
}

/* create location trees */
for (s = 0; s < cmcf->servers.nelts; s++) {

clcf = cscfp[s]->ctx->loc_conf[ngx_mypo_core_module.ctx_index];

if (ngx_mypo_init_locations(cf, cscfp[s], clcf) != NGX_OK) {
return NGX_CONF_ERROR;
}

if (ngx_mypo_init_static_location_trees(cf, clcf) != NGX_OK) {
return NGX_CONF_ERROR;
}
}

for (m = 0; ngx_modules[m]; m++) {
if (ngx_modules[m]->type != NGX_MYPO_MODULE) {
continue;
}

module = ngx_modules[m]->ctx;

if (module->postconfiguration) {
if (module->postconfiguration(cf) != NGX_OK) {
return NGX_CONF_ERROR;
}
}
}

*cf = pcf;

/* optimize the lists of ports, addresses and server names */

if (ngx_mypo_optimize_servers(cf, cmcf, cmcf->ports) != NGX_OK) {
return NGX_CONF_ERROR;
}

return NGX_CONF_OK;

failed:

*cf = pcf;

return rv;
}
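
ngx_mypo_merge_servers is not shown above. Since mypo has no nested location{} blocks, a stripped-down sketch modeled on the http counterpart ngx_http_merge_servers (with the location merging removed; field names are assumptions) could look like this:

static char *
ngx_mypo_merge_servers(ngx_conf_t *cf, ngx_mypo_core_main_conf_t *cmcf,
    ngx_mypo_module_t *module, ngx_uint_t ctx_index)
{
    char                        *rv;
    ngx_uint_t                   s;
    ngx_mypo_conf_ctx_t         *ctx, saved;
    ngx_mypo_core_srv_conf_t   **cscfp;

    cscfp = cmcf->servers.elts;
    ctx = (ngx_mypo_conf_ctx_t *) cf->ctx;
    saved = *ctx;
    rv = NGX_CONF_OK;

    for (s = 0; s < cmcf->servers.nelts; s++) {

        /* merge each server{}'s srv_conf with the mypo{} level srv_conf */

        ctx->srv_conf = cscfp[s]->ctx->srv_conf;

        if (module->merge_srv_conf) {
            rv = module->merge_srv_conf(cf, saved.srv_conf[ctx_index],
                                        cscfp[s]->ctx->srv_conf[ctx_index]);
            if (rv != NGX_CONF_OK) {
                break;
            }
        }

        if (module->merge_loc_conf) {
            ctx->loc_conf = cscfp[s]->ctx->loc_conf;

            rv = module->merge_loc_conf(cf, saved.loc_conf[ctx_index],
                                        cscfp[s]->ctx->loc_conf[ctx_index]);
            if (rv != NGX_CONF_OK) {
                break;
            }
        }
    }

    *ctx = saved;

    return rv;
}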


ngx_mypo_core_module.c:

//The server{} level directives
static ngx_command_t  ngx_mypo_core_commands[] = {

    { ngx_string("server"),
      NGX_MYPO_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,
      ngx_mypo_core_server,
      0,
      0,
      NULL },

    //The local IP/port the proxy listens on. ngx_mypo_core_listen parses the
    //arguments of the listen line; the NGX_CONF_1MORE flag means the directive
    //takes one or more arguments.
    { ngx_string("listen"),
      NGX_MYPO_SRV_CONF|NGX_CONF_1MORE,
      ngx_mypo_core_listen,
      NGX_MYPO_SRV_CONF_OFFSET,
      0,
      NULL },

    //upstream configures the IP/port of the upstream server; its arguments are
    //parsed by ngx_mypo_core_upstream.
    { ngx_string("upstream"),
      NGX_MYPO_SRV_CONF|NGX_CONF_1MORE,
      ngx_mypo_core_upstream,
      NGX_MYPO_SRV_CONF_OFFSET,
      0,
      NULL },

      ngx_null_command
};

//Declarations of the submodule's create/init and merge-configuration callbacks
static ngx_mypo_module_t  ngx_mypo_core_module_ctx = {
    ngx_mypo_core_preconfiguration,        /* preconfiguration */
    NULL,                                  /* postconfiguration */

    ngx_mypo_core_create_main_conf,        /* create main configuration */
    ngx_mypo_core_init_main_conf,          /* init main configuration */

    ngx_mypo_core_create_srv_conf,         /* create server configuration */
    ngx_mypo_core_merge_srv_conf,          /* merge server configuration */

    ngx_mypo_core_create_loc_conf,         /* create location configuration */
    ngx_mypo_core_merge_loc_conf           /* merge location configuration */
};

//Entry structure of the mypo core submodule
ngx_module_t  ngx_mypo_core_module = {
    NGX_MODULE_V1,
    &ngx_mypo_core_module_ctx,             /* module context */
    ngx_mypo_core_commands,                /* module directives */
    NGX_MYPO_MODULE,                       /* module type */
    NULL,                                  /* init master */
    NULL,                                  /* init module */
    NULL,                                  /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};
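
The create/merge callbacks declared above are not reproduced in this article. As an illustration only, a create_srv_conf callback might look like the following sketch (the structure layout is an assumption based on the fields used later):

static void *
ngx_mypo_core_create_srv_conf(ngx_conf_t *cf)
{
    ngx_mypo_core_srv_conf_t  *cscf;

    //ngx_pcalloc zeroes the structure, so listen defaults to 0 and
    //upstream_addr stays empty until the directives fill them in
    cscf = ngx_pcalloc(cf->pool, sizeof(ngx_mypo_core_srv_conf_t));
    if (cscf == NULL) {
        return NULL;
    }

    return cscf;
}

The server{} block itself is handled by ngx_mypo_core_server:
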
static char *
ngx_mypo_core_server(ngx_conf_t *cf, ngx_command_t *cmd, void *dummy)
{
char                        *rv;
void                        *mconf;
ngx_uint_t                   i;
ngx_conf_t                   pcf;
ngx_mypo_module_t           *module;
struct sockaddr_in          *sin;
ngx_mypo_conf_ctx_t         *ctx, *mypo_ctx;
ngx_mypo_listen_opt_t        lsopt;
ngx_mypo_core_srv_conf_t    *cscf, **cscfp;
ngx_mypo_core_main_conf_t   *cmcf;

ctx = ngx_pcalloc(cf->pool, sizeof(ngx_mypo_conf_ctx_t));
if (ctx == NULL) {
return NGX_CONF_ERROR;
}

mypo_ctx = cf->ctx;
ctx->main_conf = mypo_ctx->main_conf;

/* the server{}'s srv_conf */

ctx->srv_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_mypo_max_module);
if (ctx->srv_conf == NULL) {
return NGX_CONF_ERROR;
}

/* the server{}'s loc_conf */

ctx->loc_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_mypo_max_module);
if (ctx->loc_conf == NULL) {
return NGX_CONF_ERROR;
}

for (i = 0; ngx_modules[i]; i++) {
if (ngx_modules[i]->type != NGX_MYPO_MODULE) {
continue;
}

module = ngx_modules[i]->ctx;

if (module->create_srv_conf) {
mconf = module->create_srv_conf(cf);
if (mconf == NULL) {
return NGX_CONF_ERROR;
}

ctx->srv_conf[ngx_modules[i]->ctx_index] = mconf;
}

if (module->create_loc_conf) {
mconf = module->create_loc_conf(cf);
if (mconf == NULL) {
return NGX_CONF_ERROR;
}

ctx->loc_conf[ngx_modules[i]->ctx_index] = mconf;
}
}

/* the server configuration context */

cscf = ctx->srv_conf[ngx_mypo_core_module.ctx_index];
cscf->ctx = ctx;

cmcf = ctx->main_conf[ngx_mypo_core_module.ctx_index];

cscfp = ngx_array_push(&cmcf->servers);
if (cscfp == NULL) {
return NGX_CONF_ERROR;
}

*cscfp = cscf;

/* parse inside server{} */

pcf = *cf;
cf->ctx = ctx;
cf->cmd_type = NGX_MYPO_SRV_CONF;

rv = ngx_conf_parse(cf, NULL);

*cf = pcf;

if (rv == NGX_CONF_OK && !cscf->listen) {
ngx_memzero(&lsopt, sizeof(ngx_mypo_listen_opt_t));

sin = &lsopt.u.sockaddr_in;

sin->sin_family = AF_INET;
sin->sin_port = htons((getuid() == 0) ? 80 : 8000);
sin->sin_addr.s_addr = INADDR_ANY;

lsopt.socklen = sizeof(struct sockaddr_in);

lsopt.backlog = NGX_LISTEN_BACKLOG;
lsopt.rcvbuf = -1;
lsopt.sndbuf = -1;
lsopt.wildcard = 1;

(void) ngx_sock_ntop(&lsopt.u.sockaddr, lsopt.socklen, lsopt.addr,
NGX_SOCKADDR_STRLEN, 1);

if (ngx_mypo_add_listen(cf, cscf, &lsopt) != NGX_OK) {
return NGX_CONF_ERROR;
}
}

return rv;
}


The definition of ngx_mypo_core_listen:

static char *
ngx_mypo_core_listen(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_mypo_core_srv_conf_t *cscf = conf;

ngx_str_t              *value /*, size*/;
ngx_url_t               u;
//ngx_uint_t              n;
ngx_mypo_listen_opt_t   lsopt;

cscf->listen = 1;

value = cf->args->elts;

ngx_memzero(&u, sizeof(ngx_url_t));

u.url = value[1];
u.listen = 1;
u.default_port = 80;

//Parsing the IP/port is done mainly by calling nginx's internal function ngx_parse_url.

if (ngx_parse_url(cf->pool, &u) != NGX_OK) {
if (u.err) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"%s in \"%V\" of the \"listen\" directive",
u.err, &u.url);
}

return NGX_CONF_ERROR;
}

ngx_memzero(&lsopt, sizeof(ngx_mypo_listen_opt_t));

ngx_memcpy(&lsopt.u.sockaddr, u.sockaddr, u.socklen);

lsopt.socklen = u.socklen;
lsopt.backlog = NGX_LISTEN_BACKLOG;
lsopt.rcvbuf = -1;
lsopt.sndbuf = -1;
lsopt.wildcard = u.wildcard;
#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
lsopt.ipv6only = 1;
#endif

(void) ngx_sock_ntop(&lsopt.u.sockaddr, lsopt.socklen, lsopt.addr,
NGX_SOCKADDR_STRLEN, 1);

//Save the listening IP/port list into conf; conf is the module's configuration object
//passed in by the framework, and all of the global configuration data is stored there.
if (ngx_mypo_add_listen(cf, cscf, &lsopt) == NGX_OK) {
return NGX_CONF_OK;
}

return NGX_CONF_ERROR;
}
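
The ngx_mypo_listen_opt_t structure filled in above is not listed in this article; a sketch based on the fields used here and in ngx_mypo_add_listening, modeled on ngx_http_listen_opt_t, might be:

typedef struct {
    union {
        struct sockaddr        sockaddr;
        struct sockaddr_in     sockaddr_in;
#if (NGX_HAVE_INET6)
        struct sockaddr_in6    sockaddr_in6;
#endif
    } u;

    socklen_t                  socklen;

    unsigned                   wildcard:1;
#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
    unsigned                   ipv6only:1;
#endif
    unsigned                   so_keepalive:2;

    int                        backlog;
    int                        rcvbuf;
    int                        sndbuf;
#if (NGX_HAVE_KEEPALIVE_TUNABLE)
    int                        tcp_keepidle;
    int                        tcp_keepintvl;
    int                        tcp_keepcnt;
#endif

    u_char                     addr[NGX_SOCKADDR_STRLEN + 1];
} ngx_mypo_listen_opt_t;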


The definition of ngx_mypo_core_upstream:

static char *
ngx_mypo_core_upstream(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_mypo_core_srv_conf_t *cscf = conf;

ngx_str_t              *value /*, size*/;
ngx_url_t               u;

value = cf->args->elts;

ngx_memzero(&u, sizeof(ngx_url_t));

u.url = value[1];
u.listen = 1;
u.default_port = 80;

if (ngx_parse_url(cf->pool, &u) != NGX_OK) {
if (u.err) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"%s in \"%V\" of the \"upstream\" directive",
u.err, &u.url);
}

return NGX_CONF_ERROR;
}

ngx_memzero(&(cscf->upstream_addr), sizeof(cscf->upstream_addr));
//Save the parsed upstream server address into the global configuration
ngx_memcpy(&(cscf->upstream_addr), u.sockaddr, u.socklen);

return NGX_CONF_OK;
}


Step 3: Initialize the service, i.e. bind and listen on the port

Starting from the function ngx_mypo_optimize_servers:

static ngx_int_t
ngx_mypo_optimize_servers(ngx_conf_t *cf, ngx_mypo_core_main_conf_t *cmcf,
ngx_array_t *ports)
{
ngx_uint_t             p, a;
ngx_mypo_conf_port_t  *port;
ngx_mypo_conf_addr_t  *addr;

if (ports == NULL) {
return NGX_OK;
}

//The structure here is taken almost wholesale from the http module, with the parts where
//the http and mypo logic differ removed.
port = ports->elts;
for (p = 0; p < ports->nelts; p++) {

ngx_sort(port[p].addrs.elts, (size_t) port[p].addrs.nelts,
sizeof(ngx_mypo_conf_addr_t), ngx_mypo_cmp_conf_addrs);

/*
* check whether all name-based servers have the same
* configuration as a default server for given address:port
*/

addr = port[p].addrs.elts;
for (a = 0; a < port[p].addrs.nelts; a++) {

if (addr[a].servers.nelts > 1) {
if (ngx_mypo_server_names(cf, cmcf, &addr[a]) != NGX_OK) {
return NGX_ERROR;
}
}
}

if (ngx_mypo_init_listening(cf, &port[p]) != NGX_OK) {
return NGX_ERROR;
}
}

return NGX_OK;
}
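
ngx_mypo_init_listening is not listed in full; the real version also has to deal with wildcard addresses and build the per-address table later read by ngx_mypo_init_connection. A heavily simplified sketch that only shows the shape of the call flow (the ngx_mypo_port_t field names are assumptions) is:

static ngx_int_t
ngx_mypo_init_listening(ngx_conf_t *cf, ngx_mypo_conf_port_t *port)
{
    ngx_uint_t             i;
    ngx_listening_t       *ls;
    ngx_mypo_port_t       *mport;
    ngx_mypo_conf_addr_t  *addr;

    addr = port->addrs.elts;

    for (i = 0; i < port->addrs.nelts; i++) {

        //create the ngx_listening_t and hook ngx_mypo_init_connection to it
        ls = ngx_mypo_add_listening(cf, &addr[i]);
        if (ls == NULL) {
            return NGX_ERROR;
        }

        //the per-port data later reached through c->listening->servers
        mport = ngx_palloc(cf->pool, sizeof(ngx_mypo_port_t));
        if (mport == NULL) {
            return NGX_ERROR;
        }

        ls->servers = mport;
        mport->naddrs = 1;

        //building mport->addrs (the ngx_mypo_in_addr_t table mapping each
        //local address to its ngx_mypo_addr_conf_t) is omitted in this sketch
    }

    return NGX_OK;
}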


The function ngx_mypo_init_listening calls ngx_mypo_add_listening, which finally calls the nginx API to create the non-blocking listening socket:

static ngx_listening_t *
ngx_mypo_add_listening(ngx_conf_t *cf, ngx_mypo_conf_addr_t *addr)
{

ngx_listening_t           *ls;
ngx_mypo_core_loc_conf_t  *clcf;
ngx_mypo_core_srv_conf_t  *cscf;

ls = ngx_create_listening(cf, &addr->opt.u.sockaddr, addr->opt.socklen);
if (ls == NULL) {
return NULL;
}

ls->addr_ntop = 1;
//The basic steps when writing a server are bind -> listen -> accept. bind and listen run
//straight through without blocking, but when accept gets called and returns a socket to read
//and write depends on when a client connects. That step is handed over to nginx's low-level
//event machinery, which is where nginx's efficiency comes from; all the upper layer has to do
//is pass its socket handler down to nginx by assigning the handler member of the structure
//returned by ngx_create_listening:
ls->handler = ngx_mypo_init_connection;

cscf = addr->default_server;

cscf->connection_pool_size = 256;
cscf->client_header_timeout = 60000;

ls->pool_size = cscf->connection_pool_size;
ls->post_accept_timeout = cscf->client_header_timeout;

clcf = cscf->ctx->loc_conf[ngx_mypo_core_module.ctx_index];

//error_log must not be NULL here, otherwise nginx will crash during its low-level initialization
if (clcf->error_log == NULL) {
clcf->error_log = &cf->cycle->new_log;
}

ls->logp = clcf->error_log;
ls->log.data = &ls->addr_text;
//ls->log.handler = ngx_accept_log_error;
ls->log.handler = NULL;

ls->backlog = addr->opt.backlog;
ls->rcvbuf = addr->opt.rcvbuf;
ls->sndbuf = addr->opt.sndbuf;
ls->keepalive = addr->opt.so_keepalive;

#if (NGX_HAVE_KEEPALIVE_TUNABLE)
ls->keepidle = addr->opt.tcp_keepidle;
ls->keepintvl = addr->opt.tcp_keepintvl;
ls->keepcnt = addr->opt.tcp_keepcnt;
#endif

return ls;
}


Step 4: Set up the business logic for each connection

The handler member of the structure returned by ngx_create_listening is assigned ngx_mypo_init_connection:

void
ngx_mypo_init_connection(ngx_connection_t *c)
{
ngx_uint_t              i;
ngx_event_t            *rev;
struct sockaddr_in     *sin;
ngx_mypo_port_t        *port;
ngx_mypo_in_addr_t     *addr;
ngx_mypo_log_ctx_t     *ctx;
#if (NGX_HAVE_INET6)
struct sockaddr_in6    *sin6;
ngx_mypo_in6_addr_t    *addr6;
#endif

port = c->listening->servers;

if (port->naddrs > 1) {

/*
* there are several addresses on this port and one of them
* is an "*:port" wildcard so getsockname() in ngx_mypo_server_addr()
* is required to determine a server address
*/

if (ngx_connection_local_sockaddr(c, NULL, 0) != NGX_OK) {
ngx_mypo_close_connection(c);
return;
}

switch (c->local_sockaddr->sa_family) {

#if (NGX_HAVE_INET6)
case AF_INET6:
sin6 = (struct sockaddr_in6 *) c->local_sockaddr;

addr6 = port->addrs;

/* the last address is "*" */

for (i = 0; i < port->naddrs - 1; i++) {
if (ngx_memcmp(&addr6[i].addr6, &sin6->sin6_addr, 16) == 0) {
break;
}
}

break;
#endif

default: /* AF_INET */
sin = (struct sockaddr_in *) c->local_sockaddr;

addr = port->addrs;

/* the last address is "*" */
for (i = 0; i < port->naddrs - 1; i++) {
if (addr[i].addr == sin->sin_addr.s_addr) {
break;
}
}

// attach the core module's address configuration to the connection
c->data = &addr[i].conf;

break;
}

} else {

switch (c->local_sockaddr->sa_family) {

#if (NGX_HAVE_INET6)
case AF_INET6:
addr6 = port->addrs;
//hc->addr_conf = &addr6[0].conf;
break;
#endif

default: /* AF_INET */
addr = port->addrs;
//hc->addr_conf = &addr[0].conf;

c->data = &addr[0].conf;
break;
}
}

/* the default server configuration for the address:port */
//hc->conf_ctx = hc->addr_conf->default_server->ctx;

ctx = ngx_palloc(c->pool, sizeof(ngx_mypo_log_ctx_t));
if (ctx == NULL) {
ngx_mypo_close_connection(c);
return;
}

ctx->connection = c;
ctx->request = NULL;
ctx->current_request = NULL;

c->log->connection = c->number;
c->log->handler = NULL/*ngx_mypo_log_error*/;
c->log->data = ctx;
c->log->action = "waiting for request";

c->log_error = NGX_ERROR_INFO;

rev = c->read;
//After a client connection is accepted comes reading and writing data, another step that would
//normally block. nginx's low layer turns it into non-blocking I/O, so there is almost nothing to
//handle ourselves; again, just pass the read/write handlers down to nginx and let it call them:
rev->handler = ngx_mypo_wait_request_handler;
c->write->handler = ngx_mypo_empty_handler;

if (rev->ready) {
/* the deferred accept(), rtsig, aio, iocp */

if (ngx_use_accept_mutex) {
ngx_post_event(rev, &ngx_posted_events);
return;
}
rev->handler(rev);
return;
}

ngx_add_timer(rev, c->listening->post_accept_timeout);
ngx_reusable_connection(c, 1);

if (ngx_handle_read_event(rev, 0) != NGX_OK) {
ngx_mypo_close_connection(c);
return;
}
}
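
ngx_mypo_empty_handler and ngx_mypo_close_connection are used throughout the module but not listed above; minimal sketches modeled on their ngx_http_* counterparts would be:

static void
ngx_mypo_empty_handler(ngx_event_t *wev)
{
    //deliberately ignore write readiness until there is data to send
}

static void
ngx_mypo_close_connection(ngx_connection_t *c)
{
    ngx_pool_t  *pool;

    c->destroyed = 1;

    pool = c->pool;
    ngx_close_connection(c);
    ngx_destroy_pool(pool);
}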


Implementation of the data-receiving handlers:

rev->handler = ngx_mypo_wait_request_handler;
c->write->handler = ngx_mypo_empty_handler;


ngx_mypo_wait_request_handler:

static void
ngx_mypo_wait_request_handler(ngx_event_t *rev)
{
//u_char                    *p;
size_t                     size;
ssize_t                    n;
ngx_buf_t                 *b;
ngx_connection_t          *c;

//A network-data message arrives from the lower layer. Nginx is event driven, so everything is
//wrapped in an ngx_event_t; to read the data, first get hold of the connection:
c = rev->data;

if (rev->timedout) {
ngx_mypo_close_connection(c);
return;
}

if (c->close) {
ngx_mypo_close_connection(c);
return;
}

//The receive size should really be determined by parsing the protocol; it is fixed at 12 here purely for convenience.
size = 12;

b = c->buffer;

if (b == NULL) {
b = ngx_create_temp_buf(c->pool, size);
if (b == NULL) {
ngx_mypo_close_connection(c);
return;
}

c->buffer = b;

} else if (b->start == NULL) {

b->start = ngx_palloc(c->pool, size);
if (b->start == NULL) {
ngx_mypo_close_connection(c);
return;
}

b->pos = b->start;
b->last = b->start;
b->end = b->last + size;
}

//Start reading the client's request data here
n = c->recv(c, b->last, size);

if (n == NGX_AGAIN) {

if (!rev->timer_set) {
ngx_add_timer(rev, c->listening->post_accept_timeout);
ngx_reusable_connection(c, 1);
}

if (ngx_handle_read_event(rev, 0) != NGX_OK) {
ngx_mypo_close_connection(c);
return;
}

/*
* We are trying to not hold c->buffer's memory for an idle connection.
*/

if (ngx_pfree(c->pool, b->start) == NGX_OK) {
b->start = NULL;
}

return;
}

if (n == NGX_ERROR) {
ngx_mypo_close_connection(c);
return;
}

if (n == 0) {
ngx_log_error(NGX_LOG_INFO, c->log, 0,
"client closed connection");
ngx_mypo_close_connection(c);
return;
}

b->last += n;

c->log->action = "reading client request line";

ngx_reusable_connection(c, 0);

//create the upstream and send data to the upstream server
//Save the client's data, create the connection to the upstream server, and forward the data to it.
ngx_mypo_upstream_init(rev);

//send data to client
/*
//If the server does not proxy-forward but instead replies to the client directly, that can be done here.
ngx_chain_t out[1];

b = ngx_pcalloc(c->pool, sizeof(ngx_buf_t));
out[0].buf = b;
out[0].next = NULL;

b->pos = (u_char*)"hello_world, from Nginx";
b->last = b->pos + sizeof("hello_world, from Nginx") - 1;
b->memory = 1;
ngx_mypo_send(rev, out, 0);
*/

}
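
The fixed size = 12 above stands in for real protocol parsing. Purely as an illustration, with a hypothetical protocol that prefixes every message with a 4-byte big-endian length, the read size could be derived from the already-buffered header like this:

//Illustration only: a hypothetical 4-byte big-endian length prefix.
static size_t
ngx_mypo_body_size(ngx_buf_t *b)
{
    u_char  *p;

    if (b == NULL || (size_t) (b->last - b->pos) < 4) {
        return 4;                    /* still waiting for the header */
    }

    p = b->pos;

    //the body length follows the 4-byte header
    return ((size_t) p[0] << 24) | ((size_t) p[1] << 16)
           | ((size_t) p[2] << 8) | (size_t) p[3];
}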


ngx_mypo_upstream_init:

static int ngx_mypo_upstream_init(ngx_event_t *rev)
{
size_t                     size;
ngx_buf_t                 *b;
ngx_connection_t          *c;
ngx_int_t                   rc;
ngx_mypo_addr_conf_t      *addr_conf;
ngx_mypo_core_srv_conf_t  *core_conf;

c = rev->data;

addr_conf = c->data;
core_conf = addr_conf->default_server;

if (rev->timedout) {
ngx_mypo_close_connection(c);
return -1;
}

if (c->close) {
ngx_mypo_close_connection(c);
return -1;
}

//size = cscf->client_header_buffer_size;
size = 16;

b = c->buffer;

ngx_peer_connection_t *peer = ngx_palloc(c->pool, sizeof(ngx_peer_connection_t));
if (NULL == peer){
ngx_mypo_close_connection(c);
return -1;
}

//Fill in the upstream server's IP and port, then connect to it.
peer->sockaddr = &(core_conf->upstream_addr);
peer->socklen = sizeof(core_conf->upstream_addr);
peer->name = NULL/*&ahcf->peer->name*/;
peer->get = ngx_event_get_peer;
peer->log = c->log;
peer->log_error = NGX_ERROR_ERR;

rc = ngx_event_connect_peer(peer);

if (rc == NGX_ERROR || rc == NGX_BUSY || rc == NGX_DECLINED) {
if (peer->connection) {
ngx_close_connection(peer->connection);
}

ngx_close_connection(c);
return -1;
}
//When data comes back from the upstream server it has to be relayed to the client, but both
//the upstream connection and the client connection are handed to us by the lower layer. One
//could keep a custom global variable to store and manage these connections, but that quickly
//becomes tedious to maintain. Instead we use the data field of ngx_connection_t: it is a void*
//reserved for user-defined data, so the required business information can be wrapped in a
//structure, allocated, and assigned to data. Since the logic here is simple, nginx's own
//ngx_event_t is enough.
peer->connection->data = rev;
peer->connection->pool = c->pool;
//Set the read/write event handlers for the upstream connection
peer->connection->read->handler = ngx_mypo_upstream_rec_data;
peer->connection->write->handler = ngx_mypo_upstream_send_data;

return 0;
}


ngx_mypo_upstream_rec_data:

static void ngx_mypo_upstream_rec_data(ngx_event_t *rev)
{
size_t                     size;
ssize_t                    n;
ngx_buf_t                 *b;
ngx_connection_t          *sc, *cc;
ngx_event_t               *cevent;

//First get the upstream (server-side) connection and the client connection:
sc = rev->data;
cevent = sc->data;
cc = cevent->data;

if (rev->timedout) {
ngx_mypo_close_connection(sc);
return ;
}

if (sc->close) {
ngx_mypo_close_connection(sc);
return ;
}

size = 16;

b = sc->buffer;

if (b == NULL) {
b = ngx_create_temp_buf(sc->pool, size);
if (b == NULL) {
ngx_mypo_close_connection(sc);
return;
}

sc->buffer = b;

} else if (b->start == NULL) {

b->start = ngx_palloc(sc->pool, size);
if (b->start == NULL) {
ngx_mypo_close_connection(sc);
return ;
}

b->pos = b->start;
b->last = b->start;
b->end = b->last + size;
}

n = sc->recv(sc, b->last, size);

if (n == NGX_AGAIN) {

if (!rev->timer_set) {
ngx_add_timer(rev, sc->listening->post_accept_timeout);
ngx_reusable_connection(sc, 1);
}

if (ngx_handle_read_event(rev, 0) != NGX_OK) {
ngx_mypo_close_connection(sc);
return ;
}

/*
* We are trying to not hold c->buffer's memory for an idle connection.
*/

if (ngx_pfree(sc->pool, b->start) == NGX_OK) {
b->start = NULL;
}

return;
}

if (n == NGX_ERROR) {
ngx_mypo_close_connection(sc);
return;
}

if (n == 0) {
ngx_mypo_close_connection(sc);
return ;
}

b->last += n;

sc->log->action = "reading client request line";

ngx_reusable_connection(sc, 0);

ngx_chain_t out[1];

//b = ngx_pcalloc(sc->pool, sizeof(ngx_buf_t));
out[0].buf = b;
out[0].next = NULL;

//b->pos = (u_char*)"hello_world, from Nginx";
//b->last = b->pos + sizeof("hello_world, from Nginx") - 1;
//b->memory = 1;
ngx_mypo_send(cevent, out, 0);

ngx_mypo_close_connection(cc);

return;
}


The definition of ngx_mypo_upstream_send_data:

static void ngx_mypo_upstream_send_data(ngx_event_t *rev)
{
ngx_buf_t                 *b;
ngx_connection_t          *sc, *cc;
ngx_event_t               *cevent;

//Once the connect to the upstream server succeeds and data can be sent, this method is called.
sc = rev->data;
cevent = sc->data;
cc = cevent->data;
b = cc->buffer;

//The data received from the client is already stored in its buffer; just read it out and forward it to the upstream server.
ngx_chain_t out[1];

out[0].buf = b;
out[0].next = NULL;

ngx_mypo_send(rev, out, 0);

return;
}


The ngx_mypo_send method:

static void ngx_mypo_send(ngx_event_t *wev, ngx_chain_t *in, off_t limit)
{
ngx_connection_t                   *cc;
ngx_chain_t                        *cl;

cc = wev->data;

if (cc->destroyed) {
return;
}

if (wev->timedout) {
ngx_log_error(NGX_LOG_INFO, cc->log, NGX_ETIMEDOUT,
"netcall: client send timed out");
cc->timedout = 1;
ngx_mypo_close_connection(cc);
return;
}

if (wev->timer_set) {
ngx_del_timer(wev);
}

cl = cc->send_chain(cc, in, limit);

if (cl == NGX_CHAIN_ERROR) {
ngx_mypo_close_connection(cc);
return;
}

/* more data to send? */
if (cl) {
ngx_add_timer(wev, 10000);
if (ngx_handle_write_event(wev, 0) != NGX_OK) {
ngx_mypo_close_connection(cc);
}
return;
}

/* we've sent everything we had.
* now receive reply */
ngx_del_event(wev, NGX_WRITE_EVENT, 0);

}


Step 5: Compile the custom module into nginx

There are two ways to do this:

The first is to modify the Makefile and ngx_modules.c files under the objs directory, using the http module entries as a template and changing them to mypo.

ngx_modules.c:

//declare the external module variables
extern ngx_module_t ngx_mypo_module;
extern ngx_module_t ngx_mypo_core_module;

ngx_module_t *ngx_modules[] = {
    ......

    &ngx_mypo_module,
    &ngx_mypo_core_module,

    NULL
};


Makefile:

#"..." stands for the Makefile's original settings. Since the mypo module is placed under src
#in the nginx directory, the path is "./src/mypo/"; for a module located elsewhere, simply
#replace "./src/mypo/" with that module's absolute path.
CFLAGS =  ... -I./src/mypo/

ADDON_DEPS = ... ./src/mypo/ngx_mypo_core_module.h  ./src/mypo/ngx_mypo_config.h ./src/mypo/ngx_mypo.h

objs/nginx: ... \
objs/addon/mypo/ngx_mypo.o \
objs/addon/mypo/ngx_mypo_core_module.o \
...

$(LINK) -o ... \
objs/addon/mypo/ngx_mypo.o \
objs/addon/mypo/ngx_mypo_core_module.o \
...

objs/addon/mypo/ngx_mypo.o: $(ADDON_DEPS) \
./src/mypo/ngx_mypo.c
$(CC) -c $(CFLAGS)  $(ALL_INCS) \
-o objs/addon/mypo/ngx_mypo.o \
./src/mypo/ngx_mypo.c

objs/addon/mypo/ngx_mypo_core_module.o: $(ADDON_DEPS) \
./src/mypo/ngx_mypo_core_module.c
$(CC) -c $(CFLAGS)  $(ALL_INCS) \
-o objs/addon/mypo/ngx_mypo_core_module.o \
./src/mypo/ngx_mypo_core_module.c


The second way is to add a config file to the mypo directory and, when running configure before building nginx, add the mypo module directory via the --add-module option (e.g. ./configure --add-module=/path/to/mypo). This approach is simpler, but if your previous configure arguments were not saved, the previous build configuration will be wiped out, so don't rerun configure lightly.

The contents of the config file:

ngx_addon_name="ngx_mypo_module"

CORE_MODULES="$CORE_MODULES
ngx_mypo_module                             \
ngx_mypo_core_module                        \
"

NGX_ADDON_DEPS="$NGX_ADDON_DEPS                             \
$ngx_addon_dir/ngx_mypo_core_module.h       \
$ngx_addon_dir/ngx_mypo_config.h            \
$ngx_addon_dir/ngx_mypo.h                   \
"

NGX_ADDON_SRCS="$NGX_ADDON_SRCS                             \
$ngx_addon_dir/ngx_mypo.c                   \
$ngx_addon_dir/ngx_mypo_core_module.c       \
"
CFLAGS="$CFLAGS -I$ngx_addon_dir"


Finally, a summary of the nginx APIs that were used:

void *ngx_pcalloc(ngx_pool_t *pool, size_t size);
char *ngx_conf_parse(ngx_conf_t *cf, ngx_str_t *filename);
void *ngx_array_push(ngx_array_t *a);

size_t
ngx_sock_ntop(struct sockaddr *sa, socklen_t socklen, u_char *text, size_t len,
ngx_uint_t port);

#define ngx_memzero(buf, n)       (void) memset(buf, 0, n)

ngx_array_t *
ngx_array_create(ngx_pool_t *p, ngx_uint_t n, size_t size);

static ngx_inline ngx_int_t
ngx_array_init(ngx_array_t *array, ngx_pool_t *pool, ngx_uint_t n, size_t size);

ngx_int_t
ngx_parse_url(ngx_pool_t *pool, ngx_url_t *u);

ngx_listening_t *
ngx_create_listening(ngx_conf_t *cf, void *sockaddr, socklen_t socklen);

ngx_int_t
ngx_connection_local_sockaddr(ngx_connection_t *c, ngx_str_t *s,
ngx_uint_t port);

void
ngx_close_connection(ngx_connection_t *c);

ngx_buf_t *
ngx_create_temp_buf(ngx_pool_t *pool, size_t size);

void
ngx_reusable_connection(ngx_connection_t *c, ngx_uint_t reusable);

ngx_int_t
ngx_handle_read_event(ngx_event_t *rev, ngx_uint_t flags);

ngx_int_t
ngx_event_connect_peer(ngx_peer_connection_t *pc);


Once you have read enough code, you come to feel that even systems as large as the Linux kernel or an RTOS have only a few core pieces, and everything else is copied from those core templates. There is no need to invent anything yourself; just follow the templates and go with the rhythm of the code. It's like driving: past a certain level you no longer consciously think about accelerating or shifting gears, the car just goes; at a higher level still, you start modifying the car so it becomes an extension of your hands and feet.

While I'm at it, a few thoughts from these years: in network development, when reading server-side code, whether it is nginx, Apache, or any other system, as long as it is a network server it will never escape the following steps:

1. Create a socket;

2. Bind (bind) the address and port;

3. Listen (listen) on the port (skipped for UDP);

4. Accept (accept) client connections (also skipped for UDP);

5. Read and write data and handle the business logic. Many kinds of functions can be called here: read/write, recv/send, recvfrom/sendto, and so on.

These steps are the trunk of every network-server codebase; everything else is branches and twigs. If you lose your way in the branches while reading code, go back to the trunk to find a way in. A minimal sketch of that trunk is given below.
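
For reference, a minimal blocking sketch of those five steps in plain C (error handling mostly omitted):

#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void)
{
    char                buf[512];
    int                 lfd, cfd;
    ssize_t             n;
    struct sockaddr_in  addr;

    lfd = socket(AF_INET, SOCK_STREAM, 0);               /* 1. create the socket */

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(1025);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);

    bind(lfd, (struct sockaddr *) &addr, sizeof(addr));  /* 2. bind              */
    listen(lfd, 128);                                     /* 3. listen            */

    for ( ;; ) {
        cfd = accept(lfd, NULL, NULL);                    /* 4. accept            */
        if (cfd < 0) {
            break;
        }

        n = read(cfd, buf, sizeof(buf));                  /* 5. read/write and    */
        if (n > 0) {                                      /*    handle the data   */
            (void) write(cfd, buf, n);
        }

        close(cfd);
    }

    close(lfd);
    return 0;
}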