本文目标

debezium,简称dbz,伪装为MySQL从库,当主库发生变化后,主库会主动将变化的信息同步到dbz内,dbz将收到的信息转为JSON推送到Kafka内。

安装JDK11

yum -y install java-11-openjdk-devel

解压部署

tar xfz debezium-server-dist-2.0.0.Final.tar.gz

修改配置文件

application.properties

[yinyx@localhost conf]$ cat application.properties
quarkus.http.port=8999
rkus.log.level=INFO
quarkus.log.console.json=falsedebezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0debezium.source.database.hostname=127.0.0.1
debezium.source.database.port=6306
debezium.source.database.user=test
debezium.source.database.password=test
debezium.source.database.server.id=2
debezium.source.database.include.list=testdebezium.source.topic.prefix=yyx
debezium.source.key.converter.schemas.enable=false
debezium.source.value.converter.schemas.enable=false
debezium.source.schema.history.internal.kafka.bootstrap.servers=127.0.0.1:9092
debezium.source.schema.history.internal.kafka.topic=schemahistorydebezium.source.decimal.handling.mode=string
debezium.source.lob.enabled=true
debezium.source.database.history.skip.unparseable.ddl=true
debezium.source.tombstones.on.delete=falsedebezium.sink.type=kafka
debezium.sink.kafka.producer.bootstrap.servers=127.0.0.1:9092
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializerdebezium.format.key.schemas.enable=false
debezium.format.value.schemas.enable=false[yinyx@localhost conf]$ 

启动

./run.sh

注意先启动kafka

测试

检查topic是否已经创建
[yinyx@localhost bin]$ ./kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092
__consumer_offsets
schemahistory
yinyx
yyx
yyx.test.t1
启动kafka的消费

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic yyx.test.t1

到MySQL更新t1表的数据

insert update delete 随便整

查看kafka消费,应出现类似如下信息

{“before”:{“f1”:3,“f2”:“cc|33”,“f3”:1670056305000},“after”:{“f1”:3,“f2”:“cc|333”,“f3”:1670056305000},“source”:{“version”:“2.0.0.Final”,“connector”:“mysql”,“name”:“yyx”,“ts_ms”:1670030109000,“snapshot”:“false”,“db”:“test”,“sequence”:null,“table”:“t1”,“server_id”:1,“gtid”:“7bdc8394-71cf-11ed-b2d5-000c293c9462:25”,“file”:“mysql-bin.000002”,“pos”:7907,“row”:0,“thread”:39,“query”:null},“op”:“u”,“ts_ms”:1670031525419,“transaction”:null}

{“before”:{“f1”:2,“f2”:“bb|222”,“f3”:1670002422000},“after”:{“f1”:2,“f2”:“bb|222haha”,“f3”:1670002422000},“source”:{“version”:“2.0.0.Final”,“connector”:“mysql”,“name”:“yyx”,“ts_ms”:1670031487000,“snapshot”:“false”,“db”:“test”,“sequence”:null,“table”:“t1”,“server_id”:1,“gtid”:“7bdc8394-71cf-11ed-b2d5-000c293c9462:27”,“file”:“mysql-bin.000002”,“pos”:8561,“row”:0,“thread”:39,“query”:null},“op”:“u”,“ts_ms”:1670031525422,“transaction”:null}

总结

至此,MySQL的变化,会实时反应到Kafka的JSON数据里面,后续自己开发程序从Kafka接收处理即可。

