我们知道 可读流是作为数据生产者,而可写流作为数据消费者。

那么二者必然是可以结合使用的。即可读流生产出来的数据给可写流消费。

我们这里使用文件可读流和文件可写流来模拟这种情况:

实现很简单,可读流对象通过data事件可以进入流动模式,还是源源不断地生产数据,而可写流对象通过write方法可以写入数据并消费。所以二者结合就实现了生产与消费地结合。

但是我们注意到:文件可读流的缓冲区水位线默认是64KB,而文件可写流的缓冲区水位线默认是16KB。为何官方要设计出这种差异呢?

其实这和磁盘文件的读取和写入速度有关,一般来说,从磁盘中读取文件的速度 要快于 向磁盘中写入文件。

举个例子:抄作业很快,写作业很慢。

而文件可读流其实就是从磁盘中读取文件数据,类似于抄作业,

文件可写流就是向磁盘中写入文件数据,类似于写作业,所以文件可读流速度快于文件可写流。

我再站在生产者,消费者角度思考:

即 生产者的生产速度,快于,消费者的消费速度。

而生产速度和消费速度的不匹配,就会产生一个严重的问题:供大于求。

在经济学上,这是要发生美国奶农倾倒牛奶的事件的。而这是人文社会所不愿意看到的,所以出现了政府干预市场,比如中国一直推行的减排政策,归根揭底还是减少不必要的生产活动。

而在流的世界里,也模拟出了这种减排政策,即背压机制。

即当消费者的消费速度赶不上生产者的生产速度时,就让生产者暂停生产,让消费者能缓一口气去消费。而等消费者消费完后,再通知生产者继续生产。

const fs = require('fs')
const path = require('path')const rs = fs.createReadStream(path.join(__dirname, 'test.txt'), {highWaterMark: 4  // 可读流缓冲区水位线默认64KB
})const ws = fs.createWriteStream(path.join(__dirname, 'test2.txt'), {highWaterMark: 1 // 可写流缓冲区水位线默认16KB
})rs.on('data', (chunk) => {let flag = ws.write(chunk)if (!flag) {rs.pause()}
})ws.on('drain', () => {rs.resume()
})

监听可读流的data事件,会触发可读流的流动模式,会源源不断地生产数据,生产出来的chunk,会被交给可写流write消费,但是如果write返回false,表示数据量已经超过了缓冲区水位线,如果继续生产下去,可能会造成写流缓冲区发生内存溢出,所以此时需要暂停生产,即rs.pause(),让可读流进入暂停模式。

而当可写流将缓冲区数据消费完了,就会触发drain事件,即表示生产者可以继续生产了,即rs.resume()。

这就是背压机制的基本逻辑。

以上对于背压机制的实现有点公式化,所以针对可读流内置了一个pipe方法,

其内在实现就是背压机制。

