其实说到flink的checkpoint,那必然也得讲讲flink的savepoint


什么是 Savepoint ? Savepoint 与 Checkpoint 有什么不同?

Savepoint 是依据 Flink checkpointing 机制所创建的流作业执行状态的一致镜像。 你可以使用 Savepoint 进行 Flink 作业的停止与重启、fork 或者更新。 Savepoint 由两部分组成:稳定存储(列入 HDFS,S3,…) 上包含二进制文件的目录(通常很大),和元数据文件(相对较小)。 稳定存储上的文件表示作业执行状态的数据镜像。 Savepoint 的元数据文件以(相对路径)的形式包含(主要)指向作为 Savepoint 一部分的稳定存储上的所有文件的指针。

注意: 为了允许程序和 Flink 版本之间的升级,请务必查看以下有关分配算子 ID 的部分 。

从概念上讲, Flink 的 Savepoint 与 Checkpoint 的不同之处类似于传统数据库中的备份与恢复日志之间的差异。 Checkpoint 的主要目的是为意外失败的作业提供恢复机制。 Checkpoint 的生命周期由 Flink 管理,即 Flink 创建,管理和删除 Checkpoint - 无需用户交互。 作为一种恢复和定期触发的方法,Checkpoint 实现有两个设计目标:i)轻量级创建和 ii)尽可能快地恢复。 可能会利用某些特定的属性来达到这个,例如, 工作代码在执行尝试之间不会改变。 在用户终止作业后,通常会删除 Checkpoint(除非明确配置为保留的 Checkpoint)。
与此相反、Savepoint 由用户创建,拥有和删除。 他们的用例是计划的,手动备份和恢复。 例如,升级 Flink 版本,调整用户逻辑,改变并行度,以及进行红蓝部署等。 当然,Savepoint 必须在作业停止后继续存在。 从概念上讲,Savepoint 的生成,恢复成本可能更高一些,Savepoint 更多地关注可移植性和对前面提到的作业更改的支持。

除去这些概念上的差异,Checkpoint 和 Savepoint 的当前实现基本上使用相同的代码并生成相同的格式。然而,目前有一个例外,我们可能会在未来引入更多的差异。例外情况是使用 RocksDB 状态后端的增量 Checkpoint。他们使用了一些 RocksDB 内部格式,而不是 Flink 的本机 Savepoint 格式。这使他们成为了与 Savepoint 相比,更轻量级的 Checkpoint 机制的第一个实例。


分配算子 ID

强烈建议你按照本节所述调整你的程序,以便将来能够升级你的程序。主要通过 uid(String) 方法手动指定算子 ID 。这些 ID 将用于恢复每个算子的状态。

DataStream<String> stream = env.// Stateful source (e.g. Kafka) with ID.addSource(new StatefulSource()).uid("source-id") // ID for the source operator.shuffle()// Stateful mapper with ID.map(new StatefulMapper()).uid("mapper-id") // ID for the mapper// Stateless printing sink.print(); // Auto-generated ID

如果不手动指定 ID ,则会自动生成 ID 。只要这些 ID 不变,就可以从 Savepoint 自动恢复。生成的 ID 取决于程序的结构,并且对程序更改很敏感。因此,强烈建议手动分配这些 ID 。

Savepoint 状态

你可以将 Savepoint 想象为每个有状态的算子保存一个映射“算子 ID ->状态”:

Operator ID | State
------------+------------------------
source-id   | State of StatefulSource
mapper-id   | State of StatefulMapper

在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 的每个条目映射回新程序。


算子

你可以使用命令行客户端来触发 Savepoint,触发 Savepoint 并取消作业,从 Savepoint 恢复,以及删除 Savepoint。

从 Flink 1.2.0 开始,还可以使用 webui 从 Savepoint 恢复。

触发 Savepoint

