摘要:

本文基于 Flink 1.9.0 和 Kafka 2.3 版本,对 Flink kafka 端到端 Exactly-Once 进行分析及 notifyCheckpointComplete 顺序,主要内容分为以下两部分:

1.Flink-kafka 两阶段提交源码分析

TwoPhaseCommitSinkFunction 分析

2.Flink 中 notifyCheckpointComplete 方法调用顺序

定义

样例

operator 调用 notifyCheckpointComplete

对 Exactly-Once 语义的影响

Tips:Flink 中文社区征稿啦,感兴趣的同学可点击「阅读原文」了解详情~

Flink-kafka 两阶段提交源码分析

FlinkKafkaProducer 实现了 TwoPhaseCommitSinkFunction,也就是两阶段提交。关于两阶段提交的原理,可以参见《An Overview of End-to-End Exactly-Once Processing in Apache Flink》,本文不再赘述两阶段提交的原理,但是会分析 FlinkKafkaProducer 源码中是如何实现两阶段提交的,并保证了在结合 Kafka 的时候做到端到端的 Exactly Once 语义的。

https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

TwoPhaseCommitSinkFunction 分析

public abstract class TwoPhaseCommitSinkFunctionextends RichSinkFunctionimplements CheckpointedFunction, CheckpointListener

TwoPhaseCommitSinkFunction 实现了 CheckpointedFunction 和 CheckpointListener 接口,首先就是在 initializeState 方法中开启事务,对于 Flink sink 的两阶段提交,第一阶段就是执行 CheckpointedFunction#snapshotState 当所有 task 的 checkpoint 都完成之后,每个 task 会执行 CheckpointedFunction#notifyCheckpointComplete 也就是所谓的第二阶段。

■FlinkKafkaProducer 第一阶段分析

