Canal介绍

Canal是阿里巴巴的数据同步工具,最初主要为了应对杭州和美国的双机房部署问题,目前也是国内互联网企业经常使用的数据增量同步解决方案。

原理:

  1. canal将自己伪装为MySQL的slave,向master发送dump协议
  2. master收到dump协议,数据发生修改后推送binary log给canal
  3. canal解析binary log对象,转换为增量数据,同步到ES、Redis等

Canal 安装

  1. MySQL配置

    注:本案例的mysql在windows上,linux环境的配置没有太大区别

    首先要让mysql开启binlog模式

    1) 进入mysql查看是否启动binlog

    SHOW VARIABLES LIKE '%log_bin%'
    

    log_bin为ON表示启动,为OFF则未启动,需要修改mysql配置文件启动log_bin

    windows配置文件是MySQL安装目录的my.ini

    linux在/etc/my.cnf

    修改:

    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW
    server_id=1
    

    2) 创建用户

    进入mysql,创建canal用户并授权

    create user canal@'%'IDENTIFIED WITH mysql_native_password BY 'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
    
  2. 下载和安装canal

    到官网下载 https://github.com/alibaba/canal/releases

    这里使用的是1.1.4版本

    上传文件到Linux,解压到canal目录中

    cd /usr/local
    mkdir canal
    tar -vxf canal.deployer-1.1.4.tar.gz -C canal
    
  3. 配置Canal

    进入mysql,输入命令,记录文件名和位置

    show master status;
    

    进入canal目录,修改配置文件

    vi conf/example/instance.properties
    

  4. 启动Canal

    进入bin目录启动服务

    ./startup.sh
    

    关闭服务使用 stop.sh

    查看启动日志文件

    cat /usr/local/canal/logs/canal/canal.log
    cat /usr/local/canal/logs/example/example.log
    

    以上效果表示已经运行,如果出现异常可以按日志情况解决

    主要问题总结:

    1. 异常信息 authentication error,数据库账号和密码配置错误
    2. 异常信息 can’t find position,检查配置的文件名和位置,再删除conf/example/meta.dat 重启
    3. 客户端版本兼容问题,canal的版本和客户端的版本要一致

Canal 客户端

官方客户端

1) 引入依赖

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version>
</dependency>

2) Java代码

package com.blb.canal_demo;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;import java.net.InetSocketAddress;
import java.util.List;/*** 客户端测试*/
public class ClientTest {public static void main(String args[]) {// 创建canal连接对象CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.223.223",11111), "example", "canal", "canal");try {//连接connector.connect();//订阅所有数据库和表connector.subscribe(".*\\..*");connector.rollback();while (true) {// 获取指定数量的数据Message message = connector.getWithoutAck(1000);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {//没有数据,就休眠1秒try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {//有数据就打印printEntry(message.getEntries());}// 提交确认connector.ack(batchId);}} finally {connector.disconnect();}}private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN ||entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR parse data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {//判断增删改操作if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------> before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------> after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());}}
}

修改了数据库中任意一张表的数据,canal客户端监听到mysql数据的修改

第三方客户端

官方客户端的代码比较繁琐,这里使用了第三方客户端采用SpringBoot整合,使用比较简单

https://github.com/chenqian56131/spring-boot-starter-canal

1) 引入依赖

首先下载该开源项目,安装到本地的maven中,在项目中就可以使用该依赖

<dependency><groupId>com.xpand</groupId><artifactId>starter-canal</artifactId><version>0.0.1-SNAPSHOT</version>
</dependency>

2) 启动类添加注解

@EnableCanalClient

3)配置文件

canal.client.instances.example.host=192.168.223.223
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000

4) 监听器

package com.blb.canal_demo;import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.ListenPoint;/*** 事件监听器*/
@CanalEventListener
public class CanalListener {/*** 监听 erp数据库的customer表*/@ListenPoint(schema = "erp",table = "customer")public void updateData(CanalEntry.EventType eventType, CanalEntry.RowData rowData){System.out.println("修改前");//打印改变之前的数据rowData.getBeforeColumnsList().forEach((c)-> System.out.print(c.getName()+":"+c.getValue()+"\t"));System.out.println("\n修改后");//打印改变之后的数据rowData.getAfterColumnsList().forEach((c)-> System.out.print(c.getName()+":"+c.getValue()+"\t"));}
}

Canal+RabbitMQ实现数据增量同步

实际开发过程中,我们常使用Canal配合RabbitMQ实现MySQL和其它存储系统的增量同步,下面是分布式在线教育系统中实现数据库和Elasticsearch的同步过程

步骤:

  1. 课程微服务对MySQL中的课程数据库课程表进行增删改操作,MySQL发送binlog给Canal
  2. 数据同步微服务通过Canal监听器获得具体的数据,通过RabbitMQ发送给搜索微服务
  3. 搜索微服务监听RabbitMQ消息,对Elasticsearch课程索引进行同步更新

课程表的增删改这里就不介绍了,主要看看同步服务的核心代码

  1. 依赖
<dependency><groupId>com.xpand</groupId><artifactId>starter-canal</artifactId><version>0.0.1-SNAPSHOT</version>
</dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version>
</dependency>
  1. 配置文件
server.port=8701
# canal配置
canal.client.instances.example.host=192.168.223.223
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
# rabbitMQ配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=myhost
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
  1. MQ配置
/*** RabbitMQ的配置*/
@Slf4j
@Configuration
public class RabbitMQConfig {public static final String QUEUE_COURSE_SAVE = "queue.course.save";public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";public static final String KEY_COURSE_SAVE = "key.course.save";public static final String KEY_COURSE_REMOVE = "key.course.remove";public static final String COURSE_EXCHANGE = "edu.course.exchange";@Beanpublic Queue queueCourseSave() {return new Queue(QUEUE_COURSE_SAVE);}@Beanpublic Queue queueCourseRemove() {return new Queue(QUEUE_COURSE_REMOVE);}@Beanpublic TopicExchange topicExchange() {return new TopicExchange(COURSE_EXCHANGE);}@Beanpublic Binding bindCourseSave() {return BindingBuilder.bind(queueCourseSave()).to(topicExchange()).with(KEY_COURSE_SAVE);}@Beanpublic Binding bindCourseRemove() {return BindingBuilder.bind(queueCourseRemove()).to(topicExchange()).with(KEY_COURSE_REMOVE);}
}
  1. Canal监听器
/*** 课程表数据同步监听器*/
@Slf4j
@CanalEventListener
public class CourseSyncListener {@AutowiredRabbitTemplate rabbitTemplate;/*** 监听课程表的修改*/@ListenPoint(schema = "edu_course",table = "course")public void handleCourseChange(EventType eventType, RowData rowData){log.info("course表操作:{}",eventType);if(eventType == EventType.INSERT || eventType == EventType.UPDATE){//获得修改后的数据Map<String,String> map = new HashMap<>();rowData.getAfterColumnsList().forEach(c -> {map.put(c.getName(),c.getValue());});String json = JSON.toJSONString(map);log.info("保存数据:{}",json);//发送给mq,通知搜索服务进行添加rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,RabbitMQConfig.KEY_COURSE_SAVE, json));}else if(eventType == EventType.DELETE){//获得删除前的idLong[] id = new Long[1];rowData.getBeforeColumnsList().forEach(c -> {if("id".equals(c.getName())){id[0] = Long.valueOf(c.getValue());}});log.info("删除数据:{}",id[0]);//发送给mq,通知搜索服务进行删除rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,RabbitMQConfig.KEY_COURSE_REMOVE, Long.valueOf(id[0]));}else{log.info("不支持其它操作");}}
}

搜索服务的消息监听

@Slf4j
@Component
public class CourseMQListener {public static final String QUEUE_COURSE_SAVE = "queue.course.save";public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";public static final String KEY_COURSE_SAVE = "key.course.save";public static final String KEY_COURSE_REMOVE = "key.course.remove";public static final String COURSE_EXCHANGE = "course.exchange";@AutowiredICourseService courseService;/*** 监听课程添加和更新操作*/@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = QUEUE_COURSE_SAVE, durable = "true"),exchange = @Exchange(value = COURSE_EXCHANGE,type = ExchangeTypes.TOPIC,ignoreDeclarationExceptions = "true"), key = KEY_COURSE_SAVE)})public void receiveCourseSaveMessage(String json, Channel channel, Message message) throws IOException {log.info("保存课程课程:{}",json);//将消息转为课程,保存到es中Course course = JSON.parseObject(json,Course.class);//保存课程到ElasticSearch中courseService.saveOrUpdate(course);}/*** 监听课程删除操作*/@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),exchange = @Exchange(value = COURSE_EXCHANGE,type = ExchangeTypes.TOPIC,ignoreDeclarationExceptions = "true"), key = KEY_COURSE_REMOVE)})public void receiveCourseDeleteMessage(Long id) {courseService.removeById(id);log.info("课程删除完成:{}",id);}
}

阿里的数据同步神器——Canal相关推荐

  1. 阿里开源数据同步神器DataX异构数据源间数据同步同步MySQL与HDFS相互实战

    Datax 实战使用 继上一篇 阿里开源数据同步神器DataX异构数据源间数据同步基础介绍与快速入门之后的实战篇 1.MySQL-To-HDFS 环境 & 准备说明: 描述: 为了快速搭建测试 ...

