Almost all of the material online about nginx module development covers HTTP modules, so this post puts together a walkthrough of developing a TCP-based service module instead, along with the full source of a custom service module. The module listens on a port, forwards the data it receives from clients to an upstream server, and relays the upstream server's reply back to the client. Module source download: https://code.csdn.net/gamer727/nginx_mypo_module
nginx service modules are conventionally given four-letter names, so this one is called mypo (the first four letters of "my port"). The module was developed against nginx 1.6.2.

From my own study, nginx module development breaks down into roughly five steps:
Step 1: add custom directives to the configuration file;
Step 2: initialize the module's parameters from the configuration;
Step 3: initialize the service, i.e. bind and listen on the port;
Step 4: attach business logic to each connection; how involved this step is depends entirely on the requirements;
Step 5: compile the custom module into nginx.

Step 1: add the custom configuration to nginx.conf

mypo {
    server {
        listen 1025;
        upstream 127.0.0.1:1027;
    }
}

Reference configuration

In the http module, configuration usually has three levels: main, server, and location. Since mypo only implements basic forwarding, there are just two directives here: listen, which sets the local port to listen on, and upstream, which configures the upstream server.

Step 2: initialize parameters from the configuration file

Now into the source. Following the layout of the http module, create the mypo module's core files:
ngx_mypo_config.h — the module's main structures and the module-type identifier;
ngx_mypo.c and ngx_mypo.h — the module entry point, which drives the submodules;
ngx_mypo_core_module.c and ngx_mypo_core_module.h — the core submodule, which reads the server-level configuration.

First, part of the global configuration header ngx_mypo_config.h:

typedef struct {
    void        **main_conf;
    void        **srv_conf;
    void        **loc_conf;
} ngx_mypo_conf_ctx_t;

typedef struct {
    ngx_int_t   (*preconfiguration)(ngx_conf_t *cf);
    ngx_int_t   (*postconfiguration)(ngx_conf_t *cf);

    void       *(*create_main_conf)(ngx_conf_t *cf);
    char       *(*init_main_conf)(ngx_conf_t *cf, void *conf);

    void       *(*create_srv_conf)(ngx_conf_t *cf);
    char       *(*merge_srv_conf)(ngx_conf_t *cf, void *prev, void *conf);

    void       *(*create_loc_conf)(ngx_conf_t *cf);
    char       *(*merge_loc_conf)(ngx_conf_t *cf, void *prev, void *conf);
} ngx_mypo_module_t;

/* The mypo module-type identifier. To avoid value collisions and to match
 * nginx's own convention, the value is the ASCII code of the module name. */
#define NGX_MYPO_MODULE           0x4D59504F   /* "MYPO" */

ngx_mypo.c:

/* Once the configuration file has been read, matching a mypo{} block makes
 * nginx call the ngx_mypo_block handler registered in this array;
 * ngx_mypo_block is effectively the skeleton of the whole module. */
static ngx_command_t  ngx_mypo_commands[] = {

    { ngx_string("mypo"),
      NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,
      ngx_mypo_block,
      0,
      0,
      NULL },

      ngx_null_command
};


static ngx_core_module_t  ngx_mypo_module_ctx = {
    ngx_string("mypo"),
    NULL,
    NULL
};

/* Entry structure of the service module. Every service module's entry
 * structure sets ngx_module_t.type to NGX_CORE_MODULE; the submodules use
 * the custom type, here NGX_MYPO_MODULE. */
ngx_module_t  ngx_mypo_module = {
    NGX_MODULE_V1,
    &ngx_mypo_module_ctx,                  /* module context */
    ngx_mypo_commands,                     /* module directives */
    NGX_CORE_MODULE,                       /* module type */
    NULL,                                  /* init master */
    NULL,                                  /* init module */
    NULL,                                  /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};


