文章目录

  • Observable
    • 基础
      • 概述
    • 操作符
      • 创建操作
        • Create
        • Defer
        • Empty/Never/Throw
        • From
        • Interva
        • just
        • Range
        • Repeat
        • Start
        • Timer
      • 变换操作
        • Buffer
        • FlatMap
        • GroupBy
        • Map
        • Scan
        • Window
      • 过滤操作
        • DeBounce/throttleWithTimeout
        • Distinct
        • Filter
      • 结合操作
        • And/Then/When
        • Join
        • Merge
        • StartWitch
        • Switch
        • Zip
      • 错误处理
        • Catch
        • Retry
      • 辅助操作
        • Delay
        • Do
        • ObserveOn
        • Subscribe
        • SubscribeOn
        • TimeOut

Observable

基础

概述

一个观察者订阅(Observer)一个可观察对象(Observable)。观察者对观察对象发射的数据或数据序列做出响应

普通方法调用流程

  1. 调用某方法
  2. 用变量保存方法保存结果
  3. 使用变量新值继续操作
  • 代码表示为
// make the call, assign its return value to `returnVal`
returnVal = someMethod(itsParameters);
// do something useful with returnVal

异步模型流程

  1. 定义方法,方法拿着异步调用的返回值并处理
  2. 将异步调用本身定义为Observable
  3. 观察者通过订阅(Subscribe)操作关联到Observable
  4. 继续业务逻辑处理
  • 代码表示为
  // defines, but does not invoke, the Subscriber's onNext handler// (in this example, the observer is very simple and has only an onNext handler)def myOnNext = { it -> do something useful with it };// defines, but does not invoke, the Observabledef myObservable = someObservable(itsParameters);// subscribes the Subscriber to the Observable, and invokes the ObservablemyObservable.subscribe(myOnNext);// go on about my business

回调方法

  • onNext(T item)

    • Observable调用这个方法发射数据,参数为Observable发射的数据,可以被调用多次
  • onError(Exception ex)
  • 当Observable遇到错误或者无返回期望值的数据时调用,会终止Observable,后续不会调用onNext和onComplete,参数为抛出异常
  • onComplete
  • 正常终止,如果没有遇到错误,Observable会在最后一次调用onNext之后调用此方法
  • 例子
 def myOnNext     = { item -> /* do something useful with item */ };def myError      = { throwable -> /* react sensibly to a failed call */ };def myComplete   = { /* clean up after the final response */ };def myObservable = someMethod(itsParameters);myObservable.subscribe(myOnNext, myError, myComplete);// go on about my business

取消订阅

特殊观察者接口Subscriber,取消订阅方法unsubscribe方法

操作符列表

  • 创建操作
  • Create、Defer、Empty/Never/Throw、From、Interval、Just、Range、Repeat、Start、Timer
  • 变换操作
  • Buffer、FlatMap、GroupBy、Map、Scan、Window
  • 过滤操作
  • Debounce、Distinct、ElementAt、Filter、First、IgnoreElements、Last、Sample、Skip、SkipLast、Take、TakeLast
  • 组合操作
  • And/Then/When、CombineLatest、Join、Merge、StartWith、Switch、Zip
  • 错误处理
  • Catch、Retry
  • 辅助操作
  • Delay、Do、Materialize/Dematerialize、ObserveOn、Serialize、Subscribe、SubscribeOn、TimeInterval、TimeOut、Timestamp、Using
  • 条件和布尔操作
  • All、Amb、Contains、DefaultIfEmpty、SequenceEqual、SkipUntil、SkipWhile、TakeUntil、TakeWhile
  • 算数和集合操作
  • Average、Concat、Count、Max、Min、Reduce、Sum
  • 转换操作
  • To
  • 连接操作
  • Connect、Publish、RefCount、Replay
  • 反压操作
  • 用于增加特殊的流程控制策略的操作符

操作符

创建操作

Create

