文章目录

  • RxJS概述
  • Redux VS RxJS
  • RxJS核心概念解析
  • 热观察和冷观察
  • merge/combine合流
  • RXJS6 的变化

RxJS概述

RxJS 全称 Reactive Extensions for JavaScript

RxJS 结合了函数式编程、观察者模式(例如 DOM EventListener)、迭代器模式(例如 ES6 Iterater)

RxJS 官方是这样说的: Think of RxJS as Lodash for events. 把 RxJS 想像成针对 events 的 lodash

RxJS 本质是个工具库,处理的是事件,这里的 events,可以称之为流

那么流是指什么呢?举个例子,代码中每 1s 输出一个数字,用户每一次对元素的点击,就像是在时间这个维度上,产生了一个数据集。这个数据集不像数组那样,它不是一开始都存在的,而是随着时间的流逝,数据一个一个被输出。这种异步行为产生的数据,就可以被称之为一个流。在 RxJS 中,称之为 ovservalbe(抛开英文,本质其实就是一个数据的集合,只是这些数据不一定是一开始就设定好的,而是随着时间而不断产生的)

而 RxJS,就是为了处理这种流而产生的工具,比如流与流的合并、流的截断、延迟、消抖等

Redux VS RxJS

RxJS核心概念解析

Observable

它的本质其实就是一个随时间不断产生数据的一个集合,称之为流更容易理解。而其对象存在一个 subscribe 方法,调用该方法后,才会启动这个流(也就是数据才会开始产生),这里需要注意的是多次启动的每一个流都是独立的,互不干扰。

Observer

从行为上来看,无非就是定义了如何处理上述流产生的数据,称之为流的处理方法。

Subscribtion

它的本质就是暂存了一个启动后的流,之前提到,每一个启动后的流都是相互独立的,而这个启动后的流,就存储在 subscription 中,提供了 unsubscribe,来停止这个流。

简单理解了这三个名词 observable, observer, subscription 后,从数据的角度来思考:

observable 定义了要生成一个什么样的数据,其 subscribe 方法,接收一个 observer(定义了接收到数据如何处理),并开始产生数据。该方法的返回值 subscription, 存储了这个已经开启的流,同时具有 unscbscribe 方法,可以将这个流停止。

整理成这个公式:

Subscription = Observable.subscribe (observer)

observable: 随着时间产生的数据集合,可以理解为流,其 subscribe 方法可以启动该流

observer: 决定如何处理数据

subscription: 存储已经启动过的流,其 unsubscribe 方法可以停止该流

Subject

它是一个代理对象,既是一个 Observable 又是一个 Observer,它可以同时接受 Observable 发射出的数据,也可以向订阅了它的 observer 发射数据,同时,Subject 会对内部的 observers 清单进行多播 (multicast)

Subjects 是将任意 Observable 执行共享给多个观察者的唯一方式

Subject 有三个变体:

  • BehaviorSubject是一种在有新的订阅时会额外发出最近一次改变的值的 Subject,需要传入一个参数即初始值(如果没改变则发送初始值)
  • ReplaySubject会保存所有值,然后回放给新的订阅者,需要传入一个参数即初始值,用于控制重放值的数量(默认重放所有)
  • AsyncSubject只有当 Observable 执行完成时 (执行 complete()),才会将执行的最后一个值发送给观察者。如果因异常而终止,AsyncSubject 将不会释放任何数据,但是会向 Observer 传递一个异常通知。

Scheduler

用来控制并发并且是中央集权的调度员,允许在发生计算时进行协调,例如 setTimeoutrequestAnimationFrame 或其他。

调度器是一种数据结构。 它知道如何根据优先级或其他标准来存储任务和将任务进行排序。

调度器是执行上下文。 它表示在何时何地执行任务 (举例来说,立即的,或另一种回调函数机制 (比如 setTimeoutprocess.nextTick),或动画帧)。

调度器有一个 (虚拟的) 时钟。 调度器功能通过它的 getter 方法 now() 提供了 “时间” 的概念。在具体调度器上安排的任务将严格遵循该时钟所表示的时间。

