• GreatSQL社区原创内容未经授权不得随意使用,转载请联系小编并注明来源。
  • GreatSQL是MySQL的国产分支版本,使用上与MySQL一致。

    一、Debezium介绍

    摘自官网:

    Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.

简单理解就是Debezium可以捕获数据库中所有行级的数据变化并包装成事件流顺序输出。

二、基本使用

下面以MySQL为例介绍Debezium的基本使用。

1. MySQL的准备工作

  1. 准备一个MySQL用户并且拥有相应权限,像这样:

    CREATE USER 'dbz'@'%' IDENTIFIED BY 'dbzpwd';
    

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'dbz' IDENTIFIED BY 'dbzpwd';


2. 检查MySQL是否开启`log-bin`
```sql
SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin';-- If the following error occurs: The 'INFORMATION_SCHEMA.GLOBAL_VARIABLES' feature is disabled...
-- please execute the given SQL again after execute this SQL: set global show_compatibility_56=on;

如果是OFF则需要修改MySQL配置文件,类似下面这样:

server-id         = 223344        #必须有
log_bin           = mysql-bin    #log_bin的值是binlog文件序列的基本名称
binlog_format     = ROW                #必须是ROW
binlog_row_image  = FULL            #必须是FULL
expire_logs_days  = 10                #依据实际情况而定
  1. 准备数据库&表

    create database inventory;
    create table inventory.a (id bigint primary key auto_increment, name varchar(32));
    insert into inventory.a values (null, 'n1'),(null, 'n2'),(null, 'n3');

2. 编写程序

2.1. 工程依赖(Maven)

pom.xml

<dependency><groupId>io.debezium</groupId><artifactId>debezium-api</artifactId><version>${version.debezium}</version>
</dependency>
<dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>${version.debezium}</version>
</dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mysql</artifactId><version>${version.debezium}</version>
</dependency>

目前Debezium最新稳定版本为:1.9.5.Final

2.2. 准备数据库&表

create database inventory;
create table inventory.a (id bigint primary key, name varchar(32));
insert into inventory.a values (1, 'n1'),(2, 'n2'),(3, 'n3');

2.3. 代码编写

package com.greatdb.dbzdemo;import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;/*** @author wang.jianwen* @version 1.0* @date 2022/07/29*/
public class DebeziumTest {private static DebeziumEngine<ChangeEvent<String, String>> engine;public static void main(String[] args) throws Exception {final Properties props = new Properties();props.setProperty("name", "dbz-engine");props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");//offset config begin - 使用文件来存储已处理的binlog偏移量props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");props.setProperty("offset.storage.file.filename", "/tmp/dbz/storage/mysql_offsets.dat");props.setProperty("offset.flush.interval.ms", "0");//offset config endprops.setProperty("database.server.name", "mysql-connector");props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");props.setProperty("database.history.file.filename", "/tmp/dbz/storage/mysql_dbhistory.txt");props.setProperty("database.server.id", "122112");    //需要与MySQL的server-id不同props.setProperty("database.hostname", "tmg");props.setProperty("database.port", "3306");props.setProperty("database.user", "mysqluser");props.setProperty("database.password", "mysqlpw");props.setProperty("database.include.list", "inventory");//要捕获的数据库名props.setProperty("table.include.list", "inventory.a");//要捕获的数据表props.setProperty("snapshot.mode", "initial");//全量+增量// 使用上述配置创建Debezium引擎,输出样式为Json字符串格式engine = DebeziumEngine.create(Json.class).using(props).notifying(record -> {System.out.println(record);//输出到控制台}).using((success, message, error) -> {if (error != null) {// 报错回调System.out.println("------------error, message:" + message + "exception:" + error);}closeEngine(engine);}).build();ExecutorService executor = Executors.newSingleThreadExecutor();executor.execute(engine);addShutdownHook(engine);awaitTermination(executor);System.out.println("------------main finished.");}private static void closeEngine(DebeziumEngine<ChangeEvent<String, String>> engine) {try {engine.close();} catch (IOException ignored) {}}private static void addShutdownHook(DebeziumEngine<ChangeEvent<String, String>> engine) {Runtime.getRuntime().addShutdownHook(new Thread(() -> closeEngine(engine)));}private static void awaitTermination(ExecutorService executor) {if (executor != null) {try {executor.shutdown();while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}
}

3. 测试

程序跑起来后,可以看到控制台输出:

...(省略)
EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":1}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"n1"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005186,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005191,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=1,name=n1},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005186,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005191}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":2}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"n2"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005195,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=2,name=n2},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005195,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":3}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":3,"name":"n3"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005196,"snapshot":"last","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=3,name=n3},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005196,snapshot=last,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
...(省略)

可以看到全量的数据已经输出,关键的数据如下:

..."payload":{"before":null,"after":{"id":1,"name":"n1"}..."op":"r"...
..."payload":{"before":null,"after":{"id":2,"name":"n2"}..."op":"r"...
..."payload":{"before":null,"after":{"id":3,"name":"n3"}..."op":"r"...
  • 接下来新增一条数据:

    insert into inventory.a values (4, 'n4');

    控制台输出:

    ..."payload":{"before":null,"after":{"id":4,"name":"n4"}..."op":"c"...
  • 修改一条数据:

    update inventory.a set name = 'n4-upd' where id = 4;

    控制台输出:

    ..."payload":{"before":{"id":4,"name":"n4"},"after":{"id":4,"name":"n4-upd"}..."op":"u"...
  • 删除一条数据:

    delete from inventory.a where id = 1;

    控制台输出:

    ..."payload":{"before":{"id":1,"name":"n1"},"after":null..."op":"d"...

    三、总结

    本文以MySQL为例介绍了Debezium在代码中基本使用流程,对MySQL的数据进行常见的增删改操作,Debezium将捕获这些数据行的变化,并记录了数据行变化前后的数据,并对外提供事件流,外部可以获取并对事件进行相应处理。

参考:https://debezium.io/documentation/reference/1.8/index.html

Debezium的基本使用(以MySQL为例)相关推荐

  1. Spark _24 _读取JDBC中的数据创建DataFrame/DataSet(MySql为例)(三)

    两种方式创建DataSet 现在数据库中创建表不能给插入少量数据. javaapi: package SparkSql;import org.apache.spark.SparkConf; impor ...

  2. 数据库水平拆分和垂直拆分区别(以mysql为例)

    数据库水平拆分和垂直拆分区别(以mysql为例) 数据库水平拆分和垂直拆分区别(以mysql为例) 案例: 简单购物系统暂设涉及如下表: 1.产品表(数据量10w,稳定) 2.订单表(数据量200w, ...

  3. mysql sql语句 参数化_C#参数化执行SQL语句,防止破绽攻击本文以MySql为例【20151108非查询操作】_mysql...

    C#参数化执行SQL语句,防止漏洞攻击本文以mysql为例[20151108非查询操作] 为什么要参数化执行SQL语句呢? 一个作用就是可以防止用户注入漏洞. 简单举个列子吧. 比如账号密码登入,如果 ...

  4. 一下子搞懂JDBC,看这篇就够了--以MySQL为例。

    一下子搞懂JDBC,看这篇就够了–以MySQL为例. 文章目录 一下子搞懂JDBC,看这篇就够了--以MySQL为例. 一.什么是JDBC? 二.JDBC的使用步骤 三.jdbc进阶--上述各个类或接 ...

  5. 【SQL】以mysql为例系统学习DQL理论知识

    [SQL]以mysql为例系统学习DQL理论知识 1.思维导图-知识体系 2.数据库相关操作-查询 2.1.单一select简单查询 2.2.复杂数据库查询(重点) 2.3.单一select的SQL语 ...

  6. php单例模式实现对象只被创建一次 mysql单例操作类

    这是我在php面试题中遇到的一道试题,单例模式按字面来看就是某一个类只有一个实例,这样做的好处还是很大的,比如说数据库的连接,我们只需要实例化一次,不需要每次都去new了,这样极大的降低了资源的耗费. ...

  7. 通过JDBC进行简单的增删改查(以MySQL为例)

    目录 前言:什么是JDBC 一.准备工作(一):MySQL安装配置和基础学习 二.准备工作(二):下载数据库对应的jar包并导入 三.JDBC基本操作 (1)定义记录的类(可选) (2)连接的获取 ( ...

  8. Linux下如何查找软件安装路径(mysql为例)

    时间长了我们可能忘记软件的安装目录,通过以下方法来获得路径 1 通过rpm查看软件是否安装 rpm -qa | grep mysql 已安装的话会例举出mysql的组件信息,如上图 2 接着根据 rp ...

  9. jdbc测试类代码mysql_通过JDBC进行简单的增删改查(以MySQL为例)

    JDBC基本操作 下面的所有方法和数据成员都在public class JDBCOperation内部. (1)定义记录的类(可选) 这样做主要是为了便于操作和接口定义,是非必须的. static c ...

最新文章

  1. Nature:植物叶际微生物组稳态维持机制
  2. python脚本如何监听终止进程行为,如何通过脚本名获取pid
  3. 考前自学系列·计算机组成原理·计算机的硬件组成及其功能
  4. jquery源码解析:代码结构分析
  5. assubclass_Java类class asSubclass()方法及示例
  6. 前端js获取图片大小 扩展名_前端 JS 获取 Image 图像 宽高 尺寸
  7. 华为汪涛:走向智能世界2030,无线网络未来十年十大产业趋势
  8. jeecgboot配置文件_Jeecg-Boot 技术文档
  9. 【Linux】Linux下 CURL如何发送http请求
  10. C++--第21课 - 类模板 - 上
  11. Dell服务器如何重装操作系统 windows server
  12. 关于数据迁移:解决kettle中mysql的数据是tinyint的kettle查出来后变成boolean问题
  13. ios 点生成线路 百度地图_iOS百度地图开发之路径规划
  14. chm文档已取消到该网页的导航的解决方法
  15. 广东理工学院计算机组成原理,20年广东理工学院成人高考期末考试 计算机组成原理 复习资料(7页)-原创力文档...
  16. Java中的this关键字
  17. 绥芬河java_那段岁月
  18. bim 模型web页面展示_一种基于BIM模型的Web端轻量化展示方法与流程
  19. 开源OA协同办公平台搭建教程丨服务器端命令:数据导入导出及配置
  20. OpenCV Java入门三 Mat的基本操作

热门文章

  1. npm i 安装插件报:permission denied, symlink
  2. 2021牛客练习赛90
  3. android app报告,知乎APP用户体验报告
  4. c语言两个浮点数相乘,两个浮点变量相乘结果为什么不精确
  5. 麦凯隆全屋分质供水 保障家庭饮用水安全与健康
  6. 09.python常用数据类型—字典
  7. untiy 串口通信
  8. Pandas入门超详细教程,看了超简单
  9. 用笔记本做路由器共享4G流量
  10. 正则只保留括号里的内容