使用一个函数从头开始创建一个Observable

  • 示例
Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> observer) {try {if (!observer.isUnsubscribed()) {for (int i = 1; i < 5; i++) {observer.onNext(i);}observer.onCompleted();}} catch (Exception e) {observer.onError(e);}}} ).subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {System.out.println("Next: " + item);}@Overridepublic void onError(Throwable error) {System.err.println("Error: " + error.getMessage());}@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}});
Defer

直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的

  • switchCase

    • 有条件的创建并返回一个可能的Observables集合中的一个
Empty/Never/Throw
  • Empty

    • 创建一个不发射任何数据但是正常终止的Observable,实现为empty
  • Never
    • 创建一个不发射数据也不终止的Observable,实现为never
  • Throw
    • 创建一个不发射数据以一个错误终止的Observable,实现为error
From

将其他种类的对象和数据类型转换为Observable,包括Future、Iterable和数组

  • 示例
Integer[] items = { 0, 1, 2, 3, 4, 5 };
Observable myObservable = Observable.from(items);myObservable.subscribe(new Action1<Integer>() {@Overridepublic void call(Integer item) {System.out.println(item);}},new Action1<Throwable>() {@Overridepublic void call(Throwable error) {System.out.println("Error encountered: " + error.getMessage());}},new Action0() {@Overridepublic void call() {System.out.println("Sequence complete");}}
);
  • RxJavaAsyncUtil
  • runAsync2
  • decode
Interva

创建一个按固定时间间隔发射整数序列的Observable

just

创建一个发射指定值的Observable

  • 示例
Observable.just(1, 2, 3).subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {System.out.println("Next: " + item);}@Overridepublic void onError(Throwable error) {System.err.println("Error: " + error.getMessage());}@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}});
Range

创建一个发射特定整数序列的Observable

Repeat

创建一个发射特定数据重复多次的Obserable

  • repeatWhen

    • 有条件的重新订阅和发射原来的Observable
  • doWhile
    • 在原始序列的每次重复后检查某个条件,如果满足条件才重复发射
  • whileDo
    • 在原始序列的每次重复前检查某个条件,如果满足条件才重复发射
Start

返回一个Observable,它发射一个类似于函数声明的值

  • 从运算结果中获取值的方法
  • functions、futures、actions、callables、runnables
  • toAsync
  • 对于函数(functions),这个操作符调用这个函数获取一个值,然后返回一个会发射这个值给后续观察者的Observable(和start一样)。对于动作(Action),过程类似,但是没有返回值,在这种情况下,这个操作符在终止前会发射一个null值。 注意:这个函数或动作只会被执行一次,即使多个观察者订阅这个返回的Observable。
  • startFuture
  • 参数为返回Future的函数,startFuture会立即调用这个函数返回Future对象,然后调用Future的get()方法尝试获取它的值。返回一个发送这个值给后续观察者的Observable
  • deferFuture
  • 与startFuture不同的是当有观察者订阅它返回的Observable时,才会立即调用Future的get()方法
  • fromFuture
  • 参数为action,返回Observable,一旦Action终止,发射传递给fromAction的数据
  • fromCallable
  • 参数为Callable,返回发射这个Callable的结果的Obserable
  • fromRunable
  • 参数为Runable,返回发射这个Runable的数据
  • forEachFuture
  • 返回一个Future并且在get()方法处阻塞,直到原始Observable执行完毕,然后返回,完成还是错误依赖于原始Observable是完成还是错误
Timer

创建一个Observable,在一个给定的延迟后发射一个特殊的值

变换操作

Buffer

