背景

SQLServer为实时更新数据同步提供了CDC机制,类似于Mysql的binlog,将数据更新操作维护到一张CDC表中。

开启cdc的源表在插入INSERT、更新UPDATE和删除DELETE活动时会插入数据到日志表中。cdc通过捕获进程将变更数据捕获到变更表中,通过cdc提供的查询函数,可以捕获这部分数据。

CDC的使用条件

1.SQL server 2008及以上的企业版、开发版和评估版;

2.需要开启代理服务(作业)。

3.CDC需要业务库之外的额外的磁盘空间。

4.CDC的表需要主键或者唯一主键。

图1:Sqlserver CDC原理

ADB4PG Sink使用条件

需要提前使用建表语句,在ADB4PG端建表,系统不会自动创建(如果有需要可以加这部分功能)

每张表需要有主键或唯一主键

当前支持的数据格式:INTEGER,BIGINT,SMALLINT,NUMERIC,DECIMAL,REAL,DOUBLEPERICISION,BOOLEAN,DATE,TIMESTAMP,VARCHAR

环境准备

SQLServer环境准备

已有自建SQLServer或云上RDS实例(示例使用云上RDS SQLServer实例)

已有windows环境,并安装SSMS(SQL Server Management Studio),部分命令需要在SSMS执行

SQLServer环境建表

-- 创建源表

create database connect

GO

use connect

GO

create table t1

(

a int NOT NULL PRIMARY KEY,

b BIGINT,

c SMALLINT,

d REAL,

e FLOAT,

f DATETIME,

g VARCHAR

);

-- 开启db级的cdc

exec sp_rds_cdc_enable_db

-- 验证数据库是否开启cdc成功

select * from sys.databases where is_cdc_enabled = 1

-- 对源表开启cdc

exec sp_cdc_enable_table @source_schema='dbo', @source_name='t1', @role_name=null;

ADB4PG端创建目标表

CREATE DATABASE connect;

create table t1

(

a int NOT NULL PRIMARY KEY,

b BIGINT,

c SMALLINT,

d REAL,

e FLOAT,

f TIMESTAMP,

g VARCHAR

);

Kafka环境准备

安装Kafka Server

1. 下载kafka安装包,并解压

SQL Server Source Connect目前只支持2.1.0及以上版本的Kafka Connect,故需要安装高版本kafka,实例使用kfakf-2.11-2.1.0。 http://kafka.apache.org/downloads?spm=a2c4g.11186623.2.19.7dd34587dwy89h#2.1.0

2. 编辑$KAFKA_HOME/config/server.properties

修改以下参数

...

## 为每台broker配置一个唯一的id号

broker.id=0

...

## log存储地址

log.dirs=/home/gaia/kafka_2.11-2.1.0/logs

## kafka集群使用的zk地址

zookeeper.connect=zk1:2181,zk2:2181,zk3:2181

...

3. 启动kafka server

bin/kafka-server-start.sh config/server.properties

安装Kafka Connect

1. 修改kafka connect配置文件

修改$KAFKA_HOME/config/connect-distributed.properties

## kafka server地址

bootstrap.servers=broker1:9092,broker2:9092,broker3:9092

## 为kafka connector选定一个消费group id

group.id=

## 安装插件的地址,每次kafka connector启动时会动态加载改路径下的jar包,可以将每个插件单独放到一个子路径

plugin.path=

安装需要的kafka-connect插件

1. 将插件jar包放在我们在前面已经配置过的配置的plugin.path路径下

sqlserver-source-connector

oss-sink-connector, 需要使用代码自行编译,注意在pom修改依赖的kafka及scala版本号

2. 编辑配置文件

# CDC connector的配置文件 sqlserver-cdc-source.json

{

"name": "sqlserver-cdc-source",

"config": {

"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",

"tasks.max" : "1",

"database.server.name" : "server1",

"database.hostname" : "database hostname",

"database.port" : "1433",

"database.user" : "xxxx",

"database.password" : "xxxxxx",

"database.dbname" : "connect",

"schemas.enable" : "false",

"mode":"incrementing",

"incrementing.column.name":"a",

"database.history.kafka.bootstrap.servers" : "kafka-broker:9092",

"database.history.kafka.topic": "server1.dbo.t1",

"value.converter.schemas.enable":"false",

"value.converter":"org.apache.kafka.connect.json.JsonConverter"

}

}

# oss sink的配置文件 oss-sink.json

