rxjava 并行

通过RxJava 1.1.1中引入的Completable抽象,如何并行执行阻止“仅副作用”(也称为void)任务的并行执行变得更加容易。

正如您可能已经注意到,阅读我的博客时,我主要专注于软件Craft.io和自动代码测试。 但是,此外,我还是持续交付和广义并发的狂热者。 最后一点从C中的纯线程和信号量到更高级别的解决方案(例如ReactiveX和actor模型)不等。 这次是全新RxJava 1.1.1 – rx.Completable引入的非常方便(在特定情况下)功能的用例。 与我的许多博客条目类似,这也反映了我在处理实际任务和用例时遇到的实际事件。

要做的任务

想象一下,一个系统对来自不同来源的异步事件进行了非常复杂的处理。 过滤,合并,转换,分组,丰富等等。 RxJava非常适合这里,特别是如果我们想要React式的话。 假设我们已经实现了它(外观和效果很好),只剩下一件事了。 在开始处理之前,需要告知3个外部系统我们准备好接收消息。 对遗留系统的3个同步调用(通过RMI,JMX或SOAP)。 它们每个都可以持续几秒钟,我们需要等待所有它们之后才能开始。 幸运的是,它们已经实现,我们将它们视为可能成功(或因异常而失败)的黑匣子。 我们只需要调用它们(最好同时调用)并等待完成即可。

rx.Observable –方法1

触手可及的RxJava似乎是显而易见的方法。 首先,可以使用Observable来包装作业执行:

private Observable<Void> rxJobExecute(Job job) {return Observable.fromCallable(() -> { job.execute();return null;});
}

不幸的是(在我们的例子中) Observable期望返回一些元素。 我们需要使用Void并且尴尬地return null (而不是仅仅引用方法job::execute

接下来,我们可以使用subscribeOn()方法来使用另一个线程来执行我们的作业(而不是阻塞主/当前线程–我们不想顺序执行我们的作业)。 Schedulers.io()为调度Schedulers.io()提供了一组用于IO绑定工作的线程。

Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());
Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());

最后,我们需要等待所有它们完成(所有Obvervable s完成)。 为此,可以调整zip功能。 它结合了Obserbable拉链发射的物品的序列号。 在我们的例子中,我们只对每个Observable到的作业中的第一个伪项目感兴趣(我们仅发出null以满足API),并以阻塞的方式等待它们。 zip运算符中的zip函数需要返回某些内容,因此我们需要重复null的变通方法。

Observable.zip(run1, run2, (r1, r2) -> return null).toBlocking().single();

显而易见, Observable设计为Observable使用值流,并且需要进行一些额外的工作才能将其调整为仅产生副作用(不返回任何值)操作。 当我们需要将仅具有副作用的操作与其他返回一些值的值合并(例如合并)时,情况变得更加糟糕–需要更丑陋的转换。 请参阅RxNetty API的实际用例 。

public void execute() {Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());Observable.zip(run1, run2, (r1, r2) -> null).toBlocking().single();
}private Observable<Void> rxJobExecute(Job job) {return Observable.fromCallable(() -> { job.execute();return null;});
}

rx.Observable –方法2

可能会使用另一种方法。 代替生成人工项目,可以将我们的任务的空Observable作为onComplete操作执行。 这迫使我们从zip操作切换到merge 。 结果,我们需要提供一个onNext动作(对于空的Observable永远不会执行),这肯定了我们试图破解该系统的信念。

public void execute() {Observable<Object> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());Observable<Object> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());Observable.merge(run1, run2).toBlocking().subscribe(next -> {});
}private Observable<Object> rxJobExecute(Job job) {return Observable.empty().doOnCompleted(job::execute);
}

rx.Completable

RxJava 1.1.1解决了对不返回任何值的Observable的更好支持。 Completable可以视为Observable的简化版本,可以成功完成(发出onCompleted事件)或失败( onError )。 创建Completable实例的最简单方法是使用fromAction方法,该方法采用不返回任何值的Action0 (例如Runnable )。

