一、软件环境

环境 & 软件 版本
LinuxOS CentOS 7
Flink Standalone单节点测试集群 1.13.6
kafka 0.11.0
Zookeeper 3.4.5

二、启动FlinkSql Client

1.13.6版本的flink中的flinksql客户端还是Beta版本

启动FlinkSql客户端:

[bigdata_admin@dn5 bin]$ ./sql-client.sh embedded
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
No default environment specified.
Searching for '/data/flink-1.13.6/conf/sql-client-defaults.yaml'...not found.
Command history file path: /home/bigdata_admin/.flink-sql-history▒▓██▓██▒▓████▒▒█▓▒▓███▓▒▓███▓░░        ▒▒▒▓██▒  ▒░██▒   ▒▒▓▓█▓▓▒░      ▒██████▒         ░▒▓███▒    ▒█▒█▒░▓█            ███   ▓░▒██▓█       ▒▒▒▒▒▓██▓░▒░▓▓██░ █   ▒▒░       ███▓▓█ ▒█▒▒▒████░   ▒▓█▓      ██▒▒▒ ▓███▒░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓███▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓█▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒▓█   ▒█▓   ░     █░                ▒█              █▓█▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░█▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒███   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░______ _ _       _       _____  ____  _         _____ _ _            _  BETA   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |  | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|| |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.Flink SQL> HELP;
The following commands are available:CLEAR           Clears the current terminal.
CREATE TABLE            Create table under current catalog and database.
DROP TABLE              Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'
CREATE VIEW             Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'
DESCRIBE                Describes the schema of a table with the given name.
DROP VIEW               Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'
EXPLAIN         Describes the execution plan of a query or table with the given name.
HELP            Prints the available commands.
INSERT INTO             Inserts the results of a SQL SELECT query into a declared table sink.
INSERT OVERWRITE                Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.
QUIT            Quits the SQL CLI client.
RESET           Resets a session configuration property. Syntax: 'RESET <key>;'. Use 'RESET;' for reset all session properties.
SELECT          Executes a SQL SELECT query on the Flink cluster.
SET             Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties.
SHOW FUNCTIONS          Shows all user-defined and built-in functions or only user-defined functions. Syntax: 'SHOW [USER] FUNCTIONS;'
SHOW TABLES             Shows all registered tables.
SOURCE          Reads a SQL SELECT query from a file and executes it on the Flink cluster.
USE CATALOG             Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'
USE             Sets the current default database. Experimental! Syntax: 'USE <name>;'
LOAD MODULE             Load a module. Syntax: 'LOAD MODULE <name> [WITH ('<key1>' = '<value1>' [, '<key2>' = '<value2>', ...])];'
UNLOAD MODULE           Unload a module. Syntax: 'UNLOAD MODULE <name>;'
USE MODULES             Enable loaded modules. Syntax: 'USE MODULES <name1> [, <name2>, ...];'
BEGIN STATEMENT SET             Begins a statement set. Syntax: 'BEGIN STATEMENT SET;'
END             Ends a statement set. Syntax: 'END;'Hint: Make sure that a statement ends with ';' for finalizing (multi-line) statements.

FlinkSQL建表关联Kafka对应的topic

构建测试表:

Flink SQL> DROP TABLE IF EXISTS `ods_base_province`;
[INFO] Execute statement succeed.Flink SQL> CREATE TABLE `ods_base_province` (
>   `id` INT,
>   `name` STRING,
>   `region_id` INT ,
>   `area_code`STRING
> ) WITH(
> 'connector' = 'kafka',
>  'topic' = 'mydw.base_province',
>  'properties.bootstrap.servers' = 'dn5:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'canal-json' ,
>  'scan.startup.mode' = 'earliest-offset'
> ) ;
[INFO] Execute statement succeed.

这里的format还有json、csv等类型,需根据数据的实际应用场景来确定。

三、问题症状

建表成功,但是查询其下数据的时候,报了如下错误:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.Available factory identifiers are:blackhole
datagen
filesystem
print

四、问题原因 & 解决方案

问题原因:

Flink 运行时上下文 classpath 中缺少flinksql与kafka的连接器jar包(如: flink-sql-connector-kafka_2.11-1.x.y.jar)

解决方案:

通过在Flink集群安装目录${FLINK_HOME}的lib下(如:/data/flink-1.13.6/lib),引入如下 jar:

  • flink-sql-connector-kafka_2.11-1.13.6.jar

下载位置:
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka_2.11/1.13.6

此时lib包下jar:

[bigdata_admin@dn5 lib]$ ll
total 202084
-rw-r--r-- 1 bigdata_admin bigdata_admin     92314 Feb  4 17:11 flink-csv-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin 115425612 Feb  4 17:15 flink-dist_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin    148127 Feb  4 17:11 flink-json-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin   7709740 May  7  2021 flink-shaded-zookeeper-3.4.14.jar
-rw-rw-r-- 1 bigdata_admin bigdata_admin   3674190 Feb  4 17:59 flink-sql-connector-kafka_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin  36455408 Feb  4 17:14 flink-table_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin  41077430 Feb  4 17:14 flink-table-blink_2.11-1.13.6.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin    208006 Jan 13 19:06 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin    301872 Jan  7 18:07 log4j-api-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin   1790452 Jan  7 18:07 log4j-core-2.17.1.jar
-rw-r--r-- 1 bigdata_admin bigdata_admin     24279 Jan  7 18:07 log4j-slf4j-impl-2.17.1.jar

触发MySQL binlog 数据的生成,并接入kafka的topic :mydw.base_province中,kafka消费端接收的数据如下:

[bigdata_admin@dn5 ~]$ kafka-console-consumer --zookeeper dn3:2181 --topic mydw.base_province {"data":[{"id":"6","name":"上海","region_id":"2","area_code":"310000"}],"database":"rtdw_test_gmall","es":1652270652000,"id":62,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"上海1"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652270652976,"type":"UPDATE"}
{"data":[{"id":"1","name":"北京1","region_id":"1","area_code":"110000"}],"database":"rtdw_test_gmall","es":1652272862000,"id":282,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"北京"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272862913,"type":"UPDATE"}
{"data":[{"id":"1","name":"北京","region_id":"1","area_code":"110000"}],"database":"rtdw_test_gmall","es":1652272892000,"id":286,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"北京1"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272892515,"type":"UPDATE"}
{"data":[{"id":"2","name":"天津市1","region_id":"1","area_code":"120000"}],"database":"rtdw_test_gmall","es":1652272930000,"id":291,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"天津市"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272930256,"type":"UPDATE"}
{"data":[{"id":"3","name":"山西1","region_id":"1","area_code":"140000"}],"database":"rtdw_test_gmall","es":1652272933000,"id":292,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"山西"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272933760,"type":"UPDATE"}
{"data":[{"id":"4","name":"内蒙古1","region_id":"1","area_code":"150000"}],"database":"rtdw_test_gmall","es":1652272936000,"id":293,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"内蒙古"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272936865,"type":"UPDATE"}
{"data":[{"id":"2","name":"天津市","region_id":"1","area_code":"120000"}],"database":"rtdw_test_gmall","es":1652272944000,"id":295,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"天津市1"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272944673,"type":"UPDATE"}
{"data":[{"id":"3","name":"山西","region_id":"1","area_code":"140000"}],"database":"rtdw_test_gmall","es":1652272947000,"id":296,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"山西1"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272947977,"type":"UPDATE"}
{"data":[{"id":"4","name":"内蒙古","region_id":"1","area_code":"150000"}],"database":"rtdw_test_gmall","es":1652272950000,"id":298,"isDdl":false,"mysqlType":{"id":"int(20)","name":"varchar(20)","region_id":"int(20)","area_code":"varchar(20)"},"old":[{"name":"内蒙古1"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"region_id":4,"area_code":12},"table":"base_province","ts":1652272950982,"type":"UPDATE"}

再次执行如下 FlinkSQL 查询语句:

Flink SQL> select * from `ods_base_province`;

Flink SQL Client中查询到的结果:

Ctrl + C 后退出结果查询
[INFO] Result retrieval cancelled.

五、错误的jar版本

如果没有引入任何flink-sql-connector-kafka_x.y.z.jar
会产生如下错误:

Flink SQL> select * from `ods_base_province`;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

如果引入的jar包版本是:flink-sql-connector-kafka_2.11-1.12.1.jar
则会产生如下错误:

Flink SQL> select * from `ods_base_province`;
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodError: org.apache.flink.table.factories.DynamicTableFactory$Context.getCatalogTable()Lorg/apache/flink/table/catalog/CatalogTable;

如果引入的jar包版本是:flink-sql-connector-kafka_2.11-1.14.4.jar
则会产生如下错误:

Flink SQL> select * from ods_base_province;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.configuration.DescribedEnum

FlinkSQL to Kafka连接器报错:could not find any factory for identifier ‘kafka‘ that implements相关推荐

  1. kafka启动报错Java HotSpotTM 64-bit Server VM warning:INFO: os::commit_memory

    版权声明:版权声明:本文为博主原创文章,未经博主允许不得转载. https://blog.csdn.net/qq_39501726/article/details/81938695 kafka启动报错 ...

  2. kafka tool报错:Error fetching datea.Offset out of range

    kafka tool报错 Error fetching datea.Offset out of range 解决方案: Kafka tool 升级最新2.0版本,没有此错误 转载于:https://w ...

  3. 【kafka】kafka消费者报错INVALID_FETCH_SESSION_EPOCH

    1.概述 kafka消息积压会导致cpu飙升吗 我从kafka中批量拿日志 遍历然后json反序列化成对象 在将对象批量入库 在某次入库之后日志打印INVALID_FETCH_SESSION_EPOC ...

  4. 【kafka】kafka 启动报错 InvalidReceiveException: Invalid receive (size = -720899)

    1.概述 kafka启动报错InvalidReceiveException: Invalid receive (size = -720899) 参考文章:https://bbs.csdn.net/to ...

  5. 【Kafka】kafka 偶然报错 NotLeaderForPartitionException

    文章目录 1.背景 1.背景 本地启动的kafka,想消费一下,然后报错如下,没有复现 [lcc@lcc ~/soft/kafka/kafka_2.11-0.10.0.0]

  6. Kafka 启动报错 AccessDeniedException

    Kafka 启动报错 AccessDeniedException 今天在本地电脑window环境上下载kafka,启动zookeeper正常,启动kafka时报:java.nio.file.Acces ...

  7. 服务器上Kafka启动报错:error=‘Cannot allocate memory‘ (errno=12)

    文章目录 环境 经历如下弯路才查看到报错信息 解决方法 1.kill一些不用的进程,来腾出内存. 2.修改默认配置,减少软件启动需要的内存 启动成功 其他 参考 解决问题思路:大问题拆小问题.从源头( ...

  8. SpringBoot启动报错:org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean

    SpringBoot启动报错:org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean ...

  9. Kafka : kafka重启报错 ZkClient allready closed

    1.美图 2.背景 kafka突然发现,有问题了,然后重启,却发现报错,但是zk是好的.

最新文章

  1. iOS Webview打开不受信的URL
  2. Ubuntu 无线密码破解利器aircrack-ng
  3. 树形菜单 php,简单的树形菜单_php
  4. 操作云数据库出现Access denied for user ‘common_1‘@‘%‘ to database ‘seata_storage‘
  5. jrebel不能使用ajax,Jrebel不生效的原因和解决办法
  6. 写代码还在用abcd命名,等着出大问题被开除吧(变量命名方法)
  7. mysql result mysqli_MYSQLI_USE_RESULT or MYSQLI_STORE_RESULT
  8. bzoj 2244: [SDOI2011]拦截导弹
  9. fork()成为负担,需要淘汰 | 极客头条
  10. FreeSwitch给会议室人员增加标识
  11. 服务器修改字体,云服务器怎么修改字体
  12. Jabber服务器部署
  13. 互联网开放平台纵横论
  14. 在firefox中 屏蔽CSDN博客广告 + 添加百度搜索引擎
  15. 渗透学什么?渗透测试中超全的提权思路来了!
  16. KPM算法——数据结构|复习局|串|复杂模式匹配算法|二维数组解决KPM
  17. Linux Centos7.x下安装部署Jira和confluence以及破解方法详述
  18. Python数据可视化第 3 讲:matplotlib绘图之函数plot()
  19. 刘强东的代码水平如何?网友:95年一个晚上赚5万
  20. 计算机的硬盘维修,四大电脑硬盘常见错误及修复方案

热门文章

  1. 如何实现“轻高精地图”的城市NOH?毫末自动驾驶的8大亮点
  2. 官方代付系统/支付宝微信代付/企业付款/提现秒到
  3. 微信小程序 - 入门篇
  4. python江红书后第六章上机答案_第六章上机题答案
  5. Elastic Stack容器化部署拓展(Https、AD域集成)并收集Cisco设备的日志信息
  6. 从整体视角了解情感分析、文本分类!
  7. 那些年啊,那些事——一个程序员的奋斗史 ——72
  8. goLang 如何开发 windows 窗口界面
  9. vba根据内容调整word表格_word表格技巧:如何对表格进行样式批处理
  10. 什么是条码,条码技术的应用,主要有哪些优势?