定期收集Observable的数据放进一个数据包裹,然后发送这些数据包裹,而不是一次发射一个值

  • 注意

    • 如果原来的Observable发射了一个onError通知,Buffer会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始Observable发射的数据
  • buffer(count)
    • 以列表形式发射非重叠的缓存,每一个缓存至多包含来自原始Observable的count项数据(最后发射的列表数据可能少于count项)
  • buffer(count,skip)
    • 从原始Observable第一项数据开始建立缓存,每当收到skip项数据,用count项数据填充缓存
  • buffer(bufferClosingSelector)
    • buffer(bufferClosingSelector)开始将数据收集到一个List,然后它调用bufferClosingSelector生成第二个Observable,当第二个Observable发射一个TClosing时,buffer发射当前的List,然后重复这个过程:开始组装一个新的List,然后调用bufferClosingSelector创建一个新的Observable并监视它。它会一直这样做直到原来的Observable执行完成。
  • buffer(boundary)
    • 监视一个名叫boundary的Observable,每当这个Observable发射了一个值,它就创建一个新的list开始收集来自原始Observable的数据并发送原来的List
  • buffer(bufferOpenings,bufferClosingSelector)
    • 监视叫bufferOpenings的Observable(它发射BufferOpenings对象),每当bufferOpenings发射了一个数据时,它就创建一个新的List开始收集原始Observable的数据,并将bufferOpenings传递给closingSelector函数。这个函数返回一个Observable。buffer监视这个Observable,当它检测到一个来自这个Observable的数据时,就关闭List并且发射它自己的数据(之前的那个List)。
  • buffer(timespan, unit)
    • 定期以List的形式发射新的数据,每个时间段,收集来自原始Observable的数据(从前面一个数据包裹之后,或者如果是第一个数据包裹,从有观察者订阅原来的Observale之后开始)
  • buffer(timespan, unit, count)
    • 每当收到来自原始Observable的count项数据,或者每过了一段指定的时间后,buffer(timespan, unit, count)就以List的形式发射这期间的数据,即使数据项少于count项
  • buffer(timespan, timeshift, unit)
    • 在每一个timeshift时期内都创建一个新的List,然后用原始Observable发射的每一项数据填充这个列表(在把这个List当做自己的数据发射前,从创建时开始,直到过了timespan这么长的时间)。如果timespan长于timeshift,它发射的数据包将会重叠,因此可能包含重复的数据项。
  • buffer-backpressure
    • 使用Buffer操作符实现反压backpressure(意思是,处理这样一个Observable:它产生数据的速度可能比它的观察者消费数据的速度快)。
    • Buffer操作符可以将大量的数据序列缩减为较少的数据缓存序列,让它们更容易处理。例如,你可以按固定的时间间隔,定期关闭和发射来自一个爆发性Observable的数据缓存。这相当于一个缓冲区。
      • 示例代码
        // noinspection JSAnnotatorObservable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
      
FlatMap

将一个发射数据的Observable变换为多个Observables,然后将发射的数据合并后放进一个单独的Observables

  • FlatMap对Observables发射的数据做的是合并操作,有可能是会交错的
  • FlatMapIterable
    • 成对的打包数据,然后生成Iterable而不是原始数据和生成的Observables,处理方式相同
  • concatMap
    • 按次序连接而不是合并生成的Observables,然后产生自己的数据序列
  • switchMap
    • 当原始Observable发射一个新的数据(Observable)时,它将取消订阅并停止监视产生执之前的那个数据的Observable,只监视当前这一个
  • spit
    • 将一个发射字符串的Observable转换为另一个发射字符串的Observable,只不过,后者将原始的数据序列当做一个数据流,使用一个正则表达式边界分割它们,然后合并发射分割的结果。
GroupBy

将一个Observable分拆成一些Observables集合,他们中的每一个发射原始Observable的一个子序列

  • groupBy将原始Observable分解为一个发射多个GroupedObservable的Observable,一旦有订阅,每个GroupedObservable就开始缓存数据。因此,如果你忽略这些GroupedObservable中的任何一个,这个缓存可能形成一个潜在的内存泄露。因此,如果你不想观察,也不要忽略GroupedObservable。你应该使用像take(0)这样会丢弃自己的缓存的操作符。
Map