{

"name":"oss-sink",

"config": {

"name":"oss-sink",

"topics":"server1.dbo.testdata",

"connector.class":"com.aliyun.oss.connect.kafka.OSSSinkConnector",

"format.class":"com.aliyun.oss.connect.kafka.format.json.JsonFormat",

"flush.size":"1",

"tasks.max":"4",

"storage.class":"com.aliyun.oss.connect.kafka.storage.OSSStorage",

"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",

"timestamp.extractor":"Record",

"oss.bucket":"traffic-csv",

"partition.duration.ms":"10000",

"path.format":"YYYY-MM-dd-HH",

"locale":"US",

"timezone":"Asia/Shanghai",

"rotate.interval.ms":"30000"

}

}

## adb4pg-jdbc-sink配置文件

{

"name":"adb4pg-jdbc-sink",

"config": {

"name":"adb4pg-jdbc-sink",

"topics":"server1.dbo.t1",

"connector.class":"io.confluent.connect.jdbc.Adb4PgSinkConnector",

"connection.url":"jdbc:postgresql://gp-8vb8xi62lohhh2777o.gpdb.zhangbei.rds.aliyuncs.com:3432/connect",

"connection.user":"xxx",

"connection.password":"xxxxxx",

"col.names":"a,b,c,d,e,f,g",

"col.types":"integer,bigint,smallint,real,doublepericision,timestamp,varchar",

"pk.fields":"a",

"target.tablename":"t1",

"tasks.max":"1",

"auto.create":"false",

"table.name.format":"t1",

"batch.size":"1"

}

}

由于OSS sinker使用了hdfs封装的FileSystem,需要将OSS相关的信息维护到$KAFKA_HOME/config/core-site.xml文件中

fs.oss.endpoint

xxxxxxx

fs.oss.accessKeyId

xxxxxxx

fs.oss.accessKeySecret

xxxxxxx

fs.oss.impl

org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem

fs.oss.buffer.dir

/tmp/oss

fs.oss.connection.secure.enabled

false

fs.oss.connection.maximum

2048

3. 启动已经配置好的kafka-connector插件

启动及删除connect任务命令

## 启动命令

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @sqlserver-cdc-source.json

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @adb4pg-jdbc-sink.json

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oss-sink.json

## 删除命令

curl -s -X DELETE http://localhost:8083/connectors/sqlserver-cdc-source

curl -s -X DELETE http://localhost:8083/connectors/adb4pg-jdbc-sink

curl -s -X DELETE http://localhost:8083/connectors/oss-sink

在ADB For PG获取更新数据

SQLServer插入赠/更/删数据记录

insert into t1(a,b,c,d,e,f,g) values(1, 2, 3, 4, 5, convert(datetime,'24-12-19 10:34:09 PM',5), 'h');

在kafka topic获取更新结果

先确认是否生成了kafka-connect所需的topic信息

bin/kafka-topics.sh --zookeeper zk_address --list

如截图,connect-configs, connect-offsets, connect-status为kafka-connect用来存储任务数据更新状态的topic。schema-changes-inventory是维护sqlserver表结构的topic。

可以通过kafka consloe-consumer上获取到的topic信息,以确认cdc数据正确被采集到kafka topic

bin/kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:9092 --topic server1.dbo.t1

在ADB For PG上查询同步过来的数据

注意:因为是不同数据库之间的同步,时区设置的不同可能会导致同步结果产生时区偏移,需要在两侧数据库做好设置。

在OSS查看更新的数据

