一.node.js中的流是什么

stream(流)是Node.js提供的又一个仅在服务区端可用的模块,流是一种抽象的数据结构。Stream 是一个抽象接口,Node 中有很多对象实现了这个接口。例如,对http 服务器发起请求的request 对象就是一个 Stream,还有stdout(标准输出流)。
       顾名思义,流的意思就是数据的流动,就好比停水了,楼上的人存了一些水,楼下的人发请求想借楼上的人一桶水,直接搬费力又麻烦,直接往下倒容易倒到地上,于是可以用一根管子连接两个桶(A和B),楼上的人直接把A桶里的水通过管子流到楼下的B桶里,这就类似于request对象向服务器发请求要资源,在这request 请求资源的传播方式通过流来实现。
       再举个例子,可以把数据看成是数据流,如果我们在键盘上打字,电脑程序把字符一个一个输出到屏幕上,这也可以看成是一个流,这个可以叫做标准输出(stdout)。

二.为什么要在node中使用流

看了前面稍微了解node的同学可能就要问了,流的作用不就是传递数据麽,也就是把一个地方数据拷贝到另一个地方,不用流也可以这样实现:

var water = fs.readFileSync('a.txt', {encoding: 'utf8'});
fs.writeFileSync('b.txt', water);
复制代码

是的,只要使用node的读写文件的功能就能实现上面借水的效果,但这样做有个致命问题:

  • 处理数据量较大的文件时不能分块处理,导致速度慢,内存容易爆满。

使用读写方式是把文件内容全部读入内存,然后再写入文件,对于小型的文本文件问题不大,但是遇到较大的比如音频、视频文件,动辄几个GB大小实在承受不住,而流可以把文件资源拆分成小块,一块一块的运输,资源就像水流一样进行传输,使用流的话上述功能可以这样写:

var fs = require('fs');
var readStream = fs.createReadStream('a.mp4'); // 创建可读流
var writeStream = fs.createWriteStream('b.mp4'); // 创建可写流readStream.on('data', function(chunk) { // 当有数据流出时,写入数据writeStream.write(chunk);
});readStream.on('end', function() { // 当没有数据时,关闭数据流writeStream.end();
});
复制代码
  • 但这样写还是有一些问题的,如果说写入的速度跟不上读取的速度,有可能导致数据丢失。正常的情况应该是,写完一段,再读取下一段,如果没有写完的话,就让读取流先暂停,等写完再继续,所以为了让可读流和可写流速度一致,就要用到流中必不可少的属性pipe了,pipe翻译过来意思是管道,顾名思义,就想上面的倒水一样,如果不用一根管子相连,A桶倒进B桶的水不会均速传输,可能会导致水的浪费,用pipe可以这样解决上述问题:
fs.createReadStream('a.mp4').pipe(fs.createWriteStream('b.mp4));
// pipe自动调用了data,end等事件
复制代码
  • 需要特别注意的是,pipe()只是可读流的方法,也就是说只能从可读流中通过pipe方法拷贝数据到可写流,反之则不行,写的时候要注意顺序。

三.流的四种类型

Stream提供了以下四种类型的流:

  1. Readable 可读流
  2. Writable 可写流
  3. Duplex 可读可写流
  4. Transform 在读写过程中可以修改和变换数据的Duplex流

1.Readable
可读流有五个参数:

  • highWaterMark 缓存区字节大小,默认16384
  • encoding 字符编码,默认为null,就是buffer
  • objectMode 是否操作js其他类型 默认false
  • read 对内部的_read()方式实现 子类实现,父类调用
  • destroy 对内部的_ destroy()方法实现 子类实现,父类调用

可读流中分为2种模式流动模式和暂停模式。

1、流动模式:可读流自动读取数据,通过EventEmitter接口的事件尽快将数据提供给应用。
2、暂停模式:必须显式调用stream.read()方法来从流中读取数据片段。

暂停模式切换到流动模式i:

1、监听“data”事件
2、调用 stream.resume()方法
3、调用 stream.pipe()方法将数据发送到可写流

流动模式切换到暂停模式:

1、如果不存在管道目标,调用stream.pause()方法
2、如果存在管道目标,调用 stream.unpipe()并取消'data'事件监听
可读流事件:'data','readable','error','close','end'

监听data事件,触发流动模式

const { Readable } = require('stream');let i = 0;const rs = Readable({encoding: 'utf8',// 这里传入的read方法,会被写入_read()read: (size) => {// size 为highWaterMark大小// 在这个方法里面实现获取数据,读取到数据调用rs.push([data]),如果没有数据了,push(null)结束流if (i < 6) {rs.push(`当前读取数据: ${i++}`);} else {rs.push(null);}},// 源代码,可覆盖destroy(err, cb) {rs.push(null);cb(err);}});rs.on('data', (data) => {console.log(data);// 每次push数据则触发data事件
复制代码

监听readable事件,触发暂停模式,当流有了新数据或到了流结束之前触发readable事件,需要显示调用read([size])读取数据:

    const { Readable } = require('stream');let i = 0;const rs = Readable({encoding: 'utf8',highWaterMark: 9,// 这里传入的read方法,会被写入_read()read: (size) => {// size 为highWaterMark大小// 在这个方法里面实现获取数据,读取到数据调用rs.push([data]),如果没有数据了,push(null)结束流if (i < 10) {// push其实是把数据放入缓存区rs.push(`当前读取数据: ${i++}`);} else {rs.push(null);}}});rs.on('readable', () => {const data = rs.read(9);console.log(data);// })
复制代码

2. Writable
可写流有以下参数:

highWaterMark 缓存区字节大小,默认16384 decodeStrings 是否将字符编码传入缓冲区 objectMode 是否操作js其他类型 默认false write 子类实现,供父类调用 实现写入底层数据 writev 子类实现,供父类调用 一次处理多个chunk写入底层数据 destroy 可以覆盖父类方法,不能直接调用,销毁流时,父类调用 final 完成写入所有数据时父类触发

const Writable = require('stream').Writableconst writable = Writable()
// 实现`_write`方法
// 这是将数据写入底层的逻辑
writable._write = function (data, enc, next) {// 将流中的数据写入底层process.stdout.write(data.toString().toUpperCase())// 写入完成时,调用`next()`方法通知流传入下一个数据process.nextTick(next)
}// 所有数据均已写入底层
writable.on('finish', () => process.stdout.write('DONE'))// 将一个数据写入流中
writable.write('a' + '\n')
writable.write('b' + '\n')
writable.write('c' + '\n')// 再无数据写入流时,需要调用`end`方法
writable.end()
复制代码

3. Duplex
Duplex为读写流,既可当成可读流来使用,也可当成可写流来使用,实际上就是继承了Readable和Writable的一类流。所以,Duplex拥有Writable和Readable所有方法和事件,但各自独立缓存区,一个Duplex对象可以同时实现_read()和_write()方法。

var Duplex = require('stream').Duplexvar duplex = Duplex()// 可读端底层读取逻辑
duplex._read = function () {this._readNum = this._readNum || 0if (this._readNum > 1) {this.push(null)} else {this.push('' + (this._readNum++))}
}// 可写端底层写逻辑
duplex._write = function (buf, enc, next) {// a, bprocess.stdout.write('_write ' + buf.toString() + '\n')next()
}// 0, 1
duplex.on('data', data => console.log('ondata', data.toString()))duplex.write('a')
duplex.write('b')duplex.end()
复制代码

4. Transform
Tranform为转换流,它继承自Duplex,并已经实现了_read和_write方法,同时要求用户实现一个_transform方法,从一个地方读取数据,转换数据后输出到一个地方。

    const stream = require('stream');const transform = stream.Transform({transform(chunk, encoding, cb){// 把数据转换成小写字母,然后push到缓存区this.push(chunk.toString().toLowerCase());cb();}});transform.write('D');console.log(transform.read(1).toString()); // d
复制代码

四.stream中的pipe

前面已经说过,pipe的作用是在流中搭建一条管道,从可读流中到可写流,目的是实现读取和写入步调一致,边读边写。

   const stream = require('stream');const readStream = stream.Readable({read() {this.push(fs.readFileSync('a.txt')); // 文件内容 testthis.push(null);}});const writeStream = stream.Writable({write(chunk, encoding, cb) {// chunk为test bufferfs.writeFileSync('b.txt', chunk.toString());cb();}});writeStream.on('pipe', data => {// 触发pipe事件console.log(data);});readStream.pipe(writeStream);
复制代码

Node.js Stream(流) 简单易懂全解析相关推荐

  1. node.js Stream(流) 和 EJS 模板引擎——0822

    一.node.js 中的 Stream(流) 1.什么是 Stream ? Stream 是一个抽象接口,Node 中有很多对象实现了这个接口.例如,对http服务器发起请求的request 对象就是 ...

  2. 流线动态图python_Node.js Stream(流)

    Node.js Stream(流) Stream 是一个抽象接口,Node 中有很多对象实现了这个接口.例如,对http 服务器发起请求的request 对象就是一个 Stream,还有stdout( ...

  3. Node.js: 认识流stream

    流是Node.js中一个非常重要的概念, 也是Node.js之所以适用于I/O密集型场景的重要原因之一. 流是Node.js移动数据的方式,流可以是可读的和/或可写的.在Node.js中很多模块都使用 ...

  4. 如何用 Node.js 实现一个简单的 Websocket 服务?

    最近正在研究 Websocket 相关的知识,想着如何能自己实现 Websocket 协议.到网上搜罗了一番资料后用 Node.js 实现该协议,倒也没有想象中那么复杂,除去注释语句和 console ...

  5. Node.js Stream - 基础篇

    背景 在构建较复杂的系统时,通常将其拆解为功能独立的若干部分.这些部分的接口遵循一定的规范,通过某种方式相连,以共同完成较复杂的任务.譬如,shell通过管道|连接各部分,其输入输出的规范是文本流. ...

  6. 【转】JS回调函数--简单易懂有实例

    JS回调函数--简单易懂有实例 初学js的时候,被回调函数搞得很晕,现在回过头来总结一下什么是回调函数. 我们先来看看回调的英文定义:A callback is a function that is ...

  7. 一、node.js搭建最简单的服务器

    node.js搭建最简单的服务器 代码演示: // 1. 加载http核心模块 var http = require('http')// 2. 使用http.createServer()方法创建一个W ...

  8. java文件边读边写_[Java教程]node.js 利用流实现读写同步,边读边写

    [Java教程]node.js 利用流实现读写同步,边读边写 0 2017-09-10 13:00:14 //10个数 10个字节,每次读4b,写1blet fs=require("fs&q ...

  9. Node.js搭建一个简单的服务器

    文章目录 Node.js的安装 了解Node.js模块系统 服务器的搭建 一.创建一个Web服务器 注意 程序代码 运行 二.静态资源托管 静态资源 注意 程序代码 运行 三.简单接口的实现(简单服务 ...

最新文章

  1. 12-1054. 求平均值
  2. 升腾联手VMware 发布首款本土化桌面虚拟化
  3. python怎样创建桌面快捷方式_python创建桌面快捷方式的代码
  4. 信用评分卡 (part 2of 7)
  5. 视频加密技术的实与破解
  6. Android NFC识别CPU卡和m1卡
  7. 使用gcov和lcov测试代码覆盖率
  8. 新书上市|豆瓣8.6,首部全面披露中国游戏发展史的奇书!
  9. 怎么搭建自己的内测分发平台?
  10. 查看服务器ip修改记录,如果查看服务器ip地址和修改ip
  11. UEFI是什么?与BIOS的区别在哪?
  12. ROS2利用cartographer算法进行激光建图
  13. 99. Recover Binary Search Tree(恢复二叉搜索树)
  14. Jenkins流水线打包微服务构建docker镜像运行
  15. CVPR 2022 | 惊呆了!只用一张图+相机走位,AI就能脑补周围环境!
  16. CCTrans: Simplifying and Improving Crowd Counting with Transformer
  17. 一、RISC-V SoC内核——取指 代码讲解
  18. Java300集速学堂第四章作业答案
  19. SAP FICO全解析之-定义国家代码
  20. winzip vs winrar

热门文章

  1. mysql5.5乱码问题_如何解决MySQL5.5的中文乱码问题
  2. python dataframe列数值相加,python合并dataframe中的行并将值相加
  3. 格灵深瞳开盘破发,市值73亿,创始人曾是谷歌眼镜创始成员
  4. Adobe奇葩续费机制被网友狂喷:一不留神就扣2500,按月付费还随时取订?长点心吧...
  5. 橡鹭科技获源码资本亿元融资,美团原核心高管加盟,发力服务机器人
  6. 300多工程师死磕2年,vivo终于发布自研芯片V1,降低功耗50%全片上储存
  7. 微软旷视人脸识别100%失灵!北京十一学校校友新研究「隐身衣」,帮你保护照片隐私数据...
  8. 一场实验室意外爆炸事故,解决了58年量子难题,让科学家意外发现“核电共振”...
  9. java:方法覆盖与方法重载
  10. 对网页是否为当前展示标签页、是否最小化、以及是否后台运行进行监听