对Observable发射的每一项数据应用一个函数,执行变换操作

  • cast

    • 将原始Observable发射的每一项数据都强制转换为一个指定的类型,然后发射数据。
  • encode
    • 将一个发射字符串的Observable变换为一个发射字节数组的Observable
  • byLine
    • 将一个发射字符串的Observable变换为一个按行发射来自原始Observable的字符串的Observable。
Scan

连续的对数据序列的每一项应用一个函数,然后连续发射结果

  • 示例代码
Observable.just(1, 2, 3, 4, 5).scan(new Func2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer sum, Integer item) {return sum + item;}}).subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {System.out.println("Next: " + item);}@Overridepublic void onError(Throwable error) {System.err.println("Error: " + error.getMessage());}@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}});
Window

个人不常用,暂无描述

过滤操作

DeBounce/throttleWithTimeout

仅在过了一段指定的时间后还没发射数据时才发射一个数据

  • 会过滤掉发射速率过快的数据项
Distinct

过滤掉重复的数据项,只允许还没有发射过的数据项通过

  • 示例代码

      Observable.just(1, 2, 1, 1, 2, 3).distinct().subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {System.out.println("Next: " + item);}@Overridepublic void onError(Throwable error) {System.err.println("Error: " + error.getMessage());}@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}});
    
Filter

只发射通过测试的数据

  • 示例代码

    Observable.just(1, 2, 3, 4, 5).filter(new Func1<Integer, Boolean>() {@Overridepublic Boolean call(Integer item) {return( item < 4 );}}).subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {System.out.println("Next: " + item);}@Overridepublic void onError(Throwable error) {System.err.println("Error: " + error.getMessage());}@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}});
    
  • ofType
    • 一个特殊形式。它过滤一个Observable只返回指定类型的数据

结合操作

And/Then/When

使用Pattern和Plan作为中介,将两个或多个Observable发射的数据合并在一起

Join

任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一个数据,就结合两个Observable发射的数据

Merge

合并多个Observables的发射物

  • 示例代码
Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler);
Observable<Integer> evens = Observable.just(2, 4, 6);Observable.merge(odds, evens).subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {System.out.println("Next: " + item);}@Overridepublic void onError(Throwable error) {System.err.println("Error: " + error.getMessage());}@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}});
StartWitch

在数据序列的开头插入一条指定的数据

Switch

将一个发射多个Obversables的Observable转换成另一个单独的Observerable,后者发射那些Observables最近发射的数据

Zip

通过一个函数将多个Observeables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项

错误处理

Catch

从onError通知中回复发射数据,拦截原始的Observable的onError同时,将它替换为其他的数据项或数据序列,让产生的Observable能够正常终止或者根本不停止

Retry

如果原始Observable遇到错误,重新订阅它期望它能正常终止

辅助操作

Delay

延迟一段指定的时间再来发射来自Observable的发射物

Do

注册一个动作作为原始Observable生命周期时间的一种占位符

ObserveOn

指定一个观察者在哪个调度器上观察这个Observable

Subscribe

操作来自Observable的发射物和通知,是观察者与Observable之间的桥梁

SubscribeOn

指定Observable自身在哪个调度器上面执行

TimeOut

对原始Observable的一个镜像,超时会发错误通知

