1.概述

转载:Flink 源码:从 KeyGroup 到 Rescale

通过阅读本文你能 get 到以下点:

KeyGroup、KeyGroupRange 介绍
maxParallelism 介绍及采坑记
数据如何映射到每个 subtask 上?
任务改并发时,KeyGroup rescale 的过程

2.KeyGroup、KeyGroupRange 介绍

Flink 中 KeyedState 恢复时,是按照 KeyGroup 为最小单元恢复的,每个 KeyGroup 负责一部分 key 的数据。这里的 key 指的就是 Flink 中 keyBy 中提取的 key。

每个 Flink 的 subtask 负责一部分相邻 KeyGroup 的数据,即一个 KeyGroupRange 的数据,有个 start 和 end(这里是闭区间)。

看到这里可能有点蒙,没关系后面有例子帮助读者理解这两个概念。

3.maxParallelism 介绍及采坑记

3.1 最大并行度的概念

maxParallelism 表示当前算子设置的 maxParallelism,而不是 Flink 任务的并行度。maxParallelism 为 KeyGroup 的个数。

当设置算子的并行度大于 maxParallelism 时,有些并行度就分配不到 KeyGroup,此时 Flink 任务是无法从 Checkpoint 处恢复的。

3.2 maxParallelism 到底是多少呢?

如果设置了,就是设定的值。当然设置了,也需要检测合法性。如下图所示,Flink 要求 maxParallelism 应该介于 1 到 Short.MAX_VALUE 之间。


如果没有设置,则 Flink 引擎会自动通过 KeyGroupRangeAssignment 类的 computeDefaultMaxParallelism 方法计算得出,computeDefaultMaxParallelism 源码如下所示:

/**根据算子的并行度计算 maxParallelism* 计算规则:* 1. 将算子并行度 * 1.5 后,向上取整到 2 的 n 次幂* 2. 跟 DEFAULT_LOWER_BOUND_MAX_PARALLELISM 相比,取 max* 3. 跟 UPPER_BOUND_MAX_PARALLELISM 相比,取 min*/
public static int computeDefaultMaxParallelism(int operatorParallelism) {checkParallelismPreconditions(operatorParallelism);return Math.min(Math.max(MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),DEFAULT_LOWER_BOUND_MAX_PARALLELISM),UPPER_BOUND_MAX_PARALLELISM);
}

computeDefaultMaxParallelism 会根据算子的并行度计算 maxParallelism,计算规则:将算子并行度 * 1.5 后,向上取整到 2 的 n 次幂,同时保证计算的结果在最小值和最大值之间。

最小值 DEFAULT_LOWER_BOUND_MAX_PARALLELISM 是 2 的 7 次方 = 128。

最大值 UPPER_BOUND_MAX_PARALLELISM 是 2 的 15 次方 = 32768。

即:Flink 自动生成的 maxParallelism 介于 128 和 32768 之间。

3.3 采坑记

新开发的 Job 业务数据量较小,所以初期设置的并行度也会很小。同时没有给每个 Job 主动设置 maxParallelism,根据上面的规则,Flink 自动生成的 maxParallelism 为 128,后期随着业务数据量暴涨,当 Job 的并发数调大 128 以上时,发现 Job 无法从 Checkpoint 或 Savepoint 中恢复了,这就是所谓的 “并发调不上去了”。当然可以选择不从状态恢复,选择直接启动的方式去启动任务。但是有些 Flink 任务对状态是强依赖的,即:必须从 State 中恢复,对于这样的 Job 就不好办了。

所以按照开发规范,应该结合业务场景主动为每个 Job 设置合理的 maxParallelism,防止出现类似情况。

4.每个 key 应该分配到哪个 subtask 上运行?

根据 key 计算其对应的 subtaskIndex,即应该分配给哪个 subtask 运行,计算过程包括以下两步,源码都在相应的 KeyGroupRangeAssignment 类中:

  • 第一步:根据 key 计算其对应哪个 KeyGroup
  • 第二步:计算 KeyGroup 属于哪个并行度

第一步:根据 key 计算其对应哪个 KeyGroup

computeKeyGroupForKeyHash 源码如下所示:

/*** Assigns the given key to a key-group index.** @param keyHash the hash of the key to assign* @param maxParallelism the maximum supported parallelism, aka the number of key-groups.* @return the key-group to which the given key is assigned* 根据 Key 的 hash 值来计算其对应的 KeyGroup 的 index*/
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {return MathUtils.murmurHash(keyHash) % maxParallelism;
}

根据 Key 的 hash 值进行 murmurHash 后,对 maxParallelism 进行求余,就是对应的 KeyGroupIndex。

第二步:计算 KeyGroup 属于哪个并行度

