目前CDC(Change Data Capture)工具还是比较多的 ,现在有部分增量数据同步工具是基于触发器实现的,把数据库的变化记录到某个系统表中,然后在客户端建立缓冲,并定期将变化push到接收端,接收端和发送端建立通道,并定期pull新的数据变化,在目标数据库回放 。但因为使用了触发器会影响生产环境的性能,这点可能无法接受 ,其他的增量数据同步工具中占比比较大的一类,是基于kafka来实现的。例如RedHat开源的 Debezium。通过抽取数据库日志来获取变更,实现增量数据同步。下面是一个PostgreSQL数据同步至kafka 的例子:

一、准备java环境

[root@ysla bin]# yum install java-1.8.0-openjdk.x86_64
[root@ysla bin]# yum install java-1.8.0-openjdk-devel.x86_64
[root@ysla bin]# java -version
openjdk version "1.8.0_322"
OpenJDK Runtime Environment (build 1.8.0_322-b06)
OpenJDK 64-Bit Server VM (build 25.322-b06, mixed mode)

二、启动zookeeper

[root@ysla ~]# tar -xf apache-zookeeper-3.8.0-bin.tar.gz
[root@ysla ~]# cd apache-zookeeper-3.8.0-bin/
[root@ysla apache-zookeeper-3.8.0-bin]# ls
bin  conf  docs  lib  LICENSE.txt  NOTICE.txt  README.md  README_packaging.md
[root@ysla apache-zookeeper-3.8.0-bin]# cd conf/
[root@ysla conf]# ls
configuration.xsl  logback.xml  zoo_sample.cfg
[root@ysla conf]# cp zoo_sample.cfg  zoo.cfg[root@ysla bin]# pwd
/root/apache-zookeeper-3.8.0-bin/bin
[root@ysla bin]# ./zkServer.sh start
/bin/java
ZooKeeper JMX enabled by default
Using config: /root/apache-zookeeper-3.8.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@ysla bin]# jps
65408 Jps
65355 QuorumPeerMain

可以使用zkCli.sh进行连接测试

[root@ysla bin]# ./zkCli.sh -server 127.0.0.1:2181

三、启动kafka

[root@ysla ~]# tar -xf kafka_2.13-3.1.0.tgz
[root@ysla ~]# cd kafka_2.13-3.1.0/
[root@ysla kafka_2.13-3.1.0]# ls
bin  config  libs  LICENSE  licenses  NOTICE  site-docs

因为我zookeeper也安在本地,所以server.properties不用修改,走的localhost

[root@ysla bin]# ./kafka-server-start.sh ../config/server.properties &[root@ysla bin]# jps
74677 Kafka
75227 Jps
65355 QuorumPeerMain

四、安装kafkacat

安装依赖

[root@ysla kafkacat]# yum install -y librdkafka-devel

安装kafkacat

[root@ysla ~]# git clone https://github.com/edenhill/kafkacat
Cloning into 'kafkacat'...
remote: Enumerating objects: 1550, done.
remote: Counting objects: 100% (280/280), done.
remote: Compressing objects: 100% (177/177), done.
remote: Total 1550 (delta 157), reused 195 (delta 93), pack-reused 1270
Receiving objects: 100% (1550/1550), 627.51 KiB | 221.00 KiB/s, done.
Resolving deltas: 100% (935/935), done.[root@ysla ~]# cd kafkacat
[root@ysla kafkacat]# ./configure
[root@ysla kafkacat]# make -j 24
[root@ysla kafkacat]# make install -j 24[root@ysla kafkacat]# kcat --help
Usage: kcat <options> [file1 file2 .. | topic1 topic2 ..]]
kcat - Apache Kafka producer and consumer tool
https://github.com/edenhill/kcat
Copyright (c) 2014-2021, Magnus Edenhill
Version 1.7.1-2-g338ae3 (librdkafka 0.11.4 builtin.features=gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins)General options:-C | -P | -L | -Q  Mode: Consume, Produce, Metadata List, Query mode-G <group-id>      Mode: High-level KafkaConsumer (Kafka >=0.9 balanced consumer groups)Expects a list of topics to subscribe to-t <topic>         Topic to consume from, produce to, or list-p <partition>     Partition-b <brokers,..>    Bootstrap broker(s) (host[:port])-D <delim>         Message delimiter string:a-z | \r | \n | \t | \xNN ..Default: \n-K <delim>         Key delimiter (same format as -D)-c <cnt>           Limit message count-m <seconds>       Metadata (et.al.) request timeout.This limits how long kcat will blockwhile waiting for initial metadata to beretrieved from the Kafka cluster.It also sets the timeout for the producer'stransaction commits, init, aborts, etc.Default: 5 seconds.

