ReactiveX简介
一、ReactiveX简介
在学习RxJava前首先需要了解ReactiveX,因为RxJava是ReactiveX的一种Java的实现形式。
- ReactiveX的官网地址为:
http://reactivex.io/
ReactiveX官网对于自身的介绍是:
An API for asynchronous programming with observable streams
实质上我们可以对其解读为三部分:
ReactiveX的解读 |
---|
①An API: 首先它是个编程接口,不同语言提供不同实现。例如Java中的RxJava。 |
②For asynchronous programming: 在异步编程设计中使用。例如开启子线程处理耗时的网络请求。 |
③With observable streams: 基于可观察的事件流。例如观察者模式中观察者对被观察者的监听。 |
而ReactiveX结合了如下三部分内容:
- 观察者模式,即定义对象间一种一对多的依赖关系,当一个对象改变状态时,则所有依赖它的对象都会被改变。
- Iterator模式,即迭代流式编程模式。
- 函数式编程模式,即提供一系列函数样式的方法供快速开发。
Reactive的模式图如下:
图1.1 ReactiveX的模式图
二、RxJava的使用
1、RxJava的优势
在Android的SDK中,给开发者提供的用于异步操作的原生内容有AsyncTask和Handler。对于简单的异步请求来说,使用Android原生的AsyncTask和Handler即可满足需求,但是对于复杂的业务逻辑而言,依然使用AsyncTask和Handler会导致代码结构混乱,代码的可读性非常差。
但是RxJava的异步操作是基于观察者模式实现的,在越来越复杂的业务逻辑中,RxJava依旧可以保持简洁
2、RxJava的配置
首先,在Android Studio中配置Module的build.gradle,在这里我们使用的版本是1.2版本,并且导入RxAndroid,辅助RxJava完成线程调度:
implementation "io.reactivex:rxjava:1.2.0"implementation "io.reactivex:rxandroid:1.2.0"
然后,RxJava基于观察者设计模式,其中的关键性三个步骤如下:
(1)Observable被观察者
Observable被观察者创建的代码如下:
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("Alex");subscriber.onCompleted();}});
在这里,要强调的是Observable被观察者是类类型,其中有诸多方法,我们关注其构造函数与创建Observable对象的方法,查看如下图对应的视图结构:
图2.2.1 Observable被观察者对象的视图结构
查看源码:
protected Observable(OnSubscribe<T> f) {this.onSubscribe = f;}public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {}
public static <T> Observable<T> create(OnSubscribe<T> f) {return new Observable<T>(RxJavaHooks.onCreate(f));}public static <S, T> Observable<T> create(SyncOnSubscribe<S, T> syncOnSubscribe) {return create((OnSubscribe<T>)syncOnSubscribe);}public static <S, T> Observable<T> create(AsyncOnSubscribe<S, T> asyncOnSubscribe) {return create((OnSubscribe<T>)asyncOnSubscribe);}
通过源码分析,可知Observable提供了create()方法来获取Observable实例对象。
此外,除了基本的创建的方法,Observable还提供了便捷的创建Observable序列的两种方式,代码如下:
- 第一种,会将参数逐个发送
Observable<String> observable1 = Observable.just("Alex","Payne");
- 第二种,会将数组元素逐个转换完毕后逐个发送
String[] observableArr = {"Alex", "Payne"};Observable<String> observable2 = Observable.from(observableArr);
其中Observable.just()方法会调用from()方法,详情可查看源码。
(2)Observer观察者
Observer观察者创建的代码如下:
Observer<String> observer = new Observer<String>() {@Overridepublic void onCompleted() {Log.e(TAG, "onCompleted");}@Overridepublic void onError(Throwable e) {Log.e(TAG, "onError,Error Info is:" + e.getMessage());}@Overridepublic void onNext(String s) {Log.e(TAG, s);}};
Observer是接口,其中包含的方法有onCompleted()、onNext()、onError()。查看如下图所示Observer的视图结构:
图2.2.2 Observer观察者对象的视图结构
那么在RxJava中,Observer有其接口实现类对象Subscriber,它们的使用onNext、onCompleted、onError方法是一样的,但是Subscriber对于Observer接口进行了拓展,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用,代码如下:
Subscriber<String> subscriber = new Subscriber<String>() {@Overridepublic void onStart() {Log.e(TAG, "onStart");}@Overridepublic void onCompleted() {Log.e(TAG, "onCompleted");}@Overridepublic void onError(Throwable e) {Log.e(TAG, "onError,Error Info is:" + e.getMessage());}@Overridepublic void onNext(String s) {Log.e(TAG, s);}};
其中,onStart()方法会在事件未发送前被调用,可以用于订阅关系建立前的准备工作,例如将数据清空或者重置,在Subscriber中默认是空实现,我们可以在该方法中调用自己的业务逻辑代码。在如下的视图结构中我们可以看到Subscriber的拓展内容,重点是add()、unsubscribe()方法以及名为subscription的Subscription队列。
图2.2.3 Subscriber对象视图结构
(3)Subscribe订阅关系
Observable与observer形成订阅关系代码如下:
observable.subscribe(observer);//或者observable.subscribe(subscriber);
那么我们以observable.subscribe(observer)为例在这里继续查看源码,查看subscribe()方法到底做了什么:
图2.3.1 Observable调用Subscribe将Observer转换为Subscriber对象
Observer转换为Subscriber对象在这里得到印证。
- 在之后的内容中统一以Subscriber作为订阅观察者对象
继续深入,我们可以看到订阅关系中的关键步骤(仅核心代码):
subscriber.onStart();RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);return RxJavaHooks.onObservableReturn(subscriber);
在这里RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)等价于OnSubscribe.call(subscriber),见下图2.3.2:
图2.3.2 RxJavaHooks.onObservableStart()转换为OnSubscribe
在return RxJavaHooks.onObservableReturn(subscriber)这里等价于return subscription,见下图2.3.3:
图2.3.3 RxJavaHooks.onObservableReturn()转换为Subscrition
- 可以看到,subscriber() 做了3件事:
- 调用 Subscriber.onStart() 。该方法用于在订阅关系建立之前的准备。
- 调用 Observable 中的 OnSubscribe.call(Subscriber) 。OnSubscribe是Observable的内部接口,而事件发送的逻辑在这里开始运行。从这也可以看出,在 RxJava 中当 subscribe() 方法执行的时候订阅关系确立,Observable 开始发送事件。
- 将传入的 Subscriber 作为 Subscription 返回。这是为了方便后续调用unsubscribe()。
三、RxJava的不完整回调
1、不完整回调的代码示例
Observable<String> observable = Observable.just("Alex", "Payne");Action1<String> onNextAction = new Action1<String>() {@Overridepublic void call(String s) {Log.e(TAG, s);}};Action1<Throwable> onErrorAction = new Action1<Throwable>() {@Overridepublic void call(Throwable throwable) {Log.e(TAG, "onError,Error Info is:" + throwable.getMessage());}};Action0 onCompletedAction = new Action0() {@Overridepublic void call() {Log.e(TAG, "onCompleted");}};// 根据onNextAction 来定义 onNext()observable.subscribe(onNextAction);// 根据onNextAction 来定义 onNext()、根据onErrorAction 来定义 onError()observable.subscribe(onNextAction, onErrorAction);// 根据onNextAction 来定义 onNext()、根据onErrorAction 来定义 onError()、onCompletedAction 来定义 onCompleted()observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
2、不完整回调的原理分析
在这里我们可以看到:
Action0无参数泛型无返回值类型,而Subscriber中的onCompleted()方法也没有参数泛型 |
---|
Action1有1个参数泛型无返回值类型 ,onNextAction设置的参数泛型为String,而Subscriber中的onNext()方法参数泛型也是String(和本文中观察者对象中的OnNext方法对比) |
Action1有1个参数泛型无返回值类型,onErrorAction设置的参数泛型为Throwable,而Subscriber中的onError()方法参数泛型也是Throwable |
那么,我们来查看observable.subscribe(onNextAction)的源码,在这里, Action1可以被当成一个包装对象,将onNext()方法进行包装作为不完整的回调传入到observable.subscribe()中。
图3.2.1 传入的onNextAction最终被包装成ActionSubscriber
我们来看看Action1有何玄机,Action1的源码如下图所示:
图3.2.2 Action1接口源码
实质上,这种根据参数泛型的个数且无返回值类型的包装在RxJava中有多种如下图所示的体现,例如Action0的参数个数为0,Action1的参数个数为1以此类推:
图3.2.3 根据参数泛型的个数且无返回值类型的包装
四、RxJava的线程切换
1、Scheduler线程调度器
如果不指定线程,默认是在调用subscribe方法的线程上进行回调,那么如果子线程中调用subscibe方法,而想在主线程中进行UI更新,则会抛出异常。当然了RxJava已经帮我们考虑到了这一点,所以提供了Scheduler线程调度器帮助我们进行线程之间的切换。
实质上,Scheduler线程调度器和RxJava的操作符有紧密关联,我将在下一篇文章中进行详细介绍。
- RxJava内置了如下所示几个的线程调度器:
- Schedulers.immediate():在当前线程中执行
- Schedulers.newThread():启动新线程,在新线程中进行操作
- Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
- Schedulers.computation():计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
- Schedulers.trampoline():会将任务按添加顺序依次放入当前线程中等待执行。线程一次只执行一个任务,其余任务排队等待,一个任务都执行完成后再开始下一个任务的执行。
- 此外RxJava还提供了用于测试的调度器Schedulers.test() 及 可自定义Scheduler—-Schedulers.form() 。
RxAndroid并且其为我们提供了AndroidSchedulers.mainThread()进行主线程的回调
2、线程控制
调用Observable对象的subscribeOn()、observeOn()方法即可完成线程控制。
- subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
- observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
Observable.just("Alex", "Payne").subscribeOn(Schedulers.io())//指定 subscribe() 所发生的线程.unsubscribeOn(Schedulers.io())//事件发送完毕后,及时取消发送.observeOn(AndroidSchedulers.mainThread())//指定 Subscriber 所运行在的线程.subscribe(new Action1<String>() {@Overridepublic void call(String s) {Log.e(TAG, s);}});
五、总结
本文主要介绍了RxJava的由来、使用步骤、部分内容的原理解析。在下篇文章中我会详细介绍RxJava的操作符。希望本文对你在学习RxJava的路上有所启发。
小礼物走一走,来简书关注我
作者:Alex_Payne
链接:https://www.jianshu.com/p/2d3d7c77dc92
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
ReactiveX简介相关推荐
- ReactiveX 简介
Observable 在ReactiveX中,观察者订阅了一个Observable.然后,该观察者对Observable发出的任何项目或项目序列做出反应. 这种模式有利于并发操作,因为它不需要在等待O ...
- Java中ReactiveX(RxJava)的使用
1.1 ReactiveX概述 ReactiveX官网:ReactiveX 1.1.1 Android的MVP开发模式 MVP的工作流程 Presenter负责逻辑的处理 Model提供数据 View ...
- 响应式编程简介之:Reactor
文章目录 简介 Reactor简介 reactive programming的发展史 Iterable-Iterator 和Publisher-Subscriber的区别 为什么要使用异步reacti ...
- Redux中的功能式React式编程简介
by Bhuvan Malik 通过布凡·马利克(Bhuvan Malik) Redux中的功能式React式编程简介 (An introduction to functional Reactive ...
- RxJS异步编程的简介
RxJs.RxJava.RxAndroid.RxSwift等是异步编程框架,对于前端工程师来说,RxJs框架是比较新颖. 中文官网地址:http://cn.rx.js.org/ RxJs简介 RxJS ...
- 《Kotlin 程序设计》第一章 Kotlin简介
第一章 Kotlin简介 科特林岛(Котлин)是一座俄罗斯的岛屿,位于圣彼得堡以西约30公里处,形状狭长,东西长度约14公里,南北宽度约2公里,面积有16平方公里,扼守俄国进入芬兰湾的水道.科特林 ...
- 【ReactiveX】基于Golang pmlpml/RxGo程序包的二次开发
基于Golang pmlpml/RxGo程序包的二次开发[阅读时间:约20分钟] 一.ReactiveX & RxGo介绍 1.ReactiveX 2.RxGo 二.系统环境&项目介绍 ...
- etcd 笔记(01)— etcd 简介、特点、应用场景、常用术语、分布式 CAP 理论、分布式原理
1. etcd 简介 etcd 官网定义: A highly-available key value store for shared configuration and service discov ...
- Docker学习(一)-----Docker简介与安装
一.Docker介绍 1.1什么是docker Docker是一个开源的应用容器引擎,基于Go语言并遵从Apache2.0协议开源 Docker可以让开发者打包他们的应用以及依赖包到一个轻量级,可移植 ...
最新文章
- python中递归函数特点,Python递归函数特点及原理解析
- spring el 表达式的上下文关联到 ApplicationContext
- map平均准确率_第五篇 目标检测评价标准—MAP
- 方法的重载与重写_我们不一样,不一样,重写与重载
- 爬虫 推送到discord_如何将自定义表情符号添加到Discord服务器
- 人人接龙助手,三分钟为你的微信群创建打卡活动
- 微信检测器 微信过滤工具
- Windows 8 激活信息备份还原方法与工具
- Unity获取手机app列表《三》安卓端
- 代码随想录第八天 LeetCode 344、541、剑指Offer 05、151、剑指Offer58 (字符串)
- jenkins k8s 动态增减 jenkins-salve (1) 制作部署jenkins-master 镜像
- js数组与字符串的相互转化
- 2022下半年教师资格证报名学历要求
- magisk安装与配置
- oracle user does not exist,user 'scott' does not exist! 又学了一招
- 自控考研复习 自我梳理(三) 知识来自网络,纯为总结侵权即删(一阶系统)
- 制作Linux版PE系统
- 一篇很好的机器学习介绍
- 八大排序算法 —— 归并排序
- 如何将手机PDF转成JPG图片?分享两个技巧
热门文章
- 华为的FusionInsight智能数据生态地图
- 用js随机生成随机车牌照
- 天涯明月刀手游显示服务器已满,《天涯明月刀手游》服务器爆满进不去游戏怎么办,排队时间太长怎么办?...
- 计算机环境与应用专业就业前景,2018资源环境科学专业就业前景和就业方向分析...
- 计算机毕业设计JAVA宠物咖啡馆平台系统mybatis+源码+调试部署+系统+数据库+lw
- 【家庭财务管理系统】
- 疫情期间给女朋友敲段代码,给女朋友个小惊喜
- 在React中使用防抖节流
- 使用C#实现简单的21点小游戏
- css分隔线中间带文字