相关资源

RxJava文档:https://github.com/ReactiveX/RxJava/wiki
RxJava中文文档:https://mcxiaoke.gitbooks.io/rxdocs/content/
RxJava经典资料:https://github.com/lzyzsd/Awesome-RxJava

ReactiveX:https://reactivex.io/

响应式编程概述

响应式编程?是一种基于异步数据流概念的编程模式

关键概念:事件

适用场景:UI(通用)

RxJava

  • 异步数据处理库
  • 扩展的观察者模式


RxJava与观察者模式

观察者模式的四大要素:

  • Observable:被观察者
  • Observer:观察者
  • Subscribe:订阅
  • Event:事件


入门示例

导入RxJava:

/*
// https://mvnrepository.com/artifact/io.reactivex.rxjava3/rxjava
implementation group: 'io.reactivex.rxjava3', name: 'rxjava', version: '3.0.11'// https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava
implementation group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.2.21'
*/// 使用课程中的老版本:
// https://mvnrepository.com/artifact/io.reactivex/rxjava
implementation group: 'io.reactivex', name: 'rxjava', version: '1.3.8'

Hellowrld示例:

package top.onefine.rxjava;import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Subscriber;@Slf4j
public class HelloWorld {public static void main(String[] args) {// 1. 创建被观察者Observable<String> observable = Observable.create(subscriber -> {subscriber.onNext("Hello world.");throw new NullPointerException("Throw a Exception...");
//            subscriber.onCompleted();});// 2. 创建观察者Subscriber<String> subscriber = new Subscriber<String>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(String s) {log.info("onNext: {}", s);}};// 3. 订阅事件observable.subscribe(subscriber);}
}

输出:

11:27:08.971 [main] INFO top.onefine.rxjava.HelloWorld - onNext: Hello world.
11:27:08.974 [main] INFO top.onefine.rxjava.HelloWorld - onError...

操作符分类

Creating Observables(创建Observable)


示例:

package top.onefine.rxjava;import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Subscriber;@Slf4j
public class HelloWorld {public static void main(String[] args) {//        create_demo();
//        just_demo();
//        from_demo();
//        defer_demo();
//        range_demo();repeat_demo();}/*14:16:59.385 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 114:16:59.387 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 214:16:59.387 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 314:16:59.390 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 114:16:59.390 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 214:16:59.390 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 314:16:59.390 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void repeat_demo() {Observable.range(1, 3).repeat(2).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer arg) {log.info("onNext: {}", arg);}});}/*14:15:30.089 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 114:15:30.092 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 214:15:30.092 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 314:15:30.092 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 414:15:30.092 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 514:15:30.092 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void range_demo() {Observable.range(1, 5).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer arg) {log.info("onNext: {}", arg);}});}/*14:11:17.146 [main] INFO top.onefine.rxjava.HelloWorld - onNext: Hello World.14:11:17.149 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static String value;private static void defer_demo() {Observable<String> observable = Observable.defer(() -> Observable.just(value));value = "Hello World.";observable.subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(String s) {log.info("onNext: {}", s);}});}/*11:49:42.936 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 111:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 211:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 311:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 411:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 511:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 611:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void from_demo() {// 文档:https://reactivex.io/documentation/operators/from.htmlObservable.from(new Integer[]{1, 2, 3, 4, 5, 6}).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer arg) {log.info("onNext: {}", arg);}});}/*11:43:49.006 [main] INFO top.onefine.rxjava.HelloWorld - onNext: RxJava学习...11:43:49.008 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void just_demo() {Observable.just("RxJava学习...").subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(String s) {log.info("onNext: {}", s);}});}/*11:44:26.187 [main] INFO top.onefine.rxjava.HelloWorld - onNext: RxJava学习...*/private static void create_demo() {Observable.create((Observable.OnSubscribe<String>) subscriber -> subscriber.onNext("RxJava学习...")).subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(String s) {log.info("onNext: {}", s);}});}
}

Transforming Observables(转换Obervable)


示例:

package top.onefine.rxjava;import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.observables.GroupedObservable;import java.util.List;@Slf4j
public class HelloWorld {public static void main(String[] args) {//        map_demo();
//        flatMap_demo();
//        groupBy_demo();
//        buffer_demo();scan_demo();}/*14:54:20.924 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 114:54:20.927 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 314:54:20.927 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 614:54:20.927 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1014:54:20.927 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1514:54:20.927 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void scan_demo() {Observable.range(1, 5).scan(Integer::sum).subscribe(new Observer<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*14:50:27.586 [main] INFO top.onefine.rxjava.HelloWorld - onNext: [1, 2]14:50:27.588 [main] INFO top.onefine.rxjava.HelloWorld - onNext: [3, 4]14:50:27.588 [main] INFO top.onefine.rxjava.HelloWorld - onNext: [5]14:50:27.588 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void buffer_demo() {Observable.range(1, 5).buffer(2).subscribe(new Observer<List<Integer>>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(List<Integer> i) {log.info("onNext: {}", i);}});}/*14:40:24.261 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1 ...114:40:24.263 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2 ...014:40:24.263 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3 ...114:40:24.263 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 4 ...014:40:24.263 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 5 ...114:40:24.264 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...014:40:24.264 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...114:40:24.264 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void groupBy_demo() {Observable.just(1, 2, 3, 4, 5).groupBy(arg -> String.valueOf(arg % 2)).subscribe(new Observer<GroupedObservable<String, Integer>>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(GroupedObservable<String, Integer> result) {result.subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...{}", result.getKey());}@Overridepublic void onError(Throwable e) {log.info("onError...{}", result.getKey());}@Overridepublic void onNext(Integer i) {log.info("onNext: {} ...{}", i, result.getKey());}});}});}/*14:30:30.782 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 1 fine.14:30:30.784 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 2 fine.14:30:30.784 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 3 fine.14:30:30.784 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 4 fine.14:30:30.784 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 5 fine.14:30:30.784 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void flatMap_demo() {Observable.just(1, 2, 3, 4, 5).flatMap(arg -> Observable.just("one " + arg + " fine.")).subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(String s) {log.info("onNext: {}", s);}});}/*14:24:51.902 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 123 fine14:24:51.904 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void map_demo() {Observable.just(123).map(arg -> "one " + arg + " fine").subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(String s) {log.info("onNext: {}", s);}});}}

Filtering Observables(过滤Observable)


示例:

package top.onefine.rxjava;import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Subscriber;import java.util.concurrent.TimeUnit;@Slf4j
public class HelloWorld {public static void main(String[] args) {//        debounce_demo();
//        distinct_demo();
//        elementAt_demo();
//        filter_demo();
//        first_demo();
//        ignoreElements_demo();
//        last_demo();
//        sample_demo();
//        skip_demo();
//        skipLast_demo();
//        take_demo();takeLast_demo();}/*15:32:23.691 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 315:32:23.695 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 415:32:23.695 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void takeLast_demo() {Observable.just(1, 2, 3, 4).takeLast(2).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:31:39.259 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 115:31:39.261 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 215:31:39.262 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void take_demo() {Observable.just(1, 2, 3, 4).take(2).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:30:16.202 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 115:30:16.205 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 215:30:16.205 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void skipLast_demo() {Observable.just(1, 2, 3, 4).skipLast(2).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:29:14.802 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 315:29:14.805 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 415:29:14.805 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void skip_demo() {Observable.just(1, 2, 3, 4).skip(2).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:26:59.758 [RxComputationScheduler-1] INFO top.onefine.rxjava.HelloWorld - onNext: 215:27:03.757 [RxComputationScheduler-1] INFO top.onefine.rxjava.HelloWorld - onNext: 615:27:05.808 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 915:27:05.808 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void sample_demo() {Observable.create((Observable.OnSubscribe<Integer>) arg -> {try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);arg.onNext(i);}arg.onCompleted();} catch (InterruptedException e) {e.printStackTrace();arg.onError(e);}}).sample(4, TimeUnit.SECONDS).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:24:49.731 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 315:24:49.734 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void last_demo() {Observable.just(1, 2, 3, 2, 3).last().subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:23:42.135 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void ignoreElements_demo() {Observable.just(123).ignoreElements().subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:18:04.901 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 115:18:04.904 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void first_demo() {Observable.just(1, 2, 3, 2, 3).distinct().first().subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:16:14.458 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 315:16:14.461 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void filter_demo() {Observable.just(1, 2, 3, 2, 3).distinct().filter(i -> i > 2).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:14:13.951 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 215:14:13.954 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void elementAt_demo() {Observable.just(1, 2, 3, 2, 3, 4, 5, 1, 6).elementAt(3).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:12:27.291 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 115:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 215:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 315:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 415:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 515:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 615:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void distinct_demo() {Observable.just(1, 2, 3, 2, 3, 4, 5, 1, 6).distinct().subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:06:56.929 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 915:06:56.932 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void debounce_demo() {Observable.create((Observable.OnSubscribe<Integer>) arg -> {try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);arg.onNext(i);}arg.onCompleted();} catch (InterruptedException e) {e.printStackTrace();}}).debounce(1, TimeUnit.SECONDS).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}
}

Combining Observables(组合Observable)


示例:

package top.onefine.rxjava;import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Subscriber;@Slf4j
public class HelloWorld {public static void main(String[] args) {//        zip_demo();
//        merge_demo();
//        startWith_demo();combineLatest_demo();}/*16:05:38.919 [main] INFO top.onefine.rxjava.HelloWorld - i1:3, i2:416:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 716:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - i1:3, i2:516:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 816:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - i1:3, i2:616:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 916:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - i1:3, i2:716:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1016:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void combineLatest_demo() {Observable<Integer> observable1 = Observable.just(1, 2, 3);Observable<Integer> observable2 = Observable.just(4, 5, 6, 7);Observable.combineLatest(observable1, observable2, (i1, i2) -> {log.info("i1:{}, i2:{}", i1, i2);return i1 + i2;}).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*16:00:48.097 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 416:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 516:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 616:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 716:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 116:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 216:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 316:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void startWith_demo() {Observable<Integer> observable1 = Observable.just(1, 2, 3);Observable<Integer> observable2 = Observable.just(4, 5, 6, 7);observable1.startWith(observable2).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:58:26.334 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1015:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2015:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3015:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 415:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 815:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1215:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1615:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void merge_demo() {Observable<Integer> observable1 = Observable.just(10, 20, 30);Observable<Integer> observable2 = Observable.just(4, 8, 12, 16);Observable.merge(observable1, observable2).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:49:52.520 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1415:49:52.523 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2815:49:52.523 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 4215:49:52.523 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void zip_demo() {// 用来合并两个Observable发射的数据源,根据Func2函数生成一个新的值并发射出去。// 当其中一个Observable发送数据结束或者出现异常后,另一个Observable也将停止发射数据。Observable<Integer> observable1 = Observable.just(10, 20, 30);Observable<Integer> observable2 = Observable.just(4, 8, 12, 16);Observable.zip(observable1, observable2, Integer::sum).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}}

Error Handling Operators(处理错误)

Java响应式(反应式)编程——RxJava相关推荐

