flink-cdc初体验

产品介绍

flink-cdc是一款专用于数据库增量数据监控的插件。基于flink计算引擎提供的高性能,高可用性,高扩展性的数据监控功能。当前flink-cdc支持读取例如mysql,oracle,mongodb,tidb,postgres等常见的数据库类型,具体支持版本见官方 Support Version。
之前大多数公司会单独部署一些数据同步服务,例如当前flink-cdc的内核产品 Debezium,阿里开源的canal,开源软件datalink(目前我司就是基于这个产品实现的各个业务数据库的binlog监控)。flink-cdc出现之后,用户可以脱离这些繁重的中间产品,使用基于flink部署轻量化的同步服务。更可以对同步的数据再进行进一步加工实现各种业务需求。
公司环境flink-cdc运行依赖大数据基础组件 yarn或者k8s部署flink任务,但是个人体验最低要求仅需要下载flink客户端运行即可。
下面基于mysql-cdc 介绍三种任务运行方式

  • sql-client模式
  • docker部署模式
  • 源码运行模式

sql-client模式

使用flink自带的sql客户端,可以便捷无代码部署任务。
例如需要将一张业务mysql表同步至其他存储介质(hive,es,hbase等支持sql-connector组件)
建两张表分别映射源表和目标表,直接提交insert语句即可生成flink-job。

mysql环境

如果本地没有测试环境可以使用docker部署 当前镜像架构为arm64,使用于mac M芯片,windows x86机器需使用官方镜像

docker pull gsw/mysql-cdc:5.7

启动镜像后初始化表环境

CREATE DATABASE mydb;
USE mydb;
--创建源表
CREATE TABLE `orders` (`order_id` int(11) NOT NULL AUTO_INCREMENT,`order_date` datetime NOT NULL,`customer_name` varchar(255) NOT NULL,`price` decimal(10,5) NOT NULL,`product_id` int(11) NOT NULL,`order_status` tinyint(1) NOT NULL,PRIMARY KEY (`order_id`)
);
--创建sink表
CREATE TABLE `orders_sink` (`order_id` int(11) NOT NULL AUTO_INCREMENT,`order_date` datetime NOT NULL,`customer_name` varchar(255) NOT NULL,`price` decimal(10,5) NOT NULL,`product_id` int(11) NOT NULL,`order_status` tinyint(1) NOT NULL,PRIMARY KEY (`order_id`)
);
--源表插入测试数据
INSERT INTO mydb.orders (order_id, order_date, customer_name, price, product_id, order_status) VALUES
(10001, '2020-07-30 10:08:22', 'Jark', 50.50000, 102, 0),
(10002, '2020-07-30 10:11:09', 'Sally', 15.00000, 105, 0),
(10003, '2020-07-30 12:00:30', 'Edward', 25.25000, 106, 0),
(10004, '2020-07-30 15:22:00', 'Jark', 29.71000, 106, 1);

flink环境

  1. 下载flink客户端 flink-1.14.3并解压
  2. flink-connector-jdbc_2.11-1.14.3.jar flink连接mysql需要
    flink-sql-connector-mysql-cdc-2.2.1.jar flink-cdc读取binlog变更需要
    mysql-connect-java.jar jdbc基础包
    将下载的三个jar放入FLINK_HOME/lib目录下
  3. 启动单机集群
sh ./bin/start-cluster.sh
  1. 打开链接http://localhost:8081进入flink作业管理页面,查看集群是否正常启动

任务部署

进入bin目录下,启动sql客户端

./sql-client.sh

建立mysql表映射关系

注意mysql字段与flink sql字段的转换关系 data-type-mapping

--源表映射指定mysql-cdc连接,以便获取binlog变更,server-id指定为mysql配置的id
CREATE TABLE `cdc_orders` (`order_id` int NOT NULL,`order_date` TIMESTAMP ,`customer_name` STRING,`price` decimal(10,5),`product_id` INT,`order_status` BOOLEAN,PRIMARY KEY (`order_id`) NOT ENFORCED
) with ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'root','password' = '123456','database-name' = 'mydb','table-name' = 'orders','server-id' = '1'
);
--sink表映射指定jdbc连接
CREATE TABLE `cdc_orders_sink` (`order_id` int NOT NULL,`order_date` TIMESTAMP ,`customer_name` STRING,`price` decimal(10,5),`product_id` INT,`order_status` BOOLEAN,PRIMARY KEY (`order_id`) NOT ENFORCED
) with ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:3306/mydb?serverTimezone=UTC','username' = 'root','password' = '123456','table-name' = 'orders_sink'
);
--提交数据同步任务从orders向orders_sink同步数据
insert into orders_sink select * from orders;


在mysql中查看sink表数据

docker部署模式

强烈推荐使用docker desktop运行docker环境。MAC机器支持一键安装相关依赖环境,window环境需要在docker桌面版安装完成后手动安装linux内核。
windows安装注意一定要安装WSL2内核,如果不指定版本默认安装的是WSL,当然如果不小心装错了,也可以进行版本转义。具体安装步骤见官方文档windows安装WSL2
docker环境准备完成,就可以参照flink-cdc官方文档体验基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL

源码运行模式

源码模式同样依赖docker环境,适合学习的时候debug源码。

