原文链接: https://segmentfault.com/a/1190000009924164

RxJS 是响应式编程 (reactive programming) 强大的工具,今天我们将深入介绍 Observables 和 Observers 的内容,以及介绍如何创建自己的操作符 (operators)。

如果你之前已经使用过 RxJS,并希望了解 Observable 及 Operators (操作符) 的内部工作原理,那么这篇文章非常适合你。

什么是 Observable

Observable 就是一个拥有以下特性的函数:

  • 它接收一个 observer 对象作为参数,该对象中包含 nexterror 和 complete 方法
  • 它返回一个函数,用于在销毁 Observable 时,执行清理操作

在我们实现的示例中,我们将定义一个简单的 unsubscribe 函数来实现取消订阅的功能。然而在 RxJS 中,返回的是 Subcription 对象,该对象中包含一个 unsubscribe 方法。

一个 Observable 对象设置观察者 (observer),并将它与生产者关联起来。该生产者可能是 DOM 元素产生的 click 或 input 事件,也可能是更复杂的事件,如 HTTP。

为了更好地理解 Observable,我们来自定义 Observable。首先,我们先来看一个订阅的例子:

const node = document.querySelector('input[type=text]');const input$ = Rx.Observable.fromEvent(node, 'input');input$.subscribe({next: (event) => console.log(`You just typed ${event.target.value}!`),error: (err) => console.log(`Oops... ${err}`),complete: () => console.log(`Complete!`)
});

该示例中,Rx.Observable.formEvent() 方法接收一个 input 元素和事件名作为参数,然后返回一个 $input Observable 对象。接下来我们使用 subscribe() 方法来定于该 Observable 对象。当触发 input 事件后,对应的值将会传递给 observer 对象。

什么是 Observer

Observer (观察者) 非常简单,在上面的示例中,观察者是一个普通的对象,该对象会作为 subscribe() 方法的参数。此外 subscribe(next, error, complete) 也是一个有效的语法,但在本文中我们将讨论对象字面量的形式。

当 Observable 对象产生新值的时候,我们可以通过调用 next() 方法来通知对应的观察者。若出现异常,则会调用观察者的 error() 方法。当我们订阅 Observable 对象后,只要有新的值,都会通知对应的观察者。但在以下两种情况下,新的值不会再通知对应的观察者:

  • 已调用 observer 对象的 complete() 方法
  • 消费者对数据不再感兴趣,执行取消订阅操作

此外在执行最终的 subscribe() 订阅操作前,我们传递的值可以经过一系列的链式处理操作。执行对应操作的东西叫操作符,每个操作符执行完后会返回一个新的 Observable 对象,然后继续我们的处理流程。

什么是 Operator

正如上面所说的,Observable 对象能够执行链式操作,具体如下所示:

const input$ = Rx.Observable.fromEvent(node, 'input').map(event => event.target.value).filter(value => value.length >= 2).subscribe(value => {// use the `value`
});

上面代码的执行流程如下:

  • 假设用户在输入框中输入字符 a
  • Observable 对象响应对应的 input 事件,然后把值传递给 observer
  • map() 操作符返回一个新的 Observable 对象
  • filter() 操作符执行过滤操作,然后又返回一个新的 Observable 对象
  • 最后我们通过调用 subscribe() 方法,来获取最终的值

简而言之,Operator 就是一个函数,它接收一个 Observable 对象,然后返回一个新的 Observable 对象。当我们订阅新返回的 Observable 对象时,它内部会自动订阅前一个 Observable 对象。

自定义 Observable

Observable 构造函数

function Observable(subscribe) {this.subscribe = subscribe;
}

每个 subscribe 回调函数被赋值给 this.subscribe 属性,该回调函数将会被我们或其它 Observable 对象调用。

Observer 示例

在我们深入介绍前,我们先来看一个简单的示例。之前我们已经创建完 Observable 函数,现在我们可以调用我们的观察者 (observer),然后传递数值 1,然后订阅它:

const one$ = new Observable((observer) => {observer.next(1);observer.complete();
});one$.subscribe({next: (value) => console.log(value) // 1
});

即我们订阅我们创建的 Observable 实例,然后通过 subscribe() 方法调用通过构造函数设置的回调函数。

Observable.fromEvent

下面就是我们需要的基础结构,即在 Observable 对象上需要新增一个静态方法 fromEvent :

Observable.fromEvent = (element, name) => { };

接下来我们将参考 RxJS 为我们提供的方法来实现自定义的 fromEvent() 方法:

const node = document.querySelector('input');
const input$ = Observable.fromEvent(node, 'input');

按照上面的使用方式,我们的 fromEvent() 方法需要接收两个参数,同时需要返回一个新的 Observable 对象,具体如下:

Observable.fromEvent = (element, name) => {return new Observable((observer) => {});
};

接下来我们来实现事件监听功能:

Observable.fromEvent = (element, name) => {return new Observable((observer) => {element.addEventListener(name, (event) => {}, false);});
};

那么我们的 observer 参数来自哪里? 其实 observer 对象就是包含 nexterror 和 complete 方法的对象字面量。

需要注意的是,我们的 observer 参数不会被传递,直到 subscribe() 方法被调用。这意味着 addEventListener() 方法不会被调用,除非你订阅该 Observable 对象。

当我们调用 subscribe() 方法,之前设置的 this.subscribe 回调函数会被调用,对应的参数是我们定义的 observer 对象字面量,接下来将使用新的值,作为 next() 方法的参数,调用该方法。

很好,那接下来我们要做什么?之前版本我们只是设置了监听,但没有调用 observer 对象的 next() 方法,接下来让我们来修复这个问题:

Observable.fromEvent = (element, name) => {return new Observable((observer) => {element.addEventListener(name, (event) => {observer.next(event);}, false);});
};

如你所知,当销毁 Observables 对象时,需要调用一个函数用来执行清理操作。针对目前的场景,在销毁时我们需要移除事件监听:

Observable.fromEvent = (element, name) => {return new Observable((observer) => {const callback = (event) => observer.next(event);element.addEventListener(name, callback, false);return () => element.removeEventListener(name, callback, false);});
};

我们没有调用 complete() 方法,因为该 Observable 对象处理的 DOM 相关的事件,在时间维度上它们可能是无终止的。

现在让我们来验证一下最终实现的功能:

const node = document.querySelector('input');
const p = document.querySelector('p');function Observable(subscribe) {this.subscribe = subscribe;
}Observable.fromEvent = (element, name) => {return new Observable((observer) => {const callback = (event) => observer.next(event);element.addEventListener(name, callback, false);return () => element.removeEventListener(name, callback, false);});
};const input$ = Observable.fromEvent(node, 'input');const unsubscribe = input$.subscribe({next: (event) => {p.innerHTML = event.target.value;}
});// automatically unsub after 5ssetTimeout(unsubscribe, 5000);

自定义操作符

创建我们自己的操作符应该会更容易一些,现在我们了解 Observable 和 Observable 背后的概念。我们将在 Observable 的原型对象上添加一个方法:

Observable.prototype.map = function (mapFn) { };

该方法的功能与 JavaScript 中的 Array.prototype.map 方法类似:

const input$ = Observable.fromEvent(node, 'input').map(event => event.target.value);

所以我们需要应用回调函数并调用它,这用于获取我们所需要的数据。在我们这样做之前,我们需要流中的最新值。这里是巧妙的部分,在 map() 操作符中,我们需要访问 Observable 实例。因为 map 方法在原型上,我们可以通过以下方式访问 Observable 实例:

Observable.prototype.map = function (mapFn) {const input = this;
};

接下来我们在返回的 Observable 对象中执行 input 对象的订阅操作:

Observable.prototype.map = function(mapFn) {const input = this;return new Observable((observer) => {return input.subscribe();});
};

我们返回了 input.subscribe() 方法执行的结果,因为当我们执行取消订阅操作时,将会依次调用每个 Observable 对象取消订阅的方法。

最后我们来完善一下 map 操作符的内部代码:

Observable.prototype.map = function (mapFn) {const input = this;return new Observable((observer) => {return input.subscribe({next: (value) => observer.next(mapFn(value)),error: (err) => observer.error(err),complete: () => observer.complete()});});
};

现在我们已经可以执行链式操作了:

const input$ = Observable.fromEvent(node, 'input').map(event => event.target.value);input$.subscribe({next: (value) => {p.innerHTML = value;}
});

我有话说

Observable 与 Promise 有什么区别?

Observable(可观察对象)是基于推送(Push)运行时执行(lazy)的多值集合。

Untitled

  • Promise

    • 返回单个值
    • 不可取消的
  • Observable
    • 随着时间的推移发出多个值
    • 可以取消的
    • 支持 map、filter、reduce 等操作符
    • 延迟执行,当订阅的时候才会开始执行

什么是 SafeObserver ?

上面的示例中,我们使用一个包含了 next、error、complete 方法的普通 JavaScript 对象来定义观察者。一个普通的 JavaScript 对象只是一个开始,在 RxJS 5 里面,为开发者提供了一些保障机制,来保证一个更安全的观察者。以下是一些比较重要的原则:

  • 传入的 Observer 对象可以不实现所有规定的方法 (next、error、complete 方法)
  • 在 complete 或者 error 触发之后再调用 next 方法是没用的
  • 调用 unsubscribe 方法后,任何方法都不能再被调用了
  • complete 和 error 触发后,unsubscribe 也会自动调用
  • 当 nextcompleteerror 出现异常时,unsubscribe 也会自动调用以保证资源不会浪费
  • nextcompleteerror是可选的。按需处理即可,不必全部处理

