rx.observable

我想探索一个使用Java 8 CompletableFuture和Rx-Java Observable的简单分散聚集场景。

场景很简单–产生大约10个任务,每个任务返回一个字符串,最终将结果收集到一个列表中。

顺序的

其顺序版本如下:

public void testSequentialScatterGather() throws Exception {List<String> list =IntStream.range(0, 10).boxed().map(this::generateTask).collect(Collectors.toList());logger.info(list.toString());
}private String generateTask(int i) {Util.delay(2000);return i + "-" + "test";
}

随着CompletableFuture

可以使用称为supplyAsync的实用程序方法来使方法返回CompletableFuture,我正在使用此方法的一种变体,它接受要使用的显式Executor ,而且我故意为其中一个输入抛出异常:

private CompletableFuture<String> generateTask(int i,ExecutorService executorService) {return CompletableFuture.supplyAsync(() -> {Util.delay(2000);if (i == 5) {throw new RuntimeException("Run, it is a 5!");}return i + "-" + "test";}, executorService);
}

现在分散任务:

List<CompletableFuture<String>> futures =IntStream.range(0, 10).boxed().map(i -> this.generateTask(i, executors).exceptionally(t -> t.getMessage())).collect(Collectors.toList());

在分散任务结束时,结果是CompletableFuture列表。 现在,要从中获取String列表有些棘手,这里我使用Stackoverflow中建议的一种解决方案:

CompletableFuture<List<String>> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));

CompletableFuture.allOf方法在这里纯粹用于组成下一步操作,一旦所有分散的任务都完成,则一旦完成任务,期货就会再次流式传输并收集到字符串列表中。

然后可以异步显示最终结果:

result.thenAccept(l -> {logger.info(l.toString());
});

使用Rx-java Observable

使用Rx-java进行分散收集比使用CompletableFuture版本相对更清洁,因为Rx-java提供了更好的方法将结果组合在一起,这也是执行分散任务的方法:

private Observable<String> generateTask(int i, ExecutorService executorService) {return Observable.<String>create(s -> {Util.delay(2000);if ( i == 5) {throw new RuntimeException("Run, it is a 5!");}s.onNext( i + "-test");s.onCompleted();}).onErrorReturn(e -> e.getMessage()).subscribeOn(Schedulers.from(executorService));
}

并分散任务:

List<Observable<String>> obs =IntStream.range(0, 10).boxed().map(i -> generateTask(i, executors)).collect(Collectors.toList());

我又有了一个Observable的列表,而我需要的是一个结果列表,Observable提供了一个合并方法来做到这一点:

Observable<List<String>> merged = Observable.merge(obs).toList();

可以订阅并在可用时打印结果:

merged.subscribe(l -> logger.info(l.toString()));

翻译自: https://www.javacodegeeks.com/2015/08/using-java-8-completablefuture-and-rx-java-observable.html

rx.observable

