一、ReactiveX简介

在学习RxJava前首先需要了解ReactiveX,因为RxJava是ReactiveX的一种Java的实现形式

  • ReactiveX的官网地址为:
 http://reactivex.io/

ReactiveX官网对于自身的介绍是:

An API for asynchronous programming with observable streams

实质上我们可以对其解读为三部分:

ReactiveX的解读
①An API: 首先它是个编程接口,不同语言提供不同实现。例如Java中的RxJava。
②For asynchronous programming: 在异步编程设计中使用。例如开启子线程处理耗时的网络请求。
③With observable streams: 基于可观察的事件流。例如观察者模式中观察者对被观察者的监听。

而ReactiveX结合了如下三部分内容:

  1. 观察者模式,即定义对象间一种一对多的依赖关系,当一个对象改变状态时,则所有依赖它的对象都会被改变。
  2. Iterator模式,即迭代流式编程模式。
  3. 函数式编程模式,即提供一系列函数样式的方法供快速开发。

Reactive的模式图如下:

图1.1 ReactiveX的模式图

二、RxJava的使用

1、RxJava的优势

在Android的SDK中,给开发者提供的用于异步操作的原生内容有AsyncTask和Handler。对于简单的异步请求来说,使用Android原生的AsyncTask和Handler即可满足需求,但是对于复杂的业务逻辑而言,依然使用AsyncTask和Handler会导致代码结构混乱,代码的可读性非常差。
但是RxJava的异步操作是基于观察者模式实现的,在越来越复杂的业务逻辑中,RxJava依旧可以保持简洁

2、RxJava的配置

首先,在Android Studio中配置Module的build.gradle,在这里我们使用的版本是1.2版本,并且导入RxAndroid,辅助RxJava完成线程调度:

        implementation "io.reactivex:rxjava:1.2.0"implementation "io.reactivex:rxandroid:1.2.0"

然后,RxJava基于观察者设计模式,其中的关键性三个步骤如下:

(1)Observable被观察者

Observable被观察者创建的代码如下:

        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("Alex");subscriber.onCompleted();}});

在这里,要强调的是Observable被观察者是类类型,其中有诸多方法,我们关注其构造函数与创建Observable对象的方法,查看如下图对应的视图结构:

图2.2.1 Observable被观察者对象的视图结构

查看源码:

        protected Observable(OnSubscribe<T> f) {this.onSubscribe = f;}public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {}
        public static <T> Observable<T> create(OnSubscribe<T> f) {return new Observable<T>(RxJavaHooks.onCreate(f));}public static <S, T> Observable<T> create(SyncOnSubscribe<S, T> syncOnSubscribe) {return create((OnSubscribe<T>)syncOnSubscribe);}public static <S, T> Observable<T> create(AsyncOnSubscribe<S, T> asyncOnSubscribe) {return create((OnSubscribe<T>)asyncOnSubscribe);}

通过源码分析,可知Observable提供了create()方法来获取Observable实例对象。
此外,除了基本的创建的方法,Observable还提供了便捷的创建Observable序列的两种方式,代码如下:

  • 第一种,会将参数逐个发送
        Observable<String> observable1 = Observable.just("Alex","Payne");
  • 第二种,会将数组元素逐个转换完毕后逐个发送
        String[] observableArr = {"Alex", "Payne"};Observable<String> observable2 = Observable.from(observableArr);

其中Observable.just()方法会调用from()方法,详情可查看源码。

(2)Observer观察者

Observer观察者创建的代码如下:

        Observer<String> observer = new Observer<String>() {@Overridepublic void onCompleted() {Log.e(TAG, "onCompleted");}@Overridepublic void onError(Throwable e) {Log.e(TAG, "onError,Error Info is:" + e.getMessage());}@Overridepublic void onNext(String s) {Log.e(TAG, s);}};

Observer是接口,其中包含的方法有onCompleted()、onNext()、onError()。查看如下图所示Observer的视图结构:

图2.2.2 Observer观察者对象的视图结构

那么在RxJava中,Observer有其接口实现类对象Subscriber,它们的使用onNext、onCompleted、onError方法是一样的,但是Subscriber对于Observer接口进行了拓展,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用,代码如下:

        Subscriber<String> subscriber = new Subscriber<String>() {@Overridepublic void onStart() {Log.e(TAG, "onStart");}@Overridepublic void onCompleted() {Log.e(TAG, "onCompleted");}@Overridepublic void onError(Throwable e) {Log.e(TAG, "onError,Error Info is:" + e.getMessage());}@Overridepublic void onNext(String s) {Log.e(TAG, s);}};

其中,onStart()方法会在事件未发送前被调用,可以用于订阅关系建立前的准备工作,例如将数据清空或者重置,在Subscriber中默认是空实现,我们可以在该方法中调用自己的业务逻辑代码。在如下的视图结构中我们可以看到Subscriber的拓展内容,重点是add()、unsubscribe()方法以及名为subscription的Subscription队列

图2.2.3 Subscriber对象视图结构

(3)Subscribe订阅关系