computeOperatorIndexForKeyGroup 源码如下所示:

// 根据 maxParallelism、算子的并行度 parallelism 和 keyGroupId,
// 计算 keyGroupId 对应的 subtask 的 index
public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {return keyGroupId * parallelism / maxParallelism;
}

4.1 示例

假如 maxParallelism 为 50,parallelism 为 10,那么数据是如何分布的?

MathUtils.murmurHash(key.hashCode()) % maxParallelism:所有 key 的 hashCode 通过 Murmurhash 对 50 求余得到的范围为 0~49,也就是说:总共有 keyGroupId 为 0~49 的这 50 个 KeyGroup

subtask 与 KeyGroupId 对应关系:

0~4 号 KeyGroup 位于第 0 个 subtask。即:subtask0 处理 KeyGroupRange(0,4 ) 的数据
5~9 号 KeyGroup 位于第 1 个 subtask。即:subtask1 处理 KeyGroupRange(5,9 ) 的数据
10~14 号 KeyGroup 位于第 2 个 subtask。即:subtask2 处理 KeyGroupRange(10,14 ) 的数据
15~19 号 KeyGroup 位于第 3 个 subtask。即:subtask3 处理 KeyGroupRange(15,19 ) 的数据
。。。以此类推

这里我们看到了每个 subtask 对应一个 KeyGroupRange 的数据,且是闭区间。

5.计算某个并行度上负载哪些 KeyGroup?

计算某个并行度上负载哪些 KeyGroup?等价于求某个 subtask 负载的 KeyGroupRange。

在 KeyGroupRangeAssignment 类中有 computeKeyGroupRangeForOperatorIndex 方法可以完成这个操作:

// 根据 maxParallelism, parallelism 计算 operatorIndex 对应的 subtask 负责哪个范围的 KeyGroup
public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(int maxParallelism,int parallelism,int operatorIndex) {checkParallelismPreconditions(parallelism);checkParallelismPreconditions(maxParallelism);Preconditions.checkArgument(maxParallelism >= parallelism,"Maximum parallelism must not be smaller than parallelism.");int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;return new KeyGroupRange(start, end);
}

6.Rescale 过程

如下图所示是 Flink 依赖 KeyGroup 修改并发的 Rescale 过程(并发度从 3 改成 4):


由图中可得知 key 的范围是 0~19, maxParallelism = 10

0->{0,10} 表示 key 为 0 和 10 的数据,对应的 KeyGroupId 为 0。1->{1,11} 表示 key 为 1 和 11 的数据,对应的 KeyGroupId 为 1。
以此类推。。。

修改并发前的映射关系

并发度是 3:

subtask0 负责 KeyGroupRange(0,3)
Subtask1 负责 KeyGroupRange(4,6)
Subtask2 负责 KeyGroupRange(7,9)

修改并发后的映射关系

并发度是 4:

subtask0 负责 KeyGroupRange(0,2)
Subtask1 负责 KeyGroupRange(3,4)
Subtask2 负责 KeyGroupRange(5,7)
Subtask3 负责 KeyGroupRange(8,9)

6.1 maxParallelism 修改则任务不能恢复

KeyGroup 的数量为 maxParallelism,一旦 maxParallelism 变了,说明 KeyGroup 的分组完全变了,而 KeyedState 恢复是以 KeyGroup 为最小单元的,所以 maxParallelism 改变后,任务将无法恢复。在 Checkpoint 恢复过程中也会对新旧 Job 的 maxParallelism 进行检查匹配,如果某个算子的 maxParallelism 变了,则任务将不能恢复。

7.总结

本文主要介绍了 KeyGroup、KeyGroupRange 和 maxParallelism 的一些概念,及他们之间的关系。最后讲述了改并发的情况状态的 Rescale 流程。其实在 Flink 内部不只是状态恢复时需要用到 KeyGroup,数据 keyBy 后进行 shuffle 数据传输时也需要按照 KeyGroup 的规则来将分配数据,将数据分发到对应的 subtask 上。

本文比较简单,主要是为后续 State 恢复流程做一个铺垫。

