目标

本文是对[1]的复现和整理

环境

组件 版本
Zookeeper 3.6.0
Kafka 2.5.0
Mysql 8.0.21-0ubuntu0.20.04.4

准备工作

分别新建两个数据库A和B,然后各自新建一个表格

mysql> create database A;
Query OK, 1 row affected (0.12 sec)

mysql> create database B;
Query OK, 1 row affected (0.08 sec)

mysql> use A;
Database changed
mysql> CREATE TABLE `person` (
    ->   `pid` int(11) NOT NULL AUTO_INCREMENT,
    ->   `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
    ->   `age` int(11) DEFAULT NULL,
    ->   PRIMARY KEY (`pid`)
    -> ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
Query OK, 0 rows affected, 3 warnings (0.82 sec)

mysql> use B;
Database changed
mysql> CREATE TABLE `kafkaperson` (
    ->   `pid` int(11) NOT NULL AUTO_INCREMENT,
    ->   `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
    ->   `age` int(11) DEFAULT NULL,
    ->   PRIMARY KEY (`pid`)
    -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
Query OK, 0 rows affected, 5 warnings (0.49 sec)

集群启动

启动Hadoop,Zookeeper与Kafka

测试

①生产者:

$KAFKA/bin/kafka-topics.sh --create --zookeeper Desktop:2181 --replication-factor 1 --partitions 1 --topic mysql-kafka-person

②消费者

$KAFKA/bin/connect-standalone.sh $KAFKA/config/connect-standalone.properties $KAFKA/config/quickstart-mysql.properties $KAFKA/config/quickstart-mysql-sink.properties

③往A表插入条数据

mysql> INSERT INTO person (pid,firstname,age) VALUES ( 1, 'zs',66);
Query OK, 1 row affected (0.07 sec)

mysql> select * from person;
+-----+-----------+------+
| pid | firstname | age  |
+-----+-----------+------+
|   1 | zs        |   66 |
+-----+-----------+------+
1 row in set (0.00 sec)

④mysql> use B;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables;
+-------------+
| Tables_in_B |
+-------------+
| kafkaperson |
+-------------+
1 row in set (0.00 sec)

mysql> select * from kafkaperson
    -> ;
+-----+-----------+------+
| pid | firstname | age  |
+-----+-----------+------+
|   1 | zs        |   66 |
+-----+-----------+------+
1 row in set (0.00 sec)

可以看到mysql 表A的数据通过kafka顺利传达到了表B,而在我们的kafka终端也会看到相关信息:

附录

quickstart-mysql.properties

name=mysql-a-source-person
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://Desktop:3306/A?user=appleyuchi&password=appleyuchi
# incrementing  自增
mode=incrementing
# 自增字段  pid
incrementing.column.name=pid
# 白名单表  person
table.whitelist=person
# topic前缀   mysql-kafka-
topic.prefix=mysql-kafka-

quickstart-mysql-sink.properties

name=mysql-a-sink-person
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#kafka的topic名称
topics=mysql-kafka-person
# 配置JDBC链接
connection.url=jdbc:mysql://Desktop:3306/B?user=appleyuchi&password=appleyuchi
# 不自动创建表,如果为true,会自动创建表,表名为topic名称
auto.create=false
# upsert model更新和插入
insert.mode=upsert
# 下面两个参数配置了以pid为主键更新
pk.mode = record_value
pk.fields = pid
#表名为kafkatable
table.name.format=kafkaperson

Reference:

[1]Kafka Connect 实现MySQL增量同步

[2]Kafka connect快速构建数据ETL通道

[3]Kafka Connect 日志配置

Kafka实现MySQL增量同步相关推荐

  1. mysql增量同步kafka_MySQL数据实时增量同步到Kafka - Flume

    写在前面的话 需求,将MySQL里的数据实时增量同步到Kafka.接到活儿的时候,第一个想法就是通过读取MySQL的binlog日志,将数据写到Kafka.不过对比了一些工具,例如:Canel,Dat ...

  2. mysql条件增量同步命令_DataX3 Mysql增量同步ES

    1.下载DataX 2.修改pom.xml wiriter部分我保留了 mysqlwriter txtfilewriter streamwriter elasticsearchwriter 3.mav ...

  3. datax实现mysql增量同步_datax 3.0配合crontab实现数据定时增量同步

    使用datax 实现数据增量同步踩坑记录 前提概要 由于项目上需要将a服务器数据同步至b服务器,一开始使用mysql主从复制,但是由于主从同步无法触发位于b服务器的触发器,只能放弃此方案.后来找到了d ...

  4. mysql增量同步_在两个MySQL数据库之间实现数据增量同步

    在线QQ客服:1922638 专业的SQL Server.MySQL数据库同步软件 在两个数据库中实现数据增量同步,令数据库之间的数据能够同步更新. Oracle数据库IP:192.168.0.1(源 ...

  5. mysql增量同步kafka_canal解析mysql的binlog实时推送到kafka

    今天整理一下以前写的一个kafka消费canal的demo,实现实时推送数据到kafka.首先先介绍一下canal,官网是这么说的: 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和 ...

  6. Clickhouse单机部署以及从mysql增量同步数据

    背景: 随着数据量的上升,OLAP一直是被讨论的话题,虽然druid,kylin能够解决OLAP问题,但是druid,kylin也是需要和hadoop全家桶一起用的,我也搞不定,那只能找我能搞定的技术 ...

  7. clickhouse 增量更新_Clickhouse單機部署以及從mysql增量同步數據

    背景: 隨着數據量的上升,OLAP一直是被討論的話題,雖然druid,kylin能夠解決OLAP問題,但是druid,kylin也是需要和hadoop全家桶一起用的,我也搞不定,那只能找我能搞定的技術 ...

  8. clickhouse 同步mysql_ClickHouse单机部署以及从MySQL增量同步数据

    背景: 随着数据量的上升,OLAP一直是被讨论的话题,虽然druid,kylin能够解决OLAP问题,但是druid,kylin也是需要和hadoop全家桶一起用的,异常的笨重,再说我也搞不定,那只能 ...

  9. 第02期:ClickHouse 单机部署以及从 MySQL 增量同步数据

    本期作者:邓亚运 37 互娱高级 DBA,负责公司 MySQL,Redis,Hadoop,Clickhouse 集群的管理和维护. 背景 随着数据量的上升,OLAP 一直是被讨论的话题,虽然 drui ...

最新文章

  1. ini文件怎么注释_wamp怎么升级php版本
  2. linux 有线网卡,linux下有线网卡出现ADDRCONF(NETDEV_UP): eth0: link is not ready的解决方法...
  3. 实时对讲是怎么发起的_QQ可实时显示手机电量并展示给好友 近半投票者支持该功能...
  4. 使用SpringTask定时获取传感器设备信息并缓存到Redis
  5. 10.IDA-基本操作
  6. Tomcat源码分析(九)--Session管理
  7. 阿里助手 5.12.2
  8. Codeforces 576D. Flights for Regular Customers(倍增floyd+bitset)
  9. ObjC学习3-类、继承、重载
  10. python之pdf分页
  11. Java 爬虫系列丨(一)爬虫介绍
  12. oracle检查表失效,PL/SQL联系oracle成功可以sql解决的办法是检查表的名称无法显示...
  13. 2020年书法落款_书法落款的基本常识最新版
  14. 【NOIP2018复习】可见点数【数论】
  15. 笔记《Graph Neural Tangent Kernel: Fusing Graph Neural Networks with Graph Kernels》-NeurIPS 2019
  16. 吐血总结:国内外App制作平台大集合,总有一款适合你
  17. 2021极术通讯-为什么智慧计算如此重要?
  18. Kali 安装详细步骤
  19. ios使用友盟分享到QQ/微信时时如何判断手机上是否安装了QQ以及微信的客户端
  20. [渝粤教育] 中国地质大学 Java语言程序设计 复习题

热门文章

  1. Xcode9学习笔记63 - 使用SystemSoundId播放简短声音(太长的无法播放)
  2. APK反编译得工具总结(转载)
  3. 超炫jQuery测试答题功能
  4. JS函数运行在它们被定义的作用域内,而不是它们被执行的作用域内
  5. 网络流量监控分析工具 Ntopng 安装
  6. TCP拥塞控制算法 — CUBIC的补丁(三)
  7. httpclient4 中文版帮助文档,最新官方版翻译版(第一章 上)
  8. 7-Zip CommondLine 使用记录
  9. MS12_020漏洞
  10. MOCTF-Web-还是水题