本文基于 Flink-1.12

文章目录

  • 本文基于 Flink-1.12
  • 一、操作
    • 1.1、MySQL 创建表结构
    • 1.2、JDBC Connector
      • 1.2.0、下载以下 jar 包到 `/lib/`:
      • 1.2.1、Flink SQL 创建表结构
      • 1.2.3、测试
    • 1.3、Flink CDC
      • 1.3.1、Flink SQL CLI 创建表
        • 注意: 下表中的 connector 是 mysql-cdc 与 1.2 要区分开来。
      • 1.3.2、测试
  • 二、问题
  • 关注我的公众号【宝哥大数据】

一、操作

1.1、MySQL 创建表结构

-- MySQL
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),(default,"car battery","12V car battery"),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),(default,"hammer","12oz carpenter's hammer"),(default,"hammer","14oz carpenter's hammer"),(default,"hammer","16oz carpenter's hammer"),(default,"rocks","box of assorted rocks"),(default,"jacket","water resistent black wind breaker"),(default,"spare tire","24 inch spare tire");CREATE TABLE orders (order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,order_date DATETIME NOT NULL,customer_name VARCHAR(255) NOT NULL,price DECIMAL(10, 5) NOT NULL,product_id INTEGER NOT NULL,order_status BOOLEAN NOT NULL -- 是否下单
) AUTO_INCREMENT = 10001;INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);CREATE TABLE shipments (shipment_id INTEGER NOT NULL AUTO_INCREMENT  PRIMARY KEY,order_id INTEGER NOT NULL,origin VARCHAR(255) NOT NULL,destination VARCHAR(255) NOT NULL,is_arrived BOOLEAN NOT NULL
);ALTER TABLE shipments AUTO_INCREMENT = 101;INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),(default,10002,'Hangzhou','Shanghai',false),(default,10003,'Shanghai','Hangzhou',false);

1.2、JDBC Connector

1.2.0、下载以下 jar 包到 <FLINK_HOME>/lib/:

flink-sql-connector-mysql-cdc-1.0.0.jar

1.2.1、Flink SQL 创建表结构

启动 Flink 集群,再启动 SQL CLI

 -- Flink SQLCREATE TABLE products (id INT,name STRING,description STRING
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:3306/mydb','username' = 'root','password' = '123456','table-name' = 'products'
);CREATE TABLE orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:3306/mydb','username' = 'root','password' = '123456','table-name' = 'orders'
);CREATE TABLE shipments (shipment_id INT,order_id INT,origin STRING,destination STRING,is_arrived BOOLEAN
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://locahost:3306/mydb','username' = 'root','password' = '123456','table-name' = 'shipments'
);

1.2.3、测试

1、多表关联

SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;

2、插入新的 订单表 和 出货表

--MySQL
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);--MySQL
INSERT INTO shipments
VALUES (default,10004,'Shanghai','Beijing',false);

3、修改订单状态

--MySQL
UPDATE orders SET order_status = true WHERE order_id = 10004;

多表联合的数据 的 状态并没有变化,Flink SQL Connector 不能实时的更新数据的状态。

1.3、Flink CDC

为了能够实时捕获 数据库 的 动态变更, 解决上面问题

1.3.1、Flink SQL CLI 创建表

注意: 下表中的 connector 是 mysql-cdc 与 1.2 要区分开来。

--FlinkSQL
CREATE TABLE products (id INT,name STRING,description STRING
) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'root','password' = '123456','database-name' = 'mydb','table-name' = 'products'
);CREATE TABLE orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN
) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'root','password' = '123456','database-name' = 'mydb','table-name' = 'orders'
);CREATE TABLE shipments (shipment_id INT,order_id INT,origin STRING,destination STRING,is_arrived BOOLEAN
) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'root','password' = '123456','database-name' = 'mydb','table-name' = 'shipments'
);# sink table
##################################
需要注意的是sink_table的输出是无法在SQL client上面查看的。需要打开Flink Web UI的Task Managers页面的stdout标签
##################################CREATE TABLE enriched_orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,product_name STRING,product_description STRING,shipment_id INT,origin STRING,destination STRING,is_arrived BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'print');

1.3.2、测试

1、多表关联


insert into enriched_orders
SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;

查看 stdout

2、插入新的 订单表 和 出货表

--MySQL
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);--MySQL
INSERT INTO shipments
VALUES (default,10004,'Shanghai','Beijing',false);

3、修改订单状态

--MySQL
UPDATE orders SET order_status = true WHERE order_id = 10004;

二、问题

