RxJava 提供了对事件序列进行变换的支持; 所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。

       不仅可以针对事件对象,还可以针对整个事件队列。

       变换部分主要来自《给Android 开发者的 RxJava 详解》

1.   事件对象变换-map()

  map():事件对象的直接变换;它是 RxJava最常用的变换;可以将Observable深入的对象1转换为对象2发送给Subscriber。

基本用法如下:

Observable.just(R.drawable.t)
        .map(new Func1<Integer, Drawable>() {
            @Override
            public Drawable call(Integer integer) {
                return getResources().getDrawable(integer);
            }
        })
        .subscribe(new Action1<Drawable>() {
            @Override
            public void call(Drawable drawable) {
                showImg(drawable);
            }
        });

2.    事件序列变换-flatMap()

flatMap() 也和 map() 相同,也是把传入的参数转化之后返回另一个对象;和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。

         flatMap() 的原理:

       1. 使用传入的事件对象创建一个 Observable 对象;

       2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;

       3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。

       这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。

//

扩展:由于可以在嵌套的Observable 中添加异步代码, flatMap() 也常用于嵌套的异步操作,例如嵌套的网络请求。

flatmap()基本用法如下,定义了一个Book类型,包含名称和章节列表;通过flatmap()打印出全部的章节列表。

Ø  Book.java

public class Book {
    public String name;
    public List<String> chapterList = new ArrayList<String>();

public void addChapter(String chapter) {
        chapterList.add(chapter);
    }
}

Ø  flagmap()使用

Book[] books = getBookList(5);
Observable.from(books)
        .flatMap(new Func1<Book, Observable<String>>() {
            @Override
            public Observable<String> call(Book book) {
                return Observable.from(book.chapterList);
            }
        })
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.v(TAG, s);
            }
        });

3.     变换的原理:lift()

所有的变换功能可能有所不同,实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。

lift()实现的源码:

/**
 * Lifts a function to the current Observable and returns a new Observable that when subscribed to will pass
 * the values of the current Observable through the Operator function.
 * <p>
 * In other words, this allows chaining Observers together on an Observable for acting on the values within
 * the Observable.
 * <p> {@code
 * observable.map(...).filter(...).take(5).lift(new OperatorA()).lift(new OperatorB(...)).subscribe()
 * }
 * <p>
 * If the operator you are creating is designed to act on the individual items emitted by a source
 * Observable, use {@code lift}. If your operator is designed to transform the source Observable as a whole
 * (for instance, by applying a particular set of existing RxJava operators to it) use {@link #compose}.
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code lift} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 * 
 * @param operator the Operator that implements the Observable-operating function to be applied to the source
 *             Observable
 * @return an Observable that is the result of applying the lifted Operator to the source Observable
 * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
 */
  public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber<? super R> o) {
            try {
                Subscriber<? super T> st = hook.onLift(operator).call(o);
                try {
                    // new Subscriber created and being subscribed with so 'onStart' it
                    st.onStart();
                    onSubscribe.call(st);
                } catch (Throwable e) {
                    // localized capture of errors rather than it skipping all operators 
                    // and ending up in the try/catch of the subscribe method which then
                    // prevents onErrorResumeNext and other similar approaches to error handling
                    Exceptions.throwIfFatal(e);
                    st.onError(e);
                }
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // if the lift function failed all we can do is pass the error to the final Subscriber
                // as we don't have the operator available to us
                o.onError(e);
            }
        }
    });
  }

通过lift()源码可以发现,生成了一个新的Observable(new);并且在新的Observable(new)的OnSubscribe(new)的call()回调方法中,创建了一个新的Subscriber(new),该新的Subscriber(最终目标)使用了最终调用的Subscriber的代理;再使用原来的Observable(old)的onSubscribe(old)来调用新的Subscriber(new)。

图示lift()变换过程-(图片来自《给Android 开发者的 RxJava 详解》)

lift() 实现过程

1)  lift() 创建一个新Observable ,此时加上之前的原始 Observable,已经有两个 Observable 了;

2) 而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe;

3)  当用户调用经过 lift() 后的 Observable 的 subscribe() 的时候,使用的是 lift() 所返回的新的 Observable ,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新OnSubscribe,即在 lift() 中生成的那个 OnSubscribe;

4) 而这个新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在这个 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的 Subscriber(Operator 就是在这里,通过自己的call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的『变换』代码以实现变换),然后利用这个新 Subscriber 向原始 Observable 进行订阅。

lift() 过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。

在 Observable 执行了lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。

4.   自定义Observable.Operator,直接调用lift()

下面实例实现的功能跟前面flatMap()实现的功能一致:

Observable.from(getBookList(4))
        .lift(new Observable.Operator<String, Book>() {
            @Override
            public Subscriber<? super Book> call(final Subscriber<? super String> subscriber) {
                return new Subscriber<Book>() {
                    @Override
                    public void onCompleted() {
                        subscriber.onCompleted();
                    }
  
                    @Override
                    public void onError(Throwable e) {
                        subscriber.onError(e);
                    }
  
                    @Override
                    public void onNext(Book book) {
                        for(String chapter : book.chapterList) {
                            subscriber.onNext(chapter);
                        }
                    }
                };
            }
        })
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.v(TAG, s);
            }
        });

5.   compose(Transformer)变换

compose(Transformer)变换,针对Observable 自身进行变换。假设在程序中有多个 Observable ,并且他们都需要应用一组相同的 lift() 变换,可以使用compose方式处理,而不是每个都调用一次相同的变换过程

实例代码,如需要将Book转换得到对应的全部章节,两个Observable对章节数据有不同的处理方式:

final Observable.Transformer bookTtransformer = new Observable.Transformer<Book, String>() {
    @Override
    public Observable<String> call(Observable<Book> bookObservable) {
        return bookObservable.flatMap(new Func1<Book, Observable<String>>() {
            @Override
            public Observable<String> call(Book book) {
                return Observable.from(book.chapterList);
            }
        });
    }
};
  
Observable.from(getBookList(4))
      .compose(bookTtransformer)
      .subscribe(new Action1<String>() {
          @Override
          public void call(String s) {
              //处理方式1
              Log.v(TAG, s);
          }
      });

Observable.from(getBookList(2))
      .compose(bookTtransformer)
      .subscribe(new Action1<String>() {
          @Override
          public void call(String s) {
              //处理方式2
              Log.v(TAG, s);
          }
      });

RxJava使用(四)变换相关推荐

  1. RxJava的四种Subjects:PublishSubject、ReplaySubject、BehaviorSubject、AsyncSubject的理解

    RxJava的四种Subjects:PublishSubject.ReplaySubject.BehaviorSubject.AsyncSubject的理解 Subject:它既是Observable ...

  2. RxJava系列四(过滤操作符)

    转载请注明出处:https://zhuanlan.zhihu.com/p/21966621 RxJava系列1(简介) RxJava系列2(基本概念及使用介绍) RxJava系列3(转换操作符) Rx ...

  3. RxJava学习(四利用RxJava打造自己的RxBus)

    前面说过Rxjava的功能很强大,不仅仅是实现链式的异步操作,它的功能很强大还可以通过RxBus实现EventBus的消息/事件传递功 能,我们来看看 RxBus 1 package com.exam ...

  4. 深入Java泛型(四):RxJava中泛型的使用分析

    RxJava出现在我们的视线已经很久了,我自己也有阅读过非常多的文章,谈不上精通,但是勉强称得上会一些简单的使用,近日总是对这种响应式的编程,对RxJava魂牵梦绕,深刻的感觉到自己对泛型的认识,理解 ...

  5. RxJava 源码解析之观察者模式

    了解 RxJava 的应该都知道是一个基于事务驱动的库,响应式编程的典范.提到事务驱动和响应就不得不说说,设计模式中观察者模式,已经了解的朋友,可以直接跳过观察者模式的介绍,直接到 RxJava 源码 ...

  6. android ------- 开发者的 RxJava 详解

    在正文开始之前的最后,放上 GitHub 链接和引入依赖的 gradle 代码: Github:  https://github.com/ReactiveX/RxJava  https://githu ...

  7. 给 Android 开发人员的 RxJava 具体解释

    前言 我从去年開始使用 RxJava .到如今一年多了. 今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava .并且使用的场景越来越多 . 而 ...

  8. RxJava学习入门

    RxJava是什么 一个词:异步. RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-bas ...

  9. RxJava菜鸟驿站(一)

    前言 终究没有经受住RxJava的诱惑,只恨自己来的比较晚,走起~ RxJava 是什么? 一个在 Java VM 上使用可观测的序列来组成异步的.基于事件的程序的库~ Rx Java 有什么优势? ...

  10. Android开源—RXJava观察者设计模式

    简介 RxJava 是一个响应式编程框架,采用观察者设计模式.所以自然少不了 Observable 和 Subscriber 这两个东东了. RxJava最核心的两个东西是Observables(被观 ...

最新文章

  1. 半年总结(2018-10)
  2. python 浏览器自动化测试,python中使用chrome进行自动化测试,浏览器变量设置
  3. 网络编程- 解决黏包现象方案一(六)
  4. Eclipse Source not found
  5. 掩膜区域内像素值_基于颜色空间采样的抠图算法
  6. 【转载】现代浏览器的工作原理
  7. coreboot学习6:ramstage阶段之芯片初始化流程
  8. Python学习-day20 django进阶篇
  9. 怎么爬before after之间的内容_关于伪元素::before和::after的用法
  10. AI人才平均月薪3万,最赚钱岗位出炉;上海人才吸引力跌至第四
  11. 10001_关于window10下其它主机无法ping通本地的设置
  12. 利用FreeType和OpenGL进行矢量字体渲染
  13. 自定义系统右键菜单工具-使用说明
  14. Codeforces Round #439 (Div. 2) E. The Untended Antiquity
  15. 实训——基于大数据Hadoop平台的医疗平台项目实战
  16. 一个人有没有领导力,就看这3点
  17. 如何打造爆款知识产品?
  18. 基于stm32f1(正点原子)的tft_lcd(ILI9341)学习
  19. 小学计算机小组期末总结范文,第一学期小学三年级级组工作总结
  20. 正在此计算机上搜索更新,解决:已在此计算机上检测到Microsoft Visual C++ 2010 Redistributable的更新版本...

热门文章

  1. 编java用jdk还是editplus_怎样运用EditPlus进行配置Java编译环境
  2. 251f与ips屏显示器对比_1千多元预算,2020年PS平面设计/摄影后期显示器推荐/选购指南(2k+高色域屏)...
  3. sql 只取一条记录_后端程序员必备:书写高质量SQL的30条建议
  4. Python进阶7——字典与集合
  5. python连接linux服务器并使用命令_python基于paramiko模块实现远程连接Linux虚拟机(服务器)并执行指定命令返回输出结果...
  6. c盘怎么扩容_给电脑减压,C盘清理全攻略!
  7. webpack搭建vue项目开发环境【文档向学习】
  8. Jupyter on Kubernetes机器学习-MLflow
  9. 半导体并购停不下来 ADI拟148亿美元收购Linear
  10. 面向对象数据库NDatabase_初识