cdc工具 postgresql_SQLServer CDC数据通过Kafka connect实时同步至分析型数据库 AnalyticDB For PostgreSQL及OSS-阿里云开发者社区...相关推荐

  1. ssm mysql 插入date 数据_SSM中插入数据没有报错,但是数据库没有值?报错-问答-阿里云开发者社区-阿里云...

    mybatis默认开启事务不会自动提交,只有调用了commit才会提交事务.你这种情况明显是事务成功执行了,而且自增ID也已生效,但事务本身没有提交,请调用mybatis的commit方法提交事务! ...

  2. coba mysql_在Android Studio中将数据从MySQL数据库显示到TextView中-问答-阿里云开发者社区-阿里云...

    我是新手,Android Studio我想将数据库(我使用MySQL)中的数据显示到中TextView.我也使用Button和RadioButton.单击按钮后,数据将显示在中TextView.这是我 ...

  3. 阿里mysql数据库同步_如何对MySQL数据库中的数据进行实时同步-阿里云开发者社区...

    数据传输(Data Transmission) 支持以数据库为核心的结构化存储产品之间的数据传输. 它是一种集数据迁移.数据订阅及数据实时同步于一体的数据传输服务.数据传输致力于在公有云.混合云场景下 ...

  4. db h2 数据类型_H2数据库函数及数据类型概述-阿里云开发者社区

    H2数据库函数及数据类型概述 jieforest 2015-01-29 573浏览量 简介: H2数据库函数及数据类型概述 一.H2数据库常用数据类型 INT类型:对应java.lang.Intege ...

  5. python的requests模块功能_《Python数据可视化编程实战》—— 1.7 安装requests模块-阿里云开发者社区...

    本节书摘来异步社区<Python数据可视化编程实战>一书中的第1章,第1.7节,作者:[爱尔兰]Igor Milovanović,更多章节内容可以访问云栖社区"异步社区" ...

  6. tableau三轴合并_《Tableau数据可视化实战》——1.12节合并不同数据源-阿里云开发者社区...

    本节书摘来自华章社区<Tableau数据可视化实战>一书中的第1章,第1.12节合并不同数据源,作者(美)Ashutosh Nandeshwar,更多章节内容可以访问云栖社区"华 ...

  7. 瓴羊CEO朋新宇:从数据发现问题到数据创造价值|2022全球数字价值峰会-阿里云开发者社区

    9月27日-28日,由钛媒体与ITValue共同主办的2022全球数字价值峰会深圳站在深圳星河·领创天下举行.此次峰会以"复苏与可持续发展"为主题,聚焦"数据" ...

  8. 阿里云分析型数据库AnalyticDB:使用Logstash插件进行高效数据写入

    前言 AnalyticDB(简称ADB,ADS,早期项目名Garuda)是阿里巴巴自主研发的海量数据实时高并发在线实时分析型数据库(Real-Time OLAP).自2012年第一次在集团发布上线以来 ...

  9. mysql 轨迹数据存储_基于Tablestore实现海量运动轨迹数据存储-阿里云开发者社区...

    前言 现在越来越多的人都开始关心自己的运动数据,比如每日的计步.跑步里程.骑行里程等.运动APP与运动类的穿戴设备借助传感器.地图.GPS定位等技术,收集好运动数据以后,通过与互联网社交功能结合,产生 ...

最新文章

  1. 修改maven本地仓库的位置及疑惑
  2. Hdu 1029 Ignatius and the Princess IV
  3. html请求接口_前端工程师吐后端工程师(第八讲)——接口的开发
  4. java注解定义时间格式_SpringBoot基础教程2-1-8 数据验证-自定义日期格式验证
  5. 每天一道LeetCode-----买卖商品问题,计算最大利润,分别有一次交易,两次交易,多次交易的情况
  6. python办公实用功能_【一点资讯】实用办公技巧贴——当Python遇上PDF www.yidianzixun.com...
  7. Spring事务管理介绍
  8. 【转】java中定义二维数组的几种写法
  9. 博客园随笔添加自己的版权信息 [转]
  10. Luogu P1525 【关押罪犯】
  11. linux install nginx
  12. 来看看深度学习如何在文娱行业“落地”
  13. Unix/Linux shell脚本编程学习--Shell Script II
  14. python下载的库包放_python下载的库包存放路径
  15. Android SDK下载失败解决
  16. 评价模型总结——个人学习笔记(二)
  17. MAXDOS网刻教程~~(虚拟机与物理机 / 两台或者多台电脑之间)
  18. 戴尔(DELL)成就Vostro15-7580 15.6英寸八代混合独显便携商务笔记本 5699元
  19. arduinouno的地是相连的吗_如何连接地线是最标准的,能起到保障的作用吗?
  20. 3D远方动态白云页面源码

热门文章

  1. cv2.error: opencv(4.4.0)_【OpenCV 4开发详解】图像连通域分析
  2. python 格式化字符串 模板字符串(五分钟读懂)
  3. python列表数据类型(一分钟读懂)
  4. CPU启动计算机,如何加快双核CPU计算机win7启动速度
  5. 大屏数据可视化源码_数据可视化大屏快速入门
  6. 有源带阻和无源带阻的区别_一文看懂AOC有源光缆与DAC高速线缆的差异
  7. 析砂性土层php泥浆护壁,砂卵石层钻探护壁工艺分析
  8. A Point-Line Feature based Visual SLAM Method in Dynamic Indoor Scene
  9. 机器学习、深度学习需要哪些数学知识?✅
  10. 深度学习——Optimizer算法学习笔记