转:  http://www.jianshu.com/p/856297523728

.subscribeOn(Schedulers.io()) // 指定subscribe()发生在IO线程  --->call --》被观察者 Observable
.observeOn(AndroidSchedulers.mainThread()) // 指定Subscriber的回调发生在UI线程                -->onNext onComplete onError-->Observer 观察者

写在前面

关注RxJava已经有很久一段时间了,因为当你有一天打开技术论坛、打开Github、打开简书的时候满屏都是各种Rx的时候,心里是很慌的。所以趁着结课大作业全部搞定后,静下心花了几天时间系统地学习了一下RxJava。

现在网上有各种优秀的博客或文档来讲解RxJava,最出名的莫过于扔物线老师(暂且称为老师吧...)的这篇教程了,我也是看这篇文章入门的,强推。

给 Android 开发者的 RxJava 详解——扔物线

当然也有很多很多优秀的教程,在文末我会加链接。但是从我这几天学习的过程和经验来看,教程不在多,而在于吃透。所以尽管网上已经有了很多优秀的教程了,但在这里我还是想把自己学习的过程记录下来。

因为这个RxJava内容不算少,而且应用场景非常广,所以这个大话RxJava应该会写很久。

今天就来先来个入门RxJava吧,下面是本文的目录:

  • 写在前面
  • 初识RxJava
    • 什么是Rx
    • 什么是RxJava
    • 扩展的观察者模式
  • 如何实现RxJava
    • 创建Observer
    • 创建Observable
    • 订阅(Subscribe)
  • 线程控制——Scheduler
  • 第一个RxJava案例
  • 总结
  • 参考资料
  • 项目源码

初识RxJava

什么是Rx

很多教程在讲解RxJava的时候,上来就介绍了什么是RxJava。这里我先说一下什么是Rx,Rx就是ReactiveX,官方定义是:

Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序

一脸懵B

看到这个定义我只能呵呵,稍微通俗点说是这样的:

Rx是微软.NET的一个响应式扩展。Rx借助可观测的序列提供一种简单的方式来创建异步的,基于事件驱动的程序。

这个有点清晰了,至少看到我们熟悉的异步事件驱动,所以简单点且不准确地来说:

Rx就是一种响应式编程,来创建基于事件的异步程序

注意,这个定义是不准确的,但是对于初学者来说,已经可以有个基本的认知了。

另外还有一点就是Rx其实是一种编程思想,用很多语言都可以实现,比如RxJava、RxJS、RxPHP等等。而现在我们要说的就是RxJava。

RxJava是什么

二话不说,先上定义:

RxJava就是一种用Java语言实现的响应式编程,来创建基于事件的异步程序

有人问你这不是废话么,好吧那我上官方定义:

一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库

反正我刚看这句话的时候也呵呵了,当然现在有所领悟了。

除此之外,扔物线老师总结的就更精辟了:异步,它就是一个实现异步操作的库。

扩展的观察者模式

对于普通的观察者模式,这里我就不细说了。简单概括就是,观察者(Observer)需要在被观察者(Observable)变化的一瞬间做出反应。

而两者通过注册(Register)或者订阅(Subscribe)的方式进行绑定。

就拿扔物线老师给的例子来说,我丰富了一下如图所示:

观察者模式

其中这个Button就是被观察者(Observable),OnClickListener就是观察者(Observer),两者通过setOnClickListener达成订阅(Subscribe)关系,之后当Button产生OnClick事件的时候,会直接发送给OnClickListener,它做出相应的响应处理。

当然还有其他的例子,比如Android四大组件中的ContentProvider与ContentObserver之间也存在这样的关系。

而RxJava的观察者模式呢,跟这个差不多,但是也有几点差别:

  • Observer与Observable是通过 subscribe() 来达成订阅关系。
  • RxJava中事件回调有三种:onNext()onCompleted()onError()
  • 如果一个Observerble没有任何的Observer,那么这个Observable是不会发出任何事件的。

其中关于第三点,这里想说明一下,在Rx中,其实Observable有两种形式:热启动Observable和冷启动Observable。

热启动Observable任何时候都会发送消息,即使没有任何观察者监听它。

冷启动Observable只有在至少有一个订阅者的时候才会发送消息

这个地方虽然对于初学者来说区别不大,但是要注意一下,所以上面的第三点其实就针对于冷启动来说的。

另外,关于RxJava的回调事件,扔物线老师总结的很好,我就不班门弄斧了:

  • onNext():基本事件。
  • onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
  • onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。

值得注意的是在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。如果在队列中调用了其中一个,就不应该再调用另一个。