  1. 响应式编程RxJava (一)

    1.什么是RxJava? 1.1什么是响应式编程? 是一种基于异步数据流概念的编程模式(异步数据流编程) 数据流 ->河流(被观测.被过滤.被操作) 1.2响应式编程的设计原则是: 保持数据的不 ...

  2. Java响应式的框架和工具包:RxJava、Spring Reactor 和 Vert.x 概述

    今天,人们想要具有强大用户体验的高响应.交互式应用程序,这通常意味着处理异步性,尤其是当应用程序涉及高负载.实时数据和多用户时. 由于 Java 是一种面向对象的语言,本质上支持命令式编程风格,因此异 ...

  3. Java:理解java响应式编程

    原文:Understanding reactive programming in Java https://nullbeans.com/understanding-reactive-programmi ...

  4. 阿里专家杜万:Java响应式编程,一文全面解读

    本篇文章来自于2018年12月22日举办的<阿里云栖开发者沙龙-Java技术专场>,杜万专家是该专场第四位演讲的嘉宾,本篇文章是根据杜万专家在<阿里云栖开发者沙龙-Java技术专场& ...

  5. java响应式交友网站计算机毕业设计MyBatis+系统+LW文档+源码+调试部署

    java响应式交友网站计算机毕业设计MyBatis+系统+LW文档+源码+调试部署 java响应式交友网站计算机毕业设计MyBatis+系统+LW文档+源码+调试部署 本源码技术栈: 项目架构:B/S ...

