前言

在之前那篇讲解Flink Timer的文章里,我曾经用三言两语简单解释了Key Group和KeyGroupRange的概念。实际上,Key Group是Flink状态机制中的一个重要设计,值得专门探究一下。本文先介绍Flink状态的理念,再经由状态——主要是Keyed State——的缩放(rescale)引出KeyGroup的细节。

再认识Flink状态

自从开始写关于Flink的东西以来,“状态”这个词被提过不下百次,却从来没有统一的定义。Flink官方博客中给出的一种定义如下:

When it comes to stateful stream processing, state comprises of the information that an application or stream processing engine will remember across events and streams as more realtime (unbounded) and/or offline (bounded) data flow through the system.

根据这句话,状态就是流处理过程中需要“记住”的那些数据的快照。而这些数据既可以包括业务数据,也可以包括元数据(例如Kafka Consumer的offset)。以最常用也是最可靠的RocksDB状态后端为例,状态数据的流动可以抽象为3层,如下图所示。

用户代码产生的状态实时地存储在本地文件中,并且随着Checkpoint的周期异步地同步到远端的可靠分布式文件系统(如HDFS)。这样就保证了100%本地性,各个Sub-Task只需要负责自己所属的那部分状态,不需要通过网络互相传输状态数据,也不需要频繁地读写HDFS,减少了开销。在Flink作业重启时,从HDFS取回状态数据到本地,即可恢复现场。

我们已经知道Flink的状态分为两类:Keyed State和Operator State。前者与每个键相关联,后者与每个算子的并行实例(即Sub-Task)相关联。下面来看看Keyed State的缩放。

Keyed State的缩放

所谓缩放,在Flink中就是指改变算子的并行度。Flink是不支持动态改变并行度的,必须先停止作业,修改并行度之后再从Savepoint恢复。如果没有状态,那么不管scale-in还是scale-out都非常简单,只要做好数据流的重新分配就行,如下图的例子所示。

可是如果考虑状态的话,就没有那么简单了:并行度改变之后,HDFS里的状态数据该按何种规则取回给新作业里的各个Sub-Task?下图示出了这种困局。

按照最naive的思路考虑,Flink中的key是按照hash(key) % parallelism的规则分配到各个Sub-Task上去的,那么我们可以在缩放完成后,根据新分配的key集合从HDFS直接取回对应的Keyed State数据。下图示出并行度从3增加到4后,Keyed State中各个key的重新分配。

在Checkpoint发生时,状态数据是顺序写入文件系统的。但从上图可以看出,从状态恢复时是随机读的,效率非常低下。并且缩放之后各Sub-Task处理的key有可能大多都不是缩放之前的那些key,无形中降低了本地性。为了解决这两个问题,在FLINK-3755对Keyed State专门引入了Key Group,下面具体看看。

引入Key Group

如果看官有仔细读Flink官方文档的话,可能对这个概念已经不陌生了,原话抄录如下:

Keyed State is further organized into so-called Key Groups. Key Groups are the atomic unit by which Flink can redistribute Keyed State; there are exactly as many Key Groups as the defined maximum parallelism. During execution each parallel instance of a keyed operator works with the keys for one or more Key Groups.

翻译一下,Key Group是Keyed State分配的原子单位,且Flink作业内Key Group的数量与最大并行度相同,也就是说Key Group的索引位于[0, maxParallelism - 1]的区间内。每个Sub-Task都会处理一个到多个Key Group,在源码中,以KeyGroupRange数据结构来表示。

KeyGroupRange的逻辑相对简单,部分源码如下。注意startKeyGroup和endKeyGroup实际上指的是Key Group的索引,并且是闭区间。