static char *
ngx_mypo_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    char                        *rv;
    ngx_uint_t                   mi, m, s;
    ngx_conf_t                   pcf;
    ngx_mypo_module_t           *module;
    ngx_mypo_conf_ctx_t         *ctx;
    ngx_mypo_core_loc_conf_t    *clcf;
    ngx_mypo_core_srv_conf_t   **cscfp;
    ngx_mypo_core_main_conf_t   *cmcf;

    /* the main mypo context */

    ctx = ngx_pcalloc(cf->pool, sizeof(ngx_mypo_conf_ctx_t));
    if (ctx == NULL) {
        return NGX_CONF_ERROR;
    }

    *(ngx_mypo_conf_ctx_t **) conf = ctx;

    /* count the number of the mypo modules and set up their indices */

    ngx_mypo_max_module = 0;
    for (m = 0; ngx_modules[m]; m++) {
        if (ngx_modules[m]->type != NGX_MYPO_MODULE) {
            continue;
        }

        ngx_modules[m]->ctx_index = ngx_mypo_max_module++;
    }

    /* the mypo main_conf context, it is the same in the all mypo contexts */

    ctx->main_conf = ngx_pcalloc(cf->pool,
                                 sizeof(void *) * ngx_mypo_max_module);
    if (ctx->main_conf == NULL) {
        return NGX_CONF_ERROR;
    }

    /*
     * the mypo null srv_conf context, it is used to merge
     * the server{}s' srv_conf's
     */

    ctx->srv_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_mypo_max_module);
    if (ctx->srv_conf == NULL) {
        return NGX_CONF_ERROR;
    }

    /*
     * the mypo null loc_conf context, it is used to merge
     * the server{}s' loc_conf's
     */

    ctx->loc_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_mypo_max_module);
    if (ctx->loc_conf == NULL) {
        return NGX_CONF_ERROR;
    }

    /*
     * create the main_conf's, the null srv_conf's, and the null loc_conf's
     * of the all mypo modules
     */

    for (m = 0; ngx_modules[m]; m++) {
        if (ngx_modules[m]->type != NGX_MYPO_MODULE) {
            continue;
        }

        module = ngx_modules[m]->ctx;
        mi = ngx_modules[m]->ctx_index;

        if (module->create_main_conf) {
            ctx->main_conf[mi] = module->create_main_conf(cf);
            if (ctx->main_conf[mi] == NULL) {
                return NGX_CONF_ERROR;
            }
        }

        if (module->create_srv_conf) {
            ctx->srv_conf[mi] = module->create_srv_conf(cf);
            if (ctx->srv_conf[mi] == NULL) {
                return NGX_CONF_ERROR;
            }
        }

        if (module->create_loc_conf) {
            ctx->loc_conf[mi] = module->create_loc_conf(cf);
            if (ctx->loc_conf[mi] == NULL) {
                return NGX_CONF_ERROR;
            }
        }
    }

    pcf = *cf;
    cf->ctx = ctx;

    for (m = 0; ngx_modules[m]; m++) {
        if (ngx_modules[m]->type != NGX_MYPO_MODULE) {
            continue;
        }

        module = ngx_modules[m]->ctx;

        if (module->preconfiguration) {
            if (module->preconfiguration(cf) != NGX_OK) {
                return NGX_CONF_ERROR;
            }
        }
    }

    /* parse inside the mypo{} block */

    cf->module_type = NGX_MYPO_MODULE;
    cf->cmd_type = NGX_MYPO_MAIN_CONF;

    rv = ngx_conf_parse(cf, NULL);

    if (rv != NGX_CONF_OK) {
        goto failed;
    }

    /*
     * init mypo{} main_conf's, merge the server{}s' srv_conf's
     * and its location{}s' loc_conf's
     */

    cmcf = ctx->main_conf[ngx_mypo_core_module.ctx_index];
    cscfp = cmcf->servers.elts;

    for (m = 0; ngx_modules[m]; m++) {
        if (ngx_modules[m]->type != NGX_MYPO_MODULE) {
            continue;
        }

        module = ngx_modules[m]->ctx;
        mi = ngx_modules[m]->ctx_index;

        /* init mypo{} main_conf's */

        if (module->init_main_conf) {
            rv = module->init_main_conf(cf, ctx->main_conf[mi]);
            if (rv != NGX_CONF_OK) {
                goto failed;
            }
        }

        rv = ngx_mypo_merge_servers(cf, cmcf, module, mi);
        if (rv != NGX_CONF_OK) {
            goto failed;
        }
    }

    /* create location trees */

    for (s = 0; s < cmcf->servers.nelts; s++) {

        clcf = cscfp[s]->ctx->loc_conf[ngx_mypo_core_module.ctx_index];

        if (ngx_mypo_init_locations(cf, cscfp[s], clcf) != NGX_OK) {
            return NGX_CONF_ERROR;
        }

        if (ngx_mypo_init_static_location_trees(cf, clcf) != NGX_OK) {
            return NGX_CONF_ERROR;
        }
    }

    for (m = 0; ngx_modules[m]; m++) {
        if (ngx_modules[m]->type != NGX_MYPO_MODULE) {
            continue;
        }

        module = ngx_modules[m]->ctx;

        if (module->postconfiguration) {
            if (module->postconfiguration(cf) != NGX_OK) {
                return NGX_CONF_ERROR;
            }
        }
    }

    *cf = pcf;

    /* optimize the lists of ports, addresses and server names */

    if (ngx_mypo_optimize_servers(cf, cmcf, cmcf->ports) != NGX_OK) {
        return NGX_CONF_ERROR;
    }

    return NGX_CONF_OK;

failed:

    *cf = pcf;

    return rv;
}

ngx_mypo_core_module.c:

/* server-level configuration */
static ngx_command_t  ngx_mypo_core_commands[] = {

    { ngx_string("server"),
      NGX_MYPO_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,
      ngx_mypo_core_server,
      0,
      0,
      NULL },

    /* The IP/port the proxy listens on. ngx_mypo_core_listen parses the
     * listen line; the NGX_CONF_1MORE flag means the directive may take
     * more than one argument. */
    { ngx_string("listen"),
      NGX_MYPO_SRV_CONF|NGX_CONF_1MORE,
      ngx_mypo_core_listen,
      NGX_MYPO_SRV_CONF_OFFSET,
      0,
      NULL },

    /* upstream configures the upstream server's IP/port; the data is
     * parsed by ngx_mypo_core_upstream. */
    { ngx_string("upstream"),
      NGX_MYPO_SRV_CONF|NGX_CONF_1MORE,
      ngx_mypo_core_upstream,
      NGX_MYPO_SRV_CONF_OFFSET,
      0,
      NULL },

      ngx_null_command
};

/* the submodule's create/init/merge configuration callbacks */
static ngx_mypo_module_t  ngx_mypo_core_module_ctx = {
    ngx_mypo_core_preconfiguration,        /* preconfiguration */
    NULL,                                  /* postconfiguration */
    ngx_mypo_core_create_main_conf,        /* create main configuration */
    ngx_mypo_core_init_main_conf,          /* init main configuration */
    ngx_mypo_core_create_srv_conf,         /* create server configuration */
    ngx_mypo_core_merge_srv_conf,          /* merge server configuration */
    ngx_mypo_core_create_loc_conf,         /* create location configuration */
    ngx_mypo_core_merge_loc_conf           /* merge location configuration */
};

/* entry structure of the mypo core submodule */
ngx_module_t  ngx_mypo_core_module = {
    NGX_MODULE_V1,
    &ngx_mypo_core_module_ctx,             /* module context */
    ngx_mypo_core_commands,                /* module directives */
    NGX_MYPO_MODULE,                       /* module type */
    NULL,                                  /* init master */
    NULL,                                  /* init module */
    NULL,                                  /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};


static char *
ngx_mypo_core_server(ngx_conf_t *cf, ngx_command_t *cmd, void *dummy)
{
    char                        *rv;
    void                        *mconf;
    ngx_uint_t                   i;
    ngx_conf_t                   pcf;
    ngx_mypo_module_t           *module;
    struct sockaddr_in          *sin;
    ngx_mypo_conf_ctx_t         *ctx, *mypo_ctx;
    ngx_mypo_listen_opt_t        lsopt;
    ngx_mypo_core_srv_conf_t    *cscf, **cscfp;
    ngx_mypo_core_main_conf_t   *cmcf;

    ctx = ngx_pcalloc(cf->pool, sizeof(ngx_mypo_conf_ctx_t));
    if (ctx == NULL) {
        return NGX_CONF_ERROR;
    }

    mypo_ctx = cf->ctx;
    ctx->main_conf = mypo_ctx->main_conf;

    /* the server{}'s srv_conf */

    ctx->srv_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_mypo_max_module);
    if (ctx->srv_conf == NULL) {
        return NGX_CONF_ERROR;
    }

    /* the server{}'s loc_conf */

    ctx->loc_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_mypo_max_module);
    if (ctx->loc_conf == NULL) {
        return NGX_CONF_ERROR;
    }

    for (i = 0; ngx_modules[i]; i++) {
        if (ngx_modules[i]->type != NGX_MYPO_MODULE) {
            continue;
        }

        module = ngx_modules[i]->ctx;

        if (module->create_srv_conf) {
            mconf = module->create_srv_conf(cf);
            if (mconf == NULL) {
                return NGX_CONF_ERROR;
            }

            ctx->srv_conf[ngx_modules[i]->ctx_index] = mconf;
        }

        if (module->create_loc_conf) {
            mconf = module->create_loc_conf(cf);
            if (mconf == NULL) {
                return NGX_CONF_ERROR;
            }

            ctx->loc_conf[ngx_modules[i]->ctx_index] = mconf;
        }
    }

    /* the server configuration context */

    cscf = ctx->srv_conf[ngx_mypo_core_module.ctx_index];
    cscf->ctx = ctx;

    cmcf = ctx->main_conf[ngx_mypo_core_module.ctx_index];

    cscfp = ngx_array_push(&cmcf->servers);
    if (cscfp == NULL) {
        return NGX_CONF_ERROR;
    }

    *cscfp = cscf;

    /* parse inside server{} */

    pcf = *cf;
    cf->ctx = ctx;
    cf->cmd_type = NGX_MYPO_SRV_CONF;

    rv = ngx_conf_parse(cf, NULL);

    *cf = pcf;

    if (rv == NGX_CONF_OK && !cscf->listen) {
        ngx_memzero(&lsopt, sizeof(ngx_mypo_listen_opt_t));

        sin = &lsopt.u.sockaddr_in;
        sin->sin_family = AF_INET;
        sin->sin_port = htons((getuid() == 0) ? 80 : 8000);
        sin->sin_addr.s_addr = INADDR_ANY;

        lsopt.socklen = sizeof(struct sockaddr_in);

        lsopt.backlog = NGX_LISTEN_BACKLOG;
        lsopt.rcvbuf = -1;
        lsopt.sndbuf = -1;
        lsopt.wildcard = 1;

        (void) ngx_sock_ntop(&lsopt.u.sockaddr, lsopt.socklen, lsopt.addr,
                             NGX_SOCKADDR_STRLEN, 1);

        if (ngx_mypo_add_listen(cf, cscf, &lsopt) != NGX_OK) {
            return NGX_CONF_ERROR;
        }
    }

    return rv;
}