当触发 Savepoint 时,将创建一个新的 Savepoint 目录,其中存储数据和元数据。可以通过配置默认目标目录或使用触发器命令指定自定义目标目录(参见:targetDirectory参数来控制该目录的位置。

注意:目标目录必须是 JobManager(s) 和 TaskManager(s) 都可以访问的位置,例如分布式文件系统(或者对象存储系统)上的位置。
以 FsStateBackend 或 RocksDBStateBackend 为例:

# Savepoint 目标目录
/savepoint/# Savepoint 目录
/savepoint/savepoint-:shortjobid-:savepointid/# Savepoint 文件包含 Checkpoint元数据
/savepoint/savepoint-:shortjobid-:savepointid/_metadata# Savepoint 状态
/savepoint/savepoint-:shortjobid-:savepointid/...

从 1.11.0 开始,你可以通过移动(拷贝)savepoint 目录到任意地方,然后再进行恢复。

在如下两种情况中不支持 savepoint 目录的移动:1)如果启用了 *entropy injection:这种情况下,savepoint 目录不包含所有的数据文件,因为注入的路径会分散在各个路径中。 由于缺乏一个共同的根目录,因此 savepoint 将包含绝对路径,从而导致无法支持 savepoint 目录的迁移。2)作业包含了 task-owned state(比如 GenericWriteAhreadLog sink)。
和 savepoint 不同,checkpoint 不支持任意移动文件,因为 checkpoint 可能包含一些文件的绝对路径。
如果你使用 MemoryStateBackend 的话,metadata 和 savepoint 的数据都会保存在 _metadata 文件中,因此不要因为没看到目录下没有数据文件而困惑。
注意: 不建议移动或删除正在运行作业的最后一个 Savepoint ,因为这可能会干扰故障恢复。因此,Savepoint 对精确一次的接收器有副作用,为了确保精确一次的语义,如果在最后一个 Savepoint 之后没有 Checkpoint ,那么将使用 Savepoint 进行恢复。

触发 Savepoint

$ bin/flink savepoint :jobId [:targetDirectory]

这将触发 ID 为 :jobId 的作业的 Savepoint,并返回创建的 Savepoint 路径。 你需要此路径来还原和删除 Savepoint 。

使用 YARN 触发 Savepoint

$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

这将触发 ID 为 :jobId 和 YARN 应用程序 ID :yarnAppId 的作业的 Savepoint,并返回创建的 Savepoint 的路径。

使用 Savepoint 取消作业

$ bin/flink cancel -s [:targetDirectory] :jobId

这将自动触发 ID 为 :jobid 的作业的 Savepoint,并取消该作业。此外,你可以指定一个目标文件系统目录来存储 Savepoint 。该目录需要能被 JobManager(s) 和 TaskManager(s) 访问。

从 Savepoint 恢复
$ bin/flink run -s :savepointPath [:runArgs]

这将提交作业并指定要从中恢复的 Savepoint 。 你可以给出 Savepoint 目录或 _metadata 文件的路径。

跳过无法映射的状态恢复

默认情况下,resume 操作将尝试将 Savepoint 的所有状态映射回你要还原的程序。 如果删除了运算符,则可以通过 --allowNonRestoredState(short:-n)选项跳过无法映射到新程序的状态:

$ bin/flink run -s :savepointPath -n [:runArgs]
删除 Savepoint
$ bin/flink savepoint -d :savepointPath

这将删除存储在 :savepointPath 中的 Savepoint。

请注意,还可以通过常规文件系统操作手动删除 Savepoint ,而不会影响其他 Savepoint 或 Checkpoint(请记住,每个 Savepoint 都是自包含的)。 在 Flink 1.2 之前,使用上面的 Savepoint 命令执行是一个更乏味的任务。

配置

你可以通过 state.savepoints.dir 配置 savepoint 的默认目录。 触发 savepoint 时,将使用此目录来存储 savepoint。 你可以通过使用触发器命令指定自定义目标目录来覆盖缺省值。

# 默认 Savepoint 目标目录
state.savepoints.dir: hdfs:///flink/savepoints

如果既未配置缺省值也未指定自定义目标目录,则触发 Savepoint 将失败。

注意:目标目录必须是 JobManager(s) 和 TaskManager(s) 可访问的位置,例如,分布式文件系统上的位置。

