文章目录

  • 背景
  • 什么是reactive stream
  • 深入了解java版本的reactive stream
    • Publisher
    • Subscriber
    • Subscription
    • Processor
  • JDK中reactive stream的实现
  • 总结

背景

Stream大家应该都很熟悉了,java8中为所有的集合类都引入了Stream的概念。优雅的链式操作,流式处理逻辑,相信用过的人都会爱不释手。

每个数据流都有一个生产者一个消费者。生产者负责产生数据,而消费者负责消费数据。如果是同步系统,生产一个消费一个没什么问题。但是如果在异步系统中,就会产生问题。

因为生产者无法感知消费者的状态,不知道消费者到底是繁忙状态还是空闲状态,是否有能力去消费更多的数据。

一般来说数据队列的长度都是有限的,即使没有做限制,但是系统的内存也是有限的。当太多的数据没有被消费的话,会导致内存溢出或者数据得不到即使处理的问题。

这时候就需要back-pressure了。

如果消息接收方消息处理不过来,则可以通知消息发送方,告知其正在承受压力,需要降低负载。back-pressure是一种消息反馈机制,从而使系统得以优雅地响应负载, 而不是在负载下崩溃。

而reactive stream的目的就是用来管理异步服务的流数据交换,并能够让接收方自主决定接受数据的频率。back-pressure就是reactive stream中不可或缺的一部分。

更多内容请访问www.flydean.com

什么是reactive stream

上面我们讲到了reactive stream的作用,大家应该对reactive stream有了一个基本的了解。这里我们再给reactive stream做一个定义:

reactive stream就是一个异步stream处理的标准,它的特点就是非阻塞的back pressure。

reactive stream只是一个标准,它定义了实现非阻塞的back pressure的最小区间的接口,方法和协议。

所以reactive stream其实有很多种实现的,不仅仅是java可以使用reactive stream,其他的编程语言也可以。

reactive stream只是定义了最基本的功能,各大实现在实现了基本功能的同时可以自由扩展。

目前reactive stream最新的java版本是1.0.3,是在2019年8月23发布的。它包含了java API,协议定义文件,测试工具集合和具体的实现例子。

深入了解java版本的reactive stream

在介绍java版本的reactive stream之前,我们先回顾一下reactive stream需要做哪些事情:

  1. 能够处理无效数量的消息
  2. 消息处理是有顺序的
  3. 可以异步的在组件之间传递消息
  4. 一定是非阻塞和backpressure的

为了实现这4个功能,reactive stream定义了4个接口,Publisher,Subscriber,Subscription,Processor。这四个接口实际上是一个观察者模式的实现。接下来我们详细来分析一下各个接口的作用和约定。

Publisher

先看下Publisher的定义:

public interface Publisher<T> {public void subscribe(Subscriber<? super T> s);
}

Publisher就是用来生成消息的。它定义了一个subscribe方法,传入一个Subscriber。这个方法用来将Publisher和Subscriber进行连接。

一个Publisher可以连接多个Subscriber。

每次调用subscribe建立连接,都会创建一个新的Subscription,Subscription和subscriber是一一对应的。

一个Subscriber只能够subscribe一次Publisher。

如果subscribe失败或者被拒绝,则会出发Subscriber.onError(Throwable)方法。

Subscriber

先看下Subscriber的定义:

public interface Subscriber<T> {public void onSubscribe(Subscription s);public void onNext(T t);public void onError(Throwable t);public void onComplete();
}

Subscriber就是消息的接收者。

在Publisher和Subscriber建立连接的时候会触发onSubscribe(Subscription s)方法。

当调用Subscription.request(long)方法时,onNext(T t)会被触发,根据request请求参数的大小,onNext会被触发一次或者多次。

在发生异常或者结束时会触发onError(Throwable t)或者onComplete()方法。

Subscription

先看下Subscription的定义:

public interface Subscription {public void request(long n);public void cancel();
}

Subscription代表着一对一的Subscriber和Publisher之间的Subscribe关系。

request(long n)意思是向publisher请求多少个events,这会触发Subscriber.onNext方法。

cancel()则是请求Publisher停止发送信息,并清除资源。

Processor

先看下Processor的定义:

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

Processor即是Subscriber又是Publisher,它代表着一种处理状态。

JDK中reactive stream的实现

在JDK中java.util.concurrent.Flow就是reactive stream语义的一种实现。

Flow从JDK9就开始有了。我们看下它的结构:

从上图我们可以看到在JDK中Flow是一个final class,而Subscriber,Publisher,Subscription,Processor都是它的内部类。

我们会在后面的文章中继续讲解JDK中Flow的使用。敬请期待。

总结

reactive stream的出现有效的解决了异步系统中的背压问题。只不过reactive stream只是一个接口标准或者说是一种协议,具体的实现还需要自己去实现。

更多精彩内容且看:

  • 区块链从入门到放弃系列教程-涵盖密码学,超级账本,以太坊,Libra,比特币等持续更新
  • Spring Boot 2.X系列教程:七天从无到有掌握Spring Boot-持续更新
  • Spring 5.X系列教程:满足你对Spring5的一切想象-持续更新
  • java程序员从小工到专家成神之路(2020版)-持续更新中,附详细文章教程

