当我们遇到需要捕获数据库中数据变化的时候,总是会想到通过消息队列来实现该需求,通过把数据变化发布到消息队列,来完成系统上下游的解耦。关心这些数据变化的应用可以从消息队列上获取这些数据。

Bottledwater-pg是针对PostgreSQL数据库的一种消息生产者,可以将PostgreSQL数据库的数据写入confluent Kafka,从而实时的分享给消息订阅者。支持PostgreSQL 9.4以及以上版本,支持全量快照,以及持续解析数据WAL日志中的增量数据并写入Kafka。每一张数据库表为一个topic。数据在使用decode从WAL取出后,使用Avro将数据格式化(通常格式化为JSON)再写入Kafka。

Bottledwater-pg有docker、源码编译、Ubuntu三种使用方式,本文以源码编译方式说明如何部署。

一. 环境说明

  • 源端

IP:10.19.100.23

操作系统:RHEL6.3

数据库:PostgreSQL-9.4.11

  • 目的端

IP:10.19.100.21

操作系统:RHEL6.3

Kafka:2.10-0.10.2.0

二. 源端配置(10.19.100.23)

Bottledwater-pg依赖以下软件包:

  • avro-c > =1.8.0
  • jansson
  • libcurl
  • librdkafka > =0.9.1

Bottledwater-pg可选以下软件包:

  • libsnappy
  • boost

另外编译要求较高的cmake版本,操作系统自带的cmake会出现编译错误,本文使用:

  • cmake-3.8.0

2.0 卸载系统自带PG包

root用户操作)

RedHat根据安装模式可能自带8.4版本的PostgreSQL,请务必卸载,否则会对编译造成影响

# rpm -qa|grep postgres
postgresql-libs-8.4.11-1.el6_2.x86_64
postgresql-devel-8.4.11-1.el6_2.x86_64
postgresql-8.4.11-1.el6_2.x86_64# rpm -e postgresql-devel-8.4.11-1.el6_2.x86_64
# rpm -e postgresql-8.4.11-1.el6_2.x86_64
# rpm -e postgresql-libs-8.4.11-1.el6_2.x86_64

2.1 编译安装cmake

root用户操作)

# cd /opt
# tar zxvf cmake-3.8.0.tar.gz
# cd cmake-3.8.0
# ./bootstrap
# make
# make install# cmake -version
cmake version 3.8.0
CMake suite maintained and supported by Kitware (kitware.com/cmake).

2.2 编译安装jansson

root用户操作)

# cd /opt
# tar jxvf jansson-2.9.tar.bz2
# cd jansson-2.9
# ./configure
# make
# make install# ls /usr/local/lib/pkgconfig
jansson.pc#export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH

2.3 编译安装avro

root用户操作)

# cd /opt
# yum install -y xz-*
# yum install -y zlib-devel.x86_64# tar zxvf avro-src-1.8.1.tar.gz
# cd avro-src-1.8.1/lang/c
# mkdir build
# cd build
# cmake .. -DCMAKE_INSTALL_PREFIX=/opt/avro -DCMAKE_BUILD_TYPE=Release -DTHREADSAFE=true
# make
# make test
# make install

导入库文件

# vi /etc/ld.so.conf
/opt/avro/lib# ldconfig

配置临时环境变量

# export LD_LIBRARY_PATH=/opt/avro/lib:$LD_LIBRARY_PATH# export PKG_CONFIG_PATH=/opt/avro/lib/pkgconfig:/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH

2.4 安装libcurl

root用户操作)

# yum install -y libcurl-devel.x86_64

2.5 编译安装librdkafka

root用户操作)

# cd /opt
# unzip librdkafka-master.zip
# cd librdkafka-master
# ./configure
# make
# make install# ls /usr/local/lib/pkgconfig
jansson.pc  rdkafka.pc  rdkafka++.pc

2.6 添加引用库

root用户操作)

# vi /etc/ld.so.conf.d/bottledwater.conf
/opt/avro/lib
/usr/local/lib
/PostgreSQL/9.4/lib# ldconfig

2.7 编译安装bottledwater-pg

root用户操作)

# chown -R postgres:postgres /opt/

postgres用户操作)

配置环境变量

$ vi ~/.bash_profile# .bash_profile
# Get the aliases and functionsif [ -f ~/.bashrc ]; then. ~/.bashrc
fi# User specific environment and startup programsexport PG_HOME=/PostgreSQL/9.4
export LD_LIBRARY_PATH=/opt/avro/lib:$LD_LIBRARY_PATH
export PKG_CONFIG_PATH=/opt/avro/lib/pkgconfig:/usr/local/lib/pkgconfig:$PKG_CONFIG_PATHPATH=$PG_HOME/bin:$PATH:$HOME/bin
export PATH

