一、JDK9响应式编程

Java是一个“古老”并且广泛应用的编程语言,但Java9中引入了一些新鲜有趣的特性。这篇文章主要介绍FlowAPI这个新特性,通过FlowAPI我们仅仅使用JDK就能够搭建响应式应用程序,而不需要其他额外的类库,如RxJava或Project Reactor。

尽管如此,当你看到过接口文档后你就会明白到正如字面所说,这只是一个API而已。她仅仅包含了一些Interface和一个实现类:

1.Interface Flow.Publisher<T>定义了生产数据和控制事件的方法。
2.Interface Flow.Subscriber<T>定义了消费数据和事件的方法。
3.Interface Flow.Subscription 定义了链接Publisher和Subscriber的方法。
4.Interface Flow.Processor<T,R>定义了转换Publisher到Subscriber的方法
5.最后,class SubmissionPublisher<T>是Flow.Publisher<T>的实现,她可以灵活的生产数据,同时与Reactive Stream兼容。

虽然Java9中没有很多FlowAPI的实现类可供我们使用,但是依靠这些接口第三方可以提供的响应式编程得到了规范和统一,比如从JDBC driver到RabbitMQ的响应式实现。

其中Publisher为数据发布者,Subscriber为数据订阅者,Subscription为发布者和订阅者之间的订阅关系,Processor为数据处理器。

  • 关系图:

二、Pull,Push,Pull-Push

我对响应式编程的理解是, 这是一种数据消费者控制数据流的编程方式。需要指出是,当消费速度低于生产速度时,消费者要求生产者降低速度以完全消费数据(这个现象称作back-pressure(背压))。这种处理方式不是在制造混乱,你可能已经使用过这种模式,只是最近因为在主要框架和平台上使用才变得更流行,比如Java9,Spring5。另外在分布式系统中处理大规模数据传输时也使用到了这种模式。

回顾过去可以帮我们更好的理解这种模式。

  • pull模式

几年前,最常见的消费数据模式是pull-based。client端不断轮询服务端以获取数据。这种模式的优点是当client端资源有限时可以更好的控制数据流(停止轮询),而缺点是当服务端没有数据时轮询是对计算资源和网络资源的浪费。

  • push模式

随着时间推移,处理数据的模式转变为push-based,生产者不关心消费者的消费能力,直接推送数据。这种模式的缺点是当消费资源低于生产资源时会造成缓冲区溢出从而数据丢失,当丢失率维持在较小的数值时还可以接受,但是当这个比率变大时我们会希望生产者降速以避免大规模数据丢失。

  • pull-push模式

响应式编程是一种pull-push混合模式以综合他们的优点,这种模式下消费者负责请求数据以控制生产者数据流,同时当处理资源不足时也可以选择阻断或者丢弃数据,接下来我们会看到一个典型案例。

三、Flow与Stream

响应式编程并不是为了替换传统编程,其实两者相互兼容而且可以互相协作完成任务。Java8中引入的StreamAPI通过map,reduce以及其他操作可以完美的处理数据集,而FlowAPI则专注于处理数据的流通,比如对数据的请求,减速,丢弃,阻塞等。同时你可以使用Streams作为数据源(publisher),当必要时阻塞丢弃其中的数据。你也可以在Subscriber中使用Streams以进行数据的归并操作。更值得一提的是:reactive streams不仅兼容传统编程方式,而且还支持函数式编程以极大的提高可读性和可维护性。有一点可能会使我们感到困惑:如果你需要在两个系统间传输数据,同时进行转形操作,如何使用Flows和Streams来完成?这种情况下,我们使用Java8的Function来做数据转换,但是如何在Publisher和Subscriber之间使用StreamAPI呢?答案是我们可以在Publisher和Subscriber之间再加一个subscriber,她可以从最初的publisher获取数据,转换,然后再作为一个新的publisher,而使最初的subscriber订阅这个新的publisher,也是Java9中的接口Flow.Processor<T,R>,我们只需要实现这个接口并编写转换数据的functions。从技术上讲,我们完全可以使用Flows来替换Streams,但任何时候都这么做就显得过于偏激。比如,我们创建一个Publisher来作为int数组的数据源,然后在Processor中转换Integer为String,最后创建一个Subscriber来归并到一个String中。这个时候就完全没有必要使用Flows,因为这不是在控制两个模块或两个线程间的数据通信,这个时候使用Streams更为合理。