Readable.prototype.pipe = function(dest, pipeOpts) {const src = this;const state = this._readableState;if (state.pipes.length === 1) {if (!state.multiAwaitDrain) {state.multiAwaitDrain = true;state.awaitDrainWriters = new SafeSet(state.awaitDrainWriters ? [state.awaitDrainWriters] : []);}}state.pipes.push(dest);debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts);const doEnd = (!pipeOpts || pipeOpts.end !== false) &&dest !== process.stdout &&dest !== process.stderr;const endFn = doEnd ? onend : unpipe;if (state.endEmitted)process.nextTick(endFn);elsesrc.once('end', endFn);dest.on('unpipe', onunpipe);function onunpipe(readable, unpipeInfo) {debug('onunpipe');if (readable === src) {if (unpipeInfo && unpipeInfo.hasUnpiped === false) {unpipeInfo.hasUnpiped = true;cleanup();}}}function onend() {debug('onend');dest.end();}let ondrain;let cleanedUp = false;function cleanup() {debug('cleanup');// Cleanup event handlers once the pipe is broken.dest.removeListener('close', onclose);dest.removeListener('finish', onfinish);if (ondrain) {dest.removeListener('drain', ondrain);}dest.removeListener('error', onerror);dest.removeListener('unpipe', onunpipe);src.removeListener('end', onend);src.removeListener('end', unpipe);src.removeListener('data', ondata);cleanedUp = true;// If the reader is waiting for a drain event from this// specific writer, then it would cause it to never start// flowing again.// So, if this is awaiting a drain, then we just call it now.// If we don't know, then assume that we are waiting for one.if (ondrain && state.awaitDrainWriters &&(!dest._writableState || dest._writableState.needDrain))ondrain();}function pause() {// If the user unpiped during `dest.write()`, it is possible// to get stuck in a permanently paused state if that write// also returned false.// => Check whether `dest` is still a piping destination.if (!cleanedUp) {if (state.pipes.length === 1 && state.pipes[0] === dest) {debug('false write response, pause', 0);state.awaitDrainWriters = dest;state.multiAwaitDrain = false;} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {debug('false write response, pause', state.awaitDrainWriters.size);state.awaitDrainWriters.add(dest);}src.pause();}if (!ondrain) {// When the dest drains, it reduces the awaitDrain counter// on the source.  This would be more elegant with a .once()// handler in flow(), but adding and removing repeatedly is// too slow.ondrain = pipeOnDrain(src, dest);dest.on('drain', ondrain);}}src.on('data', ondata);function ondata(chunk) {debug('ondata');const ret = dest.write(chunk);debug('dest.write', ret);if (ret === false) {pause();}}// If the dest has an error, then stop piping into it.// However, don't suppress the throwing behavior for this.function onerror(er) {debug('onerror', er);unpipe();dest.removeListener('error', onerror);if (EE.listenerCount(dest, 'error') === 0) {const s = dest._writableState || dest._readableState;if (s && !s.errorEmitted) {// User incorrectly emitted 'error' directly on the stream.errorOrDestroy(dest, er);} else {dest.emit('error', er);}}}// Make sure our error handler is attached before userland ones.prependListener(dest, 'error', onerror);// Both close and finish should trigger unpipe, but only once.function onclose() {dest.removeListener('finish', onfinish);unpipe();}dest.once('close', onclose);function onfinish() {debug('onfinish');dest.removeListener('close', onclose);unpipe();}dest.once('finish', onfinish);function unpipe() {debug('unpipe');src.unpipe(dest);}// Tell the dest that it's being piped to.dest.emit('pipe', src);// Start the flow if it hasn't been started already.if (dest.writableNeedDrain === true) {if (state.flowing) {pause();}} else if (!state.flowing) {debug('pipe resume');src.resume();}return dest;
};

pipe方法可以实现协调数据生成和消费。我们通常将pipe形象的成为管道。

pipe方法的调用者一般是可读流对象,而pipe方法参数一般是可写流对象,当pipe方法调用时,就会触发可写流对象的pipe事件。

pipe事件的回调函数会收到调用pipe事件的可读流对象

pipe方法的返回值是pipe方法参数对应的可写流对象。

pipe的本质作用还是将上游数据传递到下游,pipe就是一个传输管道。

我们也可以通过unpipe方法来切断传输管道,即切断传输。这样就能终端pipe传输数据。

