Pitfall diary: connecting Flink SQL to Kafka with DDL

Environment: flink_2.12-1.14.3, kafka_2.12-3.1.0

Preparation

  1. Upload the Flink Kafka connector (flink-connector-kafka)
    Place the downloaded flink-connector-kafka jar in Flink's lib directory, then start Flink (see the shell sketch after this list).
  2. Write the Flink SQL DDL statements
-- Source table: read from Kafka
CREATE TABLE source (
    name STRING,
    age  STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = 'ip:port',
    'properties.group.id' = 'group_test',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

-- Sink table: write to Kafka
CREATE TABLE sink (
    name STRING,
    sex  STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'test1',
    'properties.bootstrap.servers' = 'ip:port',
    'format' = 'json'
);

-- Business logic
INSERT INTO sink SELECT * FROM source;
  3. Submit the job by running the script with bin/sql-client.sh -f xxx.sql
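Putting the three steps together, here is a minimal shell sketch. $FLINK_HOME, $KAFKA_HOME, the exact jar name, the script name kafka.sql, and the sample record are illustrative assumptions, not commands from the original post:

    # Assumptions: $FLINK_HOME/$KAFKA_HOME point at the installs; jar/script names are illustrative.
    cp flink-connector-kafka_2.12-1.14.3.jar $FLINK_HOME/lib/   # step 1: connector jar into lib/
    $FLINK_HOME/bin/start-cluster.sh                            # start Flink with the jar on the classpath
    $FLINK_HOME/bin/sql-client.sh -f kafka.sql                  # step 3: submit the DDL + INSERT script

    # Optional end-to-end check: feed one JSON record into 'test' and watch 'test1'.
    echo '{"name":"tom","age":"18"}' | $KAFKA_HOME/bin/kafka-console-producer.sh \
        --bootstrap-server ip:port --topic test
    $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server ip:port --topic test1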

!!! Possible problems:
1. Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/OffsetResetStrategy
This means the kafka-clients jar is missing. Put the downloaded kafka-clients jar in the lib directory, restart Flink, and resubmit the job; a sketch of the fix follows the stack trace below.

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/OffsetResetStrategy
    at org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.earliest(OffsetsInitializer.java:147)
    at org.apache.flink.connector.kafka.source.KafkaSourceBuilder.<init>(KafkaSourceBuilder.java:104)
    at org.apache.flink.connector.kafka.source.KafkaSource.builder(KafkaSource.java:117)
    at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createKafkaSource(KafkaDynamicSource.java:379)
    at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:216)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:452)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:160)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:124)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:639)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:290)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$parseStatement$1(LocalExecutor.java:172)
    at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172)
    at org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:396)
    at org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:324)
    at org.apache.flink.table.client.cli.CliClient.executeFile(CliClient.java:314)
    at org.apache.flink.table.client.cli.CliClient.executeInNonInteractiveMode(CliClient.java:230)
    at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:153)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
    ... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.OffsetResetStrategy
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 37 more
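A minimal sketch of the fix, assuming the kafka-clients version should match the installed Kafka (3.1.0 above); $FLINK_HOME and the script name kafka.sql are illustrative:

    # Assumption: kafka-clients-3.1.0.jar matches the installed Kafka 3.1.0 broker.
    cp kafka-clients-3.1.0.jar $FLINK_HOME/lib/
    $FLINK_HOME/bin/stop-cluster.sh && $FLINK_HOME/bin/start-cluster.sh   # restart so lib/ is reloaded
    $FLINK_HOME/bin/sql-client.sh -f kafka.sql                            # resubmit the job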

2. org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
This is caused by a version mismatch between the Kafka jars placed in lib and the installed Kafka. Download jars that match the installed version and replace the old ones; a version-check sketch follows the stack trace below.

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:101) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:273) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:563) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455) ~[flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455) ~[flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:545)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:298)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:237)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
    ... 3 more
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [test].
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47)
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52)
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:222)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    ... 3 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1667810025114, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
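To confirm a mismatch before swapping jars, a hedged check like the following can help; the old jar name shown is a hypothetical example and $FLINK_HOME/$KAFKA_HOME are illustrative:

    # Compare the client jars Flink loads against the broker version.
    ls $FLINK_HOME/lib | grep -i kafka          # e.g. kafka-clients-2.4.1.jar would mismatch a 3.1.0 broker
    $KAFKA_HOME/bin/kafka-topics.sh --version   # prints the installed Kafka version (3.1.0 here)
    # If they differ, replace the old jar with one matching the broker and restart Flink.
    rm $FLINK_HOME/lib/kafka-clients-*.jar
    cp kafka-clients-3.1.0.jar $FLINK_HOME/lib/
    $FLINK_HOME/bin/stop-cluster.sh && $FLINK_HOME/bin/start-cluster.sh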