准备安装包

$ unzip bottledwater-pg-master.zip
$ cd bottledwater-pg-master

这里不知道是操作系统环境的问题还是开源软件本身的问题,需要修改源码包里的2处Makefile才能通过编译。

  • 修改client/Makefile
PG_CFLAGS = -I$(shell pg_config --includedir) -I$(shell pg_config --includedir-server) -I$(shell pg_config --pkgincludedir)

  • 修改kafka/Makefile
PG_CFLAGS = -I$(shell pg_config --includedir) -I$(shell pg_config --includedir-server) -I$(shell pg_config --pkgincludedir)LDFLAGS=-L/usr/lib64 $(CURL_LDFLAGS) $(PG_LDFLAGS) $(KAFKA_LDFLAGS) $(AVRO_LDFLAGS) $(JSON_LDFLAGS)

编译并安装bottledwater-pg

$ make
$ make install

安装完成后会自动在PostgreSQL数据库扩展包目录下生成扩展库文件和扩展库控制文件。

$ ls /PostgreSQL/9.4/lib/postgresql/bottledwater*
/PostgreSQL/9.4/lib/postgresql/bottledwater.so$ ls /PostgreSQL/9.4/share/postgresql/extension/bottledwater*
/PostgreSQL/9.4/share/postgresql/extension/bottledwater--0.1.sql
/PostgreSQL/9.4/share/postgresql/extension/bottledwater.control

2.7 数据库配置

试验环境中PostgreSQL已有数据库mas,用户mas,密码mas。

Mas库中有一张表:

mastest(col1 numeric primary key,col2 character varying(10));

  • 修改数据库配置文件
$ vi /PostgreSQL/9.4/data/postgresql.confmax_worker_processes = 8        # 至少为8
wal_level = logical             # 至少为logical,可以更高
max_wal_senders = 8             # 至少为8
wal_keep_segments = 256         # 至少为256
max_replication_slots = 4       # 至少为4

  • 修改数据库白名单配置文件

  *这部分权限可能过大,可以精简

$ vi /PostgreSQL/9.4/data/pg_hba.conflocal   replication     all                                     trust
host    replication     all             127.0.0.1/32            trust
host    replication     all             0.0.0.0/0               md5
host    all             postgres        10.19.100.23/32         trust

  • 重启数据库并对要监控的库创建bottledwater扩展
$ pg_ctl restart -D /PostgreSQL/9.4/data/ -l /PostgreSQL/9.4/data/pglog.log -m fast$ psql -U postgres -d mas -c "create extension bottledwater;"
Password for user postgres:CREATE EXTENSION

三. 测试同步

  • 在源端(10.19.100.23)启动bottledwater
$ cd /opt/bottledwater-pg-master/kafka$ ./bottledwater --postgres=postgres://postgres:123456@10.19.100.23:5432/mas --broker=10.19.100.21:9092 -f json

[INFO] Writing messages to Kafka in JSON format
[INFO] Created replication slot "bottledwater", capturing consistent snapshot "00000718-1".
INFO:  bottledwater_export: Table mas.mastest is keyed by index mastest_pkey
[INFO] Snapshot complete, streaming changes from 0/1749F58.

  • 向源端数据库写入数据
mas=> insert into mastest values(1,'A');
INSERT 0 1mas=> insert into mastest values(2,'B');
INSERT 0 1mas=> update mastest set col2='C' where col1=2;
UPDATE 1mas=> delete from mastest where col1=1;
DELETE 1

  • 目的端查看消息事件
# bin/kafka-topics.sh --list --zookeeper localhost:2181
mas.mastest# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mas.mastest --from-beginning --property print.key=true
{"col1": {"double": 1.0}}       {"col1": {"double": 1.0}, "col2": {"string": "A"}}
{"col1": {"double": 2.0}}       {"col1": {"double": 2.0}, "col2": {"string": "B"}}
{"col1": {"double": 2.0}}       {"col1": {"double": 2.0}, "col2": {"string": "C"}}
{"col1": {"double": 1.0}}       null

说明:

1. 目的端Kafka配置文件中,Listener的配置不能用默认值localhost

2. 每一张PostgreSQL中的表都会被做为一个topic,每个topic是自动创建的不需要人工干预,即使后期新建的表也是如此。

3. 消息事件以“主键列 + 变更后的所有列”作为消息内容

4. 主从复制模式下,slave不能启动bottledwater,因为备库不产生wal日志

转载于:https://www.cnblogs.com/aegis1019/p/9051462.html

