cdc工具 postgresql_SQLServer CDC数据通过Kafka connect实时同步至分析型数据库 AnalyticDB For PostgreSQL及OSS-阿里云开发者社区...
背景
SQLServer为实时更新数据同步提供了CDC机制,类似于Mysql的binlog,将数据更新操作维护到一张CDC表中。
开启cdc的源表在插入INSERT、更新UPDATE和删除DELETE活动时会插入数据到日志表中。cdc通过捕获进程将变更数据捕获到变更表中,通过cdc提供的查询函数,可以捕获这部分数据。
CDC的使用条件
1.SQL server 2008及以上的企业版、开发版和评估版;
2.需要开启代理服务(作业)。
3.CDC需要业务库之外的额外的磁盘空间。
4.CDC的表需要主键或者唯一主键。
图1:Sqlserver CDC原理
ADB4PG Sink使用条件
需要提前使用建表语句,在ADB4PG端建表,系统不会自动创建(如果有需要可以加这部分功能)
每张表需要有主键或唯一主键
当前支持的数据格式:INTEGER,BIGINT,SMALLINT,NUMERIC,DECIMAL,REAL,DOUBLEPERICISION,BOOLEAN,DATE,TIMESTAMP,VARCHAR
环境准备
SQLServer环境准备
已有自建SQLServer或云上RDS实例(示例使用云上RDS SQLServer实例)
已有windows环境,并安装SSMS(SQL Server Management Studio),部分命令需要在SSMS执行
SQLServer环境建表
-- 创建源表
create database connect
GO
use connect
GO
create table t1
(
a int NOT NULL PRIMARY KEY,
b BIGINT,
c SMALLINT,
d REAL,
e FLOAT,
f DATETIME,
g VARCHAR
);
-- 开启db级的cdc
exec sp_rds_cdc_enable_db
-- 验证数据库是否开启cdc成功
select * from sys.databases where is_cdc_enabled = 1
-- 对源表开启cdc
exec sp_cdc_enable_table @source_schema='dbo', @source_name='t1', @role_name=null;
ADB4PG端创建目标表
CREATE DATABASE connect;
create table t1
(
a int NOT NULL PRIMARY KEY,
b BIGINT,
c SMALLINT,
d REAL,
e FLOAT,
f TIMESTAMP,
g VARCHAR
);
Kafka环境准备
安装Kafka Server
1. 下载kafka安装包,并解压
SQL Server Source Connect目前只支持2.1.0及以上版本的Kafka Connect,故需要安装高版本kafka,实例使用kfakf-2.11-2.1.0。 http://kafka.apache.org/downloads?spm=a2c4g.11186623.2.19.7dd34587dwy89h#2.1.0
2. 编辑$KAFKA_HOME/config/server.properties
修改以下参数
...
## 为每台broker配置一个唯一的id号
broker.id=0
...
## log存储地址
log.dirs=/home/gaia/kafka_2.11-2.1.0/logs
## kafka集群使用的zk地址
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
...
3. 启动kafka server
bin/kafka-server-start.sh config/server.properties
安装Kafka Connect
1. 修改kafka connect配置文件
修改$KAFKA_HOME/config/connect-distributed.properties
## kafka server地址
bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
## 为kafka connector选定一个消费group id
group.id=
## 安装插件的地址,每次kafka connector启动时会动态加载改路径下的jar包,可以将每个插件单独放到一个子路径
plugin.path=
安装需要的kafka-connect插件
1. 将插件jar包放在我们在前面已经配置过的配置的plugin.path路径下
sqlserver-source-connector
oss-sink-connector, 需要使用代码自行编译,注意在pom修改依赖的kafka及scala版本号
2. 编辑配置文件
# CDC connector的配置文件 sqlserver-cdc-source.json
▽
{
"name": "sqlserver-cdc-source",
"config": {
"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max" : "1",
"database.server.name" : "server1",
"database.hostname" : "database hostname",
"database.port" : "1433",
"database.user" : "xxxx",
"database.password" : "xxxxxx",
"database.dbname" : "connect",
"schemas.enable" : "false",
"mode":"incrementing",
"incrementing.column.name":"a",
"database.history.kafka.bootstrap.servers" : "kafka-broker:9092",
"database.history.kafka.topic": "server1.dbo.t1",
"value.converter.schemas.enable":"false",
"value.converter":"org.apache.kafka.connect.json.JsonConverter"
}
}
# oss sink的配置文件 oss-sink.json
{
"name":"oss-sink",
"config": {
"name":"oss-sink",
"topics":"server1.dbo.testdata",
"connector.class":"com.aliyun.oss.connect.kafka.OSSSinkConnector",
"format.class":"com.aliyun.oss.connect.kafka.format.json.JsonFormat",
"flush.size":"1",
"tasks.max":"4",
"storage.class":"com.aliyun.oss.connect.kafka.storage.OSSStorage",
"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"timestamp.extractor":"Record",
"oss.bucket":"traffic-csv",
"partition.duration.ms":"10000",
"path.format":"YYYY-MM-dd-HH",
"locale":"US",
"timezone":"Asia/Shanghai",
"rotate.interval.ms":"30000"
}
}
## adb4pg-jdbc-sink配置文件
{
"name":"adb4pg-jdbc-sink",
"config": {
"name":"adb4pg-jdbc-sink",
"topics":"server1.dbo.t1",
"connector.class":"io.confluent.connect.jdbc.Adb4PgSinkConnector",
"connection.url":"jdbc:postgresql://gp-8vb8xi62lohhh2777o.gpdb.zhangbei.rds.aliyuncs.com:3432/connect",
"connection.user":"xxx",
"connection.password":"xxxxxx",
"col.names":"a,b,c,d,e,f,g",
"col.types":"integer,bigint,smallint,real,doublepericision,timestamp,varchar",
"pk.fields":"a",
"target.tablename":"t1",
"tasks.max":"1",
"auto.create":"false",
"table.name.format":"t1",
"batch.size":"1"
}
}
由于OSS sinker使用了hdfs封装的FileSystem,需要将OSS相关的信息维护到$KAFKA_HOME/config/core-site.xml文件中
fs.oss.endpoint
xxxxxxx
fs.oss.accessKeyId
xxxxxxx
fs.oss.accessKeySecret
xxxxxxx
fs.oss.impl
org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
fs.oss.buffer.dir
/tmp/oss
fs.oss.connection.secure.enabled
false
fs.oss.connection.maximum
2048
3. 启动已经配置好的kafka-connector插件
启动及删除connect任务命令
## 启动命令
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @sqlserver-cdc-source.json
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @adb4pg-jdbc-sink.json
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oss-sink.json
## 删除命令
curl -s -X DELETE http://localhost:8083/connectors/sqlserver-cdc-source
curl -s -X DELETE http://localhost:8083/connectors/adb4pg-jdbc-sink
curl -s -X DELETE http://localhost:8083/connectors/oss-sink
在ADB For PG获取更新数据
SQLServer插入赠/更/删数据记录
insert into t1(a,b,c,d,e,f,g) values(1, 2, 3, 4, 5, convert(datetime,'24-12-19 10:34:09 PM',5), 'h');
在kafka topic获取更新结果
先确认是否生成了kafka-connect所需的topic信息
bin/kafka-topics.sh --zookeeper zk_address --list
如截图,connect-configs, connect-offsets, connect-status为kafka-connect用来存储任务数据更新状态的topic。schema-changes-inventory是维护sqlserver表结构的topic。
可以通过kafka consloe-consumer上获取到的topic信息,以确认cdc数据正确被采集到kafka topic
bin/kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:9092 --topic server1.dbo.t1
在ADB For PG上查询同步过来的数据
注意:因为是不同数据库之间的同步,时区设置的不同可能会导致同步结果产生时区偏移,需要在两侧数据库做好设置。
在OSS查看更新的数据
cdc工具 postgresql_SQLServer CDC数据通过Kafka connect实时同步至分析型数据库 AnalyticDB For PostgreSQL及OSS-阿里云开发者社区...相关推荐
- ssm mysql 插入date 数据_SSM中插入数据没有报错,但是数据库没有值?报错-问答-阿里云开发者社区-阿里云...
mybatis默认开启事务不会自动提交,只有调用了commit才会提交事务.你这种情况明显是事务成功执行了,而且自增ID也已生效,但事务本身没有提交,请调用mybatis的commit方法提交事务! ...
- coba mysql_在Android Studio中将数据从MySQL数据库显示到TextView中-问答-阿里云开发者社区-阿里云...
我是新手,Android Studio我想将数据库(我使用MySQL)中的数据显示到中TextView.我也使用Button和RadioButton.单击按钮后,数据将显示在中TextView.这是我 ...
- 阿里mysql数据库同步_如何对MySQL数据库中的数据进行实时同步-阿里云开发者社区...
数据传输(Data Transmission) 支持以数据库为核心的结构化存储产品之间的数据传输. 它是一种集数据迁移.数据订阅及数据实时同步于一体的数据传输服务.数据传输致力于在公有云.混合云场景下 ...
- db h2 数据类型_H2数据库函数及数据类型概述-阿里云开发者社区
H2数据库函数及数据类型概述 jieforest 2015-01-29 573浏览量 简介: H2数据库函数及数据类型概述 一.H2数据库常用数据类型 INT类型:对应java.lang.Intege ...
- python的requests模块功能_《Python数据可视化编程实战》—— 1.7 安装requests模块-阿里云开发者社区...
本节书摘来异步社区<Python数据可视化编程实战>一书中的第1章,第1.7节,作者:[爱尔兰]Igor Milovanović,更多章节内容可以访问云栖社区"异步社区" ...
- tableau三轴合并_《Tableau数据可视化实战》——1.12节合并不同数据源-阿里云开发者社区...
本节书摘来自华章社区<Tableau数据可视化实战>一书中的第1章,第1.12节合并不同数据源,作者(美)Ashutosh Nandeshwar,更多章节内容可以访问云栖社区"华 ...
- 瓴羊CEO朋新宇:从数据发现问题到数据创造价值|2022全球数字价值峰会-阿里云开发者社区
9月27日-28日,由钛媒体与ITValue共同主办的2022全球数字价值峰会深圳站在深圳星河·领创天下举行.此次峰会以"复苏与可持续发展"为主题,聚焦"数据" ...
- 阿里云分析型数据库AnalyticDB:使用Logstash插件进行高效数据写入
前言 AnalyticDB(简称ADB,ADS,早期项目名Garuda)是阿里巴巴自主研发的海量数据实时高并发在线实时分析型数据库(Real-Time OLAP).自2012年第一次在集团发布上线以来 ...
- mysql 轨迹数据存储_基于Tablestore实现海量运动轨迹数据存储-阿里云开发者社区...
前言 现在越来越多的人都开始关心自己的运动数据,比如每日的计步.跑步里程.骑行里程等.运动APP与运动类的穿戴设备借助传感器.地图.GPS定位等技术,收集好运动数据以后,通过与互联网社交功能结合,产生 ...
最新文章
- 修改maven本地仓库的位置及疑惑
- Hdu 1029 Ignatius and the Princess IV
- html请求接口_前端工程师吐后端工程师(第八讲)——接口的开发
- java注解定义时间格式_SpringBoot基础教程2-1-8 数据验证-自定义日期格式验证
- 每天一道LeetCode-----买卖商品问题,计算最大利润,分别有一次交易,两次交易,多次交易的情况
- python办公实用功能_【一点资讯】实用办公技巧贴——当Python遇上PDF www.yidianzixun.com...
- Spring事务管理介绍
- 【转】java中定义二维数组的几种写法
- 博客园随笔添加自己的版权信息 [转]
- Luogu P1525 【关押罪犯】
- linux install nginx
- 来看看深度学习如何在文娱行业“落地”
- Unix/Linux shell脚本编程学习--Shell Script II
- python下载的库包放_python下载的库包存放路径
- Android SDK下载失败解决
- 评价模型总结——个人学习笔记(二)
- MAXDOS网刻教程~~(虚拟机与物理机 / 两台或者多台电脑之间)
- 戴尔(DELL)成就Vostro15-7580 15.6英寸八代混合独显便携商务笔记本 5699元
- arduinouno的地是相连的吗_如何连接地线是最标准的,能起到保障的作用吗?
- 3D远方动态白云页面源码
热门文章
- cv2.error: opencv(4.4.0)_【OpenCV 4开发详解】图像连通域分析
- python 格式化字符串 模板字符串(五分钟读懂)
- python列表数据类型(一分钟读懂)
- CPU启动计算机,如何加快双核CPU计算机win7启动速度
- 大屏数据可视化源码_数据可视化大屏快速入门
- 有源带阻和无源带阻的区别_一文看懂AOC有源光缆与DAC高速线缆的差异
- 析砂性土层php泥浆护壁,砂卵石层钻探护壁工艺分析
- A Point-Line Feature based Visual SLAM Method in Dynamic Indoor Scene
- 机器学习、深度学习需要哪些数学知识?✅
- 深度学习——Optimizer算法学习笔记