前面两部分,我们学习到了如何创建 Observable以及如何从 Observable 中获取数据。本部分将介绍一些更高级的用法,以及一些在大型项目中的最佳实践。

Side effects(副作用)

没有副作用的函数通过参数和返回值来程序中的其他函数相互调用。当一个函数中的操作会影响其他函数中的返回结果时,我们称该函数有副作用。写数据到磁盘、记录日志、打印调试信息都是常见的副作用表现。Java 中在一个函数里面修改另外一个函数使用的对象的值是合法的。

副作用有时候很有用也有必要使用。但是有时候也有很多坑。 Rx 开发者应避免非必要的副作用,如果必须使用副作用的时候,则应该写下详细的说明文档。

副作用的问题

Functional programming 通常避免副作用。带有副作用的函数(尤其是可以修改参数状态的)要求开发者了解跟多实现的细节。增加了函数的复杂度并且导致函数被错误理解和使用,并且难以维护。副作用有故意的和无意的。可以通过封装或者使用不可变对象来避免副作用。有一些明智的封装规则可以显著的提高你 Rx 代码的可维护性。

我们使用一个带有副作用的示例来演示。 Java 中不可以在 Lambda 或者 匿名函数中引用外层的非 final 变量。 但是 Java 中的 final 变量只是保证了该编译引用的对象地址不变,但是对象本身的状态还是可以改变的。例如,下面是一个用来计数的一个类:

class Inc {private int count = 0;public void inc() { count++;}public int getCount() {return count;}
}

即使是一个 final 的 Inc 变量,还是可以通过调用其函数来修改他的状态。 注意 Java 并没有强制显式使用 final ,如果在你 Lambda 表达式中修改外层变量的引用对象地址(把外层变量重新复制为其他对象),则会出错。

Observable<String> values = Observable.just("请", "不要", "有", "副作用");Inc index = new Inc();
Observable<String> indexed = values.map(w -> {index.inc();return w;});
indexed.subscribe(w -> System.out.println(index.getCount() + ": " + w));

结果:

1: 请
2: 不要
3: 有
4: 副作用

目前还来看不出来问题。但是如果我们在该 Observable 上再次订阅一个 subscriber,则问题就出来了。

Observable<String> values = Observable.just("请", "不要", "有", "副作用");Inc index = new Inc();
Observable<String> indexed = values.map(w -> {index.inc();return w;});
indexed.subscribe(w -> System.out.println("1st observer: " + index.getCount() + ": " + w));
indexed.subscribe(w -> System.out.println("2nd observer: " + index.getCount() + ": " + w));

结果:

1st observer: 1: 请
1st observer: 2: 不要
1st observer: 3: 有
1st observer: 4: 副作用
2nd observer: 5: 请
2nd observer: 6: 不要
2nd observer: 7: 有
2nd observer: 8: 副作用

第二个 Subscriber 的索引是从 5 开始的。这明显不是我们想要的结果。这里的副作用很容易发现,但是真实应用中的副作用有些很难发现。

在数据流中组织数据

可以通过 scan 函数来计算每个数据的发射顺序:

class Indexed <T> {public final int index;public final T item;public Indexed(int index, T item) {this.index = index;this.item = item;}
}
Observable<String> values = Observable.just("No", "side", "effects", "please");Observable<Indexed<String>> indexed = values.scan(new Indexed<String>(0, null), (prev,v) -> new Indexed<String>(prev.index+1, v)).skip(1);
indexed.subscribe(w -> System.out.println("1st observer: " + w.index + ": " + w.item));
indexed.subscribe(w -> System.out.println("2nd observer: " + w.index + ": " + w.item));

结果:

1st observer: 1: No
1st observer: 2: side
1st observer: 3: effects
1st observer: 4: please
2nd observer: 1: No
2nd observer: 2: side
2nd observer: 3: effects
2nd observer: 4: please

上面的结果为正确的。 我们把两个 Subscriber 共享的属性给删除了,这样他们就没法相互影响了。

do

像记录日志这样的情况是需要副作用的。subscribe 总是有副作用,否则的话这个函数就没啥用了。虽然可以在 subscriber 中记录日志信息,但是这样做有缺点:

  1. 在核心业务代码中混合了不太重要的日志代码
  2. 如果想记录数据流中数据的中间状态,比如 执行某个操作之前和之后,则需要一个额外的 Subscriber 来实现。这样可能会导致最终 Subscriber 和 日志 Subscriber 看到的状态是不一样的。

下面的这些函数让我们可以更加简洁的实现需要的功能:

public final Observable<T> doOnCompleted(Action0 onCompleted)
public final Observable<T> doOnEach(Action1<Notification<? super T>> onNotification)
public final Observable<T> doOnEach(Observer<? super T> observer)
public final Observable<T> doOnError(Action1<java.lang.Throwable> onError)
public final Observable<T> doOnNext(Action1<? super T> onNext)
public final Observable<T> doOnTerminate(Action0 onTerminate)