@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {// this is like the pre-commit of a 2-phase-commit transaction// we are ready to commit and remember the transactioncheckState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");long checkpointId = context.getCheckpointId();LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);preCommit(currentTransactionHolder.handle);pendingCommitTransactions.put(checkpointId, currentTransactionHolder);LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);currentTransactionHolder = beginTransactionInternal();LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);state.clear();state.add(new State<>(this.currentTransactionHolder,new ArrayList<>(pendingCommitTransactions.values()),userContext));}

这部分代码的核心在于:

先执行 preCommit 方法,EXACTLY_ONCE 模式下会调 flush,立即将数据发送到指定的 topic,这时如果消费这个 topic,需要指定 isolation.level 为 read_committed 表示消费端应用不可以看到未提交的事物内的消息。

@Overrideprotected void preCommit(FlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException {switch (semantic) {case EXACTLY_ONCE:case AT_LEAST_ONCE:flush(transaction);break;case NONE:break;default:throw new UnsupportedOperationException("Not implemented semantic");}checkErroneous();}

注意第一次调用的 send 和 flush 的事务都是在 initializeState 方法中开启事务。

transaction.producer.send(record, callback);

transaction.producer.flush();

pendingCommitTransactions 保存了每个 checkpoint 对应的事务,并为下一次 checkpoint 创建新的 producer 事务,即 currentTransactionHolder = beginTransactionInternal();下一次的 send 和 flush 都会在这个事务中。也就是说第一阶段每一个 checkpoint 都有自己的事务,并保存在 pendingCommitTransactions 中。

■FlinkKafkaProducer 第二阶段分析

当所有 checkpoint 都完成后,会进入第二阶段的提交。

@Overridepublic final void notifyCheckpointComplete(long checkpointId) throws Exception {// the following scenarios are possible here (1) there is exactly one transaction from the latest checkpoint that// was triggered and completed. That should be the common case.// Simply commit that transaction in that case. (2) there are multiple pending transactions because one previous// checkpoint was skipped. That is a rare case, but can happen// for example when: - the master cannot persist the metadata of the last// checkpoint (temporary outage in the storage system) but// could persist a successive checkpoint (the one notified here) - other tasks could not persist their status during// the previous checkpoint, but did not trigger a failure because they// could hold onto their state and could successfully persist it in// a successive checkpoint (the one notified here) In both cases, the prior checkpoint never reach a committed state, but// this checkpoint is always expected to subsume the prior one and cover all// changes since the last successful one. As a consequence, we need to commit// all pending transactions. (3) Multiple transactions are pending, but the checkpoint complete notification// relates not to the latest. That is possible, because notification messages// can be delayed (in an extreme case till arrive after a succeeding checkpoint// was triggered) and because there can be concurrent overlapping checkpoints// (a new one is started before the previous fully finished). ==> There should never be a case where we have no pending transaction here//Iterator>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");Throwable firstError = null;while (pendingTransactionIterator.hasNext()) {Map.Entry> entry = pendingTransactionIterator.next();Long pendingTransactionCheckpointId = entry.getKey();TransactionHolder pendingTransaction = entry.getValue();if (pendingTransactionCheckpointId > checkpointId) {continue;}LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);logWarningIfTimeoutAlmostReached(pendingTransaction);try {commit(pendingTransaction.handle);} catch (Throwable t) {if (firstError == null) {firstError = t;}}LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);pendingTransactionIterator.remove();}if (firstError != null) {throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",firstError);}}

这一阶段会将 pendingCommitTransactions 中的事务全部提交。

@Overrideprotected void commit(FlinkKafkaProducer.KafkaTransactionState transaction) {if (transaction.isTransactional()) {try {transaction.producer.commitTransaction();} finally {recycleTransactionalProducer(transaction.producer);}}}

这时消费端就能看到 read_committed 的数据了,至此整个 producer 的流程全部结束。

■Exactly-Once 分析

当输入源和输出都是 kafka 的时候,Flink 之所以能做到端到端的 Exactly-Once 语义,主要是因为第一阶段 FlinkKafkaConsumer 会将消费的 offset 信息通过checkpoint 保存,所有 checkpoint 都成功之后,第二阶段 FlinkKafkaProducer 才会提交事务,结束 producer 的流程。这个过程中很大程度依赖了 kafka producer 事务的机制。

Flink 中 notifyCheckpointComplete方法调用顺序

定义

notifyCheckpointComplete 方法在 CheckpointListener 接口中定义。

/*** This interface must be implemented by functions/operations that want to receive* a commit notification once a checkpoint has been completely acknowledged by all* participants.*/@PublicEvolvingpublic interface CheckpointListener {/*** This method is called as a notification once a distributed checkpoint has been completed.** Note that any exception during this method will not cause the checkpoint to* fail any more.** @param checkpointId The ID of the checkpoint that has been completed.* @throws Exception*/void notifyCheckpointComplete(long checkpointId) throws Exception;}

简单说这个方法的含义就是在 checkpoint 做完之后,JobMaster 会通知 task 执行这个方法,例如在 FlinkKafkaProducer 中 notifyCheckpointComplete 中做了事务的提交。

样例

下面的程序会被分为两个 task,task1 是 Source: Example Source 和 task2 是 Map -> Sink: Example Sink。

DataStream input = env.addSource(new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties).assignTimestampsAndWatermarks(new CustomWatermarkExtractor())).name("Example Source").keyBy("word").map(new MapFunction() {@Overridepublic KafkaEvent map(KafkaEvent value) throws Exception {value.setFrequency(value.getFrequency() + 1);return value;}});input.addSink(new FlinkKafkaProducer<>("bar",new KafkaSerializationSchemaImpl(),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE)).name("Example Sink");

■ operator 调用 notifyCheckpointComplete

根据上面的例子,task1 中只有一个 source 的 operator,但是 task2 中有两个operator,分别是 map 和 sink。

在 StreamTask 中,调用 task 的 notifyCheckpointComplete 方法。

@Overridepublic void notifyCheckpointComplete(long checkpointId) throws Exception {boolean success = false;synchronized (lock) {if (isRunning) {LOG.debug("Notification of complete checkpoint for task {}", getName());for (StreamOperator> operator : operatorChain.getAllOperators()) {if (operator != null) {operator.notifyCheckpointComplete(checkpointId);}}success = true;}else {LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());}}if (success) {syncSavepointLatch.acknowledgeCheckpointAndTrigger(checkpointId, this::finishTask);}}

其中关键的部分就是:

for (StreamOperator> operator : operatorChain.getAllOperators()) {if (operator != null) {operator.notifyCheckpointComplete(checkpointId);}}

operator 的调用顺序取决于 allOperators 变量,可以看到源码中的注释,operator 是以逆序存放的。

/*** Stores all operators on this chain in reverse order.*/private final StreamOperator>[] allOperators;

也就是说上面客户端的代码,虽然先调用了 map 后调用的 sink,但是实际执行的时候,确实先调用 sink 的 notifyCheckpointComplete 方法,后调用 map 的。

对 Exactly-Once 语义的影响

上面的例子,是先执行 source 的 notifyCheckpointComplete 方法,再执行 sink 的 notifyCheckpointComplete 方法。但是如果把 .keyBy("word") 去掉,那么只会有一个 task,所有 operator 逆序执行,也就是先调用 sink 的 notifyCheckpointComplete 方法再调用 source 的。

为了方便理解整个流程,下文只考察并发度为1的情况,不考虑部分 subtask 成功部分不成功的情况。

Tips:

以下讨论的都是基于 kafka source 和 sink

■ 先 sink 后 source

sink 成功之后 source 执行之前

sink 成功之前

checkpoint 恢复

exactly-once

__consumer_offsets 恢复

重复消费

sink 成功之后 source 执行之前,表示 sink 的 notifyCheckpointComplete 方法执行成功了,但是在执行 source 的 notifyCheckpointComplete 方法之前任务失败。

sink 成功之前,表示 sink 的 notifyCheckpointComplete 方法执行失败,提交事务失败。

测试用例

测试代码主体架构如下:

DataStream input = env.addSource(new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties).assignTimestampsAndWatermarks(new CustomWatermarkExtractor())).name("Example Source").map(new MapFunction() {@Overridepublic KafkaEvent map(KafkaEvent value) throws Exception {value.setFrequency(value.getFrequency() + 1);return value;}});input.addSink(new FlinkKafkaProducer<>("bar",new KafkaSerializationSchemaImpl(),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE)).name("Example Sink");

