原文出处:http://blog.chengyunfeng.com/?p=948#ixzz4KaT7AZOW

Rx 有两个最基础的类型,和其他一些扩展这两种类型的类。两个核心的类为: Observable 和 Observer。Subject 是同时继承了 Observable 和 Observer。

Rx 是在 Observer 模式之上建立起来的。这种模式很常见,在 Java 中有很多地方都使用了该模式,比如 JavaFx 中的 EventHandler。 这些简单的使用方式和 Rx 对比有如下区别:

  • 使用 event handler 来处理事件很难组合使用
  • 无法延时处理查询事件
  • 可能会导致内存泄露
  • 没有标准的标示完成的方式
  • 需要手工的来处理并行和多线程

Observable

Observable 是第一个核心类。该类包含了 Rx 中的很多实现,以及所有核心的操作函数(operator、或者说 操作符)。在本系列教程中会逐步介绍每个操作函数。现在我们只需要理解 subscribe 函数即可,下面是该函数的一种定义:

public final Subscription subscribe(Subscriber<? super T> subscriber)

该函数是用来接收 observable 发射的事件的。当事件被发射后,他们就丢给了 subscriber, subscriber 是用来处理事件的实现。这里的 Subscriber 参数实现了 Observer 接口。

一个 Observable 发射 三种类型的事件:

  • Values (数据)
  • 完成状态,告诉 Subscriber 事件(数据) 发射完毕,没有其他数据了
  • Error, 错误状态,如果在发射数据的过程中出现错误了。会发送该事件。

Observer

Subscriber 是 Observer 的一个实现。 Subscriber 实现了其他一些额外的功能,可以作为我们实现 Observer 的基类。现在先看看 Observer 的接口定义:

interface Observer<T> {void onCompleted();void onError(java.lang.Throwable e);void onNext(T t);
}

每次 Observable 发射事件的时候就会执行这三个对应的函数。Observer 的 onNext 函数会被调用0次或者多次,然后会调用 onCompleted 或者 onError。在 onCompleted 或者 onError 发生以后就不会再有其他事件发射出来了。

在使用 Rx 开发的过程中,你会看到很多 Observable,但是 Observer 出场的时候很少。但是理解 Observer 的概念是非常重要的,虽然有很多简写方式来帮助更加简洁的使用 Observer。

实现 Observable 和 Observer

你可以手工的实现 Observer 或者扩展 Observable。 在真实场景中并不需要这样做,Rx 已经提供了很多可以直接使用的工厂方法了。使用 Rx 提供的工具来创建 Observable 和 Observer 比手工实现要更加安全和简洁。

要订阅到一个 Observable,并不需要提供一个 Observer 示例。subscribe 函数有各种重载方法可以使用,你可以只订阅 onNext 事件,有可以只订阅 onError 事件,这样就不用提供 Observer 对象就可以接受事件了。每次只需要提供你关心的函数即可,例如 如果你不关心 error 和完成事件,则只提供 onNext 来接收每次发送的数据即可。
配合 Java 8 的 Lambda表达式则使用起来代码看起来会更加简洁,所以本系列示例代码会使用 lambda 表达式,如果你不了解的话,可以先看看掌握 Java 8 Lambda表达式。

Subject

Subject 是 Observable 的一个扩展,同时还实现了 Observer 接口。第一眼看上去可能有点奇怪,但是在有些场合下使用 Subject 将会非常便捷。他们可以像 Observer 一样接收事件,同时还可以像 Observable 一样把接收到的事件再发射出去。这种特性非常适合 Rx 中的接入点,当你的事件来至于 Rx 框架之外的代码的时候,你可以把这些数据先放到 Subject 中,然后再把 Subject转换为一个 Observable,就可以在 Rx 中使用它们了。你可以把 Subject 当做 Rx 中的 事件管道。

Subject 有两个参数类型:输入参数和输出参数。这样设计是为了抽象而不是应为使用 Subject 是为了转换数据类型。转换数据应该使用转换操作函数来完成,后面我们将介绍各种操作函数。
Subject 有各种不同的具体实现。下面将介绍一些非常重要的实现以及他们之间的区别。

1、PublishSubject

PublishSubject 是最直接的一个 Subject。当一个数据发射到 PublishSubject 中时,PublishSubject 将立刻把这个数据发射到订阅到该 subject 上的所有 subscriber 中。

public static void main(String[] args) {PublishSubject<Integer> subject = PublishSubject.create();subject.onNext(1);subject.subscribe(System.out::println);subject.onNext(2);subject.onNext(3);subject.onNext(4);
}

输出的结果:

2
3
4

上面的 System.out::println 是 Lambda 表达式中的函数引用,如果表达式代码块只有一个函数调用,则可以直接使用函数引用来简化代码
可以看到,数据 1 并没有打印出来,原因是当我们订阅到 subject 的时候,1 已经发射出去了。当订阅到 subject 后就开始接收 发射到 subject 中的数据了。

