这里写目录标题

  • flink checkpint出错类型
  • flink 重启策略
  • Checkpint 流程简介
  • 增量Checkpoint实现原理
    • MemoryStateBackend
      • 原理
    • FsStateBackend
      • 原理
    • RocksDBStateBackend
      • 原理
    • RocksDBStateBackend增量更新
      • 原理详解
      • 举例
  • Checkpoint 异常情况排查
    • Checkpoint Decline:
    • Checkpoint Expire:
      • Source Trigger 慢
      • State 非常大
      • 数据倾斜或有反压的情况
        • 反压问题处理:
      • barrier对齐的慢
      • 线程太忙
      • 同步阶段慢
      • 异步阶段慢
  • 总结一下checkpoint 1.0版本(社区在202301正在搞2.0版本)的问题
  • 作业恢复和扩缩容原理与优化
    • 本地状态重建
  • 快照管理
  • 容错恢复2022进展:
  • 分层状态存储架构

本文主要参考官方社区给出的checkpoint出错类型和种类,以及查找报错的方法。

flink checkpint出错类型

主要分为两种
Checkpoint Decline 与 Checkpint Expire 两种类型 下面分开讨论

flink 重启策略

  • 固定延迟(失败重试次数)重启策略(Fixed Delay Restart Strategy)
  • 故障率重启策略(Failure Rate Restart Strategy)
  • 没有重启策略(No Restart Strategy)
  • Fallback 重启策略(Fallback Restart Strategy):使用群集定义的重新启动策略。这对于启用检查点的流式传输程序很有帮助。
    - 默认情况下,如果没有定义其他重启策略,则选择固定延迟重启策略。

Checkpint 流程简介

  1. 第一步,Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint;。
  2. 第二步,source 节点向下游广播 barrier,这个 barrier 就是实现 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才会执行相应的 Checkpoint。
  3. 第三步,当 task 完成 state 备份后,会将备份数据的地址(state handle)通知给 Checkpoint coordinator。
  4. 第四步,下游的 sink 节点收集齐上游两个 input 的 barrier 之后,会执行本地快照,这里特地展示了 RocksDB incremental Checkpoint 的流程,首先 RocksDB 会全量刷数据到磁盘上,然后 Flink 框架会从中选择没有上传的文件进行持久化备份。
  5. 同样的,sink 节点在完成自己的 Checkpoint 之后,会将 state handle 返回通知 Coordinator。
  6. task收到上游全部的barrier后,会把barrier向下继续传递,并异步将自己的状态写如到持久化存储中,完成后给jm中的 Checkpoint coordinator 通知已经完成,并将备份数据的地址(state handle)也给过去。Checkpoint coordinator收集全后,会将Checkpoint Meta写入到持久化存储中,完。

总结一下 checkpoint分为一下几个操作:

  1. JM trigger checkpoint
  2. Source 收到 trigger checkpoint 的 PRC,自己开始做 snapshot,并往下游发送 barrier
  3. 下游接收 barrier(需要 barrier 都到齐才会开始做 checkpoint)
  4. Task 开始同步阶段 snapshot(发起备份,以及,增量checkpoint 将内存数据刷到磁盘的操作)
  5. Task 开始异步阶段 snapshot(具体操作备份)
    Task snapshot 完成,汇报给 JM
    以上任何一个操作失败都会导致checkpoint失败

增量Checkpoint实现原理

目前三种状态管理器

MemoryStateBackend

原理

运行时所需的 State 数据全部保存在 TaskManager JVM 堆上内存中,执行 Checkpoint 的时候,会把 State 的快照数据保存到 JobManager 进程 的内存中。执行 Savepoint 时,可以把 State 存储到文件系统中。

  • 1 基于内存的状态管理器,聚合类算子的状态会存储在JobManager的内存中

  • 2 单次状态大小默认最大被限制为5MB,可以通过构造函数来指定状态初始化内存大小。无论单次状态大小最大被限制为多少,都不可大于akka的frame大小(1.5MB,JobManager和TaskManager之间传输数据的最大消息容量)。状态的总大小不能超过 JobManager 的内存。

  • 3 是Flink默认的后端状态管理器,默认是异步的

  • 4 主机内存中的数据可能会丢失,任务可能无法恢复

  • 5 将工作state保存在TaskManager的内存中,并将checkpoint数据存储在JobManager的内存中

  • 适用:本地开发和调试 状态比较少的作业

