我想使用Java 8 CompletableFuture和Rx-Java Observable探索一个简单的分散聚集场景。

场景很简单–产生大约10个任务,每个任务返回一个字符串,最终将结果收集到一个列表中。

顺序的

其顺序版本如下:

public void testSequentialScatterGather() throws Exception {List<String> list =IntStream.range(0, 10).boxed().map(this::generateTask).collect(Collectors.toList());logger.info(list.toString());
}private String generateTask(int i) {Util.delay(2000);return i + "-" + "test";
}

随着CompletableFuture

可以使用称为supplyAsync的实用程序方法来使方法返回CompletableFuture,我正在使用此方法的一种变体,它接受要使用的显式Executor ,而且我故意为其中一个输入抛出异常:

private CompletableFuture<String> generateTask(int i,ExecutorService executorService) {return CompletableFuture.supplyAsync(() -> {Util.delay(2000);if (i == 5) {throw new RuntimeException("Run, it is a 5!");}return i + "-" + "test";}, executorService);
}

现在分散任务:

List<CompletableFuture<String>> futures =IntStream.range(0, 10).boxed().map(i -> this.generateTask(i, executors).exceptionally(t -> t.getMessage())).collect(Collectors.toList());

在分散任务结束时,结果是CompletableFuture列表。 现在,要从中获取String列表有些棘手,这里我使用Stackoverflow中建议的一种解决方案:

CompletableFuture<List<String>> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));

这里仅使用CompletableFuture.allOf方法来构成下一步操作,一旦所有分散的任务都完成了,则一旦完成任务,期货就会再次流式传输并收集到一个字符串列表中。

然后可以异步显示最终结果:

result.thenAccept(l -> {logger.info(l.toString());
});

使用Rx-java Observable

使用Rx-java进行分散收集相对比CompletableFuture版本更干净,因为Rx-java提供了更好的方法将结果组合在一起,这也是执行分散任务的方法:

private Observable<String> generateTask(int i, ExecutorService executorService) {return Observable.<String>create(s -> {Util.delay(2000);if ( i == 5) {throw new RuntimeException("Run, it is a 5!");}s.onNext( i + "-test");s.onCompleted();}).onErrorReturn(e -> e.getMessage()).subscribeOn(Schedulers.from(executorService));
}

并分散任务:

List<Observable<String>> obs =IntStream.range(0, 10).boxed().map(i -> generateTask(i, executors)).collect(Collectors.toList());

我又有了一个Observable的列表,而我需要的是一个结果列表,Observable提供了一个合并方法来做到这一点:

Observable<List<String>> merged = Observable.merge(obs).toList();

可以订阅并在可用时打印结果:

merged.subscribe(l -> logger.info(l.toString()));

翻译自: https://www.javacodegeeks.com/2015/08/using-java-8-completablefuture-and-rx-java-observable.html

使用Java 8 CompletableFuture和Rx-Java Observable相关推荐

  1. rx.observable_使用Java 8 CompletableFuture和Rx-Java Observable

    rx.observable 我想探索一个使用Java 8 CompletableFuture和Rx-Java Observable的简单分散聚集场景. 场景很简单–产生大约10个任务,每个任务返回一个 ...

  2. java rx.observable_Rxjava2 Observable的数据变换详解及实例(二)

    1. Window 定期将来自原始Observable的数据分解为一个Observable窗口,发射这些窗口,而不是每次发射一项数据. Window 和 Buffer 类似,但不是发射来自原始Obse ...

  3. Java 8 CompletableFuture 教程

    Java 8 有大量的新特性和增强如 Lambda 表达式,Streams,CompletableFuture等.在本篇文章中我将详细解释清楚CompletableFuture以及它所有方法的使用. ...

  4. JUC系列(十一) | Java 8 CompletableFuture 异步编程

    多线程一直Java开发中的难点,也是面试中的常客,趁着还有时间,打算巩固一下JUC方面知识,我想机会随处可见,但始终都是留给有准备的人的,希望我们都能加油!!! 沉下去,再浮上来,我想我们会变的不一样 ...

  5. Java 8 CompletableFuture 教程 1

    Java 8 有大量的新特性和增强如 Lambda 表达式,Streams,CompletableFuture等.在本篇文章中我将详细解释清楚CompletableFuture以及它所有方法的使用. ...

  6. 多线程与并发 - Java 8 CompletableFuture 异步多线程

    1.一个示例回顾Future 一些业务场景我们需要使用多线程异步执行任务,加快任务执行速度. JDK5新增了Future接口,用于描述一个异步计算的结果. 虽然 Future 以及相关使用方法提供了异 ...

  7. Rx Java 异步编程框架

    Rx Java 文章目录 Rx Java 名词定义 举个例子 基本概念 Backpressure Upstream, Downstream Objects in motion Assembly tim ...

  8. JAVA基于CompletableFuture的流水线并行处理深度实践,满满干货

    在项目开发中,后端服务对外提供API接口一般都会关注响应时长.但是某些情况下,由于业务规划逻辑的原因,我们的接口可能会是一个聚合信息处理类的处理逻辑,比如我们从多个不同的地方获取数据,然后汇总处理为最 ...

  9. 一篇文章搞清楚Java中CompletableFuture的使用

    ---------- Yesterday is history, tomorrow is a mystery, but today is a gift. That is why it's called ...

最新文章

  1. 数据结构学习笔记(2)
  2. java 8u111 8u112_JDK 8U112
  3. php 数据库 自增值,Mysql应用MySql数据库自动递增值问题
  4. Splash resource_timeout 属性
  5. android Binder机制(一)架构设计
  6. python接管已经打开的浏览器_Python Webdriver 从新使用已经打开的浏览器实例
  7. MySQL5.7 group by新特性报错1055的解决办法
  8. 汇编学习--7.16--中断
  9. DevTools 无法加载源映射
  10. Java基础入门及安装准备
  11. vue项目添加百度统计及设置埋点
  12. 服务端使用GZIP压缩数据
  13. 计算机学院早操规定,计算机学院早操动员大会顺利召开
  14. 一个C语言算法--税收计算
  15. 百分制成绩转换五分制F
  16. simplify-js 降低曲线拟合使用的点数
  17. 比字节还小的计算机单位,电脑里,字节是最小单位吗
  18. 应用系统如何与外部渠道进行对接?java代码实现篇
  19. JVM监控常用的6个命令行工具
  20. python宏观经济研究应用_宏观经济学研究通常用什么软件?

热门文章

  1. 07-MyBatis 核心配置文件
  2. JavaScript表单
  3. 19年8月 字母哥 第六章 生命周期内的拦截过滤与监听 用热点公司网不行
  4. java.util.concurrent.locks.Lock文档说明
  5. java线程——什么是线程?
  6. java编程学习方法_在线学习Java编程的最佳方法
  7. 实践与反思_在行动中反思的实践
  8. cov/cor中有遗漏值_协调遗漏的效果–使用简单的NIO客户端/服务器测量回送延迟...
  9. java 性能调优_Java性能调优调查结果(第一部分)
  10. 测试Maven版本插件自动递增版本号