Scheduler调度器能做四种调度:

  • queue:将每个下一个任务放在队列中,而不是立即执行。queue 延迟使用调度程序时,其行为与 async 调度程序相同。当没有延迟使用时,它将同步安排给定的任务 - 在安排好任务后立即执行。但是,当递归调用时(即在已调度的任务内部),将使用队列调度程序调度另一个任务,而不是立即执行,该任务将被放入队列并等待当前任务完成。
  • asap:内部基于 Promise 实现(Node 端采用 process.nextTick),会使用可用的最快的异步传输机制,如果不支持 Promiseprocess.nextTick 或者 Web WorkerMessageChannel 也可能会调用 setTimeout 方式进行调度。
  • async:与 asap 方式很像,只不过内部采用 setInterval 进行调度,大多用于基于时间的操作符。
  • animationFrame:内部基于 requestAnimationFrame 来实现调度,所以执行的时机将与 window.requestAnimationFrame 保持一致,适用于需要频繁渲染或操作动画的场景。

Operators

RxJS操作符非常多,在此只介绍几个常用的:

  • create

    createonSubscription 函数转化为一个实际的 Observable 。每当有人订阅该 Observable 的时候,onSubscription 函数会接收 Observer 实例作为唯一参数行。onSubscription 应该调用观察者对象的 next, errorcomplete 方法。

    const source = Rx.Observable.create(((observer: any) => {observer.next(1);observer.next(2);setTimeout(() => {observer.next(3);}, 1000)
    }))// 方式一
    source.subscribe({next(val) {console.log('A:' + val);}}
    );
    // 方式二
    source.subscribe((val) => console.log('B:' + val));// A:1
    // A:2
    // B:1
    // B:2
    //- 1s后:
    // A:3
    // B:3
    
  • from

    从一个数组、类数组对象、Promise、迭代器对象或者类 Observable 对象创建一个 Observable。该方法就有点像 js 中的 Array.from 方法(可以从一个类数组或者可迭代对象创建一个新的数组),只不过在 RxJS 中是转成一个 Observable 给使用者使用。

    const source = Rx.Observable.from([10, 20, 30]);
    source.subscribe(v => console.log(v));// 10
    // 20
    // 30
    
  • of

    from 的能力差不多,只不过在使用的时候是传入一个一个参数来调用的,有点类似于 js 中的 concat 方法。同样也会返回一个 Observable,它会依次将你传入的参数合并并将数据以同步的方式发出。

    const source = Rx.Observable.of(1, 2, 3);
    source.subscribe(v => console.log(v));// 1
    // 2
    // 3
    
  • debounceTime

    功能与 debounce 防抖函数差不多,只有在特定的一段时间经过后并且没有发出另一个源值,才从源 Observable 中发出一个值。

    假设一个数据源每隔一秒发送一个数,而我们使用了 debounceTime 操作符,并设置了延时时间,那么在数据源发送一个新数据之后,如果在延时时间内数据源又发送了一个新数据,这个新的数据就会被先缓存住不会发送,等待发送完数据之后并等待延时时间结束才会发送给订阅者。不仅如此,在延时时间未到的时候并且已有一个值在缓冲区,这个时候又收到一个新值,那么缓冲区就会把老的数据抛弃放入新的,然后重新等待延时时间到达然后将其发送。

    const source = Rx.Observable.interval(1000).take(3);
    const result = source.debounceTime(2000);
    result.subscribe(x => console.log(x));// 程序启动之后的前三秒没有数据打印,等到五秒到了之后,打印出一个2,接着就没有再打印了
    // 数据源会每秒依次发送三个数 0、1、2,由于我们设定了延时时间为 2 秒,那么也就是说,我们在数据发送完成之前都是不可能看到数据的,因为发送源的发送频率为 1 秒,延时时间却有两秒,也就是除非发送完,否则不可能满足发送源等待两秒再发送新数据,每次发完新数据之后要等两秒之后才会有打印,所以不论我们该数据源发送多少个数,最终订阅者收到的只有最后一个数。
    
  • take

    只发出源 Observable 最初发出的 N 个值 (N = count)

    这个操作符在前面出现了很多次了,还挺常见的,用于控制只获取特定数目的值,跟 interval 这种会持续发送数据的配合起来就能自主控制要多少个值了。

  • skip

    返回一个 Observable, 该 Observable 跳过源 Observable 发出的前 N 个值 (N = count)

    假设这个数据源发送 6 个值,可以使用 skip 操作符来跳过前多少个。

    const source = Rx.Observable.from([1, 2, 3, 2, 4, 3]);
    const result = source.skip(2);
    result.subscribe(x => console.log(x));// 打印结果为:3、2、4、3,跳过了前面两个数。
    
  • concat

    concatconcatAll效果是一样的,区别在于 concat 要传递参数,参数必须是 Observable

    concat 将多个 observable 串接起来,前一个完成好了再执行下一个。

    const source1 = interval(1000).pipe(take(3));
    const source2 = of(3);
    const source3 = of (4,5);
    const example = source1.pipe(concat(source2,source3))
    example.subscribe({next: value => {console.log(value);},error: err => {console.log("Error: " + err);},complete: () => {console.log("complete");}
    });// 0
    // 1
    // 2
    // 3
    // 4
    // 5
    // complete
    