Observable常用知识点整理相关推荐

  1. 敏捷ACP 常用关键词整理 敏捷ACP 常用知识点整理

    敏捷ACP 常用关键词整理   敏捷ACP 常用知识点整理 一.MoSCoW 1.MoSCoW : 读作"莫斯科",适用于故事优先级的排序,首次出现在 3-13敏捷产品实践:产品待 ...

  2. oracle 常用知识点整理

    转 :  oracle 常用知识点 原文链接:http://blog.csdn.net/weijiaxiaobao/article/details/51323573 Oracle 是一个庞大的系统,里 ...

  3. CTF常用知识点整理(个人刷题中整理)

    由于是之前刷题的整理,参照了很多大神的博客,由于过于零碎,没能记录下各位大神的文章出处(以后会提高版权意识的),如有侵权,私聊补加出处或者删文章. 博主是入门半年的萌新,文章不可避免会有很多错误,还请 ...

  4. HTML中常用知识点整理

    html复习 HTML 是一个超文本标记语言 w3c标准:结构标准,表现标准,行为标准. 基本结构 <!doctype html> <html><head>< ...

  5. Flow 常用知识点整理

    Flow入门初识 Flow是facebook出品的JavaScript静态类型检查工具. 由于JavaScript是动态类型语言,它的灵活性也会造成一些代码隐患,使用Flow可以在编译期尽早发现由类型 ...

  6. matplotlib一些常用知识点的整理,

    本文作为学习过程中对matplotlib一些常用知识点的整理,方便查找. 强烈推荐ipython 无论你工作在什么项目上,IPython都是值得推荐的.利用ipython --pylab,可以进入Py ...

  7. mysql 存储引擎 面试_搞定PHP面试 - MySQL基础知识点整理 - 存储引擎

    MySQL基础知识点整理 - 存储引擎 0. 查看 MySQL 支持的存储引擎 可以在 mysql 客户端中,使用 show engines; 命令可以查看MySQL支持的引擎: mysql> ...

  8. 大数据 -- kafka学习笔记:知识点整理(部分转载)

    一 为什么需要消息系统 1.解耦 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险.许多 ...

  9. 高级 Java 面试通关知识点整理

    转载自 高级 Java 面试通关知识点整理 1.常用设计模式 单例模式:懒汉式.饿汉式.双重校验锁.静态加载,内部类加载.枚举类加载.保证一个类仅有一个实例,并提供一个访问它的全局访问点. 代理模式: ...

最新文章

  1. 安装labelImg(win10,macOS)
  2. Faster RCNN原理分析(二):Region Proposal Networks详解
  3. 读者问:小公司,但工资高,能去吗?
  4. 使用maven引入Apache poi jar包
  5. 复数抽象数据类型C语言,采用C/C++语言如何实现复数抽象数据类型Complex
  6. 打印三角形流程控制练习
  7. hadoop: Operation category READ is not supported in state standby
  8. App后台开发运维和架构实践学习总结(5)——App产品从需求到研发到开发到上线到产品迭代全过程
  9. 复变函数系列(三 ) - 复变函数的积分
  10. 错误使用 network/train (line 340) Output data size does not match net.outputs{2}.size.
  11. 小知识--Windows语音效果
  12. java计算机毕业设计运动会管理系统源码+mysql数据库+系统+lw文档+部署
  13. Web前端开发的项目开发流程
  14. 常用的坐标系及其EPSG编码
  15. python写的flappy bird小游戏(最简单版本)有视频链接
  16. 开闭鸿蒙都只为风味情浓,[红楼梦引子]开辟鸿蒙, 谁为情种? 都只为风月情浓。 趁着...
  17. 小米蓝牙键盘怎么连接_小米2手机连接使用蓝牙键盘和蓝牙鼠标教程(原创)
  18. rk3568 | 瑞芯微平台GPIO引脚驱动编写
  19. adb logcat 查询过滤日志
  20. 如何用计算机名查看共享打印机,共享打印机怎么连接?打印机如何共享?查看图文帮你解决...

热门文章

  1. ajax传json数据到后端struts,js与struts如何通过aja以json数据形式进行数据传输
  2. 三十而立,男人必须明白的事
  3. 升级Android Studio Electric Eel问题汇总
  4. 诛仙哪里炼器服务器最稳定,《诛仙网游》把你们见过最高炼器等级发出来
  5. “雷灵”超高旗舰性价比千元机之王
  6. AutoCut : 通过字幕来剪切视频
  7. 鬼泣最稳定的服务器,DNF:国服特色有多强?韩服第一鬼泣在国服连前十都进不了...
  8. 拜年啦!最受科研人员喜爱的春联来了!
  9. 树莓派安装Lakka打造经典小霸王游戏机
  10. c语言对比两个字符串相等,c语言中如何判断两个字符串相等