问题导读

1.Barrier 对齐会造成什么问题?

目前的 Checkpoint 算法在大多数情况下运行良好,然而当作业出现反压时,阻塞式的 Barrier 对齐反而会加剧作业的反压,甚至导致作业的不稳定。

2.Barrier 对齐是否会造成反压?

3.如何理解Unaligned Checkpoint ?

作为 Flink 最基础也是最关键的容错机制,Checkpoint 快照机制很好地保证了 Flink 应用从异常状态恢复后的数据准确性。同时 Checkpoint 相关的 metrics 也是诊断 Flink 应用健康状态最为重要的指标,成功且耗时较短的 Checkpoint 表明作业运行状况良好,没有异常或反压。然而,由于 Checkpoint 与反压的耦合,反压反过来也会作用于 Checkpoint,导致 Checkpoint 的种种问题。针对于此,Flink 在 1.11 引入 Unaligned Checkpint 来解耦 Checkpoint 机制与反压机制,优化高反压情况下的 Checkpoint 表现。

当前 Checkpoint 机制简述

相信不少读者对 Flink Checkpoint 基于 Chandy-Lamport 算法的分布式快照已经比较熟悉,该节简单回顾下算法的基础逻辑,熟悉算法的读者可放心跳过。

Chandy-Lamport 算法将分布式系统抽象成 DAG(暂时不考虑有闭环的图),节点表示进程,边表示两个进程间通信的管道。分布式快照的目的是记录下整个系统的状态,即可以分为节点的状态(进程的状态)和边的状态(信道的状态,即传输中的数据)。因为系统状态是由输入的消息序列驱动变化的,我们可以将输入的消息序列分为多个较短的子序列,图的每个节点或边先后处理完某个子序列后,都会进入同一个稳定的全局统状态。利用这个特性,系统的进程和信道在子序列的边界点分别进行本地快照,即使各部分的快照时间点不同,最终也可以组合成一个有意义的全局快照。

图1. Checkpoint Barrier

从实现上看,Flink 通过在 DAG 数据源定时向数据流注入名为 Barrier 的特殊元素,将连续的数据流切分为多个有限序列,对应多个 Checkpoint 周期。每当接收到 Barrier,算子进行本地的 Checkpoint 快照,并在完成后异步上传本地快照,同时将 Barrier 以广播方式发送至下游。当某个 Checkpoint 的所有 Barrier 到达 DAG 末端且所有算子完成快照,则标志着全局快照的成功。

图2. Barrier Alignment

在有多个输入 Channel 的情况下,为了数据准确性,算子会等待所有流的 Barrier 都到达之后才会开始本地的快照,这种机制被称为 Barrier 对齐。在对齐的过程中,算子只会继续处理的来自未出现 Barrier Channel 的数据,而其余 Channel 的数据会被写入输入队列,直至在队列满后被阻塞。当所有 Barrier 到达后,算子进行本地快照,输出 Barrier 到下游并恢复正常处理。

比起其他分布式快照,该算法的优势在于辅以 Copy-On-Write 技术的情况下不需要 “Stop The World” 影响应用吞吐量,同时基本不用持久化处理中的数据,只用保存进程的状态信息,大大减小了快照的大小。

Checkpoint 与反压的耦合

目前的 Checkpoint 算法在大多数情况下运行良好,然而当作业出现反压时,阻塞式的 Barrier 对齐反而会加剧作业的反压,甚至导致作业的不稳定。

首先, Chandy-Lamport 分布式快照的结束依赖于 Marker 的流动,而反压则会限制 Marker 的流动,导致快照的完成时间变长甚至超时。无论是哪种情况,都会导致 Checkpoint 的时间点落后于实际数据流较多。这时作业的计算进度是没有被持久化的,处于一个比较脆弱的状态,如果作业出于异常被动重启或者被用户主动重启,作业会回滚丢失一定的进度。如果 Checkpoint 连续超时且没有很好的监控,回滚丢失的进度可能高达一天以上,对于实时业务这通常是不可接受的。更糟糕的是,回滚后的作业落后的 Lag 更大,通常带来更大的反压,形成一个恶性循环。