FsStateBackend

原理

运行时所需的 State 数据全部保存在 TaskManager 的内存中,执行 Checkpoint 的时候,会把 State 的快照数据保存到配置的文件系统中。TM 是异步将 State 数据写入外部存储。

  • 1 基于文件系统的状态管理器
  • 2 如果使用,默认是异步
  • 3 比较稳定,3个副本,比较安全。不会出现任务无法恢复等问题
  • 4 状态大小受磁盘容量限制
  • 5 将工作state保存在TaskManager的内存中,并将checkpoint数据存储在文件系统中
  • 适用:状态比较大,窗口比较长,大的KV状态

RocksDBStateBackend

原理

使用嵌入式的本地数据库 RocksDB 将流计算数据状态存储在本地磁盘中。在执行 Checkpoint 的时候,会将整个 RocksDB 中保存的 State 数据全量或者增量持久化到配置的文件系统中。

  • 1 状态数据先写入RocksDB,然后异步的将状态数据写入文件系统。
  • 2 正在进行计算的热数据存储在RocksDB,长时间才更新的数据写入磁盘中(文件系统)存储,体量比较小的元数据状态写入JobManager内存中(将工作state保存在RocksDB中,并且默认将checkpoint数据存在文件系统中)
  • 3 支持的单 key 和单 value 的大小最大为每个 2^31 字节(2GB)
  • 4 RocksDBStateBackend是目前唯一支持incremental的checkpoints的策略
  • 5 默认使用是异步

RocksDBStateBackend增量更新

原理详解

目前只有RocksDBStateBackend支持checkpoint增量更新,
Flink 的增量 checkpoint 以 RocksDB 的 checkpoint 为基础。RocksDB 是一个 LSM 结构的 KV 数据库,把所有的修改保存在内存的可变缓存中(称为 memtable),所有对 memtable 中 key 的修改,会覆盖之前的 value,当前 memtable 满了之后,RocksDB 会将所有数据以有序的写到磁盘。当 RocksDB 将 memtable 写到磁盘后,整个文件就不再可变,称为有序字符串表(sstable)。

RocksDB 的后台压缩线程会将 sstable 进行合并,就重复的键进行合并,合并后的 sstable 包含所有的键值对,RocksDB 会删除合并前的 sstable。

在这个基础上,Flink 会记录上次 checkpoint 之后所有新生成和删除的 sstable,另外因为 sstable 是不可变的,Flink 用 sstable 来记录状态的变化。为此,Flink 调用 RocksDB 的 flush,强制将 memtable 的数据全部写到 sstable,并硬链到一个临时目录中。这个步骤是在同步阶段完成,其他剩下的部分都在异步阶段完成,不会阻塞正常的数据处理。

Flink 将所有新生成的 sstable 备份到持久化存储(比如 HDFS,S3),并在新的 checkpoint 中引用。Flink 并不备份前一个 checkpoint 中已经存在的 sstable,而是引用他们。Flink 还能够保证所有的 checkpoint 都不会引用已经删除的文件,因为 RocksDB 中文件删除是由压缩完成的,压缩后会将原来的内容合并写成一个新的 sstable。因此,Flink 增量 checkpoint 能够切断 checkpoint 历史。

为了追踪 checkpoint 间的差距,备份合并后的 sstable 是一个相对冗余的操作。但是 Flink 会增量的处理,增加的开销通常很小,并且可以保持一个更短的 checkpoint 历史,恢复时从更少的 checkpoint 进行读取文件。

举例


