由于业务需求有的时候需要将多个数据源进行合并,Reactor提供了concat方法和merge方法:

concat方法示意图:

merge方法示意图:

从图中可以很清楚的看出这两种合并方法的不同:

  • concat是合并的flux,按照顺序分别运行,flux1运行完成以后再运行flux2
  • merge是同时运行,根据时间先后运行

下面对concat和merge相关的方法进行测试,先准备测试数据:

private Flux<Integer> flux1() {return Flux.range(1,4);
}private Flux<Integer> flux2() {return Flux.range(5,8);
}private Flux<String> hotFlux1() {return flux1().map(i-> "[1]"+i).delayElements(Duration.ofMillis(10));
}private Flux<String> hotFlux2() {return flux2().map(i-> "[2]"+i).delayElements(Duration.ofMillis(4));
}

concat相关方法

concat代码演示

    @Testpublic void concatTest() throws InterruptedException {Flux.concat(hotFlux1(), hotFlux2()).subscribe(i -> System.out.print("->"+i));Thread.sleep(200);}

运行结果

  • 从结果可以看出先运行完flux1之后再运行flux2

concatWith方法

用法和concat基本相同,写法略有不同:

@Test
public void concatWithTest () {flux1().concatWith(flux2()).log().subscribe();
}

merge相关方法

merge代码演示

@Test
public void mergeTest() throws InterruptedException {Flux.merge(hotFlux1(), hotFlux2()).subscribe(i -> System.out.print("->"+i));Thread.sleep(200);
}

运行结果

  • 很明显顺序和concat的区别,是按照时间先后执行

mergeWith用法

  • 用法和merge相同,写法不同而已
@Test
public void mergeWithTest() throws InterruptedException {hotFlux1().mergeWith(hotFlux2()).subscribe(i -> System.out.print("->"+i));Thread.sleep(200);
}

mergeSequential用法

  • 跟concat有些相似,得到的结果类似
  • 跟concat不同在于,订阅的源是hot型,接收数据后根据订阅顺序重新排序
@Test
public void mergeSequentialTest() throws InterruptedException {Flux.mergeSequential(hotFlux1(), hotFlux2()).subscribe(i -> System.out.print("->"+i));Thread.sleep(200);
}

结果和concat的一样都是

mergeOrdered用法

  • 合并接收之后再排序
    @Testpublic void mergeOrderedTest() throws InterruptedException {Flux.mergeOrdered(Comparator.reverseOrder(), hotFlux1(), hotFlux2()).subscribe(i -> System.out.print("->"+i));Thread.sleep(200);}

combineLatest用法

  • 跟concat和merge不同该方法是将多个源的最后得到元素通过函数进行融合的到新的值

代码示例

@Test
public void combineLatestTest() throws InterruptedException {Flux.combineLatest(hotFlux1(), hotFlux2(), (v1, v2) -> v1 + ":" + v2).subscribe(i -> System.out.print("->"+i));Thread.sleep(200);
}

运行结果

  • 结果都是flux1和flux2中元素进行融合之后的元素

代码

github

gitee

Reactor 3 (10): 数据合并concat、merge相关推荐

  1. 04_pandas字符串函数;数据合并concat、merge;分组groupby;Reshaping;Pivot tables;时间处理(date_range、tz_localize等)

    字符串函数,Series的lower()函数 Series在str属性中提供了一组字符串处理方法,可以方便地对数组中的每个元素进行操作,如下面的代码片段所示.请注意,str中的模式匹配通常默认使用正则 ...

  2. 【数据科学】05 数据合并(merge、concat、combine)与数据清洗(缺失值、重复值、内容和格式)

    文章目录 1. 数据合并 1.1 merge()合并 1.2 concat()合并 1.3 combine()合并 2. 数据清洗 2.1 缺失值 2.2 重复值 2.3 内容与格式清洗 1. 数据合 ...

  3. Pandas 中DataFrame 数据合并 Contract | Merge

    最近在工作中,遇到了数据合并.连接的问题,故整理如下,供需要者参考~ 参考自:象在舞:https://blog.csdn.net/gdkyxy2013/article/details/80785361 ...

  4. [Pandas] 数据合并 pd.merge

    实现类似SQL的join操作,通过pd.merge()方法可以自由灵活地操作各种逻辑的数据连接.合并等操作 可以将两个DataFrame或Series合并,最终返回一个合并后的DataFrame 语法 ...

  5. pandas相关函数sort_values、字符串处理、index、merge、数据合并cancat、groupby分组统计

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言 一.排序函数sort_values() 二.字符串处理 1.介绍 2.代码介绍 3.Categorical类型降低数据 ...

  6. bind merge r 和join_R语言中的数据合并函数(merge,cbind和rbind)的使用

    R语言中的数据合并函数(merge,cbind和rbind)的使用-R语言中用cbind() 和rbind() 构建分块矩阵 1.merge函数 两个数据框拥有相同的时间或观测值,但这些列却不尽相同. ...

  7. pandas 数据合并 pd.join() pd.merge() pd.crosstab() pd.concat()

    文章目录 pd.join() pd.merge() pd.merge(left, right, how='inner', left_on=None, right_on=None...)形式 按照一列进 ...

  8. 数据合并之concat、append、merge和join

    Pandas 是一套用于 Python 的快速.高效的数据分析工具.它可以用于数据挖掘和数据分析,同时也提供数据清洗功能.本文将详细讲解数据合并与连接,目录如下: ① concat 一.定义 conc ...

  9. Python学习笔记---merge和concat数据合并(1)

    Python学习笔记-merge和concat数据合并(1) Python学习笔记-merge和concat数据合并(2) 文章目录 Python学习笔记---merge和concat数据合并(1) ...

最新文章

  1. 序列模式挖掘、频繁项集与频繁序列
  2. python os 获取当前路径的绝对路径的上层目录_os 模块常用函数
  3. 【杂谈】循序渐进,有三AI不得不看的技术综述(超过100篇核心干货)
  4. Objective-C MacOS以管理员权限运行程序
  5. 关于使用AsyncTaskLoader的使用
  6. webkit中对incomplete type指针的处理技巧
  7. python 删除文件 通配符_python 实现删除文件或文件夹实例详解
  8. php全词查询,php 关键词查询的实现方法
  9. html制作我和我的家乡,《我和我的家乡》怎么参与投资?投资流程是什么?
  10. 解决:Maven resources compiler: Maven project configuration required for module ‘c,不识别Maven项目
  11. 如何调节idea jvm的最大内存_那个小白还没搞懂内存溢出,只能用案例说给他听了
  12. 游戏设计梦工厂学习笔记(一)
  13. JT/T 808-2019、JT/T 809-2019文档分享
  14. 零知识证明 SNARKs C++库:libsnark教程
  15. 判断一个数是否为质数的三种方法
  16. Android扫描系统文件,安卓文档扫描仪
  17. 远程连接阿里云服务器一直显示连接失败原因
  18. 微信小程序图片放大预览效果的实现,轮播图点击放大预览
  19. scipy稀疏矩阵lil_array
  20. Python 3.12 目标:还可以更快!

热门文章

  1. matlab中二进制数运算规则,matlab二进制运算
  2. C++入门第一课:HelloWorld
  3. android+振动器权限,android振动器(Vibrator)
  4. 今天教教你怎么一键ai绘画卡通人物
  5. Objective-C introduction
  6. CSS3基础:字体图标icon的使用
  7. Python描述 LeetCode 70. 爬楼梯
  8. java自动生成sql语句
  9. 【概率论】随机变量 Random Variable
  10. 基于MYSQL的互联网药品交易系统数据库设计项目实战