rxjava 并行_使用RxJava和Completable并行执行阻塞任务
rxjava 并行
通过RxJava 1.1.1中引入的Completable
抽象,如何并行执行阻止“仅副作用”(也称为void)任务的并行执行变得更加容易。 “
正如您可能已经注意到,阅读我的博客时,我主要专注于软件Craft.io和自动代码测试。 但是,此外,我还是持续交付和广义并发的狂热者。 最后一点从C中的纯线程和信号量到更高级别的解决方案(例如ReactiveX和actor模型)不等。 这次是全新RxJava 1.1.1 – rx.Completable
引入的非常方便(在特定情况下)功能的用例。 与我的许多博客条目类似,这也反映了我在处理实际任务和用例时遇到的实际事件。
要做的任务
想象一下,一个系统对来自不同来源的异步事件进行了非常复杂的处理。 过滤,合并,转换,分组,丰富等等。 RxJava非常适合这里,特别是如果我们想要React式的话。 假设我们已经实现了它(外观和效果很好),只剩下一件事了。 在开始处理之前,需要告知3个外部系统我们准备好接收消息。 对遗留系统的3个同步调用(通过RMI,JMX或SOAP)。 它们每个都可以持续几秒钟,我们需要等待所有它们之后才能开始。 幸运的是,它们已经实现,我们将它们视为可能成功(或因异常而失败)的黑匣子。 我们只需要调用它们(最好同时调用)并等待完成即可。
rx.Observable –方法1
触手可及的RxJava似乎是显而易见的方法。 首先,可以使用Observable
来包装作业执行:
private Observable<Void> rxJobExecute(Job job) {return Observable.fromCallable(() -> { job.execute();return null;});
}
不幸的是(在我们的例子中) Observable
期望返回一些元素。 我们需要使用Void
并且尴尬地return null
(而不是仅仅引用方法job::execute
。
接下来,我们可以使用subscribeOn()
方法来使用另一个线程来执行我们的作业(而不是阻塞主/当前线程–我们不想顺序执行我们的作业)。 Schedulers.io()
为调度Schedulers.io()
提供了一组用于IO绑定工作的线程。
Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());
Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());
最后,我们需要等待所有它们完成(所有Obvervable
s完成)。 为此,可以调整zip功能。 它结合了Obserbable
拉链发射的物品的序列号。 在我们的例子中,我们只对每个Observable
到的作业中的第一个伪项目感兴趣(我们仅发出null
以满足API),并以阻塞的方式等待它们。 zip运算符中的zip函数需要返回某些内容,因此我们需要重复null
的变通方法。
Observable.zip(run1, run2, (r1, r2) -> return null).toBlocking().single();
显而易见, Observable
设计为Observable
使用值流,并且需要进行一些额外的工作才能将其调整为仅产生副作用(不返回任何值)操作。 当我们需要将仅具有副作用的操作与其他返回一些值的值合并(例如合并)时,情况变得更加糟糕–需要更丑陋的转换。 请参阅RxNetty API的实际用例 。
public void execute() {Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());Observable.zip(run1, run2, (r1, r2) -> null).toBlocking().single();
}private Observable<Void> rxJobExecute(Job job) {return Observable.fromCallable(() -> { job.execute();return null;});
}
rx.Observable –方法2
可能会使用另一种方法。 代替生成人工项目,可以将我们的任务的空Observable
作为onComplete
操作执行。 这迫使我们从zip
操作切换到merge
。 结果,我们需要提供一个onNext
动作(对于空的Observable
永远不会执行),这肯定了我们试图破解该系统的信念。
public void execute() {Observable<Object> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());Observable<Object> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());Observable.merge(run1, run2).toBlocking().subscribe(next -> {});
}private Observable<Object> rxJobExecute(Job job) {return Observable.empty().doOnCompleted(job::execute);
}
rx.Completable
RxJava 1.1.1解决了对不返回任何值的Observable的更好支持。 Completable
可以视为Observable
的简化版本,可以成功完成(发出onCompleted
事件)或失败( onError
)。 创建Completable
实例的最简单方法是使用fromAction
方法,该方法采用不返回任何值的Action0
(例如Runnable
)。
Completable completable1 = Completable.fromAction(job1::execute).subscribeOn(Schedulers.io());
Completable completable2 = Completable.fromAction(job2::execute).subscribeOn(Schedulers.io());
接下来,我们可以使用merge()
方法,该方法返回一个Completable
实例,该实例立即订阅所有下游Completable
,并在它们全部完成(或其中一个失败)时完成。 由于我们在外部调度程序中使用了subscribeOn
方法,因此所有作业都是并行执行的(在不同的线程中)。
Completable.merge(completable1, completable2).await();
await()
方法将阻塞,直到所有作业完成(如果发生错误,将重新抛出异常)。 纯净而简单。
public void execute() {Completable completable1 = Completable.fromAction(job1::execute).subscribeOn(Schedulers.io());Completable completable2 = Completable.fromAction(job2::execute).subscribeOn(Schedulers.io());Completable.merge(completable1, completable2).await();
}
java.util.concurrent.CompletableFuture
有人可能会问:为什么不只使用CompletableFuture
? 这将是一个很好的问题。 Java 5中引入的纯Future
可能需要我们做更多的工作,而ListenableFuture
(来自Guava)和CompletableFuture
(来自Java 8)使其变得微不足道。
首先,我们需要运行/安排作业执行。 接下来,使用CompletableFuture.allOf()
方法,我们可以创建一个新的CompletableFuture
,它在所有作业完成时就完成了(我们以前没有看到过这个概念吗?)。 get()
方法只是阻止等待。
public void execute() {try {CompletableFuture<Void> run1 = CompletableFuture.runAsync(job1::execute);CompletableFuture<Void> run2 = CompletableFuture.runAsync(job2::execute);CompletableFuture.allOf(run1, run2).get();} catch (InterruptedException | ExecutionException e) {throw new RuntimeException("Jobs execution failed", e);}
}
我们需要对受检查的异常做一些事情(很多时候我们不想使用它们来污染我们的API),但是总的来说,这看起来很明智。 但是,值得记住的是,当需要更复杂的链处理时, CompletableFuture
不足。 另外,在我们的项目中已经使用RxJava时,使用相同(或相似)的API而不是引入全新的东西通常会很有用。
摘要
多亏了rx.Completable
,使用RxJava仅完成副作用(不返回任何内容)任务的执行更加轻松。 在已经使用RxJava的代码库中,即使在简单情况下,它也可能比CompletableFuture
更受欢迎。 但是, Completable
提供了许多先进的操作员和技术,此外,还可以轻松地将它与Observable
混合使用,这使其功能更加强大。
要了解有关Completable
更多信息,您可能需要查看发行说明 。 对于那些想深入了解主题的人,Advanced RxJava博客( 第1部分和第2 部分 )上有关于Completable API的非常详细的介绍。
- 可以从GitHub获得代码示例的源代码。
顺便说一句,如果您总体上对RxJava感兴趣,我可以凭良心向您推荐一本书,该书目前由Tomasz Nurkiewicz和Ben Christensen – RxJava的React式编程编写 。
翻译自: https://www.javacodegeeks.com/2016/03/parallel-execution-blocking-tasks-rxjava-completable.html
rxjava 并行
rxjava 并行_使用RxJava和Completable并行执行阻塞任务相关推荐
- mysql innodb 并行_关于MySQL8.0 InnoDB并行执行的详解
概述 MySQL经过多年的发展已然成为最流行的数据库,广泛用于互联网行业,并逐步向各个传统行业渗透.之所以流行,一方面是其优秀的高并发事务处理的能力,另一方面也得益于MySQL丰富的生态.MySQL在 ...
- 使用RxJava和Completable并行执行阻止任务
借助RxJava 1.1.1中引入的Completable抽象,如何并行执行阻止"仅副作用"(也称为void)任务的并行执行变得更加容易. " 正如您可能已经注意到,阅读 ...
- rxjava 循环发送事件_使用RxJava和SseEmitter进行服务器发送的事件
rxjava 循环发送事件 Spring Framework 4.2 GA即将发布,让我们看一下它提供的一些新功能. 引起我注意的一个事件是一个简单的新类SseEmitter ,它是对Spring M ...
- rxjava背压_如何形象地描述RxJava中的背压和流控机制?
之前我在知乎上受邀回答过一个关于RxJava背压(Backpressure)机制的问题,今天我把它整理出来,希望对更多的人能有帮助. RxJava的官方文档中对于背压(Backpressure)机制比 ...
- selenium并行_如何在不同的浏览器中设置Selenium网格以并行执行
selenium并行 到目前为止,Selenium是最常用的Web自动化测试工具. 如此受欢迎的原因之一是Selenium的自动跨浏览器测试功能. Selenium自动化测试可以帮助您在所有主要浏览器 ...
- rxjava 背压_背压加载文件– RxJava常见问题解答
rxjava 背压 事实证明,将文件作为流进行处理非常有效且方便. 许多人似乎忘记了,自Java 8(3年以上!)以来,我们可以很容易地将任何文件变成一行代码: String filePath = & ...
- 冷热复位_冷热rx-java可观察
冷热复位 我自己对"热和冷可观测"的理解还很不稳定,但这是我到目前为止所了解的! 冷观测 考虑一个返回rx-java Observable的API: import obs.Util ...
- rx 异步执行耗时_使用rx-java的异步抽象
rx 异步执行耗时 对我而言,使用Rx-java的一大好处是,无论底层调用是同步还是异步,因此代码看起来都完全相同,因此该条目的标题也是如此. 考虑一个非常简单的客户代码用例,它执行三个缓慢运行的调用 ...
- java写的酷炫项目_基于RxJava实现酷炫启动页
前言 RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs usi ...
最新文章
- PHP设置禁止目录索引,/var/www/html目录索引禁止
- opencv图像灰化_opencv读入图像、灰度化、归一化、向量化
- bzoj1094[ZJOI2007]粒子运动 计算几何
- Windows下,MySQL root用户忘记密码解决方案
- 11-6渐变的用途和设定技巧
- 科技爱好者周刊(第 199 期):俄罗斯的 HTTPS 证书问题
- 当在jup里面更新了数据源之后就 帅选不了数据 ,代码运行不了
- c站官网(c站官网客户端下载苹果)
- SM4350 背光控制--关闭XBL WLED
- EasyExcel v2.1.6单元格样式设置
- PL/SQL调试存储过程
- 操作系统教程第六版——第三章课后作业
- 基于springboot和mysql的人事管理系统设计与实现
- 2016蓝桥杯报纸页数(C++C组)
- (暂缓通过)机器人工程ROS方向应用型本科毕业设计重点课题学生验收成果
- 家用路由器选购指南基于2019年5月(二)
- 李成山与吴乾云、吴家胜借款合同纠纷案
- 美国推出世界首款可自动调音吉他机器人
- 考研复习计划(8.20-8.31)
- 转---工作两年后感悟的《大话西游》
热门文章
- P3244-[HNOI2015]落忆枫音【dp】
- P4777-[模板]扩展中国剩余定理(EXCRT)
- POJ1845-Sumdiv【逆元,等比数列,约数】
- Codeforces Round #657 (Div. 2)
- 洛谷 一种堆套路 P1631序列合并、P2085最小函数值
- 深入理解分布式系统中的缓存架构(下)
- Java8使用 Optional 处理 null
- Java Web应用的代码分层最佳实践
- Tengine-Ngnix高级版
- 【Android布局】控件布置