上图以一个有状态的算子为例,checkpoint 最多保留 2 个,上图从左到右分别记录每次 checkpoint 时本地的 RocksDB 状态文件,引用的持久化存储上的文件,以及当前 checkpoint 完成后文件的引用计数情况。

  • Checkpoint 1 的时候,本地 RocksDB 包含两个 sstable 文件,该 checkpoint 会把这两个文件备份到持久化存储,当 checkpoint 完成后,对这两个文件的引用计数进行加 1,引用计数使用键值对的方式保存,其中键由算子的当前并发以及文件名所组成。我们同时会维护一个引用计数中键到对应文件的隐射关系。

  • Checkpoint 2 的时候,RocksDB 生成两个新的 sstable 文件,并且两个旧的文件还存在。Flink 会把两个新的文件进行备份,然后引用两个旧的文件,当 checkpoint 完成时,Flink 对这 4 个文件都进行引用计数 +1 操作。

  • Checkpoint 3 的时候,RocksDB 将 sstable-(1),sstable-(2) 以及 sstable-(3) 合并成 sstable-(1,2,3),并且删除了三个旧文件,新生成的文件包含了三个删除文件的所有键值对。sstable-(4) 还继续存在,生成一个新的 sstable-(5) 文件。Flink 会将 sstable-(1,2,3) 和 sstable-(5) 备份到持久化存储,然后增加 sstable-4 的引用计数。由于保存的 checkpoint 数达到上限(2 个),因此会删除 checkpoint 1,然后对 checkpoint 1 中引用的所有文件(sstable-(1) 和 sstable-(2))的引用计数进行 -1 操作。

  • Checkpoint 4 的时候,RocksDB 将 sstable-(4),sstable-(5) 以及新生成的 sstable-(6) 合并成一个新的 sstable-(4,5,6)。Flink 将 sstable-(4,5,6) 备份到持久化存储,并对 sstabe-(1,2,3) 和 sstable-(4,5,6) 进行引用计数 +1 操作,然后删除 checkpoint 2,并对 checkpoint 引用的文件进行引用计数 -1 操作。这个时候 sstable-(1),sstable-(2) 以及 sstable-(3) 的引用计数变为 0,Flink 会从持久化存储删除这三个文件。

Checkpoint 异常情况排查



以上几个参数分别是:

  1. 一列表示有多少个 subtask 对这个 Checkpoint 进行了 ack
  2. 表示该 operator 的所有 subtask 最后 ack 的时间
  3. 表 示 整 个 operator 的 所 有 subtask 中 完 成 snapshot 的最长时间
  4. 表示当前 Checkpoint 的 state 大小,增量就是增量的大小

从上图可以知道第4个task操作导致整体的checkpoint非常慢,可以根据UI给出物理执行图来有根据的检查任务,但是大部分情况当发现checkpoint报错时,任务已经down掉,那么就需要根据yarn上的日志来具体分析

Checkpoint Decline:

从jm的日志中可以看到

Decline checkpoint 10000 by task ********* container_e119_1640332468237_165586_01_000002 @ hostname01 with allocation id 2872ccdf76d6af3baf9064be9d46fcaa

可以去 container_e119_1640332468237_165586_01_000002 所在的 tm 也就是hostname01 ,可以查看具体的tm日志查看具体的报错信息

Checkpoint Decline 中有一种情况 Checkpoint Cancel,这是由于 较小的 barrier还没有对齐,就已经收到了更大的 barrier,这种情况下就会把小的 checkpoint给取消的掉
在jm.log中会有 当前chk-11还在对齐阶段,但收到了 chk-12的barrier ,所以取消了 chk-11

Received checkpoint barrier for checkpoint  ****** before completing current checkpoint   ** Skipping current checkpoint

下游task收到被取消的barrier时会打印

$taskNameWithSubTaskAndID: Checkpoint chk-11 canceled, aborting alignment.
或
$taskNameWithSubTaskAndID: Received cancellation barrier for checkpoint chk-12 before completing current checkpoint chk-11. Skipping current checkpoint

Checkpoint Expire:

上面的Decline 比较少见,更常见的是 Expire 的情况。其中最主要的原因就是因为 checkpoint 做的非常慢,导致超时等各种情况。
出现expire时,jm.log中会有

Checkpoint 157 of job ba02728367ae85bca4d43ab7445251f5 expired before completing.
以及
Received late message for now expired checkpoint attempt 158 from task d11aac4d0b6f4fd9bde0fa4e76240c71 of job ba02728367ae85bca4d43ab7445251f5 at container_e119_1640332468237_165586_01_000002 @ cp-hadoop-hdp-node07 (dataPort=11460).

