• Filter 只发射通过了谓词测试的数据项
  • OfType ofType是filter操作符的一个特殊形式它过滤一个Observable只返回指定类型的数据
  • Take 只发射开始的N项数据
  • TakeLast 只发射最后N个元素
  • TakeLastBuffer 将最后的N项数据当做单个数据发射
  • Skip 跳过开始的N项数据
  • SkipLast 跳过后面的N项数据
  • Distinct 过滤掉重复数据
  • DistinctUntilChanged 过滤掉连续重复的数据
  • ElementAt 发射第N项数据
  • ElementAtOrDefault 发射第N项数据如果索引值大于数据项数它会发射一个默认值通过额外的参数指定
  • First 只发射第一项数据
  • TakeFirst 返回一个可观察到的发射仅由源观测中满足指定条件发射的第一个项目
  • Single single操作符也与first类似
  • Last 只发射最后一项或者满足某个条件的最后一项数据
  • Sample 定期发射Observable最近发射的数据项
  • ThrottleFirst throttleFirst与throttleLastsample不同在每个采样周期内它总是发射原始Observable的第一项数据而不是最近的一项
  • ThrottleWithTimeout  or Debounce  只有当Observable在指定的时间后还没有发射数据时才发射一个数据
  • Timeout 如果在一个指定的时间段后还没发射数据就发射一个异常
  • IgnoreElements 丢弃所有的正常数据只发射错误或完成通知

1. Filter —> 只发射通过了谓词测试的数据项

Filter操作符使用你指定的一个谓词函数测试数据项,只有通过测试的数据才会被发射。

示例代码

Observable.just(1, 2, 3, 4, 5).filter(new Func1<Integer, Boolean>() {@Overridepublic Boolean call(Integer item) {return( item < 4 );}}).subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {System.out.println("Next: " + item);}@Overridepublic void onError(Throwable error) {System.err.println("Error: " + error.getMessage());}@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}});

输出:

Next: 1
Next: 2
Next: 3
Sequence complete.

filter默认不在任何特定的调度器上执行。

  • Javadoc:filter(Func1)

2. OfType —> ofType是filter操作符的一个特殊形式。它过滤一个Observable只返回指定类型的数据。

ofType默认不在任何特定的调度器上指定。

示例代码:

Observable.just(1,"sb",0.1f).ofType(String.class).subscribe(new Action1<String>() {@Overridepublic void call(String s) {System.out.println(s);}});

输出:

sb
  • Javadoc:ofType(Class)

3. Take —> 只发射开始的N项数据

使用Take操作符让你可以修改Observable的行为,只返回前面的N项数据,然后发射完成通知,忽略剩余的数据。

如果你对一个Observable使用take(n)(或它的同义词limit(n))操作符,而那个Observable发射的数据少于N项,那么take操作生成的Observable不会抛异常或发射onError通知,在完成前它只会发射相同的少量数据。

示例代码:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8).take(4).subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {System.out.println("Next: " + item);}@Overridepublic void onError(Throwable error) {System.err.println("Error: " + error.getMessage());}@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}});

输出:

Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.

take(int)默认不任何特定的调度器上执行。

  • Javadoc:take(int)

take的这个变体接受一个时长而不是数量参数。它会丢发射Observable开始的那段时间发射的数据,时长和时间单位通过参数指定。

take的这个变体默认在computation调度器上执行,但是你可以使用第三个参数指定其它的调度器。

  • Javadoc:take(long,TimeUnit)
  • Javadoc:take(long,TimeUnit,Scheduler)

4. TakeLast —> 只发射最后N个元素

takeLast操作符是把源Observable产生的结果的后n项提交给订阅者,提交时机是Observable发布onCompleted通知之时。

示例代码:

Observable.just(1,2,3,4,5,6,7).takeLast(2).subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {System.out.println("Next: " + item);}@Overridepublic void onError(Throwable error) {System.err.println("Error: " + error.getMessage());}@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}});

输出:

Next: 6
Next: 7
Sequence complete.
  • Javadoc:takeLast(int)

5. TakeLastBuffer —> 将最后的N项数据当做单个数据发射

