Flink SQL Kafka connector error: Could not find any factory for identifier 'kafka' that implements 'DynamicTableFactory'
1. Software Environment
| Environment & Software | Version |
|---|---|
| Linux OS | CentOS 7 |
| Flink (standalone single-node test cluster) | 1.13.6 |
| Kafka | 0.11.0 |
| ZooKeeper | 3.4.5 |
2. Starting the Flink SQL Client
In Flink 1.13.6, the Flink SQL client is still a Beta feature.
Start the Flink SQL client:
[bigdata_admin@dn5 bin]$ ./sql-client.sh embedded
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
No default environment specified.
Searching for '/data/flink-1.13.6/conf/sql-client-defaults.yaml'...not found.
Command history file path: /home/bigdata_admin/.flink-sql-history

[ASCII-art banner: Flink SQL Client (BETA)]

Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

Flink SQL> HELP;
The following commands are available:

CLEAR Clears the current terminal.
CREATE TABLE Create table under current catalog and database.
DROP TABLE Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'
CREATE VIEW Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'
DESCRIBE Describes the schema of a table with the given name.
DROP VIEW Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'
EXPLAIN Describes the execution plan of a query or table with the given name.
HELP Prints the available commands.
INSERT INTO Inserts the results of a SQL SELECT query into a declared table sink.
INSERT OVERWRITE Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.
QUIT Quits the SQL CLI client.
RESET Resets a session configuration property. Syntax: 'RESET <key>;'. Use 'RESET;' for reset all session properties.
SELECT Executes a SQL SELECT query on the Flink cluster.
SET Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties.
SHOW FUNCTIONS Shows all user-defined and built-in functions or only user-defined functions. Syntax: 'SHOW [USER] FUNCTIONS;'
SHOW TABLES Shows all registered tables.
SOURCE Reads a SQL SELECT query from a file and executes it on the Flink cluster.
USE CATALOG Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'
USE Sets the current default database. Experimental! Syntax: 'USE <name>;'
LOAD MODULE Load a module. Syntax: 'LOAD MODULE <name> [WITH ('<key1>' = '<value1>' [, '<key2>' = '<value2>', ...])];'
UNLOAD MODULE Unload a module. Syntax: 'UNLOAD MODULE <name>;'
USE MODULES Enable loaded modules. Syntax: 'USE MODULES <name1> [, <name2>, ...];'
BEGIN STATEMENT SET Begins a statement set. Syntax: 'BEGIN STATEMENT SET;'
END Ends a statement set. Syntax: 'END;'

Hint: Make sure that a statement ends with ';' for finalizing (multi-line) statements.
Creating a Flink SQL table mapped to a Kafka topic
Create the test table:
Flink SQL> DROP TABLE IF EXISTS `ods_base_province`;
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE `ods_base_province` (
>   `id` INT,
>   `name` STRING,
>   `region_id` INT,
>   `area_code` STRING
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'mydw.base_province',
>   'properties.bootstrap.servers' = 'dn5:9092',
>   'properties.group.id' = 'testGroup',
>   'format' = 'canal-json',
>   'scan.startup.mode' = 'earliest-offset'
> );
[INFO] Execute statement succeed.
Besides canal-json, the format option also supports json, csv, and other formats; choose the one that matches how the data is actually produced.
3. Symptom
The table was created successfully, but querying it produced the following error:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print
4. Root Cause & Solution
Root cause:
The Flink runtime classpath is missing the Flink SQL Kafka connector jar (e.g. flink-sql-connector-kafka_2.11-1.x.y.jar).
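Conceptually, Flink discovers table connectors at startup via Java's ServiceLoader mechanism and matches the value of the 'connector' option against each discovered factory's identifier; the real lookup lives in org.apache.flink.table.factories.FactoryUtil. The failure mode can be mimicked with a toy Python model (the identifier list is taken from the error message above; this is an analogy, not Flink's actual code):

```python
# Toy model of Flink's factory lookup: the bare distribution only ships
# these table factories; connector jars dropped into lib/ contribute more
# identifiers via ServiceLoader entries in META-INF/services.
available_factories = ["blackhole", "datagen", "filesystem", "print"]

def find_factory(identifier: str) -> str:
    """Return the factory identifier, or fail like Flink's FactoryUtil does."""
    if identifier not in available_factories:
        raise LookupError(
            f"Could not find any factory for identifier '{identifier}'. "
            f"Available factory identifiers are: {', '.join(sorted(available_factories))}"
        )
    return identifier

try:
    find_factory("kafka")        # fails: no connector jar on the classpath
except LookupError as e:
    print(e)

# "Installing" the connector jar makes the identifier discoverable:
available_factories.append("kafka")
print(find_factory("kafka"))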
Solution:
Add the following jar to the lib directory of the Flink installation ${FLINK_HOME} (e.g. /data/flink-1.13.6/lib):
- flink-sql-connector-kafka_2.11-1.13.6.jar
Download location:
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka_2.11/1.13.6
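For scripted setups, the direct download URL on Maven Central can be derived from the Maven coordinates above; here is a small Python sketch (repo1.maven.org follows the standard Maven repository layout of groupId-as-path/artifactId/version):

```python
# Build the direct Maven Central download URL for the connector jar.
group_id = "org.apache.flink"
artifact_id = "flink-sql-connector-kafka_2.11"
version = "1.13.6"  # must match the Flink cluster version exactly

jar_name = f"{artifact_id}-{version}.jar"
url = (
    "https://repo1.maven.org/maven2/"
    f"{group_id.replace('.', '/')}/{artifact_id}/{version}/{jar_name}"
)
print(url)
```

The printed URL can then be fetched with wget or curl into ${FLINK_HOME}/lib.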
The lib directory then contains the following jars:
[bigdata_admin@dn5 lib]$ ll
total 202084
-rw-r--r-- 1 bigdata_admin bigdata_admin 92314 Feb 4 17:11 flink-csv-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 115425612 Feb 4 17:15 flink-dist_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 148127 Feb 4 17:11 flink-json-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 7709740 May 7 2021 flink-shaded-zookeeper-3.4.14.jar
-rw-rw-r-- 1 bigdata_admin bigdata_admin 3674190 Feb 4 17:59 flink-sql-connector-kafka_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 36455408 Feb 4 17:14 flink-table_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 41077430 Feb 4 17:14 flink-table-blink_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 208006 Jan 13 19:06 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 301872 Jan 7 18:07 log4j-api-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 1790452 Jan 7 18:07 log4j-core-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 24279 Jan 7 18:07 log4j-slf4j-impl-2.17.1.jar
Trigger some MySQL binlog changes and feed them into the Kafka topic mydw.base_province. The Kafka console consumer receives data like this:
[bigdata_admin@dn5 ~]$ kafka-console-consumer --zookeeper dn3:2181 --topic mydw.base_province
{"data":[{"id":"6","name":"上海","region_id":"2","area_code":"310000"}],"database":"rtdw_test_gmall","es":1652270652000,"id":62,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"上海1"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652270652976,"type":"UPDATE"}
{"data":[{"id":"1","name":"北京1","region_id":"1","area_code":"110000"}],"database":"rtdw_test_gmall","es":1652272862000,"id":282,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"北京"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272862913,"type":"UPDATE"}
{"data":[{"id":"1","name":"北京","region_id":"1","area_code":"110000"}],"database":"rtdw_test_gmall","es":1652272892000,"id":286,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"北京1"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272892515,"type":"UPDATE"}
{"data":[{"id":"2","name":"天津市1","region_id":"1","area_code":"120000"}],"database":"rtdw_test_gmall","es":1652272930000,"id":291,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"天津市"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272930256,"type":"UPDATE"}
{"data":[{"id":"3","name":"山西1","region_id":"1","area_code":"140000"}],"database":"rtdw_test_gmall","es":1652272933000,"id":292,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"山西"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272933760,"type":"UPDATE"}
{"data":[{"id":"4","name":"内蒙古1","region_id":"1","area_code":"150000"}],"database":"rtdw_test_gmall","es":1652272936000,"id":293,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"内蒙古"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272936865,"type":"UPDATE"}
{"data":[{"id":"2","name":"天津市","region_id":"1","area_code":"120000"}],"database":"rtdw_test_gmall","es":1652272944000,"id":295,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"天津市1"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272944673,"type":"UPDATE"}
{"data":[{"id":"3","name":"山西","region_id":"1","area_code":"140000"}],"database":"rtdw_test_gmall","es":1652272947000,"id":296,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"山西1"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272947977,"type":"UPDATE"}
{"data":[{"id":"4","name":"内蒙古","region_id":"1","area_code":"150000"}],"database":"rtdw_test_gmall","es":1652272950000,"id":298,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"内蒙古1"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272950982,"type":"UPDATE"}
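To make the canal-json structure explicit, here is a small Python sketch that decodes one of the records above. This is only for illustration; Flink's canal-json format performs this decoding internally when it turns each event into a changelog row:

```python
import json

# One canal-json record, shortened from the consumer output above.
record = json.loads('''{"data":[{"id":"6","name":"上海","region_id":"2","area_code":"310000"}],
 "database":"rtdw_test_gmall","old":[{"name":"上海1"}],
 "table":"base_province","type":"UPDATE"}''')

# canal-json encodes a MySQL change event: "data" holds the new row image,
# "old" holds only the columns whose values changed, "type" is the DML kind.
row = record["data"][0]
changed = record["old"][0] if record["old"] else {}
print(record["type"], row["id"], row["name"], "was:", changed.get("name"))
```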
Now run the Flink SQL query again:
Flink SQL> select * from `ods_base_province`;
The query results now show up in the Flink SQL Client.
Press Ctrl + C to exit the result view:
[INFO] Result retrieval cancelled.
5. Wrong jar Versions
If no flink-sql-connector-kafka_x.y.z.jar has been added at all,
the following error occurs:
Flink SQL> select * from `ods_base_province`;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
If the jar version is flink-sql-connector-kafka_2.11-1.12.1.jar (older than the cluster),
the following error occurs:
Flink SQL> select * from `ods_base_province`;
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodError: org.apache.flink.table.factories.DynamicTableFactory$Context.getCatalogTable()Lorg/apache/flink/table/catalog/CatalogTable;
If the jar version is flink-sql-connector-kafka_2.11-1.14.4.jar (newer than the cluster),
the following error occurs:
Flink SQL> select * from ods_base_province;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.configuration.DescribedEnum
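The common thread in these three failures is that the connector jar must match the cluster's exact Flink version. A quick sanity-check helper can be sketched from the jar naming convention seen in the listings above (flink-sql-connector-kafka_&lt;scala-version&gt;-&lt;flink-version&gt;.jar); this helper is an illustration, not part of Flink:

```python
import re

def connector_matches_flink(jar_name: str, flink_version: str) -> bool:
    """Check that a flink-sql-connector-kafka jar was built for this Flink version.

    Relies on the naming convention flink-sql-connector-kafka_<scala>-<flink>.jar.
    """
    m = re.fullmatch(r"flink-sql-connector-kafka_[\d.]+-([\d.]+)\.jar", jar_name)
    return bool(m) and m.group(1) == flink_version

print(connector_matches_flink("flink-sql-connector-kafka_2.11-1.13.6.jar", "1.13.6"))  # matching
print(connector_matches_flink("flink-sql-connector-kafka_2.11-1.12.1.jar", "1.13.6"))  # too old
print(connector_matches_flink("flink-sql-connector-kafka_2.11-1.14.4.jar", "1.13.6"))  # too new
```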