canal简介

canal官网文档

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

工作原理

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 - MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

canal安装

Linux安装canal
canal版本列表

  1. 下载安装包 这里下载1.1.6版本

wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz

  1. 解压

tar -zxvf canal.deployer-1.1.6.tar.gz

  1. 更改配置文件
    conf/example/instance.properties
## 监听的mysql主机地址和端口
canal.instance.master.address=127.0.0.1:3306
## 用户名和密码
canal.instance.dbUsername=root
canal.instance.dbPassword=root
## 匹配规则
canal.instance.filter.regex=.*\\..*
  1. 启动canal

cd /usr/data/lib/canal/bin 进入bin目录
sh startup.sh

标题

docker安装canal

  1. 拉取canal镜像

docker pull canal/canal-server

  1. 查看镜像是否拉取成功

docker images


3. 复制canal配置文件

# 创建一个容器
docker run --name canal -d canal/canal-server
# 复制容器中的配置文件到本地
docker cp canal:/home/admin/canal-server/conf/canal.properties /siyi/docker-cancl/conf
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /siyi/docker-cancl/conf
# 停止canal
docker stop canal
# 删除canal容器
docker rm canal
  1. 运行canal
docker run --name canal -p 11111:11111 -d -v /siyi/docker-cancl/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties -v /siyi/docker-cancl/conf/canal.properties:/home/admin/canal-server/conf/canal.properties canal

使用docker安装canal数据库地址需要注意更改默认127.0.0.1是容器内地址会导致连接不上mysql

canal-client 客户端测试

  1. 添加maven依赖
        <dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version></dependency>
  1. 测试类
package com.yunwangl.study.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;import java.net.InetSocketAddress;
import java.util.List;public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("121.43.165.22",11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe("*.*");connector.rollback();int totalEmptyCount = 1000;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}continue;} else {emptyCount = 0;System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN|| entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------> before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------> after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}}
  1. 测试数据监听

canal同步mysql数据相关推荐

  1. Elasticsearch7.9集群部署,head插件,canal同步mysql数据到es,亲自测试,无坑

    Elasticsearch集群部署 1.服务器规划 10.4.7.11 node1 10.4.7.12 node2 10.4.7.13 node3 1. 集群相关    一个运行中的 Elastics ...

  2. 使用canal同步MySQL数据到Elasticsearch(ES)

    目录 1.功能及使用场景 1.1.功能介绍 1.2.使用场景 2.需求引入 3.canal文件下载及准备 3.1 下载文件 3.2 准备文件 4.deployer安装及效果测试 4.1.deploye ...

  3. Flink 最佳实践之使用 Canal 同步 MySQL 数据至 TiDB

    摘要:本文将介绍如何将 MySQL 中的数据,通过 Binlog + Canal 的形式导入到 Kafka 中,继而被 Flink 消费的案例.内容包括: 背景介绍 环境介绍 部署 TiDB Clus ...

  4. canal同步mysql数据到rocketmq集群

    rockermq多主多从异步复制部署参考 canal github 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更.从 2010 ...

  5. canal同步mysql到kafka_使用Canal同步MySQL数据到Kafka 得到的数据中sql字段无值-问答-阿里云开发者社区-阿里云...

    这个应该跟你的binlog记录模式有关系,binlog有3中模式,ROW(行模式), Statement(语句模式), Mixed(混合模式)三种模式的用法如下: ROW(行模式):记录那条数据修改了 ...

  6. canal实现mysql数据同步

    前言 canal是实现mysql数据备份,异地灾备,异地数据同步等重要的中间件,在实际的业务场景中有着广泛的使用,本文基于小编所在项目中一个异地数据同步的场景为例,通过案例演示下利用canal实现my ...

  7. 利用Canal全量/增量同步mysql数据至ES

    Canal同步mysql数据至ES 1.更改Mysql配置 1.1 开启 Binlog 写入功能 配置 binlog-format 为 ROW 模式,配置my.cnf [mysqld] log-bin ...

  8. canal实现mysql数据实时同步es

    前言 canal是阿里开源的一款用于同步mysql数据到其他数据存储的中间件,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 在搭建mysql服务器主从同步的时候,我们知道,备 ...

  9. liunx下通过Canal将MySQL数据同步到Elasticsearch

    liunx下通过Canal将MySQL数据同步到Elasticsearch 一.canal背景信息 Canal是Github中开源的ETL(Extract Transform Load)软件 cana ...

最新文章

  1. Linux与Windows文件共享命令 rz,sz
  2. 在html语言中frame,html Frame、Iframe、Frameset 的区别
  3. nginx在Centos下的安装,转:http://www.linuxidc.com/Linux/2016-09/134907.htm
  4. Java编程初学者应该了解的编程框架
  5. 前端学习(1190):事件修饰符
  6. JAVA 正则表达式4种常用的功能
  7. JAVA 反射 动态获取类,并调用方法
  8. Exchange2013/2016 ECP/OWA无法通过用户验证EventID3002/3005
  9. 百度AI攻略:货币识别
  10. js代码实现百度换肤
  11. NCM格式转换MP3格式
  12. C#设置点击打开外部exe程序,并判断是否程序已开启,未开启的话打开,已经在运行了就前置
  13. visual studio code打不开
  14. 解决jieba分词,切不出分词
  15. Java——腐烂的橘子
  16. 旋转变换公式详细推导
  17. 润盈益通保本型理财产品优势
  18. Matlab中interp1()和interp2()的用法
  19. finalize()介绍
  20. php activemq实例,php操作ActiveMQ - 小周博客,小周个人博客,程序猿小王子,技术博客,个人博客模板,php博客系统,设计模式,wzyl - 黑夜遮不住光亮...

热门文章

  1. 思维导图PPT有温度的学习工具
  2. c语言syscall函数,在C中调用syscall函数时出现问题
  3. 跨境电商独立站站群系统
  4. 合合信息扫描全能王“照片高清修复”功能上线,3秒还原老照片
  5. java 1.6.0 14_JDK 1.6.0_14 发布了
  6. [模式识别].(希腊)西奥多里蒂斯第四版笔记2之__基于贝叶斯决策理论的分类器
  7. Ubuntu 安装Vizdoom
  8. android emulator创建avd命令
  9. 解决Android Emulator 模拟器 Internal shared storage 533MB问题
  10. 游戏陪玩源码开发,仿某看书app首页Banner轮播+背景渐变