热观察和冷观察

在 RxJS 中,有热观察和冷观察的概念。其中的区别:

  • Hot Observable:可以理解为现场直播,我们进场的时候只能看到即时的内容
  • Cold Observable:可以理解为点播(电影),我们打开的时候会从头播放

RxJS 中 Observable 默认为冷观察,而通过 publish()connect() 可以将冷的 Observable 转变成热的

let publisher$ = Rx.Observable.interval(1000).take(5).publish();publisher$.subscribe(data => console.log('subscriber from first minute',data),err => console.log(err),() => console.log('completed')
)setTimeout(() => {publisher$.subscribe(data => console.log('subscriber from 2nd minute', data),err => console.log(err),() => console.log('completed'))
}, 3000)publisher$.connect();
// 第一个订阅者输出的是0,1,2,3,4,而第二个输出的是3,4,此处为热观察

热观察和冷观察根据具体的场景可能会有不同的需要,而 Observable 提供的缓存能力也能解决不少业务场景。

例如,如果我们想要在拉群后,自动同步之前的聊天记录,通过冷观察就可以做到。

merge/combine合流

一般来说,合流有两种方式:

// merge
--1----2-----3--------4---
----a-----b----c---d------merge
--1-a--2--b--3-c---d--4---// combine
--1----2-----3--------4---
----a-----b-----c--d------combine
--1a-2a-2b-3b-3c-3d-4d--

merge 的合流方式可以用在聊天室、多人协作、公众号订阅就可以通过这样的方式合流,最终按照顺序地展示出对应的操作记录。

在 Excel 中,通过函数计算了 A1 和 B2 两个格子的相加。这种情况下可以使用 combine 合流:

const streamA1 = Rx.Observable.fromEvent(inputA1, "input"); // 监听 A1 单元格的 input 事件
const streamB2 = Rx.Observable.fromEvent(inputB2, "input"); // 监听 B2 单元格的 input 事件const subscribe = combineLatest(streamA1, streamB2).subscribe((valueA1, valueB2) => {// 从 streamA1 和 streamB2 中获取最新发出的值return valueA1 + valueB2;
});
// 获取函数计算结果
observable.subscribe((x) => console.log(x));

在一个较大型的前端应用中,通常会拆分成渲染层、数据层、网络层、其他服务等多个功能模块。

虽然服务按照功能结构进行拆分了,但依然会存在服务间调用导致依赖关系复杂、事件触发和监听满天飞等情况。这种情况下,只能通过全局搜索关键字来找到上下游数据流、信息流,通过节点和关键字搜索才能大概理清楚某个数据来源哪里。

如果使用了响应式编程,我们可以通过各种合流的方式、订阅分流的方式,来将应用中的数据流动串在一起。这样,我们可以很清晰地当前节点上的数据来自于哪里,是用户的操作还是来自网络请求。

RXJS6 的变化

RXJS6 改变了包的结构,主要变化在 import 方式和 operator 以及使用 pipe ()

Imports 方式改变

从 rxjs 中类似像导入 observable subject 等的不再进一步导入,而是止于 rxjs, rxjs6 在包的结构上进行了改变

operator 的改变

总而言之: 类似于创建之类的用的 API 都是从 rxjs 引入的,类似于 map 之类的操作都是从 rxjs/operators 引入的

pipeable observable

被重新命名的 API

在这里只是简单介绍 RxJS ,帮助新手快速了解 RxJS ,更详细的 RxJS 学习可以查看官方文档