五、数据库安装wal2json

我的下载地址为
Release wal2json 2.3 · eulerto/wal2json · GitHub

[root@ysla local]# tar -xf wal2json-wal2json_2_3.tar.gz
[root@ysla wal2json-wal2json_2_3]# chown -R postgres: wal2json-wal2json_2_3/
[root@ysla local]# su - postgres
Last login: Wed Mar 23 15:52:23 CST 2022 on pts/1
[postgres@ysla ~]$ cd /usr/local/wal2json-wal2json_2_3/
[postgres@ysla wal2json-wal2json_2_3]$ ls
expected  LICENSE  Makefile  README.md  sql  wal2json.c  wal2json.vcxproj[postgres@ysla local]$ export PATH=/opt/pg12/bin/pg_config:$PATH
[postgres@ysla local]$ make -j 24
make: *** No targets specified and no makefile found.  Stop.
[postgres@ysla local]$ ls
bin  etc  games  include  lib  lib64  libexec  psql  sbin  share  src  wal2json-wal2json_2_3  wal2json-wal2json_2_3.tar.gz
[postgres@ysla local]$ cd wal2json-wal2json_2_3/
[postgres@ysla wal2json-wal2json_2_3]$ make -j 24
gcc -std=gnu99 -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Werror=vla -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I. -I./ -I/opt/pg12/include/postgresql/server -I/opt/pg12/include/postgresql/internal  -D_GNU_SOURCE   -c -o wal2json.o wal2json.c
gcc -std=gnu99 -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Werror=vla -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC wal2json.o -L/opt/pg12/lib   -Wl,--as-needed -Wl,-rpath,'/opt/pg12/lib',--enable-new-dtags  -shared -o wal2json.so
[postgres@ysla wal2json-wal2json_2_3]$ make install -j 24
/bin/mkdir -p '/opt/pg12/lib/postgresql'
/bin/install -c -m 755  wal2json.so '/opt/pg12/lib/postgresql/'
[postgres@ysla wal2json-wal2json_2_3]$ ls
expected  LICENSE  Makefile  README.md  sql  wal2json.c  wal2json.o  wal2json.so  wal2json.vcxproj
[postgres@ysla wal2json-wal2json_2_3]$ ll /opt/pg12/lib/postgresql/wal2json.so
-rwxr-xr-x 1 postgres dba 49224 Mar 23 16:01 /opt/pg12/lib/postgresql/wal2json.so

postgresql.conf里添加shared_preload_libraries = 'wal2json.so’并修改wal_level = logical,之后重启数据库

六、造测试数据并消费

创建复制槽

[postgres@ysla ~]$ /opt/pg12/bin/pg_recvlogical -d postgres --slot slot1 --create-slot -P wal2json[postgres@ysla ~]$ psql -c "select * from pg_replication_slots;"slot_name |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------slot1     | wal2json | logical   |  13593 | postgres | f         | f      |            |      |          855 | 0/2A03FE20  | 0/2A03FE58
(1 row)

我这里为了方便使用本地postgres用户。如有需求,按这样创建用户CREATE ROLR uysl WITH REPLICATION PASSWORD ‘QAZqaz123’ LOGIN;并修改pg_hba.conf,使该用户可以远程或本地访问数据库。

创建测试表

CREATE TABLE test_table (id char(10) NOT NULL,code        char(10),PRIMARY KEY (id)
);

开启一个终端,生产数据到本地 kafka

[root@ysla ~]# /opt/pg12/bin/pg_recvlogical -h localhost -p 6000 -U postgres   -d postgres -S slot1 --start -f - | /root/kafkacat/kcat -b 172.20.10.3:9092 -t testdb_topic
% Auto-selecting Producer mode (use -P or -C to override)

消费testdb_topic

[root@ysla kafka_2.13-3.1.0]# pwd
/root/kafka_2.13-3.1.0
[root@ysla kafka_2.13-3.1.0]# bin/kafka-console-consumer.sh --topic testdb_topic --bootstrap-server 172.20.10.3:9092 --from-beginning