Observable与observer形成订阅关系代码如下:

            observable.subscribe(observer);//或者observable.subscribe(subscriber);

那么我们以observable.subscribe(observer)为例在这里继续查看源码,查看subscribe()方法到底做了什么:

图2.3.1 Observable调用Subscribe将Observer转换为Subscriber对象

Observer转换为Subscriber对象在这里得到印证。

  • 在之后的内容中统一以Subscriber作为订阅观察者对象

继续深入,我们可以看到订阅关系中的关键步骤(仅核心代码):

            subscriber.onStart();RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);return RxJavaHooks.onObservableReturn(subscriber);

在这里RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)等价于OnSubscribe.call(subscriber),见下图2.3.2:

图2.3.2 RxJavaHooks.onObservableStart()转换为OnSubscribe

在return RxJavaHooks.onObservableReturn(subscriber)这里等价于return subscription,见下图2.3.3:

图2.3.3 RxJavaHooks.onObservableReturn()转换为Subscrition

  • 可以看到,subscriber() 做了3件事:
  1. 调用 Subscriber.onStart() 。该方法用于在订阅关系建立之前的准备。
  2. 调用 Observable 中的 OnSubscribe.call(Subscriber) 。OnSubscribe是Observable的内部接口,而事件发送的逻辑在这里开始运行。从这也可以看出,在 RxJava 中当 subscribe() 方法执行的时候订阅关系确立,Observable 开始发送事件。
  3. 将传入的 Subscriber 作为 Subscription 返回。这是为了方便后续调用unsubscribe()。

三、RxJava的不完整回调

1、不完整回调的代码示例

        Observable<String> observable = Observable.just("Alex", "Payne");Action1<String> onNextAction = new Action1<String>() {@Overridepublic void call(String s) {Log.e(TAG, s);}};Action1<Throwable> onErrorAction = new Action1<Throwable>() {@Overridepublic void call(Throwable throwable) {Log.e(TAG, "onError,Error Info is:" + throwable.getMessage());}};Action0 onCompletedAction = new Action0() {@Overridepublic void call() {Log.e(TAG, "onCompleted");}};// 根据onNextAction 来定义 onNext()observable.subscribe(onNextAction);// 根据onNextAction 来定义 onNext()、根据onErrorAction 来定义 onError()observable.subscribe(onNextAction, onErrorAction);// 根据onNextAction 来定义 onNext()、根据onErrorAction 来定义 onError()、onCompletedAction 来定义 onCompleted()observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

2、不完整回调的原理分析

在这里我们可以看到:

Action0无参数泛型无返回值类型,而Subscriber中的onCompleted()方法也没有参数泛型
Action1有1个参数泛型无返回值类型 ,onNextAction设置的参数泛型为String,而Subscriber中的onNext()方法参数泛型也是String(和本文中观察者对象中的OnNext方法对比)
Action1有1个参数泛型无返回值类型,onErrorAction设置的参数泛型为Throwable,而Subscriber中的onError()方法参数泛型也是Throwable

那么,我们来查看observable.subscribe(onNextAction)的源码,在这里, Action1可以被当成一个包装对象,将onNext()方法进行包装作为不完整的回调传入到observable.subscribe()中

图3.2.1 传入的onNextAction最终被包装成ActionSubscriber

我们来看看Action1有何玄机,Action1的源码如下图所示:

图3.2.2 Action1接口源码

实质上,这种根据参数泛型的个数且无返回值类型的包装在RxJava中有多种如下图所示的体现,例如Action0的参数个数为0,Action1的参数个数为1以此类推:

图3.2.3 根据参数泛型的个数且无返回值类型的包装

四、RxJava的线程切换

1、Scheduler线程调度器

如果不指定线程,默认是在调用subscribe方法的线程上进行回调,那么如果子线程中调用subscibe方法,而想在主线程中进行UI更新,则会抛出异常。当然了RxJava已经帮我们考虑到了这一点,所以提供了Scheduler线程调度器帮助我们进行线程之间的切换。
实质上,Scheduler线程调度器和RxJava的操作符有紧密关联,我将在下一篇文章中进行详细介绍。

  • RxJava内置了如下所示几个的线程调度器:
  1. Schedulers.immediate():在当前线程中执行
  2. Schedulers.newThread():启动新线程,在新线程中进行操作
  3. Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  4. Schedulers.computation():计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  5. Schedulers.trampoline():会将任务按添加顺序依次放入当前线程中等待执行。线程一次只执行一个任务,其余任务排队等待,一个任务都执行完成后再开始下一个任务的执行。
  • 此外RxJava还提供了用于测试的调度器Schedulers.test() 及 可自定义Scheduler—-Schedulers.form() 。

RxAndroid并且其为我们提供了AndroidSchedulers.mainThread()进行主线程的回调

2、线程控制

调用Observable对象的subscribeOn()、observeOn()方法即可完成线程控制。

  • subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
  • observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
            Observable.just("Alex", "Payne").subscribeOn(Schedulers.io())//指定 subscribe() 所发生的线程.unsubscribeOn(Schedulers.io())//事件发送完毕后,及时取消发送.observeOn(AndroidSchedulers.mainThread())//指定 Subscriber 所运行在的线程.subscribe(new Action1<String>() {@Overridepublic void call(String s) {Log.e(TAG, s);}});