通过Bottledwater同步PostgreSQL中的数据变化到Kafka消息队列相关推荐

  1. linux查看数据积压,查看kafka消息队列的积压情况

    创建topic kafka-topics --create --zookeeper master:2181/kafka2 --replication-factor 2 --partitions 3 - ...

  2. Flink使用KafkaSource从Kafka消息队列中读取数据

    Flink使用KafkaSource从Kafka消息队列中读取数据 使用KafkaSource从Kafka消息队列中读取数据 1.KafkaSource创建的DataStream是一个并行的DataS ...

  3. android listview 数据同步,android中ListView数据刷新时的同步方法

    本文实例讲述了android中ListView数据刷新时的同步方法.分享给大家供大家参考.具体实现方法如下: public class Main extends BaseActivity { priv ...

  4. dto 是只给前端需要的数据吗_解决消息队列的数据积压很难?其实只需要这三招...

    1 概述 最近生产环境的消息通知队列发生了大量的数据积压问题,从而影响到整个平台商户的交易无法正常进行,最后只能通过临时关闭交易量较大的商户来缓解消息队列积压的问题,经线上数据分析,我们的消息队列在面 ...

  5. 大数据之Kafka消息队列

    一.消息队列: 消息队列的核心功能:解耦,异步和并行. 消息队列与rpc区别: 消息队列只负责发送消息:rpc需要调用,并给响应状态码:相同点是他们都能解耦. 消息队列: activeMQ: jdk: ...

  6. RibbitMQ 大数据分布式下的消息队列思

    对于RibbitMQ  消息队列 使用: 定义一个队列,作为消息队列 生产者,生产消息添加入队列 消费者,监听到消息队列中有消息后,取出消息处理消息 每台服务器也可以配置多个消费者去处理消息 此处向H ...

  7. 实时数仓 大数据 Hadoop flink kafka

    ⼀.实时数仓建设背景 实时需求⽇趋迫切 ⽬前各⼤公司的产品需求和内部决策对于数据实时性的要求越来越迫切,需要实时数仓的能⼒来赋能.传统离 线数仓的数据时效性是 T+1,调度频率以天为单位,⽆法⽀撑实时 ...

  8. 2021年大数据Kafka(一):❤️消息队列和Kafka的基本介绍❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 消息队列和Kafka的基本介绍 一.什么是消息队列 二.消息队列的应用场景 ...

  9. canal 监听不到数据变化_数据的异构实战(二)手写迷你版同步工程

    点击上方"Java知音",选择"置顶公众号" 技术文章第一时间送达! 上一期讲到了通过canal订阅mysql的binlog日志并且转换为对象,那么这一次我们将 ...

  10. websphere mq 查看队列中是否有数据_全网最全的 “消息队列”

    消息队列的使用场景 以下介绍消息队列在实际应用常用的使用场景.异步处理.应用解耦.流量削锋和消息通讯四个场景. 1]异步处理:场景说明:用户注册后,需要发注册邮件和注册短信. 引入消息队列后架构如下: ...

最新文章

  1. Windows下MongoDB安装及创建用户名和密码
  2. linux 脚本自动编制工具,全自动工具链编译脚本
  3. c++ #define 预处理器
  4. JVM - 常见的JVM种类
  5. sql server之数据库语句优化
  6. Delphi IDE使用的一些主要技巧
  7. ubuntu下使用python将ppt转成图片_Ubuntu下使用Python实现游戏制作中的切分图片功能...
  8. 【BZOJ】3238: [Ahoi2013]差异
  9. 如何将根文件系统制作成yaffs格式,并设置从yaffs启动
  10. 蓝桥杯第五届JavaC组杨辉三角问题解决方法
  11. gunicorn: No module named 'fcntl'
  12. 极客大学架构师训练营 JVM虚拟机原理 JVM垃圾回收原理 Java编程优化 第17课 听课总结
  13. 医疗保险前台系统ER图1
  14. buuctf easyweb
  15. ice服务器修复教程,Bootice:系统引导菜单修复利器的功能解说
  16. Linux 下查询 DNS 服务器信息
  17. 18级计算机应用基础期中考试题,早安正能量 励志新开始---法学与社会学学院18级新生开展早自习活动...
  18. 三子棋(井字棋) 保姆级详解
  19. 如何使用python做图_如何使用python做动图
  20. Linux系统可以显示文件名,Linux系统如何显示中文目录和文件名

热门文章

  1. 客户消费积分管理系统的设计与实现
  2. QQ空间日志导出(php)
  3. SDUT OJ温度转换
  4. 网站出现403错误怎么办,为什么会出现403错误,要怎么解决
  5. An invalid domain [xx] was specified for this cookie
  6. 洛谷-2822 组合数问题
  7. c语言课设雷霆战机编程,C语言写的雷霆战机
  8. Mysql primary key主键冲突的可能性与解决方案
  9. javascript有声调的汉字注音字典(兼容各浏览器)
  10. springboot打包-依赖包到单独文件夹并微缩原有jar包体积