Nginx服务模块开发
纵观网上各种关于nginx的模块开发的资料都是基于HTTP的模块开发,这里就整理一篇是基于TCP协议的服务模块开发,并且给出一套自定义服务模块的源码,该模块主要实现的功能是监听某一端口,然后把接收到的客户端数据转发的上流服务器,再把上流服务器的返回数据回传给客户端。模块源码下载地址:https://code.csdn.net/gamer727/nginx_mypo_module
Nginx的服务模块一般都是以四个字母命名的,这里就命名为mypo模块(mypo取“my port”前四个字母)。本模块开发的nginx的版本为1.6.2
根据个人的研究整理,nginx的开发步骤大致归纳以下五步:
第一步,在配置文件中增加自定义配置;
第二步,根据配置文件的内容初始化各种参数;
第三步,开始初始化服务,也就是开始绑定端口,监听端口;
第四步,给各个连接(connection)设置业务处理,这里的业务处理的细节步骤就取决于需求的复杂度了;
第五步,把自定义模块编译进nginx。
第一步,在配置文件nginx.conf中增加自定义配置
mypo {server {listen 1025;upstream 127.0.0.1:1027;}}
配置文件参考
http的,配置文件一般分成三个等级:main、server、location,由于mypo只实现基本的转发功能,所以这里就两个配置,一个listen监听本地端口的,和一个upstream配置上流服务器的。
第二步,根据配置文件的内容初始化各种参数
这里开始进入源码,首先参考http模块新建mypo模块的核心文件:
ngx_mypo_config.h,定义mypo模块的主要结构体和模块标识;
ngx_mypo.c、ngx_mypo.h,mypo模块的入口,调用各种子模块;
ngx_mypo_core_module.c、ngx_mypo_core_module.h,子模块的核心,读取server级别的配置。
首先讲解一下全局配置头文件ngx_mypo_config.h的部分内容:
typedef struct {void **main_conf;void **srv_conf;void **loc_conf;
} ngx_mypo_conf_ctx_t;typedef struct {ngx_int_t (*preconfiguration)(ngx_conf_t *cf);ngx_int_t (*postconfiguration)(ngx_conf_t *cf);void *(*create_main_conf)(ngx_conf_t *cf);char *(*init_main_conf)(ngx_conf_t *cf, void *conf);void *(*create_srv_conf)(ngx_conf_t *cf);char *(*merge_srv_conf)(ngx_conf_t *cf, void *prev, void *conf);void *(*create_loc_conf)(ngx_conf_t *cf);char *(*merge_loc_conf)(ngx_conf_t *cf, void *prev, void *conf);
} ngx_mypo_module_t;//定义mypo模块的标识,为了避免数值冲突和统一风格,数值是模块名的ASCII码
#define NGX_MYPO_MODULE 0x4D59504F /* "MYPO" */
ngx_mypo.c:
//读取完配置文件后,如果匹配到了mypo{}就会调用这个结构体里赋值的
//ngx_mypo_block函数,可以说ngx_mypo_block是mypo的整体框架。
static ngx_command_t ngx_mypo_commands[] = {{ ngx_string("mypo"),NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,ngx_mypo_block,0,0,NULL },ngx_null_command
};static ngx_core_module_t ngx_mypo_module_ctx = {ngx_string("mypo"),NULL,NULL
};//服务模块入口的结构体,所有的服务模块入口结构体的ngx_module_t.type
//都是设置为NGX_CORE_MODULE的,子模块就是自定义的,如NGX_MYPO_MODULE。
ngx_module_t ngx_mypo_module = {NGX_MODULE_V1,&ngx_mypo_module_ctx, /* module context */ngx_mypo_commands, /* module directives */NGX_CORE_MODULE, /* module type */NULL, /* init master */NULL, /* init module */NULL, /* init process */NULL, /* init thread */NULL, /* exit thread */NULL, /* exit process */NULL, /* exit master */NGX_MODULE_V1_PADDING
};static char *
ngx_mypo_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{char *rv;ngx_uint_t mi, m, s;ngx_conf_t pcf;ngx_mypo_module_t *module;ngx_mypo_conf_ctx_t *ctx;ngx_mypo_core_loc_conf_t *clcf;ngx_mypo_core_srv_conf_t **cscfp;ngx_mypo_core_main_conf_t *cmcf;/* the main mypo context */ctx = ngx_pcalloc(cf->pool, sizeof(ngx_mypo_conf_ctx_t));if (ctx == NULL) {return NGX_CONF_ERROR;}*(ngx_mypo_conf_ctx_t **) conf = ctx;/* count the number of the mypo modules and set up their indices */ngx_mypo_max_module = 0;for (m = 0; ngx_modules[m]; m++) {if (ngx_modules[m]->type != NGX_MYPO_MODULE) {continue;}ngx_modules[m]->ctx_index = ngx_mypo_max_module++;}/* the mypo main_conf context, it is the same in the all mypo contexts */ctx->main_conf = ngx_pcalloc(cf->pool,sizeof(void *) * ngx_mypo_max_module);if (ctx->main_conf == NULL) {return NGX_CONF_ERROR;}/** the mypo null srv_conf context, it is used to merge* the server{}s' srv_conf's*/ctx->srv_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_mypo_max_module);if (ctx->srv_conf == NULL) {return NGX_CONF_ERROR;}/** the mypo null loc_conf context, it is used to merge* the server{}s' loc_conf's*/ctx->loc_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_mypo_max_module);if (ctx->loc_conf == NULL) {return NGX_CONF_ERROR;}/** create the main_conf's, the null srv_conf's, and the null loc_conf's* of the all mypo modules*/for (m = 0; ngx_modules[m]; m++) {if (ngx_modules[m]->type != NGX_MYPO_MODULE) {continue;}module = ngx_modules[m]->ctx;mi = ngx_modules[m]->ctx_index;if (module->create_main_conf) {ctx->main_conf[mi] = module->create_main_conf(cf);if (ctx->main_conf[mi] == NULL) {return NGX_CONF_ERROR;}}if (module->create_srv_conf) {ctx->srv_conf[mi] = module->create_srv_conf(cf);if (ctx->srv_conf[mi] == NULL) {return NGX_CONF_ERROR;}}if (module->create_loc_conf) {ctx->loc_conf[mi] = module->create_loc_conf(cf);if (ctx->loc_conf[mi] == NULL) {return NGX_CONF_ERROR;}}}pcf = *cf;cf->ctx = ctx;for (m = 0; ngx_modules[m]; m++) {if (ngx_modules[m]->type != NGX_MYPO_MODULE) {continue;}module = ngx_modules[m]->ctx;if (module->preconfiguration) {if (module->preconfiguration(cf) != NGX_OK) {return NGX_CONF_ERROR;}}}/* parse inside the mypo{} block */cf->module_type = NGX_MYPO_MODULE;cf->cmd_type = NGX_MYPO_MAIN_CONF;rv = ngx_conf_parse(cf, NULL);if (rv != NGX_CONF_OK) {goto failed;}/** init mypo{} main_conf's, merge the server{}s' srv_conf's* and its location{}s' loc_conf's*/cmcf = ctx->main_conf[ngx_mypo_core_module.ctx_index];cscfp = cmcf->servers.elts;for (m = 0; ngx_modules[m]; m++) {if (ngx_modules[m]->type != NGX_MYPO_MODULE) {continue;}module = ngx_modules[m]->ctx;mi = ngx_modules[m]->ctx_index;/* init mypo{} main_conf's */if (module->init_main_conf) {rv = module->init_main_conf(cf, ctx->main_conf[mi]);if (rv != NGX_CONF_OK) {goto failed;}}rv = ngx_mypo_merge_servers(cf, cmcf, module, mi);if (rv != NGX_CONF_OK) {goto failed;}}/* create location trees */for (s = 0; s < cmcf->servers.nelts; s++) {clcf = cscfp[s]->ctx->loc_conf[ngx_mypo_core_module.ctx_index];if (ngx_mypo_init_locations(cf, cscfp[s], clcf) != NGX_OK) {return NGX_CONF_ERROR;}if (ngx_mypo_init_static_location_trees(cf, clcf) != NGX_OK) {return NGX_CONF_ERROR;}}for (m = 0; ngx_modules[m]; m++) {if (ngx_modules[m]->type != NGX_MYPO_MODULE) {continue;}module = ngx_modules[m]->ctx;if (module->postconfiguration) {if (module->postconfiguration(cf) != NGX_OK) {return NGX_CONF_ERROR;}}}*cf = pcf;/* optimize the lists of ports, addresses and server names */if (ngx_mypo_optimize_servers(cf, cmcf, cmcf->ports) != NGX_OK) {return NGX_CONF_ERROR;}return NGX_CONF_OK;failed:*cf = pcf;return rv;
}
ngx_mypo_core_module.c:
//server级别的配置
static ngx_command_t ngx_mypo_core_commands[] = {{ ngx_string("server"),NGX_MYPO_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,ngx_mypo_core_server,0,0,NULL },
//代理监听的IP端口,调用ngx_mypo_core_listen函数解析listen行的数据,
//NGX_CONF_1MORE标识标识可以带多个参数{ ngx_string("listen"),NGX_MYPO_SRV_CONF|NGX_CONF_1MORE,ngx_mypo_core_listen,NGX_MYPO_SRV_CONF_OFFSET,0,NULL },
//upstream配置的是上流服务器的IP端口,调用ngx_mypo_core_upstream函数解析数据。{ ngx_string("upstream"),NGX_MYPO_SRV_CONF|NGX_CONF_1MORE,ngx_mypo_core_upstream,NGX_MYPO_SRV_CONF_OFFSET,0,NULL },ngx_null_command};//子模块的一下初始化,合并配置的函数声明
static ngx_mypo_module_t ngx_mypo_core_module_ctx = {ngx_mypo_core_preconfiguration, /* preconfiguration */NULL, /* postconfiguration */ngx_mypo_core_create_main_conf, /* create main configuration */ngx_mypo_core_init_main_conf, /* init main configuration */ngx_mypo_core_create_srv_conf, /* create server configuration */ngx_mypo_core_merge_srv_conf, /* merge server configuration */ngx_mypo_core_create_loc_conf, /* create location configuration */ngx_mypo_core_merge_loc_conf /* merge location configuration */
};//mypo子模块核心模块的入口结构体
ngx_module_t ngx_mypo_core_module = {NGX_MODULE_V1,&ngx_mypo_core_module_ctx, /* module context */ngx_mypo_core_commands, /* module directives */NGX_MYPO_MODULE, /* module type */NULL, /* init master */NULL, /* init module */NULL, /* init process */NULL, /* init thread */NULL, /* exit thread */NULL, /* exit process */NULL, /* exit master */NGX_MODULE_V1_PADDING
};
static char *
ngx_mypo_core_server(ngx_conf_t *cf, ngx_command_t *cmd, void *dummy)
{char *rv;void *mconf;ngx_uint_t i;ngx_conf_t pcf;ngx_mypo_module_t *module;struct sockaddr_in *sin;ngx_mypo_conf_ctx_t *ctx, *mypo_ctx;ngx_mypo_listen_opt_t lsopt;ngx_mypo_core_srv_conf_t *cscf, **cscfp;ngx_mypo_core_main_conf_t *cmcf;ctx = ngx_pcalloc(cf->pool, sizeof(ngx_mypo_conf_ctx_t));if (ctx == NULL) {return NGX_CONF_ERROR;}mypo_ctx = cf->ctx;ctx->main_conf = mypo_ctx->main_conf;/* the server{}'s srv_conf */ctx->srv_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_mypo_max_module);if (ctx->srv_conf == NULL) {return NGX_CONF_ERROR;}/* the server{}'s loc_conf */ctx->loc_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_mypo_max_module);if (ctx->loc_conf == NULL) {return NGX_CONF_ERROR;}for (i = 0; ngx_modules[i]; i++) {if (ngx_modules[i]->type != NGX_MYPO_MODULE) {continue;}module = ngx_modules[i]->ctx;if (module->create_srv_conf) {mconf = module->create_srv_conf(cf);if (mconf == NULL) {return NGX_CONF_ERROR;}ctx->srv_conf[ngx_modules[i]->ctx_index] = mconf;}if (module->create_loc_conf) {mconf = module->create_loc_conf(cf);if (mconf == NULL) {return NGX_CONF_ERROR;}ctx->loc_conf[ngx_modules[i]->ctx_index] = mconf;}}/* the server configuration context */cscf = ctx->srv_conf[ngx_mypo_core_module.ctx_index];cscf->ctx = ctx;cmcf = ctx->main_conf[ngx_mypo_core_module.ctx_index];cscfp = ngx_array_push(&cmcf->servers);if (cscfp == NULL) {return NGX_CONF_ERROR;}*cscfp = cscf;/* parse inside server{} */pcf = *cf;cf->ctx = ctx;cf->cmd_type = NGX_MYPO_SRV_CONF;rv = ngx_conf_parse(cf, NULL);*cf = pcf;if (rv == NGX_CONF_OK && !cscf->listen) {ngx_memzero(&lsopt, sizeof(ngx_mypo_listen_opt_t));sin = &lsopt.u.sockaddr_in;sin->sin_family = AF_INET;sin->sin_port = htons((getuid() == 0) ? 80 : 8000);sin->sin_addr.s_addr = INADDR_ANY;lsopt.socklen = sizeof(struct sockaddr_in);lsopt.backlog = NGX_LISTEN_BACKLOG;lsopt.rcvbuf = -1;lsopt.sndbuf = -1;lsopt.wildcard = 1;(void) ngx_sock_ntop(&lsopt.u.sockaddr, lsopt.socklen, lsopt.addr,NGX_SOCKADDR_STRLEN, 1);if (ngx_mypo_add_listen(cf, cscf, &lsopt) != NGX_OK) {return NGX_CONF_ERROR;}}return rv;
}
函数ngx_mypo_core_listen定义:
static char *
ngx_mypo_core_listen(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{ngx_mypo_core_srv_conf_t *cscf = conf;ngx_str_t *value /*, size*/;ngx_url_t u;//ngx_uint_t n;ngx_mypo_listen_opt_t lsopt;cscf->listen = 1;value = cf->args->elts;ngx_memzero(&u, sizeof(ngx_url_t));u.url = value[1];u.listen = 1;u.default_port = 80;//解析IP端口主要是调用nginx里的内部函数ngx_parse_url进行解析。if (ngx_parse_url(cf->pool, &u) != NGX_OK) {if (u.err) {ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,"%s in \"%V\" of the \"listen\" directive",u.err, &u.url);}return NGX_CONF_ERROR;}ngx_memzero(&lsopt, sizeof(ngx_mypo_listen_opt_t));ngx_memcpy(&lsopt.u.sockaddr, u.sockaddr, u.socklen);lsopt.socklen = u.socklen;lsopt.backlog = NGX_LISTEN_BACKLOG;lsopt.rcvbuf = -1;lsopt.sndbuf = -1;lsopt.wildcard = u.wildcard;
#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)lsopt.ipv6only = 1;
#endif(void) ngx_sock_ntop(&lsopt.u.sockaddr, lsopt.socklen, lsopt.addr,NGX_SOCKADDR_STRLEN, 1);//保存监听IP端口的列表到conf中,conf是底层调用时传过来的全局自定义变量,//全局配置数据都保存到这里if (ngx_mypo_add_listen(cf, cscf, &lsopt) == NGX_OK) {return NGX_CONF_OK;}return NGX_CONF_ERROR;
}
函数ngx_mypo_core_upstream的定义:
static char *
ngx_mypo_core_upstream(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{ngx_mypo_core_srv_conf_t *cscf = conf;ngx_str_t *value /*, size*/;ngx_url_t u;value = cf->args->elts;ngx_memzero(&u, sizeof(ngx_url_t));u.url = value[1];u.listen = 1;u.default_port = 80;if (ngx_parse_url(cf->pool, &u) != NGX_OK) {if (u.err) {ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,"%s in \"%V\" of the \"upstream\" directive",u.err, &u.url);}return NGX_CONF_ERROR;}ngx_memzero(&(cscf->upstream_addr), sizeof(cscf->upstream_addr));//解析创建出来的上流服务器IP端口保存到全局的配置ngx_memcpy(&(cscf->upstream_addr), u.sockaddr, u.socklen);return NGX_CONF_OK;
}
第三步,开始初始化服务,也就是开始绑定端口,监听端口
从函数ngx_mypo_optimize_servers开始:
static ngx_int_t
ngx_mypo_optimize_servers(ngx_conf_t *cf, ngx_mypo_core_main_conf_t *cmcf,ngx_array_t *ports)
{ngx_uint_t p, a;ngx_mypo_conf_port_t *port;ngx_mypo_conf_addr_t *addr;if (ports == NULL) {return NGX_OK;}//这里的架构基本都是全套用http的,然后删掉一下http业务和mypo业务不同的部分。port = ports->elts;for (p = 0; p < ports->nelts; p++) {ngx_sort(port[p].addrs.elts, (size_t) port[p].addrs.nelts,sizeof(ngx_mypo_conf_addr_t), ngx_mypo_cmp_conf_addrs);/** check whether all name-based servers have the same* configuration as a default server for given address:port*/addr = port[p].addrs.elts;for (a = 0; a < port[p].addrs.nelts; a++) {if (addr[a].servers.nelts > 1){if (ngx_mypo_server_names(cf, cmcf, &addr[a]) != NGX_OK) {return NGX_ERROR;}}}if (ngx_mypo_init_listening(cf, &port[p]) != NGX_OK) {return NGX_ERROR;}}return NGX_OK;
}
函数ngx_mypo_init_listening里调用ngx_mypo_add_listening最终调用nginx的API创建非阻塞sock:
static ngx_listening_t *
ngx_mypo_add_listening(ngx_conf_t *cf, ngx_mypo_conf_addr_t *addr)
{ngx_listening_t *ls;ngx_mypo_core_loc_conf_t *clcf;ngx_mypo_core_srv_conf_t *cscf;ls = ngx_create_listening(cf, &addr->opt.u.sockaddr, addr->opt.socklen);if (ls == NULL) {return NULL;}ls->addr_ntop = 1;//开发服务器时的基本步骤就是bind->listen->accept,bind和listen都是按照流程//走下来不会有阻塞的了,而accept什么时候调用到,取决于客户端何时有连接才会返回socket//进行读写的,这一步是交给nginx底层的event来处理的才能发挥出nginx的高效性,上层要做的//就是把处理socket的方法传给nginx底层,传递步骤就是给ngx_create_listening返回//结构体的handler成员赋值:ls->handler = ngx_mypo_init_connection;cscf = addr->default_server;cscf->connection_pool_size = 256;cscf->client_header_timeout = 60000;ls->pool_size = cscf->connection_pool_size;ls->post_accept_timeout = cscf->client_header_timeout;clcf = cscf->ctx->loc_conf[ngx_mypo_core_module.ctx_index];//这里的error_log是不能为空的,否则在nginx底层调用初始化的时候就会crash掉了if (clcf->error_log == NULL) {clcf->error_log = &cf->cycle->new_log;}ls->logp = clcf->error_log;ls->log.data = &ls->addr_text;//ls->log.handler = ngx_accept_log_error;ls->log.handler = NULL;ls->backlog = addr->opt.backlog;ls->rcvbuf = addr->opt.rcvbuf;ls->sndbuf = addr->opt.sndbuf;ls->keepalive = addr->opt.so_keepalive;#if (NGX_HAVE_KEEPALIVE_TUNABLE)ls->keepidle = addr->opt.tcp_keepidle;ls->keepintvl = addr->opt.tcp_keepintvl;ls->keepcnt = addr->opt.tcp_keepcnt;
#endifreturn ls;
}
第四步,给各个连接(connection)设置业务处理
ngx_create_listening返回的参数里的handler成员赋值ngx_mypo_init_connection:
void
ngx_mypo_init_connection(ngx_connection_t *c)
{ngx_uint_t i;ngx_event_t *rev;struct sockaddr_in *sin;ngx_mypo_port_t *port;ngx_mypo_in_addr_t *addr;ngx_mypo_log_ctx_t *ctx;
#if (NGX_HAVE_INET6)struct sockaddr_in6 *sin6;ngx_mypo_in6_addr_t *addr6;
#endifport = c->listening->servers;if (port->naddrs > 1) {/** there are several addresses on this port and one of them* is an "*:port" wildcard so getsockname() in ngx_mypo_server_addr()* is required to determine a server address*/if (ngx_connection_local_sockaddr(c, NULL, 0) != NGX_OK) {ngx_mypo_close_connection(c);return;}switch (c->local_sockaddr->sa_family) {#if (NGX_HAVE_INET6)case AF_INET6:sin6 = (struct sockaddr_in6 *) c->local_sockaddr;addr6 = port->addrs;/* the last address is "*" */for (i = 0; i < port->naddrs - 1; i++) {if (ngx_memcmp(&addr6[i].addr6, &sin6->sin6_addr, 16) == 0) {break;}}break;
#endifdefault: /* AF_INET */sin = (struct sockaddr_in *) c->local_sockaddr;addr = port->addrs;/* the last address is "*" */for (i = 0; i < port->naddrs - 1; i++) {if (addr[i].addr == sin->sin_addr.s_addr) {break;}}// add core module configure to the connectionc->data = &addr[i].conf;break;}} else {switch (c->local_sockaddr->sa_family) {#if (NGX_HAVE_INET6)case AF_INET6:addr6 = port->addrs;//hc->addr_conf = &addr6[0].conf;break;
#endifdefault: /* AF_INET */addr = port->addrs;//hc->addr_conf = &addr[0].conf;c->data = &addr[0].conf;break;}}/* the default server configuration for the address:port *///hc->conf_ctx = hc->addr_conf->default_server->ctx;ctx = ngx_palloc(c->pool, sizeof(ngx_mypo_log_ctx_t));if (ctx == NULL) {ngx_mypo_close_connection(c);return;}ctx->connection = c;ctx->request = NULL;ctx->current_request = NULL;c->log->connection = c->number;c->log->handler = NULL/*ngx_mypo_log_error*/;c->log->data = ctx;c->log->action = "waiting for request";c->log_error = NGX_ERROR_INFO;rev = c->read;//accept到客户端的连接后就是数据的读写了,这又是一步阻塞的过程,需要让nginx底层//把它变成非阻塞,基本不用自己处理的了,同样只需要把读写数据的处理方法传递给nginx//底层,让nginx底层来调用:rev->handler = ngx_mypo_wait_request_handler;c->write->handler = ngx_mypo_empty_handler;if (rev->ready) {/* the deferred accept(), rtsig, aio, iocp */if (ngx_use_accept_mutex) {ngx_post_event(rev, &ngx_posted_events);return;}rev->handler(rev);return;}ngx_add_timer(rev, c->listening->post_accept_timeout);ngx_reusable_connection(c, 1);if (ngx_handle_read_event(rev, 0) != NGX_OK) {ngx_mypo_close_connection(c);return;}
}
接收数据的处理接口实现:
rev->handler = ngx_mypo_wait_request_handler;c->write->handler = ngx_mypo_empty_handler;
ngx_mypo_wait_request_handler:
static void
ngx_mypo_wait_request_handler(ngx_event_t *rev)
{//u_char *p;size_t size;ssize_t n;ngx_buf_t *b;ngx_connection_t *c;//接收到来自底层的网络数据消息,Nginx是事件驱动的,所以一切信息都封装到ngx_event_t里去。//想要读取数据就先获取到connection:c = rev->data;if (rev->timedout) {ngx_mypo_close_connection(c);return;}if (c->close) {ngx_mypo_close_connection(c);return;}//这里接收数据的大小应该根据协议的规定,解析和读取,这里只是为了方便固定设置成12而已。size = 12;b = c->buffer;if (b == NULL) {b = ngx_create_temp_buf(c->pool, size);if (b == NULL) {ngx_mypo_close_connection(c);return;}c->buffer = b;} else if (b->start == NULL) {b->start = ngx_palloc(c->pool, size);if (b->start == NULL) {ngx_mypo_close_connection(c);return;}b->pos = b->start;b->last = b->start;b->end = b->last + size;}//从这里就开始读取客户端的请求数据了n = c->recv(c, b->last, size);if (n == NGX_AGAIN) {if (!rev->timer_set) {ngx_add_timer(rev, c->listening->post_accept_timeout);ngx_reusable_connection(c, 1);}if (ngx_handle_read_event(rev, 0) != NGX_OK) {ngx_mypo_close_connection(c);return;}/** We are trying to not hold c->buffer's memory for an idle connection.*/if (ngx_pfree(c->pool, b->start) == NGX_OK) {b->start = NULL;}return;}if (n == NGX_ERROR) {ngx_mypo_close_connection(c);return;}if (n == 0) {ngx_log_error(NGX_LOG_INFO, c->log, 0,"client closed connection");ngx_mypo_close_connection(c);return;}b->last += n;c->log->action = "reading client request line";ngx_reusable_connection(c, 0);//create the upstream and send data to the upstream server//保存客户端的数据,然后创建上流服务器的连接,把数据转发给上流服务器。ngx_mypo_upstream_init(rev);//send data to client
/*//如果服务器不做代理转发,而是直接返回数据到客户端,可以在这里进行处理。ngx_chain_t out[1];b = ngx_pcalloc(c->pool, sizeof(ngx_buf_t));out[0].buf = b;out[0].next = NULL;b->pos = (u_char*)"hello_world, from Nginx";b->last = b->pos + sizeof("hello_world, from Nginx") - 1;b->memory = 1;ngx_mypo_send(rev, out, 0);
*/}
ngx_mypo_upstream_init:
static int ngx_mypo_upstream_init(ngx_event_t *rev)
{size_t size;ngx_buf_t *b;ngx_connection_t *c;ngx_int_t rc;ngx_mypo_addr_conf_t *addr_conf;ngx_mypo_core_srv_conf_t *core_conf;c = rev->data;addr_conf = c->data;core_conf = addr_conf->default_server;if (rev->timedout) {ngx_mypo_close_connection(c);return -1;}if (c->close) {ngx_mypo_close_connection(c);return -1;}//size = cscf->client_header_buffer_size;size = 16;b = c->buffer;ngx_peer_connection_t *peer = ngx_palloc(c->pool, sizeof(ngx_peer_connection_t));if (NULL == peer){ngx_mypo_close_connection(c);return -1;}//赋值上流服务器的IP和端口,然后连接上流服务器。peer->sockaddr = &(core_conf->upstream_addr);peer->socklen = sizeof(core_conf->upstream_addr);peer->name = NULL/*&ahcf->peer->name*/;peer->get = ngx_event_get_peer;peer->log = c->log;peer->log_error = NGX_ERROR_ERR;rc = ngx_event_connect_peer(peer);if (rc == NGX_ERROR || rc == NGX_BUSY || rc == NGX_DECLINED) {if (peer->connection) {ngx_close_connection(peer->connection);}ngx_close_connection(c);return -1;}//接收到上流服务器返回的数据,需要把数据回传给客户端,但是不管上流//服务器的连接还是客户端的连接,都是底层传过来的,如果自定义一个//全局变量来保存这些连接并且进行管理,也是可以,但维护起来就会很//繁琐,这里可以使用ngx_connection_t结构体里的data字段,它是//void*类型的,专门用来保存自定义类型的变量,自己可以把所需要的//业务信息封装成一个结构体,然后分配内存赋值给data,由于这里业务//简单,直接使用Nginx本身的ngx_event_t就足够了。peer->connection->data = rev;peer->connection->pool = c->pool;//指定与上流服务器读写数据事件的方法peer->connection->read->handler = ngx_mypo_upstream_rec_data;peer->connection->write->handler = ngx_mypo_upstream_send_data;return 0;
}
ngx_mypo_upstream_rec_data:
static void ngx_mypo_upstream_rec_data(ngx_event_t *rev)
{size_t size;ssize_t n;ngx_buf_t *b;ngx_connection_t *sc, *cc;ngx_event_t *cevent;//首先获取服务器和客户端的connection:sc = rev->data;cevent = sc->data;cc = cevent->data;if (rev->timedout) {ngx_mypo_close_connection(sc);return ;}if (sc->close) {ngx_mypo_close_connection(sc);return ;}size = 16;b = sc->buffer;if (b == NULL) {b = ngx_create_temp_buf(sc->pool, size);if (b == NULL) {ngx_mypo_close_connection(sc);return;}sc->buffer = b;} else if (b->start == NULL) {b->start = ngx_palloc(sc->pool, size);if (b->start == NULL) {ngx_mypo_close_connection(sc);return ;}b->pos = b->start;b->last = b->start;b->end = b->last + size;}n = sc->recv(sc, b->last, size);if (n == NGX_AGAIN) {if (!rev->timer_set) {ngx_add_timer(rev, sc->listening->post_accept_timeout);ngx_reusable_connection(sc, 1);}if (ngx_handle_read_event(rev, 0) != NGX_OK) {ngx_mypo_close_connection(sc);return ;}/** We are trying to not hold c->buffer's memory for an idle connection.*/if (ngx_pfree(sc->pool, b->start) == NGX_OK) {b->start = NULL;}return;}if (n == NGX_ERROR) {ngx_mypo_close_connection(sc);return;}if (n == 0) {ngx_mypo_close_connection(sc);return ;}b->last += n;sc->log->action = "reading client request line";ngx_reusable_connection(sc, 0);ngx_chain_t out[1];//b = ngx_pcalloc(sc->pool, sizeof(ngx_buf_t));out[0].buf = b;out[0].next = NULL;//b->pos = (u_char*)"hello_world, from Nginx";//b->last = b->pos + sizeof("hello_world, from Nginx") - 1;//b->memory = 1;ngx_mypo_send(cevent, out, 0);ngx_mypo_close_connection(cc);return;
}
方法ngx_mypo_upstream_send_data的定义:
static void ngx_mypo_upstream_send_data(ngx_event_t *rev)
{ngx_buf_t *b;ngx_connection_t *sc, *cc;ngx_event_t *cevent;//一旦connect上流服务器成功,可以发送数据的时候就会调用到这个方法。sc = rev->data;cevent = sc->data;cc = cevent->data;b = cc->buffer;//客户端接收到数据以后已经封装到buffer中了,现在只需要把它读出来,转发给上流服务器。ngx_chain_t out[1];out[0].buf = b;out[0].next = NULL;ngx_mypo_send(rev, out, 0);return;
}
方法ngx_mypo_send:
static void ngx_mypo_send(ngx_event_t *wev, ngx_chain_t *in, off_t limit)
{ngx_connection_t *cc;ngx_chain_t *cl;cc = wev->data;if (cc->destroyed) {return;}if (wev->timedout) {ngx_log_error(NGX_LOG_INFO, cc->log, NGX_ETIMEDOUT,"netcall: client send timed out");cc->timedout = 1;ngx_mypo_close_connection(cc);return;}if (wev->timer_set) {ngx_del_timer(wev);}cl = cc->send_chain(cc, in, limit);if (cl == NGX_CHAIN_ERROR) {ngx_mypo_close_connection(cc);return;}/* more data to send? */if (cl) {ngx_add_timer(wev, 10000);if (ngx_handle_write_event(wev, 0) != NGX_OK) {ngx_mypo_close_connection(cc);}return;}/* we've sent everything we had.* now receive reply */ngx_del_event(wev, NGX_WRITE_EVENT, 0);}
第五步,把自定义模块编译进nginx
有两种方法:
第一种修改ojbs目录下的makefile和ngx_modules.c文件,以http模块为模板,修改成mypo。
ngx_modules.c:
//声明外部变量
extern ngx_module_t ngx_mypo_module;
extern ngx_module_t ngx_mypo_core_module;ngx_module_t *ngx_modules[] = {
......&ngx_mypo_module,&ngx_mypo_core_module,NULL
};
makefile:
#"..."是Makefile原来的配置,由于mypo模块是放到nginx目录里的src下,
#所以路径是"./src/mypo/",如果是其他模块,可以把该模块的绝对路径替换"./src/mypo/"即可。
CFLAGS = ... -I./src/mypo/ADDON_DEPS = ... ./src/mypo/ngx_mypo_core_module.h ./src/mypo/ngx_mypo_config.h ./src/mypo/ngx_mypo.h objs/nginx: ... \objs/addon/mypo/ngx_mypo.o \objs/addon/mypo/ngx_mypo_core_module.o \...$(LINK) -o ... \objs/addon/mypo/ngx_mypo.o \objs/addon/mypo/ngx_mypo_core_module.o \...objs/addon/mypo/ngx_mypo.o: $(ADDON_DEPS) \./src/mypo//ngx_mypo.c$(CC) -c $(CFLAGS) $(ALL_INCS) \-o objs/addon/mypo/ngx_mypo.o \./src/mypo/ngx_mypo.cobjs/addon/mypo/ngx_mypo_core_module.o: $(ADDON_DEPS) \./src/mypo//ngx_mypo_core_module.c$(CC) -c $(CFLAGS) $(ALL_INCS) \-o objs/addon/mypo/ngx_mypo_core_module.o \./src/mypo/ngx_mypo_core_module.c
第二种,在mypo目录下添加config文件,编译nginx前configure的时候加入mypo模块的目录到–add-module选项,这种方法简单,但之前configure没有保存的话,之前的编译配置都会被清掉,所以不要轻易地configure。
configure文件的内容:
ngx_addon_name="ngx_mypo_module"CORE_MODULES="$CORE_MODULESngx_mypo_module \ngx_mypo_core_module \"NGX_ADDON_DEPS="$NGX_ADDON_DEPS \$ngx_addon_dir/ngx_mypo_core_module.h \$ngx_addon_dir/ngx_mypo_config.h \$ngx_addon_dir/ngx_mypo.h \"NGX_ADDON_SRCS="$NGX_ADDON_SRCS \$ngx_addon_dir/ngx_mypo.c \$ngx_addon_dir/ngx_mypo_core_module.c \"
CFLAGS="$CFLAGS -I$ngx_addon_dir"
最后归纳一下调用到的nginx API:
void *ngx_pcalloc(ngx_pool_t *pool, size_t size);
char *ngx_conf_parse(ngx_conf_t *cf, ngx_str_t *filename);
void *ngx_array_push(ngx_array_t *a);size_t
ngx_sock_ntop(struct sockaddr *sa, socklen_t socklen, u_char *text, size_t len,ngx_uint_t port);#define ngx_memzero(buf, n) (void) memset(buf, 0, n)ngx_array_t *
ngx_array_create(ngx_pool_t *p, ngx_uint_t n, size_t size);static ngx_inline ngx_int_t
ngx_array_init(ngx_array_t *array, ngx_pool_t *pool, ngx_uint_t n, size_t size);ngx_int_t
ngx_parse_url(ngx_pool_t *pool, ngx_url_t *u);ngx_listening_t *
ngx_create_listening(ngx_conf_t *cf, void *sockaddr, socklen_t socklen);ngx_int_t
ngx_connection_local_sockaddr(ngx_connection_t *c, ngx_str_t *s,ngx_uint_t port);void
ngx_close_connection(ngx_connection_t *c);ngx_buf_t *
ngx_create_temp_buf(ngx_pool_t *pool, size_t size);void
ngx_reusable_connection(ngx_connection_t *c, ngx_uint_t reusable);ngx_int_t
ngx_handle_read_event(ngx_event_t *rev, ngx_uint_t flags);ngx_int_t
ngx_event_connect_peer(ngx_peer_connection_t *pc);
代码看多了,就会觉得,再庞大的系统如Linux内核、rtos之类的,核心的就几个,其他的都是按照那几个核心模板复制出来的。不用自己想的,直接套模板,跟着代码节奏去走。就好像开车那样,开到一定境界,不用特意去想怎么加油、挂当,车自己就会走了,更高的境界就是自己去改装车子了,让车子变成自己的手和脚。
顺便也谈谈这些年心得吧,其实做网络开发,阅读服务端代码,不管是nginx还是Apache,还是其他的abcd系统,只要是网络服务器的,就一定不会逃过以下的几步:
1、创建socket;
2、绑定(bind)地址和端口号;
3、监听(listen)端口;(如果是udp,这步就跳过了)
4、接收(accept)客户端链接;(udp这步也跳过)
5、读写数据,业务处理。这里能够调用的函数种类就很多了,有read/write、rec/send、recfrom/sendto等等;
这几步说是各个网络服务类代码的主干,其他都是旁枝末节,如果看代码的时候在旁枝末节上迷了路,可以回到主干上寻找突破口。
Nginx服务模块开发相关推荐
- 《深入理解Nginx:模块开发与架构解析》一1.2 为什么选择Nginx
1.2 为什么选择Nginx 为什么选择Nginx?因为它具有以下特点: (1)更快 这表现在两个方面:一方面,在正常情况下,单次请求会得到更快的响应:另一方面,在高峰期(如有数以万计的并发请求),N ...
- 《深入理解Nginx:模块开发与架构解析》一1.6 Nginx的命令行控制
1.6 Nginx的命令行控制 在Linux中,需要使用命令行来控制Nginx服务器的启动与停止.重载配置文件.回滚日志文件.平滑升级等行为.默认情况下,Nginx被安装在目录/usr/local/n ...
- nginx的模块开发
nginx刚刚在国内开始流行的时候, 我就把它引入公司技术体系,用来替代apache主要做动静分离. nginx的并发处理能力和稳定性,以及优秀的软件架构深深得吸引了我, 让我跨入了高性能服务器开发的 ...
- 《深入理解Nginx:模块开发与架构解析》一3.3 如何将自己的HTTP模块编译进Nginx...
3.3 如何将自己的HTTP模块编译进Nginx Nginx提供了一种简单的方式将第三方的模块编译到Nginx中.首先把源代码文件全部放到一个目录下,同时在该目录中编写一个文件用于通知Nginx如何编 ...
- 推荐我的新书《深入理解Nginx:模块开发与架构解析》
http://www.china-pub.com/STATIC/zt_mb/zt_huodong_2013_3.asp?filename=2013_jsj_nginx_20130401 目录 < ...
- Nginx模块开发入门
前言 Nginx是当前最流行的HTTP Server之一,根据W3Techs的统计,目前世界排名(根据Alexa)前100万的网站中,Nginx的占有率为6.8%.与Apache相比,Nginx在高并 ...
- 深入理解Nginx 模块开发与架构解析-陶辉 读书笔记
前言 1. nginx是一个优秀的事件驱动框架,nginx非常适合开发在传输层以TCP对外提供服务的服务器程序.基于nginx框架开发程序有5个优势: * nginx将网络.磁盘及定时器等异步事件的驱 ...
- Nginx 模块开发
Nginx 模块概述 Nginx 模块有三种角色: 处理请求并产生输出的 Handler 模块: 处理由 Handler 产生的输出的 Filter(滤波器)模块: 当出现多个后台服务器时,Load- ...
- 《深入理解NGINX 模块开发与架构解析》之摘抄学习
1.基于Nginx框架开发程序有5个优势: (1).Nginx将网络.磁盘及定时器等异步事件的驱动都做了非常好的封装,基于它开发将可以忽略这些事件处理的细节; (2).Nginx封装了许多平台无关的接 ...
最新文章
- Sql高级查询(三)
- 运行错误5无效的过程调用或参数_FANUC系统常用参数汇总
- pytorch 图像增强
- 工作队列 ( workqueue )
- 程序员自学路上的一些感悟
- lucene 多字段查询-MultiFieldQueryParser
- Ubuntu18.04/16.04 安装glog
- 《当程序员的那些狗日日子》三
- 【第三课】Arcgis软件详细介绍
- springboot Could not resolve placeholder
- Word 2019如何从任意页开始设置页码?
- 解读Android日志
- 新建一个工作空间,复制原来工作空间的配置
- 质数的判断(Python)找到100(1000)以内的所有质数
- Yoshua Bengio——《Deep Learning》学习笔记1
- audio音频播放标签样式优化自定义
- 20个月股票投资复盘:在被割韭菜中成长
- 软件开发常用工具和网站
- 流体力学控制方程——能量方程
- RHEL 7.1 系统部署OpenStack kilo版本