好了,那我们也附一张图对比一下吧:

RxJava观察者模式

如何实现RxJava

关于实现RxJava的步骤,扔物线老师已经说的非常好了,这里我就大体总结概括一下。

创建Observer

在Java中,一想到要创建一个对象,我们马上就想要new一个。没错,这里我们也是要new一个Observer出来,其实就是实现Observer的接口,注意String是接收参数的类型:

//创建Observer
Observer<String> observer = new Observer<String>() {@Overridepublic void onNext(String s) {Log.i("onNext ---> ", "Item: " + s);}@Overridepublic void onCompleted() {Log.i("onCompleted ---> ", "完成");}@Overridepublic void onError(Throwable e) {Log.i("onError ---> ", e.toString());}
};

当然这里也要提一个实现了 Observer 接口的抽象类:Subscriber ,它跟 Observer 接口几乎完全一样,只是多了两个方法,看看扔物线老师的总结:

  • onStart(): 它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。

  • unsubscribe(): 用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态。 要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。

虽然多了两个方法,但是基本实现方式跟Observer是一样的,所以暂时可以不考虑两者的区别。不过值得注意的是:

实质上,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。

创建Observable

与Observer不同的是,Observable是通过 create() 方法来创建的。注意String是发送参数的类型:

//创建Observable
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("Hello");subscriber.onNext("World");subscriber.onCompleted();}
});

关于这其中的流程,我们暂且不考虑,在下一篇的源码解析中,我们再详细说明。

订阅(Subscribe)

在之前,我们创建了 Observable 和 Observer ,现在就需要用 subscribe() 方法来将它们连接起来,形成一种订阅关系:

//订阅
observable.subscribe(observer);

这里其实确实有点奇怪,为什么是Observable(被观察者)订阅了Observer(观察者)呢?其实我们想一想之前Button的点击事件:

Button.setOnClickListener(new View.OnClickListener())

Button是被观察者,OnClickListener是观察者,setOnClickListener是订阅。我们惊讶地发现,也是被观察者订阅了观察者,所以应该是一种流式API的设计吧,也没啥影响。

完整代码如下:

    //创建ObserverObserver<String> observer = new Observer<String>() {@Overridepublic void onNext(String s) {Log.i("onNext ---> ", "Item: " + s);}@Overridepublic void onCompleted() {Log.i("onCompleted ---> ", "完成");}@Overridepublic void onError(Throwable e) {Log.i("onError ---> ", e.toString());}};//创建ObservableObservable observable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("Hello");subscriber.onNext("World");subscriber.onCompleted();}});//订阅observable.subscribe(observer);

运行的结果如下,可以看到Observable中发送的String已经被Observer接收并打印了出来:

基本实现

线程控制——Scheduler

好了,这里就是RxJava的精髓之一了。

在RxJava中,Scheduler相当于线程控制器,可以通过它来指定每一段代码运行的线程。

RxJava已经内置了几个Scheduler,扔物线老师也总结得完美:

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的Scheduler。

  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。行为模式和newThread()差不多,区别在于io()的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()比newThread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。

  • Schedulers.computation(): 计算所使用的Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个Scheduler使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在computation()中,否则 I/O 操作的等待时间会浪费 CPU。

  • AndroidSchedulers.mainThread(),Android专用线程,指定操作在主线程运行。

那我们如何切换线程呢?RxJava中提供了两个方法:subscribeOn()observeOn() ,两者的不同点在于:

  • subscribeOn(): 指定subscribe()订阅所发生的线程,即 call() 执行的线程。或者叫做事件产生的线程。

  • observeOn(): 指定Observer所运行在的线程,即onNext()执行的线程。或者叫做事件消费的线程。

具体实现如下:

//改变运行的线程
observable.subscribeOn(Schedulers.io());
observable.observeOn(AndroidSchedulers.mainThread());

这里确实不好理解,没关系,下面我们在具体例子中观察现象。

而这其中的原理,会在之后的源码级分析的文章中详细解释,现在我们暂且搁下。

第一个RxJava案例

好了,当看完之前的所有基础东西,现在我们就完全可以写一个基于RxJava的Demo了。

这里我们用一个基于RxJava的异步加载网络图片来演示。

由于重点在于RxJava对于异步的处理,所以关于如何通过网络请求获取图片,这里就不详细说明了。

另外这里采用的是链式调用,并为重要位置打上Log日志,观察方法执行的所在线程。

首先需要添加依赖,这没什么好说的:

dependencies {compile fileTree(include: ['*.jar'], dir: 'libs')testCompile 'junit:junit:4.12'...compile 'io.reactivex:rxjava:1.1.6'compile 'io.reactivex:rxandroid:1.2.1'
}

然后按照步骤来,首先通过create创建Observable,注意发送参数的类型是Bitmap:

//创建被观察者
Observable.create(new Observable.OnSubscribe<Bitmap>() {/*** 复写call方法** @param subscriber 观察者对象*/@Overridepublic void call(Subscriber<? super Bitmap> subscriber) {//通过URL得到图片的Bitmap对象Bitmap bitmap = GetBitmapForURL.getBitmap(url);//回调观察者方法subscriber.onNext(bitmap);subscriber.onCompleted();Log.i(" call ---> ", "运行在 " + Thread.currentThread().getName() + " 线程");}
})

然后我们需要创建Observer,并进行订阅,这里是链式调用

.subscribe(new Observer<Bitmap>() {   //订阅观察者(其实是观察者订阅被观察者)@Overridepublic void onNext(Bitmap bitmap) {mainImageView.setImageBitmap(bitmap);Log.i(" onNext ---> ", "运行在 " + Thread.currentThread().getName() + " 线程");}@Overridepublic void onCompleted() {mainProgressBar.setVisibility(View.GONE);Log.i(" onCompleted ---> ", "完成");}@Overridepublic void onError(Throwable e) {Log.e(" onError --->", e.toString());}});

当然网络请求是耗时操作,我们需要在其他线程中执行,而更新UI需要在主线程中执行,所以需要设置线程:

.subscribeOn(Schedulers.io()) // 指定subscribe()发生在IO线程
.observeOn(AndroidSchedulers.mainThread()) // 指定Subscriber的回调发生在UI线程

这样我们就完成了一个RxJava的基本编写,现在整体看一下代码:

//创建被观察者
Observable.create(new Observable.OnSubscribe<Bitmap>() {/*** 复写call方法** @param subscriber 观察者对象*/@Overridepublic void call(Subscriber<? super Bitmap> subscriber) {//通过URL得到图片的Bitmap对象Bitmap bitmap = GetBitmapForURL.getBitmap(url);//回调观察者方法subscriber.onNext(bitmap);subscriber.onCompleted();Log.i(" call ---> ", "运行在 " + Thread.currentThread().getName() + " 线程");}
})
.subscribeOn(Schedulers.io()) // 指定subscribe()发生在IO线程
.observeOn(AndroidSchedulers.mainThread()) // 指定Subscriber的回调发生在UI线程
.subscribe(new Observer<Bitmap>() {   //订阅观察者(其实是观察者订阅被观察者)@Overridepublic void onNext(Bitmap bitmap) {mainImageView.setImageBitmap(bitmap);Log.i(" onNext ---> ", "运行在 " + Thread.currentThread().getName() + " 线程");}@Overridepublic void onCompleted() {mainProgressBar.setVisibility(View.GONE);Log.i(" onCompleted ---> ", "完成");}@Overridepublic void onError(Throwable e) {Log.e(" onError --->", e.toString());}});

好了,下面是运行的动态图:

RxJava异步加载网络图片

现在来看一下运行的Log日志:

Log

可以看到,call方法(事件产生)执行在IO线程,而onNext方法(事件消费)执行在main线程。说明之前分析的是对的。

总结

好了,由于本文是一个RxJava的基础,所以篇幅稍微过长了点。即使这样,很多细节性问题都没有交代清楚。但所幸的是,本文已经将RxJava必要的基础入门知识讲解完了。

在后期的文章中,主要是对RxJava的源码进行必要的分析,以及RxJava中各种操作符(比如常用的map、from、just、take、flatmap等等)的使用方式进行讲解。

跟现有的博客或者教程不一样的是,我不打算直接用理论知识来讲解,而是会用一个具体的项目来完成一系列的说明。

其实在写博客的时候,对自己所学也是一个复习的过程,能发现之前学习过程中所疏忽的问题。但由于技术水平有限,文中难免会有错误或者疏忽之处,欢迎大家指正与交流。

最后感谢扔物线老师的文章,写的真的是太好了。

文/iamxiarui(简书作者)
原文链接:http://www.jianshu.com/p/856297523728
著作权归作者所有,转载请联系作者获得授权,并标注“简书作者”。

大话RxJava:一、初识RxJava与基本运用相关推荐

  1. 动脑学院 java_动脑学院Rxjava预习资料 Rxjava入门

    前言 Rxjava由于其基于事件流的链式调用.逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎. Github截图 本文主要: 面向 刚接触Rxjava的初学者 提供了一份 ...

  2. 初识RxJava(三)转换类 操作符

    前言: 昨天的两篇介绍了 RxJava 的创建类操作符和延时类操作符,今天笔者记录一下 转换类 操作符,不太监了 ,开始记笔记.对了具体不太清楚转换过程的 可以去拜读 这位大佬 Season_zlc ...

  3. 一起来造一个RxJava,揭秘RxJava的实现原理

    RxJava是一个神奇的框架,用法很简单,但内部实现有点复杂,代码逻辑有点绕.我读源码时,确实有点似懂非懂的感觉.网上关于RxJava源码分析的文章,源码贴了一大堆,代码逻辑绕来绕去的,让人看得云里雾 ...

  4. rxjava背压_Android Rxjava :最简单全面背压讲解 (Flowable)

    1.前言 阅读本文需要对Rxjava了解,如果还没有了解或者使用过Rxjava的兄die们,可以观看我另外一篇 Android Rxjava:不一样的诠释进行学习. Rxjava背压:被观察者发送事件 ...

  5. android rxjava 过滤,解剖 RxJava 之过滤操作符

    介绍 此文章结合 Github AnalyseRxJava 项目,给 Android 开发者带来 RxJava 详细的解说.参考自 RxJava Essential 及书中的例子 关于 RxJava ...

  6. 八个层面比较 Java 8, RxJava, Reactor

    前言 这是一篇译文,原文出处(http://alexsderkach.io/comparing-java-8-rxjava-reactor/).其实很久以前我就看完了这篇文章,只不过个人对响应式编程研 ...

  7. RxJava 源码解析之观察者模式

    了解 RxJava 的应该都知道是一个基于事务驱动的库,响应式编程的典范.提到事务驱动和响应就不得不说说,设计模式中观察者模式,已经了解的朋友,可以直接跳过观察者模式的介绍,直接到 RxJava 源码 ...

  8. 用大白话讲解RxJava原理

    近日外媒报道称,Google 其应用商店 Play Store 一次下架了超过 600 个违规 App:其中中国.印度以及新加坡为本次大规模下架 App 开发者前三的国家,来自中国的猎豹移动所开发的 ...

  9. 当RxJava遇到AOP

    如来神掌.jpg 背景 公司打算开发一款全新的To C产品,因此我开始做一些搭建框架的事儿以及POC.新的产品能够使用一些比较新的技术,在新产品中我大量使用了Rx.这就导致了原先的AOP框架在某些场景 ...

最新文章

  1. DPM2007轻松恢复Exchange邮件,DPM2007系列之三
  2. C++中三种正则表达式比较
  3. Android 请求PHP接口, 返回json, 开头有问号, 解决方案
  4. Html5 History API解析
  5. 软件需求说明的前世和今生
  6. 排球积分程序(三)——模型类的设计
  7. 北京某打工子弟学校之三
  8. Spring IOC 和 AOP 概览
  9. pythonhtml生成word_python如何实现word批量转HTML
  10. datatable刷新数据_UE4 利用SaveGame和CSV进行Runtime数据更新
  11. python能做页面加载动画吗_HTML+CSS实现页面加载(loading)动画效果
  12. 正版python怎么下载_怎么下载官网python并安装
  13. 涉嫌抄袭!致歉,抖音Semi Design承认参考阿里Ant Design
  14. 如何写一份优秀的java程序员简历
  15. 【螺钉和螺母问题】【算法分析与设计】假设我们有n个直径各不相同的螺钉以及n个相应的螺母...
  16. VO、DO、DTO、PO是什么
  17. 剑指 Offer II 017. 含有所有字符的最短字符串
  18. 【B2B2C多用户】WSTMart商城系统 V2.0.6更新版发布
  19. C/C++在线餐馆预订管理系统
  20. 电路中的开漏输出与推挽输出

热门文章

  1. jsp注册里密码强弱怎么弄_jsp+servlet实战酷炫博客+聊天系统
  2. 大数据druid查询不支持分页_Druid实时大数据分析查询(六)
  3. java inputstream的read一次只能读到一个字节_20210118-JAVA面试题
  4. 微服务架构中熔断器_基于 Golang 语言的微服务熔断器
  5. android开发获取应用本身耗电量_别找了,Android常用自动化工具全在这儿了!
  6. opencv-contrib配置过程
  7. TCP/IP学习入门笔记
  8. 产生任意区间内的均匀分布的随机整数序列
  9. linux安装mysql5.6.26_linux mysql-5.6.26 安装
  10. ubuntu 能解析域名但ping不通_域名解析设置方法