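Problem description

A Flink job reads data from Kafka and writes it to an HDFS cluster. After running normally for a while (20 min to 1 h), the job reports an error and ends in the FAILED state.

Error message

Checkpoint problem:

Flink job failed with "Checkpoint Coordinator is suspending."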

2020-12-26 20:58:54
org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=10000)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Could not perform checkpoint 286 for operator Source: Custom Source -> Map -> Filter -> Sink: Unnamed (1/1).
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:822)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$4(StreamTask.java:789)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 286 for operator Source: Custom Source -> Map -> Filter -> Sink: Unnamed (1/1). Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:892)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:882)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:813)
… 11 more
Caused by: java.io.IOException: Failing write. Tried pipeline recovery 5 times without success.
at org.apache.hadoop.hdfs.DataStreamer.processDatanodeOrExternalError(DataStreamer.java:1250)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:667)

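Cause

The failure was caused by an unsuitable policy configured for the custom Bucketer and Sink:

Background

For every Kafka record consumed, the HDFS directory and file suffix are derived from a time field in the record. With a long INACTIVE_BUCKET_THRESHOLD, a file that receives no new data stays open for a long time, so a large number of files are left unclosed.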

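Locating and testing

In black-box testing, with BATCH_ROLLOVER_INTERVAL (the policy that rolls to a new file on a timer, the counterpart of the file-size policy) set to one minute, the job ran without problems (it did not fail during a 3-day run). BATCH_ROLLOVER_INTERVAL was then held fixed at 60 s while the inactivity wait INACTIVE_BUCKET_THRESHOLD was varied as follows:

Test cases:

1: rollover interval 60 s, inactivity threshold 30 min

2: rollover interval 60 s, inactivity threshold 15 min

3: rollover interval 60 s, inactivity threshold 5 min

Failing configuration: 30*60*1000L, 20*60*1000 (files are closed after 20 min at the earliest)

Passing configuration: 30*60*1000L, 60*1000 (files are closed after 1 min at the earliest)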
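The difference between the failing and passing settings can be illustrated with a toy model. This is not Flink code and not the author's job: it assumes one record per minute, each record landing in a new per-minute bucket, and a bucket being closed only once it has been idle for the threshold. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Toy model (not Flink code): files left open under an inactivity-only close policy. */
public class OpenBucketModel {

    /**
     * Simulates one record per minute, each written to a new per-minute bucket,
     * and returns the peak number of buckets open at the same time.
     *
     * @param runMinutes               how long the simulated job runs
     * @param inactiveThresholdMinutes a bucket is closed once it has been idle this long
     */
    public static int peakOpenBuckets(int runMinutes, int inactiveThresholdMinutes) {
        // bucket id -> minute of the last write to that bucket
        Map<Integer, Integer> lastWriteByBucket = new HashMap<>();
        int peak = 0;
        for (int now = 0; now < runMinutes; now++) {
            // Close every bucket that has been idle for at least the threshold.
            Iterator<Map.Entry<Integer, Integer>> it = lastWriteByBucket.entrySet().iterator();
            while (it.hasNext()) {
                if (now - it.next().getValue() >= inactiveThresholdMinutes) {
                    it.remove();
                }
            }
            // The record for this minute opens and writes to a fresh bucket.
            lastWriteByBucket.put(now, now);
            peak = Math.max(peak, lastWriteByBucket.size());
        }
        return peak;
    }

    public static void main(String[] args) {
        for (int threshold : new int[] {1, 5, 15, 20, 30}) {
            System.out.println(threshold + " min threshold -> peak open buckets: "
                    + peakOpenBuckets(60, threshold));
        }
    }
}
```

Under these assumptions the peak number of open buckets equals the threshold in minutes (capped by the run length), so a 20-minute threshold keeps 20 files open where a 1-minute threshold keeps one. A real job with finer-grained or out-of-order bucket keys may hold many more.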

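Default configuration in the source code:

Solution: with BATCH_ROLLOVER_INTERVAL at 60 s, change the inactivity close time (INACTIVE_BUCKET_THRESHOLD) back to its default of 60 s.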
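If the sink is Flink's BucketingSink from flink-connector-filesystem, the fix might be wired up roughly as in the sketch below. That is an assumption: the post only mentions a custom Bucketer and Sink, although the parameter names match that class's settings. The class name HdfsSinkConfig, the basePath argument, the DateTimeBucketer with its format string, and the StringWriter are stand-ins for illustration, not the original job's code.

```java
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

/** Sketch only: assumes BucketingSink; the original job uses its own Bucketer and Sink. */
public class HdfsSinkConfig {

    // Values taken from the passing test in this post.
    static final long BATCH_ROLLOVER_INTERVAL = 60 * 1000L;   // roll to a new file every 60 s
    static final long INACTIVE_BUCKET_THRESHOLD = 60 * 1000L; // close a bucket idle for 60 s

    public static BucketingSink<String> build(String basePath) {
        BucketingSink<String> sink = new BucketingSink<>(basePath);
        // Stand-in for the custom Bucketer that derives the directory from a record's time field.
        sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HHmm"));
        sink.setWriter(new StringWriter<>());
        sink.setBatchRolloverInterval(BATCH_ROLLOVER_INTERVAL);
        sink.setInactiveBucketThreshold(INACTIVE_BUCKET_THRESHOLD);
        return sink;
    }
}
```

The sink would then be attached with something like stream.addSink(HdfsSinkConfig.build(...)), where the path is whatever HDFS base directory the job already writes to.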
