这可能是最好的 RxJava 2.x 入门教程系列专栏
文章链接:
这可能是最好的 RxJava 2.x 入门教程(完结版)[推荐直接看这个]
这可能是最好的RxJava 2.x 入门教程(一)
这可能是最好的RxJava 2.x 入门教程(二)
这可能是最好的RxJava 2.x 入门教程(三)
这可能是最好的RxJava 2.x 入门教程(四)
这可能是最好的RxJava 2.x 入门教程(五)
GitHub 代码同步更新:https://github.com/nanchen2251/RxJava2Examples
为了满足大家的饥渴难耐,GitHub 将同步更新代码,主要包含基本的代码封装,RxJava 2.x 所有操作符应用场景介绍和实际应用场景,后期除了 RxJava 可能还会增添其他东西,总之,GitHub 上的 Demo 专为大家倾心打造。传送门:https://github.com/nanchen2251/RxJava2Examples

前言

很快我们就迎来了第二期,上一期我们主要讲解了 RxJava 1.x 到 2.x 的变化概览,相信各位熟练掌握RxJava 1.x的老司机们随便看一下变化概览就可以上手RxJava 2.x了,但为了满足更广大的年轻一代司机(未来也是老司机),在本节中,我们将学习RxJava 2.x 强大的操作符章节。
【注】以下所有操作符标题都可直接点击进入官方doc查看。

正题

Create

create 操作符应该是最常见的操作符了,主要用于产生一个 Obserable 被观察者对象,为了方便大家的认知,以后的教程中统一把被观察者 Observable 称为发射器(上游事件),观察者 Observer 称为接收器(下游事件)。

Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {mRxOperatorsText.append("Observable emit 1" + "\n");Log.e(TAG, "Observable emit 1" + "\n");e.onNext(1);mRxOperatorsText.append("Observable emit 2" + "\n");Log.e(TAG, "Observable emit 2" + "\n");e.onNext(2);mRxOperatorsText.append("Observable emit 3" + "\n");Log.e(TAG, "Observable emit 3" + "\n");e.onNext(3);e.onComplete();mRxOperatorsText.append("Observable emit 4" + "\n");Log.e(TAG, "Observable emit 4" + "\n" );e.onNext(4);}}).subscribe(new Observer<Integer>() {private int i;private Disposable mDisposable;@Overridepublic void onSubscribe(@NonNull Disposable d) {mRxOperatorsText.append("onSubscribe : " + d.isDisposed() + "\n");Log.e(TAG, "onSubscribe : " + d.isDisposed() + "\n" );mDisposable = d;}@Overridepublic void onNext(@NonNull Integer integer) {mRxOperatorsText.append("onNext : value : " + integer + "\n");Log.e(TAG, "onNext : value : " + integer + "\n" );i++;if (i == 2) {// 在RxJava 2.x 中,新增的Disposable可以做到切断的操作,让Observer观察者不再接收上游事件mDisposable.dispose();mRxOperatorsText.append("onNext : isDisposable : " + mDisposable.isDisposed() + "\n");Log.e(TAG, "onNext : isDisposable : " + mDisposable.isDisposed() + "\n");}}@Overridepublic void onError(@NonNull Throwable e) {mRxOperatorsText.append("onError : value : " + e.getMessage() + "\n");Log.e(TAG, "onError : value : " + e.getMessage() + "\n" );}@Overridepublic void onComplete() {mRxOperatorsText.append("onComplete" + "\n");Log.e(TAG, "onComplete" + "\n" );}});

输出:

需要注意的几点是:

  • 在发射事件中,我们在发射了数值 3 之后,直接调用了 e.onComlete(),虽然无法接收事件,但发送事件还是继续的。

  • 另外一个值得注意的点是,在 RxJava 2.x 中,可以看到发射事件方法相比 1.x 多了一个 throws Excetion,意味着我们做一些特定操作再也不用 try-catch 了。

  • 并且 2.x 中有一个 Disposable 概念,这个东西可以直接调用切断,可以看到,当它的 isDisposed() 返回为 false 的时候,接收器能正常接收事件,但当其为 true 的时候,接收器停止了接收。所以可以通过此参数动态控制接收事件了。

Map

Map 基本算是 RxJava 中一个最简单的操作符了,熟悉 RxJava 1.x 的知道,它的作用是对发射时间发送的每一个事件应用一个函数,是的每一个事件都按照指定的函数去变化,而在 2.x 中它的作用几乎一致。

Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);}}).map(new Function<Integer, String>() {@Overridepublic String apply(@NonNull Integer integer) throws Exception {return "This is result " + integer;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(@NonNull String s) throws Exception {mRxOperatorsText.append("accept : " + s +"\n");Log.e(TAG, "accept : " + s +"\n" );}});

输出:

是的,map 基本作用就是将一个 Observable 通过某种函数关系,转换为另一种 Observable,上面例子中就是把我们的 Integer 数据变成了 String 类型。从Log日志显而易见。

Zip

zip 专用于合并事件,该合并不是连接(连接操作符后面会说),而是两两配对,也就意味着,最终配对出的 Observable 发射事件数目只和少的那个相同。

Observable.zip(getStringObservable(), getIntegerObservable(), new BiFunction<String, Integer, String>() {@Overridepublic String apply(@NonNull String s, @NonNull Integer integer) throws Exception {return s + integer;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(@NonNull String s) throws Exception {mRxOperatorsText.append("zip : accept : " + s + "\n");Log.e(TAG, "zip : accept : " + s + "\n");}});
private Observable<String> getStringObservable() {return Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {if (!e.isDisposed()) {e.onNext("A");mRxOperatorsText.append("String emit : A \n");Log.e(TAG, "String emit : A \n");e.onNext("B");mRxOperatorsText.append("String emit : B \n");Log.e(TAG, "String emit : B \n");e.onNext("C");mRxOperatorsText.append("String emit : C \n");Log.e(TAG, "String emit : C \n");}}});}private Observable<Integer> getIntegerObservable() {return Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {if (!e.isDisposed()) {e.onNext(1);mRxOperatorsText.append("Integer emit : 1 \n");Log.e(TAG, "Integer emit : 1 \n");e.onNext(2);mRxOperatorsText.append("Integer emit : 2 \n");Log.e(TAG, "Integer emit : 2 \n");e.onNext(3);mRxOperatorsText.append("Integer emit : 3 \n");Log.e(TAG, "Integer emit : 3 \n");e.onNext(4);mRxOperatorsText.append("Integer emit : 4 \n");Log.e(TAG, "Integer emit : 4 \n");e.onNext(5);mRxOperatorsText.append("Integer emit : 5 \n");Log.e(TAG, "Integer emit : 5 \n");}}});}

