Observable
Observable
概述
在ReactiveX中,一个观察者(Observer)订阅一个可观察对象(Observable)。观察者对Observable发射的数据或数据序列作出响应。这种模式可以极大地简化并发操作,因为它创建了一个处于待命状态的观察者哨兵,在未来某个时刻响应Observable的通知,不需要阻塞等待Observable发射数据。
这篇文章会解释什么是响应式编程模式(reactive pattern),以及什么是可观察对象(Observables)和观察者(observers),其它几篇文章会展示如何用操作符组合和改变Observable的行为。
相关参考:
- Single - 一个特殊的Observable,只发射单个数据。
背景知识
在很多软件编程任务中,或多或少你都会期望你写的代码能按照编写的顺序,一次一个的顺序执行和完成。但是在ReactiveX中,很多指令可能是并行执行的,之后他们的执行结果才会被观察者捕获,顺序是不确定的。为达到这个目的,你定义一种获取和变换数据的机制,而不是调用一个方法。在这种机制下,存在一个可观察对象(Observable),观察者(Observer)订阅(Subscribe)它,当数据就绪时,之前定义的机制就会分发数据给一直处于等待状态的观察者哨兵。
这种方法的优点是,如果你有大量的任务要处理,它们互相之间没有依赖关系。你可以同时开始执行它们,不用等待一个完成再开始下一个(用这种方式,你的整个任务队列能耗费的最长时间,不会超过任务里最耗时的那个)。
有很多术语可用于描述这种异步编程和设计模式,在在本文里我们使用这些术语:一个观察者订阅一个可观察对象 (An observer subscribes to an Observable)。通过调用观察者的方法,Observable发射数据或通知给它的观察者。
在其它的文档和场景里,有时我们也将Observer叫做Subscriber、Watcher、Reactor。这个模型通常被称作Reactor模式。
创建观察者
本文使用类似于Groovy的伪代码举例,但是ReactiveX有多种语言的实现。
普通的方法调用(不是某种异步方法,也不是Rx中的并行调用),流程通常是这样的:
- 调用某一个方法
- 用一个变量保存方法返回的结果
- 使用这个变量和它的新值做些有用的事
用代码描述就是:
// make the call, assign its return value to `returnVal`
returnVal = someMethod(itsParameters);
// do something useful with returnVal
在异步模型中流程更像这样的:
- 定义一个方法,它完成某些任务,然后从异步调用中返回一个值,这个方法是观察者的一部分
- 将这个异步调用本身定义为一个Observable
- 观察者通过订阅(Subscribe)操作关联到那个Observable
- 继续你的业务逻辑,等方法返回时,Observable会发射结果,观察者的方法会开始处理结果或结果集
用代码描述就是:
// defines, but does not invoke, the Subscriber's onNext handler
// (in this example, the observer is very simple and has only an onNext handler)
def myOnNext = { it -> do something useful with it };
// defines, but does not invoke, the Observable
def myObservable = someObservable(itsParameters);
// subscribes the Subscriber to the Observable, and invokes the Observable
myObservable.subscribe(myOnNext);
// go on about my business
回调方法 (onNext, onCompleted, onError)
Subscribe方法用于将观察者连接到Observable,你的观察者需要实现以下方法的一个子集:
onNext(T item)
Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现。
onError(Exception ex)
当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。
onComplete
正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。
根据Observable协议的定义,onNext可能会被调用零次或者很多次,最后会有一次onCompleted或onError调用(不会同时),传递数据给onNext通常被称作发射,onCompleted和onError被称作通知。
下面是一个更完整的例子:
def myOnNext = { item -> /* do something useful with item */ };
def myError = { throwable -> /* react sensibly to a failed call */ };
def myComplete = { /* clean up after the final response */ };
def myObservable = someMethod(itsParameters);
myObservable.subscribe(myOnNext, myError, myComplete);
// go on about my business
取消订阅 (Unsubscribing)
在一些ReactiveX实现中,有一个特殊的观察者接口Subscriber,它有一个unsubscribe方法。调用这个方法表示你不关心当前订阅的Observable了,因此Observable可以选择停止发射新的数据项(如果没有其它观察者订阅)。
取消订阅的结果会传递给这个Observable的操作符链,而且会导致这个链条上的每个环节都停止发射数据项。这些并不保证会立即发生,然而,对一个Observable来说,即使没有观察者了,它也可以在一个while循环中继续生成并尝试发射数据项。
关于命名约定
ReactiveX的每种特定语言的实现都有自己的命名偏好,虽然不同的实现之间有很多共同点,但并不存在一个统一的命名标准。
而且,在某些场景中,一些名字有不同的隐含意义,或者在某些语言看来比较怪异。
例如,有一个onEvent命名模式(onNext, onCompleted, onError),在一些场景中,这些名字可能意味着事件处理器已经注册。然而在ReactiveX里,他们是事件处理器的名字。
Observables的"热"和"冷"
Observable什么时候开始发射数据序列?这取决于Observable的实现,一个"热"的Observable可能一创建完就开始发射数据,因此所有后续订阅它的观察者可能从序列中间的某个位置开始接受数据(有一些数据错过了)。一个"冷"的Observable会一直等待,直到有观察者订阅它才开始发射数据,因此这个观察者可以确保会收到整个数据序列。
在一些ReactiveX实现里,还存在一种被称作Connectable的Observable,不管有没有观察者订阅它,这种Observable都不会开始发射数据,除非Connect方法被调用。
用操作符组合Observable
对于ReactiveX来说,Observable和Observer仅仅是个开始,它们本身不过是标准观察者模式的一些轻量级扩展,目的是为了更好的处理事件序列。
ReactiveX真正强大的地方在于它的操作符,操作符让你可以变换、组合、操纵和处理Observable发射的数据。
Rx的操作符让你可以用声明式的风格组合异步操作序列,它拥有回调的所有效率优势,同时又避免了典型的异步系统中嵌套回调的缺点。
下面是常用的操作符列表:
- 创建操作 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
- 变换操作 Buffer, FlatMap, GroupBy, Map, Scan和Window
- 过滤操作 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
- 组合操作 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
- 错误处理 Catch和Retry
- 辅助操作 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
- 条件和布尔操作 All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
- 算术和集合操作 Average, Concat, Count, Max, Min, Reduce, Sum
- 转换操作 To
- 连接操作 Connect, Publish, RefCount, Replay
- 反压操作,用于增加特殊的流程控制策略的操作符
这些操作符并不全都是ReactiveX的核心组成部分,有一些是语言特定的实现或可选的模块。
RxJava
在RxJava中,一个实现了_Observer_接口的对象可以订阅(subscribe)一个Observable 类的实例。订阅者(subscriber)对Observable发射(emit)的任何数据或数据序列作出响应。这种模式简化了并发操作,因为它不需要阻塞等待Observable发射数据,而是创建了一个处于待命状态的观察者哨兵,哨兵在未来某个时刻响应Observable的通知。
Observable相关推荐
- Angular 可观察对象(Observable)
可观察对象(Observable) 可观察对象支持在应用的发布者和订阅者之间传递消息. 可观察对象是声明式的 -- 即定义的用于发布值的函数,在有消费者订阅它之前,这个函数不会实际执行. 可观察对象可 ...
- EXTJS之Ext.util.Observable自定义事件
暂时还不会用Ext.mixin.Observable, 催悲的测试了近两个小时.这TMD的语法差距也太大了啊.. 在新版EXTJS里,已去除了addEvents. 弄个出来,大概知道下吧. ? 1 2 ...
- java观察者模式类图_设计模式(十八)——观察者模式(JDK Observable源码分析)...
1 天气预报项目需求,具体要求以下: 1) 气象站能够将天天测量到的温度,湿度,气压等等以公告的形式发布出去(好比发布到本身的网站或第三方).java 2) 须要设计开放型 API,便于其余第三方也能 ...
- RxJava2 / RxAndroid2的merge操作合并多个Observable
RxJava2/RxAndroid2的merge操作合并多个Observable RxAndroid2/RxJava2的merge操作合并若干个Observable为单个可观测的Observable, ...
- observable.unsubscribeOn(Schedulers.io())
取消订阅 一般我们在视图消亡后,无需RxJava再执行,可以直接取消订阅 observable.unsubscribeOn(Schedulers.io()); 可用在activity的 onDestr ...
- Observable.OnSubscribe 的理解
-- "通过OnSubscribe的源码的注释 :意思是 当Observable被订阅(subscribe) OnSubscribe接口的call方法会被执行." --subscr ...
- observable_在Spring MVC流中使用rx-java Observable
observable Spring MVC现在已经支持异步请求处理流程了一段时间,该支持内部利用了Tomcat / Jetty等容器的Servlet 3异步支持. Spring Web Async支持 ...
- 30 天精通 RxJS (05): 建立 Observable(一)
Observable 是 RxJS 的核心,今天让我们从如何建立 Observable 开始! 这是[30天精通 RxJS]的 05 篇,如果还没看过 04 篇可以往这边走: [30 天精通 RxJS ...
- Java中使用Observer接口和Observable类实践Observer观察者模式
在Java中通过Observable类和Observer接口实现了观察者模式.实现Observer接口的对象是观察者,继承Observable的对象是被观察者. 1. 实现观察者模式 实现观察者模式非 ...
- JDK中提供的实现——通过 java.util.Observable 类和 java.util.Observer 接口定义了观察者模式,只要实现它们的子类就可以编写观察者模式实例
JDK中提供的实现 在 Java 中,通过 java.util.Observable 类和 java.util.Observer 接口定义了观察者模式,只要实现它们的子类就可以编写观察者模式实例. 1 ...
最新文章
- 生产中NFS案例记录---写入权限解决过程
- Windows10 Postgres11 安装 The Database cluster initialisation failed
- Docker Hub 镜像加速器
- MySQL中的数据查询
- 如何设计java线程安全类_如何设计线程安全的Java程序
- python无法对存在交叉部分的图片进行识别_Python异常样本识别 交叉验证出现错误?...
- html校园首页设计说明范文,网页设计作品设计说明-必看请相互转告
- Scala中的match(模式匹配)
- 基于依存句法规则的篇章级情感分析demo
- css鼠标滑过变大,css 鼠标移上去会变大(示例代码)
- 第一篇:了解和使用MVPArms项目
- 工业级Pass云平台SpringCloudAlibaba综合项目实战(二):解决方案和工作效率
- 姿态估计2-03:PVNet(6D姿态估计)-白话给你讲论文-翻译无死角(1)
- source insight 4.0 的一些设置
- Win7下安装Ubuntu(双硬盘)的简要步骤
- 导出表钩子之EAT HOOK解析
- DNSPod十问百果园焦岳:为什么开水果店是一门高科技生意?
- 创业融资路演PPT模板
- 【分享笔记】android6.0特性
- RQNOJ 篝火晚会
热门文章
- MySQL课程2.表的各种查询
- Python知识——变量、基本数据类型
- disable和readonly的区别和使用
- 国内概率与数理统计专业牛人分布
- 「HTML」什么是 HTML 中的 div 标签
- tm影像辐射定标_Landsat TM 辐射定标和大气校正步骤
- php开发数独,php解数独 - AA星梦无痕AA的个人空间 - OSCHINA - 中文开源技术交流社区...
- 关于土豆网盈利模式的一点儿构思
- AWS 发布新一代Amazon Aurora Serverless,更方便 SQL Server迁移
- pycharm皮肤及快捷键设置