它和takeLast类似,,唯一的不同是它把所有的数据项收集到一个List再发射,而不是依次发射一个。

示例代码:

Observable.just(1,2,3,4).takeLastBuffer(2).subscribe(new Action1<List<Integer>>() {@Overridepublic void call(List<Integer> integers) {String s = "";for(Integer str : integers){s = s +str +",";}System.out.println(s);}});

输出:

I/System.out: 3,4,
  • Javadoc:takeLastBuffer(int)
  • Javadoc:takeLastBuffer(long,TimeUnit)
  • Javadoc:takeLastBuffer(long,TimeUnit,Scheduler)
  • Javadoc:takeLastBuffer(int,long,TimeUnit)
  • Javadoc:takeLastBuffer(int,long,TimeUnit,Scheduler)

6. Skip —> 跳过开始的N项数据

抑制Observable发射的前N项数据

使用Skip操作符,你可以忽略Observable发射的前N项数据,只保留之后的数据。

skip的这个变体默认不在任何特定的调度器上执行。

示例代码

Observable.just(1,2,3,4).skip(1).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {System.out.println(integer+"");}});

输出

2
3
4
  • Javadoc: skip(int)

skip的这个变体接受一个时长而不是数量参数。它会丢弃原始Observable开始的那段时间发射的数据,时长和时间单位通过参数指定。

skip的这个变体默认在computation调度器上执行,但是你可以使用第三个参数指定其它的调度器。

  • Javadoc: skip(long,TimeUnit)
  • Javadoc: skip(long,TimeUnit,Scheduler)

7. SkipLast —> 跳过后面的N项数据

抑制Observable发射的后N项数据

使用SkipLast操作符修改原始Observable,你可以忽略Observable发射的后N项数据,只保留前面的数据。

使用SkipLast操作符,你可以忽略原始Observable发射的后N项数据,只保留之前的数据。注意:这个机制是这样实现的:延迟原始Observable发射的任何数据项,直到它发射了N项数据。

示例代码:

Observable.just(1,2,3,4).skipLast(1).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {System.out.println(integer+"");}});

输出

1
2
3

skipLast的这个变体默认不在任何特定的调度器上执行。

  • Javadoc:skipLast(int)

还有一个skipLast变体接受一个时长而不是数量参数。它会丢弃在原始Observable的生命周期内最后一段时间内发射的数据。时长和时间单位通过参数指定。

注意:这个机制是这样实现的:延迟原始Observable发射的任何数据项,直到自这次发射之后过了给定的时长。

skipLast的这个变体默认在computation调度器上执行,但是你可以使用第三个参数指定其它的调度器。

  • Javadoc:skipLast(long,TimeUnit)
  • Javadoc:skipLast(long,TimeUnit,Scheduler)

8. Distinct —> 过滤掉重复数据

Distinct的过滤规则是:只允许还没有发射过的数据项通过。

在某些实现中,有一些变体允许你调整判定两个数据不同(distinct)的标准。还有一些实现只比较一项数据和它的直接前驱,因此只会从序列中过滤掉连续重复的数据。

distinct()

示例代码:

Observable.just(1, 2, 1, 1, 2, 3).distinct().subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {System.out.println("Next: " + item);}@Overridepublic void onError(Throwable error) {System.err.println("Error: " + error.getMessage());}@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}});

输出

Next: 1
Next: 2
Next: 3
Sequence complete.
  • Javadoc:distinct()

distinct(Func1)

这个操作符有一个变体接受一个函数。这个函数根据原始Observable发射的数据项产生一个Key,然后,比较这些Key而不是数据本身,来判定两个数据是否是不同的。

示例代码:

Observable.just(1,2,3,4,5,6).distinct(new Func1<Integer, Integer>() {@Overridepublic Integer call(Integer integer) {return integer%3;}}).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {System.out.println(integer+"");}});

输出: 1% 3= 1 , 2%3 =2 ,3%3 = 0 , 4%3 = 1 , 5%3=2 ,6%3 = 0 ,后面三个和前面三个的值重复去掉

I/System.out: 1
I/System.out: 2
I/System.out: 3
  • Javadoc:distinct(Func1)

9. DistinctUntilChanged —> 过滤掉连续重复的数据

