通过阅读本文你能 get 到以下点:

  • KeyGroup、KeyGroupRange 介绍
  • maxParallelism 介绍及采坑记
  • 数据如何映射到每个 subtask 上?
  • 任务改并发时,KeyGroup rescale 的过程

一、 KeyGroup、KeyGroupRange 介绍

Flink 中 KeyedState 恢复时,是按照 KeyGroup 为最小单元恢复的,每个 KeyGroup 负责一部分 key 的数据。这里的 key 指的就是 Flink 中 keyBy 中提取的 key。

每个 Flink 的 subtask 负责一部分相邻 KeyGroup 的数据,即一个 KeyGroupRange 的数据,有个 start 和 end(这里是闭区间)。

看到这里可能有点蒙,没关系后面有例子帮助读者理解这两个概念。

二、 maxParallelism 介绍及采坑记

1、最大并行度的概念

maxParallelism 表示当前算子设置的 maxParallelism,而不是 Flink 任务的并行度。maxParallelism 为 KeyGroup 的个数。

当设置算子的并行度大于 maxParallelism 时,有些并行度就分配不到 KeyGroup,此时 Flink 任务是无法从 Checkpoint 处恢复的。

2、maxParallelism 到底是多少呢?

如果设置了,就是设定的值。当然设置了,也需要检测合法性。如下图所示,Flink 要求 maxParallelism 应该介于 1 到 Short.MAX_VALUE 之间。

/*** Sets the maximum degree of parallelism defined for the program. The upper limit (inclusive)* is Short.MAX_VALUE.** <p>The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also* defines the number of key groups used for partitioned state.** @param maxParallelism Maximum degree of parallelism to be used for the program.,*              with 0 < maxParallelism <= 2^15 - 1*/public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) {Preconditions.checkArgument(maxParallelism > 0 &&maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,"maxParallelism is out of bounds 0 < maxParallelism <= " +KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);config.setMaxParallelism(maxParallelism);return this;}

如果没有设置,则 Flink 引擎会自动通过 KeyGroupRangeAssignment 类的 computeDefaultMaxParallelism 方法计算得出,computeDefaultMaxParallelism 源码如下所示:

/**根据算子的并行度计算 maxParallelism* 计算规则:* 1. 将算子并行度 * 1.5 后,向上取整到 2 的 n 次幂* 2. 跟 DEFAULT_LOWER_BOUND_MAX_PARALLELISM 相比,取 max* 3. 跟 UPPER_BOUND_MAX_PARALLELISM 相比,取 min*/
public static int computeDefaultMaxParallelism(int operatorParallelism) {checkParallelismPreconditions(operatorParallelism);return Math.min(Math.max(MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),DEFAULT_LOWER_BOUND_MAX_PARALLELISM),UPPER_BOUND_MAX_PARALLELISM);
}

computeDefaultMaxParallelism 会根据算子的并行度计算 maxParallelism,计算规则:将算子并行度 * 1.5 后,向上取整到 2 的 n 次幂,同时保证计算的结果在最小值和最大值之间。

最小值 DEFAULT_LOWER_BOUND_MAX_PARALLELISM 是 2 的 7 次方 = 128。

最大值 UPPER_BOUND_MAX_PARALLELISM 是 2 的 15 次方 = 32768。

即:Flink 自动生成的 maxParallelism 介于 128 和 32768 之间。

采坑记

新开发的 Job 业务数据量较小,所以初期设置的并行度也会很小。同时没有给每个 Job 主动设置 maxParallelism,根据上面的规则,Flink 自动生成的 maxParallelism 为 128,后期随着业务数据量暴涨,当 Job 的并发数调大 128 以上时,发现 Job 无法从 Checkpoint 或 Savepoint 中恢复了,这就是所谓的 “并发调不上去了”。当然可以选择不从状态恢复,选择直接启动的方式去启动任务。但是有些 Flink 任务对状态是强依赖的,即:必须从 State 中恢复,对于这样的 Job 就不好办了。

所以按照开发规范,应该结合业务场景主动为每个 Job 设置合理的 maxParallelism,防止出现类似情况。

三、每个 key 应该分配到哪个 subtask 上运行?

根据 key 计算其对应的 subtaskIndex,即应该分配给哪个 subtask 运行,计算过程包括以下两步,源码都在相应的 KeyGroupRangeAssignment 类中:

  • 第一步:根据 key 计算其对应哪个 KeyGroup
  • 第二步:计算 KeyGroup 属于哪个并行度

第一步:根据 key 计算其对应哪个 KeyGroup

computeKeyGroupForKeyHash 源码如下所示:

