ReactiveX 操作符

  • 创建操作
    • from
    • just
    • create
    • defer
    • interval
    • range
    • timer
    • empty never error
  • 变换操作
    • map
    • scan
    • buffer
    • window
    • groupBy
    • flatMap
  • 过滤操作
  • 结合操作
    • join
    • merge
    • and
    • startWith
    • combineLatest
  • 辅助操作
  • 条件和布尔操作

ReactiveX的每种编程语言的实现都实现了一组操作符的集合。不同的实现之间有很多重叠的部分,也有一些操作符只存在特定的实现中。每种实现都倾向于用那种编程语言中他们熟悉的上下文中相似的方法给这些操作符命名。

创建操作

创建Observable的各种方法。

  • just( ) — 将一个或多个对象转换成发射这个或这些对象的一个Observable
  • from( ) — 将一个Iterable, 一个Future, 或者一个数组转换成一个Observable
  • repeat( ) — 创建一个重复发射指定数据或数据序列的Observable
  • repeatWhen( ) — 创建一个重复发射指定数据或数据序列的Observable,它依赖于另一个Observable发射的数据
  • create( ) — 使用一个函数从头创建一个Observable
  • defer( ) — 只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observable
  • range( ) — 创建一个发射指定范围的整数序列的Observable
  • interval( ) — 创建一个按照给定的时间间隔发射整数序列的Observable
  • timer( ) — 创建一个在给定的延时之后发射单个数据的Observable
  • empty( ) — 创建一个什么都不做直接通知完成的Observable
  • error( ) — 创建一个什么都不做直接通知错误的Observable
  • never( ) — 创建一个不发射任何数据的Observable

from

        String[] array = {"a", "b", "c"};Observable.fromArray(array).subscribe(System.out::println);List<Integer> list = Arrays.asList(1, 2, 3, 45);Observable.fromIterable(list).subscribe(System.out::println);
        ExecutorService executor = Executors.newFixedThreadPool(3);Future<String> future = executor.submit(new Callable<String>() {@Overridepublic String call() throws Exception {return "hello world";}});Observable.fromFuture(future).subscribe(System.out::println);
        Observable.fromPublisher(new Publisher<String>() {@Overridepublic void subscribe(Subscriber<? super String> subscriber) {subscriber.onNext("fromPublisher");}}).subscribe(System.out::println);

just

        Observable.just("a", "b", "c").map(s -> "hello " + s).subscribe(System.out::println);

