Flink CDC 系列文章:
《Flink CDC 系列(1)—— 什么是 Flink CDC》
《Flink CDC 系列(2)—— Flink CDC 源码编译》
《Flink CDC 系列(3)—— Flink CDC MySQL Connector 与 Flink SQL 的结合使用案例Demo》
《Flink CDC 系列(4)—— Flink CDC MySQL Connector 常用参数表》
《Flink CDC 系列(5)—— Flink CDC MySQL Connector 启动模式》
《Flink CDC 系列(6)—— Flink CDC MySQL Connector 工作机制之 Incremental Snapshot Reading》
《Flink CDC 系列(7)—— 从 MySQL 到 ElasticSearch》

文章目录

  • 系统环境
  • MySQL 测试数据准备
  • Flink CDC 源码编译
  • Flink 集群准备
  • 演示开始
    • 1. 启动 Flink SQL Client
    • 2. 在 Flink SQL Client 中执行 DDL 和 查询
    • 3. 在 MySQL 客户端继续插入数据
    • 4. Flink SQL Client 观察数据变化
    • 5. 在 MySQL 客户端更新数据
    • 6. Flink SQL Client 查看数据是否有数据更新
    • 6. 在 MySQL 客户端删除数据
    • 7. 在 Flink SQL Client 客户端查看数据是否被删除

系统环境

Ubuntu 20.04
JDK 1.8
Maven 3.6.3

MySQL 测试数据准备


mysql> CREATE DATABASE mydb;mysql> USE mydb;mysql> CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));mysql> ALTER TABLE products AUTO_INCREMENT = 101;mysql> INSERT INTO productsVALUES (default,"scooter","Small 2-wheel scooter"),(default,"car battery","12V car battery"),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),(default,"hammer","12oz carpenter's hammer"),(default,"hammer","14oz carpenter's hammer"),(default,"hammer","16oz carpenter's hammer"),(default,"rocks","box of assorted rocks"),(default,"jacket","water resistent black wind breaker"),(default,"spare tire","24 inch spare tire");

Flink CDC 源码编译

参考文章《Flink CDC 系列(2)—— Flink CDC 源码编译》
编译产生的 Jar 文件在后面的 Flink 集群准备
需要用到。

Flink 集群准备

##  下载 flink 1.13.6 的二进制安装包
axel -n 20 https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz## 解压
tar xvf flink-1.13.6-bin-scala_2.11.tgz## 将 flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar 拷贝到 flink lib 目录下,该文件由 Flink CDC 源码编译得到
cp /opt/flink-cdc-connectors/flink-sql-connector-mysql-cdc/target/flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar /opt/flink-1.13.6/lib## 启动单机集群
cd flink-1.13.6
bin/start-cluster.sh## 查看 jobmanager 和 taskmanager 的进程是否存活
jps -m
## 正常情况会出现两个进程,如下:
$ jps -m
67440 StandaloneSessionClusterEntrypoint --configDir /opt/flink-1.13.6/conf --executionMode cluster -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=1073741824b -D jobmanager.memory.jvm-overhead.max=201326592b
68054 Jps -m
67705 TaskManagerRunner --configDir /opt/flink-1.13.6/conf -D taskmanager.memory.network.min=134217730b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=134217730b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=536870920b -D taskmanager.memory.task.heap.size=402653174b -D taskmanager.numberOfTaskSlots=1 -D taskmanager.memory.jvm-overhead.max=201326592b

演示开始

建议启动两个命令行窗口,一个运行 Flink SQL Client , 另一个运行 MySQL Client。

1. 启动 Flink SQL Client

cd /opt/flink-1.13.6
bin/sql-client.sh

2. 在 Flink SQL Client 中执行 DDL 和 查询

Flink SQL> SET execution.checkpointing.interval = 3s
Flink SQL> CREATE TABLE products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = '192.168.64.6','port' = '3306','username' = 'test','password' = 'test','database-name' = 'mydb','table-name' = 'products');Flink SQL> select count(1) from products;
-- 结果为9
-- 不退出,继续下一步

3. 在 MySQL 客户端继续插入数据

mysql> INSERT INTO products VALUES (default,"scooter1","Small 2-wheel scooter");
INSERT INTO products VALUES (default,"scooter2","Small 2-wheel scooter");
INSERT INTO products VALUES (default,"scooter3","Small 2-wheel scooter");
INSERT INTO products VALUES (default,"scooter4","Small 2-wheel scooter");

4. Flink SQL Client 观察数据变化

观察 Flink SQL Client 窗口的数值变化,此时数值应为 13。

5. 在 MySQL 客户端更新数据

mysql> update products set name = 'scooter0001' where id = 101;

6. Flink SQL Client 查看数据是否有数据更新

Flink SQL> select * from products;


可以看到 id=101 的数据已经更新了。

6. 在 MySQL 客户端删除数据

mysql> delete from products where id = 101;