The definition of ngx_mypo_core_listen:


static char *
ngx_mypo_core_listen(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_mypo_core_srv_conf_t *cscf = conf;

    ngx_str_t              *value;
    ngx_url_t               u;
    ngx_mypo_listen_opt_t   lsopt;

    cscf->listen = 1;

    value = cf->args->elts;

    ngx_memzero(&u, sizeof(ngx_url_t));

    u.url = value[1];
    u.listen = 1;
    u.default_port = 80;

    /* Parsing the IP and port is delegated to nginx's internal
     * ngx_parse_url function. */
    if (ngx_parse_url(cf->pool, &u) != NGX_OK) {
        if (u.err) {
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                               "%s in \"%V\" of the \"listen\" directive",
                               u.err, &u.url);
        }

        return NGX_CONF_ERROR;
    }

    ngx_memzero(&lsopt, sizeof(ngx_mypo_listen_opt_t));

    ngx_memcpy(&lsopt.u.sockaddr, u.sockaddr, u.socklen);

    lsopt.socklen = u.socklen;
    lsopt.backlog = NGX_LISTEN_BACKLOG;
    lsopt.rcvbuf = -1;
    lsopt.sndbuf = -1;
    lsopt.wildcard = u.wildcard;

#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
    lsopt.ipv6only = 1;
#endif

    (void) ngx_sock_ntop(&lsopt.u.sockaddr, lsopt.socklen, lsopt.addr,
                         NGX_SOCKADDR_STRLEN, 1);

    /* Save the listening address into conf, the global custom configuration
     * the lower layer passes in; all global configuration data is stored
     * there. */
    if (ngx_mypo_add_listen(cf, cscf, &lsopt) == NGX_OK) {
        return NGX_CONF_OK;
    }

    return NGX_CONF_ERROR;
}

The definition of ngx_mypo_core_upstream:


static char *
ngx_mypo_core_upstream(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_mypo_core_srv_conf_t *cscf = conf;

    ngx_str_t              *value;
    ngx_url_t               u;

    value = cf->args->elts;

    ngx_memzero(&u, sizeof(ngx_url_t));

    u.url = value[1];
    u.listen = 1;
    u.default_port = 80;

    if (ngx_parse_url(cf->pool, &u) != NGX_OK) {
        if (u.err) {
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                               "%s in \"%V\" of the \"upstream\" directive",
                               u.err, &u.url);
        }

        return NGX_CONF_ERROR;
    }

    ngx_memzero(&(cscf->upstream_addr), sizeof(cscf->upstream_addr));

    /* save the parsed upstream server address into the global configuration */
    ngx_memcpy(&(cscf->upstream_addr), u.sockaddr, u.socklen);

    return NGX_CONF_OK;
}

Step 3: initialize the service, i.e. bind and listen on the port

Starting from ngx_mypo_optimize_servers:


static ngx_int_t
ngx_mypo_optimize_servers(ngx_conf_t *cf, ngx_mypo_core_main_conf_t *cmcf,
    ngx_array_t *ports)
{
    ngx_uint_t             p, a;
    ngx_mypo_conf_port_t  *port;
    ngx_mypo_conf_addr_t  *addr;

    if (ports == NULL) {
        return NGX_OK;
    }

    /* This framework is lifted almost entirely from the http module, with
     * the parts where http business logic differs from mypo removed. */
    port = ports->elts;
    for (p = 0; p < ports->nelts; p++) {

        ngx_sort(port[p].addrs.elts, (size_t) port[p].addrs.nelts,
                 sizeof(ngx_mypo_conf_addr_t), ngx_mypo_cmp_conf_addrs);

        /*
         * check whether all name-based servers have the same
         * configuration as a default server for given address:port
         */

        addr = port[p].addrs.elts;
        for (a = 0; a < port[p].addrs.nelts; a++) {

            if (addr[a].servers.nelts > 1) {
                if (ngx_mypo_server_names(cf, cmcf, &addr[a]) != NGX_OK) {
                    return NGX_ERROR;
                }
            }
        }

        if (ngx_mypo_init_listening(cf, &port[p]) != NGX_OK) {
            return NGX_ERROR;
        }
    }

    return NGX_OK;
}

ngx_mypo_init_listening calls ngx_mypo_add_listening, which ultimately uses nginx's API to create a non-blocking socket:


static ngx_listening_t *
ngx_mypo_add_listening(ngx_conf_t *cf, ngx_mypo_conf_addr_t *addr)
{
    ngx_listening_t           *ls;
    ngx_mypo_core_loc_conf_t  *clcf;
    ngx_mypo_core_srv_conf_t  *cscf;

    ls = ngx_create_listening(cf, &addr->opt.u.sockaddr, addr->opt.socklen);
    if (ls == NULL) {
        return NULL;
    }

    ls->addr_ntop = 1;

    /* A server's basic steps are bind -> listen -> accept. bind and listen
     * run straight through without blocking, but when accept returns a
     * readable/writable socket depends on when a client connects. That step
     * is handed to nginx's event layer, which is where nginx's efficiency
     * comes from; all the upper layer has to do is hand its socket handler
     * down by assigning the handler member of the structure returned by
     * ngx_create_listening: */
    ls->handler = ngx_mypo_init_connection;

    cscf = addr->default_server;
    cscf->connection_pool_size = 256;
    cscf->client_header_timeout = 60000;
    ls->pool_size = cscf->connection_pool_size;
    ls->post_accept_timeout = cscf->client_header_timeout;

    clcf = cscf->ctx->loc_conf[ngx_mypo_core_module.ctx_index];

    /* error_log must not be NULL here, otherwise nginx crashes during its
     * low-level initialization */
    if (clcf->error_log == NULL) {
        clcf->error_log = &cf->cycle->new_log;
    }
    ls->logp = clcf->error_log;
    ls->log.data = &ls->addr_text;
    ls->log.handler = NULL;

    ls->backlog = addr->opt.backlog;
    ls->rcvbuf = addr->opt.rcvbuf;
    ls->sndbuf = addr->opt.sndbuf;

    ls->keepalive = addr->opt.so_keepalive;
#if (NGX_HAVE_KEEPALIVE_TUNABLE)
    ls->keepidle = addr->opt.tcp_keepidle;
    ls->keepintvl = addr->opt.tcp_keepintvl;
    ls->keepcnt = addr->opt.tcp_keepcnt;
#endif

    return ls;
}

Step 4: attach business logic to each connection

The handler member of the structure returned by ngx_create_listening is set to ngx_mypo_init_connection:

void
ngx_mypo_init_connection(ngx_connection_t *c)
{
    ngx_uint_t              i;
    ngx_event_t            *rev;
    struct sockaddr_in     *sin;
    ngx_mypo_port_t        *port;
    ngx_mypo_in_addr_t     *addr;
    ngx_mypo_log_ctx_t     *ctx;
#if (NGX_HAVE_INET6)
    struct sockaddr_in6    *sin6;
    ngx_mypo_in6_addr_t    *addr6;
#endif

    port = c->listening->servers;

    if (port->naddrs > 1) {

        /*
         * there are several addresses on this port and one of them
         * is an "*:port" wildcard so getsockname() in ngx_mypo_server_addr()
         * is required to determine a server address
         */

        if (ngx_connection_local_sockaddr(c, NULL, 0) != NGX_OK) {
            ngx_mypo_close_connection(c);
            return;
        }

        switch (c->local_sockaddr->sa_family) {

#if (NGX_HAVE_INET6)
        case AF_INET6:
            sin6 = (struct sockaddr_in6 *) c->local_sockaddr;

            addr6 = port->addrs;

            /* the last address is "*" */

            for (i = 0; i < port->naddrs - 1; i++) {
                if (ngx_memcmp(&addr6[i].addr6, &sin6->sin6_addr, 16) == 0) {
                    break;
                }
            }

            break;
#endif

        default: /* AF_INET */
            sin = (struct sockaddr_in *) c->local_sockaddr;

            addr = port->addrs;

            /* the last address is "*" */

            for (i = 0; i < port->naddrs - 1; i++) {
                if (addr[i].addr == sin->sin_addr.s_addr) {
                    break;
                }
            }

            /* attach the core module configuration to the connection */
            c->data = &addr[i].conf;
            break;
        }

    } else {

        switch (c->local_sockaddr->sa_family) {

#if (NGX_HAVE_INET6)
        case AF_INET6:
            addr6 = port->addrs;
            break;
#endif

        default: /* AF_INET */
            addr = port->addrs;
            c->data = &addr[0].conf;
            break;
        }
    }

    /* the default server configuration for the address:port */

    ctx = ngx_palloc(c->pool, sizeof(ngx_mypo_log_ctx_t));
    if (ctx == NULL) {
        ngx_mypo_close_connection(c);
        return;
    }

    ctx->connection = c;
    ctx->request = NULL;
    ctx->current_request = NULL;

    c->log->connection = c->number;
    c->log->handler = NULL;
    c->log->data = ctx;
    c->log->action = "waiting for request";

    c->log_error = NGX_ERROR_INFO;

    rev = c->read;

    /* After accepting the client connection come the data reads and writes,
     * another step that would normally block. nginx's event layer makes it
     * non-blocking with essentially no work on our part; again, all the
     * upper layer has to do is pass its read/write handlers down for the
     * nginx core to invoke: */
    rev->handler = ngx_mypo_wait_request_handler;
    c->write->handler = ngx_mypo_empty_handler;

    if (rev->ready) {
        /* the deferred accept(), rtsig, aio, iocp */

        if (ngx_use_accept_mutex) {
            ngx_post_event(rev, &ngx_posted_events);
            return;
        }

        rev->handler(rev);
        return;
    }

    ngx_add_timer(rev, c->listening->post_accept_timeout);
    ngx_reusable_connection(c, 1);

    if (ngx_handle_read_event(rev, 0) != NGX_OK) {
        ngx_mypo_close_connection(c);
        return;
    }
}

