彻底理解RxJS里面的Observable 、Observer 、Subject
最近闲来无事,常常重读Angular官方文档,颇能发现些有趣的地方。让我想起海澜之家的广告词:每次都有新体验。
Observable和Observer
关于RXJS的基础概念,observable
和observer
,我们好多次搞得头晕眼花。
其实,看下面这简简单单的一行代码就懂了它们的关系:
observable.subscribe(observer);
observable是数据源头,是生产者,是待订阅者,通过subscribe方法可以被订阅,而observer 是观察者,数据使用者,数据消费者。
这句代码返回的是一个订阅对象,代表着一个订阅的发生,一个订阅的过程,一个Subscription对象的实例化。
observer其实是一个有三个回调函数的对象,每个回调函数对应三种Observable 发送的通知类型(next
,error
,complete
)。回调函数不必每次都提供三个。如果我们只提供了一个回调函数作为参数,subscribe会将我们提供的函数参数作为next的回调处理函数。
observable.subscribe()
是一个Subscription对象。Subscription就是表示Observable 的执行,可以被清理。这个对象最常用的方法就是unsubscribe方法。同时,它还有 add方法可以使我们取消多个订阅。
我有一个比喻可以很好的理解这种订阅关系:现在有一家牛奶生产商,它们家的牛奶质优价廉,鲜美醉人。它在电视上发布广告,所有人都可以打它们的电话订奶。这个时候,牛奶商就是Observable,市民就是Observer。如果市民打电话(subscribe
)给牛奶商,它们就会在牛奶商送奶(next
)成功的时候收到牛奶,至于怎么喝就是自己的事情了,而市民是不关心牛奶是怎么生产和如何送过来的(比如数据库,HTTP过程的TCP/IP协议,握手过程等)。送奶过程可能会遇到意外导致送奶失败(error
),而成功之后,牛奶商会把这次送奶标记为已送达(complete
)。
这些都是基础中的基础,即使有时候我们被它搞得头晕眼花,但是我们也大概知道它们的意思。而下面这个Subject,周围很多人感觉都不太理解。
我们常常搞不清楚RXJS里面Subject的概念以及何时使用它。其实,在官方文档的最开始,简单的HERO教程里,早就有这么一段:
Subject其实是一个很有用,很优雅的东西。在RXJS官方文档,我们可以看到关于Subject的介绍:
后面还有这么一句话:Subjects 是将任意 Observable 执行共享给多个观察者的唯一方式。
这段官方文档的翻译显然有些生涩。其实,简单的说,Subject既是Observable,也是观察者(可以多个)。
我们知道,对于Observable来说,每个观察者有自己的订阅者的独立执行。什么意思呢?比如说,有一个人订奶,就要有一个送奶的过程发生;有一个Service里面的Get请求被订阅,就会发生一个Http请求。
这种模式在我们的某些情况下不太适合,比如说,输入框input change的过程,可能是毫秒之间,如果采用这种方式,我们需要创建多少个Subscription对象?或者牛奶商双十一做促销活动,突然一秒钟就有一个订单产生,我们如果还采用一个一个送,需要多少个送奶执行过程?
聪明的你应该想到了,那么一次送多份奶呗。 这就是RXJS官方文档说的:Subject 像是 Observable,但是可以多播给多个观察者。Subject 还像是 EventEmitters,维护着多个监听器的注册表。
再来看Hero教程里面的应用:
private searchTerms = new Subject<string>();
search(term: string): void { this.searchTerms.next(term);
}
searchTerms
现在就是一个源源不断的能发出输入框值的流。即可以不断的发出值,又可以使用subscribe源源不断的获得值。而这一切,通过Subject变得非常简单。
Subject还有三个比较常用的子类:ReplaySubject,AsyncSubject,BehaviorSubject
。它们代表什么意思?之间有什么区别呢?
Subject:
代码例子:
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs';let subject1: Subject<number> = new Subject<number>();
subject1.next(100);
subject1.subscribe(res=>{console.log('SubjectA:'+res);})
subject1.subscribe(res=>{console.log('SubjectB:'+res);})
subject1.next(200);
subject1.next(300);
执行结果:
SubjectA:200SubjectB:200SubjectA:300SubjectB:300
可见,Subject只有在订阅之后,才能收到数据源发出的值。 subject1.next(100)的时候,还没有被订阅,因此不会打印结果。假如我们想在订阅者创建之后,无论什么时候都能拿到数据, 这应该怎么办呢? 下面的就派上用场了。
BehaviorSubject
代码例子:
let subject2: BehaviorSubject<number> = new BehaviorSubject<number>(0);
subject2.next(100);
subject2.subscribe(res=>{console.log('Behavior-SubjectA:'+res);
});
subject2.next(200);
subject2.subscribe(res=>{console.log('Behavior-SubjectB:'+res);
});
subject2.next(300);
执行结果:
Behavior-SubjectA:100Behavior-SubjectA:200Behavior-SubjectB:200Behavior-SubjectA:300Behavior-SubjectB:300
可见,BehaviorSubject会保存最新的发送数据,当被订阅时,会立即使用这个最新数据。 然后会继续接收新的值。Behavior-SubjectA发生订阅时候,当前值是100,所以会立即打印100,然后它收到了新发出的200;这个时候Behavior-SubjectB发生订阅了,它会立即打印subject2流的最新值200;然后A、B依次都接收到了300。
需要注意:BehaviorSubject必须设置默认值。因为它有一个最新值(当前值)的概念。那么我们如果想保存所有的数据,而不只是最新值,怎么办呢?
ReplaySubject
代码例子:
let subject3: ReplaySubject<number> = new ReplaySubject<number>();
subject3.next(100);
subject3.next(200);
subject3.subscribe(res=>{console.log('Replay-SubjectA:'+res);
});
subject3.next(300);
subject3.subscribe(res=>{console.log('Replay-SubjectB:'+res);
});
subject3.next(400);
打印结果:
Replay-SubjectA:100Replay-SubjectA:200Replay-SubjectA:300Replay-SubjectB:100Replay-SubjectB:200Replay-SubjectB:300Replay-SubjectA:400Replay-SubjectB:400
ReplaySubject会保存所有值,然后回放给新的订阅者。
这里Replay-SubjectA订阅的时候,数据流里面有100,200,之后接收300;这个时候新的订阅Replay-SubjectB发生了,数据流里面有100,200,300,所以它会依次打印出来这些已经保存的值;然后A和B依次收到了400。
我们可以理解为,这些数据流全都会被保存下来,当有新的订阅发生时,像放电影一样回放给订阅者。
AsyncSubject
AsyncSubject 只有当 Observable 执行完成时(执行complete()),它才会将执行的最后一个值发送给观察者。
也就是说,它只会保存流里的最后一条数据,而且只会在数据流complete时候才会发送。
let subject4: AsyncSubject<number> = new AsyncSubject<number>();
subject4.next(100);
subject4.subscribe(res => {console.log('Async-SubjectA:' + res);
});
subject4.next(200);
subject4.subscribe(res => {console.log('Async-SubjectB:' + res);
});
subject4.next(300);
subject4.subscribe(res => {console.log('Async-SubjectC:' + res);
});
subject4.complete();
subject4.next(400);
执行结果:
Async-SubjectA:300Async-SubjectB:300Async-SubjectC:300
可见, 数据流在complete之前,有100,200,300。最后一条是300。所有订阅者都只会收到最后一条300。而complete之后,自然不会再发送值了。
附加知识:
最后:其实Subject还有一个子类AnonymousSubject,只是这个子类很少有使用场景。本着好奇的研究心态,我们来看看它到底是什么东西呢?
有时候我们想当然认为使用create
可以实例化Subject,实际上,Subject.create() 会返回AnonymousSubject对象实例,而new Subject()会返回Subject对象实例。AnonymousSubject不会像一般的Subject一样订阅它自己。它会标记数据源,然后在订阅发生的时候,它会直接连接起数据源和观察者但是却不追踪我们创建的订阅的过程(即数据变化的过程)。
比如:
var timer$ = Rx.Observable.timer(1000, 2000);
var timerSubject = Rx.Subject.create(null, timer$);var subscription1 = timerSubject.subscribe(n => console.log(n));
var subscription2 = timerSubject.subscribe(n => console.log(n));setTimeout(() => timerSubject.unsubscribe(), 4000); //不生效
这里的subscription1实际上是直接订阅了timer$,所以它其实是不可unsubscribe的。
至于AnonymousSubject到底有什么应用场景,官方文档也没有介绍。
彻底理解RxJS里面的Observable 、Observer 、Subject相关推荐
- 通过一个最简单的例子,理解Angular rxjs里的Observable对象的pipe方法
源代码: import { of } from 'rxjs'; import { Injectable } from '@angular/core'; import { map } from 'rxj ...
- rxjs里的Observable对象和map配合的一个用法
源代码: import { of } from 'rxjs'; import { Injectable } from '@angular/core'; import { map } from 'rxj ...
- rxjs里的Observable对象subscribe方法的执行原理
看个例子: const myObservable = of(1, 2, 3);// 创建一个观察者对象-Observer(处理next.error.complete回调)const myObserve ...
- rxjs里的Observable对象如何消费
测试代码: import { of } from 'rxjs'; import { Injectable } from '@angular/core';@Injectable() export cla ...
- 你会用RxJS吗?【初识 RxJS中的Observable和Observer】
概念 RxJS是一个库,可以使用可观察队列来编写异步和基于事件的程序的库. RxJS 中管理和解决异步事件的几个关键点: Observable: 表示未来值或事件的可调用集合的概念. Observer ...
- [译] Subject 和 Observable + Observer 的混淆指北[ Android RxJava2 ] ( 这什么鬼系列 ) 第八话...
原文地址:Confusion between Subject and Observable + Observer [ Android RxJava2 ] ( What the hell is this ...
- Simple java里面的paint(Graphics g)的理解
以前写画图板时,总不理解什么时候要调用这个方法,老师说:你写了paint(g)方法后,在窗体改变时,就会调用,可是不太理解. 没写paint(g)的演示:画图板v1 1:绘制简单直线 2.执行最小化操 ...
- 理解Java集合框架里面的的transient关键字
2019独角兽企业重金招聘Python工程师标准>>> 在分析HashMap和ArrayList的源码时,我们会发现里面存储数据的数组都是用transient关键字修饰的,如下: H ...
- 如何理解 RxJS?RxJS的中文API和使用教程
如何理解 RxJS? 我先附上RxJS的中文教程地址方便大家去了解和使用 中文使用教程:http://rxjs-china.org/_book/ 官方中文文档:https://buctwbzs.git ...
- 通过 Marble Test 理解 RxJS
上篇文章介绍了一些 RxJS 的相关概念,本文通过学习 Marble Test 进一步的理解 RxJS. Marble Diagram 是理解 RxJS 的重要辅助工具,在 RxJS 的文档中有很多以 ...
最新文章
- Kali Linux攻防系统(三:在Kali Linux系统中配置安全测试浏览器及系统清理备份)
- 有了这个 IDEA的兄弟,你还用 Navicat 吗?全家桶不香吗?
- mysql for循环_基于Swoole扩展开发异步高性能的MySQL代理服务器
- Factory - 工厂模式
- Android 使用Toolbar+DrawerLayout快速实现仿“知乎APP”侧滑导航效果
- delphi 的 ORM 框架
- BZOJ1452 [JSOI2009]Count 树状数组
- linux deepin/ubuntu 搭nginx文件服务器配置
- java selenium用js点击_Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?...
- 北大清华的状元之争:一个典型的囚徒困境
- 线程池很难么?带你从头到尾捋一遍,不信你听不懂!
- 使用jQuery创建模态窗口登陆效果
- Linux 命令(107)—— systemctl 命令
- 50 家硅谷 IT 公司技术博客
- js如何在字符串里加变量
- 【R语言进行数据挖掘】决策树和随机森林
- c#Winform程序CPU占用高的原因和解决方法(转载)
- oracle报错imp报错00008,imp导入时遭遇IMP-00032,IMP-00008错误.
- 广播电台常用51首背景音乐~甘醇永久
- 河北专升本经验总结分享