7. 在 Flink SQL Client 客户端查看数据是否被删除

Flink SQL> select * from products;


可以看到 id=101 的数据已经被删除了。

Flink CDC 系列(3)—— Flink CDC MySQL Connector 与 Flink SQL 的结合使用案例Demo相关推荐

  1. Flink CDC 系列(7)—— 从 MySQL 到 ElasticSearch

    Flink CDC 系列文章: <Flink CDC 系列(1)-- 什么是 Flink CDC> <Flink CDC 系列(2)-- Flink CDC 源码编译> < ...

  2. Flink CDC 系列(1)—— 什么是 Flink CDC

    Flink CDC 系列文章: <Flink CDC 系列(1)-- 什么是 Flink CDC> <Flink CDC 系列(2)-- Flink CDC 源码编译> < ...

  3. 精华文章置顶--CDC系列之一 :使用Dejournal Filter在InterSystems IRIS/Caché上通过Mirroring实现CDC功能...

    InterSystems IRIS/Caché的CDC InterSystems IRIS/Caché未提供开箱即用的变更数据捕获(CDC)工具,而且由于其多模型建模能力和底层的多维存储模型,相对于关 ...

  4. 技术解析|Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once精准接入

    本篇文档将演示如何使用 Apache Doris Flink Connector 结合 Flink CDC 以及 Doris Stream Load 的两阶段提交,实现 MySQL 数据库分库分表实时 ...

  5. Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once精准接入

    导读:本篇文档将演示如何使用 Apache Doris Flink Connector 结合 Flink CDC 以及 Doris Stream Load 的两阶段提交,实现 MySQL 数据库分库分 ...

  6. flink cdc 2.2.1 mysql connector

    报错 java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent ...

  7. Flink CDC 系列 - Flink CDC 如何简化实时数据入湖入仓

    摘要:本文整理自伍翀 (云邪).徐榜江 (雪尽) 在 Flink Forward Asia 2021 的分享,该分享以 5 个章节详细介绍如何使用 Flink CDC 来简化实时数据的入湖入仓, 文章 ...

  8. flink mysql connector_Flink JDBC Connector:Flink 与数据库集成最佳实践

    整理:陈政羽(Flink 社区志愿者) 摘要:Flink 1.11 引入了 CDC,在此基础上, JDBC Connector 也发生比较大的变化,本文由 Apache Flink Contribut ...

  9. Flink SQL篇,SQL实操、Flink Hive、CEP、CDC、GateWay

    Flink源码篇,作业提交流程.作业调度流程.作业内部转换流程图 Flink核心篇,四大基石.容错机制.广播.反压.序列化.内存管理.资源管理 Flink基础篇,基本概念.设计理念.架构模型.编程模型 ...

最新文章

  1. [原创]函数指针的应用
  2. Oracle XTTS跨平台数据库迁移(从Unix迁移数据库到Linux)_Oracle数据库迁移项
  3. 这两天发现一个老外用JavaScript编写的好东西:dp.SyntaxHighlighter。
  4. git如何合并指定文件内容_git小技巧--如何从其他分支merge个别文件或文件夹
  5. python3(六)监督学习
  6. 通过邮箱远程控制电脑
  7. linux(3):Linux MBR分区、挂载操作步骤,逻辑卷扩容操作
  8. 字符串、列表、元组、字典
  9. SpringMVC中ModelAndView对象与“视图解析器”
  10. 面对对象三大特性之一继承性。
  11. 图说:Windows 8 Copy的呈现变化
  12. 一段时间即可做一些事情,如二十分钟
  13. Word解析之Word内部结构
  14. dx逆向建模步骤_什么是3D打印?游戏建模具体是什么的?哪个更有发展
  15. pikachu靶场之暴力破解
  16. 2021.03.17 pokémon小游戏开发记录与周总结
  17. 快看!如何实现快速赢利--国内期货反跟单
  18. 揭秘:QQ号码能准确测出QQ主人年龄问题
  19. 小梅哥FPGA学习笔记——串口发送模块
  20. 一台完整的计算机硬件是由等组成的,一台计算机的硬件系统是由运算器、控制器、存储器、输入和输出设备五部分组成的。...

热门文章

  1. htc one x android5.0,HTC One M8升级Android 5.0后有哪些变化?
  2. 横向联邦学习-梯度安全聚合
  3. 如何删除百度IE伴侣error about c:/windows/downlo~1/bdplugin.dll
  4. dd命令与黑洞、白洞文件
  5. Planar Evasive Aircrafts Maneuvers Using Reinforcement Learning
  6. 德汇律师事务所任命Ray Liu为北京办事处主管
  7. 模拟快递单号查询案例(放大输入内容)
  8. 如何设置计划任务程序 每6小时运行一次_教你如何使用Folx专业版的任务计划功能...
  9. ES elasticsearch 系统默认配置设置方法 配置文件
  10. R绘散点图+误差线+显示回归线和R2