什么是反应流?

在许多应用程序中,不是从固定存储设备中检索数据,而是几乎实时地处理数据,而用户或其他系统会迅速将信息注入到我们的系统中。多数情况下,此数据注入是异步的,因此我们无法提前知道何时会出现数据。为了促进这种异步的数据处理方式,我们必须重新考虑较旧的基于轮询的模型,而应使用更轻便,更简化的方法。

处理器

如果实体既是发布者又是订阅者,则称为处理器。处理器通常充当另一个发布者和订阅者(​​两者都可以是另一个处理器)之间的中介,对数据流执行某种转换。例如,可以创建处理器,该处理器在将符合某些条件的项目传递给其订户之前将其过滤掉。下图显示了处理器的直观表示。

通过对反应流的工作方式有基本的了解,我们可以通过将它们编码为接口将这些概念转换为Java领域。

接口表示

根据上面的描述,响应流由四个主要实体组成:(1)发布者,(2)订阅者,(3)订阅和(4)处理器。从界面的角度来看,仅要求发布者允许订阅者订阅它们。因此,我们可以为发布者创建一个简单的接口,其中正式的泛型类型参数T代表发布者生产的商品的类型:

public interface Publisher {

public void subscribe(Subscriber super T> s);

}

此接口定义要求我们随后为订户定义接口。如上所述,订阅者具有四个主要交互:(1)订阅的通知;(2)接受推送的项目;(3)接受在订阅的发布者中发生的错误;以及(4)发布者完成时的通知。这将导致以下界面,该界面同样由其请求的项目类型进行参数化:

public interface Subscriber {

public void onSubscribe(Subscription s);

public void onNext(T t);

public void onError(Throwable t);

public void onComplete();

}

接下来,我们必须定义订阅的接口。该实体比订户更简单,并且仅负责以下两个动作:(1)接受物品请求;(2)被取消。这将导致以下接口定义:

public interface Subscription {

public void request(long n);

public void cancel();

}

最后,我们将处理器定义为发布者和订阅者接口的组合,这有一个重要的怪癖:处理器产生的商品类型可能不同于其消费的商品类型。因此,我们将使用形式化的泛型类型参数T表示处理器消耗的项目类型,并R表示其返回(或产生)的项目类型。请注意,生产者的实现可以使用和生产相同类型的项目,但是没有编译时必须这样做的限制。这将产生以下界面:

public interface Processor extends Subscriber, Publisher {}

尽管这四个接口构成了反应流的统一合同,但这些接口还必须遵守许多其他限制和预期的行为。这些规范以及上面的接口定义可以在Reactive Streams JVM Specification中找到。正如我们将在下一节中看到的那样,Reactive Stream规范的标准Java实现与Reactive Streams JVM规范的实现几乎相同,并且充当Java Standard Library中 Reactive Streams合同的标准化。

反应性流如何在Java中工作?

Reactive Streams接口的标准Java端口可在java.util.concurrent.Flow类中找到,并捆绑为Flow该类中的静态接口。删除JavaDocs后,Flow该类的定义如下:

public final class Flow {

private Flow() {} // uninstantiable

@FunctionalInterface

public static interface Publisher {

public void subscribe(Subscriber super T> subscriber);

}

public static interface Subscriber {

public void onSubscribe(Subscription subscription);

public void onNext(T item);

public void onError(Throwable throwable);

public void onComplete();

}

public static interface Subscription {

public void request(long n);

public void cancel();

}

public static interface Processor extends Subscriber, Publisher {}

}

将Reactive Streams JVM规范与标准Java定义进行比较时,没有太多新的讨论要讨论,但是标准Java版本确实包含一个发布者实现:SubmissionPublisher。的SubmissionPublisher类作为一个简单的发布者,它接受项推到使用订户submit(T item)方法。将项目提交给submit方法后,它将异步推送给订阅者,如以下示例所示:

public class PrintSubscriber implements Subscriber {

private Subscription subscription;

@Override

public void onSubscribe(Subscription subscription) {

this.subscription = subscription;

subscription.request(1);

}

@Override

public void onNext(Integer item) {

System.out.println("Received item: " + item);

subscription.request(1);

}

@Override

public void onError(Throwable error) {

System.out.println("Error occurred: " + error.getMessage());

}

@Override

public void onComplete() {

System.out.println("PrintSubscriber is complete");

}

}