四、例子

  1. Publisher

Publisher部分的源码如下所示:

它是一个函数式接口,只包含一个subscribe方法,通过这个方法将数据发布出去。

  1. Subscriber

Subscriber部分的源码如下所示:


该接口包含了四个方法:

  1. Subscription

Subscription部分的源码如下所示:

  1. Processor
    Processor部分的代码如下所示:


它是一个空接口,但是它继承了Publisher和Subscriber,所以它既能发布数据也能订阅数据。基于这个特性,它可以充当数据转换的角色,先从数据发布者那接收数据项,然后经过处理后再发布给最终的数据订阅者。

  1. 发布订阅示例

接下来我们举个数据发布和数据订阅的简单示例,以此了解Java 9 Flow API的使用。先入为主,直接贴出整个示例代码:

public class FlowApiTest {public static void main(String[] args) throws InterruptedException {// 1. 定义 String 类型的数据发布者,JDK 9自带的// SubmissionPublisher 实现了 PublisherSubmissionPublisher<String> publisher = new SubmissionPublisher<>();// 2. 创建一个订阅者,用于接收发布者的消息Subscriber<String> subscriber = new Subscriber<>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 通过 Subscription 和发布者保持订阅关系,并用它来给发布者反馈this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(String item) {// 接收发布者发布的消息System.out.println("【订阅者】接收消息 <------ " + item);// 接收后再次请求一个数据this.subscription.request(1);// 如果不想再接收数据,也可以直接调用 cancel,表示不再接收了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 过程中出现异常会回调这个方法System.out.println("【订阅者】数据接收出现异常," + throwable);// 出现异常,取消订阅,告诉发布者我不再接收数据了// 实际测试发现,只要订阅者接收消息出现异常,进入了这个回调// 订阅者就不会再继续接收消息了this.subscription.cancel();}@Overridepublic void onComplete() {// 当发布者发出的数据都被接收了,// 并且发布者关闭后,会回调这个方法System.out.println("【订阅者】数据接收完毕");}};// 3. 发布者和订阅者需要建立关系publisher.subscribe(subscriber);// 4. 发布者开始发布数据for (int i = 0; i < 10; i++) {String message = "hello flow api " + i;System.out.println("【发布者】发布消息 ------> " + message);publisher.submit(message);}// 5. 发布结束后,关闭发布者publisher.close();// main线程延迟关闭,不然订阅者还没接收完消息,线程就被关闭了Thread.currentThread().join(2000);}
}

上面使用JDK 自带的Publisher实现类SubmissionPublisher来发布 String类型的数据,然后用匿名实现类的方式创建了一个Subscriber实现类。接着使用SubmissionPublisher的subscribe方法来为发布者和订阅者建立关系。建立关系后,发布者就可以发布数据,接收者也开始接收数据。详细的说明注释里都写了,这里就不再赘述代码的逻辑了。