Implementing the data-receiving handlers:

    rev->handler = ngx_mypo_wait_request_handler;
    c->write->handler = ngx_mypo_empty_handler;

ngx_mypo_wait_request_handler:


static void
ngx_mypo_wait_request_handler(ngx_event_t *rev)
{
    size_t                     size;
    ssize_t                    n;
    ngx_buf_t                 *b;
    ngx_connection_t          *c;

    /* A network-data message has arrived from the lower layer. nginx is
     * event driven, so everything is wrapped in an ngx_event_t; to read the
     * data, first recover the connection: */
    c = rev->data;

    if (rev->timedout) {
        ngx_mypo_close_connection(c);
        return;
    }

    if (c->close) {
        ngx_mypo_close_connection(c);
        return;
    }

    /* The read size should really be determined by parsing the protocol;
     * it is fixed at 12 here purely for convenience. */
    size = 12;

    b = c->buffer;

    if (b == NULL) {
        b = ngx_create_temp_buf(c->pool, size);
        if (b == NULL) {
            ngx_mypo_close_connection(c);
            return;
        }

        c->buffer = b;

    } else if (b->start == NULL) {

        b->start = ngx_palloc(c->pool, size);
        if (b->start == NULL) {
            ngx_mypo_close_connection(c);
            return;
        }

        b->pos = b->start;
        b->last = b->start;
        b->end = b->last + size;
    }

    /* read the client's request data */
    n = c->recv(c, b->last, size);

    if (n == NGX_AGAIN) {

        if (!rev->timer_set) {
            ngx_add_timer(rev, c->listening->post_accept_timeout);
            ngx_reusable_connection(c, 1);
        }

        if (ngx_handle_read_event(rev, 0) != NGX_OK) {
            ngx_mypo_close_connection(c);
            return;
        }

        /*
         * We are trying to not hold c->buffer's memory for an idle
         * connection.
         */

        if (ngx_pfree(c->pool, b->start) == NGX_OK) {
            b->start = NULL;
        }

        return;
    }

    if (n == NGX_ERROR) {
        ngx_mypo_close_connection(c);
        return;
    }

    if (n == 0) {
        ngx_log_error(NGX_LOG_INFO, c->log, 0,
                      "client closed connection");
        ngx_mypo_close_connection(c);
        return;
    }

    b->last += n;

    c->log->action = "reading client request line";

    ngx_reusable_connection(c, 0);

    /* save the client's data, create the upstream connection, and forward
     * the data to the upstream server */
    ngx_mypo_upstream_init(rev);

    /*
     * If the server should reply directly instead of proxying, it can be
     * done here:
     *
     * ngx_chain_t out[1];
     * b = ngx_pcalloc(c->pool, sizeof(ngx_buf_t));
     * out[0].buf = b;
     * out[0].next = NULL;
     * b->pos = (u_char *) "hello_world, from Nginx";
     * b->last = b->pos + sizeof("hello_world, from Nginx") - 1;
     * b->memory = 1;
     * ngx_mypo_send(rev, out, 0);
     */
}

ngx_mypo_upstream_init:


static int
ngx_mypo_upstream_init(ngx_event_t *rev)
{
    ngx_int_t                  rc;
    ngx_connection_t          *c;
    ngx_peer_connection_t     *peer;
    ngx_mypo_addr_conf_t      *addr_conf;
    ngx_mypo_core_srv_conf_t  *core_conf;

    c = rev->data;
    addr_conf = c->data;
    core_conf = addr_conf->default_server;

    if (rev->timedout) {
        ngx_mypo_close_connection(c);
        return -1;
    }

    if (c->close) {
        ngx_mypo_close_connection(c);
        return -1;
    }

    peer = ngx_palloc(c->pool, sizeof(ngx_peer_connection_t));
    if (NULL == peer) {
        ngx_mypo_close_connection(c);
        return -1;
    }

    /* fill in the upstream server's address and port, then connect */
    peer->sockaddr = &(core_conf->upstream_addr);
    peer->socklen = sizeof(core_conf->upstream_addr);
    peer->name = NULL;
    peer->get = ngx_event_get_peer;
    peer->log = c->log;
    peer->log_error = NGX_ERROR_ERR;

    rc = ngx_event_connect_peer(peer);

    if (rc == NGX_ERROR || rc == NGX_BUSY || rc == NGX_DECLINED) {
        if (peer->connection) {
            ngx_close_connection(peer->connection);
        }

        ngx_close_connection(c);
        return -1;
    }

    /* When the upstream replies, the data must be relayed back to the
     * client, but both connections arrive from the lower layer. A global
     * table of connections would work but would be tedious to maintain.
     * Instead, use the void *data field of ngx_connection_t, which exists
     * precisely to hold user-defined state: normally you would wrap the
     * business data in a struct, allocate it, and store it in data, but
     * since the logic here is simple, nginx's own ngx_event_t is enough. */
    peer->connection->data = rev;
    peer->connection->pool = c->pool;

    /* handlers for reading from and writing to the upstream server */
    peer->connection->read->handler = ngx_mypo_upstream_rec_data;
    peer->connection->write->handler = ngx_mypo_upstream_send_data;

    return 0;
}

