master-worker进程间通信

1 内部流程 — socketpair 基础

进程是按照顺序创建的,那么后创建的进程是怎么知道前面已经创建的进程呢
实际上子进程work_process之间也是通过这些socket进行通信的 : master父进程每次fork一个新进程的时候都会把这个新进程的信息告知前面已经生成的子进程

当 socketpair 执行成功时,sv[2] 这两个套接字具备下列关系:
向 sv[0] 套接字写入数据,将可以从 sv[1] 套接字中读取到刚写入的数据;
同样,向 sv[1] 套接字写入数据,也可以从 sv[0] 中读取到写入的数据。

通常,在父、子进程通信前,会先调用 socketpair 方法创建这样一组套接字,在调用 fork 方法创建出子进程后,将会在父进程中关闭 sv[1] 套接字,仅使用 sv[0] 套接字用于向子进程发送数据以及接收子进程发送来的数据;
而在子进程中则关闭 sv[0] 套接字,仅使用 sv[1] 套接字既可以接收父进程发送来的数据,也可以向父进程发送数据。
先使用socketpair创建本地套接字,然后fork父子进程会各有一份,在父进程中关闭sv[1] 往 sv[0]写,在子进程中读sv[1],关闭sv[0],至此在父进程中写sv[0],在子进程中读sv[1]从而完成了父进程写,子进程读
在父进程中关闭的ngx_processes[n].channel[1]

        if (close(ngx_processes[n].channel[1]) == -1) {ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,"close() channel failed");}

在子进程中关闭 ngx_processes[ngx_process_slot].channel[0]

    // 在子进程中关闭了ngx_processes[ngx_process_slot].channel[0]if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) {ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,"close() channel failed");}
