Flink Checkpoint超时问题

文章目录

  • Flink Checkpoint超时问题
    • 问题现象
    • 问题分析
      • 问题1:TaskManager进程挂掉
      • 问题2:任务长时间处于CANCELING
      • 问题3:Checkpoint超时
      • 问题4:数据无法正常同步
    • 解决思路
    • 总结
    • 参考文档

问题现象

业务部门最近使用Flink来做数据实时同步,通过同步工具把CDC消息接入Kafka,其中上百张表同步到单个topic里,然后通过Flink来消费Kafka,做数据解析、数据分发、然后发送到目标数据库(mysql/oracle),整个链路相对比较简单,之前通过Jstorm来实现,最近才迁移到Flink,通过Flink DataStream API来实现。代码里仅用到Kafka Source、Map、Process几个简单的算子,发送目标库的逻辑在Process的逻辑里实现,因此process的逻辑里涉及数据库连接的创建与清理、通过队列来缓存数据,创建额外线程来启动发送和消费队列的逻辑,先不说整个逻辑是否合理,本文主要基于此案例来阐述遇到的问题和排查思路以及解决方法。

业务部门使用的Flink版本为1.11.2,部署模式采用Standalone。出问题的是单机环境,即有一个JobManager进程和一个TaskManager进程。

问题现象是通过web页面观察发现启动任务后很短时间任务就发生重启,同时还会出现重启去Cancel任务的时候无法Cancel,一直处于CANCELING状态(正常会很快变成CANCELED)。并且过一段时候后TaskManager进程挂掉,导致任务一直处于无法申请Slot的状态,最终导致数据无法正常同步。因此,问题主要有以下几个现象:

  1. Checkpoint超时
  2. 子任务长时间处于CANCELING,任务长时间处于RESTARTING状态
  3. 一段时间后TaskManager进程挂掉
  4. 数据无法正常同步

问题分析

问题1:TaskManager进程挂掉

看到问题的第一反应是首先看TaskManager进程为什么会挂掉,这个问题比较严重,因为涉及到集群层面而不单单是任务了。查看Taskmanager日志,发现有以下片段:

// 日志1
ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Task did not exit gracefully within 180 + seconds.
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572) [flink-dist_2.11-1.11.2.jar:1.11.2]at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
2021-03-05 04:09:30,816 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Fatal error occurred while executing the TaskManager. Shutting it down...
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572) [flink-dist_2.11-1.11.2.jar:1.11.2]at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
// 日志2
WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Task 'Source: Custom Source -> Map -> Process (1/1)' did not react to cancelling signal for 30 seconds, but is stuck in method:org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:91)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
java.lang.Thread.run(Thread.java:748)

看日志1发现有关键日志Task did not exit gracefully within 180 + seconds打印,于是查看Flink源码,查看包含此日志的代码。

private static class TaskCancelerWatchDog implements Runnable {@Overridepublic void run() {try {final long hardKillDeadline = System.nanoTime() + timeoutMillis * 1_000_000;long millisLeft;while (executerThread.isAlive()&& (millisLeft = (hardKillDeadline - System.nanoTime()) / 1_000_000) > 0) {try {executerThread.join(millisLeft);} catch (InterruptedException ignored) {}}if (executerThread.isAlive()) {String msg = "Task did not exit gracefully within " + (timeoutMillis / 1000) + " + seconds.";taskManager.notifyFatalError(msg, new FlinkRuntimeException(msg));}} catch (Throwable t) {}}
}

可以看到TaskCancelerWatchDog是用来监听Cancel任务是否成功的线程,如果超过timeoutMillis执行线程还处理alive状态,则向TaskManager进程抛出FatalError,而这个timeoutMillis是通过task.cancellation.timeout参数来指定,默认是180s,如果指定为0则不开启这个功能。

日志2涉及的源码如下:

private static final class TaskInterrupter implements Runnable {@Overridepublic void run() {try {executerThread.join(interruptIntervalMillis);while (task.shouldInterruptOnCancel() && executerThread.isAlive()) {//log.warn("Task '{}' did not react to cancelling signal for {} seconds, but is stuck in method:\n {}",taskName, (interruptIntervalMillis / 1000), bld);executerThread.interrupt();try {executerThread.join(interruptIntervalMillis);} catch (InterruptedException e) {// we ignore this and fall through the loop}}} catch (Throwable t) {// FatalError}}
}

TaskInterrupter是用来中断执行线程的线程,这个日志可以看出需要Cancel但还在执行的线程的堆栈信息。从以上两段日志可以看出,TaskManager进程挂掉的原因是由于任务在180s内没被正常Cancel导致。为了防止TaskManager进程挂掉,我们添加参数task.cancellation.timeout: 0

问题2:任务长时间处于CANCELING

显然,问题1是由于问题2导致的,那问题2是什么原因导致的呢,从问题1的日志堆栈可以看到在执行StreamTask.invoke,此堆栈好像也没提供比较有用的信息。我们只能猜测,某个Task在执行Cancel的时候未被Cancel掉,可能是因为某种原因hang住导致,下面再进一步分析。但还有一个问题是这个Task是什么原因需要去执行Cancel操作呢?因为没有人为的去执行Cancel操作,所以肯定是Flink自己去Cancel的,具体的原因继续往下看,我们发现standalonesession(JobManager日志)日志里存在checkpoint超时的错误日志,关键信息为expired before completing

INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint 58 of job f46ee0d14fe0e6f91253e78487796f5b expired before completing.
INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66) ~[flink-dist_2.11-1.11.2.jar:1.11.2]at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673) ~[flink-dist_2.11-1.11.2.jar:1.11.2]at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650) ~[flink-dist_2.11-1.11.2.jar:1.11.2]at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91) ~[flink-dist_2.11-1.11.2.jar:1.11.2]at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783) ~[flink-dist_2.11-1.11.2.jar:1.11.2]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181]at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_181]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_181]at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_181]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_181]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]

因此接着看第三个问题

问题3:Checkpoint超时

expired before completing的日志意味着checkpoint发生超时,确认任务配置参数,配置如下:

execution.checkpointing.interval: 60000
execution.checkpointing.timeout: 60000
execution.checkpointing.max-concurrent-checkpoints: 500
execution.checkpointing.min-pause: 500

一开始以为checkpoint超时时间设置太短,于是增大超时时间到30分钟,但从web界面发现,大量checkpoint处于pendding状态,最终还会超时。因为未设置execution.checkpointing.tolerable-failed-checkpoints,因此一旦发生超时,任务将会发生重启。

看代码和日志都看不出个所以然,只能查看TaskManager进程的堆栈来排查了,目的是看下发生checkpoint超时的时候内部线程运行情况是怎么样的。Flink1.11.2也提供了web界面查看stack的功能,但相比jstack命令打印的还是有点区别,这里还是采用jstack -l id > id.jstack的方式来进行排查。

查看stack发现有处于BLOCKED状态的线程

"Source: Custom Source -> Map -> mysqlmsg (1/1)" #77 prio=5 os_prio=0 tid=0x00007fb560080800 nid=0x7c3b waiting for monitor entry [0x00007fb539609000]java.lang.Thread.State: BLOCKED (on object monitor)at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:91)- waiting to lock <0x0000000702a61a58> (a java.lang.Object)

接着查看0x0000000702a61a58对应的线程

