参考:

Flink状态的缩放(rescale)与键组(Key Group)设计_LittleMagics的博客-CSDN博客

【Flink】Flink key 应该分配到哪个 KeyGroup 以及 KeyGroup 分配在哪个subtask_九师兄的博客-CSDN博客_flink key

总览:

共计2k字,阅读时间10min。

前言

在 Flink 中,有很多数据需要进行保存,而且以及集群的方式进行保存以及重现。在分布式中的保存以及回复是很难实现的。那么我们先看看 Flink 是怎么进行保存的。

State

State 是 Flink 进行状态保留,那么具体定义如下:

When it comes to stateful stream processing, state comprises of the information that an application or stream processing engine will remember across events and streams as more realtime (unbounded) and/or offline (bounded) data flow through the system.

这句话的主要意思就是,State 是流处理过程中需要记住的数据的快照,而这些数据既可以包括业务数据,也可以包括元数据(比如 各种 JDBC、Connecter)。以最常用的 RocksDB 状态后端为例子,状态数据的流动可以抽象为三层,分别是:User code、Local state backend、Persisted savepoint。

用户代码产生的状态实时地存储在本地文件中,并且随着 Checkpoint 的周期异步地同步到远端可靠的分布式文件系统中。这样皆可以保证100%的本地性,各个 Sub-Task 只需要负责自己所属的那一部分 State 的保存,不需要通过网络互相传输 State,也不需要频繁的读写 HDFS,减少网络开支。

这里主要说明的是,我们 Flink 任务通过 yarn 运行在 dataManager 上,这个节点一般也是我们 dataNode 节点,那么保存在 hdfs 上的数据,这个 dataNode 也是有的,从而实现本地化。

在 Flink 需要作业重启的时候,从 HDFS 取回状态数据到本地,即可恢复运行。

而且在 Flink 中,有两种状态: Keyed State & Operator State。前者与每个键相关联,后者与每个算子的并行实例(Sub-Task)相关联。然后主要讨论的是关于 Keyed State 的缩放。

Keyed State 缩放

在实际使用 Flink 程序的时候,我们可能会更改某些算子的并行度(可能是因为计算的太慢、可能是上游的并行度调整、可能是其他某些原因),我们都要保存当前 State,然后修改代码的并行度,再从 Savepoint 处恢复。如果没有 State的话,我们进行 并行度修改是很方便的,只需要进行数据流的重新分配就行:

但如果加上了 State,我们就必须考虑怎么进行 State 的恢复:

思考一下最开始的 Flink 分配 key 是通过 hash 取并发的余数进行(hash(key) % parallelism),然后分配到各个 SubTask 上去,但如果放缩后根据新的计算方式势必会有以下的问题:

根据对应图可以看出来,状态恢复基本就是随机读写了,这样会跨磁盘、跨网路,效率低下。并且放缩后,各个 SubTask 处理的 key 也发生了改变,降低了本地性。为了解决这个问题,FLINK-3755 对 Keyed State 专门引入了 KeyGroup & KeyGroupRange。

KeyGroup & KeyGroupRange

keyGroup

在上一章,其实我们讨论 Timer 的存储的时候,在 InternalTimeTimerManagerImpl 中提起过 KeyGroup & KeyGroupRange,我们说 KeyGroup 是 Keyed State 原子单位,而且 Flink 作业内 Key Group 的数量与 maxParallelism 相同,也就是说 keyGroup 的索引在 [ 0, maxParallelism - 1 ] 的区间范围内。 每个 subTask 都会处理一个到多个 KeyGroup,这些都会保存到 KeyGroupRange 中(subTask 中存储了 KeyGroupRange,也就是这个 subTask 需要处理哪些 keyGroupRange):

比如上图算子A可能是通过keyed后的数据,如果我当前的 maxParallelism 是3(现实不可能),那么意味着我有3个 keyGroup,对应的A1处理一个,A2处理一个,A3处理了一个;而算子B的B1处理的是三个 KeyGroup。

KeyGroupRange

KeyGroupRange 被创建在每个 subTask 中,记录着这个 subTask 需要处理哪些数据。

查看源码结构如下:

