RxJava 2.x中共有三个无缝相似的运算符: flatMap()concatMap()concatMapEager() 。 它们都接受相同的参数-从原始流的单个项目到任意类型的(子)流的函数。 换句话说,如果您有Flowable<T>则可以为任意R类型提供从TFlowable<R>的函数。 应用任何这些运算符后,您最终得到Flowable<R> 。 那么它们有何不同?

样例项目

首先,让我们构建一个示例应用程序。 我们将使用Retrofit2 HTTP客户端包装器,该包装器具有RxJava2的内置插件。 我们的任务是利用GeoNames API来查找世界上任何城市的人口。 该界面如下所示:

public interface GeoNames {Flowable<Long> populationOf(String city);}

该接口的实现由Retrofit自动生成,向下滚动以查看胶粘源代码。 暂时假设我们有一个函数,该函数采用具有城市名称的String并异步返回具有该城市人口的单元素流。 还要假设我们有固定的城市要查找:

Flowable<String> cities = Flowable.just("Warsaw", "Paris", "London", "Madrid"
);

我们的目标是获取每个城市的人口。

带有concatMap()的示例应用程序如下所示:

cities.concatMap(geoNames::populationOf).subscribe(response -> log.info("Population: {}", response));

在看到结果之前,让我们研究一下concatMap()在做什么。 对于每个上游事件( 城市 ),它都调用一个函数,该函数用(子)流替换该事件。 在我们的情况下,它是Long的一元流( Flowable<Long> )。 因此,与所有运算符进行比较之后,我们最终得到的是Long流( Flowable<Flowable<Long>> )流。 当我们分析操作员为展平此类嵌套流所做的操作时,就会出现真正的区别。

concatMap()将首先订阅第一concatMap()流( Flowable<Long>代表华沙的人口)。 订阅实际上是指进行物理HTTP调用。 仅当第一concatMap()流完成时(在我们的情况下发出单个Long并发出完成信号), concatMap()才会继续。 继续意味着订阅第二个子流并等待其完成。 最后一个子流完成时,结果流完成。 这导致了随后的潮流:1702139,2138551,7556900和3255944。因此,恰好是华沙,巴黎,伦敦和马德里的人口。 输出顺序完全可以预测。 但是它也是完全顺序的。 完全没有并发发生,只有在第一个HTTP结束时才进行第二个HTTP调用。 RxJava所增加的复杂性根本没有回报:

23:33:33.531 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
23:33:33.656 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (123ms)
23:33:33.674 | Rx-1 | Population: 1702139
23:33:33.676 | Rx-1 | --> GET .../searchJSON?q=Paris http/1.1
23:33:33.715 | Rx-1 | <-- 200 OK .../searchJSON?q=Paris (38ms)
23:33:33.715 | Rx-1 | Population: 2138551
23:33:33.716 | Rx-1 | --> GET .../searchJSON?q=London http/1.1
23:33:33.754 | Rx-1 | <-- 200 OK .../searchJSON?q=London (37ms)
23:33:33.754 | Rx-1 | Population: 7556900
23:33:33.755 | Rx-1 | --> GET .../searchJSON?q=Madrid http/1.1
23:33:33.795 | Rx-1 | <-- 200 OK .../searchJSON?q=Madrid (40ms)
23:33:33.796 | Rx-1 | Population: 3255944

如您所见,没有多线程发生,请求是顺序的,彼此等待。 从技术上讲,并非所有这些都必须在同一线程中发生,但是它们绝不会重叠并且可以利用并发性。 最大的好处是可以保证结果事件的顺序,一旦我们进入flatMap() ,就不会那么明显了……

flatMap()代码几乎完全相同:

cities.flatMap(geoNames::populationOf).subscribe(response -> log.info("Population: {}", response));

就像之前一样,我们从Long流开始( Flowable<Flowable<Long>> )。 但是, flatMap()运算符不是一次又一次地订阅每个子流,而是急切地一次订阅所有子流。 这意味着我们看到在不同线程中同时启动多个HTTP请求:

