目录

前言

匿名管道进程间通信

进程间管道 的创建与图解

MediaSoup中的管道创建

MediaSoup Channel的创建

NodeJs和 C++ 管道通信的过程

MediaSoup 消息确认与事件通知

小结


前言

上篇文章对MediaSoup源码的调试方法  以及运行时分析、调试、查看核心信息  【流媒体服务器Mediasoup】 源码中重要类基本概念 、上层代码作用详解、底层C++类关系详解(四),本章节主要对MediaSoup的源码中源码中NodeJs与C++信令通信详解,以及讲解在Linux下管道通信的使用

在下一篇文章中将继续对MediaSoup的源码进行分析和架构的讲解。

匿名管道进程间通信

Linux下常见的进程间通讯方式(IPC)

1:管道:匿名管道,有名管道

匿名管道需要有关联如父子进程时间的联系

2:Socket:unixsocket,普通的socket

3:共享内存

4:信号 (kill命令)

管道的原理: 管道实为内核使用环形队列机制,借助内核缓冲区(4k)实现。

对管道的具体概念不进行详解,如需可以参考网上资料。

进程间管道 的创建与图解

    

  前三个是固定的特殊的管道,标准输入,标准输出,标准错误,最后两个由两个文件描述符引用,一个表示读端,一个表示          写端。  

对于fd[0]=3 或者4来说 既可以读也可以写为全双工通信。fd[0]=4 写入的数据需要 fd[0]=3 来读取,反之fd[0]=3 写入的数据需要fd[0]= 4 来读取 ,这是只有一个进程的情况下创建的socketpair。但fork了子进程后,子进程的fd[0]=3和fd[0]=4 一样指向了 读和写管道,这样很容易造成混乱。

        那么父进程fork子进程后,显然 如果同时写和读会造成混乱,下图改进方案为

                                                   

   父进程发送信息给子进程,则就父写子读,就形成半双工的父进程到子进程的通信

         Mediasoup底层使用的是Unixsocket全双工进程间通讯.那么要想实现全双工, 修改后的方案如下

                                                    

最后实现的原理:

  父进程关掉3,子进程关闭掉4,父进程用4去接收或者发送管道,子进程用3去接收或者发送管道

     代码实战

       代码中给出了明显的注释,阅读后来看下最后运行的结果

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>
const char* str = "SOCKET PAIR TEST.";
int main(int argc, char* argv[]){   int    socket_pair[2];pid_t  id;//创建socketpairif(socketpair(AF_UNIX, SOCK_STREAM, 0, socket_pair) == -1){     printf("Error, socketpair create failed, errno(%d): %s\n", errno, strerror(errno));return EXIT_FAILURE;}  //创建子进程id =fork(); if(id == 0){//子进程char buffer[512]={0, };//子进程关闭管道4close(socket_pair[1]);while(1){//发送数据给父进程printf("childer send \n");write(socket_pair[0], str , strlen(str));sleep(1);            //接收父进程数据ssize_t len = read(socket_pair[0], buffer, sizeof(buffer));if(len > 0 ){buffer[len] = '\0';printf("childer: recv from parent : %s \n",buffer);    }}      }else if(id > 0){//父进程char buffer[512]={0, };//父进程关闭管道3close(socket_pair[0]);       while(1){//接收子进程数据ssize_t len = read(socket_pair[1], buffer, sizeof(buffer));if(len > 0 ){buffer[len] = '\0';printf("father: recv from childer : %s \n",buffer);               }sleep(1);//发送数据给父进程printf("father send \n");write(socket_pair[1], str , strlen(str));}       }else{      printf("Error, fork failed, errno(%d): %s\n", errno, strerror(errno));return EXIT_FAILURE;}   return 0;
}

 root@ubuntu:/work/guo#   g++ hello.cpp -o hello  //编译

  root@ubuntu:/work/guo#   ./hello                          //运行

最后运行结果查看:

从结果中可以看到父进程发子进程收,子进程发父进程收,这样就达到了父子进程的通讯效果。

     在MediaSoup中在NodeJs层  根据CPU核心数量 用spawn 创建了这么多数量的创建了Woker(子进程),那么NodeJs层和C++层的子进程这个是通过这种管道通信方式进行信令的传输。

MediaSoup中的管道创建

管道在源码中 哪里创建,怎么创建?

前面了解那么多我们知道Worker 在底层实际上是一个进程或者说子进程,那么管道是用来进程间通讯的,可以猜测到创建

管道的地方在和woker有关的类中,那么根据源码一层层剖析下去。

首先定位  mediasoup-demo/server/server.js

/*** Launch as many mediasoup Workers as given in the configuration file.*/
async function runMediasoupWorkers()
{const { numWorkers } = config.mediasoup;logger.info('running %d mediasoup Workers...', numWorkers);for (let i = 0; i < numWorkers; ++i){     //服务启动后开始创建workerconst worker = await mediasoup.createWorker({logLevel   : config.mediasoup.workerSettings.logLevel,logTags    : config.mediasoup.workerSettings.logTags,rtcMinPort : Number(config.mediasoup.workerSettings.rtcMinPort),rtcMaxPort : Number(config.mediasoup.workerSettings.rtcMaxPort)});....省略}}

const worker = await mediasoup.createWorker   最后实际调用到MediaSoup库中的Woker.js

继续定位到 mediasoup-demo\server\node_modules\mediasoup\lib\Worker.js

class Worker extends EnhancedEventEmitter_1.EnhancedEventEmitter {/*** @private* @emits died - (error: Error)* @emits @success* @emits @failure - (error: Error)*/constructor({ logLevel, logTags, rtcMinPort, rtcMaxPort, dtlsCertificateFile, dtlsPrivateKeyFile, appData }) {super();...省略//启动核心文件 实际上新建woker即线程this._child = child_process_1.spawn(// commandspawnBin, // argsspawnArgs,   // options{env: {MEDIASOUP_VERSION: '3.4.11'},detached: false,// fd 0 (stdin)   : Just ignore it.// fd 1 (stdout)  : Pipe it for 3rd libraries that log their own stuff.// fd 2 (stderr)  : Same as stdout.// fd 3 (channel) : Producer Channel fd.// fd 4 (channel) : Consumer Channel fd.stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe']});this._pid = this._child.pid;// 创建的管道交给channel处理this._channel = new Channel_1.Channel({producerSocket: this._child.stdio[3],consumerSocket: this._child.stdio[4],pid: this._pid});this._appData = appData;let spawnDone = false;// Listen for 'ready' notification.this._channel.once(String(this._pid), (event) => {...省略});this._child.on('exit', (code, signal) => {...省略});this._child.on('error', (error) => {...省略});// Be ready for 3rd party worker libraries logging to stdout.this._child.stdout.on('data', (buffer) => {...省略});// In case of a worker bug, mediasoup will log to stderr.this._child.stderr.on('data', (buffer) => {...省略});}

首先对node child_process模块 spawn 的使用不了解的话可以搜索网上的资料,这里简单的介绍下

child_process 模块Api :  https://nodejs.org/api/child_process.html

//spawn 实际上是执行Linux系统下的命令

const ls = spawn('ls', ['-lh', '/usr']);

ls.stdout.on('data', (data) => { console.log(`输出:${data}`); });

ls.stderr.on('data', (data) => { console.log(`错误:${data}`); });

ls.on('close', (code) => { console.log(`子进程退出码:${code}`); });

最后实际执行的结果会回调在 stdout这个文件描述符  data的内容就是/usr 下的所有目录和文件打印。

  • spawn : 子进程中执行的是非node程序,提供一组参数后,执行的结果以流的形式返回。

默认情况下,Node.js 的父进程与衍生的子进程之间会建立 stdin、stdout 和 stderr 的管道

在源码使用spawn中,来详解一些参数,其中

    command  为 spawnBin 实际上是一个MediaSoup编译完的一个可执行库的路径

args 是一些执行时的参数,主要是一些 config.js中mediasoup节点下的值拼接成字符串作为参数 来启动核心文件。

    option 可以有很多选项,这边只列举了上述代码中使用到的,具体参数意义如下图所示

child_process.spawn(command[, args][, options])

  • command <string> The command to run.       //要运行的命令
  • args <string[]> List of string arguments.     //字符串参数列表
  • options <Object>

    • env <Object> Environment key-value pairs. Default: process.env.      //环境键值对。默认值:process.env。
    • stdio <Array> | <string> Child's stdio configuration (see options.stdio).   //options.stdio选项用于配置在父进程和子进程之间建立的管道。默认情况下,子进程的stdin、stdout和stderr被重定向到相应的subprocess.std
    • detached <boolean> Prepare child to run independently of its parent process. Specific behavior depends on the platform, see options.detached). //准备子进程独立于其父进程运行。具体行为取决于平台
  • Returns: <ChildProcess>

所以根据根据中 可知

producerSocket  ->   stdio[3]      consumerSocket -> stdio[4]

stdio[3] 用来发送数据给子进程,stdio[4] 用来接收数据。

  最后管道处理都交给了Channel 类来处理了。

this._channel = new Channel_1.Channel({producerSocket: this._child.stdio[3],consumerSocket: this._child.stdio[4],pid: this._pid});

到这里我们就知道具体管道建立的过程。

MediaSoup Channel的创建

JS首先组成JSON格式的命令最后将它转成字符串 通过channel通道传给C++端,C++有个接收管道接收到数据之后,再转成JSON,最后在解析成Request(c++类) 中的一些字段,根据Methodid去处理相对应的信令。

NodeJs和 C++ 管道通信的过程

上述已经对管道的创建过程大致的说明,当通过spwn创建子进程后管道其实就已经建立成功,这些管道最后交给Channel管理,

继续定位到 mediasoup-demo\server\node_modules\mediasoup\lib\Channel.js

根据上述管道了解知道,producerSocket   stdio[3] 用来发送数据给子进程,consumerSocket  stdio[4] 用来接收数据。

那么具体是如何发送或者接受呢?

那么先来看发送数据:

将数据写入管道 这里真正的是进行管道写入
             this._producerSocket.write(ns);

用异步方式去监听底层是否接受并处理信息,这里的确认结果和接收中的逻辑相匹配
           会通过this._sents.set(id, sent); sent的里的resolve 或者 pReject 返回

发送之后会保存在一个Map对象里,等待后续消息确认回来根据对应的id进行处理。

request(method, internal, data) {return __awaiter(this, void 0, void 0, function* () {...省略//将数据写入管道 这里真正的是进行管道写入this._producerSocket.write(ns);//用异步方式去监听底层是否接受并处理信息,这里的确认结果和接收中的逻辑相匹配//会通过this._sents.set(id, sent); sent的里的resolve 或者 pReject 返回return new Promise((pResolve, pReject) => {const timeout = 1000 * (15 + (0.1 * this._sents.size));const sent = {id: id,method: method,resolve: (data2) => {if (!this._sents.delete(id))return;clearTimeout(sent.timer);pResolve(data2);},reject: (error) => {if (!this._sents.delete(id))return;clearTimeout(sent.timer);pReject(error);},timer: setTimeout(() => {if (!this._sents.delete(id))return;pReject(new Error('Channel request timeout'));}, timeout),close: () => {clearTimeout(sent.timer);pReject(new errors_1.InvalidStateError('Channel closed'));}};// Add sent stuff to the map.this._sents.set(id, sent);});});}

在下面哪段代码中的Channel构造函数中,

this._consumerSocket.on('data', (buffer) =>   回调函数里监听或者接受数据,真正的处理有效数据其实在 this._processMessage(JSON.parse(nsPayload)); 函数中。

class Channel extends EnhancedEventEmitter_1.EnhancedEventEmitter {constructor({ producerSocket, consumerSocket, pid }) {...省略this._producerSocket = producerSocket;this._consumerSocket = consumerSocket;// Read Channel responses/notifications from the worker.//用于接受底层C++发来的信令数据this._consumerSocket.on('data', (buffer) => {...省略try {// We can receive JSON messages (Channel messages) or log strings.switch (nsPayload[0]) {// 123 = '{' (a Channel JSON messsage).case 123://真正的处理有效的信令数据this._processMessage(JSON.parse(nsPayload));break;// 68 = 'D' (a debug log).case 68:logger.debug(`[pid:${pid}] ${nsPayload.toString('utf8', 1)}`);break;// 87 = 'W' (a warn log).case 87:logger.warn(`[pid:${pid}] ${nsPayload.toString('utf8', 1)}`);break;// 69 = 'E' (an error log).case 69:logger.error(`[pid:${pid} ${nsPayload.toString('utf8', 1)}`);break;// 88 = 'X' (a dump log).case 88:// eslint-disable-next-line no-consoleconsole.log(nsPayload.toString('utf8', 1));break;default:....省略}

定位到  _processMessage(msg)

从下面哪段代码中可以看出,其中处理信令又种方式一种msg带id 一种不带

其原因是一种是  消息确认 和 事件通知 区别。

其中上层发送信令给底层会暂时保存起来消息确认需要携带id,上层才能通过id来确定是哪条信令完成。

如果是不带id,那么属于事件通知,最终会调用 this.emit(msg.targetId, msg.event, msg.data);  发送出去。

  _processMessage(msg) {if (msg.id) {...省略if (msg.accepted) {logger.debug('request succeeded [method:%s, id:%s]', sent.method, sent.id);//确定消息处理完成,并在信令 Map表里确认并回调sent.resolve(msg.data);}else if (msg.error) {switch (msg.error) {case 'TypeError':sent.reject(new TypeError(msg.reason));break;default:sent.reject(new Error(msg.reason));}}else {logger.error('received response is not accepted nor rejected [method:%s, id:%s]', sent.method, sent.id);}}// If a notification emit it to the corresponding entity.else if (msg.targetId && msg.event) {this.emit(msg.targetId, msg.event, msg.data);}// Otherwise unexpected message.else {logger.error('received message is not a response nor a notification');}}

MediaSoup 消息确认与事件通知

        消息的确认是指上层给mediasoup底层发送消息时,底层处理完要发送消息确认给上层处理结果。

事件通知是底层的一些操作导致状态变化要通知到到上层进行操作同步。简单初步的看下C++是如何执行消息确认与事件通知的。

返回信令确认消息给上层

...

Request->Accept(data); & Request->Accept();

...

给上层发送通知 Notifier

在main函数里初始化

....

Channel::Notifier::ClassInit(channel);

...

Channel::Notifier::Emit(this->id, "icestatechange", data);

...

无论是事件通知上层或者返回消息,两者都是通过管道传给上层

最终都调用channel->send()

小结

JS首先组成JSON格式的命令最后将它转成字符串 通过channel通道传给C++端,C++有个接收管道接收到数据之后,再转成JSON,最后在解析成Request (C++中)中的一些字段,根据Methodid去处理相对应的信令。处理完消息后再生成字符串的发送给上层去确认。 通知事件是由底层主动发起的通知。

因此整个通信架构基本清楚,通过管道进行进程间通讯,在Linux下也是非常高效常见的一种方式。

【流媒体服务器Mediasoup】 NodeJs与C++信令通信详解及Linux下管道通信的详解(五)相关推荐

  1. linux下TCP通信简单实例

    linux下TCP通信简单实例 基于TCP(面向连接)的socket编程,分为服务器端和客户端 服务器端的流程如下: (1)创建套接字(socket) (2)将套接字绑定到一个本地地址和端口上(bin ...

  2. Linux下进程通信的八种方法

    Linux下进程通信的八种方法:管道(pipe),命名管道(FIFO),内存映射(mapped memeory),消息队列(message queue),共享内存(shared memory),信号量 ...

  3. 【流媒体服务器Mediasoup】多人音视频架构、流媒体的比较、mediasoup介绍 (一)

    目录             前言 多人音视频架构 流媒体服务器的比较 Mediasoup流媒体服务器架构及特点 前言 WebRtc有两种含义,其一是Google开源的流媒体实时通讯客户端,主要运用于 ...

  4. 【流媒体服务器Mediasoup】环境部署与demo搭建(二)

    目录 前言 服务器环境 NodeJs的安装 下载Demo源码 配置服务端 部署及测试 可能会遇到的问题 前言 上篇文章对MediaSoup进行简单的介绍  [MediaSoup]多人音视频架构.流媒体 ...

  5. linux socket ip层配置,Linux下Socket通信(TCP实现)

    近期在做的项目中,涉及到了进程间数据传输,系统的原本实现是通过管道,但是原有的实现中两个进程是在同一台机器,而且两个进程的关系为父子关系,而我们要做的是将其中一个进程移植到服务器上,因此两个进程要分开 ...

  6. Linux 下socket通信终极指南(附TCP、UDP完整代码)

    linux下用socket通信,有TCP.UDP两种协议,网上的很多教程把两个混在了一起,或者只讲其中一种.现在我把自己这两天研究的成果汇总下来,写了一个完整的,适合初学者参考,也方便自己以后查阅. ...

  7. Linux下进程通信知识点学习笔记(一)

    4种主要事件导致进程创建: 系统的初始化: 执行了正在运行的进程所调用的进程创建系统调用: 用户请求创建一个进程: 一个批处理作业的初始化: 进程的终止: 正常退出: 出错退: 严重错误: 被其他进程 ...

  8. linux下串口通信程序,关于Linux下串口通信的一点心得

    1. 打开串口 与其他的关于设备编程的方法一样,在 Linux 下,操作.控制串口也是通过操作起设备文件进行的.在 Linux 下,串口的设备文件是 /dev/ttyS0 或 /dev/ttyS1 等 ...

  9. Linux下Socket通信中非阻塞connect、select、recv 和 recvfrom、send和sendto大致讲解,附带非租塞connect代码、MSG_NOSIGNAL

    linux中send函数MSG_NOSIGNAL异常消息 在服务器端用ctrl+c 来结束服务器接收进程来模拟服务器宕机的情况,结束服务 socket 进程之后,服务端自然关闭进程,可是 client ...

最新文章

  1. list_for_each引起的问题
  2. charshow技术预研
  3. python 用twisted 问题 zope.interface
  4. 程序员面试题精选100题(13)-第一个只出现一次的字符[算法]
  5. 【五校联考3day2】B
  6. Java 基础 - 面向对象(不错N多教程集合)
  7. 297. Serialize and Deserialize Binary Tree
  8. Angular6错误 Service: No provider for Renderer2
  9. 跟多导出数据库的方法
  10. C++字符串分割替换 ubuntu版本
  11. 第5堂:看到词句就会读-上
  12. python 笔记 之 练习答案-ABCD乘以9=DCBA
  13. R可视化:图片为背景的气泡地图
  14. MSDN Windows 10 21H1 64位19043原版系统
  15. 寒假刷刷算法题(13)
  16. 团队springboot基础镜像选择思考
  17. SparkLink星闪技术之SLB概述
  18. Tuxera NTFS2022Mac驱动完美支持NTFS硬盘读写
  19. PMAC应用六-前瞻
  20. Mysql断流_Hystrix 断流器

热门文章

  1. 独木舟上的旅行(船问题贪心)
  2. android相册幻灯片功能,玩机教程 篇四十五:「MIUI玩机技巧63」MIUI相册新增“幻灯片播放”功能...
  3. Java——批量更改图片像素(大小)
  4. Python哈希表的例子:dict、set
  5. 软件测试,软件测试练习题
  6. windows 2008 server 服务器远程桌面连接会话自动注销,在服务器上开掉的软件全部自动关闭的解决办法...
  7. ffmpeg图片+音频合成视频
  8. 怎样批量查询顺心捷达单号信息并分析是否延误
  9. kernel panic分析
  10. 在html中什么标签可以显示小方块,css列表前的小方块