4 Debezium抽取部署相关推荐

  1. Debezium系列之:Debezium UI部署详细步骤

    Debezium系列之:Debezium UI部署详细步骤 一.安装docker 二.修改docker镜像仓库 三.启动debezium ui命令详解 四.启动debezium ui 五.登陆debe ...

  2. Debezium 抽取oracle数据

    1.环境介绍 操作系统:centos 7.9 jdk版本:11.0.12 kafka版本:2.8.0 Debezium版本:1.6(debezium-connector-oracle-1.6.1.Fi ...

  3. 【硬刚大数据】大数据同步工具之FlinkCDC/Canal/Debezium对比

    欢迎关注博客主页:微信搜:import_bigdata,大数据领域硬核原创作者_王知无(import_bigdata)_CSDN博客 欢迎点赞.收藏.留言 ,欢迎留言交流! 本文由[王知无]原创,首发 ...

  4. CDC工具之Debezium

    前言 在之前的文章中,我们讲到了CDC工具Canal,今天我们来继续聊聊另外一个CDC工具--Debezium.  一.Debezium是什么? Debezium是一组分布式服务,用于捕获数据库中的更 ...

  5. debezium系列之:Kafka Connect

    debezium系列之:Kafka Connect 一.Source和Sink 二.Task和Worker 三.Kafka Connect特性 四.独立模式 1.Source连接器的用法 2.Sink ...

  6. Flink原理解析50篇(四)-基于 Flink CDC 打通数据实时入湖

    在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎Flink和数据湖Apache Iceberg两种技术,来解决业务数据实时入湖相关的问题. 0 ...

  7. 基于Flink CDC打通数据实时入湖

    作者 | 数据社       责编 | 欧阳姝黎 在构建实时数仓的过程中,如何快速.正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎 Flink 和数据湖 Apache Ice ...

  8. DeepDive自动化信息抽取---全网首发DeepDive偷工减料超速部署方式

    文章目录 1. DeepDive 1.1 系统架构 2. 部署DeepDive 2.1 环境准备 2.2 工具人偷工减料超速部署方式 2.2.1 下载相关文件 2.2.2 部署 2.3 官方部署方式 ...

  9. Debezium系列之:安装部署debezium2.0以上版本的详细步骤

    Debezium系列之:安装部署debezium2.0以上版本的详细步骤 一.相关技术博客 二.升级debezium2.0以上版本注意事项 三.安装jdk 四.修改kafka相关参数 五.启动kafk ...

最新文章

  1. iOS编程(双语版) - 视图 - 基本概念
  2. DB2存储过程——条件判断语句if then
  3. WinForm创建系统托盘以及操作注册表
  4. centos 安装指定版本gc_番外篇 (1) Docker 安装
  5. 【杂谈】有三AI秋季划火热进行中,如何深入学习模型优化,人脸算法,图像质量等研究方向...
  6. Windows10+PicGo+七牛云+Typora搭建写作环境与图床
  7. mysql 视图 字符集_MySQL创建子视图并查看的时候,字符集报错问题
  8. 多个DIV排列时居中
  9. SpringMvc 面试题
  10. Python将迁移到GitHub
  11. “哎哟,真的很快哦” 闪送宣布签约周杰伦为其品牌代言人
  12. python核心编程:第六章。
  13. python实现车牌识别
  14. DataFrame切片
  15. Python crawler 豆瓣电影排行榜评分
  16. keypair java_Java KeyPairGenerator genKeyPair()用法及代码示例
  17. cle IMP-00015: 由于对象已存在, 下列语句失败
  18. 埃拉托色尼筛选法计算素数个数
  19. 【文献查找神器】Connected Papers
  20. C++--名字空间的定义

热门文章

  1. 深圳绿色建筑人才需求持续增长
  2. 20P44 Premiere预设600个摄像机动画信号干扰调色视觉特效pr模板
  3. 初识 Mac机子 关机命令
  4. php m3u8转mp4
  5. pc端video视频播放的注意点,video兼容ie播放
  6. 修改opencv3.4 VideoCapture dshow模块以支持圆刚cv710等需要配置CrossBar的采集卡
  7. teamviewer12 linux安装,ubuntu16.04安装teamviewer12依赖包解决
  8. 还在担心零基础绘画?这篇文章让你少走弯路!
  9. 谁来PK“百度局域网”
  10. 键盘常用键的asii值