RxJS——异步数据流的响应式编程库(适合新手入门)相关推荐

  1. Kotlin Flow响应式编程,基础知识入门

    Kotlin在推出多年之后已经变得非常普及了.相信现在至少有80%的Android项目已经在使用Kotlin开发,或者有部分功能使用Kotlin开发. 关于Kotlin方面的知识,我其实分享的文章并不 ...

  2. springboot异步注解_Spring Boot 2 :Spring Boot 中的响应式编程和 WebFlux 入门

    [小宅按]Spring 5.0 中发布了重量级组件 Webflux,拉起了响应式编程的规模使用序幕. WebFlux 使用的场景是异步非阻塞的,使用 Webflux 作为系统解决方案,在大多数场景下可 ...

  3. RxJava响应式编程学习笔记

    1.概述 RxJava是一个著名的开源库,是ReactiveX(Reactive Extensions)的一种java实现.ReactiveX是一种响应式扩展框架,有很多实现,如RxAndroid,R ...

  4. (转)Spring Boot 2 (十):Spring Boot 中的响应式编程和 WebFlux 入门

    http://www.ityouknow.com/springboot/2019/02/12/spring-boot-webflux.html Spring 5.0 中发布了重量级组件 Webflux ...

  5. response获取响应内容_Project Reactor 深度解析 - 1. 响应式编程介绍,实现以及现有问题

    现在, Java 的各种基于 Reactor 模型的响应式编程库或者框架越来越多了,像是 RxJava,Project Reactor,Vert.x 等等等等.在 Java 9, Java 也引入了自 ...

  6. ⒈响应式编程 Project Reactor 概述

    文章目录 What is Reactive Programming? Project Reactor & Reactive Programming 总览 Features Best pract ...

  7. Kotlin Flow响应式编程,操作符函数进阶

    本文同步发表于我的微信公众号,扫一扫文章底部的二维码或在微信搜索 郭霖 即可关注,每个工作日都有文章更新. 大家好,今天原创. 在上一篇原创文章当中,我跟大家说了会开启一个新的系列,讲一讲Kotlin ...

  8. 通过RxJS理解响应式编程

    什么时候响应式编程 一句话概括就是用异步数据流来编程 从某种程度上讲,一个点击事件就是一个异步事件流,我们可以注册监听然后再做一些其他的事情.正是这样我们就应该有一个工具包来创建,组合,过滤这些流.一 ...

  9. 【响应式编程的思维艺术】 (5)Angular中Rxjs的应用示例

    [摘要] Rxjs在angular中的基本应用 本文是[Rxjs 响应式编程-第四章 构建完整的Web应用程序]这篇文章的学习笔记. 示例代码托管在:http://www.github.com/das ...

最新文章

  1. 一位美女博士的人脸识别历程
  2. 一直出现 Enter passphrase for key '/root/.ssh/gitkey12.pub'
  3. 2020年11月3日 星期二 工作日志 github登陆不了了,晕
  4. 【英语学习】 - 君若不离不弃,吾当生死相依
  5. Rust之控制流,条件语句,模式匹配
  6. python 替换字符串内容_python如何替换字符串的内容
  7. Dev-C++ MinGW wxWindows2.6.2 开发环境的配置
  8. 锐捷交换机配置保存到计算机,锐捷交换机常用配置命令汇总
  9. 学习Struts2框架笔记-第2天
  10. 微信公众号迁移公证书办理流程
  11. 【GNSS】GNSS数据下载工具
  12. EXCEL单元格内怎么换行?Alt+Enter
  13. 免费Bootstrap后台管理模板
  14. springboot项目导出excel 合并单元格表格
  15. 【SVN】——svn协议和http协议
  16. linux alsa驱动讲解
  17. qq电脑登录二维码加载失败怎么回事(解决办法)
  18. 影视感悟专题---2、《大染坊》
  19. 世界第一虚拟乐队举办了一场 AR 音乐会 #Gorillaz
  20. 项目管理方法:时间进度管理的几个要点

热门文章

  1. 复盘618:数字狂欢背后透露了哪些隐藏信号?
  2. 数学规划模型(二):线性规划模型
  3. DC-DC电源市场现状及未来发展趋势分析
  4. 1000道最新高频Java面试题,覆盖25个技术栈(多线程、JVM、高并发、spring、微服务、kafka,redis、分布式)从底层原理到架构!
  5. Server2012 下 部署ADFS IFD
  6. 为实施了IFD的Dynamics 365更换自签名的SSL证书以符合Chrome的要求
  7. DEM数据获取、数据镶嵌、掩膜提取
  8. 无法加载SQLite.Interop.dll:找不到指定模块
  9. 服务器性能监控之New Relic 入门教程
  10. 对话混沌创新商学院6期校友:“朋克养生”背后的产业密码