DistinctUntilChanged()

示例代码:

Observable.just(1,2,2,2,5,6).distinctUntilChanged().subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {System.out.println(integer+"");}});

输出:

1
2
5
6

DistinctUntilChanged(Func1)

distinct(Func1)一样,根据一个函数产生的Key判定两个相邻的数据项是不是不同的。

示例代码

Observable.just(1,2,2,2,5,11).distinctUntilChanged(new Func1<Integer, Integer>() {@Overridepublic Integer call(Integer integer) {return integer %2;}}).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {System.out.println(integer+"");}});

输出

1
2
5

distinctdistinctUntilChanged默认不在任何特定的调度器上执行。

  • Javadoc:distinctUntilChanged(Func1)
  • *

10. ElementAt —> 发射第N项数据

ElementAt操作符获取原始Observable发射的数据序列指定索引位置的数据项,然后当做自己的唯一数据发射。

RxJava将这个操作符实现为elementAt,给它传递一个基于0的索引值,它会发射原始Observable数据序列对应索引位置的值,如果你传递给elementAt的值为5,那么它会发射第项的数据。

如果你传递的是一个负数,或者原始Observable的数据项数小于index+1,将会抛出一个IndexOutOfBoundsException异常。

示例代码:

Observable.just(1,2,3,4,5,6).elementAt(2).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {System.out.println("Next:" + integer);}}, new Action1<Throwable>() {@Overridepublic void call(Throwable throwable) {System.out.println("Error:" + throwable.getMessage());}}, new Action0() {@Overridepublic void call() {System.out.println("completed!");}});

输出:

Next:3
completed!
  • Javadoc: elementAt(int)

11. ElementAtOrDefault —> 发射第N项数据,如果索引值大于数据项数,它会发射一个默认值(通过额外的参数指定)

RxJava还实现了elementAtOrDefault操作符。与elementAt的区别是,如果索引值大于数据项数,它会发射一个默认值(通过额外的参数指定),而不是抛出异常。但是如果你传递一个负数索引值,它仍然会抛出一个IndexOutOfBoundsException异常。

示例代码:

Observable.just(1,2,3,4,5,6).elementAtOrDefault(13,999).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {System.out.println(integer+"");}});

输出:

999

elementAt和elementAtOrDefault默认不在任何特定的调度器上执行。

  • Javadoc:Javadoc: elementAtOrDefault(int,T)

12. First —> 只发射第一项数据

如果你只对Observable发射的第一项数据,或者满足某个条件的第一项数据感兴趣,你可以使用First操作符。

在某些实现中,First没有实现为一个返回Observable的过滤操作符,而是实现为一个在当时就发射原始Observable指定数据项的阻塞函数。在这些实现中,如果你想要的是一个过滤操作符,最好使用Take(1)或者ElementAt(0)

在一些实现中还有一个Single操作符。它的行为与First类似,但为了确保只发射单个值,它会等待原始Observable终止(否则,不是发射那个值,而是以一个错误通知终止)。你可以使用它从原始Observable获取第一项数据,而且也确保只发射一项数据。

RxJava中,这个操作符被实现为first,firstOrDefaulttakeFirst

可能容易混淆,BlockingObservable也有名叫firstfirstOrDefault的操作符,它们会阻塞并返回值,不是立即返回一个Observable

还有几个其它的操作符执行类似的功能。

First()

示例代码:

Observable.just(1, 2, 3).first().subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {System.out.println("Next: " + item);}@Overridepublic void onError(Throwable error) {System.err.println("Error: " + error.getMessage());}@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}});

输出:

Next: 1
Sequence complete.
  • Java:first()

First(Func1)

传递一个谓词函数给first,然后发射这个函数判定为true的第一项数据。

示例代码:

Observable.just(1,2,3,4,5,6).first(new Func1<Integer, Boolean>() {@Overridepublic Boolean call(Integer integer) {return integer>3;}}).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {System.out.println(integer+"");}});

输出

4
  • Javadoc:first(Func1)

FirstOrDefault(T)

firstOrDefaultfirst类似,但是在Observable没有发射任何数据时发射一个你在参数中指定的默认值。

示例代码:

