Flink SQL Client实现CDC实验
概述
本文主要是對[7]中內容的復現
环境
组件 | 版本 |
Flink(HA) | 1.12 |
Zookeeper | 3.6.0 |
flink-sql-connector-mysql-cdc | 1.1.1 |
Mysql | 8.0.22-0ubuntu0.20.04.2 |
實驗流程圖
Mysql的同步配置
/etc/mysql/mysql.conf.d/mysqld.cnf中的[mysqld]下面添加:
# 前面还有其他配置
# 添加的部分
server-id = 12345
log-bin = mysql-bin
# 必须为ROW
binlog_format = ROW
# 必须为FULL,MySQL-5.7后才有该参数
binlog_row_image = FULL
expire_logs_days = 10
查看binlog默认配置:
SHOW VARIABLES LIKE '%binlog%';
service mysql restart
创建用于同步的用户,并给予权限(可供参考的文档)
-- 设置拥有同步权限的用户
CREATE USER 'appleyuchi' IDENTIFIED BY 'appleyuchi';
-- 赋予同步相关权限
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'appleyuchi';
创建用户并赋予权限成功后,使用该用户登录MySQL,可以使用以下命令查看主从同步相关信息
SHOW MASTER STATUS;
SHOW SLAVE STATUS;
SHOW BINARY LOGS;
详细操作步骤
操作內容 | 效果 |
Mysql建立表格 | |
Mysql插入數據 | |
Flink SQL Client建立Source | |
Flink SQL Client建立Sink | |
mysql_binlog的數據插入到tb_sink | |
查看當前Flink集羣的taskmanager中的數據 | |
更新Mysql以後,再次查看Flink集羣的taskmanager中的數據 |
上述表格中各个步骤需要的SQL汇总如下:
https://gitee.com/appleyuchi/Flink_Code/blob/master/FLINK读写各种数据源/cdc.sql
這裏需要注意,如果本實驗的source中的mysql的IP是一個外網IP,那麼需要確保mysql所在節點可以被外網訪問,
否則會無法順利提交任務到集羣.上述鏈接中使用的localhost,沒有該問題
實驗結論
當Mysql中修改數據以後,我們會發現Flink集羣任務中的Task Manager的Stdout也會立刻做出修改.
因此CDC的同步功能順利實現
CDC其实就是类似于一个金山同步盘一样的功能,上游的数据改动后,同步到下游.
依赖问题
https://maven.aliyun.com/mvn/search
搜索flink-sql-connector-mysql-cdc-1.1.1.jar
然后放到$FLINK_HOME/lib下面,然后同步到集群的其他节点
異常
Caused by: java.sql.SQLNonTransientConnectionException: Public Key Retrieval is not allowed
如果集羣兩個節點Desktop和Laptop
此時Laptop無法登錄Desktop的mysql就會導致Flink集羣發生這種異常報錯
此时下述两个解决办法取其一即可:
①让Desktop中的mysql支持外网访问
②source中的mysql的IP从原本的Desktop改成localhost
Reference:
[1]Flink SQL Client + Mysql CDC 部署实践(yaml格式)
[2]关于flink:Flink-SQL-Client-Mysql-CDC-部署实践(yaml格式)
[3]Flink SQL CDC(涉及ElasticSearch的)
[4]Flink SQL CDC 上线!我们总结了 13 条生产实践经验(提到维度表Join和双流Join不一样)
[5]Flink1.11中的CDC Connectors操作实践(非常详细)
[6]基于 Flink SQL CDC的实时数据同步方案(没啥用,讲了一系列的场景和架构)
[7]Flink示例——Flink-CDC(DDL嵌入到代码中的形式)
[8]Mysql binlog详解(详细介绍)
Flink SQL Client实现CDC实验相关推荐
- Flink SQL Client注册JAVA UDF完整流程
概述 听大佬说[1]里面有flink sql client注册udf的方法 去看了一眼,全是文字,闹心,索性琢磨了一下,记录下来. UDF的完整maven工程 https://github.com/a ...
- Flink SQL Client方言切换与datagen->Hive(DDL形式+streaming形式)
概述 本文是对[1]的完整复现,补充了[1]中缺失的大量细节. 切换方言 切换目标 Flink SQL Client命令 切换为hive SET table.sql-dialect=hive; 切换为 ...
- Flink SQL Client进行Kafka事实表与Hbase维度表Join(纯DDL/SQL方式)
概述: 對參考鏈接[1]進行DDL上的復現. 一些基本的業務常识 來源載體 數據特點 維表 Mysql/Csv/Hbase 很少變化 事實表 Kafka 不停變化 开发环境与准备工作 组件 版本 ...
- Flink SQL Client读Kafka+流计算(DDL方式+代碼嵌入DDL/SQL方式)
#################################################################################################### ...
- flink sql client讀取kafka數據的timestamp(DDL方式)
实验目的 Kafka的数据能让Flink SQL Client读取到 本文是对[1]的详细记载 具体操作步骤 ①啓動hadoop集羣,離開安全模式 ②各个节点都关闭防火墙: service firew ...
- Flink SQL Client注册SCALA UDF完整流程
UDF的完整maven工程與SQL https://github.com/appleyuchi/Flink_SQL_Client_UDF 完整操作步骤 ①mvn scala:compile packa ...
- flink sql client读取hive时卡住
问题复现如下: 查看$FLINK_HOME/log/flink-appleyuchi-sql-client-Desktop.log 2020-12-23 11:48:56,811 INFO org. ...
- Flink SQL Client讀取csv中的數據(轉載+總結)
根據官方文檔[2] Flink SQL啓動方式 啓動命令 (1)starting an embedded standalone process $FLINK_HOME/bin/sql-client.s ...
- 【阿里云EMR实战篇】以EMR测试集群版本为例,详解 Flink SQL Client 集成 Hive 使用步骤
简介: 以测试集群版本为例(EMR-4.4.1)-- Flink SQL Client 集成 Hive 使用文档 作者:林志成,阿里云EMR产品团队技术支持,拥有多年开源大数据经验 1.以测试集群版本 ...
最新文章
- 数据结构——非线性结构
- linux 中的who的参数,linux who命令参数及用法详解
- 思维导图学 Linux Shell攻略之小试牛刀篇
- 《Raspberry Pi用户指南》——2.4 使用外部存储设备
- 组件生命周期管理和通信方案
- 测试点4错的来:1029 旧键盘 (20分)
- Jerry入职SAP成都研究院14周年纪念日
- python语言的三个主要特点_python干货|新总结的4个python语言的特点,这几个细节值得关注...
- 机器学习爬大树之(GBDT原理)--二分类篇
- 人是什么垃圾?AI识别功能用到垃圾分类上时 网友试了试“扫自己”
- Microsoft SQL Server 2008 Management Studio Express 下载地址
- PostgreSQL 10 新特性, 流式接收端在线压缩redo
- 批量将csv转xls
- Flutter作插件的研究(学习)记录
- 项目管理-项目风险管理
- WIN10虚拟机安装教程
- 浏览器书签栏的小图标设置
- CSS雪碧图Sprite
- Power BI时间智能
- 网页Loding效果的实现