测试环境采用的是 Flink 1.9.0 Standalone Cluster 模式,一个 JobManager,一个TaskManager,默认只保存一个 checkpoint。

模拟异常的方法,通过 kill -9 杀掉 JobManager 和 TaskManager 进程。

在 FlinkKafkaProducer#commit 方法第一行设置断点,当程序走到这个断点的时候 kill -9 杀掉 JobManager 和 TaskManager 进程,模拟 sink 的notifyCheckpointComplete 方法执行失败的场景;

监控1,通过 bin/kafka-console-consumer.sh --topic bar --bootstrap-server 10.1.236.66:9092 监控 producer 是否 flush 数据;监控2,通过 bin/kafka-console-consumer.sh --topic bar --bootstrap-server 10.1.236.66:9092 --isolation-level read_committed 监控 producer 的事务是否成功提交;监控3,通过 bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server 10.1.236.66:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config /tmp/Consumer.properties 监控 consumer 的offset 是否提交到 kafka;

发送数据一条数据 a,5,1572845161023,当走到断点的时候,说明 consumer 的 checkpoint 已经生成,但是还没有将 offset 提交到 kafka,也就是checkpoint 认为 offset 已经成功发送,但是 kafka 认为并没有发送,监控1有数据,监控2和监控3都没有数据。kill -9 杀掉 JobManager 和 TaskManager进程;

重新启动,并提交作业,不指定 checkpoint 路径。监控1,2,3,都有数据,所以这种情况,监控2,只收到了一次数据,也就是 exactly-once。这时候监控3收到的数据为:partition0 的 offset=37,partition1 的 offset=43,partition2 的 offset=39;

同样1-3步骤,发送数据一条数据 b,6,1572845161023,第4步,启动作业的时候通过-s指定要恢复的 checkpoint 路径,启动后监控1,2都没有数据,但是监控3的数据为:partition0 的 offset=37,partition1 的 offset=43,partition2 的 offset=40,再查看 task 的日志 FlinkKafkaConsumerBase - Consumer subtask 0 restored state: {KafkaTopicPartition{topic='foo', partition=0}=36, KafkaTopicPartition{topic='foo', partition=1}=42, KafkaTopicPartition{topic='foo', partition=2}=39}.,说明 checkpoint 认为上一次 partition2 的 offset=39 已经成功消费,所以恢复之后向 kafka 发送的offset 为 40。这样就导致了 partition2 的 offset=39 这条数据丢失。

