作为流式计算,Flink通过checkpoint机制和kafka的可回溯性来保证作业在failover时不丢失状态。

作为生产环境的flink,我们期待做到快速failover、弹性扩缩容和平滑迁移,尽量做到用户无感知和变更方便,从而让用户将更多精力放在功能实现上。

本文先介绍checkpoint机制,接着介绍flink如何rescale,最后再介绍下代码流程。

注:下文中统一以task表示算子的一个并发,比如并发度为3的map算子包含3个并行的task,下文中出现的subtask语义上等同于task。

Checkpoint机制

flink快照从源头开始触发(记录消费的offset),通过barrier来标记本次快照,如图1的offset。

barrier流过的地方都会将state保存到共享存储中,如图2中的sum。

当barrier流到Sink时,所有算子都完成快照,本次作业快照也就完成,如图3(Sink算子无状态)。

一旦作业失败重启,会将state都恢复到各个算子中,同时从记录的offset开始消费,确保从上一次快照的地方恢复作业。我们可以看到,Task中的状态为21(1+2+3+4+5+6),下一次累加的数据为7,保证了flink内部状态的一致性。

以上为最简单的情况,实际情况可能包含多个快照、多个算子、迭代等复杂场景。

当多个快照时,flink通过barrier将数据分段,每个barrier都标记着一个checkpointID,如下图所示:

当一个task有多个输入时,必须等待上游所有的barrier都到达后,才能做快照。如果一个上游的barrier已经达到,想要做到exactly-once,需要先把之后到达的数据缓存下来,等做完快照再处理。

具体的实现原理可以参考论文:State Management in Apache Flink

Rescale原理

当作业的并发度改变时,flink会重新分配状态。这里采取的partition策略是固定总partition个数,当task并发改变时,重新计算并将partition分配到每个task上。除了这种partition策略,还存在根据partition大小自动合并和拆分的策略,比如Hbase所使用的。

在flink中,一个key group就是一个partition。之所以选择以key group为基本单位来操作状态,是为了减少磁盘访问IO和随机读写(如果以key为单位就会出现这种情况,比如恢复时每个task都需要读取全部的state来决定每个key是否属于自己)。

flink中有两种状态,包括operator state和key state。operator state是以task为单位的,一般采用list的形式存储,当重新rescale时每个task可以选择接受全部的operator state或者按照list平分。key state时以key为单位的,必须在keyby时候才能使用这种状态。

下面以kafka offset作为operator state作为介绍。每个task都会以list的形式记录自己负责的<partitionId, offset>,当做快照的时候,将状态保存在共享存储,所有task的list state会拼接成一个大的list。当重新rescale的时候,flink将list中的元素平分给每个task。(实际的flink kafka consumer是通过union方式获取所有list,然后再选择属于自己的)

下面简单介绍下key state的rescale。key group的个数等于作业的最大并发(一旦设置不可改变,即key group的个数必须大于等于task的并发度),每个key通过hash映射属于其中一个key group。比如下图,共有10个KG,KG-1包含1-11的key。

当作业rescale的时候,会将list形式的KG平分到每个task。

上图中最下面给出了key->KG→task的映射过程:

  • 计算key的哈希值。
  • 根据哈希值和最大并发确定key所属KG。
  • 根据key所属KG来确定发到下游哪个并发的task。

对应KeyGroupRangeAssignment代码如下:

    public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {Preconditions.checkNotNull(key, "Assigned key must not be null!");return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));}public static int assignToKeyGroup(Object key, int maxParallelism) {Preconditions.checkNotNull(key, "Assigned key must not be null!");return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);}public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {return keyGroupId * parallelism / maxParallelism;}

上游的计算在KeyGroupStreamPartitioner类里,下游的计算在KeyGroupPartitioner类里。

代码流程

这里的代码以flink-1.10.0作为参考。

关键类

CheckpointCoordinator

这个是位于Master节点的快照控制中心,负责定期的触发checkpoint和手动触发savepoint,维护在做和已完成的快照。

StateAssignmentOperation

这个是位于Master节点的作业恢复时负责rescale的类,主要是根据新作业的并发重新分配状态。针对operator state,主要采用broadcast的方式使得每个task都能接触算子全部的状态;针对key state,采用均分KG的方法来重新划分state的归属。

TaskStateManagerImpl

具体Task的状态管理中心,包括和JobMaster做checkpoint的交互,管理本地状态。

StreamTaskStateInitializerImpl

具体task的状态恢复,这里也是各个statebackend开始创建的地方。

RocksDBKeyedStateBackend

具体task通过rocksdb做key state的地方。使用这种backend,每个状态是一个cf,主键的组织形式为<KG, key, namespace>。支持增量快照和全量快照。