其次,Barrier 对齐本身可能成为一个反压的源头,影响上游算子的效率,而这在某些情况下是不必要的。比如典型的情况是一个的作业读取多个 Source,分别进行不同的聚合计算,然后将计算完的结果分别写入不同的 Sink。通常来说,这些不同的 Sink 会复用公共的算子以减少重复计算,但并不希望不同 Source 间相互影响。

图3. Barrier Alignment 阻塞上游 Task

假设一个作业要分别统计 A 和 B 两个业务线的以天为粒度指标,同时还需要统计所有业务线以周为单位的指标,拓扑如上图所示。如果 B 业务线某天的业务量突涨,使得 Checkpoint Barrier 有延迟,那么会导致公用的 Window Aggregate 进行 Barrier 对齐,进而阻塞业务 A 的 FlatMap,最终令业务 A 的计算也出现延迟。

当然这种情况可以通过拆分作业等方式优化,但难免引入更多开发维护成本,而且更重要的是这本来就符合 Flink 用户常规的开发思路,应该在框架内尽量减小出现用户意料之外的行为的可能性。

Unaligned Checkpoint

为了解决这个问题,Flink 在 1.11 版本引入了 Unaligned Checkpoint 的特性。要理解 Unaligned Checkpoint 的原理,首先需要了解 Chandy-Lamport 论文中对于 Marker 处理规则的描述:

图4. Chandy-Lamport Marker 处理

其中关键是 if q has not recorded its state,也就是接收到 Marker 时算子是否已经进行过本地快照。一直以来 Flink 的 Aligned Checkpoint 通过 Barrier 对齐,将本地快照延迟至所有 Barrier 到达,因而这个条件是永真的,从而巧妙地避免了对算子输入队列的状态进行快照,但代价是比较不可控的 Checkpoint 时长和吞吐量的降低。实际上这和 Chandy-Lamport 算法是有一定出入的。

举个例子,假设我们对两个数据流进行 equal-join,输出匹配上的元素。按照 Flink Aligned Checkpoint 的方式,系统的状态变化如下(图中不同颜色的元素代表属于不同的 Checkpoint 周期):

图5. Aligned Checkpoint 状态变化

图 a: 输入 Channel 1 存在 3 个元素,其中 2 在 Barrier 前面;Channel 2 存在 4 个元素,其中 2、9、7 在 Barrier 前面。

图 b: 算子分别读取 Channel 一个元素,输出 2。随后接收到 Channel 1 的 Barrier,停止处理 Channel 1 后续的数据,只处理 Channel 2 的数据。

图 c: 算子再消费 2 个自 Channel 2 的元素,接收到 Barrier,开始本地快照并输出 Barrier。

对于相同的情况,Chandy-Lamport 算法的状态变化如下:

图6. Chandy-Lamport 状态变化

图 a: 同上。

图 b: 算子分别处理两个 Channel 一个元素,输出结果 2。此后接收到 Channel 1 的 Barrier,算子开始本地快照记录自己的状态,并输出 Barrier。

图 c: 算子继续正常处理两个 Channel 的输入,输出 9。特别的地方是 Channel 2 后续元素会被保存下来,直到 Channel 2 的 Barrier 出现(即 Channel 2 的 9 和 7)。保存的数据会作为 Channel 的状态成为快照的一部分。

两者的差异主要可以总结为两点:

快照的触发是在接收到第一个 Barrier 时还是在接收到最后一个 Barrier 时。

是否需要阻塞已经接收到 Barrier 的 Channel 的计算。

从这两点来看,新的 Unaligned Checkpoint 将快照的触发改为第一个 Barrier 且取消阻塞 Channel 的计算,算法上与 Chandy-Lamport 基本一致,同时在实现细节方面结合 Flink 的定位做了几个改进。

首先,不同于 Chandy-Lamport 模型的只需要考虑算子输入 Channel 的状态,Flink 的算子有输入和输出两种 Channel,在快照时两者的状态都需要被考虑。