/*** Assigns the given key to a key-group index.** @param keyHash the hash of the key to assign* @param maxParallelism the maximum supported parallelism, aka the number of key-groups.* @return the key-group to which the given key is assigned* 根据 Key 的 hash 值来计算其对应的 KeyGroup 的 index*/
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {return MathUtils.murmurHash(keyHash) % maxParallelism;
}

根据 Key 的 hash 值进行 murmurHash 后,对 maxParallelism 进行求余,就是对应的 KeyGroupIndex。

第二步:计算 KeyGroup 属于哪个并行度

computeOperatorIndexForKeyGroup 源码如下所示:

// 根据 maxParallelism、算子的并行度 parallelism 和 keyGroupId,
// 计算 keyGroupId 对应的 subtask 的 index
public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {return keyGroupId * parallelism / maxParallelism;
}

示例

假如 maxParallelism 为 50,parallelism 为 10,那么数据是如何分布的?

MathUtils.murmurHash(key.hashCode()) % maxParallelism:所有 key 的 hashCode 通过 Murmurhash 对 50 求余得到的范围为 0~49,也就是说:总共有 keyGroupId 为 0~49 的这 50 个 KeyGroup。

subtask 与 KeyGroupId 对应关系:

  • 0~4 号 KeyGroup 位于第 0 个 subtask。即:subtask0 处理 KeyGroupRange(0,4 ) 的数据
  • 5~9 号 KeyGroup 位于第 1 个 subtask。即:subtask1 处理 KeyGroupRange(5,9 ) 的数据
  • 10~14 号 KeyGroup 位于第 2 个 subtask。即:subtask2 处理 KeyGroupRange(10,14 ) 的数据
  • 15~19 号 KeyGroup 位于第 3 个 subtask。即:subtask3 处理 KeyGroupRange(15,19 ) 的数据
  • 。。。以此类推

这里我们看到了每个 subtask 对应一个 KeyGroupRange 的数据,且是闭区间。

计算某个并行度上负载哪些 KeyGroup?

计算某个并行度上负载哪些 KeyGroup?等价于求某个 subtask 负载的 KeyGroupRange。

在 KeyGroupRangeAssignment 类中有 computeKeyGroupRangeForOperatorIndex 方法可以完成这个操作:

// 根据 maxParallelism, parallelism 计算 operatorIndex 对应的 subtask 负责哪个范围的 KeyGroup
public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(int maxParallelism,int parallelism,int operatorIndex) {checkParallelismPreconditions(parallelism);checkParallelismPreconditions(maxParallelism);Preconditions.checkArgument(maxParallelism >= parallelism,"Maximum parallelism must not be smaller than parallelism.");int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;return new KeyGroupRange(start, end);
}

四、 Rescale 过程

如下图所示是 Flink 依赖 KeyGroup 修改并发的 Rescale 过程(并发度从 3 改成 4):

由图中可得知 key 的范围是 0~19, maxParallelism = 10。

0->{0,10} 表示 key 为 0 和 10 的数据,对应的 KeyGroupId 为 0。

1->{1,11} 表示 key 为 1 和 11 的数据,对应的 KeyGroupId 为 1。

以此类推。。。

修改并发前的映射关系

并发度是 3:

  • Subtask0 负责 KeyGroupRange(0,3)
  • Subtask1 负责 KeyGroupRange(4,6)
  • Subtask2 负责 KeyGroupRange(7,9)

修改并发后的映射关系

并发度是 4:

  • subtask0 负责 KeyGroupRange(0,2)
  • Subtask1 负责 KeyGroupRange(3,4)
  • Subtask2 负责 KeyGroupRange(5,7)
  • Subtask3 负责 KeyGroupRange(8,9)

maxParallelism 修改则任务不能恢复

KeyGroup 的数量为 maxParallelism,一旦 maxParallelism 变了,说明 KeyGroup 的分组完全变了,而 KeyedState 恢复是以 KeyGroup 为最小单元的,所以 maxParallelism 改变后,任务将无法恢复。在 Checkpoint 恢复过程中也会对新旧 Job 的 maxParallelism 进行检查匹配,如果某个算子的 maxParallelism 变了,则任务将不能恢复。

五、总结

本文主要介绍了 KeyGroup、KeyGroupRange 和 maxParallelism 的一些概念,及他们之间的关系。最后讲述了改并发的情况状态的 Rescale 流程。其实在 Flink 内部不只是状态恢复时需要用到 KeyGroup,数据 keyBy 后进行 shuffle 数据传输时也需要按照 KeyGroup 的规则来将分配数据,将数据分发到对应的 subtask 上。

本文比较简单,主要是为后续 State 恢复流程做一个铺垫。