startKeyGroup 和 endKeyGroup 实际上是指 keyGroup的索引区间,而且是闭区间。所以我们可以知道 subTask 获取数据是通过连续的一段值来进行获取的。

那么:

  • 如果决定一个 key 该分配在哪个 KeyGroup 中?
  • 如果决定一个 SubTask 应该处理哪些 KeyGroup?

KeyGroupRangeAssignment

那就观察观察代码,然后讲解吧。

computeDefaultMaxParallelism()

这个类以前我们也有讲到过,在计算默认最大并行度的时候,就是通过这个类的 computeDefaultMaxParallelism 计算得出:

具体规则就是 将算子的并行度*1.5后,向上取整,到2的N次幂。如果范围在 2**7 到 2**15 之间,那么就中。如果超出范围,小了就取 128,大了就取32768。

所以 Flink 生成的 maxParallelism 位于12832768之间,如果任务特别巨大,最好手动再加一点,如果后期升级超出 maxParallelism 的话,可能会导致无法从 Savepoint 处恢复。

computeKeyGroupForKeyHash()

获取当前元素所对应的 KeyGroup

根据 Key 取 hash 值,进行 murmurHash后,对 maxParallelism 进行求余,就是对应的 KeyGroup。

客官们可就迷糊了,这还不是已经取了一次 hashcode,为什么还要取 murmurhash呢?

通过源码可以看到 computeKeyGroupForKeyHash() 被两个地方所调用,从 assignToKeyGroup() 过来时已经进行了一次hashcode取值,是直接通过 object.hash 获得;另一个是通过 KeyGroupRangePartitioner.partition() ,传入是 key,为 int。

所以强词夺理的解释可以为,传入一个 Int 后再 reHash 的结果计算并行度。

computeOperatorIndexForKeyGroup()

获取 KeyGroup 所对应的 subTaskIndex,通过 KeyGroup * 当前并行度 / 最大并行度。

比如我们一般的 maxParallelism 为128,那么 keyGroupId 为 0~127,根据公式计算:

(0~127) * parallelism / 128 = [0, parallelism)

这里的 keyGroupId 也是对 maxParallelism 取余得到的,所以一定得到的结果是 [0, parallelism) 内的整数。

(n-1) * m / n = [0, m)

但是像以往所说,我们是通过 keyGroup 和 keyGroupRange 来进行存储的,所以还有另外一个计算。

computeKeyGroupRangeForOperatorIndex()

获取 subTask 的 KeyGroupRange。

通过 maxParallelism 、 parallelism 、 operatorIndex 来计算这个 operator 对应的 keyGroupRange。

KeyGroupRange 主要的就是 start 与 end,所以计算出这两个就可以了:

start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);

end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;

在进行注册调用的时候会进行 Operation KeyGroupRange 的计算,就会调用这个方法。

Rescale

在上述代码可以看到,我们连续的 hash 数据更可能由一个 Operator 进行使用,也更可能存储在本地上。如果我们重启 Flink 程序,并且将并发由3改成4,那么变化如下图:

可以看到,将 KeyGroup 作为 Keyed State 的基本分配单位后,我们的本地性和随机读写、网络IO等问题都得到了不同程度的解决。而且必须要注意 maxParallelism 对于一个 Flink 程序的重要性。

总结

画图:

看完了,但貌似还是心有余悸,KeyGroupRangeAssignment 确实可以用来计算并行度、KeyGroupRange什么的,但在哪些地方进行注册的,keyed State 怎么跟随的?

done, keep on.