同样的方法可以测试 sink 成功之后 source 执行之前的场景,只是这时候需要将断点设置在 TwoPhaseCommitSinkFunction#notifyCheckpointComplete 方法的最后一行,这样就会发现故障之前,监控1,2都是有数据的,监控3没有数据。不指定 checkpoint 路径恢复,监控1,2都会收到数据,这样就导致了重复消费。如果指定 checkpoint 路径消费,那么监控1,2就不会收到数据,保证了 exactly-once。

原因分析

产生上面情况的原因主要就是因为 checkpoint 存储的 offset 和 kafka 中的 offset 不一致导致的。

■ 先 source 后 sink

需要说明的一点这个场景的两个 task 实际是并行的,并没有绝对的先后关系,只是会有这种前后关系的可能。

source 成功之后 sink 执行之前

source 成功之前

checkpoint 恢复

丢数据

__consumer_offsets 恢复

丢数据

source 成功之后 sink 执行之前,表示 source 的 notifyCheckpointComplete 方法执行成功了,但是在执行 sink 的 notifyCheckpointComplete 方法之前任务失败。

source 成功之前,表示 source 的 notifyCheckpointComplete 方法执行失败,提交事务失败。

测试用例

模拟 source 成功之后 sink 执行之前:

需要在上面的用例中加入 keyby 算子,确保生成两个 task,监控3收到数据的时候说明 consumer 的 notifyCheckpointComplete 方法已经执行完。在FlinkKafkaProducer#commit 方法第一行设置断点,当程序走到这个断点并且监控3收到数据的时候,kill -9 杀掉 JobManager 和 TaskManager 进程,模拟 sink 执行 notifyCheckpointComplete 方法失败的场景;

这时候重启作业,checkpoint 和 kafka 中 offset 已经是一致的了,无论是从checkpoint 还是 kafka,都是一样的。所以 source 认为已经成功消费了,不会再读上次的 offset,都会导致数据丢失。

source 成功之前:

对于在 source 之前程序就挂掉,相当于所有的 operator 都没有执行notifyCheckpointComplete 方法,但是 source 的 checkpoint 已经做过了,只是没有将 offset 发送到 kafka,这样只有从 __consumer_offsets 恢复才能保证不丢数据。

小结:本节通过一种极端的测试场景希望让读者可以更深入的理解 Flink 中的  Exactly-Once 语义。在程序挂了以后需要排查是什么原因和什么阶段导致的,才能通过合适的方式恢复作业。在实际的生产环境中,会有重试或者更多的方式保证高可用,也建议保留多个 checkpoint,以便业务上可以恢复正确的数据。

作者介绍:

吴鹏,亚信科技资深工程师,Apache Flink Contributor。先后就职于中兴,IBM,华为。目前在亚信科技负责实时流处理引擎产品的研发。