这些函数在 Observable 每次事件发生的时候执行,并且返回 Observable。 这些函数明确的表明了他们有副作用,使用起来更加不易混淆:

Observable<String> values = Observable.just("side", "effects");values.doOnEach(new PrintSubscriber("Log")).map(s -> s.toUpperCase()).subscribe(new PrintSubscriber("Process"));

结果:

Log: side
Process: SIDE
Log: effects
Process: EFFECTS
Log: Completed
Process: Completed

这里使用了上一章使用的帮助类 PrintSubscriber 。这些 do 开头的函数并不影响最终的 Subscriber。 例如:

static Observable<String> service() {return  Observable.just("First", "Second", "Third").doOnEach(new PrintSubscriber("Log"));
}

可以这样使用该函数:

service().map(s -> s.toUpperCase()).filter(s -> s.length() > 5).subscribe(new PrintSubscriber("Process"));

结果:

Log: First
Log: Second
Process: SECOND
Log: Third
Log: Completed
Process: Completed

即便最终使用的时候过滤了一些数据,但是我们记录了服务器返回的所有结果。
这些函数中 doOnTerminate 在 Observable 结束发射数据之前发生。不管是因为 onCompleted 还是 onError 导致数据流结束。 另外还有一个 finallyDo 函数在 Observable 结束发射之后发生。

doOnSubscribe, doOnUnsubscribe

public final Observable<T> doOnSubscribe(Action0 subscribe)
public final Observable<T> doOnUnsubscribe(Action0 unsubscribe)

Subscription 和 unsubscription 并不是 Observable 发射的事件。而是 该 Observable 被 Observer 订阅和取消订阅的事件。

ReplaySubject<Integer> subject = ReplaySubject.create();
Observable<Integer> values = subject.doOnSubscribe(() -> System.out.println("New subscription")).doOnUnsubscribe(() -> System.out.println("Subscription over"));Subscription s1 = values.subscribe(new PrintSubscriber("1st"));
subject.onNext(0);
Subscription s2 = values.subscribe(new PrintSubscriber("2st"));
subject.onNext(1);
s1.unsubscribe();
subject.onNext(2);
subject.onNext(3);
subject.onCompleted();

结果:

New subscription
1st: 0
New subscription
2st: 0
1st: 1
2st: 1
Subscription over
2st: 2
2st: 3
2st: Completed
Subscription over

使用 AsObservable 函数来封装

Rx 使用面向对象的 Java 语言来实现 functional programming 风格编码。 需要注意 面向对象中的问题。 例如下面一个天真版的返回 observable 的服务:

public class BrakeableService {public BehaviorSubject<String> items = BehaviorSubject.create("Greet");public void play() {items.onNext("Hello");items.onNext("and");items.onNext("goodbye");}
}

上面的实现中, 调用者可以自己修改 items 引用的对象,也可以修改 Observable 发射的数据。所以需要对调用者隐藏 Subject 接口,只暴露 Observable 接口:

public class BrakeableService {private final BehaviorSubject<String> items = BehaviorSubject.create("Greet");public Observable<String> getValues() {return items;}public void play() {items.onNext("Hello");items.onNext("and");items.onNext("goodbye");}
}

上面的改进版本,看起来我们返回的是一个 Observable,但该返回的对象是不安全的,返回的其实是一个 Subject。

asObservable

由于 Observable 是不可变的,所以 asObservable 函数是为了把一个 Observable 对象包装起来并安全的分享给其他人使用。

public Observable<String> getValues() {return items.asObservable();
}

这样的话,我们的 Subject 对象就被合理的保护起来了。这样其他恶意人员也无法修改你的 Observable 返回的数据了,在使用的过程中也可以避免出现错误了。

无法保护可变对象

在 RxJava 中, Rx 传递的是对象引用 而不是 对象的副本。在一个 地方修改了对象,在传递路径的其他地方上也是可见的。例如下面一个可变的对象:

class Data {public int id;public String name;public Data(int id, String name) {this.id = id;this.name = name;}
}

使用该对象的一个 Observable 和两个 Subscriber:

Observable<Data> data = Observable.just(new Data(1, "Microsoft"),new Data(2, "Netflix")
);data.subscribe(d -> d.name = "Garbage");
data.subscribe(d -> System.out.println(d.id + ": " + d.name));

结果:

1: Garbage
2: Garbage

第一个 Subscriber 先处理每个数据,在第一个 Subscriber 完成后第二个 Subscriber 开始处理数据,由于 Observable 在传递路径中使用的是对象引用,所以 第一个 Subscriber 中对对象做的修改,第二个 Subscriber 也会看到。

本文出自 云在千峰 http://blog.chengyunfeng.com/?p=968