Flink 源码: 从 KeyGroup 到 Rescale相关推荐

  1. Flink源码分析 - 源码构建

    本篇文章首发于头条号Flink源码分析 - 源码构建,欢迎关注我的头条号和微信公众号"大数据技术和人工智能"(微信搜索bigdata_ai_tech)获取更多干货,也欢迎关注我的C ...

  2. Flink 源码 | 自定义 Format 消费 Maxwell CDC 数据

    Flink 1.11 最重要的 Feature -- Hive Streaming 之前已经和大家分享过了,今天就和大家来聊一聊另一个特别重要的功能 -- CDC. CDC概述 何为CDC?Chang ...

  3. 【Flink】Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型

    1.概述 转载:Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型 相似文章:[Flink]Flink 基于 MailBox 实现的 StreamTask 线程模型 Fl ...

  4. 【Flink】Flink 源码阅读笔记(18)- Flink SQL 中的流和动态表

    1.概述 转载:Flink 源码阅读笔记(18)- Flink SQL 中的流和动态表

  5. 【Flink】Flink 源码阅读笔记(16)- Flink SQL 的元数据管理

    1.概述 转载:Flink 源码阅读笔记(16)- Flink SQL 的元数据管理 Flink 源码阅读笔记(17)- Flink SQL 中的时间属

  6. 【Flink】Flink 源码阅读笔记(15)- Flink SQL 整体执行框架

    1.概述 转载:Flink 源码阅读笔记(15)- Flink SQL 整体执行框架 在数据处理领域,无论是实时数据处理还是离线数据处理,使用 SQL 简化开发将会是未来的整体发展趋势.尽管 SQL ...

  7. 【Flink】 Flink 源码之 SQL 执行流程

    1.概述 转载:Flink 源码之 SQL 执行流程 2.前言 本篇为大家带来Flink执行SQL流程的分析.它的执行步骤概括起来包含: 解析.使用Calcite的解析器,解析SQL为语法树(SqlN ...

  8. 【Flink】Flink 源码之ExecutionGraph

    1.概述 以前的一个老文章基于 Flink 1.9版本的,现在是基于flink 1.13版本的. 参考:95-230-028-源码-WordCount走读-获取ExecutionGraph 本文转载: ...

  9. 【Flink】Flink 源码之OperatorChain

    1.概述 转载:Flink 源码之OperatorChain 前言 OperatorChain是Flink中一个很重要的优化措施,能够将尽可能多的满足条件的数据处理操作在一个slot中串联完成,从而最 ...

  10. 【Flink】Flink 源码之快照

    1.概述 转载:Flink 源码之快照 2.周期触发checkpoint的方法调用链 JobMaster.triggerSavepointSchedulerBase.startCheckpointSc ...

最新文章

  1. c# 调用SQL Server存储过程返回值(转)
  2. 下载vs2008 beta2的新方法
  3. 关于SQL 数据库表中的聚集索引和非聚集索引等
  4. python安装系统要求_python需要什么系统 | window重装系统教程
  5. [Python图像处理] 三.获取图像属性、兴趣ROI区域及通道处理
  6. 腾讯计费:助力游戏千亿级营收,覆盖180多个国家
  7. Silverlight 4.0添加鼠标右键菜单和Silverlight全屏模式的进入退出
  8. 解决vue单页路由跳转后scrollTop的问题
  9. Javascript取select的选中值和文本
  10. php mysql 失败_在php中插入失败的数据mysql
  11. idea cloud bootstrap是啥_application.yml与bootstrap.yml的区别
  12. CSS 动画指南: 原理和实战 (一)
  13. 弱电工程综合布线施工过程控制
  14. 基于Java毕业设计大学生旅游拼团网站源码+系统+mysql+lw文档+部署软件
  15. 【VBA宏编程】——Excel操作
  16. 记录使用Kettle导入excel数据心得
  17. 音箱后面接口 COM 8欧 70V 100V
  18. java 一元二次方程_java求解一元二次方程
  19. 阿里云服务器ECS windows server已开放端口但连不上的问题
  20. 从用户的角度看 java_[Java教程]开发网站要从用户的角度出发!

热门文章

  1. ios下拉效果滑动滚出页面
  2. word表格怎么缩小上下间距_word文档中表格怎么缩小行间距
  3. GAN的评价图像评价指标(IS和FID)
  4. ubuntu中安装vscode后创建快捷方式
  5. 二元一次方程编程解鸡兔同笼问题
  6. kc705进行DDR3扩容
  7. Vlan和Trank
  8. Inter Edsion添加USB有线网卡解决办法
  9. 2021年中国大企业创新百强排行榜:华为位居榜首,北京上榜企业最多(附年榜TOP100详单)
  10. DHT11温湿度传感器原理剖析