其中tm具体日志可以参考上述的办法来找到对应的报错日志。
chk很慢的情况主要有一下几种:

Source Trigger 慢

这个一般发生较少,但是也有可能,因为 source 做 snapshot 并往下游发送 barrier 的时候,需要抢锁(这个现在社区正在进行用 mailBox 的方式替代当前抢锁的方式,详情参考[1])。如果一直抢不到锁的话,则可能导致 Checkpoint 一直得不到机会进行。如果在 Source 所在的 taskmanager.log 中找不到开始做 Checkpoint 的 log,则可以考虑是否属于这种情况,可以通过 jstack 进行进一步确认锁的持有情况

State 非常大

这种情况使用增量checkpoint,现在增量checkpoint只支持RocksDBStateBackend 并需要设置开启

数据倾斜或有反压的情况

数据倾斜可以重新设计主键以及数据处理流程来改善,反压可以参考flink UI来查看哪里反压 ,并使用Metrics 来获取关键指标

反压问题处理:

定位节点,加Metrics
我们在监控反压时会用到的 Metrics 主要和 Channel 接受端的 Buffer 使用率有关,最为有用的是以下几个:
Metrics: Metris描述
outPoolUsage发送端 Buffer 的使用率
inPoolUsage接收端 Buffer 的使用率
floatingBuffersUsage(1.9 以上)接收端 Floating Buffer 的使用率
exclusiveBuffersUsage (1.9 以上)接收端 Exclusive Buffer 的使用率

barrier对齐的慢

Checkpoint 在 task 端分为 barrier 对齐(收齐所有上游发送过来的 barrier),然后开始同步阶段,再做异步阶段。如果 barrier 一直对不齐的话,就不会开始做 snapshot

这种情况也会导致 State非常大,当先到的barrier到达后,晚的barrier来之前,这之间的数据也会放入到State中一起保存起来。

在Debug日志下,barrier对齐后会有

Starting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)

如果一直没有,注意! 是Debug日志,可以使用 at least once,来观察哪个barrier没有到达,多说一嘴,at least once 与 exectly once 最主要的语义区别就是 ,先到的barrier,是否等后到的barrier对齐才做checkpoint

Received barrier for checkpoint 96508 from channel 5

线程太忙

在 task 端,所有的处理都是单线程的,数据处理和 barrier 处理都由主线程处理,如果主线程在处理太慢(比如使用 RocksDBBackend,state 操作慢导致整体处理慢),导致 barrier 处理的慢,也会影响整体 Checkpoint 的进度,可能会出现barrier一直对不齐的情况
可以用AsyncProfile生成一份火焰图,查看占用cpu最多的栈,大数据集群中,如果实时离线使用一套集群,凌晨时,离线任务集体调度,就有可能导致node节点上线程不够,无法完成checkpoint导致报错

同步阶段慢

非 RocksDBBackend 我们可以考虑查看是否开启了异步 snapshot,如果开启了异步 snapshot 还是慢,需要看整个 JVM 在干嘛,也可以使用前一节中的工具。

对于 RocksDBBackend 来说,我们可以用 iostate 查看磁盘的压力如何,另外可以查看 tm 端 RocksDB 的 log 的日志如何,查看其中 SNAPSHOT 的时间总共开销多少

异步阶段慢

这一步主要是,jm将Checkpoint Meta写入到持久化存储,

非 RocksDB-Backend ,主要是网络流量的问题,可以使用metirc来监控检查问题

RocksDB 来说,则需要从本地读取文件,写入到远程的持久化存储上,会涉及磁盘IO的瓶颈,如果感觉IO足够,网络也没问题,可以开启多线程上传的功能

总结一下checkpoint 1.0版本(社区在202301正在搞2.0版本)的问题

问题 1:对齐时间长,反压时被完全阻塞

Flink 的 Checkpoint 机制是通过从 Source 插入 Barrier,然后在 Barrier 流过每个算子的时候给每个算子做快照来完成的。为了保证全局一致性,如果算子有多个输入管道的时候,需要对齐多个输入的 Barrier。这就产生了问题 1,因为每条链路的处理速度不一样,因此 Barrier 对齐是需要时间的。如果某一条链路有反压,会因为等待对齐而使得整条链路完全被阻塞,Checkpoint 也会因为阻塞而无法完成。