  6. Java响应式流框架Reactor中的Mono和Flux

    1.响应流的特点 先说一下响应流规范,它是响应式编程的基石,他具有以下特点: 响应流必须是无阻塞的. 响应流必须是一个数据流. 它必须可以异步执行. 并且它也应该能够处理背压. 2.Publisher ...

  7. 基于JAVA响应式交友网站计算机毕业设计源码+数据库+lw文档+系统+部署

    基于JAVA响应式交友网站计算机毕业设计源码+数据库+lw文档+系统+部署 基于JAVA响应式交友网站计算机毕业设计源码+数据库+lw文档+系统+部署 本源码技术栈: 项目架构:B/S架构 开发语言: ...

  8. java中驼峰编码,驼峰式命名法_小驼峰式命名法编程_java中getter和setter

    人们交流靠各种语言,每行都有每行的所谓的"行话".程序员也不例外,众所周知,程序员都是用代码进行交流的.那么除了在代码中的注释之外, 程序员如何读懂别人的程序呢? 当然,程序员之间 ...

  9. Redux中的功能式React式编程简介

    by Bhuvan Malik 通过布凡·马利克(Bhuvan Malik) Redux中的功能式React式编程简介 (An introduction to functional Reactive ...

  10. 流式低代码编程,拖拽节点画流程图并运行

    介绍 今天给大家推荐一款开源项目 flow-eda,后端采用java语言开发,前端有vue3和react两个版本可供选择.主要功能是采用拖拽的形式来完成业务开发,使用对应的功能节点来处理业务,简单.方 ...

最新文章

  1. 2022-2028年中国pu管行业市场深度分析及市场规模预测报告
  2. [ CodeVS冲杯之路 ] P1116
  3. Linux下日志文件过大解决方案
  4. Stylus插件开发教程
  5. python函数的两种嵌套方法
  6. 50条超精辟的经典语录:哗众,可以取宠,也可以失宠!
  7. 浅谈多线程——NSThread
  8. 【计算机网络实验·北航】实验一:网络实验入门(1)
  9. 去广州见了我大学老师标哥
  10. Android开发中目前流行控件和知识点总结
  11. mysql cluster 宕机 恢复_mysql cluster 集群恢复不起来,还请大神赐教?报错-问答-阿里云开发者社区-阿里云...
  12. MonoCSharp Evaluator Extension
  13. 问答WAP版重新改版上线
  14. java http远程调用接口下载文件
  15. 吉客云与金蝶云星空集成方案(吉客云主管库存)
  16. Android实战【仿探探陌生社交APP】
  17. 【浏览器书签】浏览器书签解析,导入
  18. Semantic UI 之 标签 label
  19. android studio json数据解析汇总(备忘)
  20. xmind打不开java_XMind 无法打开/保存文件

热门文章

  1. 一直在说css3,你真的会用css3吗?一文带你快速上手
  2. 什么是教育大数据?| 教育热点更新
  3. 波浪形状html,使用CSS3线性渐变(linear-gradient)实现文本波浪线效果_html/css_WEB-ITnose...
  4. Python数模笔记-Sklearn(3)主成分分析
  5. miniui列表下拉允许编辑且保存_如何使用Photoshop 15浏览照片编辑器
  6. 弹跳式ppt模板动态目录怎么制作?
  7. uniapp+Vue3+Vite+ts+pinia
  8. 《基于区块链与IPFS的视频版权存证系统》专栏简介
  9. 检查自己的设备是否支持GPU
  10. Python安装Anaconda集成环境