flink-cdc初体验
flink-cdc初体验
产品介绍
flink-cdc是一款专用于数据库增量数据监控的插件。基于flink计算引擎提供的高性能,高可用性,高扩展性的数据监控功能。当前flink-cdc支持读取例如mysql,oracle,mongodb,tidb,postgres等常见的数据库类型,具体支持版本见官方 Support Version。
之前大多数公司会单独部署一些数据同步服务,例如当前flink-cdc的内核产品 Debezium,阿里开源的canal,开源软件datalink(目前我司就是基于这个产品实现的各个业务数据库的binlog监控)。flink-cdc出现之后,用户可以脱离这些繁重的中间产品,使用基于flink部署轻量化的同步服务。更可以对同步的数据再进行进一步加工实现各种业务需求。
公司环境flink-cdc运行依赖大数据基础组件 yarn或者k8s部署flink任务,但是个人体验最低要求仅需要下载flink客户端运行即可。
下面基于mysql-cdc 介绍三种任务运行方式
- sql-client模式
- docker部署模式
- 源码运行模式
sql-client模式
使用flink自带的sql客户端,可以便捷无代码部署任务。
例如需要将一张业务mysql表同步至其他存储介质(hive,es,hbase等支持sql-connector组件)
建两张表分别映射源表和目标表,直接提交insert语句即可生成flink-job。
mysql环境
如果本地没有测试环境可以使用docker部署 当前镜像架构为arm64,使用于mac M芯片,windows x86机器需使用官方镜像
docker pull gsw/mysql-cdc:5.7
启动镜像后初始化表环境
CREATE DATABASE mydb;
USE mydb;
--创建源表
CREATE TABLE `orders` (`order_id` int(11) NOT NULL AUTO_INCREMENT,`order_date` datetime NOT NULL,`customer_name` varchar(255) NOT NULL,`price` decimal(10,5) NOT NULL,`product_id` int(11) NOT NULL,`order_status` tinyint(1) NOT NULL,PRIMARY KEY (`order_id`)
);
--创建sink表
CREATE TABLE `orders_sink` (`order_id` int(11) NOT NULL AUTO_INCREMENT,`order_date` datetime NOT NULL,`customer_name` varchar(255) NOT NULL,`price` decimal(10,5) NOT NULL,`product_id` int(11) NOT NULL,`order_status` tinyint(1) NOT NULL,PRIMARY KEY (`order_id`)
);
--源表插入测试数据
INSERT INTO mydb.orders (order_id, order_date, customer_name, price, product_id, order_status) VALUES
(10001, '2020-07-30 10:08:22', 'Jark', 50.50000, 102, 0),
(10002, '2020-07-30 10:11:09', 'Sally', 15.00000, 105, 0),
(10003, '2020-07-30 12:00:30', 'Edward', 25.25000, 106, 0),
(10004, '2020-07-30 15:22:00', 'Jark', 29.71000, 106, 1);
flink环境
- 下载flink客户端 flink-1.14.3并解压
- flink-connector-jdbc_2.11-1.14.3.jar flink连接mysql需要
flink-sql-connector-mysql-cdc-2.2.1.jar flink-cdc读取binlog变更需要
mysql-connect-java.jar jdbc基础包
将下载的三个jar放入FLINK_HOME/lib目录下 - 启动单机集群
sh ./bin/start-cluster.sh
- 打开链接http://localhost:8081进入flink作业管理页面,查看集群是否正常启动
任务部署
进入bin目录下,启动sql客户端
./sql-client.sh
建立mysql表映射关系
注意mysql字段与flink sql字段的转换关系 data-type-mapping
--源表映射指定mysql-cdc连接,以便获取binlog变更,server-id指定为mysql配置的id
CREATE TABLE `cdc_orders` (`order_id` int NOT NULL,`order_date` TIMESTAMP ,`customer_name` STRING,`price` decimal(10,5),`product_id` INT,`order_status` BOOLEAN,PRIMARY KEY (`order_id`) NOT ENFORCED
) with ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'root','password' = '123456','database-name' = 'mydb','table-name' = 'orders','server-id' = '1'
);
--sink表映射指定jdbc连接
CREATE TABLE `cdc_orders_sink` (`order_id` int NOT NULL,`order_date` TIMESTAMP ,`customer_name` STRING,`price` decimal(10,5),`product_id` INT,`order_status` BOOLEAN,PRIMARY KEY (`order_id`) NOT ENFORCED
) with ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:3306/mydb?serverTimezone=UTC','username' = 'root','password' = '123456','table-name' = 'orders_sink'
);
--提交数据同步任务从orders向orders_sink同步数据
insert into orders_sink select * from orders;
在mysql中查看sink表数据
docker部署模式
强烈推荐使用docker desktop运行docker环境。MAC机器支持一键安装相关依赖环境,window环境需要在docker桌面版安装完成后手动安装linux内核。
windows安装注意一定要安装WSL2内核,如果不指定版本默认安装的是WSL,当然如果不小心装错了,也可以进行版本转义。具体安装步骤见官方文档windows安装WSL2
docker环境准备完成,就可以参照flink-cdc官方文档体验基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL
源码运行模式
源码模式同样依赖docker环境,适合学习的时候debug源码。
同样已MysqlE2elTCase为例。代码实现原理等同于sql-clinet模式。使用代码启动容器并执行相关环境命令。
错误解决方案:
1、mysqlw未设置时区,具体方法自行搜索2、mac M1机器会出现如下错误
需要添加以下依赖
<dependency><groupId>com.github.docker-java</groupId><artifactId>docker-java-api</artifactId><version>3.2.8</version><scope>test</scope></dependency><dependency><groupId>com.github.docker-java</groupId><artifactId>docker-java-transport-zerodep</artifactId><version>3.2.8</version><scope>test</scope></dependency>
3、arm芯片机器需要更改FlinkContainerTestEnvironment
中mysql版本为8.0,低版本官方镜像不支持arm架构,或者使用其他非官方镜像
flink-cdc初体验相关推荐
- Flink大数据实时计算系列-案例初体验:HotPages
Flink大数据实时计算系列-案例初体验:HotPages 目录 HotPages代码 输入日志 运行结果 HotPages代码 /*** Copyright (c) 2018-2028 尚硅谷 Al ...
- Flink原理解析50篇(四)-基于 Flink CDC 打通数据实时入湖
在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎Flink和数据湖Apache Iceberg两种技术,来解决业务数据实时入湖相关的问题. 0 ...
- 基于Flink CDC打通数据实时入湖
作者 | 数据社 责编 | 欧阳姝黎 在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎 Flink 和数据湖 Apache Ice ...
- Flink CDC 系列 - Flink CDC 如何简化实时数据入湖入仓
摘要:本文整理自伍翀 (云邪).徐榜江 (雪尽) 在 Flink Forward Asia 2021 的分享,该分享以 5 个章节详细介绍如何使用 Flink CDC 来简化实时数据的入湖入仓, 文章 ...
- Apache Doris 整合 FLINK CDC + Iceberg 构建实时湖仓一体的联邦查询
1.概览 这篇教程将展示如何使用 Flink CDC + Iceberg + Doris 构建实时湖仓一体的联邦查询分析,Doris 1.1版本提供了Iceberg的支持,本文主要展示Doris和Ic ...
- StarRocks X Flink CDC,打造端到端实时链路
实时数仓建设背景 实时数仓需求 随着互联网行业的飞速发展,企业业务种类变得越来越多,数据量也变得越来越大.以 Apache Hadoop 生态为核心的数据看板业务一般只能实现离线的业务.在部分领域,数 ...
- flink 开发平台Dinky 构建 Flink CDC 整库入仓入湖
原文:http://www.senlt.cn/article/866753893.html 摘要:本文介绍了如何使用 Dinky 实时计算平台构建 Flink CDC 整库入仓入湖.内容包括: 背景 ...
- Flink CDC 在京东的探索与实践
摘要:本文整理自京东资深技术专家韩飞,在 Flink Forward Asia 2022 数据集成专场的分享.本篇内容主要分为四个部分: 京东自研 CDC 介绍 京东场景的 Flink CDC 优化 ...
- 苹果电脑安装python3密码_mac系统安装Python3初体验
前沿 对于iOS开发不要随便拆卸系统自带的Python,因为有很多 library 还是使用 Python2.7. 1 安装Xcode 1.1 App Store 搜索Xcode 并安装 1.2 安装 ...
最新文章
- matlab fft例程,c++ FFTW与Matlab FFT
- 一脸懵逼学习Storm的搭建--(一个开源的分布式实时计算系统)
- Serverless 实战 —— Funcraft + OSS + ROS 进行 CI/CD
- 在企业内部使用openssl创建私有CA
- 内置模块/核心模块 (自带的) --fs 文件系统
- 浅析Kubernetes Pod重启策略和健康检查
- Android 系统(38)---Android抓取各种log的方法
- SAP License:物料类型的划分标准
- 学习《TCP/IP详解 卷一协议》第九章的一点心得
- IOS-Run loop学习总结
- java ftl 模板 输出list_关于在freemarker模板中遍历数据模型ListJavaBean的经验
- Add Juniper SRX Cluster into JunOS Space 16.1 Security Director
- 人脸识别活体检测测试案例
- Win7原版镜像注入USB驱动
- 数据统计基础之F分布及其应用
- 基于微信公众平台API的菜谱小程序 的设计与实现
- 如何介绍自己测试过的项目
- 基于python3在windows下安装gmpy2
- VScode 自定义主题颜色
- URL地址中的中文乱码问题的解决