Observable.empty().firstOrDefault("fuck you").subscribe(new Action1<Object>() {@Overridepublic void call(Object o) {System.out.println(o+"");}});

输出

fuck you
  • Javadoc:firstOrDefault(T)

FirstOrDefault(T, Func1)

传递一个谓词函数给firstOrDefault,然后发射这个函数判定为true的第一项数据,如果没有数据通过了谓词测试就发射一个默认值。

示例代码:

Observable.just(1,2,3,4,5,6).firstOrDefault(99, new Func1<Integer, Boolean>() {@Overridepublic Boolean call(Integer integer) {return integer == 4;}}).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {System.out.println(integer+"");}});

输出:

4
  • Javadoc:firstOrDefault(T, Func1)

13. TakeFirst —> 返回一个可观察到的发射仅由源观测中满足指定条件发射的第一个项目。

takeFirst操作符类似于take操作符,同时也类似于first操作符,都是获取源Observable产生的结果列表中符合指定条件的前一个或多个,与first操作符不同的是,first操作符如果获取不到数据,则会抛出NoSuchElementException异常,而takeFirst则会返回一个空的Observable,该Observable只有onCompleted通知而没有onNext通知。

示例代码:

Observable.just(1,2,3,4,5,6,7).takeFirst(new Func1<Integer, Boolean>() {@Overridepublic Boolean call(Integer integer) {//获取数值大于3的数据return integer>3;}}).subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {System.out.println("Next: " + item);}@Overridepublic void onError(Throwable error) {System.err.println("Error: " + error.getMessage());}@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}});

输出

Next: 4
Sequence complete.
  • Javadoc:takeFirst(Func1)

14. Single —> single操作符也与first类似

Single()

single操作符也与first类似,但是如果原始Observable在完成之前不是正好发射一次数据,它会抛出一个NoSuchElementException

示例代码:


Observable.just(1,2).single().subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {System.out.println("===============>"+integer+"");}});

输出

rx.exceptions.OnErrorNotImplementedException: Sequence contains too many elements
  • Javadoc:single()

Single(Func1)

single的变体接受一个谓词函数,发射满足条件的单个值,如果不是正好只有一个数据项满足条件,会以错误通知终止。

示例代码:

Observable.just(1,2,3,4,5,6).single(new Func1<Integer, Boolean>() {@Overridepublic Boolean call(Integer integer) {return integer>5; // 输出值为6return integer>3; // 报错 Sequence contains too many elementsreturn integer>6; // 报错 Sequence contains no elements}}).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {System.out.println(integer+"");}});

输出:

return integer>5; // 输出值为6
return integer>3; // 报错 Sequence contains too many elements
return integer>6; // 报错 Sequence contains no elements
  • Javadoc:single(Func1)

singleOrDefault(T)

firstOrDefault类似,但是如果原始Observable发射超过一个的数据,会以错误通知终止。

示例代码:

Observable.just("1","233").singleOrDefault("fuck you two").subscribe(new Action1<Object>() {@Overridepublic void call(Object o) {Log.i("sss",o+"");}});

输出:

rx.exceptions.OnErrorNotImplementedException: Sequence contains too many elements
  • Javadoc:singleOrDefault(T)
  • singleOrDefault(T,Func1))

firstOrDefault(T, Func1)类似,如果没有数据满足条件,返回默认值;如果有多个数据满足条件,以错误通知终止。

示例代码

Observable.just(1,2,3,4,5,6,7,8).singleOrDefault(666, new Func1<Integer, Boolean>() {@Overridepublic Boolean call(Integer s) {return s>4;       //rx.exceptions.OnErrorNotImplementedException: Sequence contains too many elementsreturn s>12;      // 666return s>7;       // 8}}).subscribe(new Action1<Object>() {@Overridepublic void call(Object o) {Log.i("sss",o+"");}});
  • Javadoc:singleOrDefault(Func1,T)

15. Last —> 只发射最后一项(或者满足某个条件的最后一项)数据

如果你只对Observable发射的最后一项数据,或者满足某个条件的最后一项数据感兴趣,你可以使用Last操作符。

在某些实现中,Last没有实现为一个返回Observable的过滤操作符,而是实现为一个在当时就发射原始Observable指定数据项的阻塞函数。在这些实现中,如果你想要的是一个过滤操作符,最好使用TakeLast(1)。

RxJava中的实现是lastlastOrDefault

可能容易混淆,BlockingObservable也有名叫lastlastOrDefault的操作符,它们会阻塞并返回值,不是立即返回一个Observable

last()

只发射最后一项数据,使用没有参数的last操作符。

示例代码:

Observable.just(1, 2, 3).last().subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {System.out.println("Next: " + item);}@Overridepublic void onError(Throwable error) {System.err.println("Error: " + error.getMessage());}@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}});

输出:

Next: 3
Sequence complete.
  • Javadoc:just()

last(Func1)

这个版本的last也是接受一个谓词函数,返回一个发射原始Observable中满足条件的最后一项数据的Observable

示例代码:

Observable.just(1,2,3,4,5,6,7,8).last(new Func1<Integer, Boolean>() {@Overridepublic Boolean call(Integer integer) {return integer < 6;}}).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer o) {Log.i("sss",o+"");tv.setText(""+o);}});