为了完成上述目标,我们得把传入的匿名 Observer 对象封装在一个 SafeObserver 里以提供上述保障。

若想进一步了解详细信息,请参考 Observable详解 文章中 "自定义 Observable" 章节的内容。

参考资源

  • rxjs-observables-observers-operators

rxjs 的 observable 是什么?相关推荐

  1. ajax和rxjs,javascript – RxJS 5 Observable和Angular2 http:调用ajax一次,保存结果,随后的ajax调用使用缓存结果...

    下面的代码是我目前拥有的简化版本: name.service.ts@Injectable() export class NameService { const nameURL = "http ...

  2. RXJS 中 Observable 和 Subject

    1.消费者 决定何时从数据 生产者 中接收数据,生产者本身不知道数据何时会被传递给消费者,函数本身不知道何时会被调用,只有调用函数的人才知道. 2.生产者 决定何时将数据发送给 消费者, 消费者不知道 ...

  3. RxJS 系列之二 - Observable 详解

    查看新版教程,请访问前端修仙之路 RxJS 系列目录 RxJS 系列之一 - Functional Programming 简介 RxJS 系列之二 - Observable 详解 (本文) RxJS ...

  4. Rxjs源码解析(一)Observable

    学习一个库最好的方法就是看其源码,理解其 api 的调用原理,用起来自然也就很清楚自己到底在干什么了,秉持着此观念,为了更好地理解 rxjs,抽空将其源码看了一遍 本系列文章不会刻意涉及概念性的东西, ...

  5. 你会用RxJS吗?【初识 RxJS中的Observable和Observer】

    概念 RxJS是一个库,可以使用可观察队列来编写异步和基于事件的程序的库. RxJS 中管理和解决异步事件的几个关键点: Observable: 表示未来值或事件的可调用集合的概念. Observer ...

  6. redux rxjs_可观察的RxJS和Redux入门指南

    redux rxjs Redux-Observable is an RxJS-based middleware for Redux that allows developers to work wit ...

  7. rxjs ThrottleTime 和 debounceTime 的操作符区别

    throttleTime 的作⽤是限制在 duration 时间范围内,从上游传递给下游数据的个数: debounceTime 的作⽤是让传递给下游的数据间隔不能⼩于给定的时间 dueTime. 一旦 ...

  8. 什么是 Rxjs 的 subscription

    什么是订阅? Subscription 是一个对象,该对象代表一个一次性资源,通常是一个 Observable 的执行. Subscription 有一个重要的方法,unsubscribe,它不接受任 ...

  9. rxjs angular_Angular RxJS深度

    rxjs angular In this tutorial, we'll learn to use the RxJS 6 library with Angular 6 or Angular 7. We ...

最新文章

  1. JFreeChart_API
  2. 2016-2017-1 《信息安全系统设计基础》 学生博客及Git@OSC 链接
  3. SAP Spartacus pop over 元素的单元测试
  4. linux 文件系统---类型、创建、
  5. UITableView知识梳理须知—(一)
  6. 数据建模大数据就业挑战月薪30K
  7. 2016校招真题编程练习——微信红包(腾讯)
  8. html5 progress css,CSS内容:HTML5进度上的attr()不起作用
  9. Java实现面向对象编程
  10. ResNet网络结构代码该怎么看
  11. 大数据-数据仓库的概念
  12. 解决 VMware 无法复制粘贴问题
  13. 从PhotoShop入门学习深入了解设计原则
  14. 早上空腹喝酸奶好吗?
  15. 怎样使用轻快pdf阅读器
  16. html java实训心得,学习中关于HTML的总结与一些心得
  17. Driller源码阅读笔记(二)
  18. oracle设行宽 每次,解决每次从cmd进入sqlplus,都得重新设置pagesize、linesize的问题...
  19. 计算机网络(2)--- 因特网的发展阶段与组织
  20. 挑战感知极限:智能安全感知驱动设计

热门文章

  1. wp主题模版xsbrand修改
  2. AI视频融合平台EasyCVR现已支持华为宇视等四种SDK接入
  3. Flash与3D编程探秘(一)- Flash与3D空间
  4. 如何快速掌握正确的UI配色方案?6种技巧不容错过!
  5. 店盈通:拼多多一个店铺推几个产品最好
  6. pmp--责任分配矩阵,资源分解结构,工作分解结构
  7. 如何清理C盘的垃圾文件
  8. Python爬虫——建立IP代理池
  9. unity关于对其他脚本对象的修改
  10. 网络封包分析工具Charles