Completable completable1 = Completable.fromAction(job1::execute).subscribeOn(Schedulers.io());
Completable completable2 = Completable.fromAction(job2::execute).subscribeOn(Schedulers.io());

接下来,我们可以使用merge()方法,该方法返回一个Completable实例,该实例立即订阅所有下游Completable ,并在它们全部完成(或其中一个失败)时完成。 由于我们在外部调度程序中使用了subscribeOn方法,因此所有作业都是并行执行的(在不同的线程中)。

Completable.merge(completable1, completable2).await();

await()方法将阻塞,直到所有作业完成(如果发生错误,将重新抛出异常)。 纯净而简单。

public void execute() {Completable completable1 = Completable.fromAction(job1::execute).subscribeOn(Schedulers.io());Completable completable2 = Completable.fromAction(job2::execute).subscribeOn(Schedulers.io());Completable.merge(completable1, completable2).await();
}

java.util.concurrent.CompletableFuture

有人可能会问:为什么不只使用CompletableFuture ? 这将是一个很好的问题。 Java 5中引入的纯Future可能需要我们做更多的工作,而ListenableFuture (来自Guava)和CompletableFuture (来自Java 8)使其变得微不足道。

首先,我们需要运行/安排作业执行。 接下来,使用CompletableFuture.allOf()方法,我们可以创建一个新的CompletableFuture ,它在所有作业完成时就完成了(我们以前没有看到过这个概念吗?)。 get()方法只是阻止等待。

public void execute() {try {CompletableFuture<Void> run1 = CompletableFuture.runAsync(job1::execute);CompletableFuture<Void> run2 = CompletableFuture.runAsync(job2::execute);CompletableFuture.allOf(run1, run2).get();} catch (InterruptedException | ExecutionException e) {throw new RuntimeException("Jobs execution failed", e);}
}

我们需要对受检查的异常做一些事情(很多时候我们不想使用它们来污染我们的API),但是总的来说,这看起来很明智。 但是,值得记住的是,当需要更复杂的链处理时, CompletableFuture不足。 另外,在我们的项目中已经使用RxJava时,使用相同(或相似)的API而不是引入全新的东西通常会很有用。

摘要

多亏了rx.Completable ,使用RxJava仅完成副作用(不返回任何内容)任务的执行更加轻松。 在已经使用RxJava的代码库中,即使在简单情况下,它也可能比CompletableFuture更受欢迎。 但是, Completable提供了许多先进的操作员和技术,此外,还可以轻松地将它与Observable混合使用,这使其功能更加强大。

要了解有关Completable更多信息,您可能需要查看发行说明 。 对于那些想深入了解主题的人,Advanced RxJava博客( 第1部分和第2 部分 )上有关于Completable API的非常详细的介绍。

  • 可以从GitHub获得代码示例的源代码。

顺便说一句,如果您总体上对RxJava感兴趣,我可以凭良心向您推荐一本书,该书目前由Tomasz Nurkiewicz和Ben Christensen – RxJava的React式编程编写 。

翻译自: https://www.javacodegeeks.com/2016/03/parallel-execution-blocking-tasks-rxjava-completable.html

rxjava 并行