输出:

5
  • Javadoc:last(Func1)

lastOrDefault(T)

lastOrDefaultlast类似,不同的是,如果原始Observable没有发射任何值,它发射你指定的默认值。

示例代码:

Observable.empty().lastOrDefault(99).subscribe(new Action1<Object>() {@Overridepublic void call(Object o) {Log.i("sss",o+"");}});

输出:

99
  • Javadoc:lastOrDefault(T)

lastOrDefault(T,Fun1)

示例代码:

Observable.just(1,2,3,4,5,6,7,8,9).lastOrDefault(99, new Func1<Integer, Boolean>() {@Overridepublic Boolean call(Integer integer) {return integer > 10;}}).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer o) {Log.i("sss",o+"");}});

输出:

999
  • Javadoc:lastOrDefault(T,Func1)

16. Sample —> 定期发射Observable最近发射的数据项

Sample操作符定时查看一个Observable,然后发射自上次采样以来它最近发射的数据。

在某些实现中,有一个ThrottleFirst操作符的功能类似,但不是发射采样期间的最近的数据,而是发射在那段时间内的第一项数据。

RxJava将这个操作符实现为samplethrottleLast

注意:如果自上次采样以来,原始Observable没有发射任何数据,这个操作返回的Observable在那段时间内也不会发射任何数据。

sample的这个变体每当第二个Observable发射一个数据(或者当它终止)时就对原始Observable进行采样。第二个Observable通过参数传递给sample

sample的这个变体默认不在任何特定的调度器上执行。

