rx.observable_使用Java 8 CompletableFuture和Rx-Java Observable
rx.observable
我想探索一个使用Java 8 CompletableFuture和Rx-Java Observable的简单分散聚集场景。
场景很简单–产生大约10个任务,每个任务返回一个字符串,最终将结果收集到一个列表中。
顺序的
其顺序版本如下:
public void testSequentialScatterGather() throws Exception {List<String> list =IntStream.range(0, 10).boxed().map(this::generateTask).collect(Collectors.toList());logger.info(list.toString());
}private String generateTask(int i) {Util.delay(2000);return i + "-" + "test";
}
随着CompletableFuture
可以使用称为supplyAsync的实用程序方法来使方法返回CompletableFuture,我正在使用此方法的一种变体,它接受要使用的显式Executor ,而且我故意为其中一个输入抛出异常:
private CompletableFuture<String> generateTask(int i,ExecutorService executorService) {return CompletableFuture.supplyAsync(() -> {Util.delay(2000);if (i == 5) {throw new RuntimeException("Run, it is a 5!");}return i + "-" + "test";}, executorService);
}
现在分散任务:
List<CompletableFuture<String>> futures =IntStream.range(0, 10).boxed().map(i -> this.generateTask(i, executors).exceptionally(t -> t.getMessage())).collect(Collectors.toList());
在分散任务结束时,结果是CompletableFuture列表。 现在,要从中获取String列表有些棘手,这里我使用Stackoverflow中建议的一种解决方案:
CompletableFuture<List<String>> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
CompletableFuture.allOf方法在这里纯粹用于组成下一步操作,一旦所有分散的任务都完成,则一旦完成任务,期货就会再次流式传输并收集到字符串列表中。
然后可以异步显示最终结果:
result.thenAccept(l -> {logger.info(l.toString());
});
使用Rx-java Observable
使用Rx-java进行分散收集比使用CompletableFuture版本相对更清洁,因为Rx-java提供了更好的方法将结果组合在一起,这也是执行分散任务的方法:
private Observable<String> generateTask(int i, ExecutorService executorService) {return Observable.<String>create(s -> {Util.delay(2000);if ( i == 5) {throw new RuntimeException("Run, it is a 5!");}s.onNext( i + "-test");s.onCompleted();}).onErrorReturn(e -> e.getMessage()).subscribeOn(Schedulers.from(executorService));
}
并分散任务:
List<Observable<String>> obs =IntStream.range(0, 10).boxed().map(i -> generateTask(i, executors)).collect(Collectors.toList());
我又有了一个Observable的列表,而我需要的是一个结果列表,Observable提供了一个合并方法来做到这一点:
Observable<List<String>> merged = Observable.merge(obs).toList();
可以订阅并在可用时打印结果:
merged.subscribe(l -> logger.info(l.toString()));
翻译自: https://www.javacodegeeks.com/2015/08/using-java-8-completablefuture-and-rx-java-observable.html
rx.observable
rx.observable_使用Java 8 CompletableFuture和Rx-Java Observable相关推荐
- rx.observable_在Spring MVC流中使用rx-java Observable
rx.observable Spring MVC现在已经支持异步请求处理流程了一段时间,该支持内部利用了Tomcat / Jetty等容器的Servlet 3异步支持. Spring Web Asyn ...
- Java 8 CompletableFuture 教程
Java 8 有大量的新特性和增强如 Lambda 表达式,Streams,CompletableFuture等.在本篇文章中我将详细解释清楚CompletableFuture以及它所有方法的使用. ...
- JUC系列(十一) | Java 8 CompletableFuture 异步编程
多线程一直Java开发中的难点,也是面试中的常客,趁着还有时间,打算巩固一下JUC方面知识,我想机会随处可见,但始终都是留给有准备的人的,希望我们都能加油!!! 沉下去,再浮上来,我想我们会变的不一样 ...
- Java 8 CompletableFuture 教程 1
Java 8 有大量的新特性和增强如 Lambda 表达式,Streams,CompletableFuture等.在本篇文章中我将详细解释清楚CompletableFuture以及它所有方法的使用. ...
- 使用Java 8 CompletableFuture和Rx-Java Observable
我想使用Java 8 CompletableFuture和Rx-Java Observable探索一个简单的分散聚集场景. 场景很简单–产生大约10个任务,每个任务返回一个字符串,最终将结果收集到一个 ...
- 多线程与并发 - Java 8 CompletableFuture 异步多线程
1.一个示例回顾Future 一些业务场景我们需要使用多线程异步执行任务,加快任务执行速度. JDK5新增了Future接口,用于描述一个异步计算的结果. 虽然 Future 以及相关使用方法提供了异 ...
- JAVA基于CompletableFuture的流水线并行处理深度实践,满满干货
在项目开发中,后端服务对外提供API接口一般都会关注响应时长.但是某些情况下,由于业务规划逻辑的原因,我们的接口可能会是一个聚合信息处理类的处理逻辑,比如我们从多个不同的地方获取数据,然后汇总处理为最 ...
- 一篇文章搞清楚Java中CompletableFuture的使用
---------- Yesterday is history, tomorrow is a mystery, but today is a gift. That is why it's called ...
- [转]【JAVA各版本特性】JAVA 1.0
闲来想了解下各版本之间的特性,搜索没有最新的特性说明,故想写一份.废话不多说. JDK Version 1.0 1996-01-23 Oak(橡树) 初代版本,伟大的一个里程碑,但是是纯解释运行,使用 ...
最新文章
- 解决pytorch半精度amp训练nan问题
- Adam又要“退休”了?耶鲁大学团队提出AdaBelief,却引来网友质疑
- 小鱼天气android,小鱼天气(cn.microsoft.cig.uair) - 1.3.14 - 应用 - 酷安
- 利用fastjson对json转map的操作
- buck电路上下管_推荐 | 学好电路设计与仿真?你不能错过这两本书籍 ~
- android 聊天功能实现,Android聊天背景功能实现
- Mbatis是什么?怎么运行?
- 配置git账号和密码
- 学习网页前的网页知识储备
- 【涡动协方差及能量平衡系统】
- sql语句中表格缩写命名_数据库表、字段命名规范
- Invalid prop:type check failed for prop“value“.Expected String with value“8“,got Number with value 8
- C语言编程题最简分式,C语言 程序设计入门 最简分式
- 魔兽版无间道,5区一骗情骗装备的垃圾战士(zz)
- Zabbix使用指南
- 计算机主机内的零件有什么用,ROM和RAM分别是什么?有什么区别?与电脑的什么配件的作用是一? 爱问知识人...
- 《小猪佩奇拜年歌》在QQ音乐和网易云音乐上线
- 机器学习之梯度下降法(GD)、随机梯度下降法(SGD)和随机平均梯度下降法(SAGD)
- USB音频编解码芯片电路方案设计(原理图)|TYPEC音频方案|TYPEC扩展坞方案|USB音频方案
- 修改yum源为国内yum源和本地yum源
热门文章
- P1297-[国家集训队]单选错位【期望概率】
- 亿些模板【字符串+其他】
- P2158,jzoj1709-仪仗队【欧拉函数,数论】
- codeforces1481 E. Sorting Books(贪心+dp)
- codeforces1167 E. Range Deleting(双指针)
- 【最小生成树】水箱(P5952)
- 【动态规划】石子合并 (ssl 2863)
- Codeforces1080F. Katya and Segments Sets
- SoundHound Inc. Programming Contest 2018[C. Ordinary Beauty]
- Java 9 新特性概述