主要是通过socketpair()函数实现的,下面捋一下内部流程:
1. 话说要从ngx_start_worker_processes函数讲起:
static void
ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type)
{ngx_int_t      i;ngx_channel_t  ch;ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start worker processes");ch.command = NGX_CMD_OPEN_CHANNEL;/*n是由配置文件中获取,子进程个数*/for (i = 0; i < n; i++) {/*子进程创建函数,下面具体讲*/ngx_spawn_process(cycle, ngx_worker_process_cycle,(void *) (intptr_t) i, "worker process", type);ch.pid = ngx_processes[ngx_process_slot].pid;ch.slot = ngx_process_slot;ch.fd = ngx_processes[ngx_process_slot].channel[0];/*父进程中执行该函数,主要像前面各个子进程的channel[0]发送消息*/ngx_pass_open_channel(cycle, &ch);}
}ngx_pid_t
ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data,char *name, ngx_int_t respawn)
{//...//.../*创建一对套接字讲用于父子进程间通信*/if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1){ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,"socketpair() failed while spawning \"%s\"", name);return NGX_INVALID_PID;}//.../*各种套接字属性设置*/ngx_channel = ngx_processes[s].channel[1];//ngx_channel干啥用的?留个悬念//.../*重点来了,创建子进程*/pid = fork();switch (pid) {case -1:ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,"fork() failed while spawning \"%s\"", name);ngx_close_channel(ngx_processes[s].channel, cycle->log);return NGX_INVALID_PID;case 0:ngx_pid = ngx_getpid();/*调用ngx_worker_process_cycle函数,子进程死循环处理流程,下面再讲*/proc(cycle, data);break;default:break;}//.../*对子进程的相关信息进行保存*/ngx_processes[s].pid//.../*数组下标加1*/if (s == ngx_last_process) {ngx_last_process++;}return pid;
}ngx_spawn_process()函数是在一个for循环中调用,假如有4个子进程,也就是说会进行四次
socketpair()的创建执行,每一次的创建,ngx_processes[s].channel都会有两个fd生成,
假设进行第一次循环时,ngx_processes[0].channel里面已经有ngx_processes[0].channel[0]和
ngx_processes[0].channel[1],继续往下执行,执行子进程创建执行proc(cycle, data);时,调用
static void
ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{//.../*子进程初始化相关工作*/ngx_worker_process_init(cycle, worker);//...for( ;; ){/*子进程死循环,虽然里面内容很重要,但与本节无关*/}
}
static void
ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker)
{//...//.../*for循环,进行了下异常处理*/for (n = 0; n < ngx_last_process; n++) {}/*在子进程中关闭掉channel[0],只需要用channel[1]*/if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) {ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,"close() channel failed");}/*这个比较重要,但还没理解透*/if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT,ngx_channel_handler)== NGX_ERROR){/* fatal */exit(2);}
}
ngx_int_t
ngx_add_channel_event(ngx_cycle_t *cycle, ngx_fd_t fd, ngx_int_t event,ngx_event_handler_pt handler)
{//...//...ev = (event == NGX_READ_EVENT) ? rev : wev;/*将ngx_channel_handler函数挂在ev->handler上*/ev->handler = handler;/*如果是epoll,则执行ngx_epoll_add_connection函数,在里面可以看到if (epoll_ctl(ep, EPOLL_CTL_ADD, c->fd, &ee) == -1) {ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,"epoll_ctl(EPOLL_CTL_ADD, %d) failed", c->fd);return NGX_ERROR;}将描述符第二个参数fd,而fd就是ngx_channel加入到了事件中ngx_channel在哪里?ngx_channel来自于ngx_spawn_process()中ngx_channel = ngx_processes[s].channel[1];
*/if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {if (ngx_add_conn(c) == NGX_ERROR) {ngx_free_connection(c);return NGX_ERROR;}} else {/*添加事件,一旦有可读事件到来时,执行ev->handler*/if (ngx_add_event(ev, event, 0) == NGX_ERROR) {ngx_free_connection(c);return NGX_ERROR;}}return NGX_OK;
}/*当有可读事件来的时候,触发该函数*/
static void
ngx_channel_handler(ngx_event_t *ev)
{//...ngx_connection_t  *c;c = ev->data; //下面的c->fd来自哪里?/*ev->data,*///...for ( ;; ) {/*里面就是调用recvmsg(s, &msg, 0);读取消息*/n = ngx_read_channel(c->fd, &ch, sizeof(ngx_channel_t), ev->log);switch(){//...case NGX_CMD_OPEN_CHANNEL://.../*保存各个槽位的子进程信息*///...}}//...
}
从上面可以小结一下,感觉一下,子进程拥有套接字ngx_processes[s].channel[1],并加入
了可读事件中,一直等待着读,即等待着调用recvmsg(),那么由谁来sendmsg呢?通过哪个
套接字呢?
继续:
视线回到ngx_spawn_process()函数中,该函数带领我们一步步走进子进程的处理过程,回到
该函数调用的地方即函数ngx_start_worker_processes()中,它下面接着执行ch.pid、ch.slot
、ch.fd的赋值,用处在下面:接着调用
/*注意调用该函数是在父进程中执行*/
static void
ngx_pass_open_channel(ngx_cycle_t *cycle, ngx_channel_t *ch)
{/*for循环,细细体味下,假如多个子进程时,通过该循环向不同的channel[0]发送msg*/for (i = 0; i < ngx_last_process; i++) {/*一些异常处理*///.../*该函数实际上就是调用sendmsg(s, &msg, 0);进行消息的发送*//*注意参数:第一个参数ngx_processes[i].channel[0]就是要发送的fd,其实也在ch里面包含着*/ngx_write_channel(ngx_processes[i].channel[0],ch, sizeof(ngx_channel_t), cycle->log);}
}
综上: 创建一个子进程时,父进程就会向各个channel[0]中sendmsg,子进程从channel[1]recvmsg

2 socketpair 函数可创建本地套接字

ngx_channel_t 频道是 Nginx master 进程与 worker 进程之间通信的常用工具,它是使用本机套接字实现的,即 socketpair 方法,它用于创建父子进程间使用的套接字。

#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
int socketpair(int domain, int type, int protocol, int sv[2]);
这个方法可以创建一对关联的套接字 sv[2]。
domain:表示域,在 Linux 下通常取值为 AF_UNIX;
type:取值为 SOCK_STREAM 或 SOCK_DGRAM,它表示在套接字上使用的是 TCP 还是 UDP;
protocol:必须传递 0;
sv[2]:是一个含有两个元素的整型数组,实际上就是两个套接字。
当 socketpair 返回 0 时,sv[2] 这两个套接字创建成功,否则 sockpair 返回 -1 表示失败.
当 socketpair 执行成功时,sv[2] 这两个套接字具备下列关系:
向 sv[0] 套接字写入数据,将可以从 sv[1] 套接字中读取到刚写入的数据;
同样,向 sv[1] 套接字写入数据,也可以从 sv[0] 中读取到写入的数据。

通常,在父、子进程通信前,会先调用 socketpair 方法创建这样一组套接字,在调用 fork 方法创建出子进程后,将会在父进程中关闭 sv[1] 套接字,仅使用 sv[0] 套接字用于向子进程发送数据以及接收子进程发送来的数据;
而在子进程中则关闭 sv[0] 套接字,仅使用 sv[1] 套接字既可以接收父进程发送来的数据,也可以向父进程发送数据。
ngx_channel_t 结构体是 Nginx 定义的 master 父进程与 worker 子进程间的消息格式,如下:

typedef struct {// 传递的 TCP 消息中的命令ngx_uint_t  command;// 进程 ID,一般是发送命令方的进程 IDngx_pid_t   pid;// 表示发送命令方在 ngx_processes 进程数组间的序号ngx_int_t   slot;// 通信的套接字句柄ngx_fd_t    fd;
}ngx_channel_t;

Nginx 针对 command 成员定义了如下命令:
// 打开频道,使用频道这种方式通信前必须发送的命令
#define NGX_CMD_OPEN_CHANNEL 1
// 关闭已经打开的频道,实际上也就是关闭套接字
#define NGX_CMD_CLOSE_CHANNEL 2
// 要求接收方正常地退出进程
#define NGX_CMD_QUIT 3
// 要求接收方强制地结束进程
#define NGX_CMD_TERMINATE 4
// 要求接收方重新打开进程已经打开过的文件
#define NGX_CMD_REOPEN 5
问:master 是如何启动、停止 worker 子进程的?
答:正是通过 socketpair 产生的套接字发送命令的,即每次要派生一个子进程之前,都会先调用 socketpair 方法。
在 Nginx 派生子进程的 ngx_spawn_process 方法中,会首先派生基于 TCP 的套接字,如下:

ngx_pid_t
ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, char *name, ngx_int_t respawn)
{if (respawn != NGX_PROCESS_DETACHED) {  /* Solaris 9 still has no AF_LOCAL */// ngx_processes[s].channel 数组正是将要用于父、子进程间通信的套接字对if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1){return NGX_INVALID_PID;}// 将 channel 套接字对都设置为非阻塞模式if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {ngx_close_channel(ngx_processes[s].channel, cycle->log);return NGX_INVALID_PID;}if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {ngx_close_channel(ngx_processes[s].channel, cycle->log);return NGX_INVALID_PID;}...
}
ngx_processes 数组定义了 Nginx 服务中所有的进程,包括 master 进程和 worker 进程,如下:
#define NGX_MAX_PROCESSES 1024

// 虽然定义了 NGX_MAX_PROCESSES 个成员,但是已经使用的元素仅与启动的进程个数有关
ngx_processes_t ngx_processes[NGX_MAX_PROCESSES];
ngx_processes 数组的类型是 ngx_processes_t,对于频道来说,这个结构体只关心它的 channel 成员:

typedef struct {...// socketpair 创建的套接字对ngx_socket_t channel[2];
}ngx_processes_t;
1. ngx_write_channel:使用频道发送 ngx_channel_t 消息
ngx_int_t
ngx_write_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size,ngx_log_t *log)
{ssize_t         n;ngx_err_t       err;struct iovec    iov[1];struct msghdr   msg;#if (NGX_HAVE_MSGHDR_MSG_CONTROL)union {struct cmsghdr  cm;char            space[CMSG_SPACE(sizeof(int))];}cmsg;if (ch->fd == -1) {msg.msg_control = NULL;msg.msg_controllen = 0;} else {// 辅助数据msg.msg_control = (caddr_t)&cmsg;msg.msg_controllen = sizeof(cmsg);ngx_memzero(&cmsg, sizeof(cmsg));cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));cmsg.cm.cmsg_level = SOL_SOCKET;cmsg.cm.cmsg_type = SCM_RIGHTS;/** We have to use ngx_memcpy() instead of simple*   *(int *) CMSG_DATA(&cmsg.cm) = ch->fd;* because some gcc 4.4 with -O2/3/s optimization issues the warning:*   dereferencing type-punned pointer will break strict-aliasing rules** Fortunately, gcc with -O1 compiles this ngx_memcpy()* in the same simple assignment as in the code above*/ngx_memcpy(CMSG_DATA(&cmsg.cm), &ch->fd, sizeof(int));}   msg.msg_flags = 0;
#elseif (ch->fd == -1) {msg.msg_accrights = NULL;msg.msg_accrightslen = 0;} else {msg.msg_accrights = (caddr_t) &ch->fd;msg.msg_accrightslen = sizeof(int);}
#endif// 指向要发送的 ch 起始地址iov[0].iov_base = (char *) ch;iov[0].iov_len = size;// msg_name 和 msg_namelen 仅用于未连接套接字(如UDP)msg.msg_name = NULL;msg.msg_namelen = 0;msg.msg_iov = iov;msg.msg_iovlen = 1;// 将该 ngx_channel_t 消息发出去n = sendmsg(s, &msg, 0);if (n == -1) {err = ngx_errno;if (err == NGX_EAGAIN) {return NGX_AGAIN;}return NGX_ERROR;}  return NGX_OK;
}
2. ngx_read_channel: 读取消息
ngx_int_t
ngx_read_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size, ngx_log_t *log)
{ssize_t             n;ngx_err_t           err;struct iovec        iov[1];struct msghdr       msg;
#if (NGX_HAVE_MSGHDR_MSG_CONTROL)union {struct cmsghdr  cm;char            space[CMSG_SPACE(sizeof(int))];} cmsg;
#elseint                 fd;
#endifiov[0].iov_base = (char *)ch;iov[0].iov_len = size;msg.msg_name = NULL;msg.msg_namelen = 0;msg.msg_iov = iov;msg.msg_iovlen = 1;#if (NGX_HAVE_MSGHDR_MSG_CONTROL)msg.msg_control = (caddr_t) &cmsg;msg.msg_controllen = sizeof(cmsg);
#elsemsg.msg_accrights = (caddr_t) &fd;msg.msg_accrightslen = sizeof(int);
#endif// 接收命令n = recvmsg(s, &msg, 0);if (n == -1) {err = ngx_errno;if (err == NGX_EAGAIN) {return NGX_AGAIN;}return NGX_ERROR;}if (n == 0) {return NGX_ERROR;}  // 接收的数据不足if ((size_t) n < sizeof(ngx_channel_t)) {return NGX_ERROR;}
#if (NGX_HAVE_MSGHDR_MSG_CONTROL)  // 若接收到的命令为"打开频道,使用频道这种方式通信前必须发送的命令"if (ch->command == NGX_CMD_OPEN_CHANNEL) {if (cmsg.cm.cmsg_len < (socklen_t) CMSG_LEN(sizeof(int))) {return NGX_ERROR;}   if (cmsg.cm.cmsg_level != SOL_SOCKET || cmsg.cm.cmsg_type != SCM_RIGHTS){return NGX_ERROR;}      /* ch->fd = *(int *) CMSG_DATA(&cmsg.cm); */ngx_memcpy(&ch->fd, CMSG_DATA(&cmsg.cm), sizeof(int));} // 若接收到的消息是被截断的if (msg.msg_flags & (MSG_TRUNC|MSG_CTRUNC)) {ngx_log_error(NGX_LOG_ALERT, log, 0,"recvmsg() truncated data");}
#else if (ch->command == NGX_CMD_OPEN_CHANNEL) {if (msg.msg_accrightslen != sizeof(int)) {return NGX_ERROR;}    ch->fd = fd;}
#endifreturn n;
}