python flink kafka_Flink Kafka 端到端 Exactly-Once 分析相关推荐

  1. Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理

    文章目录: Apache Flink 应用程序中的 Exactly-Once 语义 Flink 应用程序端到端的 Exactly-Once 语义 示例 Flink 应用程序启动预提交阶段 在 Flin ...

  2. kafka window 启动_Apache Flink结合Kafka构建端到端的Exactly-Once处理

    Apache Flink自2017年12月发布的1.4.0版本开始,为流计算引入了一个重要的里程碑特性:TwoPhaseCommitSinkFunction(相关的Jira).它提取了两阶段提交协议的 ...

  3. flink checkpoint 恢复_干货:Flink+Kafka 0.11端到端精确一次处理语义实现

    2017年12月Apache Flink社区发布了1.4版本.该版本正式引入了一个里程碑式的功能:两阶段提交Sink,即TwoPhaseCommitSinkFunction.该SinkFunction ...

  4. flink checkpoint 恢复_Flink解析 | Apache Flink结合Kafka构建端到端的ExactlyOnce处理

    周凯波(宝牛) 阿里巴巴技术专家,四川大学硕士,2010年毕业后加入阿里搜索事业部,从事搜索离线平台的研发工作,参与将搜索后台数据处理架构从MapReduce到Flink的重构.目前在阿里计算平台事业 ...

  5. 吐血之作 | 流系统Spark/Flink/Kafka/DataFlow端到端一致性实现对比

    长文预警, 全文两万五千多字, 37页word文档的长度 (略有杂乱,有些非常复杂的地方可能需要更多的例子来说明,使得初学者也能很容易看懂,但是实在花的时间已经太多太多了,留待后边利用起碎片时间一点点 ...

  6. 端到端一致性,流系统Spark/Flink/Kafka/DataFlow对比总结(压箱宝具呕血之作)

    前 这篇文章可以说是作者压箱底儿的知识总结(之一,毕竟作者学的东西很杂 ╮( ̄▽ ̄"")╭ )了. 断断续续写了将近三个月, 耗费了大量的精力, 本来的目的本来只是想对比一下各个s ...

  7. Flink 状态一致性:端到端状态一致性的保证

    文章目录 状态一致性 什么是状态一致性 状态一致性种类 端到端(end-to-end)状态一致性 Sink端到端状态一致性的保证 Flink+Kafka端到端状态一致性的保证 状态一致性 什么是状态一 ...

  8. kafka key的作用_kafka系列(kafka端到端原理分析)

    Kafka 端到端源码解析 Kafka的场景 Kafka概念 Topic 创建与删除 Topic状态流转 一些问题 Topic分区初始化选择 kafka producer解析 1. 发送流程 2. 分 ...

  9. StarRocks X Flink CDC,打造端到端实时链路

    实时数仓建设背景 实时数仓需求 随着互联网行业的飞速发展,企业业务种类变得越来越多,数据量也变得越来越大.以 Apache Hadoop 生态为核心的数据看板业务一般只能实现离线的业务.在部分领域,数 ...

  10. Flink的端对端精准一次处理(Exactly-Once)

    文章目录 Flink的Exactly Once 从Flink和Kafka组合来理解Exactly_Once Two-Phase Commit(两阶段提交协议) 两阶段提交Flink中应用 Flink的 ...

最新文章

  1. html 整个页面变灰
  2. 组合模式——透明组合模式,安全组合模式
  3. 获取cookie里面的值
  4. 逐行对比两个文件内容的好用软件
  5. [转载] 《Python语言程序设计》课程笔记
  6. UI设计师必须收藏,超好用Figma工具包
  7. linux usb拔出防止抖动,Linux 下监控USB设备拔插事件
  8. 【Unity3D入门教程】Unity3D播放音频和视频
  9. 一文搞懂深度学习所有工具——Anaconda、CUDA、cuDNN
  10. 远程控制设置 串口服务器,TCP232串口服务器连接远程控制电脑设置方法
  11. Java使用jfreechart画饼图_JFreeChart饼图
  12. itest软件测试工具,itest(爱测试)
  13. 视频传输协议之MPEG-DASH
  14. 2018——广东工业大学校赛题解
  15. android三星打印插件,三星打印服务插件 Samsung Print Service for Android
  16. 北斗系统海拔高度测试软件,GPS海拔测量仪手机版
  17. 献给1986,1987年出生的人
  18. 仿微博视频边下边播之封装播放器
  19. 宝塔Linux面板搭建网站(超简单)
  20. DB2快速入门—DB2 11的安装与使用

热门文章

  1. SEO搜索引擎优化是什么
  2. OpenCV-Calibration-Detailed Description翻译
  3. oracle 报表聚合,Oracle很实用的汇总报表实现方式!grouping_id
  4. oracle根据汇总报表计算结余
  5. android 找不到 theme,android-找不到与给定名称'@ style / Theme.Holo.Light.DarkActionBar'匹配的资源...
  6. 英伟达史上最便宜AI硬件发布:可运行所有AI模型,算力472 GFLOPS,功耗5瓦
  7. 计算机如何从光盘启动不了,电脑如何设置光驱启动?开机设置光驱为第一启动的步骤...
  8. 【Unity】游戏开发过程中的前后台切换技术
  9. 【供应链架构day12】电商仓储WMS的业务UseCase
  10. IRQL_NOT_LESS_OR_EQUAL蓝屏分析