HeapKeyedStateBackend

具体task通过rocksdb做key state的地方。使用这种backend,底层通过使用CopyOnWriteStateMap来存储,主键的组织形式为<NS, K, SV>。相比rocksdb,内存的存取速度都非常快,但是状态大小受制于内存。

如何制作快照

  • CheckpointCoordinator::triggerCheckpoint()。这个是checkpoint和savepoint共同的入口函数,checkpoint是通过定时调度来做的,savepoint则需要人工触发。这里头会做一些控制检查,没有问题的话就会向source task发送制作快照通知。
  • Execution::triggerCheckpointHelper()。通知source task对应的节点做快照。
  • TaskExecutor::triggerCheckpoint()。快照通知到TaskManager。
  • Task::triggerCheckpointBarrier()。进到具体task里。
  • StreamTask::performCheckpoint()。新版本采用了mailBox模型来解决持锁竞争问题。这里会首先下发barrier,然后开始本地快照。
  • CheckpointingOperation::executeCheckpointing()。进行同步快照(checkpointStreamOperator方法)和异步快照(AsyncCheckpointRunnable类)。
  • StreamOperator::snapshotState() → AbstractStreamOperator::snapshotState()。同步快照的制作,主要保存KeyedStateRaw、OperatorStateRaw、OperatorStateManaged、KeyedStateManaged等状态。
  • AsyncCheckpointRunnable::run()。异步快照的制作。
  • CheckpointResponder::acknowledgeCheckpoint()。快照做完后汇报给主节点。

具体的快照制作,取决于所选statebackend,这里不再详述,可以参考RocksDBKeyedStateBackend和HeapKeyedStateBackend。

针对非source节点,需要上游的barrier对齐后才能触发快照,这点跟source task略有不同,如下所示:

  • CheckpointedInputGate::pollNext()。从输入里头获取barrier。
  • CheckpointBarrierAligner::processBarrier()。处理barrier,负责exactly-once快照的处理。另一个类似类CheckpointBarrierTracker则负责at-least-once快照的处理。
  • CheckpointBarrierAligner::notifyCheckpoint()。如果barrier都到齐了,那么开始制作快照。
  • StreamTask::triggerCheckpointOnBarrier()。进到task里,之后的流程就如上面所述了。

如何恢复快照

Master端的分配

主要的状态分配逻辑都在类StateAssignmentOperation里。这里先明确几个概念:

  • ExecutionJobVertex,表示一个逻辑上的执行节点,可能是好几个operator通过chain连到一起的。
  • Execution,对应ExecutionJobVertex的一个并发执行,也是由好几个operator通过chain连到一起的。
  • OperatorState,表示一个operator的所有并发的状态。
  • OperatorID,一个operator的唯一标识。
  • KeyGroupRange,表示一个subtask所负责的KG范围。
  • 状态。包括ManagedOperatorStates、RawOperatorStates、ManagedKeyedState和RawKeyedState。
  • TaskStateSnapshot,一个Execution所有operator的状态。

这里的处理逻辑是,对ExecutionJobVertex的所有operator做状态分配,对operator的所有subtask做状态分配。基本流程如下:

  • 检查并发是否符合要求,主要是确保设置并发不要超过最大并发等。
  • 计算每个subtask负责的KeyGroupRange,下面根据这个标准来分配KG。
  • 重新分配operatorState,主要在reDistributePartitionableStates里实现,这里头对unionState进行了合并,按照round-bin的方式来分配list的state。
  • 重新分配keyedState,主要在reDistributeKeyedStates里实现,这里头会具体到subtask里,从之前的快照里找到所有属于它的stateHandler。
  • 将分配好的状态赋值给ExecutionJobVertices。这里会以Execution为基本单位,设置它的JobManagerTaskRestore(由多个operator的状态组成)。

Task端的恢复

当状态都分配好了之后,在Task端就可以进行状态恢复了。大概流程如下:

  • TaskStateManagerImpl::prioritizedOperatorState() ,将对应operator的状态(OperatorSubtaskState)拿出来,最后存到PrioritizedOperatorSubtaskState里。
  • StreamTaskStateInitializerImpl::streamOperatorStateContext,初始化keyedStatedBackend、operatorStateBackend、timeServiceManager等过程。
  • BackendRestorerProcedure::createAndRestore(),这个是在初始化keyedStatedBackend的时候调用的,将状态保存到了keyedStatedBackend中。
  • RestoreOperation::restore(),针对历史状态进行恢复。这是个抽象函数,比如一个具体的实现是RocksDBFullRestoreOperation::restore()。
  • 之后根据具体的stateHandle进行恢复。