其次,无论在 Chandy-Lamport 还是 Flink Aligned Checkpoint 算法中,Barrier 都必须遵循其在数据流中的位置,算子需要等待 Barrier 被实际处理才开始快照。而 Unaligned Checkpoint 改变了这个设定,允许算子优先摄入并优先输出 Barrier。如此一来,第一个到达 Barrier 会在算子的缓存数据队列(包括输入 Channel 和输出 Channel)中往前跳跃一段距离,而被”插队”的数据和其他输入 Channel 在其 Barrier 之前的数据会被写入快照中(图中黄色部分)。

图7. Barrier 越过数据

这样的主要好处是,如果本身算子的处理就是瓶颈,Chandy-Lamport 的 Barrier 仍会被阻塞,但 Unaligned Checkpoint 则可以在 Barrier 进入输入 Channel 就马上开始快照。这可以从很大程度上加快 Barrier 流经整个 DAG 的速度,从而降低 Checkpoint 整体时长。

回到之前的例子,用 Unaligned Checkpoint 来实现,状态变化如下:

图8. Unaligned-Checkpoint 状态变化

图 a: 输入 Channel 1 存在 3 个元素,其中 2 在 Barrier 前面;Channel 2 存在 4 个元素,其中 2、9、7 在 Barrier 前面。输出 Channel 已存在结果数据 1。

图 b: 算子优先处理输入 Channel 1 的 Barrier,开始本地快照记录自己的状态,并将 Barrier 插到输出 Channel 末端。

图 c: 算子继续正常处理两个 Channel 的输入,输出 2、9。同时算子会将 Barrier 越过的数据(即输入 Channel 1 的 2 和输出 Channel 的 1)写入 Checkpoint,并将输入 Channel 2 后续早于 Barrier 的数据(即 2、9、7)持续写入 Checkpoint。

比起 Aligned Checkpoint 中不同 Checkpoint 周期的数据以算子快照为界限分隔得很清晰,Unaligned Checkpoint 进行快照和输出 Barrier 时,部分本属于当前 Checkpoint 的输入数据还未计算(因此未反映到当前算子状态中),而部分属于当前 Checkpoint 的输出数据却落到 Barrier 之后(因此未反映到下游算子的状态中)。这也正是 Unaligned 的含义: 不同 Checkpoint 周期的数据没有对齐,包括不同输入 Channel 之间的不对齐,以及输入和输出间的不对齐。而这部分不对齐的数据会被快照记录下来,以在恢复状态时重放。换句话说,从 Checkpoint 恢复时,不对齐的数据并不能由 Source 端重放的数据计算得出,同时也没有反映到算子状态中,但因为它们会被 Checkpoint 恢复到对应 Channel 中,所以依然能提供只计算一次的准确结果。

当然,Unaligned Checkpoint 并不是百分百优于 Aligned Checkpoint,它会带来的已知问题就有:

由于要持久化缓存数据,State Size 会有比较大的增长,磁盘负载会加重。

随着 State Size 增长,作业恢复时间可能增长,运维管理难度增加。

目前看来,Unaligned Checkpoint 更适合容易产生高反压同时又比较重要的复杂作业。对于像数据 ETL 同步等简单作业,更轻量级的 Aligned Checkpoint 显然是更好的选择。

总结

Flink 1.11 的 Unaligned Checkpoint 主要解决在高反压情况下作业难以完成 Checkpoint 的问题,同时它以磁盘资源为代价,避免了 Checkpoint 可能带来的阻塞,有利于提升 Flink 的资源利用率。随着流计算的普及,未来的 Flink 应用大概会越来越复杂,在未来经过实战打磨完善后 Unaligned Checkpoint 很有可能会取代 Aligned Checkpoint 成为 Flink 的默认 Checkpoint 策略。

转载:https://www.sohu.com/a/407428726_797717

