java publisher_Java中的反应流详解
什么是反应流?
在许多应用程序中,不是从固定存储设备中检索数据,而是几乎实时地处理数据,而用户或其他系统会迅速将信息注入到我们的系统中。多数情况下,此数据注入是异步的,因此我们无法提前知道何时会出现数据。为了促进这种异步的数据处理方式,我们必须重新考虑较旧的基于轮询的模型,而应使用更轻便,更简化的方法。
处理器
如果实体既是发布者又是订阅者,则称为处理器。处理器通常充当另一个发布者和订阅者(两者都可以是另一个处理器)之间的中介,对数据流执行某种转换。例如,可以创建处理器,该处理器在将符合某些条件的项目传递给其订户之前将其过滤掉。下图显示了处理器的直观表示。
通过对反应流的工作方式有基本的了解,我们可以通过将它们编码为接口将这些概念转换为Java领域。
接口表示
根据上面的描述,响应流由四个主要实体组成:(1)发布者,(2)订阅者,(3)订阅和(4)处理器。从界面的角度来看,仅要求发布者允许订阅者订阅它们。因此,我们可以为发布者创建一个简单的接口,其中正式的泛型类型参数T代表发布者生产的商品的类型:
public interface Publisher {
public void subscribe(Subscriber super T> s);
}
此接口定义要求我们随后为订户定义接口。如上所述,订阅者具有四个主要交互:(1)订阅的通知;(2)接受推送的项目;(3)接受在订阅的发布者中发生的错误;以及(4)发布者完成时的通知。这将导致以下界面,该界面同样由其请求的项目类型进行参数化:
public interface Subscriber {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
接下来,我们必须定义订阅的接口。该实体比订户更简单,并且仅负责以下两个动作:(1)接受物品请求;(2)被取消。这将导致以下接口定义:
public interface Subscription {
public void request(long n);
public void cancel();
}
最后,我们将处理器定义为发布者和订阅者接口的组合,这有一个重要的怪癖:处理器产生的商品类型可能不同于其消费的商品类型。因此,我们将使用形式化的泛型类型参数T表示处理器消耗的项目类型,并R表示其返回(或产生)的项目类型。请注意,生产者的实现可以使用和生产相同类型的项目,但是没有编译时必须这样做的限制。这将产生以下界面:
public interface Processor extends Subscriber, Publisher {}
尽管这四个接口构成了反应流的统一合同,但这些接口还必须遵守许多其他限制和预期的行为。这些规范以及上面的接口定义可以在Reactive Streams JVM Specification中找到。正如我们将在下一节中看到的那样,Reactive Stream规范的标准Java实现与Reactive Streams JVM规范的实现几乎相同,并且充当Java Standard Library中 Reactive Streams合同的标准化。
反应性流如何在Java中工作?
Reactive Streams接口的标准Java端口可在java.util.concurrent.Flow类中找到,并捆绑为Flow该类中的静态接口。删除JavaDocs后,Flow该类的定义如下:
public final class Flow {
private Flow() {} // uninstantiable
@FunctionalInterface
public static interface Publisher {
public void subscribe(Subscriber super T> subscriber);
}
public static interface Subscriber {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
public static interface Subscription {
public void request(long n);
public void cancel();
}
public static interface Processor extends Subscriber, Publisher {}
}
将Reactive Streams JVM规范与标准Java定义进行比较时,没有太多新的讨论要讨论,但是标准Java版本确实包含一个发布者实现:SubmissionPublisher。的SubmissionPublisher类作为一个简单的发布者,它接受项推到使用订户submit(T item)方法。将项目提交给submit方法后,它将异步推送给订阅者,如以下示例所示:
public class PrintSubscriber implements Subscriber {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Integer item) {
System.out.println("Received item: " + item);
subscription.request(1);
}
@Override
public void onError(Throwable error) {
System.out.println("Error occurred: " + error.getMessage());
}
@Override
public void onComplete() {
System.out.println("PrintSubscriber is complete");
}
}
public class SubmissionPublisherExample {
public static void main(String... args) throws InterruptedException {
SubmissionPublisher publisher = new SubmissionPublisher<>();
publisher.subscribe(new PrintSubscriber());
System.out.println("Submitting items...");
for (int i = 0; i < 10; i++) {
publisher.submit(i);
}
Thread.sleep(1000);
publisher.close();
}
}
运行此示例将产生以下输出:
Submitting items...
Received item: 0
Received item: 1
Received item: 2
Received item: 3
Received item: 4
Received item: 5
Received item: 6
Received item: 7
Received item: 8
Received item: 9
PrintSubscriber is complete
在我们的订阅者中,我们捕获Subscription传递给该onSubscribe方法的对象,从而允许我们Subscription 在以后与进行交互。一旦存储了Subscription 对象,我们立即通知Subscription订户已准备好接受一项(通过调用subscription.request(1))。onNext在打印收到的项目之后,我们同样会在方法中进行操作。这等于通知发布者,我们在完成项目处理后就可以接受其他项目。
在我们的主要方法中,我们只需实例化a SubmissionPublisher 和our PrintSubscriber 并将后者订阅到前者。一旦认购成立以来,我们提交的值0通过9到发布,这反过来又推动异步的值给用户。然后,订户通过将其值打印到标准输出中来处理每个项目,并通知订阅它已准备好接受另一个值。然后,我们将主线程暂停1秒钟,以允许异步提交完成。这是非常重要的一步,因为该submit 方法是异步的将提交的项目推送给其订阅者。因此,我们必须提供一个合理的时间来完成异步操作。最后,我们关闭发布者,这又通知我们的订阅者订阅已完成。
我们还可以引入处理器,并使用该处理器链接原始的发布者和订阅者。在下面的示例中,我们创建一个处理器,将接收到的值增加10,然后将增加的值推送到其订户:
public class PlusTenProcessor extends SubmissionPublisher implements Subscriber {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Integer item) {
submit(item + 10);
subscription.request(1);
}
@Override
public void onError(Throwable error) {
error.printStackTrace();
closeExceptionally(error);
}
@Override
public void onComplete() {
System.out.println("PlusTenProcessor completed");
close();
}
}
public class SubmissionPublisherExample {
public static void main(String... args) throws InterruptedException {
SubmissionPublisher publisher = new SubmissionPublisher<>();
PlusTenProcessor processor = new PlusTenProcessor();
PrintSubscriber subscriber = new PrintSubscriber();
publisher.subscribe(processor);
processor.subscribe(subscriber);
System.out.println("Submitting items...");
for (int i = 0; i < 10; i++) {
publisher.submit(i);
}
Thread.sleep(1000);
publisher.close();
}
}
运行此示例将产生以下输出:
Submitting items...
Received item: 10
Received item: 11
Received item: 12
Received item: 13
Received item: 14
Received item: 15
Received item: 16
Received item: 17
Received item: 18
Received item: 19
PlusTenProcessor completed
PrintSubscriber is complete
就像我们期望的那样,每个推入的值都增加10,并且处理器接收到的事件(例如接收到错误或完成)被转发给订户,从而导致为PlusTenProcessor和发送了一条完成的消息PrintSubscriber。
结论
在近实时数据处理时代,反应流是事实上的标准。这种编程风格的普遍性导致了该标准的众多实现,每个实现都有自己的重复接口集。为了将这些通用接口收集到Java通用标准中,默认情况下,JDK 9现在包括Reactive Streams接口以及强大的发布者实现。正如我们在本文中所看到的,虽然这些接口在外观上并不张扬,但它们提供了一种丰富的方法来以标准的,可互换的方式处理流数据流。
最后,开发这么多年我也总结了一套学习Java的资料与面试题,如果你在技术上面想提升自己的话,可以关注我,私信发送领取资料或者在评论区留下自己的联系方式,有时间记得帮我点下转发让跟多的人看到哦。
java publisher_Java中的反应流详解相关推荐
- java多线程中的join方法详解
java多线程中的join方法详解 方法Join是干啥用的? 简单回答,同步,如何同步? 怎么实现的? 下面将逐个回答. 自从接触Java多线程,一直对Join理解不了.JDK是这样说的:join p ...
- java 控制jsp_JSP学习之Java Web中的安全控制实例详解
普通用户界面 修改登录的Servlet,修改后的代码如下: LoginProcess.java代码: package servlet; import javabean.User; import jav ...
- Java中的Stream流详解
JDK8 Stream详解 概念 Stream是Java8 API的新成员,它允许以声明性方式处理数据集合 . 特点 (1)代码简洁:函数式编程写出的代码简洁且意图明确,使用stream接口让你从此告 ...
- java.IO字节流和字符流详解
IO流的分类 在学习字节流和字符流之前因该先学习IO流的分类,效果会更好 有多种分类方式: 一种方式是按照流的方向进行分类: 以内存作为参照物, 往内存中去,叫做输入(Input).或者叫做读(Rea ...
- Java 8 中的 Streams API 详解
Java 8 中的 Stream 是对集合(Collection)对象功能的增强,它专注于对集合对象进行各种非常便利.高效的聚合操作,或者大批量数据操作 .Stream API 借助于同样新出现的 L ...
- C++中的字符串流详解iostream,sstream
注:本文只详解C++中的#include <iostream> ,#include <sstream>,暂不详解#include <fstream>.等后期博主会补 ...
- java yeild_Java 中 Thread.yield() 方法详解
对于很多初学 Java 线程的小伙伴们,很容易将 Thread 类里的 yield() 方法理解错误,或者理解得不够透彻,先看下源码: public static native void yield( ...
- c++文件流读取一行_「软帝学院」Java挑战者专栏:IO流详解2
软帝学院笔记Day18 IO流(字符流FileReader) 1.字符流是什么 字符流是可以直接读写字符的IO流 字符流读取字符, 就要先读取到字节数据, 然后转为字符. 如果要写出字符, 需要把字符 ...
- Java IO 字节流、字符流详解
1. 字节流与字符流 流是一个很形象的概念,当程序需要读取数据的时候,就会开启一个通向数据源的流,这个数据源可以是文件,内存,或是网络连接.类似的,当程序需要写入数据的时候,就会开启一个通向目的地的流 ...
最新文章
- IOS之同步请求、异步请求、GET请求、POST请求(转)
- 【C 语言】文件操作 ( 配置文件读写 | 写出或更新配置文件 | 函数形参设置 | 确保打开文件成功 | 统计文件大小 )
- python模拟鼠标拖动_Python+Selenium自动化篇-6-模拟鼠标操作
- vue Mutation 必须是同步函数 为什么_Vue 开发必须知道的 36 个技巧「近1W字」
- 《程序员修炼之道》笔记(九)
- 生成二维码的 jQuery 插件:jquery.qrcode.js
- VMware14.0 安装 CentOS7.2
- git设置master权限_git怎么控制成员的权限?
- 内存泄漏和内存溢出的区别
- 安卓应用安全指南 4.2.2 创建/使用广播接收器 规则书
- 拓展名php,取扩展名_php
- Golang包管理工具之govendor的使用
- Java架构师具备的特点有哪些?
- opencv漫水填充算法
- IBatis的resultMap使用
- CentOS7图形界面模式
- TIT 数据库实验一 在SQL Server中创建数据库(SQL入门教学一 从0到1)
- 想做游戏测试,你一定要知道这几点!
- 西门子触摸屏脚本程序_西门子触摸屏实例程序
- iOS苹果开发者账号申请教程