canal同步mysql数据
canal简介
canal官网文档
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
工作原理
MySQL主备复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 - MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
canal安装
Linux安装canal
canal版本列表
- 下载安装包 这里下载1.1.6版本
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
- 解压
tar -zxvf canal.deployer-1.1.6.tar.gz
- 更改配置文件
conf/example/instance.properties
## 监听的mysql主机地址和端口
canal.instance.master.address=127.0.0.1:3306
## 用户名和密码
canal.instance.dbUsername=root
canal.instance.dbPassword=root
## 匹配规则
canal.instance.filter.regex=.*\\..*
- 启动canal
cd /usr/data/lib/canal/bin 进入bin目录
sh startup.sh
标题
docker安装canal
- 拉取canal镜像
docker pull canal/canal-server
- 查看镜像是否拉取成功
docker images
3. 复制canal配置文件
# 创建一个容器
docker run --name canal -d canal/canal-server
# 复制容器中的配置文件到本地
docker cp canal:/home/admin/canal-server/conf/canal.properties /siyi/docker-cancl/conf
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /siyi/docker-cancl/conf
# 停止canal
docker stop canal
# 删除canal容器
docker rm canal
- 运行canal
docker run --name canal -p 11111:11111 -d -v /siyi/docker-cancl/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties -v /siyi/docker-cancl/conf/canal.properties:/home/admin/canal-server/conf/canal.properties canal
使用docker安装canal数据库地址需要注意更改默认127.0.0.1是容器内地址会导致连接不上mysql
canal-client 客户端测试
- 添加maven依赖
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version></dependency>
- 测试类
package com.yunwangl.study.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;import java.net.InetSocketAddress;
import java.util.List;public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("121.43.165.22",11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe("*.*");connector.rollback();int totalEmptyCount = 1000;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}continue;} else {emptyCount = 0;System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN|| entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------> before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------> after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());}}}
- 测试数据监听
canal同步mysql数据相关推荐
- Elasticsearch7.9集群部署,head插件,canal同步mysql数据到es,亲自测试,无坑
Elasticsearch集群部署 1.服务器规划 10.4.7.11 node1 10.4.7.12 node2 10.4.7.13 node3 1. 集群相关 一个运行中的 Elastics ...
- 使用canal同步MySQL数据到Elasticsearch(ES)
目录 1.功能及使用场景 1.1.功能介绍 1.2.使用场景 2.需求引入 3.canal文件下载及准备 3.1 下载文件 3.2 准备文件 4.deployer安装及效果测试 4.1.deploye ...
- Flink 最佳实践之使用 Canal 同步 MySQL 数据至 TiDB
摘要:本文将介绍如何将 MySQL 中的数据,通过 Binlog + Canal 的形式导入到 Kafka 中,继而被 Flink 消费的案例.内容包括: 背景介绍 环境介绍 部署 TiDB Clus ...
- canal同步mysql数据到rocketmq集群
rockermq多主多从异步复制部署参考 canal github 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更.从 2010 ...
- canal同步mysql到kafka_使用Canal同步MySQL数据到Kafka 得到的数据中sql字段无值-问答-阿里云开发者社区-阿里云...
这个应该跟你的binlog记录模式有关系,binlog有3中模式,ROW(行模式), Statement(语句模式), Mixed(混合模式)三种模式的用法如下: ROW(行模式):记录那条数据修改了 ...
- canal实现mysql数据同步
前言 canal是实现mysql数据备份,异地灾备,异地数据同步等重要的中间件,在实际的业务场景中有着广泛的使用,本文基于小编所在项目中一个异地数据同步的场景为例,通过案例演示下利用canal实现my ...
- 利用Canal全量/增量同步mysql数据至ES
Canal同步mysql数据至ES 1.更改Mysql配置 1.1 开启 Binlog 写入功能 配置 binlog-format 为 ROW 模式,配置my.cnf [mysqld] log-bin ...
- canal实现mysql数据实时同步es
前言 canal是阿里开源的一款用于同步mysql数据到其他数据存储的中间件,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 在搭建mysql服务器主从同步的时候,我们知道,备 ...
- liunx下通过Canal将MySQL数据同步到Elasticsearch
liunx下通过Canal将MySQL数据同步到Elasticsearch 一.canal背景信息 Canal是Github中开源的ETL(Extract Transform Load)软件 cana ...
最新文章
- Linux与Windows文件共享命令 rz,sz
- 在html语言中frame,html Frame、Iframe、Frameset 的区别
- nginx在Centos下的安装,转:http://www.linuxidc.com/Linux/2016-09/134907.htm
- Java编程初学者应该了解的编程框架
- 前端学习(1190):事件修饰符
- JAVA 正则表达式4种常用的功能
- JAVA 反射 动态获取类,并调用方法
- Exchange2013/2016 ECP/OWA无法通过用户验证EventID3002/3005
- 百度AI攻略:货币识别
- js代码实现百度换肤
- NCM格式转换MP3格式
- C#设置点击打开外部exe程序,并判断是否程序已开启,未开启的话打开,已经在运行了就前置
- visual studio code打不开
- 解决jieba分词,切不出分词
- Java——腐烂的橘子
- 旋转变换公式详细推导
- 润盈益通保本型理财产品优势
- Matlab中interp1()和interp2()的用法
- finalize()介绍
- php activemq实例,php操作ActiveMQ - 小周博客,小周个人博客,程序猿小王子,技术博客,个人博客模板,php博客系统,设计模式,wzyl - 黑夜遮不住光亮...
热门文章
- 思维导图PPT有温度的学习工具
- c语言syscall函数,在C中调用syscall函数时出现问题
- 跨境电商独立站站群系统
- 合合信息扫描全能王“照片高清修复”功能上线,3秒还原老照片
- java 1.6.0 14_JDK 1.6.0_14 发布了
- [模式识别].(希腊)西奥多里蒂斯第四版笔记2之__基于贝叶斯决策理论的分类器
- Ubuntu 安装Vizdoom
- android emulator创建avd命令
- 解决Android Emulator 模拟器 Internal shared storage 533MB问题
- 游戏陪玩源码开发,仿某看书app首页Banner轮播+背景渐变