  1. 模拟背压

所谓的背压(Backpressure)通俗的讲就是数据接收者的压力,传统模式下,发布者只关心数据的创造与发布,而当数据发布速率远高于数据接收速率的时候,数据接收者缓冲区将被填满,无法再接收数据。发布者并不关心这些,依旧不断地发送数据,所以就造成了IO阻塞。基于响应式模型实现的Flow API可以很好地解决这个问题。在Java 9的Flow API定义中,Subscriber会将Publisher发布的数据缓冲在Subscription中,其长度默认为256

假如当这个缓冲区都被填满后,Publisher将会停止发送数据,直到Subscriber接收了数据Subscription有空闲位置的时候,Publisher才会继续发布数据,而非一味地发个不停。下面用代码来演示这个情况:

public class BPFlowTest {public static void main(String[] args) throws InterruptedException {// 1. 定义String类型的数据发布者,JDK 9自带的// SubmissionPublisher实现了 PublisherSubmissionPublisher<String> publisher = new SubmissionPublisher<>();// 2. 创建一个订阅者,用于接收发布者的消息Subscriber<String> subscriber = new Subscriber<>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 通过 Subscription 和发布者保持订阅关系,并用它来给发布者反馈this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(String item) {// 接收发布者发布的消息System.out.println("【订阅者】接收消息 <------ " + item);// 模拟接收数据缓慢,让缓冲池填满try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}// 接收后再次请求一个数据,表示我已经处理完了,你可以再发数据过来了this.subscription.request(1);// 如果不想再接收数据,也可以直接调用cancel,表示不再接收了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 过程中出现异常会回调这个方法System.out.println("【订阅者】数据接收出现异常," + throwable);// 出现异常,取消订阅,告诉发布者我不再接收数据了// 实际测试发现,只要订阅者接收消息出现异常,进入了这个回调// 订阅者就不会再继续接收消息了this.subscription.cancel();}@Overridepublic void onComplete() {// 当发布者发出的数据都被接收了,// 并且发布者关闭后,会回调这个方法System.out.println("【订阅者】数据接收完毕");}};// 3. 发布者和订阅者需要建立关系publisher.subscribe(subscriber);// 4. 发布者开始发布数据for (int i = 0; i < 500; i++) {String message = "hello flow api " + i;System.out.println("【发布者】发布消息 ------> " + message);publisher.submit(message);}// 5. 发布结束后,关闭发布者publisher.close();// main线程延迟关闭,不然订阅者还没接收完消息,线程就被关闭了Thread.currentThread().join(20000);}
}

上面代码中,我们在Subscriber的onNext方法中用下面的代码模拟延迟,让数据处理过程维持在2秒左右:

try {TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {e.printStackTrace();
}

然后数据发布量调整到了500,当程序启动的时候,由于数据发布的速度非常快(普通for循环),所以数据订阅者的数据缓冲区瞬间被填满,于是你会看到下面这个情况,只有当数据订阅者处理了一个数据的时候,数据发布者才会相应地再次发布一个新数据:

  1. Processor示例

Processor的使用也很简单,其实它就是Publisher和Subscriber的结合体,充当数据处理的角色,通常的做法是用它来接收发布者发布的消息,然后进行相应的处理,再将数据发布出去,供消息订阅者接收。下面是一个Processor用法的简单示例:

public class ProcessorTest {static class MyProcessor extends SubmissionPublisher<String> implements Processor<String, String> {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 通过 Subscription 和发布者保持订阅关系,并用它来给发布者反馈this.subscription = subscription;// 请求一个数据this.subscription.request(1);}@Overridepublic void onNext(String item) {// 接收发布者发布的消息System.out.println("【处理器】接收消息 <------ " + item);// 处理器将消息进行转换String newItem = "【处理器加工后的数据: " + item + "】";this.submit(newItem);// 接收后再次请求一个数据,表示我已经处理完了,你可以再发数据过来了this.subscription.request(1);// 如果不想再接收数据,也可以直接调用cancel,表示不再接收了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 过程中出现异常会回调这个方法System.out.println("【处理器】数据接收出现异常," + throwable);// 出现异常,取消订阅,告诉发布者我不再接收数据了this.subscription.cancel();}@Overridepublic void onComplete() {System.out.println("【处理器】数据处理完毕");// 处理器处理完数据后关闭this.close();}}public static void main(String[] args) throws InterruptedException {// 1. 定义String类型的数据发布者,JDK 9自带的// SubmissionPublisher实现了 PublisherSubmissionPublisher<String> publisher = new SubmissionPublisher<>();// 2. 创建处理器,用于接收发布者发布的消息,// 转换后再发送给订阅者MyProcessor processor = new MyProcessor();// 3. 发布者和处理器建立订阅的关系publisher.subscribe(processor);// 4.创建一个订阅者,用于接收处理器的消息Subscriber<String> subscriber = new Subscriber<>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {this.subscription = subscription;this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("【订阅者】接收消息 <------ " + item + "");this.subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.out.println("【订阅者】数据接收出现异常," + throwable);this.subscription.cancel();}@Overridepublic void onComplete() {System.out.println("【订阅者】数据接收完毕");}};// 5. 处理器和订阅者建立订阅关系processor.subscribe(subscriber);// 6. 发布者开始发布数据for (int i = 0; i < 10; i++) {String message = "hello flow api " + i;System.out.println("【发布者】发布消息 ------> " + message);publisher.submit(message);}// 7. 发布结束后,关闭发布者publisher.close();// main线程延迟关闭,不然订阅者还没接收完消息,线程就被关闭了Thread.currentThread().join(2000);}
}


参考文章
参考文章

从JDK9的Flow接口说起相关推荐

  1. Reactive Streams规范及常见库

    一.什么是Reactive Streams Reactive Streams is an initiative to provide a standard for asynchronous strea ...

  2. spring的Webflux

    Spring5 框架新功能(Webflux) 1.SpringWebflux 介绍 (1)是 Spring5 添加新的模块,用于 web 开发的,功能和 SpringMVC 类似的,Webflux 使 ...

  3. java消费降速的一般方案_Java9 Flow API(译)

    原文链接 这篇文章中,会展示一个Java9中FlowAPI的列子,通过Publisher和Subscriber接口来构建响应式程序.最后你将会理解这种全新的编程模式和她的优缺点.所有的代码可在Gith ...

  4. 使用JDK9提供的模块化系统,来定义自己的模块

    JDK9提供的模块化系统 Java模块化系统的背景 模块是什么 模块化的目标 可靠的配置 强封装 增强可扩展性能和可维护性 可定制的运行环境 模块的类型 1.具名模块(Named Module) 2. ...

  5. 深度学习编译器Data Flow和Control Flow

    深度学习编译器Data Flow和Control Flow 本文介绍了一下深度学习框架的Data Flow和Control Flow,基于TensorFlow解释了TensorFlow是如何在静态图中 ...

  6. HDU 3549 Flow Problem(最大流模版EK算法)

    题目链接 第一道最大流,赤裸裸的模版题,刚好可以熟悉模版用.今天看了一下最大流,就看了一个EK算法,感觉有点和二分图匹配算法有点相似,对于最大流问题有点了解了,不过为什么这么做,也不是 很懂,只是把代 ...

  7. python输入参数改变图形_Python基于Tensor FLow的图像处理操作详解

    本文实例讲述了Python基于Tensor FLow的图像处理操作.分享给大家供大家参考,具体如下: 在对图像进行深度学习时,有时可能图片的数量不足,或者希望网络进行更多的学习,这时可以对现有的图片数 ...

  8. CAS (10) —— JBoss EAP 6.4下部署CAS时出现错误exception.message=Error decoding flow execution的解决办法...

    CAS (10) -- JBoss EAP 6.4下部署CAS时出现错误exception.message=Error decoding flow execution的解决办法 jboss版本: jb ...

  9. Git Flow—Git团队协作最佳实践

    一.规范的Git使用 Git是一个很好的版本管理工具,不过相比于传统的版本管理工具,学习成本比较高. 实际开发中,如果团队成员比较多,开发迭代频繁,对Git的应用比较混乱,会产生很多不必要的冲突或者代 ...

最新文章

  1. ICMP协议抓包分析-wireshark
  2. MegaSAS RAID卡 BBU Learn Cycle周期的影响
  3. Ubuntu上安装Robomongo及添加到启动器
  4. python3.6找到不_sqlite3模块
  5. tp5写的系统比php源码写的慢多少,基于TP5框架开发的极速企业网站开发框架PHP源码...
  6. np.percentile获取中位数、百分位数
  7. pytroch预训练网络ResNet
  8. p73_万维网和HTTP协议
  9. iOS-代码实现TableViewCell创建多个样式的Cell
  10. 模电:集成运算放大器2
  11. 推荐10个程序员常去的网站
  12. DCOM配置出错: 不小心删除DCOM配置中,“我的电脑”属性的Everyone权限导致......
  13. 华为月薪11万招前端工程师,看到要求我傻眼了!
  14. java 四舍六入五成双
  15. 贺泓胜:2.24黄金今日走势分析操作建议,黄金原油解套指导
  16. 表情识别(二)--基于CNN分类
  17. 【求助】救救“这个可怜的孩子”
  18. 算法都是套路系列-动态规划模板
  19. Unity中游戏存档方式
  20. ATK MT9V034摄像头的学习(二基础知识)

热门文章

  1. list dict 性能测试
  2. SQL注入到EXP编写
  3. bzoj2049 [Sdoi2008]Cave 洞穴勘测
  4. [改善Java代码]覆写equals方法必须覆写hashCode方法
  5. hdu-5003 Osu!(水题)
  6. 在PowerDesigner中设置字段唯一约束 --相当于unique
  7. 吴恩达 coursera ML 第十七课总结+作业答案
  8. CentOS系统NAT共享上网
  9. 【Paddle】解压文件到指定文件夹
  10. [云炬创业基础笔记]第五章创业计划评估16