public class SubmissionPublisherExample {

public static void main(String... args) throws InterruptedException {

SubmissionPublisher publisher = new SubmissionPublisher<>();

publisher.subscribe(new PrintSubscriber());

System.out.println("Submitting items...");

for (int i = 0; i < 10; i++) {

publisher.submit(i);

}

Thread.sleep(1000);

publisher.close();

}

}

运行此示例将产生以下输出:

Submitting items...

Received item: 0

Received item: 1

Received item: 2

Received item: 3

Received item: 4

Received item: 5

Received item: 6

Received item: 7

Received item: 8

Received item: 9

PrintSubscriber is complete

在我们的订阅者中,我们捕获Subscription传递给该onSubscribe方法的对象,从而允许我们Subscription 在以后与进行交互。一旦存储了Subscription 对象,我们立即通知Subscription订户已准备好接受一项(通过调用subscription.request(1))。onNext在打印收到的项目之后,我们同样会在方法中进行操作。这等于通知发布者,我们在完成项目处理后就可以接受其他项目。

在我们的主要方法中,我们只需实例化a SubmissionPublisher 和our PrintSubscriber 并将后者订阅到前者。一旦认购成立以来,我们提交的值0通过9到发布,这反过来又推动异步的值给用户。然后,订户通过将其值打印到标准输出中来处理每个项目,并通知订阅它已准备好接受另一个值。然后,我们将主线程暂停1秒钟,以允许异步提交完成。这是非常重要的一步,因为该submit 方法是异步的将提交的项目推送给其订阅者。因此,我们必须提供一个合理的时间来完成异步操作。最后,我们关闭发布者,这又通知我们的订阅者订阅已完成。

我们还可以引入处理器,并使用该处理器链接原始的发布者和订阅者。在下面的示例中,我们创建一个处理器,将接收到的值增加10,然后将增加的值推送到其订户:

public class PlusTenProcessor extends SubmissionPublisher implements Subscriber {

private Subscription subscription;

@Override

public void onSubscribe(Subscription subscription) {

this.subscription = subscription;

subscription.request(1);

}

@Override

public void onNext(Integer item) {

submit(item + 10);

subscription.request(1);

}

@Override

public void onError(Throwable error) {

error.printStackTrace();

closeExceptionally(error);

}

@Override

public void onComplete() {

System.out.println("PlusTenProcessor completed");

close();

}

}

public class SubmissionPublisherExample {

public static void main(String... args) throws InterruptedException {

SubmissionPublisher publisher = new SubmissionPublisher<>();

PlusTenProcessor processor = new PlusTenProcessor();

PrintSubscriber subscriber = new PrintSubscriber();

publisher.subscribe(processor);

processor.subscribe(subscriber);

System.out.println("Submitting items...");

for (int i = 0; i < 10; i++) {

publisher.submit(i);

}

Thread.sleep(1000);

publisher.close();

}

}

运行此示例将产生以下输出:

Submitting items...

Received item: 10

Received item: 11

Received item: 12

Received item: 13

Received item: 14

Received item: 15

Received item: 16

Received item: 17

Received item: 18

Received item: 19

PlusTenProcessor completed

PrintSubscriber is complete

就像我们期望的那样,每个推入的值都增加10,并且处理器接收到的事件(例如接收到错误或完成)被转发给订户,从而导致为PlusTenProcessor和发送了一条完成的消息PrintSubscriber。

结论

在近实时数据处理时代,反应流是事实上的标准。这种编程风格的普遍性导致了该标准的众多实现,每个实现都有自己的重复接口集。为了将这些通用接口收集到Java通用标准中,默认情况下,JDK 9现在包括Reactive Streams接口以及强大的发布者实现。正如我们在本文中所看到的,虽然这些接口在外观上并不张扬,但它们提供了一种丰富的方法来以标准的,可互换的方式处理流数据流。

最后,开发这么多年我也总结了一套学习Java的资料与面试题,如果你在技术上面想提升自己的话,可以关注我,私信发送领取资料或者在评论区留下自己的联系方式,有时间记得帮我点下转发让跟多的人看到哦。