这是我们初次看到如何使用 subscribe 函数,值得详细研究下是如何用的。 这里我们使用了一个重载的参数只有一个 Function 类型。这个参数 Function 接收一个参数 Integer 并且没有返回值。 没有返回值的 Function 在 Rx 中被称之为 action。 可以使用下面几种方式来提供这个 Function:

  • 提供一个 Action1 的实现对象
  • 使用 Lambda 表达式 实现
  • 使用符合该接口定义类型的 Lambda 表达式函数引用。这里System.out::println 函数可以接受一个 Object 对象,符合 Action 的定义(接受一个参数并没有返回值),所以我们可以把该函数作为函数应用使用。 subscribe 将会使用他收到的值作为 println 函数的参数来调用 println 函数。

2、ReplaySubject

ReplaySubject 可以缓存所有发射给他的数据。当一个新的订阅者订阅的时候,缓存的所有数据都会发射给这个订阅者。 由于使用了缓存,所以每个订阅者都会收到所以的数据:

ReplaySubject<Integer> s = ReplaySubject.create();
s.subscribe(v -> System.out.println("Early:" + v));
s.onNext(0);
s.onNext(1);
s.subscribe(v -> System.out.println("Late: " + v));
s.onNext(2);

输出结果:

Early:0
Early:1
Late: 0
Late: 1
Early:2
Late: 2

不管是何时订阅的,每个订阅者都收到了所有的数据。注意后一个订阅者在处理 2 之前就先收到了之前发射的数据 0和1

缓存所有的数据并不是一个十分理想的情况,如果 Observable 事件流运行很长时间,则缓存所有的数据会消耗很多内存。可以限制缓存数据的数量和时间。 ReplaySubject.createWithSize 限制缓存多少个数据;而 ReplaySubject.createWithTime 限制一个数据可以在缓存中保留多长时间。

ReplaySubject<Integer> s = ReplaySubject.createWithSize(2);
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v));
s.onNext(3);

结果:

Late: 1
Late: 2
Late: 3

由于指定只缓存两个数据,所以当订阅的时候第一个数据 0 就收不到了。 限制缓存的时间也是一样的情况:

ReplaySubject<Integer> s = ReplaySubject.createWithTime(150, TimeUnit.MILLISECONDS,Schedulers.immediate());
s.onNext(0);
Thread.sleep(100);
s.onNext(1);
Thread.sleep(100);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v));
s.onNext(3);

结果:

Late: 1
Late: 2
Late: 3

使用时间缓存创建 ReplaySubject 需要指定一个 Scheduler, Scheduler 是 Rx 中保持时间的方式。现在可以假装他不存在,不用关心他。
ReplaySubject.createWithTimeAndSize 则可以同时限制时间和个数。

3、BehaviorSubject

BehaviorSubject 只保留最后一个值。 等同于限制 ReplaySubject 的个数为 1 的情况。在创建的时候可以指定一个初始值,这样可以确保党订阅者订阅的时候可以立刻收到一个值。

BehaviorSubject<Integer> s = BehaviorSubject.create();
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v));
s.onNext(3);

结果:

Late: 2
Late: 3

下面的示例只是打印出 Completed, 由于最后一个事件就是 Completed。

BehaviorSubject<Integer> s = BehaviorSubject.create();
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.onCompleted();
s.subscribe(v -> System.out.println("Late: " + v),e -> System.out.println("Error"),() -> System.out.println("Completed")
);

这里使用了 subscribe 函数的另外一种重载形式,接受三个参数。
下面使用了默认初始化值,如果订阅者的发射数据之前就订阅了,则会收到这个初始化的值:

BehaviorSubject<Integer> s = BehaviorSubject.create(0);
s.subscribe(v -> System.out.println(v));
s.onNext(1);

结果:

0
1

由于 BehaviorSubject 的定义就是总是有可用的数据,所以一般都会使用初始化值来创建 BehaviorSubject 。

4、AsyncSubject

AsyncSubject 也缓存最后一个数据。区别是 AsyncSubject 只有当数据发送完成时(onCompleted 调用的时候)才发射这个缓存的最后一个数据。可以使用 AsyncSubject 发射一个数据并立刻结束。

AsyncSubject<Integer> s = AsyncSubject.create();
s.subscribe(v -> System.out.println(v));
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.onCompleted();

结果:

2

如果上面的示例不调用 s.onCompleted(); 则什么结果都不会打印出来。

隐含的规则

Rx 中有一些隐含的规则在代码中并不太容易看到。一个重要的规则就是当一个事件流结束(onError 或者 onCompleted 都会导致事件流结束)后就不会发射任何数据了。这些 Subject 的实现都遵守这个规则,subscribe 函数也拒绝违反该规则的情况。

Subject<Integer, Integer> s = ReplaySubject.create();
s.subscribe(v -> System.out.println(v));
s.onNext(0);
s.onCompleted();
s.onNext(1);
s.onNext(2);

