使用Java 8 CompletableFuture和Rx-Java Observable
我想使用Java 8 CompletableFuture和Rx-Java Observable探索一个简单的分散聚集场景。
场景很简单–产生大约10个任务,每个任务返回一个字符串,最终将结果收集到一个列表中。
顺序的
其顺序版本如下:
public void testSequentialScatterGather() throws Exception {List<String> list =IntStream.range(0, 10).boxed().map(this::generateTask).collect(Collectors.toList());logger.info(list.toString());
}private String generateTask(int i) {Util.delay(2000);return i + "-" + "test";
}
随着CompletableFuture
可以使用称为supplyAsync的实用程序方法来使方法返回CompletableFuture,我正在使用此方法的一种变体,它接受要使用的显式Executor ,而且我故意为其中一个输入抛出异常:
private CompletableFuture<String> generateTask(int i,ExecutorService executorService) {return CompletableFuture.supplyAsync(() -> {Util.delay(2000);if (i == 5) {throw new RuntimeException("Run, it is a 5!");}return i + "-" + "test";}, executorService);
}
现在分散任务:
List<CompletableFuture<String>> futures =IntStream.range(0, 10).boxed().map(i -> this.generateTask(i, executors).exceptionally(t -> t.getMessage())).collect(Collectors.toList());
在分散任务结束时,结果是CompletableFuture列表。 现在,要从中获取String列表有些棘手,这里我使用Stackoverflow中建议的一种解决方案:
CompletableFuture<List<String>> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
这里仅使用CompletableFuture.allOf方法来构成下一步操作,一旦所有分散的任务都完成了,则一旦完成任务,期货就会再次流式传输并收集到一个字符串列表中。
然后可以异步显示最终结果:
result.thenAccept(l -> {logger.info(l.toString());
});
使用Rx-java Observable
使用Rx-java进行分散收集相对比CompletableFuture版本更干净,因为Rx-java提供了更好的方法将结果组合在一起,这也是执行分散任务的方法:
private Observable<String> generateTask(int i, ExecutorService executorService) {return Observable.<String>create(s -> {Util.delay(2000);if ( i == 5) {throw new RuntimeException("Run, it is a 5!");}s.onNext( i + "-test");s.onCompleted();}).onErrorReturn(e -> e.getMessage()).subscribeOn(Schedulers.from(executorService));
}
并分散任务:
List<Observable<String>> obs =IntStream.range(0, 10).boxed().map(i -> generateTask(i, executors)).collect(Collectors.toList());
我又有了一个Observable的列表,而我需要的是一个结果列表,Observable提供了一个合并方法来做到这一点:
Observable<List<String>> merged = Observable.merge(obs).toList();
可以订阅并在可用时打印结果:
merged.subscribe(l -> logger.info(l.toString()));
翻译自: https://www.javacodegeeks.com/2015/08/using-java-8-completablefuture-and-rx-java-observable.html
使用Java 8 CompletableFuture和Rx-Java Observable相关推荐
- rx.observable_使用Java 8 CompletableFuture和Rx-Java Observable
rx.observable 我想探索一个使用Java 8 CompletableFuture和Rx-Java Observable的简单分散聚集场景. 场景很简单–产生大约10个任务,每个任务返回一个 ...
- java rx.observable_Rxjava2 Observable的数据变换详解及实例(二)
1. Window 定期将来自原始Observable的数据分解为一个Observable窗口,发射这些窗口,而不是每次发射一项数据. Window 和 Buffer 类似,但不是发射来自原始Obse ...
- Java 8 CompletableFuture 教程
Java 8 有大量的新特性和增强如 Lambda 表达式,Streams,CompletableFuture等.在本篇文章中我将详细解释清楚CompletableFuture以及它所有方法的使用. ...
- JUC系列(十一) | Java 8 CompletableFuture 异步编程
多线程一直Java开发中的难点,也是面试中的常客,趁着还有时间,打算巩固一下JUC方面知识,我想机会随处可见,但始终都是留给有准备的人的,希望我们都能加油!!! 沉下去,再浮上来,我想我们会变的不一样 ...
- Java 8 CompletableFuture 教程 1
Java 8 有大量的新特性和增强如 Lambda 表达式,Streams,CompletableFuture等.在本篇文章中我将详细解释清楚CompletableFuture以及它所有方法的使用. ...
- 多线程与并发 - Java 8 CompletableFuture 异步多线程
1.一个示例回顾Future 一些业务场景我们需要使用多线程异步执行任务,加快任务执行速度. JDK5新增了Future接口,用于描述一个异步计算的结果. 虽然 Future 以及相关使用方法提供了异 ...
- Rx Java 异步编程框架
Rx Java 文章目录 Rx Java 名词定义 举个例子 基本概念 Backpressure Upstream, Downstream Objects in motion Assembly tim ...
- JAVA基于CompletableFuture的流水线并行处理深度实践,满满干货
在项目开发中,后端服务对外提供API接口一般都会关注响应时长.但是某些情况下,由于业务规划逻辑的原因,我们的接口可能会是一个聚合信息处理类的处理逻辑,比如我们从多个不同的地方获取数据,然后汇总处理为最 ...
- 一篇文章搞清楚Java中CompletableFuture的使用
---------- Yesterday is history, tomorrow is a mystery, but today is a gift. That is why it's called ...
最新文章
- 数据结构学习笔记(2)
- java 8u111 8u112_JDK 8U112
- php 数据库 自增值,Mysql应用MySql数据库自动递增值问题
- Splash resource_timeout 属性
- android Binder机制(一)架构设计
- python接管已经打开的浏览器_Python Webdriver 从新使用已经打开的浏览器实例
- MySQL5.7 group by新特性报错1055的解决办法
- 汇编学习--7.16--中断
- DevTools 无法加载源映射
- Java基础入门及安装准备
- vue项目添加百度统计及设置埋点
- 服务端使用GZIP压缩数据
- 计算机学院早操规定,计算机学院早操动员大会顺利召开
- 一个C语言算法--税收计算
- 百分制成绩转换五分制F
- simplify-js 降低曲线拟合使用的点数
- 比字节还小的计算机单位,电脑里,字节是最小单位吗
- 应用系统如何与外部渠道进行对接?java代码实现篇
- JVM监控常用的6个命令行工具
- python宏观经济研究应用_宏观经济学研究通常用什么软件?
热门文章
- 07-MyBatis 核心配置文件
- JavaScript表单
- 19年8月 字母哥 第六章 生命周期内的拦截过滤与监听 用热点公司网不行
- java.util.concurrent.locks.Lock文档说明
- java线程——什么是线程?
- java编程学习方法_在线学习Java编程的最佳方法
- 实践与反思_在行动中反思的实践
- cov/cor中有遗漏值_协调遗漏的效果–使用简单的NIO客户端/服务器测量回送延迟...
- java 性能调优_Java性能调优调查结果(第一部分)
- 测试Maven版本插件自动递增版本号