Flink-CDC监控mysql的好处在于,在项目中无需向canal和maxwell那样要先将数据先存入kafka,而是直接将数据拉取到实时流当中。

Flink - API方式监控

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;/*** @author zyj* @Date 2022/1/7 11:49* 自定义序列化器*/
public class FlinkCDC01_CustomSchema {public static void main(String[] args) throws Exception {// 创建flink环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(1);// 开启检查点,5秒插入一次env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);// 建立检查点超时时间为1分钟env.getCheckpointConfig().setCheckpointTimeout(60000L);// 检查点重启次数和重启间隔env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2000L));// 设置job取消后,检查点是否保留env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 设置状态后端env.setStateBackend(new FsStateBackend("hdfs:///flinkCDC"));// 设置访问HDFS用户名System.setProperty("HADOOP_USER_NAME", "zyj");DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder().hostname("hadoop110") // mysql主机名.port(3306) // mysql 端口号.databaseList("project_realtime") // 要监控的数据库名,可写多个.tableList("project_realtime.t_user") // 要监控的数据表,数据库.数据表方式.username("root") // mysql用户名.password("root") // mysql登录密码.startupOptions(StartupOptions.initial()) // 从最开始的binlog读取数据.deserializer(new MySchema()) // CDC 输出的文件格式.build();env.addSource(sourceFunction).print();env.execute();}
}class MySchema implements DebeziumDeserializationSchema<String> {@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<String> out) throws Exception {Struct valueStruct = (Struct) sourceRecord.value();Struct sourceStruct = valueStruct.getStruct("source");// 获取数据库的名称String database = sourceStruct.getString("db");// 获取表名String table = sourceStruct.getString("table");// 获取数据库操作的类型String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();if (type.equals("create")) {type = "insert";}JSONObject jsonObj = new JSONObject();jsonObj.put("database", database);jsonObj.put("table", table);jsonObj.put("type", type);// 获取影响的数据dataStruct afterStruct = valueStruct.getStruct("after");JSONObject dataJsonObj = new JSONObject();if (afterStruct != null) {for (Field field : afterStruct.schema().fields()) {String fieldName = field.name();Object fieldValue = afterStruct.get(field);dataJsonObj.put(fieldName, fieldValue);}}// 如果是delete操作, data为{}, 空, 防止后续调用产生空指针异常jsonObj.put("data", dataJsonObj);// 向下游传递数据out.collect(jsonObj.toJSONString());}@Overridepublic TypeInformation<String> getProducedType() {return TypeInformation.of(String.class);}
}

FLink - SQL方式监控

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** @author zyj* @Date 2022/1/8 0:32* 通过Flink-CDC动态读取MySQL表中的数据 SQL方式*/
public class FlinkCDC02_SQL {public static void main(String[] args) throws Exception {//TODO 1.准备环境//1.1 流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 1.2 表执行环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 1.3 设置并行度env.setParallelism(1);//TODO 2.创建动态表tableEnv.executeSql("CREATE TABLE user_info (" +"  id INT," +"  name STRING," +"  age INT" +") WITH (" +"  'connector' = 'mysql-cdc'," +"  'hostname' = 'hadoop110'," +"  'port' = '3306'," +"  'username' = 'root'," +"  'password' = 'root'," +"  'database-name' = 'project_realtime'," +"  'table-name' = 't_user'" +")");tableEnv.executeSql("select * from user_info").print();env.execute();}
}

pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.atguigu.gmall</groupId><artifactId>gmall0224-cdc</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.0.0</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.48</version></dependency><dependency><groupId>com.alibaba.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>1.2.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>1.12.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>

Flink-CDC 动态监控 mysql 数据表相关推荐

  1. Flink系列之:Flink CDC深入了解MySQL CDC连接器

    Flink系列之:Flink CDC深入了解MySQL CDC连接器 一.增量快照特性 1.增量快照读取 2.并发读取 3.全量阶段支持 checkpoint 4.无锁算法 5.MySQL高可用性支持 ...

  2. Flink CDC 系列 - Flink CDC 如何简化实时数据入湖入仓

    摘要:本文整理自伍翀 (云邪).徐榜江 (雪尽) 在 Flink Forward Asia 2021 的分享,该分享以 5 个章节详细介绍如何使用 Flink CDC 来简化实时数据的入湖入仓, 文章 ...