问题 2:Buffer 数目固定,管道中有多余的处理数据

由于算子间的上下游 Buffer 数目是固定的,它们会缓存比实际所需更多的数据。这些多余的数据不仅会在反压时进一步阻塞链路,而且会使得 Unaligned Checkpoint 存储更多的上下游管道数据。

问题 3:快照异步上传时间较长且不可控

快照的过程包括两部分:同步状态刷盘和异步上传状态文件,其中异步文件上传的过程和状态文件大小相关,时间较长且不可控。

Flink 1.11、 Flink 1.12 引入了 Unaligned Checkpoint, 使得 Checkpoint Barrier 不被缓慢的中间数据阻塞。Flink 1.13、Flink 1.14 引入了 Buffer Debloating,让算子与算子间的管道数据变得更少。Flink 1.15、Flink 1.16 引入了通用增量 Checkpoints,让异步上传的过程更快、更稳定。

对于问题 1,在 Flink 1.16 版本中,Unaligned Checkpoint 允许透支 Buffer,解决了在 Buffer 不足时,不能及时响应 Unaligned Checkpoint 的问题。此外,全局计时超时机制的引入能够有效改进 Unaligned 和 Aligned Checkpoint 之间自动转换的触发条件。

对于问题 2,Buffer debloating 的引入可以动态调整缓存的数据量,动态缓存 1 秒内需要处理的数据。

下面我们来重点看一看第 3 个问题是如何用通用增量 Checkpoint 来解决的

Flink 的算子状态更新会反映在状态表中。在之前的设计当中,Flink 算子做快照的过程分为两步:第一步是同步的对状态表进行快照,内存中的数据刷盘,准备好上传到持久存储的文件;第二步是异步的上传这些文件。

异步上传文件这个部分有两个问题:

问题 1:异步上传的文件大小依赖 State Backend 的实现

问题 2:异步过程需要等到同步过程结束才能开始,因为同步快照结束前是没法准备好需要上传的文件的


我们来分别看一下这两个问题。对于第一个问题,以 RocksDB 为例,虽然 Flink 对 RocksDB 也支持增量 Checpoint,但是 RocksDB 出于自身实现考虑,它需要对文件做 Compaction。每次 Compaction 会产生新的比较大的文件,那这个时候即使是增量 Checkpoint,需要上传的文件也会因此时不时变大。在 Flink 作业并发比较大的情况下,上传文件时不时变大的问题就会变得很频繁,因为只有等所有并发的文件上传完毕,一个完整的算子状态才算快照完成。


对于第二个问题,在同步快照结束前,Flink 无法准备好需要上传的文件,所以必须要等快照结束时才能开始上传。也就是说,上图中的红色斜条纹这个时间段完全被浪费了。如果需要上传的状态比较大,会在很短时间内对 CPU 和网络产生较大的压力。

解决:Flink 社区实现了通用增量快照。在新架构下,状态更新不仅会更新状态表,而且会记录状态的更新日志。上图中状态表会和架构升级前一样周期性的刷到持久存储,但是这个周期可以比较大(比如 10 分钟)在后台慢慢上传,该过程称为物化过程。同时状态更新日志也会持续上传到远端持久存储,并且在做 Checkpoint 时 Flush 剩余全部日志。

这样的设计就比较好的解决了前面提到的两个问题:通过将快照过程和物化过程完全独立开来,可以让异步上传的文件大小变得很稳定;同时因为状态更新是持续的,所以我们可以在快照之前就一直持续的上传更新日志,所以在 Flush 以后我们实际需要上传的数据量就变得很小。

架构升级后的一个 Checkpoint 由物化的状态表以及增量更新的日志组成。物化过程结束后,相对应的更新日志就可以被删除了。上图中的蓝色方框部分,是通用增量快照和之前架构的区别,这个部分被称为 Changelog Storage(DSTL)。


DSTL 是 Durable Short-term Log 的缩写。我们从这个英文名就能看出来 DSTL 是有针对性需求的

  • 需要短期持久化增量日志,物化后即可删除

  • 需要支持高频写,是一个纯 append 写操作,仅在恢复时需要读取

  • 需要 99.9% 的写请求在1秒内完成

  • 需要和现有的 Checkpoint 机制提供同一级别的一致性保证

