Java响应式(反应式)编程——RxJava
相关资源
RxJava文档:https://github.com/ReactiveX/RxJava/wiki
RxJava中文文档:https://mcxiaoke.gitbooks.io/rxdocs/content/
RxJava经典资料:https://github.com/lzyzsd/Awesome-RxJava
ReactiveX:https://reactivex.io/
响应式编程概述
响应式编程?是一种基于异步数据流概念的编程模式
关键概念:事件
适用场景:UI(通用)
RxJava
- 异步数据处理库
- 扩展的观察者模式
RxJava与观察者模式
观察者模式的四大要素:
- Observable:被观察者
- Observer:观察者
- Subscribe:订阅
- Event:事件
入门示例
导入RxJava:
/*
// https://mvnrepository.com/artifact/io.reactivex.rxjava3/rxjava
implementation group: 'io.reactivex.rxjava3', name: 'rxjava', version: '3.0.11'// https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava
implementation group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.2.21'
*/// 使用课程中的老版本:
// https://mvnrepository.com/artifact/io.reactivex/rxjava
implementation group: 'io.reactivex', name: 'rxjava', version: '1.3.8'
Hellowrld示例:
package top.onefine.rxjava;import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Subscriber;@Slf4j
public class HelloWorld {public static void main(String[] args) {// 1. 创建被观察者Observable<String> observable = Observable.create(subscriber -> {subscriber.onNext("Hello world.");throw new NullPointerException("Throw a Exception...");
// subscriber.onCompleted();});// 2. 创建观察者Subscriber<String> subscriber = new Subscriber<String>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(String s) {log.info("onNext: {}", s);}};// 3. 订阅事件observable.subscribe(subscriber);}
}
输出:
11:27:08.971 [main] INFO top.onefine.rxjava.HelloWorld - onNext: Hello world.
11:27:08.974 [main] INFO top.onefine.rxjava.HelloWorld - onError...
操作符分类
Creating Observables(创建Observable)
示例:
package top.onefine.rxjava;import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Subscriber;@Slf4j
public class HelloWorld {public static void main(String[] args) {// create_demo();
// just_demo();
// from_demo();
// defer_demo();
// range_demo();repeat_demo();}/*14:16:59.385 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 114:16:59.387 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 214:16:59.387 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 314:16:59.390 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 114:16:59.390 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 214:16:59.390 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 314:16:59.390 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void repeat_demo() {Observable.range(1, 3).repeat(2).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer arg) {log.info("onNext: {}", arg);}});}/*14:15:30.089 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 114:15:30.092 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 214:15:30.092 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 314:15:30.092 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 414:15:30.092 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 514:15:30.092 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void range_demo() {Observable.range(1, 5).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer arg) {log.info("onNext: {}", arg);}});}/*14:11:17.146 [main] INFO top.onefine.rxjava.HelloWorld - onNext: Hello World.14:11:17.149 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static String value;private static void defer_demo() {Observable<String> observable = Observable.defer(() -> Observable.just(value));value = "Hello World.";observable.subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(String s) {log.info("onNext: {}", s);}});}/*11:49:42.936 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 111:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 211:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 311:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 411:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 511:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 611:49:42.939 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void from_demo() {// 文档:https://reactivex.io/documentation/operators/from.htmlObservable.from(new Integer[]{1, 2, 3, 4, 5, 6}).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer arg) {log.info("onNext: {}", arg);}});}/*11:43:49.006 [main] INFO top.onefine.rxjava.HelloWorld - onNext: RxJava学习...11:43:49.008 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void just_demo() {Observable.just("RxJava学习...").subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(String s) {log.info("onNext: {}", s);}});}/*11:44:26.187 [main] INFO top.onefine.rxjava.HelloWorld - onNext: RxJava学习...*/private static void create_demo() {Observable.create((Observable.OnSubscribe<String>) subscriber -> subscriber.onNext("RxJava学习...")).subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(String s) {log.info("onNext: {}", s);}});}
}
Transforming Observables(转换Obervable)
示例:
package top.onefine.rxjava;import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.observables.GroupedObservable;import java.util.List;@Slf4j
public class HelloWorld {public static void main(String[] args) {// map_demo();
// flatMap_demo();
// groupBy_demo();
// buffer_demo();scan_demo();}/*14:54:20.924 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 114:54:20.927 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 314:54:20.927 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 614:54:20.927 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1014:54:20.927 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1514:54:20.927 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void scan_demo() {Observable.range(1, 5).scan(Integer::sum).subscribe(new Observer<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*14:50:27.586 [main] INFO top.onefine.rxjava.HelloWorld - onNext: [1, 2]14:50:27.588 [main] INFO top.onefine.rxjava.HelloWorld - onNext: [3, 4]14:50:27.588 [main] INFO top.onefine.rxjava.HelloWorld - onNext: [5]14:50:27.588 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void buffer_demo() {Observable.range(1, 5).buffer(2).subscribe(new Observer<List<Integer>>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(List<Integer> i) {log.info("onNext: {}", i);}});}/*14:40:24.261 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1 ...114:40:24.263 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2 ...014:40:24.263 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3 ...114:40:24.263 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 4 ...014:40:24.263 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 5 ...114:40:24.264 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...014:40:24.264 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...114:40:24.264 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void groupBy_demo() {Observable.just(1, 2, 3, 4, 5).groupBy(arg -> String.valueOf(arg % 2)).subscribe(new Observer<GroupedObservable<String, Integer>>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(GroupedObservable<String, Integer> result) {result.subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...{}", result.getKey());}@Overridepublic void onError(Throwable e) {log.info("onError...{}", result.getKey());}@Overridepublic void onNext(Integer i) {log.info("onNext: {} ...{}", i, result.getKey());}});}});}/*14:30:30.782 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 1 fine.14:30:30.784 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 2 fine.14:30:30.784 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 3 fine.14:30:30.784 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 4 fine.14:30:30.784 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 5 fine.14:30:30.784 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void flatMap_demo() {Observable.just(1, 2, 3, 4, 5).flatMap(arg -> Observable.just("one " + arg + " fine.")).subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(String s) {log.info("onNext: {}", s);}});}/*14:24:51.902 [main] INFO top.onefine.rxjava.HelloWorld - onNext: one 123 fine14:24:51.904 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void map_demo() {Observable.just(123).map(arg -> "one " + arg + " fine").subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(String s) {log.info("onNext: {}", s);}});}}
Filtering Observables(过滤Observable)
示例:
package top.onefine.rxjava;import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Subscriber;import java.util.concurrent.TimeUnit;@Slf4j
public class HelloWorld {public static void main(String[] args) {// debounce_demo();
// distinct_demo();
// elementAt_demo();
// filter_demo();
// first_demo();
// ignoreElements_demo();
// last_demo();
// sample_demo();
// skip_demo();
// skipLast_demo();
// take_demo();takeLast_demo();}/*15:32:23.691 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 315:32:23.695 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 415:32:23.695 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void takeLast_demo() {Observable.just(1, 2, 3, 4).takeLast(2).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:31:39.259 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 115:31:39.261 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 215:31:39.262 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void take_demo() {Observable.just(1, 2, 3, 4).take(2).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:30:16.202 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 115:30:16.205 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 215:30:16.205 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void skipLast_demo() {Observable.just(1, 2, 3, 4).skipLast(2).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:29:14.802 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 315:29:14.805 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 415:29:14.805 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void skip_demo() {Observable.just(1, 2, 3, 4).skip(2).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:26:59.758 [RxComputationScheduler-1] INFO top.onefine.rxjava.HelloWorld - onNext: 215:27:03.757 [RxComputationScheduler-1] INFO top.onefine.rxjava.HelloWorld - onNext: 615:27:05.808 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 915:27:05.808 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void sample_demo() {Observable.create((Observable.OnSubscribe<Integer>) arg -> {try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);arg.onNext(i);}arg.onCompleted();} catch (InterruptedException e) {e.printStackTrace();arg.onError(e);}}).sample(4, TimeUnit.SECONDS).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:24:49.731 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 315:24:49.734 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void last_demo() {Observable.just(1, 2, 3, 2, 3).last().subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:23:42.135 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void ignoreElements_demo() {Observable.just(123).ignoreElements().subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:18:04.901 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 115:18:04.904 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void first_demo() {Observable.just(1, 2, 3, 2, 3).distinct().first().subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:16:14.458 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 315:16:14.461 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void filter_demo() {Observable.just(1, 2, 3, 2, 3).distinct().filter(i -> i > 2).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:14:13.951 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 215:14:13.954 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void elementAt_demo() {Observable.just(1, 2, 3, 2, 3, 4, 5, 1, 6).elementAt(3).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:12:27.291 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 115:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 215:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 315:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 415:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 515:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 615:12:27.294 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void distinct_demo() {Observable.just(1, 2, 3, 2, 3, 4, 5, 1, 6).distinct().subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:06:56.929 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 915:06:56.932 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void debounce_demo() {Observable.create((Observable.OnSubscribe<Integer>) arg -> {try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);arg.onNext(i);}arg.onCompleted();} catch (InterruptedException e) {e.printStackTrace();}}).debounce(1, TimeUnit.SECONDS).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}
}
Combining Observables(组合Observable)
示例:
package top.onefine.rxjava;import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.Subscriber;@Slf4j
public class HelloWorld {public static void main(String[] args) {// zip_demo();
// merge_demo();
// startWith_demo();combineLatest_demo();}/*16:05:38.919 [main] INFO top.onefine.rxjava.HelloWorld - i1:3, i2:416:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 716:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - i1:3, i2:516:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 816:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - i1:3, i2:616:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 916:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - i1:3, i2:716:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1016:05:38.922 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void combineLatest_demo() {Observable<Integer> observable1 = Observable.just(1, 2, 3);Observable<Integer> observable2 = Observable.just(4, 5, 6, 7);Observable.combineLatest(observable1, observable2, (i1, i2) -> {log.info("i1:{}, i2:{}", i1, i2);return i1 + i2;}).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*16:00:48.097 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 416:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 516:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 616:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 716:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 116:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 216:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 316:00:48.100 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void startWith_demo() {Observable<Integer> observable1 = Observable.just(1, 2, 3);Observable<Integer> observable2 = Observable.just(4, 5, 6, 7);observable1.startWith(observable2).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:58:26.334 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1015:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2015:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 3015:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 415:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 815:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1215:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1615:58:26.337 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void merge_demo() {Observable<Integer> observable1 = Observable.just(10, 20, 30);Observable<Integer> observable2 = Observable.just(4, 8, 12, 16);Observable.merge(observable1, observable2).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}/*15:49:52.520 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 1415:49:52.523 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 2815:49:52.523 [main] INFO top.onefine.rxjava.HelloWorld - onNext: 4215:49:52.523 [main] INFO top.onefine.rxjava.HelloWorld - onCompleted...*/private static void zip_demo() {// 用来合并两个Observable发射的数据源,根据Func2函数生成一个新的值并发射出去。// 当其中一个Observable发送数据结束或者出现异常后,另一个Observable也将停止发射数据。Observable<Integer> observable1 = Observable.just(10, 20, 30);Observable<Integer> observable2 = Observable.just(4, 8, 12, 16);Observable.zip(observable1, observable2, Integer::sum).subscribe(new Subscriber<Integer>() {@Overridepublic void onCompleted() {log.info("onCompleted...");}@Overridepublic void onError(Throwable e) {log.info("onError...");}@Overridepublic void onNext(Integer i) {log.info("onNext: {}", i);}});}}
Error Handling Operators(处理错误)
Java响应式(反应式)编程——RxJava相关推荐
- 响应式编程RxJava (一)
1.什么是RxJava? 1.1什么是响应式编程? 是一种基于异步数据流概念的编程模式(异步数据流编程) 数据流 ->河流(被观测.被过滤.被操作) 1.2响应式编程的设计原则是: 保持数据的不 ...
- Java响应式的框架和工具包:RxJava、Spring Reactor 和 Vert.x 概述
今天,人们想要具有强大用户体验的高响应.交互式应用程序,这通常意味着处理异步性,尤其是当应用程序涉及高负载.实时数据和多用户时. 由于 Java 是一种面向对象的语言,本质上支持命令式编程风格,因此异 ...
- Java:理解java响应式编程
原文:Understanding reactive programming in Java https://nullbeans.com/understanding-reactive-programmi ...
- 阿里专家杜万:Java响应式编程,一文全面解读
本篇文章来自于2018年12月22日举办的<阿里云栖开发者沙龙-Java技术专场>,杜万专家是该专场第四位演讲的嘉宾,本篇文章是根据杜万专家在<阿里云栖开发者沙龙-Java技术专场& ...
- java响应式交友网站计算机毕业设计MyBatis+系统+LW文档+源码+调试部署
java响应式交友网站计算机毕业设计MyBatis+系统+LW文档+源码+调试部署 java响应式交友网站计算机毕业设计MyBatis+系统+LW文档+源码+调试部署 本源码技术栈: 项目架构:B/S ...
- Java响应式流框架Reactor中的Mono和Flux
1.响应流的特点 先说一下响应流规范,它是响应式编程的基石,他具有以下特点: 响应流必须是无阻塞的. 响应流必须是一个数据流. 它必须可以异步执行. 并且它也应该能够处理背压. 2.Publisher ...
- 基于JAVA响应式交友网站计算机毕业设计源码+数据库+lw文档+系统+部署
基于JAVA响应式交友网站计算机毕业设计源码+数据库+lw文档+系统+部署 基于JAVA响应式交友网站计算机毕业设计源码+数据库+lw文档+系统+部署 本源码技术栈: 项目架构:B/S架构 开发语言: ...
- java中驼峰编码,驼峰式命名法_小驼峰式命名法编程_java中getter和setter
人们交流靠各种语言,每行都有每行的所谓的"行话".程序员也不例外,众所周知,程序员都是用代码进行交流的.那么除了在代码中的注释之外, 程序员如何读懂别人的程序呢? 当然,程序员之间 ...
- Redux中的功能式React式编程简介
by Bhuvan Malik 通过布凡·马利克(Bhuvan Malik) Redux中的功能式React式编程简介 (An introduction to functional Reactive ...
- 流式低代码编程,拖拽节点画流程图并运行
介绍 今天给大家推荐一款开源项目 flow-eda,后端采用java语言开发,前端有vue3和react两个版本可供选择.主要功能是采用拖拽的形式来完成业务开发,使用对应的功能节点来处理业务,简单.方 ...
最新文章
- 2022-2028年中国pu管行业市场深度分析及市场规模预测报告
- [ CodeVS冲杯之路 ] P1116
- Linux下日志文件过大解决方案
- Stylus插件开发教程
- python函数的两种嵌套方法
- 50条超精辟的经典语录:哗众,可以取宠,也可以失宠!
- 浅谈多线程——NSThread
- 【计算机网络实验·北航】实验一:网络实验入门(1)
- 去广州见了我大学老师标哥
- Android开发中目前流行控件和知识点总结
- mysql cluster 宕机 恢复_mysql cluster 集群恢复不起来,还请大神赐教?报错-问答-阿里云开发者社区-阿里云...
- MonoCSharp Evaluator Extension
- 问答WAP版重新改版上线
- java http远程调用接口下载文件
- 吉客云与金蝶云星空集成方案(吉客云主管库存)
- Android实战【仿探探陌生社交APP】
- 【浏览器书签】浏览器书签解析,导入
- Semantic UI 之 标签 label
- android studio json数据解析汇总(备忘)
- xmind打不开java_XMind 无法打开/保存文件
热门文章
- 一直在说css3,你真的会用css3吗?一文带你快速上手
- 什么是教育大数据?| 教育热点更新
- 波浪形状html,使用CSS3线性渐变(linear-gradient)实现文本波浪线效果_html/css_WEB-ITnose...
- Python数模笔记-Sklearn(3)主成分分析
- miniui列表下拉允许编辑且保存_如何使用Photoshop 15浏览照片编辑器
- 弹跳式ppt模板动态目录怎么制作?
- uniapp+Vue3+Vite+ts+pinia
- 《基于区块链与IPFS的视频版权存证系统》专栏简介
- 检查自己的设备是否支持GPU
- Python安装Anaconda集成环境