"Legacy Source Thread - Source: Custom Source -> Map -> mysqlmsg (1/1)" #82 prio=5 os_prio=0 tid=0x00007fb48c485000 nid=0x7c4e in Object.wait() [0x00007fb47f3fd000]java.lang.Thread.State: TIMED_WAITING (on object monitor)at java.lang.Object.wait(Native Method)at org.apache.ibatis.datasource.pooled.PooledDataSource.popConnection(PooledDataSource.java:451)- locked <0x000000070396d180> (a org.apache.ibatis.datasource.pooled.PoolState)at org.apache.ibatis.datasource.pooled.PooledDataSource.getConnection(PooledDataSource.java:90)at org.apache.ibatis.transaction.jdbc.JdbcTransaction.openConnection(JdbcTransaction.java:139)at org.apache.ibatis.transaction.jdbc.JdbcTransaction.getConnection(JdbcTransaction.java:61)at org.apache.ibatis.executor.BaseExecutor.getConnection(BaseExecutor.java:338)at org.apache.ibatis.executor.SimpleExecutor.prepareStatement(SimpleExecutor.java:84)at org.apache.ibatis.executor.SimpleExecutor.doQuery(SimpleExecutor.java:62)at org.apache.ibatis.executor.BaseExecutor.queryFromDatabase(BaseExecutor.java:326)at org.apache.ibatis.executor.BaseExecutor.query(BaseExecutor.java:156)at org.apache.ibatis.executor.CachingExecutor.query(CachingExecutor.java:109)at org.apache.ibatis.executor.CachingExecutor.query(CachingExecutor.java:83)at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:148)at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:141)at org.apache.ibatis.session.defaults.DefaultSqlSession.selectOne(DefaultSqlSession.java:77)at org.apache.ibatis.binding.MapperMethod.execute(MapperMethod.java:83)at org.apache.ibatis.binding.MapperProxy.invoke(MapperProxy.java:59)at com.sun.proxy.$Proxy35.selectByPrimaryKey(Unknown Source)
//省略org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)- locked <0x0000000702a61a58> (a java.lang.Object)at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)- locked <0x0000000702a61a58> (a java.lang.Object)at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)Locked ownable synchronizers:- None

可以看到PooledDataSource.popConnection一直在阻塞,即在获取连接时阻塞了。于是查看初始化连接池的配置,没有配置poolMaximumActiveConnections,即默认最大连接数为10,而代码在ProcessFunctionprocessElement方法里采用短连接方式获取数据库连接,每次来一波数据都创建连接,发送完断开连接。因此很容易因为获取不到连接而使得processElement方法处于阻塞状态。而processElement方法阻塞进而影响Barrier的流动,所以导致了Checkpoint发生超时。

问题4:数据无法正常同步

基于以上几个问题的定位,这个问题就很好解释了,首先由于阻塞导致了Checkpoint发生超时(问题3),然后导致任务重启,在重启时由于阻塞的线程hang住无法Cancel(问题2),由于TaskCancelerWatchDog的存在导致超过默认时间180s后TaskManager挂掉(问题1)。最后导致了问题4数据无法正常同步。

解决思路

原因定位清楚了,离解决问题就近在咫尺了,可以采用几种方式:

  • 增加最大活跃线程数poolMaximumActiveConnections
  • 采用长连接,在open时初始化连接,close方法销毁连接;
  • 不用另外开启连接,直接采用flink-jdbc-connector来发送数据,因为数据源涉及上百张表,需要有分流的操作。

总结

本文基于实时同步任务遇到无法正常同步的问题进行排查分析,旨在提供一种当遇到Flink Checkpoint超时问题时的排查思路,同时也顺便介绍了在Standalone部署模式下运行Flink任务的一种典型问题-TaskManager无缘无故挂掉的问题,希望给正在使用Flink的同学提供一种思路,避免踩坑。

参考文档

jstack详解
MyBatis-内置DataSource实现