【PostgreSQL逻辑复制数据同步到kafka】相关推荐

  1. 云和恩墨大讲堂丨PostgreSQL逻辑复制案例分享

    PostgreSQL逻辑复制案例分享--2月24日20:00 在PostgreSQL和基于PostgreSQL的国产数据库的使用中,逻辑复制作为一种区别于流复制的数据同步功能,常用于主业务库向分析库的 ...

  2. 参数详解 复制进程_如何优化PostgreSQL逻辑复制

    How to Optimize PostgreSQL Logical Replication 逻辑复制( Logical Replication )或 Pglogical 是表级别的复制.两者都是基于 ...

  3. Maxwell 配置实时将MySQL数据同步到Kafka

    文章目录 环境准备 安装maxwell 下载地址 安装 创建元数据存放数据库 被同步的数据库上创建用于数据复制的账号,并授权 配置mysql同步到kafka任务 运行任务 全表同步 可能遇到的问题 M ...

  4. Postgresql逻辑复制DDL插件pgl_ddl_deploy

    pgl_ddl_deploy可以支持DDL复制,不过目前版本只支pg13持原生的逻辑复制,13之前的版本,需要安装pglogical,也就是需要部署两个插件. 所以对于使用pg13版本的同学,可以单独 ...

  5. Postgresql逻辑复制报错could not start WAL streaming: ERROR: replication slot “x“is active for PID xxx

    先看日志错误: #发布端报错如下: 2022-04-01 10:18:23.812 CST,"postgres","hank",4666,"10.4. ...

  6. PostgreSQL主从数据库数据同步

    运行环境 操作系统:Debian 11.5 数 据 库:PostgreSQL 14.6 主数据库:192.168.8.68 从数据库:192.168.8.69 使用apt-get安装postgresq ...

  7. 24日直播预告丨云和恩墨大讲堂:PostgreSQL逻辑复制案例分享

    2021年 云和恩墨大讲堂为大家带来 43场线上直播及2场线下活动 致力于将最前沿的 数据技术.原理解析和应用实践 带给广大数据同仁 2022年,云和恩墨大讲堂继续前行! 本周四晚

  8. postgresql update使用别名_PostgreSQL逻辑复制之pglogical

    朱贵平(lottu)   中国PG分会认证专家 宜搜科技资深DBA,擅长Oracle.PostgreSQL,目前从事Oracle.PostgreSQL 相关的运维管理及迁移等工作. 一.pglogic ...

  9. PostgreSQL V10逻辑复制

    目录 环境 文档用途 详细信息 环境 系统平台:Linux x86-64 Red Hat Enterprise Linux 7 版本:10.1 文档用途 PostgreSQL V10逻辑复制原理以及作 ...

最新文章

  1. 第四次产业革命将由四个领域引领:大数据、新材料、新能源和生物科技。
  2. NASA:首批“太空蜜蜂”将赴空间站上班,助宇航员推进研究!
  3. FPGA之道(42)FPGA设计的分类
  4. linux dd 截文件,Linux使用dd命令快速生成大文件(转)
  5. 【深度学习】U-Net 网络分割多分类医学图像解析
  6. 前端学习(2983):一文理解数据劫持1
  7. c语言怎样得到函数内参数的值_MySQL之自定义函数
  8. redis安装包_Linux中安装Redis
  9. Ubuntu安装Go语言环境
  10. MySQL半同步复制 - 优点、缺点、配置
  11. IO(一)----字节流
  12. 电脑版微信防撤回大全
  13. java 集合 并集_Java中多个集合的交集,并集和差集
  14. ElasticSearch常用语法大全
  15. 利用python实现外星人入侵大战小游戏(带源代码)
  16. React-Native: bios打开VT-x选项
  17. lisp不是函授型语言_为什么Common Lisp是一门难学的语言
  18. SQL server  查询练习(四十五道题)
  19. VMware 虚拟机设置nat网络
  20. Java+MySQL+查询操作

热门文章

  1. 微信h5界面隐藏分享按钮
  2. SEO图片优化小技巧
  3. Chrome浏览器如何导入证书(最新!)
  4. POI对Word操作参考
  5. VXI和PXI的区别
  6. SSIS Execute SQL Task assign output 的两种方法
  7. key too large to index, failing 3346解决
  8. HUAS Summer Trainning #3 M
  9. modelsim 常用快捷键
  10. Ansible Inventory内置参数