00:10:04.919 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
00:10:04.919 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
00:10:04.919 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
00:10:04.919 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
00:10:05.449 | Rx-3 | <-- 200 OK .../searchJSON (529ms)
00:10:05.462 | Rx-3 | Population: 7556900
00:10:05.477 | Rx-1 | <-- 200 OK .../searchJSON (557ms)
00:10:05.478 | Rx-1 | Population: 1702139
00:10:05.751 | Rx-4 | <-- 200 OK .../searchJSON (831ms)
00:10:05.752 | Rx-4 | Population: 3255944
00:10:05.841 | Rx-2 | <-- 200 OK .../searchJSON (922ms)
00:10:05.843 | Rx-2 | Population: 2138551

当任何基础子流中的任何一个发出任何值时,它将立即向下游传递给订户。 这意味着我们现在可以在事件发生时即时处理事件。 请注意,结果流是乱序的。 我们收到的第一个事件是7556900,恰好是伦敦的人口,在第一流中排名第二。 与concatMap()相反, flatMap()无法保留顺序,因此以“随机”顺序发出值。 好吧,不是真正随机的,我们只是在它们可用时立即接收值。 在此特定执行中,首先是针对伦敦的HTTP响应,但绝对不能保证。 这导致一个有趣的问题。 我们有各种各样的人口价值流和最初的城市流。 但是,输出流可以是事件的任意排列,并且我们不知道哪个人口对应哪个城市。 我们将在后续文章中解决此问题。

concatMapEager()似乎带来了两全其美:并发性和输出事件的有保证顺序:

cities.concatMapEager(geoNames::populationOf).subscribe(response -> log.info("Population: {}", response));

在了解了concatMap()flatMap()功能之后,了解concatMapEager()相当简单。 急切地让流concatMapEager()流( duh! )同时预订所有子流。 但是,此运算符可确保首先传播第一个子流的结果,即使它不是要完成的第一个子流也是如此。 一个示例将Swift揭示这意味着什么:

00:34:18.371 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
00:34:18.371 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
00:34:18.371 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
00:34:18.371 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
00:34:18.517 | Rx-3 | <-- 200 OK .../searchJSON?q=London (143ms)
00:34:18.563 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (189ms)
00:34:18.565 | Rx-1 | Population: 1702139
00:34:20.460 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (2086ms)
00:34:20.460 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (2086ms)
00:34:20.461 | Rx-2 | Population: 2138551
00:34:20.462 | Rx-2 | Population: 7556900
00:34:20.462 | Rx-2 | Population: 3255944

我们立即启动四个HTTP请求。 从日志输出中,我们可以清楚地看到伦敦的居民首先被返回。 但是,订户没有收到它,因为华沙尚未到来。 巧合的是,华沙排名第二,因此华沙人口可以在下游传递给订户。 不幸的是,伦敦人口必须等待更多,因为首先我们需要巴黎人口。 巴黎(紧随其后是马德里)完成后,所有剩余结果都将传递到下游。