输出:

需要注意的是:

  • zip 组合事件的过程就是分别从发射器 A 和发射器 B 各取出一个事件来组合,并且一个事件只能被使用一次,组合的顺序是严格按照事件发送的顺序来进行的,所以上面截图中,可以看到,1 永远是和 A 结合的,2 永远是和 B 结合的。

  • 最终接收器收到的事件数量是和发送器发送事件最少的那个发送器的发送事件数目相同,所以如截图中,5 很孤单,没有人愿意和它交往,孤独终老的单身狗。

Concat

对于单一的把两个发射器连接成一个发射器,虽然 zip 不能完成,但我们还是可以自力更生,官方提供的 concat 让我们的问题得到了完美解决。

Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6)).subscribe(new Consumer<Integer>() {@Overridepublic void accept(@NonNull Integer integer) throws Exception {mRxOperatorsText.append("concat : "+ integer + "\n");Log.e(TAG, "concat : "+ integer + "\n" );}});

输出:

如图,可以看到。发射器 B 把自己的三个孩子送给了发射器 A,让他们组合成了一个新的发射器,非常懂事的孩子,有条不紊的排序接收。

FlatMap

FlatMap 是一个很有趣的东西,我坚信你在实际开发中会经常用到。它可以把一个发射器 Observable 通过某种方法转换为多个 Observables,然后再把这些分散的 Observables装进一个单一的发射器 Observable。但有个需要注意的是,flatMap 并不能保证事件的顺序,如果需要保证,需要用到我们下面要讲的 ConcatMap

Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);}}).flatMap(new Function<Integer, ObservableSource<String>>() {@Overridepublic ObservableSource<String> apply(@NonNull Integer integer) throws Exception {List<String> list = new ArrayList<>();for (int i = 0; i < 3; i++) {list.add("I am value " + integer);}int delayTime = (int) (1 + Math.random() * 10);return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);}}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {@Overridepublic void accept(@NonNull String s) throws Exception {Log.e(TAG, "flatMap : accept : " + s + "\n");mRxOperatorsText.append("flatMap : accept : " + s + "\n");}});

输出:

一切都如我们预期中的有意思,为了区分 concatMap(下一个会讲),我在代码中特意动了一点小手脚,我采用一个随机数,生成一个时间,然后通过 delay(后面会讲)操作符,做一个小延时操作,而查看 Log 日志也确认验证了我们上面的说法,它是无序的。

concatMap

上面其实就说了,concatMapFlatMap 的唯一区别就是 concatMap 保证了顺序,所以,我们就直接把 flatMap 替换为 concatMap 验证吧。

Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);}}).concatMap(new Function<Integer, ObservableSource<String>>() {@Overridepublic ObservableSource<String> apply(@NonNull Integer integer) throws Exception {List<String> list = new ArrayList<>();for (int i = 0; i < 3; i++) {list.add("I am value " + integer);}int delayTime = (int) (1 + Math.random() * 10);return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);}}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {@Overridepublic void accept(@NonNull String s) throws Exception {Log.e(TAG, "flatMap : accept : " + s + "\n");mRxOperatorsText.append("flatMap : accept : " + s + "\n");}});

输出:

结果的确和我们预想的一样。

写在最后