flink的Savepoint相关推荐

  1. 使用Flink的Savepoint功能

    Flink通过Savepoint功能可以做到程序升级后,继续从升级前的那个点开始执行计算,保证数据不中断. Flink中Checkpoint用于保存状态,是自动执行的,会过期,Savepoint是指向 ...

  2. flink的savepoint实验-scala

    大致步骤同[1] 步骤 步骤内容 ① 启动Zookeeper,Hadoop,Flink(HA)三种集群(启动顺序不要错) ② mvn clean scala:compile compile packa ...

  3. flink的savepoint实验-java

    前言和一些准备工作 首先需要明确的是,savepoint和checkpoint不是同一类, 所以在进行savepoint实验时,代码里不需要添加checkpoint enable选项等. $FLINK ...

  4. Flink Checkpoint/Savepoint

    一.CheckPoints 为了使Flink的状态具有良好的容错性,Flink提供了检查点机制(Checkpoints).通过检查点机制,Flink定期在数据流上生成checkpoint barrie ...

  5. flink check-point save-point理解

    参考 https://blog.csdn.net/hxcaifly/article/details/84673292 https://www.jianshu.com/p/4d31d6cddc99 ht ...

  6. 1.15.Flink state(状态)管理与恢复、什么是state、Keyed State、Operator State、状态容错(生成快照,恢复快照),checkPoint简介,重启策略等

    1.15.Flink state(状态)管理与恢复 1.15.1.什么是state 1.15.2.状态(State) 1.15.3.Keyed State 1.15.4.Operator State ...

  7. flink的savepoints和checkpoints以及state Query(暂时无法全部完成)

    下面表格来自[18] 维度 Checkpoints Savepoints 目标 任务失败的恢复/故障转移机制 手动备份/重启/恢复任务 实现 轻量快速 注重可移植性,成本较高 生命周期 Flink自身 ...

  8. flink 写kafka_flink消费kafka的offset与checkpoint

    生产环境有个作业,逻辑很简单,读取kafka的数据,然后使用hive catalog,实时写入hbase,hive,redis.使用的flink版本为1.11.1. 为了防止写入hive的文件数量过多 ...

  9. 95-260-050-源码-检查点-SavePoint

    2.Flink的Savepoint 2.1 概述 概念 Ø 让时光倒流的语法糖 Ø 全局.一致性快照,如数据源offset.并行操作状态 Ø 可以从应用在过去的任意做了savepoint的时刻开始继续 ...

最新文章

  1. Oxford Nanopore sequencing, hybrid error correction, and de novo assembly of a eukaryotic genome
  2. 快手刘霁:AI基础能力决定每个公司AI的迭代和落地效率丨MEET2020
  3. CentOS 6.5环境实现corosync+pacemaker实现DRBD高可用
  4. [MATLAB学习笔记] global声明全部变量
  5. 日常生活小技巧 -- “Error: Encountered an improper argument”的解决方法
  6. java ftp读取文件内容_java读取ftp中TXT文件的案例
  7. apache域名本地映射
  8. rfid在高速公路管理中的应用_RFID亮灯电子标签在仓储管理中的应用
  9. 通过canal实现把MySQL数据实时增量到kafka
  10. node稳定版本_Node.js十年,你大爷还是你大爷
  11. 简单区块链Python实现
  12. concurrenthashmap为什么是线程安全_为什么 StringBuilder 不是线程安全的?
  13. 数学分析教程(科大)——1-(1~10)小节
  14. tt作曲家简谱打谱软件_每个人的本地作曲家! 会议友好的满意设置
  15. python中-是什么意思
  16. 基于DES和RSA算法自动分配密钥的加密聊天程序
  17. 深度操作系统 deepin V23 Beta 发布
  18. 解决谷歌无法加载扩展程序
  19. 中科院最年轻院士入职浙大!他一篇论文未发博士毕业!
  20. (译)网站加速最佳实践——雅虎35条

热门文章

  1. 谷歌钦定的编程语言Kotlin大揭秘
  2. AIX root用户密码遗忘的处理
  3. 2021Java面经:java封装的概念
  4. 我知道眼泪多余,笑变得好不容易
  5. 纯函数、柯里化、组合函数的解析以及代码实现
  6. 使用 Docker 安装 Zabbix,并配置自定义监控项
  7. (一)操作系统的基本概念
  8. Matlab exercise04
  9. 单目ORB-SLAM流程梳理
  10. Doom-Emacs安装和基本使用方法