public class KeyGroupRange implements KeyGroupsList, Serializable {private static final long serialVersionUID = 4869121477592070607L;public static final KeyGroupRange EMPTY_KEY_GROUP_RANGE = new KeyGroupRange();private final int startKeyGroup;private final int endKeyGroup;private KeyGroupRange() {this.startKeyGroup = 0;this.endKeyGroup = -1;}public KeyGroupRange(int startKeyGroup, int endKeyGroup) {this.startKeyGroup = startKeyGroup;this.endKeyGroup = endKeyGroup;}@Overridepublic boolean contains(int keyGroup) {return keyGroup >= startKeyGroup && keyGroup <= endKeyGroup;}public KeyGroupRange getIntersection(KeyGroupRange other) {int start = Math.max(startKeyGroup, other.startKeyGroup);int end = Math.min(endKeyGroup, other.endKeyGroup);return start <= end ? new KeyGroupRange(start, end) : EMPTY_KEY_GROUP_RANGE;}public int getNumberOfKeyGroups() {return 1 + endKeyGroup - startKeyGroup;}public int getStartKeyGroup() {return startKeyGroup;}public int getEndKeyGroup() {return endKeyGroup;}@Overridepublic int getKeyGroupId(int idx) {if (idx < 0 || idx > getNumberOfKeyGroups()) {throw new IndexOutOfBoundsException("Key group index out of bounds: " + idx);}return startKeyGroup + idx;}public static KeyGroupRange of(int startKeyGroup, int endKeyGroup) {return startKeyGroup <= endKeyGroup ? new KeyGroupRange(startKeyGroup, endKeyGroup) : EMPTY_KEY_GROUP_RANGE;}
}

我们还有两个问题需要解决:

  • 如何决定一个key该分配到哪个Key Group中?
  • 如何决定一个Sub-Task该处理哪些Key Group(即对应的KeyGroupRange)?

第一个问题,相关方法位于KeyGroupRangeAssignment类:

    public static int assignToKeyGroup(Object key, int maxParallelism) {return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);}public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {return MathUtils.murmurHash(keyHash) % maxParallelism;}

可见是对key进行两重哈希(一次取hashCode,一次做MurmurHash)之后,再对最大并行度取余,得到Key Group的索引。

第二个问题,仍然在上述类中的computeKeyGroupRangeForOperatorIndex()方法,源码如下。

    public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(int maxParallelism,int parallelism,int operatorIndex) {checkParallelismPreconditions(parallelism);checkParallelismPreconditions(maxParallelism);Preconditions.checkArgument(maxParallelism >= parallelism,"Maximum parallelism must not be smaller than parallelism.");int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;return new KeyGroupRange(start, end);}

可见是由并行度、最大并行度和算子实例(即Sub-Task)的ID共同决定的。根据Key Group的逻辑,上一节中Keyed State重分配的场景就会变成下图所示(设最大并行度为10)。

很明显,将Key Group作为Keyed State的基本分配单元之后,上文所述本地性差和随机读的问题都部分得到了解决。当然还要注意,最大并行度对Key Group分配的影响是显而易见的,因此不要随意修改最大并行度的值。Flink内部确定默认最大并行度的逻辑如下代码所示。

    public static int computeDefaultMaxParallelism(int operatorParallelism) {checkParallelismPreconditions(operatorParallelism);return Math.min(Math.max(MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),DEFAULT_LOWER_BOUND_MAX_PARALLELISM),UPPER_BOUND_MAX_PARALLELISM);}

其中,下限值DEFAULT_LOWER_BOUND_MAX_PARALLELISM为128,上限值UPPER_BOUND_MAX_PARALLELISM为32768。

The End

准备开启远程办公模式。希望疫情快点好转。

民那晚安。