ngx_mypo_upstream_rec_data:


static void
ngx_mypo_upstream_rec_data(ngx_event_t *rev)
{
    size_t                     size;
    ssize_t                    n;
    ngx_buf_t                 *b;
    ngx_connection_t          *sc, *cc;
    ngx_event_t               *cevent;

    /* first recover the upstream and client connections */
    sc = rev->data;
    cevent = sc->data;
    cc = cevent->data;

    if (rev->timedout) {
        ngx_mypo_close_connection(sc);
        return;
    }

    if (sc->close) {
        ngx_mypo_close_connection(sc);
        return;
    }

    size = 16;

    b = sc->buffer;

    if (b == NULL) {
        b = ngx_create_temp_buf(sc->pool, size);
        if (b == NULL) {
            ngx_mypo_close_connection(sc);
            return;
        }

        sc->buffer = b;

    } else if (b->start == NULL) {

        b->start = ngx_palloc(sc->pool, size);
        if (b->start == NULL) {
            ngx_mypo_close_connection(sc);
            return;
        }

        b->pos = b->start;
        b->last = b->start;
        b->end = b->last + size;
    }

    n = sc->recv(sc, b->last, size);

    if (n == NGX_AGAIN) {

        if (!rev->timer_set) {
            ngx_add_timer(rev, sc->listening->post_accept_timeout);
            ngx_reusable_connection(sc, 1);
        }

        if (ngx_handle_read_event(rev, 0) != NGX_OK) {
            ngx_mypo_close_connection(sc);
            return;
        }

        /*
         * We are trying to not hold c->buffer's memory for an idle
         * connection.
         */

        if (ngx_pfree(sc->pool, b->start) == NGX_OK) {
            b->start = NULL;
        }

        return;
    }

    if (n == NGX_ERROR) {
        ngx_mypo_close_connection(sc);
        return;
    }

    if (n == 0) {
        ngx_mypo_close_connection(sc);
        return;
    }

    b->last += n;

    sc->log->action = "reading client request line";

    ngx_reusable_connection(sc, 0);

    /* relay the upstream server's reply back to the client */
    ngx_chain_t out[1];
    out[0].buf = b;
    out[0].next = NULL;

    ngx_mypo_send(cevent, out, 0);

    ngx_mypo_close_connection(cc);

    return;
}

The definition of ngx_mypo_upstream_send_data:

static void
ngx_mypo_upstream_send_data(ngx_event_t *rev)
{
    ngx_buf_t         *b;
    ngx_connection_t  *sc, *cc;
    ngx_event_t       *cevent;

    /* Called once the connect() to the upstream server has completed and
     * the connection becomes writable. */
    sc = rev->data;
    cevent = sc->data;
    cc = cevent->data;

    /* The data received from the client is already in its buffer; just
     * read it out and forward it to the upstream server. */
    b = cc->buffer;

    ngx_chain_t out[1];
    out[0].buf = b;
    out[0].next = NULL;

    ngx_mypo_send(rev, out, 0);

    return;
}

ngx_mypo_send:


static void
ngx_mypo_send(ngx_event_t *wev, ngx_chain_t *in, off_t limit)
{
    ngx_connection_t  *cc;
    ngx_chain_t       *cl;

    cc = wev->data;

    if (cc->destroyed) {
        return;
    }

    if (wev->timedout) {
        ngx_log_error(NGX_LOG_INFO, cc->log, NGX_ETIMEDOUT,
                      "netcall: client send timed out");
        cc->timedout = 1;
        ngx_mypo_close_connection(cc);
        return;
    }

    if (wev->timer_set) {
        ngx_del_timer(wev);
    }

    cl = cc->send_chain(cc, in, limit);

    if (cl == NGX_CHAIN_ERROR) {
        ngx_mypo_close_connection(cc);
        return;
    }

    /* more data to send? */

    if (cl) {
        ngx_add_timer(wev, 10000);

        if (ngx_handle_write_event(wev, 0) != NGX_OK) {
            ngx_mypo_close_connection(cc);
        }

        return;
    }

    /*
     * we've sent everything we had.
     * now receive reply
     */

    ngx_del_event(wev, NGX_WRITE_EVENT, 0);
}

Step 5: compile the custom module into nginx

There are two ways.
The first is to edit the Makefile and ngx_modules.c under the objs directory, using the http module entries as a template and changing them to mypo.
ngx_modules.c:

/* declare the external module variables */
extern ngx_module_t ngx_mypo_module;
extern ngx_module_t ngx_mypo_core_module;

