邱从贤(山智),Apache Flink Contributor,中南大学硕士,2018 年加入阿里巴巴计算平台事业部,专注于 Flink 核心引擎开发,主要从事 Flink  State&Checkpoint 相关研发工作。

作者:Stefan Ricther & Chris Ward翻译:邱从贤(山智)

Apache Flink 是一个有状态的流计算框架,状态是作业算子中已经处理过的内存状态,供后续处理时使用。状态在流计算很多复杂场景中非常重要,比如:

  • 保存所有历史记录,用来寻找某种记录模式

  • 保存最近一分钟的所有记录,用于对每分钟的记录进行聚合统计

  • 保存当前的模型参数,用于进行模型训练

有状态的流计算框架必须有很好的容错性,才能在生产环境中发挥用处。这里的容错性是指,不管是发生硬件故障,还是程序异常,最终的结果不丢也不重。

Flink 的容错性从一开始就是一个非常强大的特性,在遇到故障时,能够保证不丢不重,且对正常逻辑处理的性能影响很小。

这里面的核心就是 checkpoint 机制,Flink 使用 checkpoint 机制来进行状态保证,在 Flink 中 checkpoint 是一个定时触发的全局异步快照,并持久化到持久存储系统上(通常是分布式文件系统)。发生故障后,Flink 选择从最近的一个快照进行恢复。有用户的作业状态达到 GB 甚至 TB 级别,对这么大的作业状态做一次 checkpoint 会非常耗时,耗资源,因此我们在 Flink 1.3 中引入了增量 checkpoint 机制。

在增量 checkpoint 之前,Flink 的每个 checkpoint 都包含作业的所有状态。我们在观察到状态在 checkpoint 之间的变化并没有那么大之后,支持了增量 checkpoint。增量 checkpoint 仅包含上次 checkpoint 和本次 checkpoint 之间状态的差异(也就是“增量”)。

对于状态非常大的作业,增量 checkpoint 对性能的提升非常明显。有生产用户反馈对于 TB 级别的作业,使用增量 checkpoint 后能将 checkpoint 的整体时间从 3 分钟降到 30 秒。这些时间节省主要归功于不需要在每次 checkpoint 都将所有状态写到持久化存储系统。

如何使用

当前,仅能够在 RocksDB StateBackend 上使用增量 checkpoint 机制,Flink 依赖 RocksDB 内部的备份机制来生成 checkpoint 文件。Flink 会自动清理掉之前的 checkpoint 文件, 因此增量 checkpoint 的历史记录不会无限增长。

为了在作业中开启增量 checkpoint,建议详细阅读 Apache Flink 的 checkpoint 文档,简单的说,你可以像之前一样开启 checkpoint,然后将构造函数的第二个参数设置为 true 来启用增量 checkpoint。

Java 示例

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new RocksDBStateBackend(filebackend, true));

Scala 示例

val env = StreamExecutionEnvironment.getExecutionEnvironment()env.setStateBackend(new RocksDBStateBackend(filebackend, true))

Flink 默认保留一个成功的 checkpoint,如果你需要保留多个的话,可以通过下面的配置进行设置:

state.checkpoints.num-retained

原理解析

Flink 的增量 checkpoint 以 RocksDB 的 checkpoint 为基础。RocksDB 是一个 LSM 结构的 KV 数据库,把所有的修改保存在内存的可变缓存中(称为 memtable),所有对 memtable 中 key 的修改,会覆盖之前的 value,当前 memtable 满了之后,RocksDB 会将所有数据以有序的写到磁盘。当 RocksDB 将 memtable 写到磁盘后,整个文件就不再可变,称为有序字符串表(sstable)。

RocksDB 的后台压缩线程会将 sstable 进行合并,就重复的键进行合并,合并后的 sstable 包含所有的键值对,RocksDB 会删除合并前的 sstable。

在这个基础上,Flink 会记录上次 checkpoint 之后所有新生成和删除的 sstable,另外因为 sstable 是不可变的,Flink 用 sstable 来记录状态的变化。为此,Flink 调用 RocksDB 的 flush,强制将 memtable 的数据全部写到 sstable,并硬链到一个临时目录中。这个步骤是在同步阶段完成,其他剩下的部分都在异步阶段完成,不会阻塞正常的数据处理。