create

        Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> observableEmitter) throws Throwable {try {observableEmitter.onNext("world");observableEmitter.onComplete();//throw new RuntimeException("this is a question.");} catch (Throwable t) {observableEmitter.onNext("error:" + t.toString());}}}).subscribe(System.out::println);// world

defer

        Observable.defer(new Supplier<ObservableSource<String>>() {@Overridepublic ObservableSource<String> get() throws Throwable {return new ObservableSource<String>() {@Overridepublic void subscribe(Observer<? super String> observer) {observer.onNext("world");observer.onComplete();}};}}).subscribe(System.out::println);// world

interval

        Observable.interval(1, TimeUnit.SECONDS).subscribe(System.out::println);TimeUnit.SECONDS.sleep(10);// 0 1 2 3 4 5 6 7 8 9

range

        Observable.range(10, 10).subscribe(System.out::println);TimeUnit.SECONDS.sleep(3);// 10 11 12 13 ... 19

timer

        Observable.timer(1, TimeUnit.SECONDS).subscribe(System.out::println);TimeUnit.SECONDS.sleep(10);// 0

empty never error

        // 创建一个不发射任何数据但是正常终止的ObservableObservable o1 = Observable.empty();// 创建一个不发射数据也不终止的ObservableObservable o2 = Observable.never();// 创建一个不发射数据以一个错误终止的ObservableObservable o3 = Observable.error(new RuntimeException("rxjava error."));

变换操作

对Observable发射的数据执行变换操作的各种操作符。

  • map( ) — 对序列的每一项都应用一个函数来变换Observable发射的数据序列
  • flatMap( ), concatMap( ), and flatMapIterable( ) — 将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable
  • switchMap( ) — 将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据
  • scan( ) — 对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值
  • groupBy( ) — 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据
  • buffer( ) — 它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
  • window( ) — 定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项
  • cast( ) — 在发射之前强制将Observable发射的所有数据转换为指定类型

map

        Observable.just("a", "b", "c").map(s -> "hello " + s).subscribe(System.out::println);

scan

        // Scan操作符对原始Observable发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。这个操作符在某些情况下被叫做accumulatorObservable.range(1, 100).scan((s, v) -> s + v).subscribe(System.out::println);

buffer

        // 定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。// buffer(count)以列表(List)的形式发射非重叠的缓存,每一个缓存至多包含来自原始Observable的count项数据(最后发射的列表数据可能少于count项)Observable.just("a", "b", "c", "g", "q").buffer(2).subscribe(System.out::println);
       // buffer(count, skip)从原始Observable的第一项数据开始创建新的缓存,此后每当收到skip项数据,用count项数据填充缓存:开头的一项和后续的count-1项,它以列表(List)的形式发射缓存,取决于count和skip的值,这些缓存可能会有重叠部分(比如skip < count时),也可能会有间隙(比如skip > count时)。Observable.just("a", "b", "c", "g", "q").buffer(2,3).subscribe(System.out::println);

window

       // 定期将来自原始Observable的数据分解为一个Observable窗口,发射这些窗口,而不是每次发射一项数据// Window和Buffer类似,但不是发射来自原始Observable的数据包,它发射的是ObservablesObservable.just("a", "b", "c", "g", "q").window(2).subscribe(System.out::println);

groupBy

        Observable.just("a", "b", "c", "g", "q").groupBy(new Function<String, Integer>() {@Overridepublic Integer apply(String s) throws Throwable {return Integer.valueOf(s.toCharArray()[0]).intValue();}}).subscribe(new Consumer<GroupedObservable<Integer, String>>() {@Overridepublic void accept(GroupedObservable<Integer, String> integerStringGroupedObservable) throws Throwable {System.out.println(integerStringGroupedObservable.getKey());}});

flatMap

   Observable.just("a", "b", "c", "g", "q").flatMap(v -> Observable.just(v).map(w -> "hello " + w)).subscribe(System.out::println);

过滤操作

可用于过滤和选择Observable发射的数据序列

  • filter( ) — 过滤数据
  • takeLast( ) — 只发射最后的N项数据
  • last( ) — 只发射最后的一项数据
  • lastOrDefault( ) — 只发射最后的一项数据,如果Observable为空就发射默认值
  • takeLastBuffer( ) — 将最后的N项数据当做单个数据发射
  • skip( ) — 跳过开始的N项数据
  • skipLast( ) — 跳过最后的N项数据
  • take( ) — 只发射开始的N项数据
  • first( ) and takeFirst( ) — 只发射第一项数据,或者满足某种条件的第一项数据
  • firstOrDefault( ) — 只发射第一项数据,如果Observable为空就发射默认值
  • elementAt( ) — 发射第N项数据
  • elementAtOrDefault( ) — 发射第N项数据,如果Observable数据少于N项就发射默认值
  • sample( ) or throttleLast( ) — 定期发射Observable最近的数据
  • throttleFirst( ) — 定期发射Observable发射的第一项数据
  • throttleWithTimeout( ) or debounce( ) — 只有当Observable在指定的时间后还没有发射数据时,才发射一个数据
  • timeout( ) — 如果在一个指定的时间段后还没发射数据,就发射一个异常
  • distinct( ) — 过滤掉重复数据
  • distinctUntilChanged( ) — 过滤掉连续重复的数据
  • ofType( ) — 只发射指定类型的数据
  • ignoreElements( ) — 丢弃所有的正常数据,只发射错误或完成通知

结合操作

可用于组合多个Observables。

startWith( ) — 在数据序列的开头增加一项数据
merge( ) — 将多个Observable合并为一个
mergeDelayError( ) — 合并多个Observables,让没有错误的Observable都完成后再发射错误通知
zip( ) — 使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果
and( ), then( ), and when( ) — (rxjava-joins) 通过模式和计划组合多个Observables发射的数据集合
combineLatest( ) — 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果
join( ) and groupJoin( ) — 无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射
switchOnNext( ) — 将一个发射Observables的Observable转换成另一个Observable,后者发射这些Observables最近发射的数据

join

merge

and

startWith

combineLatest

辅助操作

用于Observable的辅助操作符

materialize( ) — 将Observable转换成一个通知列表convert an Observable into a list of Notifications
dematerialize( ) — 将上面的结果逆转回一个Observable
timestamp( ) — 给Observable发射的每个数据项添加一个时间戳
serialize( ) — 强制Observable按次序发射数据并且要求功能是完好的
cache( ) — 记住Observable发射的数据序列并发射相同的数据序列给后续的订阅者
observeOn( ) — 指定观察者观察Observable的调度器
subscribeOn( ) — 指定Observable执行任务的调度器
doOnEach( ) — 注册一个动作,对Observable发射的每个数据项使用
doOnCompleted( ) — 注册一个动作,对正常完成的Observable使用
doOnError( ) — 注册一个动作,对发生错误的Observable使用
doOnTerminate( ) — 注册一个动作,对完成的Observable使用,无论是否发生错误
doOnSubscribe( ) — 注册一个动作,在观察者订阅时使用
doOnUnsubscribe( ) — 注册一个动作,在观察者取消订阅时使用
finallyDo( ) — 注册一个动作,在Observable完成时使用
delay( ) — 延时发射Observable的结果
delaySubscription( ) — 延时处理订阅请求
timeInterval( ) — 定期发射数据
using( ) — 创建一个只在Observable生命周期存在的资源
single( ) — 强制返回单个数据,否则抛出异常
singleOrDefault( ) — 如果Observable完成时返回了单个数据,就返回它,否则返回默认数据
toFuture( ), toIterable( ), toList( ) — 将Observable转换为其它对象或数据结构

条件和布尔操作

用于根据条件发射或变换Observables,或者对它们做布尔运算:

条件操作符

amb( ) — 给定多个Observable,只让第一个发射数据的Observable发射全部数据
defaultIfEmpty( ) — 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据
(rxjava-computation-expressions) doWhile( ) — 发射原始Observable的数据序列,然后重复发射这个序列直到不满足这个条件为止
(rxjava-computation-expressions) ifThen( ) — 只有当某个条件为真时才发射原始Observable的数据序列,否则发射一个空的或默认的序列
skipUntil( ) — 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据
skipWhile( ) — 丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable剩余的数据
(rxjava-computation-expressions) switchCase( ) — 基于一个计算结果,发射一个指定Observable的数据序列
takeUntil( ) — 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知
takeWhile( ) and takeWhileWithIndex( ) — 发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据
(rxjava-computation-expressions) whileDo( ) — 如果条件为true,则发射源Observable数据序列,并且只要条件保持为true就重复发射此数据序列

布尔操作符

all( ) — 判断是否所有的数据项都满足某个条件
contains( ) — 判断Observable是否会发射一个指定的值
exists( ) and isEmpty( ) — 判断Observable是否发射了一个值
sequenceEqual( ) — 判断两个Observables发射的序列是否相等

ReactiveX 操作符相关推荐

  1. UniRx之操作符详解-ReactiveX语法

    前言 UniRx中由很多操作符,注意要分为三类 Linq操作符,和Linq语法风格一致 ReactiveX操作符,从Rx.Net库继承下来的操作符. UniRx操作符,UniRx针对Unity的独有操 ...

  2. RxJava 基础的使用

    学习地址: github wiki 学习地址 ReactiveX 操作符学习地址 RxJava是ReactiveX(反应性扩展)的Java VM实现:一个用于通过使用可观察的序列组成异步和基于事件的程 ...

  3. RxJava 操作符 do

    看下文档给的图片 注册一项操作以应对各种可观察的生命周期事件 do的操作符有很多具体如下 下面看下器使用 Observable.create(new ObservableOnSubscribe< ...

  4. RxJava操作符在android中的使用场景详解(一)

    转载请注明出处:http://www.wangxinarhat.com/2016/04/19/2016-04-19-rxjava-android-operate1/ 最近学习了RxJava在andro ...

  5. RxSwift笔记七其他操作符

    简介 git地址: https://github.com/ReactiveX/RxSwift参考资料:http://t.swift.gg/d/2-rxswiftReactiveX是通过可观察的流实现异 ...

  6. ReactiveX流式编程—从xstream讲起

    ReactiveX流式编程 ReactiveX来自微软,它是一种针对异步数据流的编程.简单来说,它将一切数据,包括HTTP请求,DOM事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处 ...

  7. Rxjava(2.操作符)

    参考地址 英文版 中文版 Rxjava(1.基础篇) lambda表达式 ReactiveX中文翻译文档 本文主要分为: 准备工作 改进 还可以更好 丰富的操作符 其他的操作符(归纳几十个) 写下本文 ...

  8. RxJava过滤操作符

    概述 过滤操作符用于过滤和选择Observable发射的数据序列,让Observable只返回满足我们条件的数据. Debounce Debounce会过滤掉发射速率过快的数据项,相当于限流,但是需要 ...

  9. Carson带你学Android:RxJava过滤操作符

    前言 Rxjava由于其基于事件流的链式调用.逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎. 今天,我将为大家详细介绍RxJava操作符中最常用的 过滤操作符,希望你们会 ...

最新文章

  1. ASP.NET MVC使用Bootstrap系列(3)——使用Bootstrap 组件
  2. js 乘法除法精度问题
  3. 《每日一题》738. Monotone Increasing Digits 单调递增的数字
  4. 【转】jenkins 忘记admin用户账号密码
  5. php h2,微信连接失败:一直返回h2Moved/h2 (终于搞定了)
  6. pythonbyte转int_Python将byte数组转换为int详解
  7. mysql查询操作的5种子句
  8. python特性描述_详解 Python 最优雅的特性之一 — 描述符
  9. 888. 公平的糖果棒交换
  10. 具体数学-第1课(递归求解实际问题)
  11. 面试要10K,HR说你只值7K,怎么斗得过?
  12. 推荐几个我珍藏的公众号~超级无敌!
  13. 12星座哪些人会重友轻爱?
  14. 汇编语言10堆栈平衡
  15. DSP28335学习记录(五)——eCAP、eQEP
  16. 【从零开始学习YOLOv3】3.YOLOv3的数据组织和处理
  17. socket传输案例
  18. 迪杰斯特拉(Dijkstra)
  19. STM32F105RCT6使用CubeMX初始化工程——2:初始化CAN通信
  20. 视觉镜头上的闪回画面

热门文章

  1. Android逆向之去除APK中的广告
  2. ff14注册完服务器可以转,FF14怎么转服 FF14转服条件一览-游侠网
  3. Mac iterm/iterm2快捷键
  4. javac编译错误: 程序包 com.sun.xxx 不存在
  5. vue实现3D饼状图
  6. 传智杯C语言题库,[传智杯]补刀-题解(C语言代码)
  7. 用matlab实时读取串口数据并动态显示曲线
  8. 让CocoStudio变成libgdx的UI编辑器
  9. JavaScript字符串操作,把短线(-)命名格式改变为驼峰命名
  10. 苹果CMS接入个人支付收款GOGO支付插件