ngx_module_t *ngx_modules[] = {
    ......
    &ngx_mypo_module,
    &ngx_mypo_core_module,
    NULL
};

makefile:


# "..." stands for the Makefile's original content. Since the mypo module
# lives under src/ in the nginx tree, the path is "./src/mypo/"; for a
# module kept elsewhere, replace "./src/mypo/" with its absolute path.
CFLAGS =  ... -I./src/mypo/

ADDON_DEPS = ... ./src/mypo/ngx_mypo_core_module.h ./src/mypo/ngx_mypo_config.h ./src/mypo/ngx_mypo.h

objs/nginx: ... \
	objs/addon/mypo/ngx_mypo.o \
	objs/addon/mypo/ngx_mypo_core_module.o \
	...
	$(LINK) -o ... \
	objs/addon/mypo/ngx_mypo.o \
	objs/addon/mypo/ngx_mypo_core_module.o \
	...

objs/addon/mypo/ngx_mypo.o: $(ADDON_DEPS) \
	./src/mypo/ngx_mypo.c
	$(CC) -c $(CFLAGS) $(ALL_INCS) \
		-o objs/addon/mypo/ngx_mypo.o \
		./src/mypo/ngx_mypo.c

objs/addon/mypo/ngx_mypo_core_module.o: $(ADDON_DEPS) \
	./src/mypo/ngx_mypo_core_module.c
	$(CC) -c $(CFLAGS) $(ALL_INCS) \
		-o objs/addon/mypo/ngx_mypo_core_module.o \
		./src/mypo/ngx_mypo_core_module.c

The second way is to add a config file to the mypo directory and pass the module's directory to the --add-module option when running configure before building nginx. This way is simpler, but configure resets the build configuration: unless the previous configure arguments were saved, they will all be lost, so do not rerun configure casually.
The content of the config file:

ngx_addon_name="ngx_mypo_module"

CORE_MODULES="$CORE_MODULES
              ngx_mypo_module                             \
              ngx_mypo_core_module                        \
              "

NGX_ADDON_DEPS="$NGX_ADDON_DEPS                           \
                $ngx_addon_dir/ngx_mypo_core_module.h     \
                $ngx_addon_dir/ngx_mypo_config.h          \
                $ngx_addon_dir/ngx_mypo.h                 \
                "

NGX_ADDON_SRCS="$NGX_ADDON_SRCS                           \
                $ngx_addon_dir/ngx_mypo.c                 \
                $ngx_addon_dir/ngx_mypo_core_module.c     \
                "

CFLAGS="$CFLAGS -I$ngx_addon_dir"

Finally, a summary of the nginx APIs used:

void *ngx_pcalloc(ngx_pool_t *pool, size_t size);
char *ngx_conf_parse(ngx_conf_t *cf, ngx_str_t *filename);
void *ngx_array_push(ngx_array_t *a);
size_t ngx_sock_ntop(struct sockaddr *sa, socklen_t socklen, u_char *text,
    size_t len, ngx_uint_t port);
#define ngx_memzero(buf, n)       (void) memset(buf, 0, n)
ngx_array_t *ngx_array_create(ngx_pool_t *p, ngx_uint_t n, size_t size);
static ngx_inline ngx_int_t ngx_array_init(ngx_array_t *array,
    ngx_pool_t *pool, ngx_uint_t n, size_t size);
ngx_int_t ngx_parse_url(ngx_pool_t *pool, ngx_url_t *u);
ngx_listening_t *ngx_create_listening(ngx_conf_t *cf, void *sockaddr,
    socklen_t socklen);
ngx_int_t ngx_connection_local_sockaddr(ngx_connection_t *c, ngx_str_t *s,
    ngx_uint_t port);
void ngx_close_connection(ngx_connection_t *c);
ngx_buf_t *ngx_create_temp_buf(ngx_pool_t *pool, size_t size);
void ngx_reusable_connection(ngx_connection_t *c, ngx_uint_t reusable);
ngx_int_t ngx_handle_read_event(ngx_event_t *rev, ngx_uint_t flags);
ngx_int_t ngx_event_connect_peer(ngx_peer_connection_t *pc);

After you have read enough code, you realize that even systems as large as the Linux kernel or an RTOS have only a handful of core pieces; everything else is copied from those core templates. You don't have to invent anything yourself: follow the template and go with the rhythm of the code. It's like driving: past a certain point you no longer think about the throttle or shifting gears, the car just goes; and at a higher level still, you start modifying the car until it becomes an extension of your own hands and feet.

As an aside, some lessons from over the years: in network development, when reading server-side code, whether nginx, Apache, or any other system, every network server goes through the same steps:
1. Create a socket;
2. bind an address and port;
3. listen on the port (skipped for UDP);
4. accept client connections (also skipped for UDP);
5. Read and write data and handle the business logic. Many function families can be called here: read/write, recv/send, recvfrom/sendto, and so on.
These steps are the trunk of any network server codebase; everything else is branches and leaves. If you get lost in the branches while reading code, come back to the trunk to find your way.
