Flink程序从kafka中读取数据进行计算,FLink程序一启动就报以下错误,看到错误很懵逼。加班到9点没解决,第二天提前来半小时,把如下错误信息又看了一遍。具体错误如下:

错误信息1.

20/12/17 09:31:07 WARN NetworkClient: [Consumer clientId=consumer-14, groupId=qa_topic_flink_group] Error connecting to node 172.16.40.233:9092 (id: -3 rack: null)
java.nio.channels.ClosedByInterruptExceptionat java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:659)at org.apache.kafka.common.network.Selector.doConnect(Selector.java:278)at org.apache.kafka.common.network.Selector.connect(Selector.java:256)at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:920)at org.apache.kafka.clients.NetworkClient.access$700(NetworkClient.java:67)at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1090)at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:976)at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:533)at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:292)at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1803)at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1771)at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:77)at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508)at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)at java.lang.Thread.run(Thread.java:748)

错误信息2:

20/12/17 09:31:27 WARN StreamTask: Error while canceling task.
org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedExceptionat org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602)at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1355)at java.lang.Thread.run(Thread.java:748)
20/12/17 09:31:27 WARN StreamTask: Error while canceling task.
org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedExceptionat org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602)at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1355)at java.lang.Thread.run(Thread.java:748)

解决办法:我们kafka的topic为9个分区。因此我们Flink程序中的并行度也要设置为9.

运行程序后正常了。

后续又来了。。。。当我们把并行度设置为9的时候,我的数据要分为好多个侧输出流,

当我写到最后一个侧输出流的时候,又爆了以上同样的错误。然后我就各种尝试。

然后我把所有的侧输出流都注释掉了,一个一个的打开,每每到最后一个侧输出流打开时就报错了。

很郁闷!我开始怀疑玄学了。。。。

最后突发灵感。看看资源管理器,怀疑是内存不够,

但是发现从程序刚启动就报错,内存里面啥也没有。但是在我启动程序的瞬间发现了下图。。。。。。

此图为CPU的资源使用情况,发现在启动程序的瞬间CPU使用率到了100%。

这时就怀疑是CPU了。

果然修改了并行度为3,好了。

env.setParallelism(3);

然后重新启动程序正常CPU资源使用图如下:

启动瞬间没有到100%启动正常。。。。

开心。。。

Flink Caused by:org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException相关推荐

  1. Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file syst

    Flink提交job报错: Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not fi ...

  2. 大数据 Flink 教程之使用 Apache Flink 进行无服务器复杂事件处理

    大数据 Flink 教程之使用 Apache Flink 进行无服务器复杂事件处理 什么是 Apache Flink? Flink 是一个分布式处理引擎,能够对数据流进行大规模的内存计算.数据流是一系 ...

  3. flink 本地_Flink原理Apache Flink漫谈系列 State

    实际问题 在流计算场景中,数据会源源不断的流入Apache Flink系统,每条数据进入Apache Flink系统都会触发计算.如果我们想进行一个Count聚合计算,那么每次触发计算是将历史上所有流 ...

  4. java.lang.String cannot be cast to org.apache.flink.table.data.StringData

    完整报错: 16:54:56,851 INFO org.apache.hadoop.conf.Configuration.deprecation - io.bytes.per.checksum is ...

  5. org.apache.flink.client.program.ProgramInvocationException: The main method caused an error

    flink任务开启检查点并设置状态后端后,提交任务运行,出现以上错误,具体错误如下: org.apache.flink.client.program.ProgramInvocationExceptio ...

  6. Could not find a suitable table factory for ‘org.apache.flink.table.factories.TableSinkFactory‘

    報錯如下: Exception in thread "main" org.apache.flink.table.api.TableException: findAndCreateT ...

  7. flink1.14.0+mysql5.7+mysqlcdc2.2.1报错org.apache.flink.shaded.guava18.com.google.common.util.concurren

    版本 flink: 1.14.0 mysql: 5.7 mysql-cdc:2.1.0/2.2.1 依赖 <dependencies><dependency><group ...

  8. 2021年大数据Flink(十五):流批一体API Connectors ​​​​​​​Kafka

    目录 Kafka pom依赖 参数设置 参数说明 Kafka命令 代码实现-Kafka Consumer 代码实现-Kafka Producer 代码实现-实时ETL Kafka pom依赖 Flin ...

  9. org.apache.flink.client.program.ProgramInvocationException: Job failed

    完整报错信息如下: scala> senv.execute() org.apache.flink.client.program.ProgramInvocationException: Job f ...

最新文章

  1. 通过示例学习JavaScript闭包
  2. python3网上学习资源汇总
  3. 博主的办公室和他的工作台
  4. PyQt4学习资料汇总
  5. C++之private虚函数
  6. Python机器学习:评价分类结果003实现混淆矩阵,精准率和召回率
  7. sp导出法线_SP导出贴图导Redshift渲染效果不同的问题
  8. ffmpeg h264 解码 转
  9. 华为机试HJ64:MP3光标位置
  10. I/O接口标准(1):LVTTL、LVCMOS、SSTL、HSTL
  11. matlab仿真环境运行,第7章 Simulink仿真环境.ppt
  12. python api框架 hook_Windows API Hooking in Python
  13. NOI2022退役记
  14. mysql8 启动报错:Error while setting value ‘STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DI
  15. Apache服务器的下载、安装、配置等等
  16. WebRTC实时通信系列教程7 使用Socket.IO搭建信令服务器交换信息
  17. jdk1.8换成11,启动项目报错java.net.MalformedURLException: unknown protocol: jrt
  18. 个人微信小程序快速赚攻略
  19. 《论文阅读》Point Cloud Completion by Skip-attention Network with Hierarchical Folding
  20. 姜思达和机器人_中国偏见地图曝光:百度大数据看穿了一切

热门文章

  1. 校园导航-南邮仙林校区
  2. 【LC中等】1867. 最大数量高于平均水平的订单
  3. 经验分享:通过VNC Viewer访问阿里云ECS服务器的步骤
  4. 《收获,不止SQL优化》读书笔记
  5. Matlab绘图基础——colormap在数字图像处理及三维图形展示上的应用(分层设色)...
  6. sw安装未成功_solidworks2016安装失败怎么清理
  7. 网络侦察及其防御技术研究综述
  8. TradeX普通行情接口解析
  9. 贝叶斯网络结构学习(基于BDAGL工具箱的MATLAB实现)
  10. 主流深度相机选型(双目相机)