示例代码:

Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {if(subscriber.isUnsubscribed()) return;try {//前8个数字产生的时间间隔为1秒,后一个间隔为3秒for (int i = 1; i < 9; i++) {subscriber.onNext(i);Thread.sleep(1000);}Thread.sleep(2000);subscriber.onNext(9);subscriber.onCompleted();} catch(Exception e){subscriber.onError(e);}}}).subscribeOn(Schedulers.newThread()).sample(2200, TimeUnit.MILLISECONDS)  //采样间隔时间为2200毫秒.subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {System.out.println("Next: " + item);}@Overridepublic void onError(Throwable error) {System.err.println("Error: " + error.getMessage());}@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}});

输出:

Next: 3
Next: 5
Next: 7
Next: 8
Next: 9
Sequence complete.

sample(别名throttleLast)的一个变体按照你参数中指定的时间间隔定时采样(TimeUnit指定时间单位)。

sample的这个变体默认在computation调度器上执行,但是你可以使用第三个参数指定其它的调度器。

  • Javadoc:sample(long,TimeUnit)或throttleLast(long,TimeUnit)
  • Javadoc:sample(long,TimeUnit,Scheduler) 或throttleLast(long,TimeUnit,Scheduler)

17. ThrottleFirst —> throttleFirst与throttleLast/sample不同,在每个采样周期内,它总是发射原始Observable的第一项数据,而不是最近的一项。

throttleFirst操作符默认在computation调度器上执行,但是你可以使用第三个参数指定其它的调度器。

示例代码:

Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {if(subscriber.isUnsubscribed()) return;try {//前8个数字产生的时间间隔为1秒,后一个间隔为3秒for (int i = 1; i < 9; i++) {subscriber.onNext(i);Thread.sleep(1000);}Thread.sleep(2000);subscriber.onNext(9);subscriber.onCompleted();} catch(Exception e){subscriber.onError(e);}}}).subscribeOn(Schedulers.newThread()).throttleFirst(2200, TimeUnit.MILLISECONDS)  //采样间隔时间为2200毫秒.subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {System.out.println("Next: " + item);}@Overridepublic void onError(Throwable error) {System.err.println("Error: " + error.getMessage());}@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}});

输出:

Next: 1
Next: 4
Next: 7
Next: 9
Sequence complete.
  • Javadoc:throttleFirst(long,TimeUnit)
  • Javadoc:throttleFirst(long,TimeUnit,Scheduler)

18. ThrottleWithTimeout( ) or Debounce( ) —> 只有当Observable在指定的时间后还没有发射数据时,才发射一个数据

Debounce操作符会过滤掉发射速率过快的数据项。

RxJava将这个操作符实现为throttleWithTimeoutdebounce

注意:这个操作符会接着最后一项数据发射原始ObservableonCompleted通知,即使这个通知发生在你指定的时间窗口内(从最后一项数据的发射算起)。也就是说,onCompleted通知不会触发限流。

throtleWithTimeout/debounce的一个变体根据你指定的时间间隔进行限流,时间单位通过TimeUnit参数指定。

这种操作符默认在computation调度器上执行,但是你可以通过第三个参数指定。

示例代码:

Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {if(subscriber.isUnsubscribed()) return;try {for (int i = 0; i < 10; i++) {subscriber.onNext(i);Thread.sleep(i * 100);}subscriber.onCompleted();}catch(Exception e){subscriber.onError(e);}}}).subscribeOn(Schedulers.newThread()).debounce(400, TimeUnit.MILLISECONDS)  .subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {System.out.println("Next:" + integer);}}, new Action1<Throwable>() {@Overridepublic void call(Throwable throwable) {System.out.println("Error:" + throwable.getMessage());}}, new Action0() {@Overridepublic void call() {System.out.println("completed!");}});

输出:

Next:4
Next:5
Next:6
Next:7
Next:8
Next:9
completed!
  • Javadoc:debounce(long,TimeUnit) and throttleWithTimeout(long,TimeUnit)
  • Javadoc:debounce(long,TimeUnit,Scheduler) and throttleWithTimeout(long,TimeUnit,Scheduler)

debounce操作符的一个变体通过对原始Observable的每一项应用一个函数进行限流,这个函数返回一个Observable。如果原始Observable在这个新生成的Observable终止之前发射了另一个数据,debounce会抑制(suppress)这个数据项。

debounce的这个变体默认不在任何特定的调度器上执行。

  • Javadoc:debounce(Func1)

19. Timeout —> 如果在一个指定的时间段后还没发射数据,就发射一个异常

timeout(long,TimeUnit)

如果原始Observable过了指定的一段时长没有发射任何数据,Timeout操作符会以一个onError通知终止这个Observable

第一个变体接受一个时长参数,每当原始Observable发射了一项数据,timeout就启动一个计时器,如果计时器超过了指定指定的时长而原始Observable没有发射另一项数据,timeout就抛出TimeoutException,以一个错误通知终止Observable

这个timeout默认在computation调度器上执行,你可以通过参数指定其它的调度器。

示例代码:

Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {if(subscriber.isUnsubscribed()) return;try {for (int i = 0; i < 10; i++) {subscriber.onNext(i);Thread.sleep(i*100);}subscriber.onCompleted();}catch(Exception e){subscriber.onError(e);}}}).subscribeOn(Schedulers.newThread()).timeout(300,TimeUnit.MILLISECONDS).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {System.out.println("Next:" + integer);}}, new Action1<Throwable>() {@Overridepublic void call(Throwable throwable) {System.out.println("Error:" + throwable.getMessage());}}, new Action0() {@Overridepublic void call() {System.out.println("completed!");}});

输出:

Next:0
Next:1
Next:2
Next:3
Next:4
Error:null
  • Javadoc:timeout(long,TimeUnit)
  • Javadoc:timeout(long,TimeUnit,Scheduler)

timeout(long,TimeUnit,Observable)

这个版本的timeout在超时时会切换到使用一个你指定的备用的Observable,而不是发错误通知。它也默认在computation调度器上执行。

示例代码:

Observable.create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {if(subscriber.isUnsubscribed()) return;try {for (int i = 0; i < 10; i++) {subscriber.onNext(i);Thread.sleep(i*100);}subscriber.onCompleted();}catch(Exception e){subscriber.onError(e);}}}).subscribeOn(Schedulers.newThread()).timeout(300,TimeUnit.MILLISECONDS,Observable.just(555)).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {System.out.println("Next:" + integer);}}, new Action1<Throwable>() {@Overridepublic void call(Throwable throwable) {System.out.println("Error:" + throwable.getMessage());}}, new Action0() {@Overridepublic void call() {System.out.println("completed!");}});

输出:

Next:0
Next:1
Next:2
Next:3
Next:555
completed!
  • Javadoc:timeout(long,TimeUnit,Observable)
  • Javadoc:timeout(long,TimeUnit,Observable,Scheduler)

timeout(Func1)


这个版本的timeout使用一个函数针对原始Observable的每一项返回一个Observable,如果当这个Observable终止时原始Observable还没有发射另一项数据,就会认为是超时了,timeout就抛出TimeoutException,以一个错误通知终止Observable

这个timeout默认在immediate调度器上执行。

  • Javadoc:timeout(Func1)

这个版本的timeout同时指定超时时长和备用的Observable。它默认在immediate调度器上执行。

  • Javadoc:timeout(Func1,Observable)

这个版本的time除了给每一项设置超时,还可以单独给第一项设置一个超时。它默认在immediate调度器上执行。

  • Javadoc:timeout(Func0,Func1)

同上,但是同时可以指定一个备用的Observable。它默认在immediate调度器上执行。

  • Javadoc:timeout(Func0,Func1,Observable)

20. IgnoreElements —> 丢弃所有的正常数据,只发射错误或完成通知

不发射任何数据,只发射Observable的终止通知

IgnoreElements操作符抑制原始Observable发射的所有数据,只允许它的终止通知(onErroronCompleted)通过。

如果你不关心一个Observable发射的数据,但是希望在它完成时或遇到错误终止时收到通知,你可以对Observable使用ignoreElements操作符,它会确保永远不会调用观察者的onNext()方法。

RxJava将这个操作符实现为ignoreElements

示例代码:

Observable.just(1,2,3,4,5,6,7,8).ignoreElements().subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {System.out.println("Next: " + item);}@Overridepublic void onError(Throwable error) {System.err.println("Error: " + error.getMessage());}@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}});

输出:

Sequence complete.
  • Javadoc:ignoreElements()