Node.js stream模块(三)背压机制相关推荐

  1. Node.js中模块加载机制

    Node.js中模块加载机制 模块查找规则-当模块拥有路径但没有后缀时 1. require方法根据模块路径查找模块,如果是完整路径,直接引入模块. 2. 如果模块后缀省略,先找同名JS文件再找同名J ...

  2. Node.js stream模块(一)可读流

    目录 fs.readFile的问题 如何设计出内存友好的,且人性化的数据生产与消费模式 stream模块 创建可读流对象 _read的作用 push的作用 可读流对象的缓冲区buffer 水位线hig ...

  3. Node.js:模块查找,引用及缓存机制

    1. Node.js的模块载入方式与机制 Node.js中模块可以通过文件路径或名字获取模块的引用.模块的引用会映射到一个js文件路径,除非它是一个Node内置模块.Node的内置模块公开了一些常用的 ...

  4. 模块加载及第三方包:Node.js模块化开发、系统模块、第三方模块、package.json文件、Node.js中模块的加载机制、开发环境与生产环境、cookie与session

    1.Node.js模块化开发 1.1 JavaScript开发弊端 JavaScript 在使用时存在两大问题,文件依赖和命名冲突. 1.2 软件中的模块化开发 一个功能就是一个模块,多个模块可以组成 ...

  5. Node.js Stream - 基础篇

    背景 在构建较复杂的系统时,通常将其拆解为功能独立的若干部分.这些部分的接口遵循一定的规范,通过某种方式相连,以共同完成较复杂的任务.譬如,shell通过管道|连接各部分,其输入输出的规范是文本流. ...

  6. JavaScript之后端Web服务器开发Node.JS基本模块学习篇

    JavaScript之后端Web服务器开发Node.JS基本模块学习篇 基本模块 fs文件系统模块 stream支持流模块 http crypto加密模块 基本模块 因为Node.js是运行在服务区端 ...

  7. node.js中模块_在Node.js中需要模块:您需要知道的一切

    node.js中模块 by Samer Buna 通过Samer Buna 在Node.js中需要模块:您需要知道的一切 (Requiring modules in Node.js: Everythi ...

  8. Node.js Web 模块

    Node.js Web 模块 什么是 Web 服务器? Web服务器一般指网站服务器,是指驻留于因特网上某种类型计算机的程序,Web服务器的基本功能就是提供Web信息浏览服务.它只需支持HTTP协议. ...

  9. Rust: 基于 napi-rs 开发 Node.js 原生模块

    Rust: 基于 napi-rs 开发 Node.js 原生模块 文章目录 Rust: 基于 napi-rs 开发 Node.js 原生模块 完整代码示例 背景 & napi 环境/工具链准备 ...

最新文章

  1. python 自己写个调试工具
  2. java学习笔记之条件语句(if...else)
  3. MFC图像点运算之灰度线性变化、灰度非线性变化、阈值化和均衡化处理
  4. windows 2008 R2远程桌面无法本地复制文件到远程解决
  5. rootfs 制作ubuntu_制作ubuntu rootfs
  6. java异常中的Error和Exception的区别是什么?
  7. 嵌入式系统开发之中断控制的实现
  8. java selector wakeup_Selector
  9. This project uses AndroidX dependencies, but the ‘android.useAndroidX‘ property is not enabled
  10. 工厂模式+抽象工厂模式
  11. pajek的net文件格式
  12. TWINCAT3导出html,【图解】TwinCAT 3学习之添加功能库
  13. PS更换证件照背景色,并去除人物像边线
  14. spring-boot-starter-data-elasticsearch 中测试查询语句报错failed to map source
  15. Vue.js :使用LODOP打印表格文件
  16. 雅思-我们遇到过的哪些熟悉又陌生的单词1
  17. 线程同步的几种实现方法
  18. 70. Climbing Stairs. Iter--Sol
  19. effective java之 builder模式
  20. oracle xe 安装配置,(转)oracle 10g xe 我的安装实践及简单配置过程

热门文章

  1. java虚拟机最新安卓版apk
  2. 【CF375C】Circling Round Treasures【XSY1176】大包子环绕宝藏【状压dp】
  3. 奉劝各位学弟学妹们,看看这篇阿里面经吧,不要无脑的向大厂投简历了!
  4. 使用fiddler 去分析视频网站
  5. MVC 音乐商店 第 7 部分: 会员资格和授权
  6. 如何用大数据软件确定 数码电子店铺选址
  7. 杂谈:用 Sublime Text 2 写 ActionScript3
  8. java二分排序法原理_Java常见排序算法详解—— 二分插入排序
  9. 主板开启网络唤醒_网络唤醒bios设置【应用方式】
  10. 初学Java常用设计模式之——工厂模式