java publisher_Java中的反应流详解相关推荐

  1. java多线程中的join方法详解

    java多线程中的join方法详解 方法Join是干啥用的? 简单回答,同步,如何同步? 怎么实现的? 下面将逐个回答. 自从接触Java多线程,一直对Join理解不了.JDK是这样说的:join p ...

  2. java 控制jsp_JSP学习之Java Web中的安全控制实例详解

    普通用户界面 修改登录的Servlet,修改后的代码如下: LoginProcess.java代码: package servlet; import javabean.User; import jav ...

  3. Java中的Stream流详解

    JDK8 Stream详解 概念 Stream是Java8 API的新成员,它允许以声明性方式处理数据集合 . 特点 (1)代码简洁:函数式编程写出的代码简洁且意图明确,使用stream接口让你从此告 ...

  4. java.IO字节流和字符流详解

    IO流的分类 在学习字节流和字符流之前因该先学习IO流的分类,效果会更好 有多种分类方式: 一种方式是按照流的方向进行分类: 以内存作为参照物, 往内存中去,叫做输入(Input).或者叫做读(Rea ...

  5. Java 8 中的 Streams API 详解

    Java 8 中的 Stream 是对集合(Collection)对象功能的增强,它专注于对集合对象进行各种非常便利.高效的聚合操作,或者大批量数据操作 .Stream API 借助于同样新出现的 L ...

  6. C++中的字符串流详解iostream,sstream

    注:本文只详解C++中的#include <iostream> ,#include <sstream>,暂不详解#include <fstream>.等后期博主会补 ...

  7. java yeild_Java 中 Thread.yield() 方法详解

    对于很多初学 Java 线程的小伙伴们,很容易将 Thread 类里的 yield() 方法理解错误,或者理解得不够透彻,先看下源码: public static native void yield( ...

  8. c++文件流读取一行_「软帝学院」Java挑战者专栏:IO流详解2

    软帝学院笔记Day18 IO流(字符流FileReader) 1.字符流是什么 字符流是可以直接读写字符的IO流 字符流读取字符, 就要先读取到字节数据, 然后转为字符. 如果要写出字符, 需要把字符 ...

  9. Java IO 字节流、字符流详解

    1. 字节流与字符流 流是一个很形象的概念,当程序需要读取数据的时候,就会开启一个通向数据源的流,这个数据源可以是文件,内存,或是网络连接.类似的,当程序需要写入数据的时候,就会开启一个通向目的地的流 ...

最新文章

  1. IOS之同步请求、异步请求、GET请求、POST请求(转)
  2. 【C 语言】文件操作 ( 配置文件读写 | 写出或更新配置文件 | 函数形参设置 | 确保打开文件成功 | 统计文件大小 )
  3. python模拟鼠标拖动_Python+Selenium自动化篇-6-模拟鼠标操作
  4. vue Mutation 必须是同步函数 为什么_Vue 开发必须知道的 36 个技巧「近1W字」
  5. 《程序员修炼之道》笔记(九)
  6. 生成二维码的 jQuery 插件:jquery.qrcode.js
  7. VMware14.0 安装 CentOS7.2
  8. git设置master权限_git怎么控制成员的权限?
  9. 内存泄漏和内存溢出的区别
  10. 安卓应用安全指南 4.2.2 创建/使用广播接收器 规则书
  11. 拓展名php,取扩展名_php
  12. Golang包管理工具之govendor的使用
  13. Java架构师具备的特点有哪些?
  14. opencv漫水填充算法
  15. IBatis的resultMap使用
  16. CentOS7图形界面模式
  17. TIT 数据库实验一 在SQL Server中创建数据库(SQL入门教学一 从0到1)
  18. 想做游戏测试,你一定要知道这几点!
  19. 西门子触摸屏脚本程序_西门子触摸屏实例程序
  20. iOS苹果开发者账号申请教程

热门文章

  1. 关于__irq的使用
  2. SIMS(secondary ion mass spectroscopy)二次离子质谱
  3. type-c转HDMI+PD+USB3.0多合一拓展转换芯片
  4. 人工神经网络(一)概述
  5. STM32 Keil5踩坑记录
  6. 超炫酷项目来袭!DragGAN 火爆开源社区!
  7. 夏日必备太阳镜的选购技巧
  8. INFA Transformation组件
  9. 卡特兰数问题——一个栈(无穷大)的进栈序列为1,2,3,…,n,有多少个不同的出栈序列?
  10. 20230322华清远见作业