Flink Checkpoint超时问题相关推荐

  1. 【flink】Flink常见Checkpoint超时问题排查思路

    1.概述 转载:Flink常见Checkpoint超时问题排查思路 这里仅仅是自己学习. 在日常flink应用中,相信大家经常会遇到checkpoint超时失败这类的问题,遇到这种情况的时候仅仅只会在 ...

  2. Flink checkpoint失败

    目录 前言 问题描述 问题定位 checkpoint的基本原理 思路 现象 问题解决 前言 Flink容错机制的核心部分是绘制分布式数据流和操作员状态的一致快照.这些快照充当一致的检查点,如果发生故障 ...

  3. Flink Checkpoint 详解

    Flink Checkpoint 详解 一.checkpoint简介 二.checkpoint原理 三.精确一次 四.状态后端 五.配置推荐 一.checkpoint简介 Checkpoint是Fli ...

  4. Flink Checkpoint机制分析

    原创作品,转载请标明:http://blog.csdn.net/xiejingfa/article/details/105439802 可靠性是分布式系统实现必须考虑的因素之一.Flink基于Chan ...

  5. Flink内核源码(八)Flink Checkpoint

    Flink中Checkpoint是使Flink 能从故障恢复的一种内部机制.检查点是 Flink 应用状态的一个一致性副本,在发生故障时,Flink 通过从检查点加载应用程序状态来恢复. 核心思想:是 ...

  6. Flink CheckPoint的触发过程

    CheckpointCoordinator的转换及调度 1.转换过程 在Flink JobMaster中有用于协调和触发checkpoint机制的协调管理器CheckpointCoordinator, ...

  7. Flink——Flink CheckPoint之两阶段提交协议(Two-Phase Commit Protocol)

    文章目录 两阶段提交协议 1. 两阶段提交的前提条件 2. 两阶段提交的基本算法 a. 第一阶段(提交请求阶段) b. 第二阶段(提交执行阶段) 3. 两阶段提交的缺点 Flink-两阶段提交协议 1 ...

  8. Flink Checkpoint 机制:如何保证 barrier 和数据之间不乱序?

    Flink Checkpoint 机制:如何保证 barrier 和数据之间不乱序? 1 前言 1.1 什么是 state? 要说 checkpoint,首先要从 state 聊起.之前有被问到对于 ...

  9. flink checkpoint 恢复_干货:Flink+Kafka 0.11端到端精确一次处理语义实现

    2017年12月Apache Flink社区发布了1.4版本.该版本正式引入了一个里程碑式的功能:两阶段提交Sink,即TwoPhaseCommitSinkFunction.该SinkFunction ...

最新文章

  1. 使用RNNs进行机器翻译——介绍RNN和LSTM网络及其应用
  2. 6.STM32外设函数分类
  3. linux手工迁移php,Linux+PHP+MySql网站迁移配置
  4. 列表排序并返回索引_Python特性—列表,看完你就能轻松驾驭,拿走不谢
  5. [原创]按键小精灵通用去广告破解补丁
  6. Spring Boot基础学习笔记23:用户自定义授权管理
  7. 思维方式是看待事物的角度、方式和方法,它对人的言行起到决定性作用
  8. 编程:Python实现图片识别
  9. HCIE 数通资料下载 肖哥视频下载
  10. SPI全双工模式下收发字节的理解
  11. 腾讯2021校园招聘全球启动
  12. Java编程语言的风格
  13. 华为服务器电源性能指标,华为服务器可服务性设计介绍-电源篇.PDF
  14. ios视频直播没有音频问题
  15. Android WebView 加载https网页白屏,空白解决方案
  16. [论文阅读] Action Semantics Network: Considering the Effects of Actions in Multiagent System
  17. Zabbix 监控功能实现(监控数据库,使用percona 优化数据库的监控,监控java应用,Agent端 主动传输数据,Zabbix proxy 的使用,Zabbix 监控 + 智能降噪告警)
  18. CMakeLists.txt 详解
  19. python标注cad桩位_cad自动进行桩位编号
  20. stopwords.txt

热门文章

  1. 电烙铁使用方法总结集合
  2. 华为Android10怎样root,华为Mate10 root教程_华为Mate10卡刷获取root权限的方法
  3. 全国(大学)高等教育各学科视频教学全集
  4. hdfs API命令操作京东云主机,采坑记录
  5. 互联网医疗十大公司排名
  6. U盘蠕虫病毒解决办法
  7. 需要administrator权限才能删除文件的处理方法
  8. 星际争霸Ⅱ 神族操作记录
  9. mailgun php版本,如何使用mailgun php API仅向bcc发送邮件?
  10. IP输出不足,端游手游化时代的单一手游企业如何破局?