Flink 将所有新生成的 sstable 备份到持久化存储(比如 HDFS,S3),并在新的 checkpoint 中引用。Flink 并不备份前一个 checkpoint 中已经存在的 sstable,而是引用他们。Flink 还能够保证所有的 checkpoint 都不会引用已经删除的文件,因为 RocksDB 中文件删除是由压缩完成的,压缩后会将原来的内容合并写成一个新的 sstable。因此,Flink 增量 checkpoint 能够切断 checkpoint 历史。

为了追踪 checkpoint 间的差距,备份合并后的 sstable 是一个相对冗余的操作。但是 Flink 会增量的处理,增加的开销通常很小,并且可以保持一个更短的 checkpoint 历史,恢复时从更少的 checkpoint 进行读取文件,因此我们认为这是值得的。

举个栗子

上图以一个有状态的算子为例,checkpoint 最多保留 2 个,上图从左到右分别记录每次 checkpoint 时本地的 RocksDB 状态文件,引用的持久化存储上的文件,以及当前 checkpoint 完成后文件的引用计数情况。

  • Checkpoint 1 的时候,本地 RocksDB 包含两个 sstable 文件,该 checkpoint 会把这两个文件备份到持久化存储,当 checkpoint 完成后,对这两个文件的引用计数进行加 1,引用计数使用键值对的方式保存,其中键由算子的当前并发以及文件名所组成。我们同时会维护一个引用计数中键到对应文件的隐射关系。

  • Checkpoint 2 的时候,RocksDB 生成两个新的 sstable 文件,并且两个旧的文件还存在。Flink 会把两个新的文件进行备份,然后引用两个旧的文件,当 checkpoint 完成时,Flink 对这 4 个文件都进行引用计数 +1 操作。

  • Checkpoint 3 的时候,RocksDB 将 sstable-(1),sstable-(2) 以及 sstable-(3) 合并成 sstable-(1,2,3),并且删除了三个旧文件,新生成的文件包含了三个删除文件的所有键值对。sstable-(4) 还继续存在,生成一个新的 sstable-(5) 文件。Flink 会将 sstable-(1,2,3) 和 sstable-(5) 备份到持久化存储,然后增加 sstable-4 的引用计数。由于保存的 checkpoint 数达到上限(2 个),因此会删除 checkpoint 1,然后对 checkpoint 1 中引用的所有文件(sstable-(1) 和 sstable-(2))的引用计数进行 -1 操作。

  • Checkpoint 4 的时候,RocksDB 将 sstable-(4),sstable-(5) 以及新生成的 sstable-(6) 合并成一个新的 sstable-(4,5,6)。Flink 将 sstable-(4,5,6) 备份到持久化存储,并对 sstabe-(1,2,3) 和 sstable-(4,5,6) 进行引用计数 +1 操作,然后删除 checkpoint 2,并对 checkpoint 引用的文件进行引用计数 -1 操作。这个时候 sstable-(1),sstable-(2) 以及 sstable-(3) 的引用计数变为 0,Flink 会从持久化存储删除这三个文件。

竞争问题以及并发 checkpoint

Flink 支持并发 checkpoint,有时晚触发的 checkpoint 会先完成,因此增量 checkpoint 需要选择一个正确的基准。Flink 仅会引用成功的 checkpoint 文件,从而防止引用一些被删除的文件。

从 checkpoint 恢复以及性能

开启增量 checkpoint 之后,不需要再进行其他额外的配置。如果 Job 异常,Flink 的 JobMaster 会通知所有 task 从上一个成功的 checkpoint 进行恢复,不管是全量 checkpoint 还是增量 checkpoint。每个 TaskManager 会从持久化存储下载他们需要的状态文件。

尽管增量 checkpoint 能减少大状态下的 checkpoint 时间,但是天下没有免费的午餐,我们需要在其他方面进行舍弃。增量 checkpoint 可以减少 checkpoint 的总时间,但是也可能导致恢复的时候需要更长的时间如果集群的故障频繁,Flink 的 TaskManager 需要从多个 checkpoint 中下载需要的状态文件(这些文件中包含一些已经被删除的状态),作业恢复的整体时间可能比不使用增量 checkpoint 更长。

另外在增量 checkpoint 情况下,我们不能删除旧 checkpoint 生成的文件,因为新的 checkpoint 会继续引用它们,这可能导致需要更多的存储空间,并且恢复的时候可能消耗更多的带宽。