1、[ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException:com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction
Flink 与 MySql-cdc的版本兼容问题

关注我的公众号【宝哥大数据】

Flink CDC 实战相关推荐

  1. Flink原理解析50篇(四)-基于 Flink CDC 打通数据实时入湖

    在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎Flink和数据湖Apache Iceberg两种技术,来解决业务数据实时入湖相关的问题. 0 ...

  2. 基于Flink CDC打通数据实时入湖

    作者 | 数据社       责编 | 欧阳姝黎 在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎 Flink 和数据湖 Apache Ice ...

  3. Flink CDC 2.0原理详解和生产实践

    点击上方蓝色字体,选择"设为星标" 回复"面试"获取更多惊喜 八股文交给我,你们专心刷题和面试 Hi,我是王知无,一个大数据领域的原创作者. 放心关注我,获取更 ...

  4. Flink CDC 2.0 正式发布,详解核心改进

    简介:Flink CDC 2.0.0 版本于 8 月 10 日正式发布,点击了解详情- 本文由社区志愿者陈政羽整理,内容来源自阿里巴巴高级开发工程师徐榜江 (雪尽) 7 月 10 日在北京站 Flin ...

  5. 37 手游基于 Flink CDC + Hudi 湖仓一体方案实践

    简介: 介绍了 37 手游为何选择 Flink 作为计算引擎,并如何基于 Flink CDC + Hudi 构建新的湖仓一体方案. 本文作者是 37 手游大数据开发徐润柏,介绍了 37 手游为何选择 ...

  6. Flink CDC 实时同步mysql

    前言 在实际开发中,需要做数据同步的场景是非常多的,比如不同的应用之间不想直接通过RPC的方式进行数据交互,或者说下游应用需要检测来自上游应用的某些业务指标数据的变化时,这些都可以考虑使用数据同步的方 ...

  7. Flink CDC 新一代数据集成框架

    前言: 主要讲解了技术原理,入门与生产实践,主要功能:全增量一体化数据集成.实时数据入库入仓.最详细的教程.Flink CDC 是Apache Flink的一个重要组件,主要使用了CDC技术从各种数据 ...

  8. flink cdc 2.2.1 mysql connector

    报错 java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent ...

  9. XTransfer技术专家亮相Flink CDC Meetup

    背景信息:Flink CDC 是实时数据集成框架的开源代表,具有全增量一体化.无锁读取.并发读取.分布式架构等技术优势,在开源社区中非常受欢迎. 为促进 Flink CDC 技术的交流和发展,社区于 ...

最新文章

  1. 如何评估互阻抗放大器(第 2 部分)
  2. php在dw中设置按钮圆角,Dreamweaver怎么用CSS制作圆角按钮?
  3. DISTINCT 去重---SQL
  4. 文本比较算法Ⅴ——回顾贴,对前面几篇文章的回顾与质疑
  5. uItron内核原理和服务调用--Kernel篇
  6. HTML5学习笔记简明版(10):废弃的元素和属性
  7. TCP请求发送和接收,如果接收端终止,发送端继续发送会出什么错
  8. bzoj 1801: [Ahoi2009]chess 中国象棋【dp】
  9. mlp 参数调优_积神经网络(CNN)的参数优化方法
  10. JxBrowser概述与简单应用
  11. python 示例_Python条件类| release()方法与示例
  12. python不同数据类型的式子_Python 基础篇:数据类型、数据运算、表达
  13. Python精通-Python字典操作
  14. HDU Calling Extraterrestrial Intelligence Again
  15. Citrix XenAPP DS角色
  16. linux怎么进入mnt目录,「Linux基础知识」Linux路径的表示方式
  17. 为什么要使用PPTP协议代理ip?
  18. “压缩(zipped)文件夹“G:\Program\Wechat WeChatFiles(wxid cqpx72n77z9x22 FileStorage\File 2022-12 基...
  19. 开发板qt移植和交叉开发环境搭建学习笔记
  20. 解决windows 2003 sp1下安装arcsde 9.0 for sql server 2000 sp4概要方案

热门文章

  1. iqc工作职责和工作内容_IQC岗位职责
  2. 中兴java翻盖_中兴折叠手机专利曝光:柔性翻盖屏+双摄
  3. FlashFxp简单使用教程
  4. Win7家庭高级版有什么功能
  5. 最新版Photoshop 2023(ps2023)特色功能有哪些?
  6. java解析js字典,javaj解析js文件,请问怎么写通用的方法
  7. 谈弱电系统工程包含了哪些系统?
  8. POJ 2341 优先队列
  9. Python:实现simulated annealing模拟退火算法(附完整源码)
  10. idea字体大小放大和缩小的快捷键设置