spring-gateway(一)Reactor编程基础
基础的api:
1 JDK9 Reactive
Publisher: 事件发布者
Subscriber: 事件订阅者
Processor: 事件流转过程中的处理,可包含多个阶段,同时继承了 Publisher和Subscriber两个接口
Subscription: 提供了
request
、cancel
两个方法,用于订阅者和发布者之间的通信,Processor中的subscription通过request方法接收publisher(发布者)submit的事件。订阅者通过subscription通过request方法决定是否消费事件,实现背压back pressue
,不会因为发布者发布消息过快而把订阅者压垮。
下面的使用JDK9提供的接口演示基础用法:
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.SubmissionPublisher;
public class DemoTest {
public static void main(String[] args) throws InterruptedException {// #1 创建发布者SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
// #2 创建处理器Processor<Integer, Integer> processor = new MyProcessor();
// #3 发布者和处理器建立关系publisher.subscribe(processor);
Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {Flow.Subscription subscription;
@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println(">>> subscriber onSubscribe: " + subscription.toString());this.subscription = subscription;
// 请求一个数据this.subscription.request(1);}
@Overridepublic void onNext(Integer item) {try {Thread.sleep(30);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(">>> subscriber onNext 接收到数据:" + item);this.subscription.request(1); // 处理下一个事件// this.subscription.cancel(); // 终止处理// throw new RuntimeException(">>> subscriber throw a exception");}
@Overridepublic void onError(Throwable throwable) {System.out.println(">>> subscriber onError");throwable.printStackTrace();}
@Overridepublic void onComplete() {System.out.println(">>> subscriber onComplete");}};
processor.subscribe(subscriber);
for (int i = 0; i < 5; i++) {publisher.submit(i);
// try {
// Thread.sleep(60);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }}// 发布者调用close后会触发Processor执行 onCompletepublisher.close();// publisher.submit(4); // 发布者已经调用 close,再执行submit 报错 ClosedThread.currentThread().join(2000);}
static class MyProcessor extends SubmissionPublisher<Integer> implements Processor<Integer, Integer> {
Flow.Subscription subscription;
@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println(">>> Processor onSubscribe: " + subscription.toString());this.subscription = subscription;
// 发起请求数据:发起点this.subscription.request(1);}
@Overridepublic void onNext(Integer item) {System.out.println(">>> Processor onNext 接收到数据:" + item);Integer result = item * item;
// 发射处理结果(事件结果),会触发订阅者的 onNext 方法this.submit(result);
// 接收发布者publish的下一个事件this.subscription.request(1);
// 终止任务,不在接收发布者发布的信息,也不会触发 onComplete 方法// this.subscription.cancel(); // 终止处理
// 抛出异常,会触发 onError 方法// throw new RuntimeException(">>> Processor throw a exception");}
@Overridepublic void onError(Throwable throwable) {System.out.println(">>> Processor onError");throwable.printStackTrace();}
@Overridepublic void onComplete() {System.out.println(">>> Processor onComplete");// 关闭Processor,将无法再调用 submit:即订阅者不会再收到消息this.close();}}
}
运行结果:
可以看到,程序是异步执行的,发布者发布事件,和观察者消费事件是异步非阻塞的。
2 Spring WebFlux
spring webflux 用的是响应式库是 reactive streams
,它的发布者 Publisher 主要是两个抽象类 Mono
和 Flux
从图上可以看出,Flux能发出0~N个数据项,而Mono只能发出零或一个数据项。
1 创建Flux或Mono的简单方法
package com.fmi110.springgatewaylearning;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ReactiveDemo {public static void main(String[] args) {
Flux<Integer> just = Flux.just(1, 2, 3, 4);
// 创建一个FluxFlux<Integer> ints = Flux.range(1, 4).map(i -> {if (i <= 3) return i * i;throw new RuntimeException("Got to 4");});// 订阅并消费ints.subscribe(i -> System.out.println(i), err -> System.out.println(err));
// 创建一个空数据发布者Mono<Object> emptyData = Mono.empty();Mono<String> data = Mono.just("foo");
}
}
2 基于lambda的订阅
subscribe();
subscribe(Consumer<? super T> consumer);
subscribe(Consumer<? super T> consumer,Consumer<? super Throwable> errorConsumer);
subscribe(Consumer<? super T> consumer,Consumer<? super Throwable> errorConsumer,Runnable completeConsumer);
subscribe(Consumer<? super T> consumer,Consumer<? super Throwable> errorConsumer,Runnable completeConsumer,Consumer<? super Subscription> subscriptionConsumer);
1
consumer
:正常消费,对应 onNext()2
errorConsumer
:异常时的回调,对应 onError()3
completeConsumer
:执行结束时的回调,对应 onComplete()4
subscriptionConsumer
为null或者传入一个用于设置消费最大数量的消费者(通过 Subscription#request(long)设置);或者返回一个 Disposable 用于取消订阅
实例代码:
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),error -> System.err.println("Error " + error),() -> System.out.println("Done"),sub -> sub.request(2));
运行截图:
示例中,虽然发布者发布了四个事件,但是 sub.request(2)
限定的最大消费数,所以消费了两个事件。
3 BaseSubscriber
BaseSubscriber类是一个可用于替换lambda写法的类,内部提供了 hookOnSubscribe
、hookOnNext
、hookOnComplete
、hookOnError
、hookOnCancel
、hookFinally
等钩子
package com.fmi110.springgatewaylearning;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
public class ReactiveDemo {public static void main(String[] args) {DemoBaseSubscriber<Integer> ss = new DemoBaseSubscriber();Flux<Integer> ints = Flux.range(1, 4);ints.subscribe(ss);}
static class DemoBaseSubscriber<T> extends BaseSubscriber<T> {@Overridepublic void hookOnSubscribe(Subscription subscription) {System.out.println("Subscribed");request(1);}@Overridepublic void hookOnNext(T value) {System.out.println(">>> hookOnNext: value = "+value);request(1);}
@Overrideprotected void hookOnComplete() {super.hookOnComplete();System.out.println(">>> hookOnComplete");}
@Overrideprotected void hookFinally(SignalType type) {super.hookFinally(type);System.out.println(">>> hookFinally");System.out.println("SignalType:"+type);}}
}
更多详细用法请参考下面的链接。
参考:
Reactor Core Features
外行人都能看懂的WebFlux,错过了血亏
spring-gateway(一)Reactor编程基础相关推荐
- 什么是反应式编程(超详细说明),反应式编程和命令式编程的区别。如何使用Spring中的Reactor。Reactor中常用的操作。Mono和Flux。
文章目录 一.反应式编程初探 什么是反应式编程 为什么需要反应式编程? 反应式编程的规范 二.上手反应式编程(使用Spring中的Reactor) 对比反应式编程和命令式编程代码 添加相应依赖 Mon ...
- 对比Eureka和Nacos 的区别 Zuul和Spring Gateway的区别
目录 Eureka与Nacos的区别 1.功能差异 2.部署安装 3.稳定及扩展 4.总结 Spring gateway与zuul的区别 1. 产品对比 2 性能对比 3总结 Eureka与Nacos ...
- Spring Webflux 响应式编程 (二) - WebFlux编程实战
第一章 Reactive Stream 第1节 jdk9的响应式流 就是reactive stream,也就是flow.其实和jdk8的stream没有一点关系.说白了就一个发布-订阅模式,一共只有4 ...
- Spring - Java/J2EE Application Framework 应用框架 第 5 章 Spring AOP: Spring之面向方面编程G
第 5 章 Spring AOP: Spring之面向方面编程 5.1. 概念 面向方面编程 (AOP) 提供从另一个角度来考虑程序结构以完善面向对象编程(OOP). 面向对象将应用程序分解成 各个层 ...
- Spring - Java/J2EE Application Framework 应用框架 第 5 章 Spring AOP: Spring之面向方面编程
第 5 章 Spring AOP: Spring之面向方面编程 5.1. 概念 面向方面编程 (AOP) 提供从另一个角度来考虑程序结构以完善面向对象编程(OOP). 面向对象将应用程序分解成 各个层 ...
- 第 5 章 Spring AOP: Spring之面向方面编程
http://oss.org.cn/ossdocs/framework/spring/zh-cn/aop.html 第 5 章 Spring AOP: Spring之面向方面编程 5.1. 概念 面向 ...
- 迈入JavaWeb第一步,Java网络编程基础,TCP网络编程URL网络编程等
文章目录 网络编程概述 网络通信要素 要素一IP和端口号 要素二网络协议 TCP网络编程 UDP网络编程 URL网络编程 Java网络编程基础 网络编程概述 Java是Internet上的语言,它从语 ...
- java 编程原理_Java网络编程 -- 网络编程基础原理
Hello,今天记录下 Java网络编程 --> 网络编程基础原理. 一起学习,一起进步.继续沉淀,慢慢强大.希望这文章对您有帮助.若有写的不好的地方,欢迎评论给建议哈! 初写博客不久,我是杨展 ...
- 基础 | 这波编程基础绝了!快来学习!
hi! 我是小小,今天开始本周的第二篇,本周第二篇主要内容是Java编程基础. Java的特性优势 简单性 面向对象性 可移植性 高性能 分布式 动态性 多线程 安全性 健壮性 Java三大版本 wr ...
最新文章
- c语言 feof_C语言 实现简单功能的12306火车售票系统【附源码】
- div+css中clear用法
- python os模块详细用法
- 机器学习相关的数学资料下载
- BugKuCTF 杂项 隐写2
- Java构造方法以及重载
- Linux下安装配置git
- 冷高轮时间王思聪吃热狗数字安卓版时钟下载方法
- 展宽微带天线带宽的方法
- ERP仓库管理系统主要功能
- 华硕n54u mysql_改华硕[N14U N54U]5G 2G的7620老毛子Padavan固件(私人云储存 ari
- 乡镇特色产业发展调研报告2
- 【js学习笔记】去除省、市、区、特别行政区、自治区
- iOS模拟器iOS Simulator详细图文使用教程
- 表格维护生成器-部分字段不能修改或不能看见
- 正则表达式基本语法总结
- 外贸市场越来越淡,个人soho,小外贸B2C公司如何转型? Read more: http://liedaoshou.com/seo.html#0-sqq-1-60778-9737f6f9e09df
- HttpRequest的cookie可以传给IE了
- 怎么判断冠词用a还是an_不定冠词a和an有哪些用法
- 程序员崩溃了!年终奖怎么说黄就黄?