请注意,即使人口充足,伦敦的人口也必须等待休眠,直到华沙和巴黎完成。 那么concatMapEager()是最好的并发运算符吗? 不完全的。 想象一下,我们有一个数千个城市的列表,每一个城市我们都获取一张1MB的图片。 使用concatMap()我们可以依次(即缓慢concatMap()下载图片。 使用flatMap()可以同时下载图片,并在图片到达时尽快进行处理。 现在, concatMapEager()呢? 在最坏的情况下,我们可以使用concatMapEager()缓存999张图片,因为来自第一个城市的图片恰好是最慢的。 即使我们已经拥有99.9%的结果,但由于我们执行严格的排序,因此我们无法对其进行处理。

使用哪个运算符?

flatMap()应该是您的首选武器。 它允许与流行为进行有效的并发。 但是要准备好接收乱序的结果。 仅当提供的转换速度如此之快,顺序处理不是问题时, concatMap()才能很好地工作。 concatMapEager()非常方便,但是要注意内存消耗。 同样在最坏的情况下,您可能最终会闲置,等待很少的响应。

附录:配置Retrofit2客户端

实际上,我们在本文中始终使用的GeoNames服务接口如下所示:

public interface GeoNames {@GET("/searchJSON")Single<SearchResult> search(@Query("q") String query,@Query("maxRows") int maxRows,@Query("style") String style,@Query("username") String username);default Flowable<Long> populationOf(String city) {return search(city, 1, "LONG", "s3cret").map(SearchResult::getGeonames).map(g -> g.get(0)).map(Geoname::getPopulation).toFlowable();}}

非默认方法的实现由Retrofit2自动生成。 请注意,为简单起见, populationOf()返回一个元素的Flowable<Long> 。 但是,要完全拥抱此API的本质,在现实世界中,其他实现将更为合理。 首先, SearchResult类返回结果的有序列表(省略了获取器/设置器):

class SearchResult {private List<Geoname> geonames = new ArrayList<>();
}class Geoname {private double lat;private double lng;private Integer geonameId;private Long population;private String countryCode;private String name;
}

毕竟,世界上有许多华沙和伦敦 。 我们默默假设列表将包含至少一个元素,而第一个是正确的匹配。 更合适的实现应返回所有匹配,甚至返回更好的Maybe<Long>类型以反映没有匹配项:

default Maybe<Long> populationOf(String city) {return search(city, 1, "LONG", "nurkiewicz").flattenAsFlowable(SearchResult::getGeonames).map(Geoname::getPopulation).firstElement();
}

粘合代码如下所示。 首先Jackson的设置,以便解析来自API的响应:

import com.fasterxml.jackson.databind.ObjectMapper;private ObjectMapper objectMapper() {return new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
}

FAIL_ON_UNKNOWN_PROPERTIES通常是您想要的。 否则,您必须映射JSON响应中的所有字段,并且当API生产者引入新的或向后兼容的字段时,代码将中断。 然后我们设置OkHttpClient ,由Retrofit在下面使用:

import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;private OkHttpClient client() {HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);return new OkHttpClient.Builder().addInterceptor(interceptor).build();
}

有时您可以跳过OkHttp客户端的配置,但是我们添加了日志拦截器。 默认情况下,OkHttp使用java.util.logging日志记录,因此为了使用体面的日志记录框架,我们必须在开始时就安装网桥:

import org.slf4j.bridge.SLF4JBridgeHandler;static {SLF4JBridgeHandler.removeHandlersForRootLogger();SLF4JBridgeHandler.install();
}

最后进行改造:

import io.reactivex.schedulers.Schedulers;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
import retrofit2.converter.jackson.JacksonConverterFactory;GeoNames createClient() {return new Retrofit.Builder().client(client()).baseUrl("http://api.geonames.org").addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io())).addConverterFactory(JacksonConverterFactory.create(objectMapper())).build().create(GeoNames.class);
}

调用createClient()将产生GeoNames接口的动态实现。 我们使用了以下依赖项:

compile 'io.reactivex.rxjava2:rxjava:2.0.6'compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
compile 'com.squareup.retrofit2:converter-jackson:2.0.1'
compile 'com.squareup.okhttp3:logging-interceptor:3.8.0'compile 'ch.qos.logback:logback-classic:1.1.7'
compile 'org.slf4j:slf4j-api:1.7.21'
compile 'org.slf4j:jul-to-slf4j:1.7.21'

翻译自: https://www.javacodegeeks.com/2017/08/flatmap-vs-concatmap-vs-concatmapeager-rxjava-faq.html

