源代码:

const a = of([1, 2, 3]);const b = map((data: number[]) => {for( let i = 0; i < data.length; i++){data[i] = data[i] + 1;}console.log('data: ' + data);return data;} );const c = a.pipe(b);c.subscribe((data) => console.log('Fairy:' + data));

注意,传入pipe的operations不是应用程序传入的fn,而是执行了map operator后,被MapOperation包裹过的fn:

this还是指向Observable对象。

因为我只传入了一个operation,所以不会调用array的reduce,而是直接返回数组里唯一的一个元素:

pipeFromArray(operations)返回operations数组里唯一的元素,即mapOperations,然后将this即当前调用pipe的Observable对象传入mapOperations函数:

此时source就是调用pipe函数的Observable对象:

调用Observable对象的lift方法,输入为新建的MapOperator:

这个MapOperator其实就是一个POJO - Plain Old JavaScript Object,就是回调project和this上下文的封装。

Observable对象的lift操作也很好理解:新建一个Observable对象,将其source指向原始Observable,然后operator设置为刚才新建的MapOperator:

注意,此时我们执行subscribe的Observable对象,实际上是pipe操作返回的新Observable:

其map operator里包含了map应该执行的回调函数:

source指向的是原始的Observable对象。

Observable.subscribe里第一个重要的操作,就是创建subscriber对象。既然是subscribe,就得有subscribe对象,这个对象就是应用程序传入的callback的封装。

至此我们已经认识了一些对象的封装了:

MapOperation和MapOperator是map操作符里传入的function(称为project)的封装,其中MapOperator包含project和thisArg,而MapOperation内部的逻辑有二:一是确保project的类型为function,二是创建MapOperator,然后调用Observable的lift生成新的Observable.

Subscriber是应用程序传入的callback的封装。

Subscriber的destination指向了SafeSubscriber,后者包含的app callback:

继续回到Observable的subscribe操作,下图的含义是,如果新的Observable对象包含mapOperator(在我们的例子里确实包含),则调用该MapOperator,同时传入的输入参数为原始的Observable对象。

这里看到MapOperator的call方法了:

把subscribe调用链式传递给source了,这也是为什么MapOperation里为什么要调用原始Observable的lift方法——建立Observable与Observable之间的链接关系。

MapSubscriber和普通的Subscriber有何区别?上图做了对比,就多了一个project字段。

原始对象(即ofType返回的Observable)调用trySubscribe:

sink即MapSubscriber,里面包含了app callback:

以及map project:

有了这些信息,可以执行map操作了。

始终记住这个语义:Observable.subscribe(subscriber)

2021-1-31 更新

Observable pipe方法驱动的流,每一个map函数的输出是下一个函数的输入。下列代码打印:[3,4,5]

更多Jerry的原创文章,尽在:“汪子熙”:

rxjs 里的pipe operator相关推荐

  1. rxjs 里的map operator

    只有调用函数subscribeToArray的上下文,才知道array的内容到底是什么. 调用上下文,只有一个参数input传给了subscribeToArray: 因此只有在Observable构造 ...

  2. rxjs里使用from operator从一个generator里生成Observable

    源代码: constructor(private jerryService: JerrySandBoxService,private englishGreet: GreetingService,@In ...

  3. rxjs里switchMap operators的用法

    switchMap相关文章 rxjs里switchMap operators的用法 通过rxjs的一个例子, 来学习SwitchMap的使用方法 rxjs switchMap的实现原理 rxjs的ma ...

  4. rxjs里的Observable对象和map配合的一个用法

    源代码: import { of } from 'rxjs'; import { Injectable } from '@angular/core'; import { map } from 'rxj ...

  5. rxjs里scan operator的执行研究

    源代码: ngOnInit(): void {console.log('before ngOnInit');const source$ = range(0, 10);source$.pipe(/*fi ...

  6. 通过一个最简单的例子,理解Angular rxjs里的Observable对象的pipe方法

    源代码: import { of } from 'rxjs'; import { Injectable } from '@angular/core'; import { map } from 'rxj ...

  7. Angular rxjs里自定义operator的使用

    操作符是函数,它基于当前的 Observable 创建一个新的 Observable.这是一个无副作用的操作:前面的 Observable 保持不变. 操作符本质上是一个纯函数 (pure funct ...

  8. rxjs里b = a.pipe(map(mapFn))的执行示意图

  9. rxjs里subscribe和tap的区别

    stackoverflow上的讨论:https://stackoverflow.com/questions/49184754/tap-vs-subscribe-to-set-a-class-prope ...

最新文章

  1. VXLAN和GRE的区别
  2. 网络编程4之UDP协议
  3. Py-博客学习50问
  4. js数组初始化——ES6 Array.prototype.fill()实践记录
  5. No-3.Linux 终端命令格式
  6. AdaX:一个比Adam更优秀,带”长期记忆“的优化器
  7. Win10与Ubuntu18.04之smb相互共享
  8. HTML当当图书馆作业介绍
  9. tecplot脚本在Linux系统下运行,Linux系统下安装Tecplot的步骤
  10. 无法打开包括文件qfiledialog.h头文件
  11. java计算机毕业设计体检系统源码+系统+数据库+lw文档
  12. 微信怎么录屏聊天记录?这两个方法值得收藏!
  13. 用Ruby替代Java做rest接口的单元测试!
  14. 如何加强数字版权保护
  15. 手机辐射对人体的危害
  16. Fast Global Registration(快速全局配准)
  17. 信息解码(ACM)刘汝佳
  18. python设置文件编码_python修改文件编码为utf-8格式
  19. Android 把键盘上的 确认键 变成搜索键
  20. casual discovery Toolbox使用(R语言做因果分析)

热门文章

  1. 每周.NET前沿技术文章摘要(2017-05-24)
  2. Oracle CPU使用率过高问题处理
  3. 【POI word】使用POI实现对Word的读取以及生成
  4. 将不确定变为确定~transactionscope何时提升为分布式事务?(sql2005数据库解决提升到MSDTC的办法)...
  5. CF613D Kingdom and its Cities
  6. [2018/11/13]图像处理
  7. python time模块
  8. 团队作业8——测试与发布(Beta阶段)
  9. 洗澡或游泳等导致的耳朵进水的解决方案
  10. rsync安装与配置使用 数据同步方案(centos6.5)