系列文章

RxJava系列之简介和观察者设计模式
RxJava系列之上游与下游
RxJava系列之常用创建型操作符
RxJava系列之常用变换操作符
RxJava系列之常用过滤操作符
RxJava系列之常用条件操作符
RxJava系列之常用合并操作符
RxJava系列之常用异常操作符
RxJava系列之线程切换实战
RxJava系列之背压模式
RxJava系列之配合Retrofit
RxJava系列之泛型高级
RxJava系列之手写create操作符
RxJava系列之手写create操作符增加泛型限定
RxJava系列之手写just操作符
RxJava系列之手写map操作符
RxJava系列之手写切换线程

什么是上游和下游

在RxJava中,我们将事件的起源称之为上游,事件的处理称之为下游。上游产生事件之后,发射给下游,下游接收到并请求。如下图所示:

简单例子

/*** 流程整理 1* @param vieww*/public void r04(View vieww) {// 上游 Observable 被观察者Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> emitter) throws Exception {// 发射Log.d(TAG, "上游 subscribe: 开始发射..."); // todo 2emitter.onNext("RxJavaStudy");emitter.onComplete(); // 发射完成  // todo 4// 上游的最后log才会打印Log.d(TAG, "上游 subscribe: 发射完成");}}).subscribe(// 下游 Observer 观察者new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {// 弹出 加载框 ....Log.d(TAG, "上游和下游订阅成功 onSubscribe 1"); // todo 1}@Overridepublic void onNext(String s) {Log.d(TAG, "下游接收 onNext: " + s); // todo 3}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {// 隐藏加载框Log.d(TAG, "下游接收完成 onComplete"); // todo 5  只有接收完成之后,上游的最后log才会打印}});/***  D/MainActivity: 上游和下游订阅成功 onSubscribe 1*  D/MainActivity: 上游 subscribe: 开始发射...*  D/MainActivity: 下游接收 onNext: RxJavaStudy*  D/MainActivity: 下游接收完成 onComplete*  D/MainActivity: 上游 subscribe: 发射完成*/}

整个流程是,上下游订阅成功,下游收到通知,处理订阅成功事件。上游在订阅成功的回调中发射事件,下游收到onNext回调,处理上游发送的事件。上游发送完成事件,下游收到onComplete回调。

另外一个例子

  /*** 流程整理2* @param vieww*/public void r05(View vieww) {// 上游 Observable 被观察者Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> emitter) throws Exception {// 发射Log.d(TAG, "上游 subscribe: 开始发射..."); // todo 2emitter.onNext("RxJavaStudy");// emitter.onComplete(); // 发射完成  // todo 4// 上游的最后log才会打印// Log.d(TAG, "上游 subscribe: 发射完成");// emitter.onError(new IllegalAccessException("error rxJava"));// TODO 结论:在 onComplete();/onError 发射完成 之后 再发射事件  下游不再接收上游的事件/*emitter.onNext("a");emitter.onNext("b");emitter.onNext("c");*/// 发一百个事件emitter.onError(new IllegalAccessException("error rxJava")); // 发射错误事件emitter.onComplete(); // 发射完成// TODO 结论:已经发射了onComplete();, 再发射onError RxJava会报错,不允许// TODO 结论:先发射onError,再onComplete();,不会报错, 有问题(onComplete不会接收到了)}}).subscribe(// 下游 Observer 观察者new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {// 弹出 加载框 ....Log.d(TAG, "上游和下游订阅成功 onSubscribe 1"); // todo 1}@Overridepublic void onNext(String s) {Log.d(TAG, "下游接收 onNext: " + s); // todo 3}@Overridepublic void onError(Throwable e) {Log.d(TAG, "下游接收 onError: " + e.getMessage());}@Overridepublic void onComplete() {// 隐藏加载框Log.d(TAG, "下游接收完成 onComplete"); // todo 5  只有接收完成之后,上游的最后log才会打印}});}

在 onComplete();/onError 发射完成 之后 再发射事件 下游不再接收上游的事件
已经发射了onComplete();, 再发射onError RxJava会报错,不允许
已经发射了onComplete();, 再发射onError RxJava会报错,不允许

中断事件

Disposable d;/*** 切断下游,让下游不再接收上游的事件,也就是说不会去更新UI* @param view*/public void r06(View view) {// TODO 上游 ObservableObservable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> e) throws Exception {for (int i = 0; i < 100; i++) {e.onNext(i);}e.onComplete();}})// 订阅下游.subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {MainActivity.this.d = d;}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "下游接收 onNext: " + integer);// 接收上游的一个事件之后,就切断下游,让下游不再接收// d.dispose();}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}@Overrideprotected void onDestroy() {super.onDestroy();// 切断下游if (d != null) d.dispose();}

下游在订阅成功之后onSubscribe(Disposable d) 回调方法传递过来Disposable对象,可以通过这个对象可以切断下游。

RxJava系列之上游与下游相关推荐

  1. RxJava基本使用--上游与下游

    RxJava基本使用–上游与下游 var disposable : Disposable? = nullprivate fun rxTest1() {//起点 被观察者Observable.creat ...

  2. MKS上游和下游集成式压力控制器的技术分析及其替代解决方案

    摘要:目前的MKS系列集成式压力控制器本质上是一种流量调节和测量装置,无法直接用来进行准确的压力控制,而且MKS压力控制器还存在测量精度不高.压力控制范围有限和对工作介质洁净度要求很高的不足.为此,为 ...

  3. RxJava系列6(从微观角度解读RxJava源码)

    RxJava系列1(简介) RxJava系列2(基本概念及使用介绍) RxJava系列3(转换操作符) RxJava系列4(过滤操作符) RxJava系列5(组合操作符) RxJava系列6(从微观角 ...

  4. RxJava系列四(过滤操作符)

    转载请注明出处:https://zhuanlan.zhihu.com/p/21966621 RxJava系列1(简介) RxJava系列2(基本概念及使用介绍) RxJava系列3(转换操作符) Rx ...

  5. python 处理snp的vcf文件,统计snp在基因的intron、exon还是上游、下游还是不在基因及基因附近

    1.c处理vcf文件,初步统计snp位置 位置信息有5种 down_stream是gene下游2k以内,up_stream 是gene上游2k以内,gene_exon是snp在外显子内,gene_in ...

  6. 软件开发设计中的上游与下游

    生产流程中的上下游 让我们以一个简单的生产流程开始,尽管它跟软件开发没有关系,这样我们能以此为基础定义软件开发中的上下游. 上面的例子有三个步骤:收集部件.组装部件.喷漆. 一个生产流程跟一条河流很相 ...

  7. RxJava系列7(最佳实践)

    RxJava系列1(简介) RxJava系列2(基本概念及使用介绍) RxJava系列3(转换操作符) RxJava系列4(过滤操作符) RxJava系列5(组合操作符) RxJava系列6(从微观角 ...

  8. RxJava系列1(简介)

    RxJava系列1(简介) RxJava系列2(基本概念及使用介绍) RxJava系列3(转换操作符) RxJava系列4(过滤操作符) RxJava系列5(组合操作符) RxJava系列6(从微观角 ...

  9. 软件开发中的上游和下游

    听了腾讯犀牛鸟王佳对开源的讲解中,对其中的专业术语上游(upstream)和下游(downstream)不理解,看了这篇blog理解了特此转载记录. 看了以下blog后个人理解:自然界上水是高处往低处 ...

最新文章

  1. js跨域访问,No 'Access-Control-Allow-Origin' header is present on the requested resource
  2. ssh 框架引入service_搭建SSH开发框架时autowired注入为空的问题
  3. 常用数据结构的一部分类
  4. 2018.03.12、Android知识点-Java篇
  5. Redis配置和常用命令
  6. 专科学计算机没有吗,我是专科生,学计算机信息管理专业的,我没有拿到..._考研_帮考网...
  7. (一)数据结构和算法、线性结构
  8. SpringAOP底层API之代理对象执行流程
  9. SQL Server的下载和安装
  10. c语言常用例子,C语言经典例子100个
  11. Striped64 深入源码解析
  12. 我为Bill Gates熬夜加班的那个晚上
  13. Excel调用已有数据利用已经录入的项快速的生成下拉列表
  14. Java+Swing+Mysql学生宿舍管理系统
  15. 跳槽or裸辞?2022年真不建议···
  16. Hystrix Dashboard
  17. 网站是什么?网站的类型构成-从SEO到优化实战大师
  18. 青烟鸿影一盏茶,孤灯小楼听夜雨
  19. 最短路径问题(附航班最小价格练习)
  20. 基于Armitage的MSF自动化集成攻击实践

热门文章

  1. 制作一个简单3D相册
  2. gdal联合编译C++版本proj.4、geos、hdf4、hdf5、netcdf
  3. 微信公众号个性消息模版推送之添加进度条(制表符)
  4. 医学四视图-008-增加十字线开关功能,按钮显隐功能
  5. 开发过程中redis的rehash,布隆过滤器,redis持久化一起解决
  6. 计算机用户原始密码是多少,网上邻居的默认密码一般是多少
  7. 【Offer】ThoughtWorks 2020校招-国内-软件测试工程师 | 线上测评+群面+技术面+HR面(2019.8.4 - 8.22)
  8. Seeed STM32MP1开发板教程(0)前期准备篇
  9. 今天接到蚂蚁金服的电话面试,直接被虐成渣渣
  10. iphone11支持es6吗_iPhone 11 支持北斗导航系统吗?