flatMap()和事件顺序– RxJava常见问题解答
正如我们已经发现的, flatMap()
不会保留原始流的顺序。 让我们使用上一篇文章的GeoNames API示例进行说明 :
public interface GeoNames {Flowable<Long> populationOf(String city);}
通过使用flatMap()
请求多个城市的人口,我们不能保证它们会按顺序到达:
Flowable<String> cities = Flowable.just("Warsaw", "Paris", "London", "Madrid");cities.flatMap(geoNames::populationOf).subscribe(response -> log.info("Population: {}", response));
输出有些令人惊讶:
17:09:49.838 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
17:09:49.838 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
17:09:49.838 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
17:09:49.838 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
17:09:49.939 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (98ms)
17:09:49.939 | Rx-3 | <-- 200 OK .../searchJSON?q=London (98ms)
17:09:49.956 | Rx-3 | Population: 7556900
17:09:49.958 | Rx-3 | Population: 3255944
17:09:51.099 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (1258ms)
17:09:51.100 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (1259ms)
17:09:51.100 | Rx-2 | Population: 2138551
17:09:51.100 | Rx-2 | Population: 1702139
一段时间后,我们收到马德里和伦敦的回复,随后又被订户收到。 首先是7556900(伦敦人口)和3255944(马德里)。不久之后,巴黎和华沙也到达了。 一方面,很好的是,我们可以在每个人口到达时立即进行处理。 这使系统看起来响应更快。 但是我们失去了一些东西。 输入流是"Warsaw"
, "Paris"
, "London"
, "Madrid"
而结果流包含"London"
, "Madrid"
, "Paris"
, "Warsaw"
人口。 我们如何分辨哪个数字代表哪个城市?
显然,以下解决方案是完全错误的 ,但在实际的代码库中并非闻所未闻:
Flowable<Long> populations = cities.flatMap(geoNames::populationOf);
cities.zipWith(populations, Pair::of).subscribe(response -> log.info("Population: {}", response));
它编译,运行,甚至产生一些结果。 不幸的是,这些结果是完全错误的:
17:20:03.778 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
17:20:03.778 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
17:20:03.778 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
17:20:03.778 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
17:20:03.953 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (172ms)
17:20:03.959 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (179ms)
17:20:03.975 | Rx-2 | Population: (Warsaw,2138551)
17:20:03.976 | Rx-2 | Population: (Paris,3255944)
17:20:03.988 | Rx-3 | <-- 200 OK .../searchJSON?q=London (207ms)
17:20:03.988 | Rx-3 | Population: (London,7556900)
17:20:04.080 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (299ms)
17:20:04.080 | Rx-1 | Population: (Madrid,1702139)
我们将城市与人口的随机排列组合在一起。 更糟的是,在尝试了十几次之后,我设法得到了错误的结果。 由于某种原因,该程序大多数时候都在我的机器上运行。 您可以想象的最糟糕的错误。
flatMap()
的问题在于它会释放原始请求。 想象一下一个异步系统,在该系统上您收到某种队列的响应,但不知道请求是什么。 一个明显的解决方案是以某种方式将某种关联ID甚至整个请求附加到响应中。 不幸的是, populationOf(String city)
不返回原始请求( city
),仅返回响应( population
)。 如果它返回诸如CityWithPopulation
值对象或Pair<String, Long>
类的东西, CityWithPopulation
容易得多。 现在,假设我们通过附加请求( city
)来增强原始方法:
Flowable<Pair<String, Long>> populationOfCity(String city) {Flowable<Long> population = geoNames.populationOf(city);return population.map(p -> Pair.of(city, p));
}
现在,我们可以将这种方法用于更多的城市:
cities.flatMap(this::populationOfCity).subscribe(response -> log.info("Population: {}", response));
…或避免使用额外的辅助方法:
cities.flatMap(city -> geoNames.populationOf(city).map(p -> Pair.of(city, p))).subscribe(response -> log.info("Population: {}", response));
这次的result
变量是Pair<String, Long>
但是建议您使用更具表现力的value对象。
17:20:03.778 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
17:20:03.778 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
17:20:03.778 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
17:20:03.778 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
17:20:03.953 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (172ms)
17:20:03.959 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (179ms)
17:20:03.975 | Rx-2 | Population: (Paris,2138551)
17:20:03.976 | Rx-2 | Population: (Madrid,3255944)
17:20:03.988 | Rx-3 | <-- 200 OK .../searchJSON?q=London (207ms)
17:20:03.988 | Rx-3 | Population: (London,7556900)
17:20:04.080 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (299ms)
17:20:04.080 | Rx-1 | Population: (Warsaw,1702139)
我发现带有嵌套map()
flatMap()
添加了额外的上下文,这是处理乱序结果的最有效方法。 当然,它不是最易读的反应式代码,因此请确保将这种复杂性隐藏在某些外观之下。
更新
正如DávidKarnok在对本文的评论中指出的那样, flatMap()
内的map()
运算符是一种常见习语,以至于存在专门的flatMap()
重载。 除了标准转换函数(在我们的示例中为(String, Long) -> SomeType
String -> Flowable<Long>
)之外,它还采用了组合器双功能(例如(String, Long) -> SomeType
)。 此功能的目的是提供一种将输入项与通过转换生成的每个输出项组合在一起的转换。 这正是我们对嵌套map()
所做的工作(将人口增加了对应的城市名称),但要短得多:
Flowable<Pair<String, Long>> populations = cities.flatMap(city -> geoNames.populationOf(city), (city, pop) -> Pair.of(city, pop));
第二个lambda表达式( (city, pop) -> Pair.of(city, pop)
)是对populationOf()
产生的每个下游事件执行的。 如果极端,可以使用方法引用:
Flowable<Pair<String, Long>> populations = cities.flatMap(geoNames::populationOf, Pair::of);
花一点时间研究最后一个示例,一旦您掌握了它,它实际上就非常简单:
- 为每个
city
找到人口pop
- 对于每个人口,通过形成
Pair<String, Long>
将其与city
结合起来
PS:这是9年中的第200个帖子!
翻译自: https://www.javacodegeeks.com/2017/08/flatmap-order-events-rxjava-faq.html
flatMap()和事件顺序– RxJava常见问题解答相关推荐
- flatMap()与concatMap()与concatMapEager()– RxJava常见问题解答
RxJava 2.x中共有三个无缝相似的运算符: flatMap() , concatMap()和concatMapEager() . 它们都接受相同的参数-从原始流的单个项目到任意类型的(子)流的函 ...
- flatmap_flatMap()与concatMap()与concatMapEager()– RxJava常见问题解答
flatmap RxJava 2.x中共有三个无缝相似的运算符: flatMap() , concatMap()和concatMapEager() . 它们都接受相同的参数-从原始流的单个项目到任意类 ...
- 并发查询parallel_惯用并发:flatMap()与parallel()– RxJava常见问题解答
并发查询parallel 简单,有效和安全的并发是RxJava的设计原则之一. 然而,具有讽刺意味的是,它可能是该库中最容易被误解的方面之一. 让我们举一个简单的例子:假设我们有一堆UUID并且对于每 ...
- 惯用并发:flatMap()与parallel()– RxJava常见问题解答
简单,有效和安全的并发是RxJava的设计原则之一. 然而,具有讽刺意味的是,它可能是该库中最容易被误解的方面之一. 让我们举一个简单的例子:假设我们有一堆UUID并且对于每个UUID ,我们必须执行 ...
- 技术停滞_检测和测试停滞的流– RxJava常见问题解答
技术停滞 假设您有一个流以不可预测的频率发布事件. 有时您可以预期每秒会有数十条消息,但是偶尔几秒钟都看不到任何事件. 如果您的流是通过Web套接字,SSE或任何其他网络协议传输的,则可能会出现问题. ...
- flowable背压 取消_使用Flowable.generate()生成可感知背压的流– RxJava常见问题解答...
flowable背压 取消 RxJava缺少创建无限自然数流的工厂. 这样的流很有用,例如,当您想通过压缩两个事件的顺序来为可能的无限事件流分配唯一的序列号时: Flowable<Long> ...
- 渴望 英语_渴望订阅– RxJava常见问题解答
渴望 英语 在教学和指导RxJava以及撰写本书之后 ,我注意到某些领域特别有问题. 我决定发布一些简短的提示,以解决最常见的陷阱. 这是第一部分. Observable和Flowable本质上是惰性 ...
- 检测和测试停滞的流– RxJava常见问题解答
假设您有一个流以不可预测的频率发布事件. 有时您可以预期每秒会有数十条消息,但是偶尔几秒钟都看不到任何事件. 如果您的流是通过Web套接字,SSE或任何其他网络协议传输的,则可能会出现问题. 静默时间 ...
- 渴望订阅– RxJava常见问题解答
在教学和指导RxJava以及撰写本书之后 ,我注意到某些领域尤其成问题. 我决定发布一些简短的提示,以解决最常见的陷阱. 这是第一部分. Observable和Flowable本质上是惰性的. 这意味 ...
最新文章
- html按钮线性炫光,6分钟实现CSS炫光倒影按钮 html+css
- 转Java转iOS-第一个项目总结(2):遇到问题和解决方案
- 转贴一篇很不错的有关ASP.NET Session的分析文章
- 软件测试江湖之公会武器之争
- 编辑器的合并用不了_Excel多工作簿合并为一个工作簿,10秒搞定,这才是最高效的方式...
- 烟台大学计算机专业调剂贴吧,烟台大学计算机与控制工程学院2021年考研复试与调剂的说明...
- 字符串格式化成时间格式_JAVA | 常用的日期/时间格式化方式
- 《中国人工智能学会通讯》——6.16 基于统计的推理方法
- Centos磁盘管理和文件系统管理
- 为了满足自己的好奇心,搞了一个业余项目耍,没想到还给我带来了$3000的收入......
- 干货:制造业中的机械智能(内附完整PPT)
- NoSQL和MemeryCache的出现意味着传统数据库使用方式的变革吗?(arvin-推荐--看评论)
- assoc在php中,在PHP中使用array_diff_assoc函数
- Node开发实践总结-定时脚本的设计与实现
- Tomcat无法用命令关闭
- 耶鲁大学计算机科学专业 录取,GRE309获得芝加哥大学计算机专业录取
- (初学者)使用DOSBox编写汇编程序
- 如何关闭office软件中字符的下划线和波浪线
- 报错:Unsatisfied dependency expressed through field 'xxxService';
- 【中英】mac电脑清理软件 ToolWiz Mac Boost