Canal

I. Introduction

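canal [kə'næl] parses the incremental logs (binlog) of a MySQL database and provides incremental data subscription and consumption.

Use cases built on log-based incremental subscription and consumption include

  • Database mirroring
  • Real-time database backup
  • Index building and real-time maintenance (split heterogeneous indexes, inverted indexes, and so on)
  • Business cache refresh
  • Incremental data processing with business logic

canal currently supports source MySQL versions 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.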

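II. How It Works

MySQL master-slave replication

  • The MySQL master writes data changes to its binary log (the records are called binary log events and can be viewed with show binlog events)
  • The MySQL slave copies the master's binary log events to its relay log
  • The MySQL slave replays the events in the relay log, applying the data changes to its own data

How canal works

  • canal emulates the MySQL slave interaction protocol, presents itself as a MySQL slave, and sends a dump request to the MySQL master
  • The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, canal)
  • canal parses the binary log objects (originally a byte stream)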
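
To make the last step more concrete, the following Python sketch decodes the fixed 19-byte header that starts every event in a MySQL binlog (v4 format) byte stream. It is only an illustration of what "parsing the byte stream" involves and is not canal's actual parser; the names EventHeader and parse_event_header and the sample bytes are made up for this example.

```python
import struct
from typing import NamedTuple


class EventHeader(NamedTuple):
    timestamp: int      # seconds since the Unix epoch
    type_code: int      # numeric event type
    server_id: int      # server_id of the MySQL instance that wrote the event
    event_length: int   # total event size in bytes, header included
    next_position: int  # binlog offset where the next event starts
    flags: int


# Little-endian uint32, uint8, uint32, uint32, uint32, uint16 = 19 bytes.
HEADER = struct.Struct("<IBIIIH")


def parse_event_header(buf: bytes) -> EventHeader:
    """Decode the common header at the start of one binlog event."""
    if len(buf) < HEADER.size:
        raise ValueError(f"need {HEADER.size} bytes, got {len(buf)}")
    return EventHeader(*HEADER.unpack_from(buf))


# Hypothetical sample bytes, packed here only to exercise the parser.
sample = HEADER.pack(1670826065, 30, 1, 60, 40134, 0)
header = parse_event_header(sample)
print(header.type_code, header.event_length, header.next_position)
```

The next_position field is the same kind of offset that later shows up in canal's cursor, which is how a consumer can resume from a known point in a given binlog file.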

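III. Deployment

1. Preparation

  • For a self-managed MySQL, first enable binlog writing and set binlog-format to ROW mode. Configure my.cnf as follows

    [mysqld]
    log-bin=mysql-bin # enable binlog
    binlog-format=ROW # use ROW mode
    server_id=1 # required for MySQL replication; must not clash with canal's slaveId
    
    • Note: Alibaba Cloud RDS for MySQL enables binlog by default and its accounts have the binlog dump privilege by default, so no extra privilege or binlog settings are needed and this step can be skipped
  • Grant the MySQL account that canal connects with the privileges of a MySQL slave. If the account already exists, run the GRANT directly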

    CREATE USER canal IDENTIFIED BY 'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    

2. Configure and Start

2.1 Download canal, using the latest version, 1.1.6, as an example

wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz

2.2 Extract

mkdir -p /opt/canal/canal.deployer
tar zxvf canal.deployer-1.1.6.tar.gz -C /opt/canal/canal.deployer

After extraction, enter the /opt/canal/canal.deployer directory. It has the following structure

drwxr-xr-x 2 root root   93 12月 12 14:28 bin
drwxr-xr-x 6 root root  179 12月 12 14:28 conf
drwxr-xr-x 2 root root 4096 12月 12 11:41 lib
drwxrwxrwx 5 root root   62 12月 12 14:05 logs
drwxrwxrwx 2 root root  235 8月  11 10:52 plugin

2.3 Modify the Canal server configuration

$ cat conf/canal.properties
#################################################
#########       common argument     #############
#################################################
# tcp bind ip
# Change this to the IP address of the canal server
canal.ip = 192.168.150.138
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
# Change this to pulsarMQ
canal.serverMode = pulsarMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
#########       destinations        #############
#################################################
# A custom directory (the instance name) can be set here; an instance.properties file must be added under that directory
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########         MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
#########            Kafka           #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

##################################################
#########           RocketMQ         #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =

##################################################
#########           RabbitMQ         #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =

##################################################
#########             Pulsar         #############
##################################################
# Change this to the address of your Pulsar service
pulsarmq.serverUrl = pulsar://192.168.150.139:6650
pulsarmq.roleToken =
pulsarmq.topicTenantPrefix = public/default
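
Several comments in the listing above state constraints that are easy to break when editing by hand: the RingBuffer sizes must be powers of two, and canal.serverMode must be one of the listed modes. The Python sketch below checks them before startup. It is a hypothetical helper, not part of canal; the function names are invented here, the parser only handles plain key = value lines, and the case-insensitive comparison of mode names is an assumption.

```python
from typing import Dict, List

# Taken from the comment above canal.serverMode in the listing.
SERVER_MODES = {"tcp", "kafka", "rocketmq", "rabbitmq", "pulsarmq"}
# Keys whose comments in the listing require a power of two.
POWER_OF_TWO_KEYS = (
    "canal.instance.memory.buffer.size",
    "canal.instance.parser.parallelBufferSize",
)


def load_properties(text: str) -> Dict[str, str]:
    """Parse simple `key = value` lines, skipping blanks and # comments."""
    props: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check(props: Dict[str, str]) -> List[str]:
    """Return a list of human-readable problems (empty means none found)."""
    problems: List[str] = []
    # Assumption: mode names are compared case-insensitively.
    mode = props.get("canal.serverMode", "").lower()
    if mode not in SERVER_MODES:
        problems.append(f"unknown canal.serverMode: {mode!r}")
    for key in POWER_OF_TWO_KEYS:
        value = props.get(key)
        if value is None:
            continue
        if not value.isdigit() or int(value) == 0 or int(value) & (int(value) - 1):
            problems.append(f"{key} must be a power of two, got {value!r}")
    if mode == "pulsarmq" and not props.get("pulsarmq.serverUrl"):
        problems.append("pulsarmq.serverUrl is empty")
    return problems


sample = """
canal.serverMode = pulsarMQ
canal.instance.memory.buffer.size = 16384
canal.instance.parser.parallelBufferSize = 256
pulsarmq.serverUrl = pulsar://192.168.150.139:6650
"""
print(check(load_properties(sample)))
```

In practice the text would be read from conf/canal.properties instead of the inline sample. A line where two settings have been fused together shows up as an odd value, which makes this kind of check a cheap guard against copy-and-paste damage.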

2.4 Modify the Canal instance configuration

vi conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
# Change the following to your own database information
canal.instance.master.address=192.168.150.140:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
# Change the following to your own database credentials
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
# Databases or tables to synchronize
canal.instance.filter.regex=canal\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
# Custom name of the topic in Pulsar
canal.mq.topic=sync-mysql-bin-test1
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
  • canal.instance.connectionCharset is the Java charset name that corresponds to the database encoding, such as UTF-8, GBK, or ISO-8859-1
  • If the system has only one CPU, set canal.instance.parser.parallel to false
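
The canal.instance.filter.regex and canal.instance.filter.black.regex values in the listing are comma-separated regular expressions applied to schema.table names, and the doubled backslash is the .properties escape for a single one. The Python sketch below shows how such values select tables. It approximates the idea and does not reproduce canal's filter implementation: whole-name matching is an assumption, the helper names are invented, and a pattern that itself contains a comma is not handled.

```python
import re
from typing import List


def compile_filter(raw: str) -> List[re.Pattern]:
    """Compile a comma-separated filter value copied from instance.properties."""
    # In a .properties file a doubled backslash stands for one backslash.
    value = raw.replace("\\\\", "\\")
    return [re.compile(p.strip()) for p in value.split(",") if p.strip()]


def is_selected(name: str, include: List[re.Pattern], exclude: List[re.Pattern]) -> bool:
    """True if `schema.table` matches an include pattern and no exclude pattern."""
    def hit(patterns: List[re.Pattern]) -> bool:
        return any(p.fullmatch(name) for p in patterns)
    return hit(include) and not hit(exclude)


# Values exactly as they appear in the listing.
include = compile_filter(r"canal\\..*")
exclude = compile_filter(r"mysql\\.slave_.*")

for name in ("canal.orders", "test.orders", "mysql.slave_master_info"):
    print(name, is_selected(name, include, exclude))
```

With these values only tables in the canal schema are selected; the black regex matters once the include pattern is widened, for example to every schema.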

2.5 Start

sh bin/startup.sh

2.6 Check the server log

tail logs/canal/canal.log
2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

2.7 Check the instance log

vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

2.8 After binlog changes occur in the database, check the queue (meta) log

$ tail  /opt/canal/canal.deployer/logs/example/meta.log
2022-12-12 14:29:01.169 - clientId:1001 cursor:[mysql-bin.000002,40134,1670826065000,1,] address[/111.111.0.117:3306]
2022-12-12 14:29:20.169 - clientId:1001 cursor:[mysql-bin.000002,40401,1670826559000,1,] address[KD111111000117.ppp-bb.dion.ne.jp/192.168.150.140:3306]
2022-12-12 14:29:27.169 - clientId:1001 cursor:[mysql-bin.000002,40669,1670826566000,1,] address[KD111111000117.ppp-bb.dion.ne.jp/192.168.150.140:3306]
2022-12-12 14:29:35.169 - clientId:1001 cursor:[mysql-bin.000002,40937,1670826574000,1,] address[KD111111000117.ppp-bb.dion.ne.jp/192.168.150.140:3306]
2022-12-12 14:29:44.169 - clientId:1001 cursor:[mysql-bin.000002,41205,1670826583000,1,] address[KD111111000117.ppp-bb.dion.ne.jp/192.168.150.140:3306]
2022-12-12 14:30:25.169 - clientId:1001 cursor:[mysql-bin.000002,41473,1670826624000,1,] address[KD111111000117.ppp-bb.dion.ne.jp/192.168.150.140:3306]
2022-12-12 14:30:32.169 - clientId:1001 cursor:[mysql-bin.000002,41741,1670826631000,1,] address[KD111111000117.ppp-bb.dion.ne.jp/192.168.150.140:3306]
2022-12-12 14:30:44.169 - clientId:1001 cursor:[mysql-bin.000002,42008,1670826643000,1,] address[KD111111000117.ppp-bb.dion.ne.jp/192.168.150.140:3306]

2.9 Check whether the topic has been created in Pulsar and is receiving data

bin/pulsar-admin topics list public/default
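
Because canal.mq.flatMessage = true in canal.properties, each message in the topic is a JSON document. The Python sketch below shows one way a consumer might interpret such a payload once it has been received from Pulsar (the receiving side is not shown). It assumes canal's usual flat message fields (database, table, type, isDdl, data, old) and that old holds the previous values of changed columns only; the sample payload and the helper names are hypothetical.

```python
import json
from typing import Any, Dict, List, Tuple


def describe(payload: bytes) -> str:
    """One-line summary: schema.table, change type, and row count."""
    msg = json.loads(payload)
    rows = msg.get("data") or []  # treated as empty when absent or null
    return f'{msg["database"]}.{msg["table"]} {msg["type"]} {len(rows)} row(s)'


def changed_columns(payload: bytes) -> List[Dict[str, Tuple[Any, Any]]]:
    """For each row of an UPDATE message, map column -> (old value, new value)."""
    msg = json.loads(payload)
    if msg.get("isDdl") or msg.get("type") != "UPDATE":
        return []
    rows = msg.get("data") or []
    olds = msg.get("old") or []
    return [
        {col: (before[col], after.get(col)) for col in before}
        for before, after in zip(olds, rows)
    ]


# Hypothetical payload, shaped like a flat message for a single-row UPDATE.
sample = json.dumps({
    "database": "canal",
    "table": "orders",
    "type": "UPDATE",
    "isDdl": False,
    "pkNames": ["id"],
    "data": [{"id": "1", "status": "PAID"}],
    "old": [{"status": "NEW"}],
    "es": 1670826559000,
    "ts": 1670826560169,
}).encode("utf-8")

print(describe(sample))
print(changed_columns(sample))
```

Keeping the parsing in plain functions like these makes it easy to test without a running broker; the same functions can then be called on the bytes returned by whichever Pulsar client is used.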

2.10 Stop

sh bin/stop.sh

3. References

  • https://github.com/alibaba/canal
  • https://github.com/alibaba/canal/wiki/QuickStart