五、总结

本文主要介绍了RxJava的由来、使用步骤、部分内容的原理解析。在下篇文章中我会详细介绍RxJava的操作符。希望本文对你在学习RxJava的路上有所启发。

小礼物走一走,来简书关注我

作者:Alex_Payne
链接:https://www.jianshu.com/p/2d3d7c77dc92
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

ReactiveX简介相关推荐

  1. ReactiveX 简介

    Observable 在ReactiveX中,观察者订阅了一个Observable.然后,该观察者对Observable发出的任何项目或项目序列做出反应. 这种模式有利于并发操作,因为它不需要在等待O ...

  2. Java中ReactiveX(RxJava)的使用

    1.1 ReactiveX概述 ReactiveX官网:ReactiveX 1.1.1 Android的MVP开发模式 MVP的工作流程 Presenter负责逻辑的处理 Model提供数据 View ...

  3. 响应式编程简介之:Reactor

    文章目录 简介 Reactor简介 reactive programming的发展史 Iterable-Iterator 和Publisher-Subscriber的区别 为什么要使用异步reacti ...

  4. Redux中的功能式React式编程简介

    by Bhuvan Malik 通过布凡·马利克(Bhuvan Malik) Redux中的功能式React式编程简介 (An introduction to functional Reactive ...

  5. RxJS异步编程的简介

    RxJs.RxJava.RxAndroid.RxSwift等是异步编程框架,对于前端工程师来说,RxJs框架是比较新颖. 中文官网地址:http://cn.rx.js.org/ RxJs简介 RxJS ...

  6. 《Kotlin 程序设计》第一章 Kotlin简介

    第一章 Kotlin简介 科特林岛(Котлин)是一座俄罗斯的岛屿,位于圣彼得堡以西约30公里处,形状狭长,东西长度约14公里,南北宽度约2公里,面积有16平方公里,扼守俄国进入芬兰湾的水道.科特林 ...

  7. 【ReactiveX】基于Golang pmlpml/RxGo程序包的二次开发

    基于Golang pmlpml/RxGo程序包的二次开发[阅读时间:约20分钟] 一.ReactiveX & RxGo介绍 1.ReactiveX 2.RxGo 二.系统环境&项目介绍 ...

  8. etcd 笔记(01)— etcd 简介、特点、应用场景、常用术语、分布式 CAP 理论、分布式原理

    1. etcd 简介 etcd 官网定义: A highly-available key value store for shared configuration and service discov ...

  9. Docker学习(一)-----Docker简介与安装

    一.Docker介绍 1.1什么是docker Docker是一个开源的应用容器引擎,基于Go语言并遵从Apache2.0协议开源 Docker可以让开发者打包他们的应用以及依赖包到一个轻量级,可移植 ...

最新文章

  1. python中递归函数特点,Python递归函数特点及原理解析
  2. spring el 表达式的上下文关联到 ApplicationContext
  3. map平均准确率_第五篇 目标检测评价标准—MAP
  4. 方法的重载与重写_我们不一样,不一样,重写与重载
  5. 爬虫 推送到discord_如何将自定义表情符号添加到Discord服务器
  6. 人人接龙助手,三分钟为你的微信群创建打卡活动
  7. 微信检测器 微信过滤工具
  8. Windows 8 激活信息备份还原方法与工具
  9. Unity获取手机app列表《三》安卓端
  10. 代码随想录第八天 LeetCode 344、541、剑指Offer 05、151、剑指Offer58 (字符串)
  11. jenkins k8s 动态增减 jenkins-salve (1) 制作部署jenkins-master 镜像
  12. js数组与字符串的相互转化
  13. 2022下半年教师资格证报名学历要求
  14. magisk安装与配置
  15. oracle user does not exist,user 'scott' does not exist! 又学了一招
  16. 自控考研复习 自我梳理(三) 知识来自网络,纯为总结侵权即删(一阶系统)
  17. 制作Linux版PE系统
  18. 一篇很好的机器学习介绍
  19. 八大排序算法 —— 归并排序
  20. 如何将手机PDF转成JPG图片?分享两个技巧

热门文章

  1. 华为的FusionInsight智能数据生态地图
  2. 用js随机生成随机车牌照
  3. 天涯明月刀手游显示服务器已满,《天涯明月刀手游》服务器爆满进不去游戏怎么办,排队时间太长怎么办?...
  4. 计算机环境与应用专业就业前景,2018资源环境科学专业就业前景和就业方向分析...
  5. 计算机毕业设计JAVA宠物咖啡馆平台系统mybatis+源码+调试部署+系统+数据库+lw
  6. 【家庭财务管理系统】
  7. 疫情期间给女朋友敲段代码,给女朋友个小惊喜
  8. 在React中使用防抖节流
  9. 使用C#实现简单的21点小游戏
  10. css分隔线中间带文字