Flink Checkpoint超时问题
Flink Checkpoint超时问题
文章目录
- Flink Checkpoint超时问题
- 问题现象
- 问题分析
- 问题1:TaskManager进程挂掉
- 问题2:任务长时间处于CANCELING
- 问题3:Checkpoint超时
- 问题4:数据无法正常同步
- 解决思路
- 总结
- 参考文档
问题现象
业务部门最近使用Flink来做数据实时同步,通过同步工具把CDC消息接入Kafka,其中上百张表同步到单个topic里,然后通过Flink来消费Kafka,做数据解析、数据分发、然后发送到目标数据库(mysql/oracle),整个链路相对比较简单,之前通过Jstorm来实现,最近才迁移到Flink,通过Flink DataStream API来实现。代码里仅用到Kafka Source、Map、Process几个简单的算子,发送目标库的逻辑在Process的逻辑里实现,因此process的逻辑里涉及数据库连接的创建与清理、通过队列来缓存数据,创建额外线程来启动发送和消费队列的逻辑,先不说整个逻辑是否合理,本文主要基于此案例来阐述遇到的问题和排查思路以及解决方法。
业务部门使用的Flink版本为1.11.2,部署模式采用Standalone
。出问题的是单机环境,即有一个JobManager
进程和一个TaskManager
进程。
问题现象是通过web页面观察发现启动任务后很短时间任务就发生重启,同时还会出现重启去Cancel
任务的时候无法Cancel
,一直处于CANCELING
状态(正常会很快变成CANCELED
)。并且过一段时候后TaskManager
进程挂掉,导致任务一直处于无法申请Slot
的状态,最终导致数据无法正常同步。因此,问题主要有以下几个现象:
Checkpoint
超时- 子任务长时间处于
CANCELING
,任务长时间处于RESTARTING
状态 - 一段时间后
TaskManager
进程挂掉 - 数据无法正常同步
问题分析
问题1:TaskManager进程挂掉
看到问题的第一反应是首先看TaskManager
进程为什么会挂掉,这个问题比较严重,因为涉及到集群层面而不单单是任务了。查看Taskmanager
日志,发现有以下片段:
// 日志1
ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Task did not exit gracefully within 180 + seconds.
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572) [flink-dist_2.11-1.11.2.jar:1.11.2]at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
2021-03-05 04:09:30,816 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572) [flink-dist_2.11-1.11.2.jar:1.11.2]at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
// 日志2
WARN org.apache.flink.runtime.taskmanager.Task [] - Task 'Source: Custom Source -> Map -> Process (1/1)' did not react to cancelling signal for 30 seconds, but is stuck in method:org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:91)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
java.lang.Thread.run(Thread.java:748)
看日志1发现有关键日志Task did not exit gracefully within 180 + seconds
打印,于是查看Flink源码,查看包含此日志的代码。
private static class TaskCancelerWatchDog implements Runnable {@Overridepublic void run() {try {final long hardKillDeadline = System.nanoTime() + timeoutMillis * 1_000_000;long millisLeft;while (executerThread.isAlive()&& (millisLeft = (hardKillDeadline - System.nanoTime()) / 1_000_000) > 0) {try {executerThread.join(millisLeft);} catch (InterruptedException ignored) {}}if (executerThread.isAlive()) {String msg = "Task did not exit gracefully within " + (timeoutMillis / 1000) + " + seconds.";taskManager.notifyFatalError(msg, new FlinkRuntimeException(msg));}} catch (Throwable t) {}}
}
可以看到TaskCancelerWatchDog
是用来监听Cancel任务是否成功的线程,如果超过timeoutMillis
执行线程还处理alive状态,则向TaskManager
进程抛出FatalError
,而这个timeoutMillis
是通过task.cancellation.timeout
参数来指定,默认是180s,如果指定为0则不开启这个功能。
日志2涉及的源码如下:
private static final class TaskInterrupter implements Runnable {@Overridepublic void run() {try {executerThread.join(interruptIntervalMillis);while (task.shouldInterruptOnCancel() && executerThread.isAlive()) {//log.warn("Task '{}' did not react to cancelling signal for {} seconds, but is stuck in method:\n {}",taskName, (interruptIntervalMillis / 1000), bld);executerThread.interrupt();try {executerThread.join(interruptIntervalMillis);} catch (InterruptedException e) {// we ignore this and fall through the loop}}} catch (Throwable t) {// FatalError}}
}
TaskInterrupter
是用来中断执行线程的线程,这个日志可以看出需要Cancel但还在执行的线程的堆栈信息。从以上两段日志可以看出,TaskManager
进程挂掉的原因是由于任务在180s内没被正常Cancel导致。为了防止TaskManager
进程挂掉,我们添加参数task.cancellation.timeout: 0
问题2:任务长时间处于CANCELING
显然,问题1是由于问题2导致的,那问题2是什么原因导致的呢,从问题1的日志堆栈可以看到在执行StreamTask.invoke
,此堆栈好像也没提供比较有用的信息。我们只能猜测,某个Task在执行Cancel
的时候未被Cancel
掉,可能是因为某种原因hang住导致,下面再进一步分析。但还有一个问题是这个Task是什么原因需要去执行Cancel
操作呢?因为没有人为的去执行Cancel
操作,所以肯定是Flink自己去Cancel
的,具体的原因继续往下看,我们发现standalonesession
(JobManager
日志)日志里存在checkpoint
超时的错误日志,关键信息为expired before completing
:
INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 58 of job f46ee0d14fe0e6f91253e78487796f5b expired before completing.
INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66) ~[flink-dist_2.11-1.11.2.jar:1.11.2]at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673) ~[flink-dist_2.11-1.11.2.jar:1.11.2]at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650) ~[flink-dist_2.11-1.11.2.jar:1.11.2]at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91) ~[flink-dist_2.11-1.11.2.jar:1.11.2]at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783) ~[flink-dist_2.11-1.11.2.jar:1.11.2]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181]at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_181]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_181]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_181]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_181]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
因此接着看第三个问题
问题3:Checkpoint超时
expired before completing
的日志意味着checkpoint
发生超时,确认任务配置参数,配置如下:
execution.checkpointing.interval: 60000
execution.checkpointing.timeout: 60000
execution.checkpointing.max-concurrent-checkpoints: 500
execution.checkpointing.min-pause: 500
一开始以为checkpoint
超时时间设置太短,于是增大超时时间到30分钟,但从web界面发现,大量checkpoint
处于pendding状态,最终还会超时。因为未设置execution.checkpointing.tolerable-failed-checkpoints
,因此一旦发生超时,任务将会发生重启。
看代码和日志都看不出个所以然,只能查看TaskManager
进程的堆栈来排查了,目的是看下发生checkpoint
超时的时候内部线程运行情况是怎么样的。Flink1.11.2也提供了web界面查看stack的功能,但相比jstack命令打印的还是有点区别,这里还是采用jstack -l id > id.jstack的方式来进行排查。
查看stack发现有处于BLOCKED
状态的线程
"Source: Custom Source -> Map -> mysqlmsg (1/1)" #77 prio=5 os_prio=0 tid=0x00007fb560080800 nid=0x7c3b waiting for monitor entry [0x00007fb539609000]java.lang.Thread.State: BLOCKED (on object monitor)at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:91)- waiting to lock <0x0000000702a61a58> (a java.lang.Object)
接着查看0x0000000702a61a58
对应的线程
"Legacy Source Thread - Source: Custom Source -> Map -> mysqlmsg (1/1)" #82 prio=5 os_prio=0 tid=0x00007fb48c485000 nid=0x7c4e in Object.wait() [0x00007fb47f3fd000]java.lang.Thread.State: TIMED_WAITING (on object monitor)at java.lang.Object.wait(Native Method)at org.apache.ibatis.datasource.pooled.PooledDataSource.popConnection(PooledDataSource.java:451)- locked <0x000000070396d180> (a org.apache.ibatis.datasource.pooled.PoolState)at org.apache.ibatis.datasource.pooled.PooledDataSource.getConnection(PooledDataSource.java:90)at org.apache.ibatis.transaction.jdbc.JdbcTransaction.openConnection(JdbcTransaction.java:139)at org.apache.ibatis.transaction.jdbc.JdbcTransaction.getConnection(JdbcTransaction.java:61)at org.apache.ibatis.executor.BaseExecutor.getConnection(BaseExecutor.java:338)at org.apache.ibatis.executor.SimpleExecutor.prepareStatement(SimpleExecutor.java:84)at org.apache.ibatis.executor.SimpleExecutor.doQuery(SimpleExecutor.java:62)at org.apache.ibatis.executor.BaseExecutor.queryFromDatabase(BaseExecutor.java:326)at org.apache.ibatis.executor.BaseExecutor.query(BaseExecutor.java:156)at org.apache.ibatis.executor.CachingExecutor.query(CachingExecutor.java:109)at org.apache.ibatis.executor.CachingExecutor.query(CachingExecutor.java:83)at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:148)at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:141)at org.apache.ibatis.session.defaults.DefaultSqlSession.selectOne(DefaultSqlSession.java:77)at org.apache.ibatis.binding.MapperMethod.execute(MapperMethod.java:83)at org.apache.ibatis.binding.MapperProxy.invoke(MapperProxy.java:59)at com.sun.proxy.$Proxy35.selectByPrimaryKey(Unknown Source)
//省略org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)- locked <0x0000000702a61a58> (a java.lang.Object)at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)- locked <0x0000000702a61a58> (a java.lang.Object)at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)Locked ownable synchronizers:- None
可以看到PooledDataSource.popConnection
一直在阻塞,即在获取连接时阻塞了。于是查看初始化连接池的配置,没有配置poolMaximumActiveConnections
,即默认最大连接数为10,而代码在ProcessFunction
的processElement
方法里采用短连接方式获取数据库连接,每次来一波数据都创建连接,发送完断开连接。因此很容易因为获取不到连接而使得processElement
方法处于阻塞状态。而processElement
方法阻塞进而影响Barrier
的流动,所以导致了Checkpoint
发生超时。
问题4:数据无法正常同步
基于以上几个问题的定位,这个问题就很好解释了,首先由于阻塞导致了Checkpoint
发生超时(问题3),然后导致任务重启,在重启时由于阻塞的线程hang住无法Cancel
(问题2),由于TaskCancelerWatchDog
的存在导致超过默认时间180s后TaskManager
挂掉(问题1)。最后导致了问题4数据无法正常同步。
解决思路
原因定位清楚了,离解决问题就近在咫尺了,可以采用几种方式:
- 增加最大活跃线程数
poolMaximumActiveConnections
; - 采用长连接,在
open
时初始化连接,close
方法销毁连接; - 不用另外开启连接,直接采用
flink-jdbc-connector
来发送数据,因为数据源涉及上百张表,需要有分流的操作。
总结
本文基于实时同步任务遇到无法正常同步的问题进行排查分析,旨在提供一种当遇到Flink Checkpoint
超时问题时的排查思路,同时也顺便介绍了在Standalone
部署模式下运行Flink任务的一种典型问题-TaskManager
无缘无故挂掉的问题,希望给正在使用Flink的同学提供一种思路,避免踩坑。
参考文档
jstack详解
MyBatis-内置DataSource实现
Flink Checkpoint超时问题相关推荐
- 【flink】Flink常见Checkpoint超时问题排查思路
1.概述 转载:Flink常见Checkpoint超时问题排查思路 这里仅仅是自己学习. 在日常flink应用中,相信大家经常会遇到checkpoint超时失败这类的问题,遇到这种情况的时候仅仅只会在 ...
- Flink checkpoint失败
目录 前言 问题描述 问题定位 checkpoint的基本原理 思路 现象 问题解决 前言 Flink容错机制的核心部分是绘制分布式数据流和操作员状态的一致快照.这些快照充当一致的检查点,如果发生故障 ...
- Flink Checkpoint 详解
Flink Checkpoint 详解 一.checkpoint简介 二.checkpoint原理 三.精确一次 四.状态后端 五.配置推荐 一.checkpoint简介 Checkpoint是Fli ...
- Flink Checkpoint机制分析
原创作品,转载请标明:http://blog.csdn.net/xiejingfa/article/details/105439802 可靠性是分布式系统实现必须考虑的因素之一.Flink基于Chan ...
- Flink内核源码(八)Flink Checkpoint
Flink中Checkpoint是使Flink 能从故障恢复的一种内部机制.检查点是 Flink 应用状态的一个一致性副本,在发生故障时,Flink 通过从检查点加载应用程序状态来恢复. 核心思想:是 ...
- Flink CheckPoint的触发过程
CheckpointCoordinator的转换及调度 1.转换过程 在Flink JobMaster中有用于协调和触发checkpoint机制的协调管理器CheckpointCoordinator, ...
- Flink——Flink CheckPoint之两阶段提交协议(Two-Phase Commit Protocol)
文章目录 两阶段提交协议 1. 两阶段提交的前提条件 2. 两阶段提交的基本算法 a. 第一阶段(提交请求阶段) b. 第二阶段(提交执行阶段) 3. 两阶段提交的缺点 Flink-两阶段提交协议 1 ...
- Flink Checkpoint 机制:如何保证 barrier 和数据之间不乱序?
Flink Checkpoint 机制:如何保证 barrier 和数据之间不乱序? 1 前言 1.1 什么是 state? 要说 checkpoint,首先要从 state 聊起.之前有被问到对于 ...
- flink checkpoint 恢复_干货:Flink+Kafka 0.11端到端精确一次处理语义实现
2017年12月Apache Flink社区发布了1.4版本.该版本正式引入了一个里程碑式的功能:两阶段提交Sink,即TwoPhaseCommitSinkFunction.该SinkFunction ...
最新文章
- 使用RNNs进行机器翻译——介绍RNN和LSTM网络及其应用
- 6.STM32外设函数分类
- linux手工迁移php,Linux+PHP+MySql网站迁移配置
- 列表排序并返回索引_Python特性—列表,看完你就能轻松驾驭,拿走不谢
- [原创]按键小精灵通用去广告破解补丁
- Spring Boot基础学习笔记23:用户自定义授权管理
- 思维方式是看待事物的角度、方式和方法,它对人的言行起到决定性作用
- 编程:Python实现图片识别
- HCIE 数通资料下载 肖哥视频下载
- SPI全双工模式下收发字节的理解
- 腾讯2021校园招聘全球启动
- Java编程语言的风格
- 华为服务器电源性能指标,华为服务器可服务性设计介绍-电源篇.PDF
- ios视频直播没有音频问题
- Android WebView 加载https网页白屏,空白解决方案
- [论文阅读] Action Semantics Network: Considering the Effects of Actions in Multiagent System
- Zabbix 监控功能实现(监控数据库,使用percona 优化数据库的监控,监控java应用,Agent端 主动传输数据,Zabbix proxy 的使用,Zabbix 监控 + 智能降噪告警)
- CMakeLists.txt 详解
- python标注cad桩位_cad自动进行桩位编号
- stopwords.txt