在 Nginx 中,目前仅存在 master 进程向 worker 进程发送消息的场景,这时对于 socketpair 方法创建的 channel[2] 套接字来说,master 进程会使用 channel[0] 套接字来发送消息,而 worker 进程会使用 channel[1] 套接字来接收消息。

  1. ngx_add_channel_event: 把接收频道消息的套接字添加到 epoll 中
    worker 进程调度 ngx_read_channel 方法接收频道消息是通过该 ngx_add_channel_event 函数将接收频道消息的套接字(对于 worker 即为channel[1])添加到 epoll 中,当接收到父进程消息时子进程会通过 epoll 的事件回调相应的 handler 方法来处理这个频道消息,如下:
ngx_int_t
ngx_add_channel_event(ngx_cycle_t *cycle, ngx_fd_t fd, ngx_int_t event, ngx_event_handler_pt handler)
{ngx_event_t         *ev, *rev, *wev;ngx_connection_t    *c;// 获取一个空闲连接c = ngx_get_connection(fd, cycle->log);if (c == NULL) {return NGX_ERROR;}c->pool = cycle->pool;rev = c->read;wev = c->write;rev->log = cycle->log;wev->log = cycle->log;rev->channel = 1;wev->channel = 1; ev = (event == NGX_READ_EVENT) ? rev : wev; // 初始化监听该 ev 事件时调用的回调函数ev->handler = handler; // 将该接收频道消息的套接字添加到 epoll 中if (ngx_add_conn && (ngx_event_flags && NGX_USE_EPOLL_EVENT) == 0) {// 这里是同时监听该套接字的读、写事件if (ngx_add_conn(c) == NGX_ERROR) {ngx_free_connection(c);return NGX_ERROR;}} else {// 这里是仅监听 ev 事件if (ngx_add_event(ev, event, 0) == NGX_ERROR) {ngx_free_connection(c);return NGX_ERROR;}} return NGX_OK;
}
4. ngx_close_channel: 关闭这个频道通信方式
void
ngx_close_channel(ngx_fd_t *fd, ngx_log_t *log)
{if (close(fd[0]) == -1) {}if (close(fd[1]) == -1) {}
}