Flink的非Barrier对齐可以优化高反压相关推荐

  1. 一文弄懂Flink网络流控及反压

    一文弄懂Flink网络流控及反压 1. 为什么需要网络流控? 2. 网络流控的实现:静态限速 3. 网络流控的实现:动态反馈/自动反压 3.1 案例一:Storm 反压实现 3.2 案例二:Spark ...

  2. Flink教程(31)- Flink网络流控及反压

    文章目录 01 引言 02 为什么需要网络流控? 03 网络流控的实现:静态限速 04 网络流控的实现:动态反馈/自动反压 4.1 案例一:Storm 反压实现 4.2 案例二:Spark Strea ...

  3. Flink 网络流控和反压剖析详解

    传送门:Flink 系统性学习笔记 前言: 本文根据 Apache Flink 系列直播整理而成,由 Apache Flink Contributor.OPPO 大数据平台研发负责人张俊老师分享,社区 ...

  4. Flink反压如何排查

    Flink反压利用了网络传输和动态限流.Flink的任务的组成由流和算子组成,那么流中的数据在算子之间转换的时候,会放入分布式的阻塞队列中.当消费者的阻塞队列满的时候,则会降低生产者的处理速度. 如上 ...

  5. 基于结构化数据的文本生成:非严格对齐生成任务及动态轻量的GCN生成模型

    作者|邴立东.程丽颖.付子豪.张琰等 单位|阿里巴巴达摩院.香港中文大学等 摘要 基于结构化数据生成文本(data-to-text)的任务旨在生成人类可读的文本来直观地描述给定的结构化数据.然而,目前 ...

  6. 全尺度表示的上下文非局部对齐

    [写在前面] 基于文本的人物搜索旨在使用人物的描述性句子在图像库中检索目标人物.这是一个非常具有挑战性的问题,因为模态差异使得有效提取鉴别特征更加困难.此外,行人图像和描述的类间方差很小.因此,需要综 ...

  7. Apache Flink 在京东的实践与优化

    简介: Flink 助力京东实时计算平台朝着批流一体的方向演进. 本文整理自京东高级技术专家付海涛在 Flink Forward Asia 2020 分享的议题<Apache Flink 在京东 ...

  8. 【Flink】 Flink 源码之 Buffer Timeout优化

    1.概述 转载:Flink 源码之 Buffer Timeout优化 2.Buffer Timeout 概念 Flink每个算子向下游发送数据需要两个条件: 输出buffer空间占满 buffer中数 ...

  9. Flink SQL流式聚合Mini-Batch优化原理浅析

    点击上方蓝色字体,选择"设为星标" 回复"面试"获取更多惊喜 Hi,我是王知无,一个大数据领域的原创作者. 放心关注我,获取更多行业的一手消息. 前言 流式聚合 ...

最新文章

  1. IMAX融资5000万美元,三年内要打造25个VR项目
  2. 疫情之下的网站优化该怎样进行?
  3. Python进阶-----property用法(实现了get,set,delete三种方法)
  4. How far away ?
  5. maven+jenkins自动化构件
  6. pitstop插件使用说明_PDF其他及PitStop插件
  7. P3246 [HNOI2016]序列(莫队+单调栈+ST表)
  8. python安装scrapy_Python安装Scrapy的种种
  9. 三维点云学习(3)6- 实现K-Means
  10. linux创建空镜像,Docker创建base镜像
  11. spring boot对输入的字符串进行html转码
  12. OpenStack_I版 5.Nova部署
  13. pandas nat_EDA神器pandas-profiling万岁?
  14. HTML网页实训的目的,网页设计实习目的及意义
  15. 软件工程经济学期末复习第二章
  16. 贝塔分布与狄利克雷分布
  17. 【安卓R 源码】获取音频焦点和释放音频焦点
  18. 为什么戏说php,戏说PHP——1.1切的开始
  19. 对vector<int> 的数组使用sort排序出错
  20. 川大计算机学硕扩招,2021考研又有高校扩招!学硕停招!部分院校专硕取消全日制招生!...

热门文章

  1. 【Python之正则表达式与JSON】
  2. 惠普战x适合计算机专业吗,惠普战x锐龙版怎么样?电脑值得入手吗?
  3. 微信公众号流量主的实际收益(本人亲测)
  4. php生成PDF文件
  5. 探讨刀塔传奇是怎么防止客户端作弊的
  6. centos 配置java环境
  7. 2015南阳CCPC E - Ba Gua Zhen 高斯消元 xor最大
  8. 10%干股、65K高薪!本周新增多项高福利急聘职位
  9. 海康硬盘录像机接入RTSP/onvif协议安防视频平台EasyNVR的注意事项
  10. 顺丰菜鸟之争落幕:今日12时起恢复数据传输