关于控制便捷性与性能之间平衡的策略可以参考此文档:

https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/large_state_tuning.html

原文链接:https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html


上一期中奖的是:开开心心的过

从11月6日开始至11月12日截止,一周时间,下周二我会从公众号底部留言+转发+在看综合最多的读者中抽取一名读者,免费包邮送一本实体新书,这周是《企业迁云实战》,以后每周都有这样的活动,留言互动起来吧!

END

关注我

公众号(zhisheng)里回复 面经、ES、Flink、 Spring、Java、Kafka、监控 等关键字可以查看更多关键字对应的文章

Flink 实战

1、《从0到1学习Flink》—— Apache Flink 介绍2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门3、《从0到1学习Flink》—— Flink 配置文件详解4、《从0到1学习Flink》—— Data Source 介绍5、《从0到1学习Flink》—— 如何自定义 Data Source ?6、《从0到1学习Flink》—— Data Sink 介绍7、《从0到1学习Flink》—— 如何自定义 Data Sink ?8、《从0到1学习Flink》—— Flink Data transformation(转换)9、《从0到1学习Flink》—— 介绍 Flink 中的 Stream Windows10、《从0到1学习Flink》—— Flink 中的几种 Time 详解11、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 ElasticSearch12、《从0到1学习Flink》—— Flink 项目如何运行?13、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 Kafka14、《从0到1学习Flink》—— Flink JobManager 高可用性配置15、《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍16、《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL17、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ18、《从0到1学习Flink》—— 你上传的 jar 包藏到哪里去了19、大数据“重磅炸弹”——实时计算框架 Flink
20、《Flink 源码解析》—— 源码编译运行
21、为什么说流处理即未来?
22、OPPO数据中台之基石:基于Flink SQL构建实数据仓库
23、流计算框架 Flink 与 Storm 的性能对比
24、Flink状态管理和容错机制介绍
25、原理解析 | Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理
26、Apache Flink 是如何管理好内存的?
27、《从0到1学习Flink》——Flink 中这样管理配置,你知道?
28、《从0到1学习Flink》——Flink 不可以连续 Split(分流)?
29、Flink 从0到1学习—— 分享四本 Flink 的书和二十多篇 Paper 论文
30、360深度实践:Flink与Storm协议级对比
31、Apache Flink 1.9 重大特性提前解读
32、如何基于Flink+TensorFlow打造实时智能异常检测平台?只看这一篇就够了
33、美团点评基于 Flink 的实时数仓建设实践
34、Flink 灵魂两百问,这谁顶得住?
35、一文搞懂 Flink 的 Exactly Once 和 At Least Once
36、你公司到底需不需要引入实时计算引擎?
37、Flink 从0到1学习 —— 如何使用 Side Output 来分流?38、一文让你彻底了解大数据实时计算引擎 Flink39、基于 Flink 实现的商品实时推荐系统(附源码)40、如何使用 Flink 每天实时处理百亿条日志?41、Flink 在趣头条的应用与实践42、Flink Connector 深度解析43、滴滴实时计算发展之路及平台架构实践44、Flink Back Pressure(背压)是怎么实现的?有什么绝妙之处?45、Flink 实战 | 贝壳找房基于Flink的实时平台建设46、如何使用 Kubernetes 部署 Flink 应用47、一文彻底搞懂 Flink 网络流控与反压机制48、Flink中资源管理机制解读与展望49、Flink 实时写入数据到 ElasticSearch 性能调优50、深入理解 Flink 容错机制
51、吐血之作 | 流系统Spark/Flink/Kafka/DataFlow端到端一致性实现对比

Flink 源码解析

知识星球里面可以看到下面文章

朕已阅 

