What is RxJS?

RxJS是ReactiveX编程理念的JavaScript版本。ReactiveX是一种针对异步数据流的编程。简单来说,它将一切数据,包括HTTP请求,DOM事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能

下面废话不说, 直接切入正题.

准备项目

我使用typescript来介绍rxjs. 因为我主要是在angular项目里面用ts.

全局安装typescript:

npm install -g typescript

全局安装ts-node:

npm install -g ts-node

建立一个文件夹learn-rxjs, 进入并执行:

npm init

安装rxjs:

npm install rxjs --save

RxJS的主要成员

  • Observable: 一系列值的生产者
  • Observer: 它是observable值的消费者
  • Subscriber: 连接observer和observable
  • Operator: 可以在数据流的途中对值进行转换的操作符
  • Subject: 既包括Observable也包括Observer

Observable, Observer, Subscriber的角色关系:

工厂生产杂志, 邮递员去送杂志, 就相当于是Observable, 邮递员给你带来了啥? 带来了杂志, 然后(next)杂志, next杂志.....

把杂志带给了谁? 看看这对夫妇, 可能是丈夫来付账单订杂志, 他就是Subscriber. 而这本女性杂志肯定不是丈夫来看(如果他是正经丈夫的话), 而妻子没有直接去订阅杂志, 但是她看这本杂志有用(知道怎么去用它).

所以可以这样理解, 丈夫(Subscriber)把Observable和Observer联系到了一起, 就是Subscriber为Observable提供了一个Observer(丈夫订杂志, 告诉快递员把货给他媳妇就行).

Observable可以在Observer上调用三种方法(快递员跟他妻子可能会有三种情况...好像这么说不太恰当), 当Observable把数据(杂志)传递过来的时候, 这三种情况是:

  • next(), 这期杂志送完了, 等待下一期吧
  • error(), 送杂志的时候出现问题了, 没送到.
  • complete(), 订的杂志都处理完了, 以后不送了.

下面这个图讲的就是从Observable订阅消息, 并且在Observer里面处理它们:

Observable允许:

  • 订阅/取消订阅它的数据流
  • 发送下一个值给Observer
  • 告诉Observer发生了错误以及错误的信息
  • 告诉Observer整个流结束了.

Observer可以提供:

  • 一个可以处理流(stream)上的next的值的function
  • 处理错误的function
  • 处理流结束的function

创建Observable

  • Observable.from(), 把数组或iterable对象转换成Observable
  • Observable.create(), 返回一个可以在Observer上调用方法的Observable.
  • Observable.fromEvent(), 把event转换成Observable.
  • Observable.fromPromise(), 把Promise转换成Observable.
  • Observable.range(), 在指定范围内返回一串数.

Observable.from()

observable_from.ts:

import { Observable } from "rxjs/Observable"; // 这里没有使用Rx对象而是直接使用其下面的Observable对象, 因为Rx里面很多的功能都用不上.
import 'rxjs/add/observable/from'; // 这里我需要使用from 操纵符(operator)let persons = [{ name: 'Dave', age: 34, salary: 2000 },{ name: 'Nick', age: 37, salary: 32000 },{ name: 'Howie', age: 40, salary: 26000 },{ name: 'Brian', age: 40, salary: 30000 },{ name: 'Kevin', age: 47, salary: 24000 },
];let index = 1;
Observable.from(persons).subscribe(person => {console.log(index++, person);},err => console.log(err),() => console.log("Streaming is over."));

subscribe里面有3个function, 这3个function就是Observer.

第一个function是指当前这个person到来的时候需要做什么;

第二个是错误发生的时候做什么;

第三个function就是流都走完的时候做什么.

注意, 是当执行到.subscribe()的时候, Observable才开始推送数据.

运行这个例子需要执行下面的命令:

ts-node observable_from.ts

Observable.create()

Observable.create是Observable构造函数的一个别名而已. 它只有一个参数就是subscribe function.

observable_creates.ts:

import { Observable } from "rxjs/Observable";function getData() {let persons = [{ name: 'Dave', age: 34, salary: 2000 },{ name: 'Nick', age: 37, salary: 32000 },{ name: 'Howie', age: 40, salary: 26000 },{ name: 'Brian', age: 40, salary: 30000 },{ name: 'Kevin', age: 47, salary: 24000 },];return Observable.create(observer => { // 这部分就是subscribe functionpersons.forEach(p => observer.next(p));observer.complete();});
}getData().subscribe(person => console.log(person.name),err => console.error(err),() => console.log("Streaming is over."));

create里面的部分是subscribe function. 这部分可以理解为, 每当有人订阅这个Observable的时候, Observable会为他提供一个Observer.

在这里面, observer使用next方法对person进行推送. 当循环结束的时候, 使用complete()方法通知Observable流结束了.

尽管getDate里面create了Observable, 但是整个数据流动并不是在这时就开始的. 在这个地方, 这只不过是个声明而已.

只有当有人去订阅这个Observable的时候, 整个数据流才会流动.

运行该文件:

RxJS Operator(操作符)

Operator是一个function, 它有一个输入, 还有一个输出. 这个function输入是Observable输出也是Observable.

在function里面, 可以做一些转换的动作

下面是几个例子:

observablePersons.filter(p => p.age > 40);

这个filter function和数组的filter类似, 它接受另一个function(也可以叫predicate)作为参数, 这个function提供了某种标准, 通过这个标准可以判定是否当前的元素可以被送到订阅者那里.

p => p.age > 40

这个function, 是pure function, 在functional programming(函数式编程)里面, pure function是这样定义的: 如果参数是一样的, 无论外界环境怎么变化, 它的结果肯定是一样的.

pure function不与外界打交道, 不保存到数据库, 不会存储文件, 不依赖于时间....

而这个filter function呢, 在函数式编程里面是一个high order function.

什么是High order function?

如果一个function的参数可以是另一个function, 或者它可以返回另一个function, 那么它就是High Order function.

Marble 图

首先记住这个网址: http://rxmarbles.com/

有时候您可以通过文档查看operator的功能, 有时候文档不是很好理解, 这时你可以参考一下marble 图.

例如 map:

可以看到map接受一个function作为参数, 通过该function可以把每个元素按照function的逻辑进行转换.

例如 filter:

filter就是按条件过滤, 只让合格的元素通过.

例 debounceTime (恢复时间):

如果该元素后10毫秒内, 没有出现其它元素, 那么该元素就可以通过.

例 reduce:

这个也和数组的reduce是一个意思.

例子

import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/from';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/reduce';let persons = [{ name: 'Dave', age: 34, salary: 2000 },{ name: 'Nick', age: 37, salary: 32000 },{ name: 'Howie', age: 40, salary: 26000 },{ name: 'Brian', age: 40, salary: 30000 },{ name: 'Kevin', age: 47, salary: 24000 },
];Observable.from(persons).map(p => p.salary).reduce((total, current) => total+ current, 0).subscribe(totalSalary => console.log(`Total salary is ${totalSalary}`),err => console.log(err));

这个例子非常的简单, 典型的map-reduce, 就不讲了.

结果如下:

用现实世界中炼钢生产流程的例子来解释使用Operator来进行Reactive数据流处理的过程:

原料(矿石)整个过程中会经过很多个工作站, 这里每个工作站都可以看作是RxJS的operator, 原料经过这些operator之后, 成品就被生产了出来.

每个工作站(operator)都是可以被组合使用的, 所以可以再加几个工作站也行.

错误处理

Observable是会发生错误的, 如果错误被发送到了Observer的话, 整个流就结束了.

但是做Reactive编程的话, 有一个原则: Reactive的程序应该很有弹性/韧性.

也就是说, 即使错误发生了, 程序也应该继续运行.

但是如果error function在Observer被调用了的话, 那就太晚了, 这样流就停止了.

那么如何在error到达Observer之前对其进行拦截, 以便流可以继续走下去或者说这个流停止了,然后另外一个流替它继续走下去?

错误处理的Operators:

  • error() 被Observable在Observer上调用
  • catch() 在subscriber里并且在oserver得到它(错误)之前拦截错误,
  • retry(n) 立即重试最多n次
  • retryWhen(fn) 按照参数function的预定逻辑进行重试

使用catch()进行错误处理:

observable_catch.ts:

import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/from';
import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/map';function getFromGoogle(): Observable<any> {return Observable.create(function subscribe(observer) {observer.next('https://google.com');observer.error({message: 'Google can\'t be reached.',status: 404,});observer.complete();});
}function getFromBing(): Observable<any> {return Observable.create(function subscribe(observer) {observer.next('https://global.bing.com');observer.complete();});
}function getFromBaidu(): Observable<any> {return Observable.create(function subscribe(observer) {observer.next('https://www.baidu.com');observer.complete();});
}getFromGoogle().catch(err => {console.error(`Error: ${err.status}: ${err.message}`);if(err.status === 404) {return getFromBaidu();} else {return getFromBing();}}).map(x => `The site is : ${x}`).subscribe(x => console.log('Subscriber got: ' + x),err => console.error(err),() => console.log('The stream is over.'));

在subscribe之前调用catch, catch里可以进行流的替换动作.

运行结果如下:

相当于:

Hot 和 Cold Observable

Cold: Observable可以为每个Subscriber创建新的数据生产者

Hot: 每个Subscriber从订阅的时候开始在同一个数据生产者那里共享其余的数据.

从原理来说是这样的: Cold内部会创建一个新的数据生产者, 而Hot则会一直使用外部的数据生产者.

举个例子:

Cold: 就相当于我在腾讯视频买体育视频会员, 可以从头看里面的足球比赛.

Hot: 就相当于看足球比赛的现场直播, 如果来晚了, 那么前面就看不到了.

Share Operator

share() 操作符允许多个订阅者共享同一个Observable. 也就是把Cold变成Hot.

例子 observable_share.ts:

import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/share';const numbers = Observable.interval(1000).take(5).share();function subscribeToNumbers(name) {numbers.subscribe(x => console.log(`${name}: ${x}`));
}subscribeToNumbers('Dave');const anotherSubscription = () => subscribeToNumbers('Nick');setTimeout(anotherSubscription, 2500);