社区现在的版本是用 DFS 来实现的,综合考量下来基本可以满足需求。同时 DSTL 提供了标准的接口也可以对接其他的存储。

这里是通用增量快照,不是增量checkpoint!

这个部分的最后我们来看一下使用通用增量快照的 Trade-off

  • 通用增量快照带来的好处显而易见:
    可以让 Checkpoint 做的更稳定,平滑 CPU 曲线,平稳网络流量使用(因为快照上传的时间被拉长了,并且单次上传量更小更可控)

  • 可以更快速的完成 Checkpoint(因为减少了做快照 Flush 的那个部分需要上传的数据)

  • 也因此,我们也可以获得更小的端到端的数据延迟,减小 Transactional Sink 的延迟

  • 因为可以把 Checkpoint 做快,所以每次 Checkpoint 恢复时需要回滚的数据量也会变少。这对于对数据回滚量有要求的应用是非常关键的

通用增量快照也会带来一些额外的 Cost,主要来自两个方面:Checkpoint 放大和状态双写:

  • Checkpoint 放大的影响主要有两点。第一,远端的存储空间变大。但远端存储空间很便宜,10G 一个月大约 1 块钱。第二,会有额外的网络流量。但一般做 Checkpoint 使用的流量也是内网流量,费用几乎可以忽略不计。

  • 对于状态双写,双写会对极限性能有一些影响,但在我们的实验中发现在网络不是瓶颈的情况下,极限性能的损失在 2-3% 左右(Flink 1.17 中优化了双写部分 FLINK-30345 [2] ,也会 backport 到 Flink 1.16),因此性能损失几乎可以忽略不计。

作业恢复和扩缩容原理与优化

接下来讲一讲 Flink 社区在作业恢复和扩缩容部分的优化,主要包括优化本地状态重建,云原生背景下的分层状态存储架构升级,以及简化调度过程。

作业扩缩容和作业容错恢复有很多共性,比如都需要依据上一次快照来做恢复,都需要重新调度,但他们在细微之处又是有些区别的。

本地状态重建

以状态恢复本地重建来讲,对于容错恢复,将状态文件原样加载进本地数据库就可以了,但是如果是扩缩容恢复就会更复杂一些。举例来说上图中的作业并发从 3 扩容到 4,新作业 task 2 的状态有一部分来自原先作业的 task 1,还有一部分来自原先作业的 task 2,分别是橙色和黄色部分。

Flink 作业算子的状态在 Rescaling 做状态重新分配时,新分配的状态来自原先作业相邻的并发,不可能出现跳跃的有间隔的状态分配。在缩容时,有可能有多个状态合成一个新状态;在扩容的时候,因为状态一定是变小的,所以新的变小的状态一定最多来自相邻的两个原先的并发。


接下来具体讲一讲状态是如何做本地重建的,以 RocksDB 为例。

  • 第一步,需要下载相关的状态文件。

  • 第二步,重建初始的 RocksDB 实例,并删除对实例无用的 Key,即删除上图中灰色的部分,留下橙色部分。

  • 第三步,将临时 RocksDB 实例中的 Key 插入到第二步重建的 RocksDB 中,也就是黄色的部分插入到橙色的 DB 中。

快照管理

Flink 的快照 Snapshot 分为两种:Savepoint 和 Checkpoint。

Savepoint 一般由用户触发,所以它归属用户所有,因此由用户负责创建和删除。正因此,Flink 系统引擎层是不能够去删除 Savepoint 相关文件的。所以 Savepoint 不和 Flink 作业强绑定,不同的 Flink 作业可以从同一个 Savepoint 启动。Savepoint 是自包含的:自己包含所需要的一切。

Checkpoint 正好相反,它的主要作用是系统容错自愈,所以它由 Flink 引擎周期性触发,并且所属权归属 Flink 引擎。Checkpoint 文件的组织结构都由 Flink 引擎决定和管理,所以引擎负责按需清理 Checkpoint 文件。正因此,Checkpoint 和生成该 Checkpoint 的作业强绑定,并且是非自包含的,比如说 Incremental Checkpoint 之间会有依赖关系。