结果:

0

但是在 Rx 实现中并没有完全确保这个规则,所以你在使用 Rx 的过程中要注意遵守该规则,否则会出现意料不到的情况。

RxJava 教程第一部分:入门之 关键的类相关推荐

  1. RxJava 教程第一部分:入门之 生命周期管理

    原文出处:http://blog.chengyunfeng.com/?p=954#ixzz4KaYiN2oT Rx 背后的理念是:无法知道事件流何时发射数据.也不知何时结束发射,但是你需要控制何时开始 ...

  2. RxJava 教程第一部分:为何使用RxJava

    原文出处:http://blog.chengyunfeng.com/?p=947 为何使用RxJava 用户希望查看实时的数据,他们现在就要看最新的微博:他们的订单立刻就被确认:价格需要现在定下:在线 ...

  3. ETL工具之Kettle开发教程第一节-入门

    ETL简介 概念 ETL是数据抽取(Extract).转换(Transform).装载(Loading)的缩写. 数据抽取 数据抽取是指从数据源获取所需数据的过程.数据抽取过程会过滤掉目标数据集中不需 ...

  4. Electron系列教程——第一篇:入门

    Electron系列教程--第一篇:入门 一.楔子 想要学习Electron,跟着官网或者中文网,仔细阅读,并实践,其实是够了,不必要重复.那为什么还要写这个系列呢?大概有两方面原因,其一:我使用el ...

  5. 微信公众号开发入门教程第一篇

    微信公众号开发入门教程第一篇 关键字:微信公众平台开发 作者:方倍工作室 在这篇微信公众平台开发教程中,我们假定你已经有了PHP语言程序.MySQL数据库.计算机网络通讯.及HTTP/XML/CSS/ ...

  6. android 教程概要,Android精通教程-第一节Android入门简介

    前言 大家好,我是 Vic,今天给大家带来Android精通教程-第一节Android入门简介的概述,希望你们喜欢 每日一句 If life were predictable it would cea ...

  7. Processing入门教程第一课-Processing的“前世今生”

    很早以前大概13.14年就通过清华大学付志勇教授了解到了Processing这个工具,起初只是初步了解并没有下定决心学习(当初资料太少了).由于当时只是初步的看了看,所以很多内容和知识点都是一知半解的 ...

  8. LATEX教程第一讲—— LATEX入门

    LATEX教程第一讲--LATEX入门 1.1 什么是LATEX? 1.2 怎么下载? 1.3 Latex初入门 1.3.1 基本框架 1. 一篇Latex的开始--\documentclass[** ...

  9. 视频教程-19全新mysql教程零基础入门实战精讲mysql视频DBA数据库视频教程SQL教程-MySQL

    19全新mysql教程零基础入门实战精讲mysql视频DBA数据库视频教程SQL教程 7年的开发架构经验,曾就职于国内一线互联网公司,开发工程师,现在是某创业公司技术负责人, 擅长语言有node/ja ...

最新文章

  1. SVM原理详细图文教程来了!一行代码自动选择核函数
  2. 两个变量交换值 和按位异或的理解
  3. 模型学习 - CNN及一系列发展
  4. css实现让页面的footer始终位于底部
  5. Java集合篇:ConcurrentHashMap详解(JDK1.6)
  6. windows修改远程桌面RDP连接数
  7. (Docker实战) 第3篇:Centos7 拉取和部署Mysql
  8. 爬虫app信息抓取之apk反编译抓取
  9. Ubuntu 主题美化
  10. Vue H5 项目模板
  11. Python之pygame安装教程
  12. 微软云服务器的优点,探寻:微软私有云的优势究竟是什么
  13. 深度学习教程(2) | 神经网络基础(吴恩达·完整版)
  14. 前端开发人员必备的十项技能
  15. 计算机网络(三)—— 数据链路层(7):MAC地址、IP地址以及数据链路层使用的ARP协议
  16. 无法启动计算机的杀毒软件,电脑中毒杀毒软件无法启动任务管理器也被禁用怎么办?...
  17. c语言 at指令gprs,gprs AT 指令
  18. 修改微信电脑版的字体
  19. 可裂解组织蛋白酶的ADC偶联物-靶向抗体偶联技术
  20. 2017年9月10日训练日记

热门文章

  1. 面向对象第四单元(UML)总结体会课程总结
  2. python_day6.2
  3. iOS GCD中级篇 - dispatch_group的理解及使用
  4. 如何把你的搜索引擎也加入到Firefox中
  5. Java内存模型与线程(思维导图)
  6. *62.分页和分段的区别
  7. C#字符串与unicode互相转换
  8. Discuz验证码识别(上线篇)-写给程序员的TensorFlow教程
  9. 修复Chrome上的ERR_TOO_MANY_REDIRECTS错误?
  10. matlab添加多个legend