reactive stream 响应式流 — 简而言之,就是多了一个沟通的渠道

发布订阅者

背压 交流

Reactive Stream主要接口

java.util.concurrent.Flow 源码很重要 很有意思 多读几遍


import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;public class FlowDemo {public static void main(String[] args) throws Exception {// 1. 定义发布者, 发布的数据类型是 Integer// 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();// 2. 定义订阅者Subscriber<Integer> subscriber = new Subscriber<Integer>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 保存订阅关系, 需要用它来给发布者响应this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(Integer item) {// 接受到一个数据, 处理System.out.println("接受到数据: " + item);try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 出现了异常(例如处理数据的时候产生了异常)throwable.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println("处理完了!");}};// 3. 发布者和订阅者 建立订阅关系publiser.subscribe(subscriber);// 4. 生产数据, 并发布// 这里忽略数据生产过程for (int i = 0; i < 1000; i++) {System.out.println("生成数据:" + i);// submit是个block方法publiser.submit(i);}// 5. 结束后 关闭发布者// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭publiser.close();// 主线程延迟停止, 否则数据没有消费就退出Thread.currentThread().join(1000);// debug的时候, 下面这行需要有断点// 否则主线程结束无法debugSystem.out.println();}}

可以看看 此文 Java9 基于异步响应流的发布-订阅框架
https://juejin.im/entry/59d9ec94f265da0673750687

响应式流(ReactiveStreams)为这种非阻塞背压的异步流处理提供了一个标准。在处理系统出现过载的时候,采用异步发送信号的方式通知数据源做相应的处理。这个通知的信号就像是水管的阀门一样,关闭这个阀门会增加背压(数据源对处理系统的压力),同时也会增加处理系统的压力。

这个标准的目的是治理跨异步边界的流数据交换(比如向其他线程传输数据) ,同时确保处理系统不被缓冲数据而压垮。换一种说法,背压是这个标准模型的一个组成部分,以便允许在线程之间调停的队列被界定。特别注意,背压通信是异步的。

完整实例

import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;/*** 带 process 的 flow demo*//*** Processor, 需要继承SubmissionPublisher并实现Processor接口* * 输入源数据 integer, 过滤掉小于0的, 然后转换成字符串发布出去*/
class MyProcessor extends SubmissionPublisher<String>implements Processor<Integer, String> {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 保存订阅关系, 需要用它来给发布者响应this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(Integer item) {// 接受到一个数据, 处理System.out.println("处理器接受到数据: " + item);// 过滤掉小于0的, 然后发布出去if (item > 0) {this.submit("转换后的数据:" + item);}// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 出现了异常(例如处理数据的时候产生了异常)throwable.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println("处理器处理完了!");// 关闭发布者this.close();}}public class FlowDemo2 {public static void main(String[] args) throws Exception {// 1. 定义发布者, 发布的数据类型是 Integer// 直接使用jdk自带的SubmissionPublisherSubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();// 2. 定义处理器, 对数据进行过滤, 并转换为String类型MyProcessor processor = new MyProcessor();// 3. 发布者 和 处理器 建立订阅关系publiser.subscribe(processor);// 4. 定义最终订阅者, 消费 String 类型数据Subscriber<String> subscriber = new Subscriber<String>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 保存订阅关系, 需要用它来给发布者响应this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(String item) {// 接受到一个数据, 处理System.out.println("接受到数据: " + item);// 处理完调用request再请求一个数据this.subscription.request(1);// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 出现了异常(例如处理数据的时候产生了异常)throwable.printStackTrace();// 我们可以告诉发布者, 后面不接受数据了this.subscription.cancel();}@Overridepublic void onComplete() {// 全部数据处理完了(发布者关闭了)System.out.println("处理完了!");}};// 5. 处理器 和 最终订阅者 建立订阅关系processor.subscribe(subscriber);// 6. 生产数据, 并发布// 这里忽略数据生产过程publiser.submit(-111);publiser.submit(111);// 7. 结束后 关闭发布者// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭publiser.close();// 主线程延迟停止, 否则数据没有消费就退出Thread.currentThread().join(1000);}}

运行机制

反馈

submit是一个阻塞方法

WebFlux响应式编程基础之 4 reactive stream 响应式流相关推荐

  1. 嵌入式Linux开发环境搭建-4-嵌入式编程基础知识

    嵌入式Linux开发环境搭建-4-嵌入式编程基础知识 1.安装代码编辑器 2.交叉编译工作使用 1.安装代码编辑器 参考文档 ubuntu几款好用的代码编辑器_百度经验 安装sublime text ...

  2. 什么是反应式编程? 这里有你想要了解的反应式编程 (Reactive programming)

    理解反应式编程 你曾有过订阅报纸或者杂志的经历吗?互联网的确从传统的出版发行商那儿分得了一杯羹,但是过去订阅报纸真的是我们了解时事的最佳方式.那时,我们每天早上都会收到一份新鲜出炉的报纸,并在早饭时间 ...

  3. 响应式编程(一)什么是响应式编程

    响应式编程是相对于阻塞式编程,我们在这里主要讲的是springBoot2中响应式webflux Spring Boot 2.0 WebFlux 了解 WebFlux,首先了解下什么是 Reactive ...

  4. 响应式编程优点 有效_什么是响应式编程?

    响应式编程是一种通过异步和数据流来构建事物关系的编程模型.这里每个词都很重要,"事物的关系"是响应式编程的核心理念,"数据流"和"异步"是实 ...

  5. WebFlux响应式编程基础之 2 函数式编程 工具jclasslib bytecode viewer

    函数式编程:告诉他的功能是什么,而不是告诉他怎么做 命令式编程:怎么去做 函数式编程:不需要关注细节,利用系统已经有的API 使用jdk8自带函数接口的好处 函数接口减少接口定义 函数式接口链式操作 ...

  6. WebFlux响应式编程基础之 5 webflux服务端开发讲解

    https://blog.csdn.net/qq_27093465/article/details/64124330 debug技巧 第5章 webflux服务端开发讲解 Spring5 非组塞的开发 ...

  7. WebFlux响应式编程基础之 3 Stream 流 编程

    流水线 外部迭代和内部迭代 惰性求值 惰性求值就是终止没有调用的情况下,中间操作不会执行 package stream;import java.util.stream.IntStream;public ...

  8. WebFlux响应式编程基础之 6 webflux客户端声明式restclient框架开发讲解

    第6章 webflux客户端声明式 restclient框架开发讲解 看不懂,为什么看不懂? 写方法最主要考虑输入与输出 Feign Retrofit 框架 6-1 框架效果介绍 6-2 设计思路 6 ...

  9. ⒈响应式编程 Project Reactor 概述

    文章目录 What is Reactive Programming? Project Reactor & Reactive Programming 总览 Features Best pract ...

最新文章

  1. VMware Workstation 与 Device/Credential Guard 不兼容。在禁用 Device/Credenti
  2. C++面试题:list和vector有什么区别
  3. java args包_Java Args.positive方法代码示例
  4. dart和python混编,Flutter与iOS混编(一)
  5. boost::hana::unpack用法的测试程序
  6. php七牛分片上传_七牛视频切片方案 - 张小超fly的个人空间 - OSCHINA - 中文开源技术交流社区...
  7. centos 卸载软件_Linux服务器运维必备技能 软件包和启动项超详细整理
  8. final关键字_深入分析Java中的final关键字
  9. firefox flash插件_巧用firefox下载视频资源
  10. CentOS虚拟机时间同步
  11. 云服务器 ECS > 安全 > 安全组 > 添加安全组规则
  12. ubuntu 下mongodb安装
  13. Go基础:切片的地址和扩容
  14. 内存管理 —— ION
  15. 闲置域名解析到自己的博客
  16. java 信号处理模块_GitHub - Astoros-Fung/JavaDsp: 数字信号处理(DSP)方面的Java封装,包含常用的一些处理方法,如滤波、信号变换等等。...
  17. 共享办公室租赁,突出市场重围
  18. Windows | 查看内存频率(不用外部软件
  19. 关于我的英文名字Byron
  20. 性能测试平台搭建及简单使用(jmeter分布式+influxdb2.0+grafana)附性能测试报告

热门文章

  1. Request 对象、重定向、请求转发
  2. webapi 设置参数可为空_Web API系列(二):灵活多样的路由配置
  3. 第十七届“科大讯飞杯”高校网络友谊赛——F-排列计算
  4. Java toString()方法的要点
  5. bootstrap bootstraptable 固定列_BootStrapTable分页
  6. Cookie禁用了Session还可以用吗?
  7. Perl中的正则表达式
  8. Exchange 服务器可支持性矩阵
  9. 基于jetty9 编程构建嵌入式https 服务器
  10. WPF中播放Flash动画