这里interval是每隔1秒产生一个数据, take(5)表示取5个数, 也就是1,2,3,4,5.

然后share()就把这个observable从cold变成了hot的.

后边Dave进行了订阅.

2.5秒以后, Nick进行了订阅.

最后结果是:

转自https://www.cnblogs.com/cgzl/p/8641738.html

RxJS速成 (上)相关推荐

  1. 《深入浅出RxJS》读书笔记

    rxjs的引入 // 如果以这种方式导入rxjs,那么整个库都会导入,我们一般不可能在项目中运用到rxjs的所有功能 const Rx = require('rxjs'); 解决这个问题,可以使用深链 ...

  2. 关于Mongodb的全面总结,学习mongodb的人,可以从这里开始!

    转载地址:http://blog.csdn.net/he90227/article/details/45674513 原文地址:http://blog.csdn.NET/jakenson/articl ...

  3. 芬兰开放“线上AI速成班”课程,全球网民均可免费观看

    出品 | AI科技大本营(ID:rgznai100) 去年,芬兰推出了一个免费的"人工智能线上速成班"项目,目的是向该国民众教授与新技术有关的知识.现在,作为送给全世界的圣诞节礼物 ...

  4. Python零基础速成班-第9讲-Python面向对象编程(上),对象和类、初始化、继承、重写、多态、类方法、组合

    Python零基础速成班-第9讲-Python面向对象编程(上),对象和类.初始化.继承.重写.多态.类方法.组合 学习目标 修饰器 面向对象编程:对象和类.初始化.继承.重写.多态.类方法.组合 课 ...

  5. 大学期末不挂科速成课-史上最全

    每到期末肯定有不少小伙伴烦恼趴 今天他来了-史上最全大学期末不挂科速成课 以下链接是合集合集!!单科可以私聊小编喔!! https://download.csdn.net/download/qq_52 ...

  6. 数字IC后端实现40天速成篇(上)

    数字IC后端实现40天速成篇(上) 文章右侧广告为官方硬广告,与吾爱IC社区无关,用户勿点.点击进去后出现任何损失与社区无关. 为了更好服务好大家,现把小编目前这边的主营业务向大家汇报下.其中对于设计 ...

  7. 号称“高薪、转型、改变命运” ,纽约时报曝光煤矿工地上的编程“速成班”

    大数据文摘授权编译自纽约时报 来源:www.nytimes.com 编译:陆震.王念.蒋宝尚 近年一直有这样一种论调:如果你学历不高,想拿高薪,那就去参加个培训班学编程. 尤其是最近爆火的大数据和人工 ...

  8. 号称“高薪、转型、改变命运” ,纽约时报曝光煤矿工地上的编程“速成班”...

    大数据文摘授权编译自纽约时报 来源:www.nytimes.com 编译:陆震.王念.蒋宝尚 近年一直有这样一种论调:如果你学历不高,想拿高薪,那就去参加个培训班学编程. 尤其是最近爆火的大数据和人工 ...

  9. Python零基础速成班-第2讲-Python基础(上),运算、变量、数据类型、输入输出

    Python零基础速成班-第2讲-Python基础(上),运算.变量.数据类型.输入输出 学习目标 使用print输出结果 运算及运算符 变量 数据类型(4种最常用的) 输入输出 课后作业(4必做+1 ...

最新文章

  1. (转)ZeroMQ的模式-Requset-Reply
  2. Contest2071 - 湖南多校对抗赛(2015.03.28)
  3. Deepin15.7 Android8.1 编译 以及问题解决
  4. [云炬创业基础笔记]第七张创业资源测试10
  5. 揭秘《死者之书》之游戏角色资源创作
  6. Upgrade Hole puncher Mathematical Modeling
  7. 现代软件工程 第二章 【个人技术】 练习与讨论
  8. latex 插图解释_仅使用一些插图和视频即可解释Big O符号
  9. 字节跳动的一面内容记录
  10. VMware vSphere 6.7之vSAN配置全程图解
  11. Hadoop系列之OutputCollector
  12. mysql5.7 备份恢复_RDS for MySQL 5.7 备份恢复为本地实例-阿里云开发者社区
  13. Python开发最常犯错误总结10种
  14. 17.基于scrapy-redis两种形式的分布式爬虫
  15. 阿铭Linux_传统IDC 部署网站学习笔记20190118
  16. 头歌-自己动手画CPU(第二关)-原码一位乘法器-Logisim
  17. php在广告策划方面需要什么,网上推广网店运营 系统是基于PHP+MySQL的B2B(电子商务)行业门户解决方案”是什么意思呢,谢谢...
  18. 在谷歌浏览器上面安装AxueRP插件。
  19. 微信公众号监听手机返回键事件jssdk—wx.closeWindow
  20. 一键备份Android系统(软件)

热门文章

  1. 【论文阅读】Learning with Hypergraphs: Clustering, Classification, and Embedding
  2. 字典转换成模型属性代码笔记
  3. 成功改造企业文化的八项原则
  4. 教你用Python感知女朋友的情绪变化?
  5. 揭秘三大运营商在5G专网的布局!
  6. 最小公倍数,甲、乙、丙三人是朋友, 他们每隔不同天数到图书馆去一次。 甲3天去一次,乙4天去一次,丙5天去一次。有一天,他们三人恰好在图书馆相会,问至少再过多少天他们三人又在图书馆相会?
  7. 下班后有什么可以做的副业?分享几个靠谱线上和线下副业
  8. 蓝桥杯python组——平行四边形面积
  9. 《学习如何学习》Week1 3.4 名人采访3: 如何写作?
  10. shell特殊符号 命令