webflux之reactor-Subscriber
在进行第二个demo之前。先来讲解一下reactive-streams官网中有关Subscriber的一些规则。这里将基于第一篇reactor博客中的demo进行讲解。demo1。
public interface Subscriber<T> {public void onSubscribe(Subscription s);public void onNext(T t);public void onError(Throwable t);public void onComplete();
}
Subscriber规则
订阅者必须通过Subscription.request(long n)发信号通知需要接收onNext信号。
此规则的目的是确定订阅者有责任决定它能够和愿意接收的元素的时间和数量。为了避免由重入的订阅方法引起的信号重新排序,强烈建议同步订阅服务器实现在任何信号处理的最后调用订阅方法。建议订阅者请求他们能够处理的上限,因为一次只请求一个元素会导致本身效率低下的“停止等待”协议。
代码1 LambdaSubscriber类@Override public final void onSubscribe(Subscription s) {//这里是对重入做了验证,当前订阅不为空则取消新订阅并返回falseif (Operators.validate(subscription, s)) {this.subscription = s;if (subscriptionConsumer != null) {try {subscriptionConsumer.accept(s);}catch (Throwable t) {Exceptions.throwIfFatal(t);s.cancel();onError(t);}}else {//这里进行了request的调用,并且是直接调用可以处理的上限s.request(Long.MAX_VALUE);}} }/*** Check Subscription current state and cancel new Subscription if current is set,* or return true if ready to subscribe.** @param current current Subscription, expected to be null* @param next new Subscription* @return true if Subscription can be used*/ public static boolean validate(@Nullable Subscription current, Subscription next) {Objects.requireNonNull(next, "Subscription cannot be null");if (current != null) {next.cancel();//reportSubscriptionSet();return false;}return true; }
如果订阅者怀疑其信号处理将对其发布者的响应性产生负面影响,则建议异步调度其信号。
此规则的目的是订阅者不应该从执行的角度阻碍Publisher的进度。换句话说,订阅者不应该使发布者饥饿,没法使用CPU周期。Subscriber.onComplete()和Subscriber.onError(Throwable t)绝不能调用Subscription或Publisher上的任何方法。
此规则的目的是在处理完成信号期间防止发布者,订阅和订阅者之间的周期和竞争条件。
代码2 LambdaSubscriber类//这里2个方法都没有调用Subscription或Publisher上的方法@Override public final void onComplete() {Subscription s = S.getAndSet(this, Operators.cancelledSubscription());if (s == Operators.cancelledSubscription()) {return;}if (completeConsumer != null) {try {completeConsumer.run();}catch (Throwable t) {Exceptions.throwIfFatal(t);onError(t);}} }@Override public final void onError(Throwable t) {Subscription s = S.getAndSet(this, Operators.cancelledSubscription());if (s == Operators.cancelledSubscription()) {Operators.onErrorDropped(t, Context.empty());return;}if (errorConsumer != null) {errorConsumer.accept(t);}else {throw Exceptions.errorCallbackNotImplemented(t);} }
Subscriber.onComplete()和Subscriber.onError(Throwable t)必须考虑在收到信号后取消订阅。
这条规则可以参考Publisher的规则6订阅者必须在onSubscribe信号之后调用给定订阅的Subscription.cancel(),如果它已经有一个活动的订阅。
此规则的目的是防止两个或更多个单独的发布者认为他们可以与同一个订阅服务器进行交互。执行此规则意味着防止资源泄漏,因为将取消额外的订阅。
这条规则的实现可以参考代码1中的逻辑判断if (Operators.validate(subscription, s)) {
如果不再需要订阅,订阅者必须调用Subscription.cancel()。
此规则的目的是确定订阅者不能仅在不再需要订阅时抛弃订阅,他们必须调用取消,以便可以安全,及时地回收该订阅所持有的资源。订阅者必须确保其订阅上的所有呼叫都来自同一线程或提供相应的外部同步。
此规则的目的是确定如果订阅者将由两个或多个线程同时使用订阅,则必须添加外部同步。订阅者必须准备好在调用Subscription.cancel()之后接收一个或多个onNext信号,如果仍有请求的元素待解决。 Subscription.cancel()不保证立即执行底层清理操作。
此规则的目的是强调调用cancel()和发布者注意到取消之间可能存在延迟。订阅者必须准备好接收带有或不带有预设Subscription.request(long n)调用的onComplete信号。
此规则的目的是确定完成与需求流无关,这允许提前完成的流,并且不需要轮询完成。订阅者必须准备好接收带有或不带有预设Subscription.request(long n)调用的onError信号。
同第9条规则订阅者必须确保在处理相应信号之前,所有对其信号方法的调用都已经准备好。即订阅者必须注意将信号正确发布到其处理逻辑。
此规则的目的是确定订阅服务器实现的责任是确保其信号的异步处理是线程安全的。请参阅第17.4.5节中的Happens-Before的JMM定义。对于给定的订阅者,必须最多调用一次Subscriber.onSubscribe(基于对象相等)。
代码3 FluxRange类
//从这个方法可以看到onSubscribe方法的调用最终只会进入到一个判断逻辑,符合了规则12 @Override @SuppressWarnings("unchecked") public void subscribe(CoreSubscriber<? super Integer> actual) {long st = start;long en = end;if (st == en) {Operators.complete(actual);return;} elseif (st + 1 == en) {actual.onSubscribe(Operators.scalarSubscription(actual, (int)st));return;}if (actual instanceof ConditionalSubscriber) {actual.onSubscribe(new RangeSubscriptionConditional((ConditionalSubscriber<? super Integer>) actual, st, en));return;}actual.onSubscribe(new RangeSubscription(actual, st, en)); }
调用onSubscribe,onNext,onError或onComplete必须正常返回,除非任何提供的参数为null,在这种情况下它必须向调用者抛出java.lang.NullPointerException,对于所有其他情况,订阅者发出信号失败的唯一合法方式是取消其订阅。 在违反此规则的情况下,任何与订阅服务器相关联的订阅必须被视为已取消,并且调用者必须以适合运行时环境的方式提高此错误条件。
此规则的目的是为订阅者的方法建立语义,以及在违反此规则的情况下允许发布者执行的操作。 «以适合运行时环境的方式提高此错误条件»可能意味着记录错误 - 或者让某人或某事意识到这种情况 - 因为错误无法通知错误的订阅者。
这条规则的实现可以参考LambdaSubscriber类中的onSubscribe,onNext,onError和onComplete方法的源码,前面的规则中都有展示过源码,这里就不再赘述了。
Processor规则
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
- 处理器代表一个处理阶段 - 既是订阅者又是发布者,必须遵守两者的规则条约。
- 处理器可以选择恢复onError信号。如果它选择这样做,它必须考虑取消订阅,否则它必须立即将onError信号传播给它的订阅者。
webflux之reactor-Subscriber相关推荐
- webflux系列--reactor源码(一)
文章目录 基础 顶级接口 reactor 核心原理 core 声明阶段 FluxOperator OptimizableOperator InternalFluxOperator InnerOpera ...
- webflux系列--reactor功能
创建一个新的Flux just 指定序列中包含的全部元素.创建出来的 Flux 序列在发布这些元素之后会自动结束. 即有限序列. public static <T> Flux<T&g ...
- webflux系列--reactor源码(二)
操作符(Operator) 合并多个Flux combineLatest concat concatMap merge repeat cache 行为(behavior) 聚合操作 collect r ...
- Spring Webflux 响应式编程 (二) - WebFlux编程实战
第一章 Reactive Stream 第1节 jdk9的响应式流 就是reactive stream,也就是flow.其实和jdk8的stream没有一点关系.说白了就一个发布-订阅模式,一共只有4 ...
- Spring5 新特性之 webflux
议题: 为什么要使用 Web Flux 从 Web MVC 过度到 Web Flux 函数式 Endpoint 为什么要使用 Web Flux 非阻塞编程 NIO Reactive 函数式编程 Lam ...
- Reactor编程之旅
文章目录 lamda与FunctionalInterface Reactive Programming.Reactive Streams和Reactor Thread per Connection 和 ...
- springwebflux 页面_Spring WebFlux 入门
1. WebFlux介绍 Spring WebFlux 是 Spring Framework 5.0中引入的新的响应式web框架.与Spring MVC不同,它不需要Servlet API,是完全异步 ...
- springboot异步注解_Spring Boot 2 :Spring Boot 中的响应式编程和 WebFlux 入门
[小宅按]Spring 5.0 中发布了重量级组件 Webflux,拉起了响应式编程的规模使用序幕. WebFlux 使用的场景是异步非阻塞的,使用 Webflux 作为系统解决方案,在大多数场景下可 ...
- Reactive(3)5分钟理解 SpringBoot 响应式的核心-Reactor
目录 一.前言 二. Mono 与 Flux 构造器 三. 流计算 1. 缓冲 2. 过滤/提取 3. 转换 4. 合并 5. 合流 6. 累积 四.异常处理 五.线程调度 小结 参考阅读 一.前言 ...
- (转)Spring Boot 2 (十):Spring Boot 中的响应式编程和 WebFlux 入门
http://www.ityouknow.com/springboot/2019/02/12/spring-boot-webflux.html Spring 5.0 中发布了重量级组件 Webflux ...
最新文章
- 这些 Python 不为人知的「坑」,躲都躲不开
- 下列不是python对文件的读操作方法是-大工20春《数据挖掘》在线作业1【参考答案】...
- C++ string 类常用函数
- java阶乘求和正负交替_C语言程序设计课件第4章090909
- 关系式调用c语言脚本_认识LoadRunner脚本语言
- JSF 2.0/2.1 生命周期简介
- 个人作业5——软件工程总结
- mysql事务锁导致tomcat崩溃_数据库连接池连接耗尽,导致tomcat请求无响应,呈现出假死状态...
- 数据科学入门与实战:玩转pandas之三
- 常见移动机器人轮直径校准(图片版)
- 升级到ASP.NET2.0之后的疑问
- .NET框架怎样解决DLL Hell问题?
- 前向算法(Forward Algorithm)
- 如何七周成为数据分析师
- 深入浅出AOP(一)
- Ubuntu下编译vtk(java版本)【超详细-带过程截图】
- 论项目管理中当面沟通的重要性
- 债券收益率预测模型_基于时间序列模型的可转换债券收益率的实证研究
- 带上萌宠去上班 | IT办公室宠物报告
- 不知哪位仁兄有该软件,csdn好像不提供下载了哦
热门文章
- 微信小程序订阅消息报错,by user TAP gesture (适用于tabBar页面)
- 高德导航过程中实时获取道路信息
- 2021牛客寒假算法基础集训营1-E-三棱锥之刻-(计算几何)
- 30 行代码实现蚂蚁森林自动收能量
- PyTorch系列 | correct += (predicted == labels).sum().item()的理解
- 电子邮箱号码大全,至尊邮为你打开邮箱的正确格式
- UWB通信中TOF技术详解
- UWB定位与蓝牙定位的优缺点分析
- mac nmap 的下载
- wifi和服务器之间通信协议,wifi模块串口通信协议.doc