RxJava 驯服数据流之副作用相关推荐

  1. 微服务精通之Hystrix原理解析

    前言 经过微服务精通之Ribbon原理解析的学习,我们了解到了服务消费者获取服务提供者实例的过程,在这之后,服务消费者会调用服务提供者的接口.但是在调用接口的过程中,我们经常会遇见服务之间的延迟和通信 ...

  2. 带你学开源项目:Meizhi Android之RxJava Retrofit最佳实践

    如果你对开源项目分析感兴趣,欢迎加入我们的android-open-source-project-cracking 原创地址:http://www.jianshu.com/p/47e72693a302 ...

  3. rxJava源码解析系列五之Disposable

    这个系列的前几篇文章是传统rxjava的数据流传递模式,我们知道,observable的链式调用,是下游被观察者持有上有被观察者的过程,发起订阅的时候是上游观察者持有下游观察者的过程.到最先的被观察者 ...

  4. RxJava----操作符:辅助操作符

    Observable Utility Operators(辅助操作符) delay 顾名思义,Delay操作符就是让发射数据的时机延后一段时间,这样所有的数据都会依次延后一段时间发射. log(&qu ...

  5. Android系统(74)--- 从零实现灵活且可高度定制的Android图片选择架构

    从零实现灵活且可高度定制的Android图片选择架构 https://www.jianshu.com/u/df76f81fe3ff 前言 这是我花费了数月闲暇时间从零开始写的一个库,在这期间,我学习到 ...

  6. 合并RxJava的Observable数据流

    本文讨论几种不同方式合并RxJava的Observable数据流. Observable介绍 Observable 序列,或简单称为Observable,表示异步数据流.这些概念遵循基于观察者模式,在 ...

  7. io.realm:rea_使Java具有响应性的框架和工具包:RxJava,Spring Reactor,Akka和Vert.x概述...

    io.realm:rea 如今,人们需要具有高用户体验的高响应性,交互式应用程序,这通常意味着要处理异步性,尤其是当应用程序涉及高负载,实时数据和多用户时. 由于Java是一种固有的语言,它固有地支持 ...

  8. 使Java具有响应性的框架和工具包:RxJava,Spring Reactor,Akka和Vert.x概述

    如今,人们需要具有高用户体验的高响应性,交互式应用程序,这通常意味着处理异步性,尤其是当这些应用程序涉及高负载,实时数据和多用户时. 由于Java是一种固有的支持命令式编程风格的面向对象语言,因此异步 ...

  9. RXJava源码详解

    rxjava Rx介绍 ReactiveX的历史 ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的 ...

最新文章

  1. 假如有Thread1、Thread2、ThreaD3、Thread4四条线程分别统计C、D、E、F四个盘的大小,所有线程都统计完毕交给Thread5线程去做汇总,应当如何实现?...
  2. linux 替换内核 img,查看更改linux内核initrd.img-Go语言中文社区
  3. 如何在CSDN上快速引用到网页内的局部位置?
  4. 【Python-ML】SKlearn库多元线性回归性能评估
  5. 在博文顶部添加文章字数及阅读时间信息:阅读本文需要xx分钟
  6. Java Web学习总结(8)——使用Cookie进行会话管理
  7. 十天学会单片机可能吗?单片机入门需要多久?
  8. html+浏览器自动全屏,web 使网站在浏览器中全屏显示 fullscreen=yes
  9. 【STM32F429的DSP教程】第14章 DSP统计函数-最大值,最小值,平均值和功率
  10. 可变剪切(选择性剪接)rmats2sashimiplot可视化安装与使用
  11. HDU 4415 Assassin's Creed(贪心)
  12. 图片裁切器Cropper.js的使用
  13. 使用 CSS 追踪用户
  14. JUR 项目评级:BB ,展望稳定 | TokenInsight
  15. [Android自定义控件]双圆圈内外旋转菜单
  16. my firefox常用的插件介绍
  17. Hive系列 (一):Hive搭建
  18. 固定资产管理系统的一般操作流程
  19. 十大经典php网店系统
  20. vs2010连接mysql数据库

热门文章

  1. python第二版课后习题答案_《python核心编程第二版》课后习题6-12答案
  2. 今宵除夕夜,天涯共此时
  3. html中的colspan是什么意思
  4. 最优控制理论 五+、极大值原理Bang-Bang控制问题的求解
  5. servlet cannot be resolved to a type的原因及解决方法
  6. word2003下的神秘咒语——灵活的棕色狐狸跳过懒狗
  7. 明源云客微信公众号 矩阵 开盘淘宝抢房 的一些技巧
  8. mysql乐观锁 秒杀_使用数据库乐观锁解决高并发秒杀问题,以及如何模拟高并发的场景,CyclicBarrier和CountDownLatch类的用法...
  9. 2022年数维杯国际赛ABCD题思路
  10. 微信公众号支付问题 - 当前页面的URL未注册