好了,这一节就先介绍到这里,下一节我们将学习其它的一些操作符,在操作符讲完后再带大家进入实际情景,希望持续关注,代码传送门

做不完的开源,写不完的矫情。欢迎扫描下方二维码或者公众号搜索「nanchen」关注我的微信公众号,目前多运营 Android ,尽自己所能为你提升。如果你喜欢,为我点赞分享吧~

nanchen

这可能是最好的RxJava 2.x 入门教程(二)相关推荐

  1. 【知识整理】这可能是最好的RxJava 2.x 入门教程(四)

    这可能是最好的RxJava 2.x入门教程系列专栏 文章链接: 这可能是最好的RxJava 2.x 入门教程(完结版)[强力推荐] 这可能是最好的RxJava 2.x 入门教程(一) 这可能是最好的R ...

  2. RxJava 2.x入门教程

    前言 首先来说一下rxjava1和rxjava2的区别吧,附带一些RxJava 1升级到RxJava 2过程中踩过的一些"坑",RxJava 对大家而言肯定不陌生,其受欢迎程度不言 ...

  3. RxJava 2.0 入门教程

    RxJava 2.0 入门教程 RxJava 2.0 是来自NetFlix的开源java异步编程框架.和java 8 lambda表达式很接近,响应式编程的基本构建快是被观察对象(Observable ...

  4. 这可能是最好的RxJava 2.x 入门教程学习系列

    前言 在网上看到一个讲rxjava2系列的文章,然后跟着学了一遍,下面是我跟着学习的代码,后续还会附上一张rxjav2学习的思维导图. github官方链接 https://github.com/Re ...

  5. RxJava 2.x 入门

    之前只大概了解RxJava,并没在实际的项目中实战过,但最近在研究讯飞语音的一个demo的时候发现,他们都在使用mvvm,dagger2,rxjava2.x, 姿态很优雅,很吸引人,心想,卧槽再不尝试 ...

  6. java rx_史上最浅显易懂的RxJava入门教程

    什么是RxJava 将上面的例子进行代码抽象,步骤如下: 提供观察者(因为你是关心杂志内容的人 所以你是观察该事件的人) 提供被观察者(只要有新的杂志出来 就需要通知关心的人 所以报社是被观察的对象) ...

  7. Android响应式编程(一)RxJava前篇[入门基础]

    1.RxJava概述 ReactiveX与RxJava 在讲到RxJava之前我们首先要了解什么是ReactiveX,因为RxJava是ReactiveX的一种java实现. ReactiveX是Re ...

  8. 史上最浅显易懂的 RxJava 入门教程

    来源:KunMinX www.jianshu.com/p/f392727c5aca 工作需要刚好在学习 RxJava + Retrofit2 + OkHttp3 网络请求框架,网上搜了一些 RxJav ...

  9. Rxjava:基础入门

    定义 Rxjava基于事件流.实现异步操作. 使用 引入依赖 implementation "io.reactivex.rxjava3:rxjava:3.1.2" implemen ...

最新文章

  1. 第二节 数学基础与语言学基础
  2. 谈谈SaaS创业和企业服务的常识
  3. 留的住叫做幸福. 流逝的叫做遗憾
  4. 为什么在C#中捕获并抛出异常?
  5. DUMP3 企业级电商项目
  6. BOOST_VMD_ASSERT_IS_NUMBER宏相关的测试程序
  7. 剑指OFFER之字符串的排列(九度OJ1369)
  8. 台电x80plus装linux,纤巧却不简单——台电X80 Plus评测
  9. IT兄弟连 JavaWeb教程 JSP经典面试题
  10. Linux内核网络数据包处理流程
  11. linux yum 目录在哪,急问怎么知道yum从哪个地址下载的文件呢?
  12. (ExcelVBA编程入门范例)
  13. 信工所复试收集材料分享
  14. ibm软件工程师含金量_.NET软件工程师最好取得哪些证书?(高分求高手)
  15. Android开发之so文件使用方法详解
  16. 携程、同程、QQ音乐、天猫...等14款APP被点名:涉嫌过度收集用户信息
  17. 2021阿里云服务器购买攻略-618年中大促专场
  18. 各种各样的搜索(⊙ ▽ ⊙)(1)
  19. 如何在浏览器中直接打开Word/Excel等Office文档?
  20. 使用MATLAB的EEGLAB和BCT工具箱画脑网络连接图

热门文章

  1. 【C】浅谈strcpy
  2. 【CodeForces】961 F. k-substrings 字符串哈希+二分
  3. Python 学习笔记 - 字典
  4. F5负载均衡配置手册-实操后的
  5. Linux基础 常用命令学习
  6. 【EntityFramework系列教程三,翻译】在ASP.NET MVC程序中使用EntityFramework对数据进行排序、过滤筛选以及实现分页...
  7. AWS太过强势?VMware为自保疑与微软达成合作
  8. React使用Styled-Componets来添加样式
  9. Redis 4.0.X版本reshard出现错误的解决办法
  10. python 字符编码