那有什么问题呢?因为 Savepoint 主要目标服务对象是用户,为了对用户友好,Savepoint 使用用户可读的标准格式,也正因此 Savepoints 做得非常慢,经常情况下状态稍微大一点就会超时,同样恢复也很慢。另一方面,Checkpoint 使用的是增量系统原生格式,所以做得很快。

这种情况下,用户会把 Retained Checkpoint 当成 Savepoint 来使用。Retained Checkpoint 是在作业停掉后保留的 Checkpoint,这样Retained Checkpoint 就变成了 Savepoint 和 Checkpoint 的混合体。造成的问题是用户负责删除 Retained Checkpoint,但是用户并不知道如何安全的删除 Retained Checkpoint。

为了解决上述问题,Flink 1.15 引入了两种状态恢复模式,即 Claim 模式和 No-Claim 模式。


在 Claim 恢复模式下,引擎声明 Retained Checkpoint 的所属权,Retained Checkpoint 归引擎所有,引擎负责删除。

在 No-Claim 恢复模式下,引擎放弃 Retained Checkpoint 的所属权。Retained Checkpoint 中所有的文件都不会被 Flink 引擎使用,用户可以很安全的删除 Retained Checkpoint。

在 No-Claim 的基础上,我们引入了 Native Savepoint,来加速 Savepoint 的创建和恢复。Native Savepoints 使用和 Checkpoint 一样的存储格式,其实现原理和 No-Claim 类似。Savepoint 不会使用之前的 Checkpoint 文件,相当于做一个全量的 Checkpoint。我们的企业版本通过进一步优化,让 Native Savepoint 也真正能做到增量 Savepoint。

容错恢复2022进展:

最后我们小结回顾一下 Flink 容错恢复在 2022 年的主要进展

在分布式快照架构方面,Unaligned Checkpoint 引入全局计时器,可以通过超时机制自动从 Aligned Checkpoint 切换成 Unaligned Checkpoint,这个对于 Unaligned Checkpoint 生产可用是非常重要的一步

通用增量 Checkpoint 生产可用,这对于 Checkpoint 稳定性和完成速度有很大的提升,同时可以平滑 CPU 和网络带宽的使用

这里值得一提的是,不仅仅是阿里巴巴在 Checkpoint 这个部分贡献了大量的代码,很多其他的公司也积极的投入到社区当中,比如 Shopee 和美团。他们在社区中贡献代码同时,也积极推动这些功能在公司内部的落地和延展,取得了不错的效果

在状态存储方面,我们进行了分层状态存储的初步探索,扩缩容速度有 2 – 10 倍的提升

阿里云实时计算平台推出了扩缩容无断流的组合功能:延迟状态加载和作业热更新,分别从状态加载和作业调度这两个方面来实现扩缩容无断流

引入增量 Native Savepoint,全面提升 Savepoint 的可用性和性能

分层状态存储架构

为了更好的适应云原生的大背景,我们对分层状态存储架构也进行了初步探索,也就是说我们把远端盘也作为 State Backend 的一部分。这种分层架构可以解决 Flink 状态存储在云原生背景下面临的大部分问题:

  • 解决容器化部署本地磁盘大小受限的问题

  • 解决外置状态成本高,数据一致性难以保障的问题

  • 解决小状态需要额外落盘的问题

  • 解决大状态访问速度慢的问题