Flink 第8.2章 Flink 的键组 KeyGroup 与 缩放 Rescale相关推荐

  1. 【Flink】Flink状态的缩放(rescale)与键组(Key Group)设计

    1.概述 转载:Flink状态的缩放(rescale)与键组(Key Group)设计 这个东东还没了解过.来看一下. 在之前那篇讲解Flink Timer的文章里,我曾经用三言两语简单解释了Key ...

  2. 【flink】RocksDB介绍以及Flink对RocksDB的支持

    1.概述 转载:「Flink」RocksDB介绍以及Flink对RocksDB的支持 2.RocksDB简介 RocksDB是基于C++语言编写的嵌入式KV存储引擎,它不是一个分布式的DB,而是一个高 ...

  3. Flink教程(20)- Flink高级特性(双流Join)

    文章目录 01 引言 02 双流join介绍 03 Window Join 3.1 Tumbling Window Join 3.2 Sliding Window Join 3.3 Session W ...

  4. Flink教程(29)- Flink内存管理

    文章目录 01 引言 02 Flink内存管理 2.1 Flink内存划分 2.2 Flink堆外内存 2.3 序列化与反序列化 2.4 操纵二进制数据 2.5 注意 03 文末 01 引言 在前面的 ...

  5. Flink教程(13)- Flink高级API(状态管理)

    文章目录 01 引言 02 Flink中的有状态计算 03 有状态和无状态计算 3.1 无状态计算 3.1.1 无状态计算特点 3.1.2 无状态计算例子(消费延迟计算) 3.2 有状态计算 3.2. ...

  6. Flink教程(09)- Flink批流一体API(Connectors示例)

    文章目录 01 引言 02 Connectors 2.1 Flink目前支持的Connectors 2.2 JDBC案例 2.3 Kafa案例 2.3.1 Kafa相关命令 2.3.2 Kafka C ...

  7. 2021年大数据Flink(九):Flink原理初探

    Flink原理初探 Flink角色分工 在实际生产中,Flink 都是以集群在运行,在运行的过程中包含了两类进程. JobManager: 它扮演的是集群管理者的角色,负责调度任务.协调 checkp ...

  8. 2021年大数据Flink(八):Flink入门案例

    目录 Flink入门案例 前置说明 API 编程模型 准备工程 pom文件 log4j.properties Flink初体验 需求 编码步骤 代码实现 Flink入门案例 前置说明 API API ...

  9. 2021年大数据Flink(六):Flink On Yarn模式

    目录 Flink On Yarn模式 原理 为什么使用Flink On Yarn? Flink如何和Yarn进行交互? 两种方式 操作 1.关闭yarn的内存检查 2.同步 3.重启yarn 测试 S ...

  10. flink启动命令参数_[Flink]Flink1.3 指南四 命令行接口-阿里云开发者社区

    Flink提供了一个命令行接口(CLI)用来运行打成JAR包的程序,并且可以控制程序的运行.命令行接口在Flink安装完之后即可拥有,本地单节点或是分布式的部署安装都会有命令行接口.命令行接口启动脚本 ...

最新文章

  1. [ JSOI 2015 ] Salesman
  2. Web测试介绍2一 安全测试
  3. 帝国cms万能通用api二次开发核心包使用说明
  4. JSP第六次课:数据库访问显示商品信息
  5. CTR深度学习模型之 DIEN(Deep Interest Evolution Network) 的理解与示例
  6. P3435 [POI2006]OKR-Periods of Words kmp + fail指针
  7. 第十五天 图【下】(大结局)
  8. JNI学习积累之三 ---- 操作JNI函数以及复杂对象传递
  9. i.MX6 交叉编译zlib、yasm、xvidcore、libpng、x264、jpegsrc、ffmpeg
  10. 系统集成项目管理工程师2021年报名时间
  11. C#实现人民币阿拉伯数字转换成大写金额的代码
  12. 如何用安卓手机做代理服务器
  13. 初学者关于贝叶斯纳什均衡各类符号的一点理解
  14. 杀毒软件开发,原理、设计、编程实战
  15. 逆向笔记2(数据宽度_逻辑运算)
  16. 洛谷P1458 [USACO2.1]顺序的分数 Ordered Fractions
  17. Java利用Set集合去重复
  18. 一款非常萌的桌面工具 --- Bongo Cat Mver 附使用教程
  19. 面试必备:四种经典限流算法讲解
  20. 微软内部封杀 Slack

热门文章

  1. 各大主流编程语言比较,运用场景
  2. excel计算数据的差和的公式和方法、相关系数、绝对误差
  3. VBAProject密码清除 for EXCEL2003
  4. Web 全栈大会:万维网之父的数据主权革命
  5. Matlab导入数据(一定有用!!)
  6. hd6630m可以玩lol吗_《LOL》流畅玩!Intel HD620核显性能实测
  7. word封面背景及水印背景
  8. 小米路由器4a千兆版修改sn和关闭电源led灯
  9. 如何查询本机ip地址
  10. 【计组】计算机乘法运算