本文作者:flydean程序那些事

本文链接:http://www.flydean.com/reactive-stream-protocol/

本文来源:flydean的博客

欢迎关注我的公众号:程序那些事,更多精彩等着您!

reactive stream协议详解相关推荐

  1. rtmp官方协议详解

    标准规范学习: rtmp消息结构,包括几个部分: 时戳:4  byte,单位毫秒.超过最大值后会翻转. 长度:消息负载的长度. 类型ID:Type Id 一部分ID范围用于rtmp的控制信令.还有一部 ...

  2. zz:NETCONF协议详解

    随着SDN的大热,一个诞生了十年之久的协议焕发了第二春,它就是NETCONF协议.如果你在两年前去搜索NETCONF协议,基本得到的信息都是"这个协议是一个网管协议,主要目的是弥补SNMP协 ...

  3. RTMP协议封装H264和H265协议详解

    RTMP协议封装H264和H265协议详解 文章目录 RTMP协议封装H264和H265协议详解 1 RTMP和FLV 2 RTMP协议封装H264视频流 2.1 RTMP发送AVC sequence ...

  4. 【计算机网络面试高频】-HTTP协议详解,HTTP协议常见问题

    4.HTTP协议详解有关问题 4.1 HTTP请求 一个HTTP请求报文由请求行.请求头部.空行和请求数据四个部分组成. 1.请求行 请求行中有请求方法字段.URL字段和HTTP协议版本3个字段组成. ...

  5. HTTP/2 协议详解

    HTTP/2 协议详解 作者保留所有权利.All rights reserved. 关于作者 目录 HTTP/1.x 简介 要想深刻的了解 HTTP/2 ,那么我们必须对 HTTP/1.x 本身以及它 ...

  6. AXI接口协议详解-AXI总线、接口、协议

    转自:https://cloud.tencent.com/developer/article/1695010 AXI接口协议详解-AXI总线.接口.协议 AXI 总线 上面介绍了AMBA总线中的两种, ...

  7. 最详细的流媒体传输协议-rtsp协议详解

    流媒体传输协议-rtsp协议详解 参阅:RTSP协议详解和分析从零开始写一个RTSP服务器(一)RTSP协议讲解关于RTSP_RTP_RTCP协议的深刻初步介绍 rtsp RTSP出现以前,最热的大概 ...

  8. Telnet协议详解及使用C# 用Socket 编程来实现Telnet协议 - Atpking - 博客园

    Telnet协议详解及使用C# 用Socket 编程来实现Telnet协议 - Atpking - 博客园 Telnet协议详解及使用C# 用Socket 编程来实现Telnet协议 - Atpkin ...

  9. NETCONF协议详解

    目录 目录 概述 NETCONF11协议详解 1 安全传输层 2 消息层 21 hello 211 几种常用的能力 22 rpc和rpc-reply 23 notification 3 操作层 31 ...

最新文章

  1. 服务器虚拟化集成项目投标方案,服务器虚拟化投标技术方案(纯方案,16页).doc...
  2. python 清华镜像_Python pip conda 设置为清华镜像
  3. 运行里用\\加IP地址访问远程主机和用mstsc登录远程主机有什么区别??
  4. 【解决方案】UserWarning: Possibly corrupt EXIF data.导致读取图片失败
  5. k3 物料收发明细表sql_物料管理:班组长物料使用应知应会4件事
  6. C. Goodbye Souvenir(CDQ 或 树套树)
  7. Web项目替换jar包中的文件的方法
  8. python requests 上传文件_Python requests上传文件实现步骤
  9. nginx优缺点 优化
  10. mysql php pdo 迭代器_php – 创建PDO迭代器
  11. 经典面试题:有序矩阵的快速查找
  12. oracle+imp部分数据,ORACLE数据库IMP逻辑导入出现ORA-00910的处理案例
  13. Angular4+ 页面切换 显示进度条
  14. spring 依赖注入_这几个关于Spring 依赖注入的问题你清楚吗?
  15. ArcMap数据处理
  16. Django--学生管理系统(django慢更)
  17. 腾讯地图标记点击事件
  18. Ajax --- 获取服务器端的响应
  19. Android 简单实现三级列表
  20. 【FTP】FTP常用命令,持续更新中……

热门文章

  1. 5个常用的Python功能代码
  2. TCP长连接,短连接
  3. Redis的内存淘汰策略问题
  4. 面试官:为什么 wait() 方法需要写在循环里?
  5. Go 超时引发大量 fin-wait2
  6. 计算机基础- -应用和硬件的关系
  7. 实时远程医学影像服务质量保障与网络优化
  8. LiveVideoStackCon讲师热身分享 ( 四 ) —— 基于强化学习的自动码率调节
  9. 设计模式C++实现(4)——桥接模式
  10. 腾讯TAPD发布2018企业敏捷协作数据报告