rxjava 并行_使用RxJava和Completable并行执行阻塞任务相关推荐

  1. mysql innodb 并行_关于MySQL8.0 InnoDB并行执行的详解

    概述 MySQL经过多年的发展已然成为最流行的数据库,广泛用于互联网行业,并逐步向各个传统行业渗透.之所以流行,一方面是其优秀的高并发事务处理的能力,另一方面也得益于MySQL丰富的生态.MySQL在 ...

  2. 使用RxJava和Completable并行执行阻止任务

    借助RxJava 1.1.1中引入的Completable抽象,如何并行执行阻止"仅副作用"(也称为void)任务的并行执行变得更加容易. " 正如您可能已经注意到,阅读 ...

  3. rxjava 循环发送事件_使用RxJava和SseEmitter进行服务器发送的事件

    rxjava 循环发送事件 Spring Framework 4.2 GA即将发布,让我们看一下它提供的一些新功能. 引起我注意的一个事件是一个简单的新类SseEmitter ,它是对Spring M ...

  4. rxjava背压_如何形象地描述RxJava中的背压和流控机制?

    之前我在知乎上受邀回答过一个关于RxJava背压(Backpressure)机制的问题,今天我把它整理出来,希望对更多的人能有帮助. RxJava的官方文档中对于背压(Backpressure)机制比 ...

  5. selenium并行_如何在不同的浏览器中设置Selenium网格以并行执行

    selenium并行 到目前为止,Selenium是最常用的Web自动化测试工具. 如此受欢迎的原因之一是Selenium的自动跨浏览器测试功能. Selenium自动化测试可以帮助您在所有主要浏览器 ...

  6. rxjava 背压_背压加载文件– RxJava常见问题解答

    rxjava 背压 事实证明,将文件作为流进行处理非常有效且方便. 许多人似乎忘记了,自Java 8(3年以上!)以来,我们可以很容易地将任何文件变成一行代码: String filePath = & ...

  7. 冷热复位_冷热rx-java可观察

    冷热复位 我自己对"热和冷可观测"的理解还很不稳定,但这是我到目前为止所了解的! 冷观测 考虑一个返回rx-java Observable的API: import obs.Util ...

  8. rx 异步执行耗时_使用rx-java的异步抽象

    rx 异步执行耗时 对我而言,使用Rx-java的一大好处是,无论底层调用是同步还是异步,因此代码看起来都完全相同,因此该条目的标题也是如此. 考虑一个非常简单的客户代码用例,它执行三个缓慢运行的调用 ...

  9. java写的酷炫项目_基于RxJava实现酷炫启动页

    前言 RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs usi ...

最新文章

  1. PHP设置禁止目录索引,/var/www/html目录索引禁止
  2. opencv图像灰化_opencv读入图像、灰度化、归一化、向量化
  3. bzoj1094[ZJOI2007]粒子运动 计算几何
  4. Windows下,MySQL root用户忘记密码解决方案
  5. 11-6渐变的用途和设定技巧
  6. 科技爱好者周刊(第 199 期):俄罗斯的 HTTPS 证书问题
  7. 当在jup里面更新了数据源之后就 帅选不了数据 ,代码运行不了
  8. c站官网(c站官网客户端下载苹果)
  9. SM4350 背光控制--关闭XBL WLED
  10. EasyExcel v2.1.6单元格样式设置
  11. PL/SQL调试存储过程
  12. 操作系统教程第六版——第三章课后作业
  13. 基于springboot和mysql的人事管理系统设计与实现
  14. 2016蓝桥杯报纸页数(C++C组)
  15. (暂缓通过)机器人工程ROS方向应用型本科毕业设计重点课题学生验收成果
  16. 家用路由器选购指南基于2019年5月(二)
  17. 李成山与吴乾云、吴家胜借款合同纠纷案
  18. 美国推出世界首款可自动调音吉他机器人
  19. 考研复习计划(8.20-8.31)
  20. 转---工作两年后感悟的《大话西游》

热门文章

  1. P3244-[HNOI2015]落忆枫音【dp】
  2. P4777-[模板]扩展中国剩余定理(EXCRT)
  3. POJ1845-Sumdiv【逆元,等比数列,约数】
  4. Codeforces Round #657 (Div. 2)
  5. 洛谷 一种堆套路 P1631序列合并、P2085最小函数值
  6. 深入理解分布式系统中的缓存架构(下)
  7. Java8使用 Optional 处理 null
  8. Java Web应用的代码分层最佳实践
  9. Tengine-Ngnix高级版
  10. 【Android布局】控件布置