  2. mysql获取最好成绩对应数据的其他项_开源数据同步神器——canal

    前言 如今大型的IT系统中,都会使用分布式的方式,同时会有非常多的中间件,如redis.消息队列.大数据存储等,但是实际核心的数据存储依然是存储在数据库,作为使用最广泛的数据库,如何将mysql的数据 ...

  3. 开源数据同步神器——canal

    前言 如今大型的IT系统中,都会使用分布式的方式,同时会有非常多的中间件,如redis.消息队列.大数据存储等,但是实际核心的数据存储依然是存储在数据库,作为使用最广泛的数据库,如何将mysql的数据 ...

  4. 阿里开源数据同步组件Canal

    一.简介 canal是阿里开源的数据同步组件 这个是是git地址 二.使用步骤 1.安装配置mysql 安装一个数据库(这个数据库是被监听的对象,我这里用的是mysql5.7) 创建一个用户专门用于数 ...

  5. k8s集群下搭建数据同步工具-canal:canal-admin篇

    k8s集群下搭建数据同步工具-canal:canal-admin篇 前言 容器化 canal-admin 环境准备 k8s集群创建pod canal-admin 前言 本文使用v1.1.4版本的can ...

  6. 开源oracle同步图形工具,阿里开源数据同步工具--DataX

    阿里开源数据同步工具--DataX 是啥?: 是异构数据源离线同步工具 能干啥?: 能够将MySQL sqlServer Oracle Hive HBase  FTP 之间进行稳定高效的数据同步. 设 ...

  7. 离线数据同步神器:DataX,支持几乎所有异构数据源的离线同步到MaxCompute

    2019独角兽企业重金招聘Python工程师标准>>> 摘要: 概述 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.Oracle.SqlSer ...

  8. 大数据同步工具Canal

    目录 1 什么是canal 2 canal能做什么 3 如何搭建canal 3.1 首先有一个MySQL服务器 3.2 安装canal 4 Java客户端操作 5 总结 6 ClientAdapter ...

  9. datax 定时执行多个job_数据同步神器Datax源码重构

    每日一句永远不要认为我们可以逃避, 我们的每一步都决定着最后的结局, 我们的脚步正在走向我们自己选定的终点.Do not ever think about that we can escape , o ...

  10. 数据同步神器Canel-day01

    背景 关于数据同步的方式有很多种,现在有一个场景需要将mysql数据库的数据主动同步到我们的工程中,并且能再mysql数据库客户端更改某一行的数据也能将数据同步到另一个数据库或者工程中,对于这种场景的 ...

最新文章

  1. centos7.9 配置nginx实现前后端分离
  2. 使用 Rx 中预定义的 Subject
  3. 【Zookeeper】源码分析之持久化(三)之FileTxnSnapLog
  4. abap 一些小知识点的总结
  5. 追踪社保基金操盘者的足迹-补充
  6. 《自然》年度十大人物:天才曹原居首 贺建奎来去匆匆
  7. oracle脚本导入mysql数据库_oracle脚本导入mysql数据库
  8. popWindow 根据内容计算高度
  9. matlab 系统辨识,系统辨识的Matlab实现方法(手把手)..docx
  10. WinRAR加密压缩冒充GlobeImposter勒索病毒 安全专家轻松解密
  11. javaScript编码爱心表白
  12. logistic人口模型python代码_人口模型(马尔萨斯--vs--logistic).ppt
  13. 《离散数学及其应用》阅读感想(转载)
  14. OpenKG 祝大家 2021 新年快乐 —「2020 精选文章汇编」
  15. MySQL性能调优-使用ROLLUP代替UNION ALL
  16. MATLAB算法实战应用案例精讲-【人工智能】语义分割(补充篇)(附matlab代码实现)
  17. Python月份格式转化
  18. 解决npm一直停在“checking installable status“的问题
  19. MySQL 索引相关知识
  20. Python实战项目:高血压检测项目调查问卷接口的测试

热门文章

  1. 用php打印出日历_PHP完成一个日历
  2. 点餐小程序源代码|餐饮小程序源码PHP全开源开发
  3. 计算机桌面推流,OBS推流PPT电脑桌面投屏
  4. java 保留原始顺序的有序map的新思路:List<Map.Entry<?,?>>
  5. 恢复Windows默认文件资源管理器(Files设置后,恢复亲测可用)
  6. ideaIU-2020.3.2安装教程以及导入第一个spring boot项目运行和环境配置教程
  7. Postman 安装
  8. termux安装mongodb
  9. weblogic安装以及异常解决方法【转】
  10. Informatic学习总结_day02_增量抽取