flatMap()与concatMap()与concatMapEager()– RxJava常见问题解答相关推荐

  1. flatmap_flatMap()与concatMap()与concatMapEager()– RxJava常见问题解答

    flatmap RxJava 2.x中共有三个无缝相似的运算符: flatMap() , concatMap()和concatMapEager() . 它们都接受相同的参数-从原始流的单个项目到任意类 ...

  2. 并发查询parallel_惯用并发:flatMap()与parallel()– RxJava常见问题解答

    并发查询parallel 简单,有效和安全的并发是RxJava的设计原则之一. 然而,具有讽刺意味的是,它可能是该库中最容易被误解的方面之一. 让我们举一个简单的例子:假设我们有一堆UUID并且对于每 ...

  3. 惯用并发:flatMap()与parallel()– RxJava常见问题解答

    简单,有效和安全的并发是RxJava的设计原则之一. 然而,具有讽刺意味的是,它可能是该库中最容易被误解的方面之一. 让我们举一个简单的例子:假设我们有一堆UUID并且对于每个UUID ,我们必须执行 ...

  4. flatMap()和事件顺序– RxJava常见问题解答

    正如我们已经发现的, flatMap()不会保留原始流的顺序. 让我们使用上一篇文章的GeoNames API示例进行说明 : public interface GeoNames {Flowable& ...

  5. 技术停滞_检测和测试停滞的流– RxJava常见问题解答

    技术停滞 假设您有一个流以不可预测的频率发布事件. 有时您可以预期每秒会有数十条消息,但是偶尔几秒钟都看不到任何事件. 如果您的流是通过Web套接字,SSE或任何其他网络协议传输的,则可能会出现问题. ...

  6. 渴望 英语_渴望订阅– RxJava常见问题解答

    渴望 英语 在教学和指导RxJava以及撰写本书之后 ,我注意到某些领域特别有问题. 我决定发布一些简短的提示,以解决最常见的陷阱. 这是第一部分. Observable和Flowable本质上是惰性 ...

  7. rxjava 背压_背压加载文件– RxJava常见问题解答

    rxjava 背压 事实证明,将文件作为流进行处理非常有效且方便. 许多人似乎忘记了,自Java 8(3年以上!)以来,我们可以很容易地将任何文件变成一行代码: String filePath = & ...

  8. 检测和测试停滞的流– RxJava常见问题解答

    假设您有一个流以不可预测的频率发布事件. 有时您可以预期每秒会有数十条消息,但是偶尔几秒钟都看不到任何事件. 如果您的流是通过Web套接字,SSE或任何其他网络协议传输的,则可能会出现问题. 静默时间 ...

  9. 渴望订阅– RxJava常见问题解答

    在教学和指导RxJava以及撰写本书之后 ,我注意到某些领域尤其成问题. 我决定发布一些简短的提示,以解决最常见的陷阱. 这是第一部分. Observable和Flowable本质上是惰性的. 这意味 ...

最新文章

  1. Unable to execute dex: Multiple dex files define Lcom/myapp/R$array;
  2. VMware 虚拟化编程(10) — VMware 数据块修改跟踪技术 CBT
  3. java Lock 源码分析
  4. 在浏览器里使用 SAP GUI
  5. 快手用旺旺瓶子做机器人_100品牌入榜,在快手的品牌运营怎么做?|11月快手品牌新势力榜揭晓...
  6. 【数学】Birthday
  7. python读取sqlserver的数据_Python实现读取SQLServer数据并插入到MongoDB数据库的方法示例...
  8. 用swing设计一个打地鼠小游戏_这7个风靡欧美的英语小游戏,学会胜过刷100道题!...
  9. UC将发布高性能HTML5游戏引擎X-Canvas
  10. 【Java数据结构与算法】第六章 算法的时间复杂度、算法的空间复杂度和排序算法的介绍
  11. Nginx启动报错误unlink() “nginx.pid” failed (2: No such file or directory)
  12. UNIX下DNS服务器之创建篇(下)
  13. 网易云音乐ios旧版本安装包_网易云音乐产品分析报告
  14. 【李宏毅2020 ML/DL】P11 Logistic Regression | 由逻辑回归中的特征转换巧妙引出“神经网络”的概念
  15. 单图像超分辨率重建总结
  16. Jzoj5460【NOIP2017提高A组冲刺11.7】士兵训练
  17. Global land use changes are four times greater than previously estimated
  18. 如何用Python画奥运五环——circle()
  19. sqlserver加上百分号_用一条sql语句显示数据百分比并加百分号
  20. 害怕,刷人超过70%?3招应对校招笔试|大厂笔试自救指南|应届生必看

热门文章

  1. 分表分库时机选择及策略
  2. Git使用中的一些奇技淫巧
  3. Spring面试题(第一期)
  4. 2021 程序媛跳槽记:学习计划篇(已收获字节等offer)
  5. java实现人脸识别(使用百度云V3版本)
  6. 2016蓝桥杯省赛---java---B---1(煤球数目)
  7. linux 编译 expat,关于expat库的编译
  8. 计算机应用基础期中上机考试,期中考试计算机应用基础试卷
  9. leetcode初级算法3.存在重复元素
  10. 用xshell传输jdk_在JDK 9中将InputStream传输到OutputStream