【Flink】Flink key 应该分配到哪个 KeyGroup 以及 KeyGroup 分配在哪个subtask相关推荐

  1. flink 不设置水印_从0到1学习Flink—— Flink parallelism 和 Slot 介绍

    前言 之所以写这个是因为前段时间自己的项目出现过这样的一个问题: 1Caused by: akka.pattern.AskTimeoutException: 2Ask timed out on [Ac ...

  2. 大数据计算引擎之Flink Flink CEP复杂事件编程

    原文地址:大数据计算引擎之Flink Flink CEP复杂事件编程 复杂事件编程(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的时事件系序列库,并利 ...

  3. 凌波微步Flink——Flink的技术逻辑与编程步骤剖析

    转载请注明出处:http://blog.csdn.net/dongdong9223/article/details/95459606 本文出自[我是干勾鱼的博客] Ingredients: Java: ...

  4. 凌波微步Flink——Flink API中的一些基础概念

    转载请注明出处:http://blog.csdn.net/dongdong9223/article/details/95355619 本文出自[我是干勾鱼的博客] Ingredients: Java: ...

  5. extract 模板 php,PHP自定义函数实现assign()数组分配到模板及extract()变量分配到模板功能示例...

    这篇文章主要介绍了PHP自定义函数实现assign()数组分配到模板及extract()变量分配到模板功能,可模拟tp框架中模板变量分配功能,涉及php基于面向对象的数组赋值相关操作技巧,需要的朋友可 ...

  6. php 在模板中赋值数组变量,PHP自定义函数实现assign()数组分配到模板及extract()变量分配到模板功能示例...

    本文实例讲述了PHP自定义函数实现assign()数组分配到模板及extract()变量分配到模板功能.分享给大家供大家参考,具体如下: 这里模拟tp框架模板变量分配与赋值操作. extract($a ...

  7. 【C 语言】结构体 ( 结构体中嵌套一级指针 | 分配内存时先 为结构体分配内存 然后再为指针分配内存 | 释放内存时先释放 指针成员内存 然后再释放结构头内存 )

    文章目录 一.结构体中嵌套一级指针 1.声明 结构体类型 2.为 结构体 变量分配内存 ( 分配内存时先 为结构体分配内存 然后再为指针分配内存 ) 3.释放结构体内存 ( 释放内存时先释放 指针成员 ...

  8. c语言malloc引用类型作参数,c语言中动态内存分配malloc只在堆中分配一片内存.doc...

    c语言中动态内存分配malloc只在堆中分配一片内存 .C语言中动态内存分配(malloc)只在堆中分配一片内存,返回一个void指针(分配失败则返回0),并没有创建一个对象.使用时需要强制转换成恰当 ...

  9. 动态分区分配及可重定位分区分配

    动态分区分配及可重定位分区分配 分区大小不固定 分区分配的数据结构 二维表格(连续存储结构) 空闲分区表记录空闲分区的大小,位置和状态 已分配区表记录已占用分区的大小,位置和状态 双向循环链表(离散存 ...

最新文章

  1. Castle IOC容器实践之EnterpriseLibrary Configuration Facility
  2. Node Sass does not yet support your current environment: Windows 64-bit然如何解决,cnpm此问题解决方法
  3. 我的Python成长之路---第一天---Python基础(5)---2015年12月26日(雾霾)
  4. 万里坑路第一步:1、cocos2d-js的JDKADK,SDK,apaceANT安装和环境变量设置
  5. R语言绘制流程图(二)
  6. ALOS 12.5米精度DEM数据下载与处理
  7. 高频交易揭:美国五大高频易访录
  8. xshell 安装包(百度网盘)+安装过程连接虚拟机注意事项
  9. 学计算机优盘多少内存够用,u盘建议买多大内存的
  10. 百度大脑车型识别使用攻略
  11. 真!一文搞定 HTTP 和 HTTPS
  12. 在ios中,input唤出软键盘中‘换行’转‘搜索’、‘前往’,及直接唤醒数字键盘的实现(vue)
  13. 华中科技大学计算机王凯,苗蕾-环境科学与工程学院
  14. 微信小程序动态生成二维码
  15. 分享几个 Github 镜像网站(亲测可用)
  16. CentOS 7 升级内核
  17. 13.linux系统使用小结
  18. create remote oracle odbc data source on win10
  19. 潜入维基解密机房 更新中文视频+高清图
  20. pyqt5 QPainter绘制图形,并旋转

热门文章

  1. 企业微信与微信互通能力再升级 全面打通与视频号的连接
  2. 男子趁前女友熟睡翻开眼皮,刷脸转走15万!支付宝:几率很小
  3. 瑞幸咖啡上半年营收31.8亿元 同比增长106%
  4. 网易第二季度营收205亿元 同比增长13%超预期
  5. 互联网大厂“抢填”高考志愿
  6. 漫画《灌篮高手》将拍电影?井上雄彦发文确认
  7. 华为或正与联发科、紫光展锐就采购更多芯片事宜展开磋商
  8. 下血本了!京东宣布未来三年向湖北投资,助力经济恢复
  9. 又涨价了!华为 P40系列海外售价曝光:还好国行版友好不少
  10. 2019胡润财富报告:中国大陆中产家庭3320万户