Flink状态的缩放(rescale)与键组(Key Group)设计相关推荐

  1. 【Flink】Flink状态的缩放(rescale)与键组(Key Group)设计

    1.概述 转载:Flink状态的缩放(rescale)与键组(Key Group)设计 这个东东还没了解过.来看一下. 在之前那篇讲解Flink Timer的文章里,我曾经用三言两语简单解释了Key ...

  2. Flink 状态管理:算子状态、键值分区状态、状态后端、有状态算子的扩缩容

    文章目录 状态管理 算子状态 键值分区状态 状态后端(State Backends) 有状态算子的扩缩容 状态管理 通常意义上,函数里所有需要任务去维护并用来计算结果的数据都属于任务的状态,可以把状态 ...

  3. Flink小知识: KeyState的Rescale与 Key Group

    本文先介绍Flink状态的理念,再经由状态--主要是Keyed State--的缩放(rescale)引出KeyGroup的细节. Flink状态 自从开始写关于Flink的东西以来,"状态 ...

  4. Flink 状态管理

    1.MemoryStateBackend(Default) • 内存级的状态后端,会将键控状态作为内存中的对象进⾏管理,将它们存储在 TaskManager 的 JVM 堆上,⽽将 checkpoin ...

  5. 【推荐实践】Flink 状态(State)管理在推荐场景中的应用

    导语 Flink 提供了灵活丰富的状态管理,可轻松解决数据之间的关联性.本文介绍了Flink 状态(State)管理在推荐场景中的应用,大家结合自己的应用场景与业务逻辑,选择合适的状态管理. 背景 F ...

  6. Flink 状态管理与 Checkpoint 机制

    点击上方"zhisheng",选择"设为星标" 一.状态分类 相对于其他流计算框架,Flink 一个比较重要的特性就是其支持有状态计算.即你可以将中间的计算结果 ...

  7. Flink状态后端配置(设置State Backend)

    Flink提供不同的状态后端(state backends)来区分状态的存储方式和存储位置.flink状态可以存储在java堆内存内或者内存之外.通过状态后端的设置,flink允许应用保持大容量的状态 ...

  8. Flink状态一致性检查点

    Flink状态一致性检查点 一致性检查点:是指在某一个时刻所有算子将同一个任务都完成的情况下进行的一个快照(方便后续计算出错时,提供一个数据恢复的快照).Flink有状态的流处理,内部每个算子任务都可 ...

  9. Flink大数据实时计算系列-Flink的state介绍、Flink丰富的状态访问、Flink状态的分类

    Flink大数据实时计算系列-Flink的state介绍.Flink丰富的状态访问.Flink状态的分类 目录 Flink的state介绍 Flink丰富的状态访问 Flink状态的分类 Flink参 ...

  10. 【Flink状态】FsStateBackend 下 ValueState > MapState

    [Flink状态]FsStateBackend 下 ValueState > MapState 背景: 对程序进行状态后端替换(Rocks -> Fs)时,程序产生了背压.(状态开启了TT ...

最新文章

  1. Djando 的 cmd命令
  2. 中石油计算机组成原理a在线考试,计算机组成原理试题A.doc
  3. Windows下查看wifi密码的命令
  4. 前端遍历列表生成表格_图书作者的演练-创建列表页和添加表单框-flask
  5. 中职计算机英语教师教学总结,中职计算机教师教学工作总结 (3000字).doc
  6. ZOJ 2760 How Many Shortest Path 最大流+floyd求最短路
  7. springboot关闭http登录验证
  8. 计算理论导论第1章答案 Michael Sipser
  9. android-sdk-windows 如何安装,android-sdk-windows 安装.doc
  10. mysql查询条件忽略大小写_mysql 查询条件不区分大小写问题
  11. 纳什均衡定义、举例、分类
  12. android 转场动画 监听,Android 中的转场动画及兼容处理
  13. Soft Labels for Ordinal Regression阅读笔记
  14. ESP8266-Arduino网络编程实例-WiFi连接丢失解决方法
  15. End-to-End Learning From Spectrum Data: A DL Approach for Wireless Signal Identification(阅读笔记)
  16. Linux进程与线程
  17. h5 神策埋点_数据分析(一)埋点
  18. 怎么传文件到百度云服务器,别人传文件给我 可是怎么用百度网盘保存文件呢 其实很简单!...
  19. 2021年压力容器作业R2移动式压力容器充装证考试题库
  20. python图像拉伸

热门文章

  1. IDEA——一个项目启动多个服务
  2. 程序员也要学英语——连词、并列句和从句
  3. Matlab的数据导入和导出
  4. linux系统实现TTS(文字转语音)功能
  5. CC2430DMA学习
  6. 数据结构二叉树学习1-前序序列创建二叉树
  7. 大数据智能推荐系统原理介绍
  8. word中的表格复制到html代码,怎样将Word中的表格复制到Excel中还保持原有内容和格式?...
  9. 透明图片怎么发给别人_怎么用秀米写公众号文章
  10. windows运行中自定义命令创建/自定义bat文件创建