Flink checkpoint操作流程详解与报错调试方法汇总,增量checkpoint原理及版本更新变化,作业恢复和扩缩容原理与优化相关推荐

  1. mysql scope runtime_Maven依赖scope属性详解-一个报错引发的问题 - 老郭种树

    ApplicationEventMulticaster not initialized - call 'refresh' before multicasting events via the cont ...

  2. yum命令详解和报错 Cannot find a valid baseurl for repo: base

    文章目录 1 yum命令 1.1 yum简介 1.2 yum安装 1.3 yum配置 1.4 配置本地yum源 1.4.1 关于repo 文件的格式 1.5 配置国内yum源 1.5.1 上海交通大学 ...

  3. Eclipse报错解决方法汇总

    Eclipse报错一些解决方法汇总(1) 1.An invalid character [44] was present in the Cookie value 说明:ascii为44的字符是&quo ...

  4. linux tar 解压报错解决方法

    问题一 使用tar命令解压.zip文件的时候,遇到如下异常, gzip: stdin has more than one entry--rest ignored tar: Child returned ...

  5. oracle call 存储过程 带out_详解oracle数据库存储过程调试方法

    概述 虽然现在存储过程相对比较少用了,但是平时接触不可避免的要跟存储过程打交道,当需要自己写的时候总会碰到这或那的错误,这个时候一般要怎么调试呢? PL/SQL调试 PL/SQL中提供了[调试存储过程 ...

  6. 手机吃鸡语音服务器异常错误,绝地求生游戏报错解决方法汇总

    游戏内语音故障: 在游戏中,如果语音存在问题(说不了话-听不到队友说话"没有喇叭图案"),先观察游戏内的语音是否有打开. 游戏内可按快捷键Ctrl+T开启或关闭语音,Ctrl+y更 ...

  7. 吃鸡不能玩显示服务器错误,绝地求生游戏报错解决方法汇总

    游戏结束后还显示在运行中: 结束吃鸡进程后,再次运行Steam进行游戏时显示"运行中",这时无需重启或注销电脑,按Ctrl+Alt+Detele(DEL)打开任务管理器,找到&qu ...

  8. 未能找到元数据文件_Flink 源码:Checkpoint 元数据详解

    本文是 Flink 源码解析系列,通过阅读本文你能 get 到以下点: Flink 任务从 Checkpoint 处恢复流程概述 Checkpoint 元数据详解 从源码层分析:JM 该如何合理地给每 ...

  9. 微信小程序点击按钮弹出弹窗_转载 | 广东大学生就业创业微信小程序操作流程详解(一)...

    广东大学生就业创业微信小程序 操作流程详解(一) 转眼来到10月,炎炎夏日也阻挡不了青春的忙碌,同学萌走出校园,迈向社会. 在这段时间,同学们不仅要准备毕业论文,应对毕业答辩,还需要兼顾各种毕业手续的 ...

最新文章

  1. 带你剖析WebGis的世界奥秘----Geojson数据加载(高级)
  2. 深入解析C/C++的优缺点以及就业方向
  3. Android手机网页字体异常,移动端html5手机网站的中文字体使用
  4. 云存储云计算选择开源还是商业版
  5. ArcGIS 基础6-ArcCatalog数据库管理
  6. win10计算机记录,Win10新版计算器用法介绍
  7. 什么是编程?为什么要编程?
  8. 安彦Linux系统时间同步
  9. 探索女性角色扮演游戏Top Girl成功的秘诀
  10. “掌上运维” – 下一代网管的思考
  11. 三点布光材质连接,做旧
  12. 计算机存储单位和网速单位换算,数据速度计算:在线进行网速各种bps mbps kbps B/秒 KB/秒 MB/秒单位之间换算...
  13. 31岁才转行程序员,现在34了,我来说说我的经历和一些感受吧...
  14. 栈,队列和链表三者之间的关系与区别
  15. 吉利车机安装第三方软件教程,手机修改dns完整操作教程
  16. Oracle 官网下载地址
  17. 常见的数学建模比赛汇总(参考资料)
  18. 如何实现系统公告或系统消息
  19. 小游戏 虚拟支付 米大师 90009
  20. Mac,Windows和Linux系统都能读写移动硬盘的方法

热门文章

  1. Translation插件谷歌翻译引擎无法使用
  2. 三星揭示新的美国数据泄露
  3. opencv c++ 二值图像、阈值计算方法、全局阈值、自适应阈值
  4. Excel中数据转换成甘特图(wps)
  5. 最新C#调用Google即时翻译
  6. 各大网络培训站vip课程翻录免key版 [ 不断更新ing..... ]
  7. 修改创维机顶盒服务器信息,广电机顶盒如何设置修改密码
  8. k8s系列05-使用containerd和cilium部署kubeproxy-free的k8s集群
  9. 如何成功运行tomcat并进入汤姆猫
  10. iocomp iPlot使用说明20 Interpolation插值