  3. Flink CDC 系列 | 构建 MySQL 和 Postgres 上的 Streaming ETL

    摘要:本篇教程将展示如何基于 Flink CDC 快速构建 MySQL 和 Postgres 的流式 ETL. Flink-CDC 项目地址: https://github.com/ververica ...

  4. 编写Scala代码,使用Spark讲Mysql数据表中的数据抽取到Hive的ODS层

    编写Scala代码,使用Spark讲Mysql数据表中的数据抽取到Hive的ODS层 抽取MySQL的metast库中Production表的全量数据进入Hive的ods库中表production,字 ...

  5. django存入mysql数据库_django如何存数据到一个mysql数据表里面

    让我们聊聊这个话题, django如何存数据至mysql数据表里面,你会用什么方法?正常情况下,我们form逻辑处理后,直接form.save(),是,这个方法没毛病:但有没有其他的方法呢?假如我们有 ...

  6. mysql数据表数据丢失6_MYSQL数据表损坏的原因分析和修复方法小结

    MYSQL数据表损坏的原因分析和修复方法小结 1.表损坏的原因分析 以下原因是导致mysql 表毁坏的常见原因: 1. 服务器突然断电导致数据文件损坏. 2. 强制关机,没有先关闭mysql 服务. ...

  7. php转换excel文件怎么打开方式,用PHP将mysql数据表转换为excel文件格式_php

    详细内容如下: $DB_Server = "localhost"; $DB_Username = "mydowns"; $DB_Password = " ...

  8. linux下查看mysql数据库的字段类型_系统运维|[小白技巧]如何在Linux上检查MySQL数据表的存储引擎类型...

    提问: 我想要知道我的MySQL数据库是MyISAM还是Innodb类型.我该如何检查MySQL数据库表的类型? MySQl主要使用两种存储引擎:MyISAM 和 Innodb.MyISAM是非事务的 ...

  9. mysql 表关系传递,mysql数据表之间数据相互传递的问题

    mysql数据表之间数据相互传递的问题 近日,某操作需要从一个表(表A)读取一些数据,然后直接写入另外一个表(表B)(相当于一个备份) 平时没啥问题,某天测试MM将一些乱七八糟的数据写入后就出问题了, ...

最新文章

  1. 【对讲机的那点事】关于对讲机锂电池你了解多少?
  2. 设计模式—责任链模式
  3. boot loader能全部用C程序编写吗
  4. 池州天气预报软件测试,池州天气预报15天
  5. Linux Ubuntu 16.04系统下可用的Windows应用
  6. 《大数据》期刊“农业大数据”专刊征文通知
  7. 珠海 第十届亚洲机器人锦标赛_滨和中学“VEX”团队斩获粤港澳机器人大赛多个大奖!...
  8. 怎么估算空间利用率?新研发传感器分分钟搞定!
  9. (1)关于File类你知道多少
  10. pstate0 vid数值意义_光行差成因和物理意义新解及其验证方法
  11. 计算机网络并行传输和串行传输,网线RJ45是并行传输还是串行
  12. 一名职业3D建模师的学习经历,月薪28K依然焦虑
  13. 【Java案例】用户登录注册
  14. 汉语写代码编程与为什么很多软件有简体中文版
  15. [转载] 中华典故故事(孙刚)——26 叫了王承恩
  16. 蚂蚁树林小游戏玩法介绍
  17. #论文投稿必备#如何解决阅读文献的三大问题:坐不住,记不住,想不开
  18. 基于蘑菇博客建设个人SNS网站(二)--后台框架搭建
  19. 21精美ContactPages联系我们页面设计欣赏
  20. 微信小程序—查询快递

热门文章

  1. linux awk 字符串匹配,awk匹配字符串
  2. 利用go语言创建钱包并遍历钱包(wallet)
  3. sql 内连接,左连接,右连接,全连接
  4. 小程序运营推广的方法
  5. eclipse配置python环境后啷个写java类_在Eclipse上配置Python开发环境
  6. java数字猜大小游戏_Java之数字猜大小
  7. Twilio Inc.(TWLO)2020年第三季度收益电话会议记录
  8. 地图学的基础知识_天文坐标系_大地坐标系_地心坐标系及其相关概念
  9. 微软Hyper-V虚拟机复制实现双机备份过程
  10. Grafana 汉化