同样已MysqlE2elTCase为例。代码实现原理等同于sql-clinet模式。使用代码启动容器并执行相关环境命令。
错误解决方案:
1、mysqlw未设置时区,具体方法自行搜索2、mac M1机器会出现如下错误
需要添加以下依赖

        <dependency><groupId>com.github.docker-java</groupId><artifactId>docker-java-api</artifactId><version>3.2.8</version><scope>test</scope></dependency><dependency><groupId>com.github.docker-java</groupId><artifactId>docker-java-transport-zerodep</artifactId><version>3.2.8</version><scope>test</scope></dependency>

3、arm芯片机器需要更改FlinkContainerTestEnvironment中mysql版本为8.0,低版本官方镜像不支持arm架构,或者使用其他非官方镜像

flink-cdc初体验相关推荐

  1. Flink大数据实时计算系列-案例初体验:HotPages

    Flink大数据实时计算系列-案例初体验:HotPages 目录 HotPages代码 输入日志 运行结果 HotPages代码 /*** Copyright (c) 2018-2028 尚硅谷 Al ...

  2. Flink原理解析50篇(四)-基于 Flink CDC 打通数据实时入湖

    在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎Flink和数据湖Apache Iceberg两种技术,来解决业务数据实时入湖相关的问题. 0 ...

  3. 基于Flink CDC打通数据实时入湖

    作者 | 数据社       责编 | 欧阳姝黎 在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎 Flink 和数据湖 Apache Ice ...

  4. Flink CDC 系列 - Flink CDC 如何简化实时数据入湖入仓

    摘要:本文整理自伍翀 (云邪).徐榜江 (雪尽) 在 Flink Forward Asia 2021 的分享,该分享以 5 个章节详细介绍如何使用 Flink CDC 来简化实时数据的入湖入仓, 文章 ...

  5. Apache Doris 整合 FLINK CDC + Iceberg 构建实时湖仓一体的联邦查询

    1.概览 这篇教程将展示如何使用 Flink CDC + Iceberg + Doris 构建实时湖仓一体的联邦查询分析,Doris 1.1版本提供了Iceberg的支持,本文主要展示Doris和Ic ...

  6. StarRocks X Flink CDC,打造端到端实时链路

    实时数仓建设背景 实时数仓需求 随着互联网行业的飞速发展,企业业务种类变得越来越多,数据量也变得越来越大.以 Apache Hadoop 生态为核心的数据看板业务一般只能实现离线的业务.在部分领域,数 ...

  7. flink 开发平台Dinky 构建 Flink CDC 整库入仓入湖

    原文:http://www.senlt.cn/article/866753893.html 摘要:本文介绍了如何使用 Dinky 实时计算平台构建 Flink CDC 整库入仓入湖.内容包括: 背景 ...

  8. Flink CDC 在京东的探索与实践

    摘要:本文整理自京东资深技术专家韩飞,在 Flink Forward Asia 2022 数据集成专场的分享.本篇内容主要分为四个部分: 京东自研 CDC 介绍 京东场景的 Flink CDC 优化 ...

  9. 苹果电脑安装python3密码_mac系统安装Python3初体验

    前沿 对于iOS开发不要随便拆卸系统自带的Python,因为有很多 library 还是使用 Python2.7. 1 安装Xcode 1.1 App Store 搜索Xcode 并安装 1.2 安装 ...

最新文章

  1. matlab fft例程,c++ FFTW与Matlab FFT
  2. 一脸懵逼学习Storm的搭建--(一个开源的分布式实时计算系统)
  3. Serverless 实战 —— Funcraft + OSS + ROS 进行 CI/CD
  4. 在企业内部使用openssl创建私有CA
  5. 内置模块/核心模块 (自带的) --fs 文件系统
  6. 浅析Kubernetes Pod重启策略和健康检查
  7. Android 系统(38)---Android抓取各种log的方法
  8. SAP License:物料类型的划分标准
  9. 学习《TCP/IP详解 卷一协议》第九章的一点心得
  10. IOS-Run loop学习总结
  11. java ftl 模板 输出list_关于在freemarker模板中遍历数据模型ListJavaBean的经验
  12. Add Juniper SRX Cluster into JunOS Space 16.1 Security Director
  13. 人脸识别活体检测测试案例
  14. Win7原版镜像注入USB驱动
  15. 数据统计基础之F分布及其应用
  16. 基于微信公众平台API的菜谱小程序 的设计与实现
  17. 如何介绍自己测试过的项目
  18. 基于python3在windows下安装gmpy2
  19. VScode 自定义主题颜色
  20. URL地址中的中文乱码问题的解决

热门文章

  1. cocos2d-JS engine--cc模块 概述
  2. Unity 弹球算法
  3. django+ajax
  4. 【论文翻译】Fast R-CNN
  5. 新一代数据库技术论坛-3306π社区广州站
  6. 电脑反应速度慢的原因及解决方法
  7. 售后派工单,这样子做,高效又便捷
  8. 参加中国大学生计算机设计大赛心得
  9. TCP/UDP(网络编程)
  10. android 视频通话开启呼叫等待后,来第三方的视频通话,接通后通话时间一直显示为0,过几秒之后视频通话自动挂断