一文搞懂 Flink 的 Exactly Once 和 At Least Once
- 介绍 CheckPoint 如何保障 Flink 任务的高可用
- CheckPoint 中的状态简介
- 如何实现全域一致的分布式快照?
- 什么是 barrier?什么是 barrier 对齐?
- 证明了:为什么 barrier 对齐就是 Exactly Once,为什么 barrier 不对齐就是 At Least Once。
Flink 简介
- 当应用程序搜索某些事件模式时,状态将存储到目前为止遇到的事件序列。
- 在每分钟/小时/天聚合事件时,状态保存待处理的聚合。
- 当在数据点流上训练机器学习模型时,状态保持模型参数的当前版本。
- 当需要管理历史数据时,状态允许有效访问过去发生的事件。
- 比如:我们只是进行一个字符串拼接,输入 a,输出 a_666,输入b,输出 b_666输出的结果跟之前的状态没关系,符合幂等性。
- 幂等性:就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用
- 计算 pv、uv。
- 输出的结果跟之前的状态有关系,不符合幂等性,访问多次,pv 会增加。
Flink 的 CheckPoint 功能简介
- 例:(0,1000)表示 0 号 partition 目前消费到 offset 为 1000 的数据
- 例:(app1,50000)(app2,10000)
- 表示 app1 当前 pv 值为 50000
- 表示 app2 当前 pv 值为 10000
- 每来一条数据,只需要确定相应 app_id,将相应的 value 值 +1 后 put 到 map 中即可。
- offset:(0,1000)
- pv:(app1,50000)(app2,10000)
- 记录的其实就是第 n 次 CheckPoint 消费的 offset 信息和各 app 的 pv 值信息,记录一下发生 CheckPoint 当前的状态信息,并将该状态信息保存到相应的状态后端。(注:状态后端是保存状态的地方,决定状态如何保存,如何保障状态高可用,我们只需要知道,我们能从状态后端拿到 offset 信息和 pv 信息即可。状态后端必须是高可用的,否则我们的状态后端经常出现故障,会导致无法通过 checkpoint 来恢复我们的应用程序)
- chk-100
- 该状态信息表示第 100 次 CheckPoint 的时候, partition 0 offset 消费到了 1000,pv 统计结果为(app1,50000)(app2,10000)。
- 假如我们设置了三分钟进行一次 CheckPoint,保存了上述所说的 chk-100 的 CheckPoint 状态后,过了十秒钟,offset 已经消费到(0,1100),pv 统计结果变成了(app1,50080)(app2,10020),但是突然任务挂了,怎么办?
- 莫慌,其实很简单,flink只需要从最近一次成功的 CheckPoint 保存的offset(0,1000)处接着消费即可,当然pv值也要按照状态里的 pv 值(app1,50000)(app2,10000)进行累加,不能从(app1,50080)(app2,10020)处进行累加,因为 partition 0 offset 消费到 1000 时,pv 统计结果为(app1,50000)(app2,10000)。当然如果你想从 offset (0,1100)pv(app1,50080)(app2,10020)这个状态恢复,也是做不到的,因为那个时刻程序突然挂了,这个状态根本没有保存下来。我们能做的最高效方式就是从最近一次成功的 CheckPoint 处恢复,也就是我一直所说的 chk-100。
- 以上讲解,基本就是 CheckPoint 承担的工作,描述的场景比较简单。
- barrier 从 Source Task 处生成,一直流到 Sink Task,期间所有的 Task 只要碰到barrier,就会触发自身进行快照。
- CheckPoint barrier n-1 处做的快照就是指 Job 从开始处理到 barrier n-1所有的状态数据。
- barrier n 处做的快照就是指从 Job 开始到处理到 barrier n 所有的状态数据。
- 对应到 pv 案例中就是,SourceTask 接收到 JobManager 的编号为 chk-100 的 CheckPoint 触发请求后,发现自己恰好接收到 kafka offset(0,1000)处的数据,所以会往 offset(0,1000)数据之后 offset(0,1001)数据之前安插一个 barrier,然后自己开始做快照,也就是将 offset(0,1000)保存到状态后端 chk-100 中。然后 barrier 接着往下游发送,当统计 pv 的 task 接收到 barrier 后,也会暂停处理数据,将自己内存中保存的 pv 信息(app1,50000)。(app2,10000)保存到状态后端 chk-100 中。OK,Flink 大概就是通过这个原理来保存快照的。
- 统计 pv 的 task 接收到 barrier,就意味着 barrier 之前的数据都处理了,所以说,不会出现丢数据的情况。
- barrier 的作用就是为了把数据区分开,CheckPoint 过程中有一个同步做快照的环节不能处理 barrier 之后的数据,为什么呢?
- 如果做快照的同时,也在处理数据,那么处理的数据可能会修改快照内容,所以先暂停处理数据,把内存中快照保存好后,再处理数据。
- 结合案例来讲就是,统计 pv 的 task 想对(app1,50000)(app2,10000)做快照,但是如果数据还在处理,可能快照还没保存下来,状态已经变成了(app1,50001)(app2,10001),快照就不准确了,就不能保障 Exactly Once 了。
- Flink 是在数据中加了一个叫做 barrier 的东西(barrier 中文翻译:栅栏),上图中红圈处就是两个 barrier。
- 流式计算中状态交互
- 消费到 Y 位置的时候,将 Y 对应的状态保存下来
- 消费到 X 位置的时候,将 X 对应的状态保存下来
- 周期性地对消费 offset 和统计的状态信息或统计结果进行快照
多并行度、多 Operator 情况下,CheckPoint 过程
- 如何确保状态拥有精确一次的容错保证?
- 如何在分布式场景下替多个拥有本地状态的算子产生一个全域一致的快照?
- 如何在不中断运算的前提下产生快照?
- JobManager 端的 CheckPointCoordinator 向所有 SourceTask 发送 CheckPointTrigger,Source Task 会在数据流中安插 CheckPoint barrier。
- 当 task 收到所有的 barrier 后,向自己的下游继续传递 barrier,然后自身执行快照,并将自己的状态异步写入到持久化存储中。
- 增量 CheckPoint 只是把最新的一部分更新写入到 外部存储;
- 为了下游尽快做 CheckPoint,所以会先发送 barrier 到下游,自身再同步进行快照;
- 当 task 完成备份后,会将备份数据的地址(state handle)通知给 JobManager 的 CheckPointCoordinator。
- 如果 CheckPoint 的持续时长超过了 CheckPoint 设定的超时时间,CheckPointCoordinator 还没有收集完所有的 State Handle,CheckPointCoordinator 就会认为本次 CheckPoint 失败,会把这次 CheckPoint 产生的所有状态数据全部删除。
- 最后 CheckPointCoordinator 会把整个 StateHandle 封装成 completed CheckPoint Meta,写入到 hdfs。
- 什么是 barrier 对齐?
- 什么是 barrier 不对齐?
- 为什么要进行 barrier 对齐?不对齐到底行不行?
- 结合 pv 案例来看,之前的案例为了简单,描述的 kafka 的 topic 只有 1 个 partition,这里为了讲述 barrier 对齐,所以 topic 有 2 个 partittion。
- chk-100
- offset:(0,10000)(1,10005)
- pv:(app0,8000) (app1,12050)
- Source 的 kafka 的 Consumer,从 kakfa 中读取数据到 Flink 应用中
- TaskA 中的 map 将读取到的一条 kafka 日志转换为我们需要统计的 app_id
- keyBy 按照 app_id 进行 keyBy,相同的 app_id 会分到下游 TaskB的同一个实例中
- TaskB 的 map 在状态中查出该 app_id 对应的 pv 值,然后 +1,存储到状态中
- 利用 Sink 将统计的 pv 值写入到外部存储介质中
- JobManager 内部有个定时调度,假如现在 10 点 00 分 00 秒到了第 100 次 CheckPoint 的时间了,JobManager 的 CheckPointCoordinator 进程会向所有的 Source Task 发送 CheckPointTrigger,也就是向 TaskA0、TaskA1 发送 CheckPointTrigger。
- TaskA0、TaskA1 接收到 CheckPointTrigger,会往数据流中安插 barrier,将 barrier 发送到下游,在自己的状态中记录 barrier 安插的 offset 位置,然后自身做快照,将 offset 信息保存到状态后端。然后 TaskA 的 map 和 keyBy 算子中并没有状态,所以不需要进行快照。
- 接着数据和 barrier 都向下游 TaskB 发送,相同的 app_id 会发送到相同的TaskB实例上,这里假设有两个 app:app0 和 app1,经过 keyBy 后,假设 app0 分到了 TaskB0 上,app1 分到了 TaskB1 上。基于上面描述,TaskA0 和 TaskA1 中的所有 app0 的数据都发送到 TaskB0 上,所有 app1 的数据都发送到 TaskB1 上。
- 现在我们假设 TaskB0 做 CheckPoint 的时候 barrier 对齐了,TaskB1 做 CheckPoint 的时候 barrier 不对齐,当然不能这么配置,我就是举这么个例子,带大家分析一下 barrier 对不对齐到底对统计结果有什么影响?
- 上面说了 chk-100 的这次 CheckPoint,offset 位置为(0,10000)(1,10005),TaskB0 使用 barrier 对齐,也就是说 TaskB0 不会处理 barrier 之后的数据,所以TaskB0 在 chk-100 快照的时候,状态后端保存的 app0 的 pv 数据是从程序开始启动到 kafkaoffset 位置为(0,10000)(1,10005)的所有数据计算出来的 pv 值,一条不多(没处理 barrier 之后,所以不会重复),一条不少(barrier 之前的所有数据都处理了,所以不会丢失),假如保存的状态信息为(app0,8000)表示消费到(0,10000)(1,10005)offset 的时候,app0 的 pv 值为 8000。
- TaskB1 使用的 barrier 不对齐,假如 TaskA0 由于服务器的 CPU 或者网络等其他波动,导致 TaskA0 处理数据较慢,而 TaskA1 很稳定,所以处理数据比较快。导致的结果就是 TaskB1 先接收到了 TaskA1 的 barrier,由于配置的 barrier 不对齐,所以 TaskB1 会接着处理 TaskA1 barrier 之后的数据,过了 2 秒后,TaskB1 接收到了 TaskA0 的 barrier,于是对状态中存储的 app1 的 pv 值开始做 CheckPoint 快照,保存的状态信息为(app1,12050),但是我们知道这个(app1,12050)实际上多处理了 2 秒 TaskA1 发来的 barrier 之后的数据,也就是 kafka topic 对应的 partition1 offset 10005 之后的数据,app1 真实的 pv 数据肯定要小于这个 12050,partition1 的 offset 保存的 offset 虽然是 10005,但是我们实际上可能已经处理到了 offset 10200 的数据,假设就是处理到了 10200。
- chk-100
- offset:(0,10000)(1,10005)
- pv:(app0,8000) (app1,12050)
- Flink 同样会起四个 Operator 实例,我还称他们是 TaskA0、TaskA1、TaskB0、TaskB1。四个 Operator 会从状态后端读取保存的状态信息。
- 从 offset:(0,10000)(1,10005) 开始消费,并且基于 pv:(app0,8000) (app1,12050)值进行累加统计
- 然后你就应该会发现这个 app1 的 pv 值 12050 实际上已经包含了 partition1 的 offset 10005~10200 的数据,所以 partition1 从 offset 10005 恢复任务时,partition1 的 offset 10005~10200 的数据被消费了两次。
- TaskB1 设置的 barrier 不对齐,所以 CheckPoint chk-100 对应的状态中多消费了 barrier 之后的一些数据(TaskA1 发送),重启后是从 chk-100 保存的 offset 恢复,这就是所说的 At Least Once。
- 由于上面说 TaskB0 设置的 barrier 对齐,所以 app0 不会出现重复消费,因为 app0 没有消费 offset:(0,10000)(1,10005) 之后的数据,也就是所谓的 Exactly Once。
- 首先设置了 Flink 的 CheckPoint 语义是:Exactly Once。
- Operator 实例必须有多个输入流才会出现 barrier 对齐。
- 对齐,汉语词汇,释义为使两个以上事物配合或接触得整齐。由汉语解释可得对齐肯定需要两个以上事物,所以,必须有多个流才叫对齐。barrier 对齐其实也就是上游多个流配合使得数据对齐的过程。
- 言外之意:如果 Operator 实例只有一个输入流,就根本不存在 barrier 对齐,自己跟自己默认永远都是对齐的。
- An Overview of End-to-End Exactly-OnceProcessing in Apache Flink (with Apache Kafka, too!)
- Managing Large State in Apache Flink: AnIntro to Incremental Checkpointing
- State & Fault Tolerance
- Checkpoints
- Savepoints
- State Backends
- Tuning Checkpoints and Large State
- Data Streaming Fault Tolerance
- 1.2
- 2.3 Flink Checkpoint-轻量级分布式快照
- 2.11 Flink State 最佳实践
一文搞懂 Flink 的 Exactly Once 和 At Least Once相关推荐
- 一文搞懂 Flink 网络流控与反压机制
看完本文,你能get到以下知识 Flink 流处理为什么需要网络流控? Flink V1.5 版之前网络流控介绍 Flink V1.5 版之前的反压策略存在的问题 Credit的反压策略实现原理,Cr ...
- 一文搞懂 checkpoint 全过程
前言 前面我们讲解了 一文搞懂 Flink 处理 Barrier 全过程 和 一文搞定 Flink Checkpoint Barrier 全流程 基本上都是跟 checkpoint 相关.这次我们就具 ...
- 这一次,你能彻底搞懂 Flink!
近年来,AI 场景发展得如火如荼,同时其计算规模也越来越大.这也让专注于数据处理的 Flink 有了较大的发展空间.Flink作为在大数据生态里实时处理的一个新框架,在一定程度上也有一定的难度. Fl ...
- 一文搞懂RNN(循环神经网络)
基础篇|一文搞懂RNN(循环神经网络) https://mp.weixin.qq.com/s/va1gmavl2ZESgnM7biORQg 神经网络基础 神经网络可以当做是能够拟合任意函数的黑盒子,只 ...
- 一文搞懂 Python 的 import 机制
一.前言 希望能够让读者一文搞懂 Python 的 import 机制 1.什么是 import 机制? 通常来讲,在一段 Python 代码中去执行引用另一个模块中的代码,就需要使用 Python ...
- python语言语句快的标记是什么_一文搞懂Python程序语句
原标题:一文搞懂Python程序语句 程序流 Python 程序中常用的基本数据类型,包括: 内置的数值数据类型 Tuple 容器类型 String 容器类型 List 容器类型 自然的顺序是从页面或 ...
- 一文搞懂 Java 线程中断
转载自 一文搞懂 Java 线程中断 在之前的一文<如何"优雅"地终止一个线程>中详细说明了 stop 终止线程的坏处及如何优雅地终止线程,那么还有别的可以终止线程 ...
- 一文搞懂HMM(隐马尔可夫模型)-Viterbi algorithm
***一文搞懂HMM(隐马尔可夫模型)*** 简单来说,熵是表示物质系统状态的一种度量,用它老表征系统的无序程度.熵越大,系统越无序,意味着系统结构和运动的不确定和无规则:反之,,熵越小,系统越有序, ...
- 一文搞懂如何使用Node.js进行TCP网络通信
摘要: 网络是通信互联的基础,Node.js提供了net.http.dgram等模块,分别用来实现TCP.HTTP.UDP的通信,本文主要对使用Node.js的TCP通信部份进行实践记录. 本文分享自 ...
- 【UE·蓝图底层篇】一文搞懂NativeClass、GeneratedClass、BlueprintClass、ParentClass
本文将对蓝图类UBlueprint的几个UClass成员变量NativeClass.GeneratedClass.BlueprintClass.ParentClass进行比较深入的讲解,看完之后对蓝图 ...
最新文章
- python适合做后端开发吗-pythonWeb后端开发好呢?还是从事网络爬虫比较好呢?
- 用VLC读取摄像头产生RTSP流,DSS侦听并转发(二)
- 配置zabbix监控windows,cmd运行报错cannot connect to Service Manager: [0x00000005]
- python改变字符串类型_python – Sklearn将字符串类标签更改为int
- pytorch教程龙曲良06-10
- 机器学习和深度学习笔记(Matlab语言实现)
- Go 函数特性和网络爬虫示例
- linux gpt分区看不到,Linux无法看到我的任何分区 – 备份GPT表不在磁盘的末尾
- c语言数组最大可定义多少位_C语言求数组的最大值三种方法
- lnmp 1.4 mysql_lnmp1.4配置https教程
- loading窗口动画 web_分享web前端七款HTML5 Loading动画特效集锦
- linux内核3,升级linux内核到3.10
- 第四章需求分析与设计工具
- 编制现金流量表3个步骤!
- 纯HTML标签详解(摘自阿里西西)
- vs以管理员身份运行
- ~艾比郎~学Python之Python基础
- win10连接filco蓝牙键盘
- 没有全景相机,普通人如何用krpano做属于自己的全景图
- Ubuntu 下yuma源码安装