rx.observable_使用Java 8 CompletableFuture和Rx-Java Observable相关推荐

  1. rx.observable_在Spring MVC流中使用rx-java Observable

    rx.observable Spring MVC现在已经支持异步请求处理流程了一段时间,该支持内部利用了Tomcat / Jetty等容器的Servlet 3异步支持. Spring Web Asyn ...

  2. Java 8 CompletableFuture 教程

    Java 8 有大量的新特性和增强如 Lambda 表达式,Streams,CompletableFuture等.在本篇文章中我将详细解释清楚CompletableFuture以及它所有方法的使用. ...

  3. JUC系列(十一) | Java 8 CompletableFuture 异步编程

    多线程一直Java开发中的难点,也是面试中的常客,趁着还有时间,打算巩固一下JUC方面知识,我想机会随处可见,但始终都是留给有准备的人的,希望我们都能加油!!! 沉下去,再浮上来,我想我们会变的不一样 ...

  4. Java 8 CompletableFuture 教程 1

    Java 8 有大量的新特性和增强如 Lambda 表达式,Streams,CompletableFuture等.在本篇文章中我将详细解释清楚CompletableFuture以及它所有方法的使用. ...

  5. 使用Java 8 CompletableFuture和Rx-Java Observable

    我想使用Java 8 CompletableFuture和Rx-Java Observable探索一个简单的分散聚集场景. 场景很简单–产生大约10个任务,每个任务返回一个字符串,最终将结果收集到一个 ...

  6. 多线程与并发 - Java 8 CompletableFuture 异步多线程

    1.一个示例回顾Future 一些业务场景我们需要使用多线程异步执行任务,加快任务执行速度. JDK5新增了Future接口,用于描述一个异步计算的结果. 虽然 Future 以及相关使用方法提供了异 ...

  7. JAVA基于CompletableFuture的流水线并行处理深度实践,满满干货

    在项目开发中,后端服务对外提供API接口一般都会关注响应时长.但是某些情况下,由于业务规划逻辑的原因,我们的接口可能会是一个聚合信息处理类的处理逻辑,比如我们从多个不同的地方获取数据,然后汇总处理为最 ...

  8. 一篇文章搞清楚Java中CompletableFuture的使用

    ---------- Yesterday is history, tomorrow is a mystery, but today is a gift. That is why it's called ...

  9. [转]【JAVA各版本特性】JAVA 1.0

    闲来想了解下各版本之间的特性,搜索没有最新的特性说明,故想写一份.废话不多说. JDK Version 1.0 1996-01-23 Oak(橡树) 初代版本,伟大的一个里程碑,但是是纯解释运行,使用 ...

最新文章

  1. 解决pytorch半精度amp训练nan问题
  2. Adam又要“退休”了?耶鲁大学团队提出AdaBelief,却引来网友质疑
  3. 小鱼天气android,小鱼天气(cn.microsoft.cig.uair) - 1.3.14 - 应用 - 酷安
  4. 利用fastjson对json转map的操作
  5. buck电路上下管_推荐 | 学好电路设计与仿真?你不能错过这两本书籍 ~
  6. android 聊天功能实现,Android聊天背景功能实现
  7. Mbatis是什么?怎么运行?
  8. 配置git账号和密码
  9. 学习网页前的网页知识储备
  10. 【涡动协方差及能量平衡系统】
  11. sql语句中表格缩写命名_数据库表、字段命名规范
  12. Invalid prop:type check failed for prop“value“.Expected String with value“8“,got Number with value 8
  13. C语言编程题最简分式,C语言 程序设计入门 最简分式
  14. 魔兽版无间道,5区一骗情骗装备的垃圾战士(zz)
  15. Zabbix使用指南
  16. 计算机主机内的零件有什么用,ROM和RAM分别是什么?有什么区别?与电脑的什么配件的作用是一? 爱问知识人...
  17. 《小猪佩奇拜年歌》在QQ音乐和网易云音乐上线
  18. 机器学习之梯度下降法(GD)、随机梯度下降法(SGD)和随机平均梯度下降法(SAGD)
  19. USB音频编解码芯片电路方案设计(原理图)|TYPEC音频方案|TYPEC扩展坞方案|USB音频方案
  20. 修改yum源为国内yum源和本地yum源

热门文章

  1. P1297-[国家集训队]单选错位【期望概率】
  2. 亿些模板【字符串+其他】
  3. P2158,jzoj1709-仪仗队【欧拉函数,数论】
  4. codeforces1481 E. Sorting Books(贪心+dp)
  5. codeforces1167 E. Range Deleting(双指针)
  6. 【最小生成树】水箱(P5952)
  7. 【动态规划】石子合并 (ssl 2863)
  8. Codeforces1080F. Katya and Segments Sets
  9. SoundHound Inc. Programming Contest 2018[C. Ordinary Beauty]
  10. Java 9 新特性概述