并发查询parallel_惯用并发:flatMap()与parallel()– RxJava常见问题解答
并发查询parallel
简单,有效和安全的并发是RxJava的设计原则之一。 然而,具有讽刺意味的是,它可能是该库中最容易被误解的方面之一。 让我们举一个简单的例子:假设我们有一堆UUID
并且对于每个UUID
,我们必须执行一组任务。 第一个问题是每个UUID
都要执行I / O密集型操作,例如,从数据库加载对象:
Flowable<UUID> ids = Flowable.fromCallable(UUID::randomUUID).repeat().take(100);ids.subscribe(id -> slowLoadBy(id));
首先,为了测试,我将生成100个随机UUID。 然后,对于每个UUID,我想使用以下方法加载记录:
Person slowLoadBy(UUID id) {//...
}
slowLoadBy()
的实现是无关紧要的,请记住它是缓慢且阻塞的。 使用subscribe()
调用slowLoadBy()
有许多缺点:
subscribe()
根据设计是单线程的,无法解决。 每个UUID
顺序加载- 当您调用
subscribe()
,无法进一步转换Person
对象。 这是终端操作
一种更健壮,甚至更残破的方法是map()
每个UUID
:
Flowable<Person> people = ids.map(id -> slowLoadBy(id)); //BROKEN
这是非常可读的,但不幸的是损坏了。 就像订阅者一样,运算符也是单线程的。 这意味着在任何给定时间只能映射一个UUID
,此处也不允许并发。 更糟糕的是,我们从上游继承线程/工作者。 这有几个缺点。 如果上游使用某些专用的调度程序产生事件,我们将劫持该调度程序中的线程。 例如,许多操作符(例如interval()
Schedulers.computation()
透明地使用Schedulers.computation()
线程池。 我们突然开始在完全不适合该功能的池上执行I / O密集型操作。 此外,我们通过这一阻塞性顺序步骤降低了整个管道的速度。 非常非常糟糕。
您可能已经听说过这个subscribeOn()
运算符,以及它如何启用并发。 确实,但是在应用它时必须非常小心。 以下示例(再次)是错误的 :
import io.reactivex.schedulers.Schedulers;Flowable<Person> people = ids.subscribeOn(Schedulers.io()).map(id -> slowLoadBy(id)); //BROKEN
上面的代码段仍然损坏。 observeOn()
subscribeOn()
(以及该事件的observeOn()
)几乎不会将执行切换到其他工作程序(线程),而不会引入任何并发性。 流仍然按顺序处理所有事件,但是在不同的线程上。 换句话说,我们现在不是在从上游继承的线程上顺序使用事件,而是在io()
线程上顺序使用事件。 那么,这个神话般的flatMap()
运算符呢?
flatMap()
运算符可以进行救援
flatMap()
运算符通过将事件流分成子流来启用并发。 但首先,还有一个破碎的示例:
Flowable<Person> asyncLoadBy(UUID id) {return Flowable.fromCallable(() -> slowLoadBy(id));
}Flowable<Person> people = ids.subscribeOn(Schedulers.io()).flatMap(id -> asyncLoadBy(id)); //BROKEN
哦,天哪,这还是坏了 ! flatMap()
运算符在逻辑上做两件事:
- 在每个上游事件上应用转换(
id -> asyncLoadBy(id)
)–这将产生Flowable<Flowable<Person>>
。 这是有道理的,对于每个上游UUID
我们都有一个Flowable<Person>
因此最终得到的是Person
对象流 - 然后
flatMap()
尝试一次订阅所有这些内部子流。 每当任何子流发出Person
事件时,它都会作为外部Flowable
的结果透明传递。
从技术上讲, flatMap()
仅创建并预订前128个(默认情况下,可选的maxConcurrency
参数)子流。 同样,当最后一个子流完成时, Person
外部流也将完成。 现在,这到底为什么被打破? 除非明确要求,否则RxJava不会引入任何线程池。 例如,这段代码仍在阻塞:
log.info("Setup");
Flowable<String> blocking = Flowable.fromCallable(() -> {log.info("Starting");TimeUnit.SECONDS.sleep(1);log.info("Done");return "Hello, world!";});
log.info("Created");
blocking.subscribe(s -> log.info("Received {}", s));
log.info("Done");
仔细查看输出,特别是涉及的事件和线程的顺序:
19:57:28.847 | INFO | main | Setup
19:57:28.943 | INFO | main | Created
19:57:28.949 | INFO | main | Starting
19:57:29.954 | INFO | main | Done
19:57:29.955 | INFO | main | Received Hello, world!
19:57:29.957 | INFO | main | Done
没有任何并发,没有额外的线程。 仅将阻塞代码包装在Flowable
中不会神奇地增加并发性。 您必须显式使用… subscribeOn()
:
log.info("Setup");
Flowable<String> blocking = Flowable.fromCallable(() -> {log.info("Starting");TimeUnit.SECONDS.sleep(1);log.info("Done");return "Hello, world!";}).subscribeOn(Schedulers.io());
log.info("Created");
blocking.subscribe(s -> log.info("Received {}", s));
log.info("Done");
这次的输出更有希望:
19:59:10.547 | INFO | main | Setup
19:59:10.653 | INFO | main | Created
19:59:10.662 | INFO | main | Done
19:59:10.664 | INFO | RxCachedThreadScheduler-1 | Starting
19:59:11.668 | INFO | RxCachedThreadScheduler-1 | Done
19:59:11.669 | INFO | RxCachedThreadScheduler-1 | Received Hello, world!
但是我们上次确实使用了subscribeOn()
,这是怎么回事? 嗯,外部流级别的subscribeOn()
基本上说所有事件都应在此流中的不同线程上顺序处理。 我们并没有说应该同时运行许多子流。 并且由于所有子流都处于阻塞状态,因此当RxJava尝试订阅所有子流时,它会有效地依次依次订阅。 asyncLoadBy()
并不是真正的async ,因此当flatMap()
运算符尝试对其进行订阅时,它会阻塞。 修复很容易。 通常,您会将subscribeOn()
放在asyncLoadBy()
但出于教育目的,我将其直接放置在asyncLoadBy()
道中:
Flowable<Person> people = ids.flatMap(id -> asyncLoadBy(id).subscribeOn(Schedulers.io()));
现在它就像一个魅力! 默认情况下,RxJava将接收前128个上游事件( UUID
),将它们转换为子流并订阅所有这些事件。 如果子流是异步且高度可并行化的(例如,网络调用), asyncLoadBy()
获得128个并发调用asyncLoadBy()
。 并发级别(128)可通过maxConcurrency
参数配置:
Flowable<Person> people = ids.flatMap(id ->asyncLoadBy(id).subscribeOn(Schedulers.io()),10 //maxConcurrency);
那是很多工作,您不觉得吗? 并发不应该更具声明性吗? 我们不再处理Executor
和期货,但似乎这种方法太容易出错。 它不能像Java 8流中的parallel()
一样简单吗?
输入ParallelFlowable
让我们首先来看一下我们的示例,并通过添加filter()
使它更加复杂:
Flowable<Person> people = ids.map(this::slowLoadBy) //BROKEN.filter(this::hasLowRisk); //BROKEN
hasLowRisk()
是慢速谓词:
boolean hasLowRisk(Person p) {//slow...
}
我们已经知道,针对此问题的惯用方法是使用flatMap()
两次:
Flowable<Person> people = ids.flatMap(id -> asyncLoadBy(id).subscribeOn(io())).flatMap(p -> asyncHasLowRisk(p).subscribeOn(io()));
asyncHasLowRisk()
相当模糊-谓词通过时返回单元素流,失败则返回空流。 这是使用flatMap()
模拟filter()
flatMap()
。 我们可以做得更好吗? 从RxJava 2.0.5开始,有一个新的运算符叫做… parallel()
! 令人惊讶的是,由于许多误解和滥用,在RxJava成为1.0之前已删除了同名的运算符。 2.x中的parallel()
似乎最终以一种安全且声明性的方式解决了惯用并发问题。 首先,让我们看一些漂亮的代码!
Flowable<Person> people = ids.parallel(10).runOn(Schedulers.io()).map(this::slowLoadBy).filter(this::hasLowRisk).sequential();
就这样! parallel()
和sequential()
之间的代码块parallel()
运行。 我们有什么在这里? 首先,新的parallel()
运算符将Flowable<UUID>
转换为ParallelFlowable<UUID>
,该API的API比Flowable小得多。 您将在第二秒看到原因。 可选的int
参数(在我们的例子中为10
)定义并发性,或者(如文档所述)定义创建并发“ rails”的数量。 因此,对于我们来说,我们将单个Flowable<Person>
分成10个并发的独立轨道(认为是thread )。 来自UUID
原始流的事件被拆分( modulo 10
)为彼此独立的不同轨,子流。 将它们视为将上游事件发送到10个单独的线程中。 但是首先我们必须使用方便的runOn()
运算符定义这些线程的来源。 这比Java 8流上的parallel()
好得多,在Java 8流上,您无法控制并发级别。
至此,我们有了一个ParallelFlowable
。 当事件出现在上游( UUID
)中时,它将委派给10个“轨道”,并发,独立的管道之一。 管道提供了可以安全地同时运行的运算符的有限子集,例如map()
和filter()
,还包括reduce()
。 没有buffer()
, take()
等,因为一次在多个子流上调用它们的语义尚不清楚。 我们的阻塞slowLoadBy()
和hasLowRisk()
仍按顺序调用,但仅在单个“ rail”内部。 因为我们现在有10个并发的“ rails”,所以我们无需花费太多精力就可以有效地并行化它们。
当事件到达子流(“轨道”)的末尾时,它们会遇到sequential()
运算符。 该运算符将ParallelFlowable
回Flowable
。 只要我们的映射器和过滤器是线程安全的, parallel()
/ sequential()
对就提供了非常简单的并行化流的方法。 一个小警告-您将不可避免地使邮件重新排序。 顺序map()
和filter()
始终保留顺序(就像大多数运算符一样)。 但是,一旦在parallel()
块中运行它们,顺序就会丢失。 这允许更大的并发性,但是您必须牢记这一点。
是否应该使用parallel()
而不是嵌套的flatMap()
来并行化代码? 这取决于您,但是parallel()
似乎更容易阅读和掌握。
翻译自: https://www.javacodegeeks.com/2017/09/idiomatic-concurrency-flatmap-vs-parallel-rxjava-faq.html
并发查询parallel
并发查询parallel_惯用并发:flatMap()与parallel()– RxJava常见问题解答相关推荐
- 惯用并发:flatMap()与parallel()– RxJava常见问题解答
简单,有效和安全的并发是RxJava的设计原则之一. 然而,具有讽刺意味的是,它可能是该库中最容易被误解的方面之一. 让我们举一个简单的例子:假设我们有一堆UUID并且对于每个UUID ,我们必须执行 ...
- flatMap()与concatMap()与concatMapEager()– RxJava常见问题解答
RxJava 2.x中共有三个无缝相似的运算符: flatMap() , concatMap()和concatMapEager() . 它们都接受相同的参数-从原始流的单个项目到任意类型的(子)流的函 ...
- flatMap()和事件顺序– RxJava常见问题解答
正如我们已经发现的, flatMap()不会保留原始流的顺序. 让我们使用上一篇文章的GeoNames API示例进行说明 : public interface GeoNames {Flowable& ...
- oracle并行和并发,Oracle 并发查询
需要打开计时器的话可以参考 set timing on; 在开始之前需要强调的是 任何并发操作都是以硬件换速度, 如果没有足够的硬件不要轻易使用并发,另外如果是大访问量的网站系统请慎用,数据库并发查询 ...
- apache lucene_Apache Lucene中的并发查询执行
apache lucene Apache Lucene是一个出色的并发纯Java搜索引擎,如果您愿意,它可以轻松地使服务器上的可用CPU或IO资源饱和. "典型" Lucene应用 ...
- Apache Lucene中的并发查询执行
Apache Lucene是一个出色的并发纯Java搜索引擎,如果您愿意,它可以轻松地使服务器上的可用CPU或IO资源饱和. "典型" Lucene应用程序的并发模型在搜索时每个查 ...
- 《MySQL tips:并发查询与并发连接区别》
并发连接与并发查询,并不是一个概念. 在执行show processlist的结果里,看到了几千个连接,指的是并发连接. 而"当前正在执行"的语句,才是并发查询. 并发连接数多影响 ...
- 先查询后修改并发的时候sql_如何解决并发场景下扣款的数据一致性问题?
1.场景介绍 场景1:扣费,企业账户送流量或者红包,用户签到领取.此场景下就是多用户对某一个账号的并发扣款: 场景2:充值,打赏给主播,这种场景是多用户对同一个账号进行打款,但是方案和问题和场景1是一 ...
- mysql 亿级高并发_亿级流量系统架构之如何设计每秒十万查询的高并发架构.md
亿级流量系统架构之如何设计每秒十万查询的高并发架构 一.前情回顾 上篇文章(亿级流量系统架构之如何设计承载百亿流量的高性能架构)聊了一下系统架构中,百亿流量级别高并发写入场景下,如何承载这种高并发写入 ...
最新文章
- linux平台 使用dlopen接口调用HelloWorld动态库简单实例
- 25匹马,找出最快的3匹,但是只有5个赛道,每次比赛只能得到5匹马的速度排序,那么最少需要多少次比赛
- Matlab subs函数的用法
- 浅析 React Fiber
- puppet kick 功能
- 【MyBatis框架】mybatis逆向工程自动生成代码
- C#出题库项目的总结(1)
- 初识多线程之基础知识与常用方法
- Qt的QDataStream
- 大意导致Java访问DB2库时导出SQLCODE=-301, SQLSTATE=07006错误
- prometheus 异常退出 报错:opening storage failed
- rimraf : 无法加载文件 C:\Users\Admin\AppData\Roaming\npm\rimraf.ps1,因为在此系统上禁止运行脚本。有关详细信息,请参阅
- 小红书笔记无法展示是什么原因?让我们来看看吧
- android listview仿iphone通讯录ios 3dTouch
- 2022全国高职院校教师真实薪酬数据汇总
- matlab 棋盘格畸变矫正
- Java学习之路-数字和日期处理
- GDT(全居描述符表)和LDT(局部描述符表)
- AT88SC0104C读写程序
- anacoda里面安装包显示失败_Premiere2020安装包下载及安装教程(附pr2020配置要求)...