flink checkpoint 恢复_Flink断点恢复机制相关推荐

  1. 【FLink】Flink checkpoint 实现数据连续计算 恢复机制 拓扑图 变化 如何处理

    1.概述 本博客是视频:Apache Flink 知其然,知其所以然(原理&实战) 的笔记. 改部分主要讲解了,当流任务失败后恢复,拓扑图改变了,这种情况下该怎么恢复checkpoint. 看 ...

  2. 【FLink】Flink checkpoint 实现数据连续计算 恢复机制 案例实战

    1.概述 本博客是视频:https://www.bilibili.com/video/BV1yk4y1z7Lr?p=26&spm_id_from=pageDriver 的笔记. 2. 案例 2 ...

  3. flink checkpoint 重启_Flink进阶教程:Checkpoint机制原理剖析与参数配置

    在Flink状态管理详解:Keyed State和Operator List State深度解析这篇文章中,我们介绍了Flink的状态都是基于本地的,而Flink又是一个部署在多节点的分布式引擎,分布 ...

  4. flink checkpoint 恢复_Flink解析 | Apache Flink结合Kafka构建端到端的ExactlyOnce处理

    周凯波(宝牛) 阿里巴巴技术专家,四川大学硕士,2010年毕业后加入阿里搜索事业部,从事搜索离线平台的研发工作,参与将搜索后台数据处理架构从MapReduce到Flink的重构.目前在阿里计算平台事业 ...

  5. Flink checkpoint机制以及恢复

    什么是checkpoint? checkpoint 则表示了一个 Flink Job ,在一个特定时刻的一份全局状态状态快照,即包含了一个 job 下 所有 task/operator 某时刻的状态. ...

  6. Flink CheckPoint机制 学习 测试 使用FsStateBackend状态后端 将checkpoint恢复到中断处

    Flink CheckPoint机制 1.实验目的 目的 开启一个Flink程序,使用hdfs做状态后端,手动取消job后,再次恢复job测试,观察程序是否能恢复到检查点,继续读取并处理数据: 实验原 ...

  7. 1.15.Flink state(状态)管理与恢复、什么是state、Keyed State、Operator State、状态容错(生成快照,恢复快照),checkPoint简介,重启策略等

    1.15.Flink state(状态)管理与恢复 1.15.1.什么是state 1.15.2.状态(State) 1.15.3.Keyed State 1.15.4.Operator State ...

  8. flink checkpoint 恢复_干货:Flink+Kafka 0.11端到端精确一次处理语义实现

    2017年12月Apache Flink社区发布了1.4版本.该版本正式引入了一个里程碑式的功能:两阶段提交Sink,即TwoPhaseCommitSinkFunction.该SinkFunction ...

  9. flink checkpoint 恢复_Apache Flink 管理大型状态之增量 Checkpoint 详解

    邱从贤(山智),Apache Flink Contributor,中南大学硕士,2018 年加入阿里巴巴计算平台事业部,专注于 Flink 核心引擎开发,主要从事 Flink  State&C ...

最新文章

  1. python 调用shell或windows命令
  2. linux环境变量显示、添加、删除
  3. DL之ResNeXt:ResNeXt算法的简介(论文介绍)、架构详解、案例应用等配图集合之详细攻略
  4. 「产品规划」的那些事儿
  5. LinearLayout(线性布局)
  6. POJ - 3764 The xor-longest Path(字典树性质)
  7. python数据分析师工作内容_数据分析师日常工作是什么?
  8. 为什么互联网35岁是道坎?
  9. web安全day29:linux日志异地备份
  10. python requests text content_对python requests的content和text方法的区别详解
  11. 基于nodejs的模拟数据分发服务
  12. 拓端tecdat|R语言数据可视化分析案例:探索BRFSS数据
  13. 百度关键词快排虚拟发包程序-SEO快速排名系统
  14. socket编程—UDP套接字
  15. 要做飞思卡尔智能车要学哪些知识?
  16. aria2 配置教程
  17. 北京环球影城游玩攻略
  18. JavaSE_day11【内部类、注解】
  19. 心动C++情牵汉洛塔
  20. 滴滴自动驾驶服务上线,程维:道阻且长,行则将至

热门文章

  1. [转]Linux awk 命令 说明
  2. Extjs--FormPanel(2)
  3. JAVA数字处理类使用2
  4. 浅谈工作流引擎的几个关键因素
  5. 让逻辑关系破缺的最小空间尺寸
  6. 【控制】傅里叶系列(一)傅里叶级数 (Fourier series) 的推导
  7. 【控制】《多智能体系统的协同群集运动控制》陈杰老师-第1章-绪论
  8. LaTex 版面设计
  9. 二、stm32f103+enc28j60
  10. 20051008:看了AppleSeed,领悟到了一个道理: