基础的api:

1 JDK9 Reactive

  • Publisher: 事件发布者

  • Subscriber: 事件订阅者

  • Processor: 事件流转过程中的处理,可包含多个阶段,同时继承了 Publisher和Subscriber两个接口

  • Subscription: 提供了 requestcancel 两个方法,用于订阅者和发布者之间的通信,Processor中的subscription通过request方法接收publisher(发布者)submit的事件。订阅者通过subscription通过request方法决定是否消费事件,实现背压back pressue,不会因为发布者发布消息过快而把订阅者压垮。

下面的使用JDK9提供的接口演示基础用法:

import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.SubmissionPublisher;
​
public class DemoTest {
​public static void main(String[] args) throws InterruptedException {// #1 创建发布者SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
​// #2 创建处理器Processor<Integer, Integer> processor = new MyProcessor();
​// #3 发布者和处理器建立关系publisher.subscribe(processor);
​Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {Flow.Subscription subscription;
​@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println(">>> subscriber onSubscribe: " + subscription.toString());this.subscription = subscription;
​// 请求一个数据this.subscription.request(1);}
​@Overridepublic void onNext(Integer item) {try {Thread.sleep(30);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(">>> subscriber onNext 接收到数据:" + item);this.subscription.request(1); // 处理下一个事件// this.subscription.cancel(); // 终止处理// throw new RuntimeException(">>> subscriber throw a exception");}
​@Overridepublic void onError(Throwable throwable) {System.out.println(">>> subscriber onError");throwable.printStackTrace();}
​@Overridepublic void onComplete() {System.out.println(">>> subscriber onComplete");}};
​processor.subscribe(subscriber);
​for (int i = 0; i < 5; i++) {publisher.submit(i);
//            try {
//                Thread.sleep(60);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }}// 发布者调用close后会触发Processor执行 onCompletepublisher.close();// publisher.submit(4); // 发布者已经调用 close,再执行submit 报错 ClosedThread.currentThread().join(2000);}
​static class MyProcessor extends SubmissionPublisher<Integer> implements Processor<Integer, Integer> {
​Flow.Subscription subscription;
​@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println(">>> Processor onSubscribe: " + subscription.toString());this.subscription = subscription;
​// 发起请求数据:发起点this.subscription.request(1);}
​@Overridepublic void onNext(Integer item) {System.out.println(">>> Processor onNext 接收到数据:" + item);Integer result = item * item;
​// 发射处理结果(事件结果),会触发订阅者的 onNext 方法this.submit(result);
​// 接收发布者publish的下一个事件this.subscription.request(1);
​// 终止任务,不在接收发布者发布的信息,也不会触发 onComplete 方法// this.subscription.cancel(); // 终止处理
​// 抛出异常,会触发 onError 方法// throw new RuntimeException(">>> Processor throw a exception");}
​@Overridepublic void onError(Throwable throwable) {System.out.println(">>> Processor onError");throwable.printStackTrace();}
​@Overridepublic void onComplete() {System.out.println(">>> Processor onComplete");// 关闭Processor,将无法再调用 submit:即订阅者不会再收到消息this.close();}}
}

运行结果:

可以看到,程序是异步执行的,发布者发布事件,和观察者消费事件是异步非阻塞的。

2 Spring WebFlux

spring webflux 用的是响应式库是 reactive streams ,它的发布者 Publisher 主要是两个抽象类 MonoFlux

从图上可以看出,Flux能发出0~N个数据项,而Mono只能发出零或一个数据项。

1 创建Flux或Mono的简单方法

package com.fmi110.springgatewaylearning;
​
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
​
public class ReactiveDemo {public static void main(String[] args) {
​Flux<Integer> just = Flux.just(1, 2, 3, 4);
​// 创建一个FluxFlux<Integer> ints = Flux.range(1, 4).map(i -> {if (i <= 3) return i * i;throw new RuntimeException("Got to 4");});// 订阅并消费ints.subscribe(i -> System.out.println(i), err -> System.out.println(err));
​// 创建一个空数据发布者Mono<Object> emptyData = Mono.empty();Mono<String> data = Mono.just("foo");
​}
}

2 基于lambda的订阅

subscribe();
​
subscribe(Consumer<? super T> consumer);
​
subscribe(Consumer<? super T> consumer,Consumer<? super Throwable> errorConsumer);
​
subscribe(Consumer<? super T> consumer,Consumer<? super Throwable> errorConsumer,Runnable completeConsumer);
​
subscribe(Consumer<? super T> consumer,Consumer<? super Throwable> errorConsumer,Runnable completeConsumer,Consumer<? super Subscription> subscriptionConsumer); 

1 consumer:正常消费,对应 onNext()

2 errorConsumer:异常时的回调,对应 onError()

3 completeConsumer:执行结束时的回调,对应 onComplete()

4 subscriptionConsumer 为null或者传入一个用于设置消费最大数量的消费者(通过 Subscription#request(long)设置);或者返回一个 Disposable 用于取消订阅

实例代码:

Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),error -> System.err.println("Error " + error),() -> System.out.println("Done"),sub -> sub.request(2));

运行截图:

示例中,虽然发布者发布了四个事件,但是 sub.request(2) 限定的最大消费数,所以消费了两个事件。

3 BaseSubscriber

BaseSubscriber类是一个可用于替换lambda写法的类,内部提供了 hookOnSubscribehookOnNexthookOnCompletehookOnErrorhookOnCancelhookFinally

等钩子

package com.fmi110.springgatewaylearning;
​
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
​
public class ReactiveDemo {public static void main(String[] args) {DemoBaseSubscriber<Integer> ss = new DemoBaseSubscriber();Flux<Integer> ints = Flux.range(1, 4);ints.subscribe(ss);}
​
​static class DemoBaseSubscriber<T> extends BaseSubscriber<T> {@Overridepublic void hookOnSubscribe(Subscription subscription) {System.out.println("Subscribed");request(1);}@Overridepublic void hookOnNext(T value) {System.out.println(">>> hookOnNext: value = "+value);request(1);}
​@Overrideprotected void hookOnComplete() {super.hookOnComplete();System.out.println(">>> hookOnComplete");}
​@Overrideprotected void hookFinally(SignalType type) {super.hookFinally(type);System.out.println(">>> hookFinally");System.out.println("SignalType:"+type);}}
}

更多详细用法请参考下面的链接。

参考:

Reactor Core Features

外行人都能看懂的WebFlux,错过了血亏

spring-gateway(一)Reactor编程基础相关推荐

  1. 什么是反应式编程(超详细说明),反应式编程和命令式编程的区别。如何使用Spring中的Reactor。Reactor中常用的操作。Mono和Flux。

    文章目录 一.反应式编程初探 什么是反应式编程 为什么需要反应式编程? 反应式编程的规范 二.上手反应式编程(使用Spring中的Reactor) 对比反应式编程和命令式编程代码 添加相应依赖 Mon ...

  2. 对比Eureka和Nacos 的区别 Zuul和Spring Gateway的区别

    目录 Eureka与Nacos的区别 1.功能差异 2.部署安装 3.稳定及扩展 4.总结 Spring gateway与zuul的区别 1. 产品对比 2 性能对比 3总结 Eureka与Nacos ...

  3. Spring Webflux 响应式编程 (二) - WebFlux编程实战

    第一章 Reactive Stream 第1节 jdk9的响应式流 就是reactive stream,也就是flow.其实和jdk8的stream没有一点关系.说白了就一个发布-订阅模式,一共只有4 ...

  4. Spring - Java/J2EE Application Framework 应用框架 第 5 章 Spring AOP: Spring之面向方面编程G

    第 5 章 Spring AOP: Spring之面向方面编程 5.1. 概念 面向方面编程 (AOP) 提供从另一个角度来考虑程序结构以完善面向对象编程(OOP). 面向对象将应用程序分解成 各个层 ...

  5. Spring - Java/J2EE Application Framework 应用框架 第 5 章 Spring AOP: Spring之面向方面编程

    第 5 章 Spring AOP: Spring之面向方面编程 5.1. 概念 面向方面编程 (AOP) 提供从另一个角度来考虑程序结构以完善面向对象编程(OOP). 面向对象将应用程序分解成 各个层 ...

  6. 第 5 章 Spring AOP: Spring之面向方面编程

    http://oss.org.cn/ossdocs/framework/spring/zh-cn/aop.html 第 5 章 Spring AOP: Spring之面向方面编程 5.1. 概念 面向 ...

  7. 迈入JavaWeb第一步,Java网络编程基础,TCP网络编程URL网络编程等

    文章目录 网络编程概述 网络通信要素 要素一IP和端口号 要素二网络协议 TCP网络编程 UDP网络编程 URL网络编程 Java网络编程基础 网络编程概述 Java是Internet上的语言,它从语 ...

  8. java 编程原理_Java网络编程 -- 网络编程基础原理

    Hello,今天记录下 Java网络编程 --> 网络编程基础原理. 一起学习,一起进步.继续沉淀,慢慢强大.希望这文章对您有帮助.若有写的不好的地方,欢迎评论给建议哈! 初写博客不久,我是杨展 ...

  9. 基础 | 这波编程基础绝了!快来学习!

    hi! 我是小小,今天开始本周的第二篇,本周第二篇主要内容是Java编程基础. Java的特性优势 简单性 面向对象性 可移植性 高性能 分布式 动态性 多线程 安全性 健壮性 Java三大版本 wr ...

最新文章

  1. c语言 feof_C语言 实现简单功能的12306火车售票系统【附源码】
  2. div+css中clear用法
  3. python os模块详细用法
  4. 机器学习相关的数学资料下载
  5. BugKuCTF 杂项 隐写2
  6. Java构造方法以及重载
  7. Linux下安装配置git
  8. 冷高轮时间王思聪吃热狗数字安卓版时钟下载方法
  9. 展宽微带天线带宽的方法
  10. ERP仓库管理系统主要功能
  11. 华硕n54u mysql_改华硕[N14U N54U]5G 2G的7620老毛子Padavan固件(私人云储存 ari
  12. 乡镇特色产业发展调研报告2
  13. 【js学习笔记】去除省、市、区、特别行政区、自治区
  14. iOS模拟器iOS Simulator详细图文使用教程
  15. 表格维护生成器-部分字段不能修改或不能看见
  16. 正则表达式基本语法总结
  17. 外贸市场越来越淡,个人soho,小外贸B2C公司如何转型? Read more: http://liedaoshou.com/seo.html#0-sqq-1-60778-9737f6f9e09df
  18. HttpRequest的cookie可以传给IE了
  19. 怎么判断冠词用a还是an_不定冠词a和an有哪些用法
  20. 程序员崩溃了!年终奖怎么说黄就黄?

热门文章

  1. Linux下uniq筛选
  2. 谷歌将比特币现金汇率纳入货币信息搜索
  3. 圈钱的道路上廖翔从不缺席
  4. 山西流感就诊比持续上升 专家为民答疑解惑
  5. 关于B站除夕夜被攻击
  6. nyoj 234 吃土豆
  7. mysql 常用操作
  8. 你不知道的那些“XX即服务”
  9. javax.mail API
  10. Excel超级链接方式应用技巧