参考资料:

  1. https://blog.csdn.net/xyyaiguozhe/article/details/14124627
  2. https://www.cnblogs.com/jimodetiantang/p/9191092.html

nginx master-worker进程间通信相关推荐

  1. Scala基于Akka模拟Spark Master Worker进程间通信(二):Worker定时向Master心跳

    最终效果 Master package cn.zxl.spark.masterimport akka.actor.{Actor, ActorRef, ActorSystem, Props} impor ...

  2. Scala基于Akka模拟Spark Master Worker进程间通信(一):Worker向Master注册

    最终效果 master: worker: 思路分析 Master代码 package cn.zxl.spark.masterimport akka.actor.{Actor, ActorRef, Ac ...

  3. Nginx源码分析:master/worker工作流程概述

    nginx源码分析 nginx-1.11.1 参考书籍<深入理解nginx模块开发与架构解析> Nginx的master与worker工作模式 在生成环境中的Nginx启动模式基本都是以m ...

  4. Kubernetes 笔记(06)— 搭建多节点集群、kubeadm 安装 master/worker/console/flannel 网络插件

    1. kubeadm 官网:https://kubernetes.io/zh-cn/docs/reference/setup-tools/kubeadm/ 为了简化 Kubernetes 的部署工作, ...

  5. Nginx中worker connections问题的解决方法

    这篇文章主要介绍了Nginx中worker connections问题的解决方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下 查看日志,有一个[warn]: 3660#0: 20000 work ...

  6. Storm通信机制,Worker进程间通信,Worker进程间通信分析,Worker进程间技术(Netty、ZeroMQ),Worker 内部通信技术(Disruptor)(来自学习资料)

    Storm通信机制 Worker间的通信经常需要通过网络跨节点进行,Storm使用ZeroMQ或Netty(0.9以后默认使用)作为进程间通信的消息框架. Worker进程内部通信:不同worker的 ...

  7. nginx没有worker进程_如何优雅地关闭worker进程?

    点击上方"武培轩",选择"设为星标" 技术文章第一时间送达! 之前我们讲解 Nginx 命令行的时候,可以看到 Nginx 停止有两种方式,分别是 nginx ...

  8. linux 并行 模式,并行设计模式-Master/Worker

    Master-Worker设计模式核心思想是将原来串行的逻辑并行化,并将逻辑拆分成很多独立模块并行执行,其中主要包含两个主要组件Master和Worker,Master主要讲逻辑进行查分,拆分为互相独 ...

  9. centos7.6使用kubeadm安装kubernetes的master worker节点笔记及遇到的坑

    个人博客原文地址:http://www.lampnick.com/php/760 本文目标 安装docker及设置docker代理 安装kubeadm 使用kubeadm初始化k8s Master节点 ...

  10. nginx优化worker进程最大打开文件数worker_rlimit_nofile 65535

    来源:https://blog.csdn.net/ljx1528/article/details/87362561 性能优化-优化worker进程最大打开文件数worker_rlimit_nofile ...

最新文章

  1. 2022-2028年中国自动驾驶系统行业现状调研分析报告
  2. R语言nchar函数统计字符串中字符个数实战
  3. LintCode 1.A+B的问题
  4. Kotlin 越来越牛逼了!学Java都我想转了!
  5. c#调用c++的dll接口
  6. python数据结构简单总结
  7. nodejs与javascript中的aes加密
  8. Android 系统(58)---Android 系统 UI - SystemUI之功能介绍和UI布局实现
  9. IFC2x3标准阅读
  10. Linux中进程正常退出return和exit()的区别
  11. jedis操作set_使用 JedisAPI 操作 Redis
  12. 树莓派Python开发第8课: PWM实验
  13. python爬取数据教程_python爬虫爬取网页数据并解析数据
  14. OpenAI Whisper论文笔记
  15. 51单片机通过WIFI模块ESP8266控制LED灯
  16. 根号下的X平方加一C语言,根号下x平方加一分之一怎样积分
  17. maven依赖本地非repository中的jar包-依赖jar包放在WEB-INF/lib等目录下的情况客户端编译出错的处理...
  18. Web应用与Web框架
  19. 读书笔记——弗洛伊德《梦的解析》
  20. Apache Beam指南

热门文章

  1. 搭建Hadoop集群
  2. 29.3 用户模式构造
  3. vue.js 源代码学习笔记 ----- instance inject
  4. 设置配置高可用的Mysql双机热备(Mysql_HA)
  5. HCIE-Security Day27:IPSec:实验(二)两个网关之间通过手工方式创建IPSec PN隧道
  6. HCIE-RS面试--P/A协商(超详细!)
  7. ajax音乐网站,AJAX在线音乐网站(5)测试
  8. php提取bing背景,PHP代码获取bing每日背景
  9. babel css3新特性_css3 transform属性多值的顺序问题
  10. 计算机网路网络层之IP协议(4)——有类IP地址