Flink SQL: connecting to Kafka with DDL, a log of pitfalls
Environment: flink_2.12-1.14.3 (Flink 1.14.3, Scala 2.12) and kafka_2.12-3.1.0 (Kafka 3.1.0, Scala 2.12)
Preparation
- Upload the Flink Kafka connector (flink-connector-kafka)
Put the downloaded flink-connector-kafka jar in Flink's lib directory, then start Flink.
- Write the Flink SQL DDL statements
-- DDL for reading from Kafka ('ip:port' is a placeholder for the broker address)
CREATE TABLE source (
  name STRING,
  age STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = 'ip:port',
  'properties.group.id' = 'group_test',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
-- DDL for writing to Kafka
CREATE TABLE sink (
  name STRING,
  sex STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'test1',
  'properties.bootstrap.servers' = 'ip:port',
  'format' = 'json'
);
-- Business logic (SELECT * maps columns by position, so source.age is written to sink.sex)
INSERT INTO sink SELECT * FROM source;
- Submit the job by running the script with bin/sql-client.sh -f xxx.sql
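To check the job end to end, it helps to have a test message for the test topic. Since the source table uses 'format' = 'json', each Kafka message should be a JSON object whose keys match the column names (name, age). The following is a minimal Python sketch that builds one such line; make_record is a hypothetical helper and the values are made up for illustration.

```python
import json


def make_record(name: str, age: str) -> str:
    """Serialize one record as a single-line JSON object.

    The keys mirror the columns of the source table (name STRING, age STRING),
    so both values are kept as strings.
    """
    return json.dumps({"name": name, "age": age}, ensure_ascii=False)


if __name__ == "__main__":
    # prints {"name": "alice", "age": "18"}
    print(make_record("alice", "18"))
```

The printed line can be sent to the test topic with whatever producer you normally use; if the job is running, a record with the same two values should show up in test1.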
Problems you may run into:
1. Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/OffsetResetStrategy
The kafka-clients jar is missing. Download the kafka-clients jar, put it in the lib directory, restart Flink, and resubmit the job.
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/OffsetResetStrategy
    at org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.earliest(OffsetsInitializer.java:147)
    at org.apache.flink.connector.kafka.source.KafkaSourceBuilder.<init>(KafkaSourceBuilder.java:104)
    at org.apache.flink.connector.kafka.source.KafkaSource.builder(KafkaSource.java:117)
    at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createKafkaSource(KafkaDynamicSource.java:379)
    at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:216)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:452)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:160)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:124)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:639)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:290)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$parseStatement$1(LocalExecutor.java:172)
    at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172)
    at org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:396)
    at org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:324)
    at org.apache.flink.table.client.cli.CliClient.executeFile(CliClient.java:314)
    at org.apache.flink.table.client.cli.CliClient.executeInNonInteractiveMode(CliClient.java:230)
    at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:153)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
    ... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.OffsetResetStrategy
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 37 more
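Since this error comes down to a jar that is not in lib, a quick check before submitting can save a restart. The following is a minimal Python sketch, not part of Flink: missing_jars and REQUIRED_PREFIXES are hypothetical names, and the check only looks at file name prefixes (flink-connector-kafka, kafka-clients), not at the jar contents.

```python
from pathlib import Path

# Jar name prefixes this setup relies on (assumption: both are shipped as
# separate jars, as described in this post).
REQUIRED_PREFIXES = ("flink-connector-kafka", "kafka-clients")


def missing_jars(lib_dir, required=REQUIRED_PREFIXES):
    """Return the required prefixes that have no matching *.jar in lib_dir."""
    jar_names = [path.name for path in Path(lib_dir).glob("*.jar")]
    return [
        prefix
        for prefix in required
        if not any(name.startswith(prefix) for name in jar_names)
    ]


if __name__ == "__main__":
    import sys

    # Usage: python check_jars.py /path/to/flink/lib
    lib = sys.argv[1] if len(sys.argv) > 1 else "lib"
    missing = missing_jars(lib)
    print("missing jars:", ", ".join(missing) if missing else "none")
```

An empty result only means that files with those names exist; it says nothing about whether their versions fit together, which is the subject of the next problem.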
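2. org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
This happened because the Kafka jars I had imported did not match the installed versions. Re-downloading the jars for the installed versions fixed it.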
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:101) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:273) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:563) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455) ~[flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455) ~[flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_c979f80b-8dd8-49a4-a019-20da907d2799.jar:1.14.3]
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:545)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:298)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:237)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
    ... 3 more
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [test].
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47)
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52)
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:222)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    ... 3 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1667810025114, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
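One way to catch a version mismatch before submitting is to compare the version encoded in the connector jar's file name with the one in the flink-dist jar (flink-dist_2.12-1.14.3.jar in the trace above). The following is a rough Python sketch, assuming the jars keep their standard Maven-style names (artifact_scalaVersion-flinkVersion.jar); flink_jar_versions and connector_matches_dist are hypothetical helper names, and the kafka-clients jar is not checked here.

```python
import re
from pathlib import Path

# Matches e.g. flink-dist_2.12-1.14.3.jar or flink-connector-kafka_2.12-1.14.3.jar
# and captures (artifact, scala version, flink version).
JAR_PATTERN = re.compile(
    r"^(flink-dist|flink-connector-kafka)_(\d+\.\d+)-(\d+(?:\.\d+)*)\.jar$"
)


def flink_jar_versions(lib_dir):
    """Map artifact name -> (scala_version, flink_version) for jars in lib_dir."""
    found = {}
    for path in Path(lib_dir).glob("*.jar"):
        match = JAR_PATTERN.match(path.name)
        if match:
            artifact, scala, version = match.groups()
            found[artifact] = (scala, version)
    return found


def connector_matches_dist(lib_dir):
    """True only if both jars are present and share Scala and Flink versions."""
    versions = flink_jar_versions(lib_dir)
    dist = versions.get("flink-dist")
    connector = versions.get("flink-connector-kafka")
    return dist is not None and dist == connector
```

Calling connector_matches_dist on Flink's lib directory returns False when either jar is missing or the two version pairs differ, which is a hint to re-download the connector for the installed Flink version.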