RxJava 学习笔记(七) --- Filtering 过滤操作相关推荐

  1. Mr.J-- jQuery学习笔记(七)--CSS类操作文本值操作

    不了解属性以及属性操作的同学可以看我之前的博客:Mr.J-- jQuery学习笔记(五)--属性及属性节点 下面demo 中btn用到的角标,之前写验证码动态强度测试时也用过同样方法:Mr.J--密码 ...

  2. RxJava 学习笔记(八) --- Combining 结合操作

    @(Rxjava学习笔记) RxJava 学习笔记(八) - Combining 结合操作 RxJava 学习笔记八 Combining 结合操作 StartWith 在数据序列的开头插入一条指定的项 ...

  3. C# 学习笔记(18)操作SQL Server 中

    C# 学习笔记(18)操作SQL Server 中 数据库基础操作 SQL语法可以参考 菜鸟教程 或者微软官方的SQL示例 注意SQL不区分大小写 查 1.基础查询 --最基础的查询语句, selec ...

  4. Java中expecial,RxJava 学习笔记 (一)

    作者: 一字马胡 转载标志 [2017-12-13] 更新日志 日期 更新内容 备注 2017-12-13 RxJava学习笔记系列 系列笔记 (一) 2017-12-15 增加系列笔记(二) 201 ...

  5. 吴恩达《机器学习》学习笔记七——逻辑回归(二分类)代码

    吴恩达<机器学习>学习笔记七--逻辑回归(二分类)代码 一.无正则项的逻辑回归 1.问题描述 2.导入模块 3.准备数据 4.假设函数 5.代价函数 6.梯度下降 7.拟合参数 8.用训练 ...

  6. websocket 获取连接id_Swoole学习笔记七:搭建WebSocket长连接 之 使用 USER_ID 作为身份凭证...

    Swoole学习笔记七:搭建WebSocket长连接 之 使用 USER_ID 作为身份凭证 2年前 阅读 3678 评论 0 喜欢 0 ### 0.前言 前面基本的WebSocket操作,我们基本都 ...

  7. python复制指定字符串_python3.4学习笔记(十五) 字符串操作(string替换、删除、截取、复制、连接、比较、查找、包含、大小写转换、分割等)...

    python3.4学习笔记(十五) 字符串操作(string替换.删除.截取.复制.连接.比较.查找.包含.大小写转换.分割等) python print 不换行(在后面加上,end=''),prin ...

  8. window的dos命令学习笔记 七

    文章目录 一.dos历史学习笔记(后期整合到这里,我想能学到这里的应该不多了,嘿嘿,加油) 二.执行状态返回值(`%errorlevel%`,和shell中`$?`相似): 三.视窗 1.color ...

  9. 逆向脱壳破解分析基础学习笔记七 堆栈图(重点)

    本文为本人 大神论坛 逆向破解脱壳学习笔记之一,为本人对以往所学的回顾和总结,可能会有谬误之处,欢迎大家指出. 陆续将不断有笔记放出,希望能对想要入门的萌新有所帮助,一起进步 堆栈图 首先给定一段反汇 ...

  10. 2022Java学习笔记七十三(异常处理:运行时异常、编译时异常、异常的默认处理的流程)

    2022Java学习笔记七十三(异常处理:运行时异常.编译时异常.异常的默认处理的流程) 一.异常体系 1.Exception:java.lang包下,称为异常类,它表示程序本身可以处理的问题 2.R ...

最新文章

  1. 基于Struts2框架的名片管理系统
  2. SQL with NUll处理,Join系列,between,in对比exists以及少量题目
  3. JQuery动态执行javascript代码的方法
  4. 求int在二进制存储时1的个数(C++)
  5. kotlin dsl_Spring Webflux – Kotlin DSL –实现的演练
  6. boat启动器 minecraft_minecraft boat
  7. django filter查询多选_动态filter查询数据Django实现方法
  8. Visio画图--我的形状
  9. 牛津美女硕士放弃百万年薪,用废弃物做轻奢包,马斯克妈妈也来捧场
  10. NSX发布Guest Introspection虚拟机时,主机报错的解决方法
  11. IT技术中的言情小说
  12. 眼图在通信系统中有什么意义_KT124煤矿调度通信系统和传统调度系统相比有什么优势...
  13. mongodb 的 GridFS 详细分析(二)
  14. BT601、BT656和BT.709、BT1120
  15. 西门子plc cpu228 4路模拟量输入 2路模拟量输出
  16. Rtmp协议看一篇就够了
  17. 性能优化实战-sql递归查询效率低下
  18. java调用ip138实现ip地址查询
  19. 双冠!网易互娱AI Lab包揽NTIRE 2022高动态范围成像(HDR)两项冠军
  20. imx8mqevk OPTEE 全系统构建

热门文章

  1. C语言入门实战(11):输入一组正整数,求逆序数的和
  2. 如何制作3D动画人物
  3. JSP+Servlet技术实现分页 首页 下一页 每一页显示10条页码 下一页 尾页 第页/共页 (利用PageBean实现)
  4. 推荐 10 款 C++ 在线编译器
  5. WEB CTF入门题解析
  6. 以太网没有有效的IP配置
  7. 韩国28岁自由职业者生活曝光,引40万人围观:自律的人生,到底有多爽?
  8. 变分法证明两点之间线段最短
  9. Transaction marked as rollbackOnly
  10. c计算机怎么读音发音英语,CACD是什么意思