flink checkpoint 恢复_Apache Flink 管理大型状态之增量 Checkpoint 详解相关推荐

  1. Apache Flink 管理大型状态之增量 Checkpoint 详解

    邱从贤(山智),Apache Flink Contributor,中南大学硕士,2018 年加入阿里巴巴计算平台事业部,专注于 Flink 核心引擎开发,主要从事 Flink  State&C ...

  2. python状态码及其含义_Shell退出状态码及其应用详解

    Shell 中运行的命令会使用0-255之间的整数值,作为退出状态码,并以此来告知shell该命令执行的状态.通常情况下,约定0代表命令成功结束,非0代表程序非正常退出. 典型退出状态码及其含义 退出 ...

  3. Flink CheckPoint机制 学习 测试 使用FsStateBackend状态后端 将checkpoint恢复到中断处

    Flink CheckPoint机制 1.实验目的 目的 开启一个Flink程序,使用hdfs做状态后端,手动取消job后,再次恢复job测试,观察程序是否能恢复到检查点,继续读取并处理数据: 实验原 ...

  4. 302状态码_HTTP协议详解(基础概念 方法 状态码 首部 连接 Cookie 新特性 安全)

    一 .基础概念 URI URI 包含 URL 和 URN. 请求和响应报文 1. 请求报文 2. 响应报文 二.HTTP 方法 客户端发送的 请求报文 第一行为请求行,包含了方法字段. GET 获取资 ...

  5. learn.log - 进程管理器fastcgi原理及fastcgi_param详解

    一. 何为FastCGI?  in all : 快-不崩溃-优雅 fast-strong-high FastCGI官方站点:http://www.fastcgi.com.common gateway  ...

  6. [入门篇]Linux操作系统fork子进程的创建以及进程的状态 超超超详解!!!我不允许有人错过!!!

    目录 0.前言 1.fork()创建子进程讲解 1.1fork()的简单介绍 1.2 创建子进程详解 1.2.1 如何理解fork创建子进程 1.2.2 子进程的PCB以及子进程的代码和数据 1.2. ...

  7. linux看不到进程管理,关于Linux下进程的详解【进程查看与管理】

    关于Linux下进程的详解[进程查看与管理] 一.关于进程 进程: 已经启动的可执行程序的运行实力 进程的组成:一个进程包含内核中的一部分地址空间和一系列数据结构.其中地址空间是内核标记的一部分内存以 ...

  8. linux usermod a,linux用户管理(useradd,usermod,suerdel命令详解)

    linux用户管理(useradd,usermod,suerdel命令详解) 新增用户 使用命令 adduser 或 useradd 添加用户. Centos下useradd与adduser命令没有区 ...

  9. HTTP常见的状态码及解决方案详解。

    转自:微点阅读  https://www.weidianyuedu.com/content/0517446524143.html HTTP中常见的各种状态码详解及解决方案 总结了一些常见的http的状 ...

最新文章

  1. 【JavaScript】核心语法之数组
  2. 三维重建13X-2:FCN和MaskRCNN中Mask的获取
  3. Chapter4:Using Standard Control(学习)
  4. python实现微信小程序的接口自动化_appium+python自动化56-微信小程序自动化(摩拜为例)...
  5. RecyclerView拖拽排序和滑动删除实现
  6. jquery ajax 跨域请求
  7. django-验证码
  8. 读博天赋更重要还是努力更重要?
  9. 为什么选择springcloud作为微服务架构
  10. [转]BT1120接口及协议
  11. jre7或jre8或其他版本共存问题
  12. 关于zip命令的使用问题
  13. 兄弟连php课程,LAMP兄弟连PHP课程学习笔记 第一天 PHP基本语法
  14. 计算机启动windows程序,win7系统开机自动运行程序怎么设置|win7电脑开机启动程序的方法...
  15. bilibili怎么用用户名登录_b站账号(bilibili免费账号密码)
  16. xcode7的那些坑-“Your binary is not optimized for iPhone 5” (ITMS-90096) when submitting
  17. 桌面扫码点餐系统(小程序+Java后台)
  18. Android Studio系统状态栏,设置setSmallIcon通知图标无效问题及解决方案
  19. java list 索引值_List中固定某个索引的值-简单替换位置
  20. 数据保护新愿景:欧盟GDPR十个误解与争议

热门文章

  1. Python类继承对象
  2. win11更新黑屏无法进入系统怎么办 windows11黑屏更新无法进入系统的解决方法
  3. win11下载时卡住0%不动怎么办 Windows11下载卡在0%的解决方法
  4. win11网络无法连接怎么办 Windows11连不上网的解决方法
  5. java逻辑判断_阿里JAVA开发强制要求的10条条件控制及逻辑判断的规范,切记
  6. java的源文件和字节码文件_javaweb项目源文件与字节码文件目录结构
  7. 盒子横向排列-初识浮动Float(HTML、CSS)
  8. 移动端h5 隐藏滑动滚动条
  9. Nginx设置expires设定页面缓存时间
  10. Eclipse-阶段1-配置问题解决