接上一篇文章 用上cdc
基于flink-scala 1.14

当前cdc最新版本 2.2.1

<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.2.1</version></dependency>

看官方文档是和flink 13.5搭配使用,现在使用flink 1.14会有一些问题 原因是flink 1.14引用的guava版本和cdc包中的guava版本不一致,但是artifactId 相同,所以后一个会覆盖前一个导致flink找不到guava报错。
因此,要做一些改造
显示引入 cdc用的guava版本

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-guava</artifactId><version>18.0-13.0</version></dependency>

用maven修改artifactId名,在本地重新安装该版本,然后引入

mvn install:install-file -Dfile=flink-shaded-guava-18.0-13.0.jar -DgroupId=org.apache.flink -DartifactId=flink-shaded-guava18 -Dversion=18.0-13.0 -Dpackaging=jar
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-guava18</artifactId><version>18.0-13.0</version></dependency>

配置完坐标后,在项目lib中删除cdc引入的guava jar包 并手动引入flink 1.14依赖的guava jar包(包路径名与cdc依赖的不一样 可以引入) flink-shaded-guava-30.1.1-jre-14.0

前置工作完成

完整maven坐标

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.2</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>demo</artifactId><version>0.0.1-SNAPSHOT</version><name>demo</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.21</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.12.7</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>2.12.7</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.12</artifactId><version>1.14.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>1.14.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.14.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.14.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.14.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>1.14.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.5</version></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.2.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-guava18</artifactId><version>18.0-13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>1.14.5</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><id>scala-compile-first</id><goals><goal>compile</goal></goals><configuration><includes><include>**/*.scala</include></includes></configuration></execution></executions></plugin></plugins></build></project>

使用cdc

package demoimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentobject Test {private val host_url = "127.0.0.1"private val port = 3306private val db_name = "test"private val table_input = "salary_table"private val table_output = "salary_count"private val user = "root"private val password = "root"def flinkcdcTest:Unit = {val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()env.setParallelism(1)val tableEnv = StreamTableEnvironment.create(env)tableEnv.executeSql(s"""|create table mysqlInput (| name string,| salary double,| PRIMARY KEY (`name`) NOT ENFORCED| ) with (|   'connector' = 'mysql-cdc',|   'hostname' = '$host_url',|   'port' = '$port',|   'username' = '$user',|   'password' = '$password',|   'database-name' = '$db_name',|   'table-name' = '$table_input'| )""".stripMargin)tableEnv.executeSql(s"""|create table mysqlOutput (| salary double,| cnt bigint not null,| PRIMARY KEY (`salary`) NOT ENFORCED| ) with (|   'connector' = 'jdbc',|   'url' = 'jdbc:mysql://$host_url:$port/$db_name',|   'table-name' = '$table_output',|   'username' = '$user',|   'password' = '$password',|   'sink.buffer-flush.max-rows' = '0'| )""".stripMargin)val rltTable = tableEnv.sqlQuery("""|select salary,count(1) as cnt|from mysqlInput|group by salary""".stripMargin)rltTable.executeInsert("mysqlOutput")}}

通过 localhost:8081 访问flink web ui

flink-cdc 使用相关推荐

  1. Flink CDC 2.0 正式发布,详解核心改进

    简介:Flink CDC 2.0.0 版本于 8 月 10 日正式发布,点击了解详情- 本文由社区志愿者陈政羽整理,内容来源自阿里巴巴高级开发工程师徐榜江 (雪尽) 7 月 10 日在北京站 Flin ...

  2. Flink原理解析50篇(四)-基于 Flink CDC 打通数据实时入湖

    在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎Flink和数据湖Apache Iceberg两种技术,来解决业务数据实时入湖相关的问题. 0 ...

  3. 37 手游基于 Flink CDC + Hudi 湖仓一体方案实践

    简介: 介绍了 37 手游为何选择 Flink 作为计算引擎,并如何基于 Flink CDC + Hudi 构建新的湖仓一体方案. 本文作者是 37 手游大数据开发徐润柏,介绍了 37 手游为何选择 ...

  4. Flink CDC 实时同步mysql

    前言 在实际开发中,需要做数据同步的场景是非常多的,比如不同的应用之间不想直接通过RPC的方式进行数据交互,或者说下游应用需要检测来自上游应用的某些业务指标数据的变化时,这些都可以考虑使用数据同步的方 ...

  5. 基于Flink CDC打通数据实时入湖

    作者 | 数据社       责编 | 欧阳姝黎 在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎 Flink 和数据湖 Apache Ice ...

  6. Flink CDC 新一代数据集成框架

    前言: 主要讲解了技术原理,入门与生产实践,主要功能:全增量一体化数据集成.实时数据入库入仓.最详细的教程.Flink CDC 是Apache Flink的一个重要组件,主要使用了CDC技术从各种数据 ...

  7. flink cdc 2.2.1 mysql connector

    报错 java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent ...

  8. XTransfer技术专家亮相Flink CDC Meetup

    背景信息:Flink CDC 是实时数据集成框架的开源代表,具有全增量一体化.无锁读取.并发读取.分布式架构等技术优势,在开源社区中非常受欢迎. 为促进 Flink CDC 技术的交流和发展,社区于 ...

  9. Flink CDC 将MySQL的数据写入Hudi实践

    Flink CDC + Hudi实践 一.依赖关系 1.Maven依赖 2.SQL客户端JAR 二.设置MySQL服务器 1.创建MySQL用户: 2.向用户授予所需的权限: 3.最终确定用户的权限: ...

  10. Flink cdc +doris生产遇到的问题汇总-持续更新

    问题: 我有个表主键是字符串类型 然后cdc去读取的时候 自己split了很久 checkpoint一直显示执行中,我看日志打印是info : checkpoint一直卡在那里 程序一直等待中: 原因 ...

最新文章

  1. 【JavaSE_08】Java中static、继承、重写-思维导图
  2. COM编程入门---转发
  3. mysql版本 hibernate_Mysql 不同版本 说明
  4. linux运行好麻烦,解析用Linux非常困难、必须用命令行、很老又丑陋及无法运行游戏...
  5. Python-WSGI协议如何实现?
  6. 补习系列-springboot-使用assembly进行项目打包
  7. 网站日志分析工具:WebLog Expert Lite
  8. 前端的ajax缓存,解析jquery中的ajax缓存问题
  9. 纸筒制作机器人_趣味STEAM教育:如何用“垃圾”制作机器人?
  10. 【转】Gnuplot基本介绍
  11. 根据经度纬度 筛选距离远近
  12. 学习-Java循环之continue
  13. QPushButton去掉虚线框(焦点框)
  14. vbs介绍与常见整人代码
  15. 2022“杭电杯”中国大学生算法设计超级联赛(7)
  16. 计算机分子模拟聚乙烯,高分子物理虚拟实验讲义
  17. 多边形套索及魔棒工具
  18. excel 快捷换行,去除空白换行符
  19. 打造前端MAC工作站(一)简单系统配置
  20. liunx服务器配置

热门文章

  1. Python编程:函数
  2. windbg调试ACPI ASL Code 实例一则
  3. 可见光通信 调制解调技术 家庭机器人 可见光通信应用 原理及硬件方案
  4. 赚钱项目在家可以赚钱,赶紧来看看吧!
  5. 读书笔记—YES!产品经理(上册)
  6. 2022年半导体硅片行业研究报告
  7. AF Haf tuning <4>
  8. 【香蕉oi】耍望节(数位DP+倍增优化)
  9. 2013年节假日安排公布 春节假期2月9日至15日
  10. 《加州之梦》 (California Dreaming)