1、What is Canal?

canal [kə'næl],中文翻译为 水道/管道/沟渠/运河,主要用途是用于 MySQL 数据库增量日志数据的订阅、消费和解析,是阿里巴巴开发并开源的,采用Java语言开发;

历史背景是早期阿里巴巴因为杭州和美国双机房部署,存在跨机房数据同步的业务需求,实现方式主要是基于业务 trigger(触发器) 获取增量变更。从2010年开始,阿里巴巴逐步尝试采用解析数据库日志获取增量变更进行同步,由此衍生出了canal项目;

Github:https://github.com/alibaba/canal

2、工作原理

传统MySQL主从复制工作原理

从上层来看,复制分成三步:

MySQL的主从复制将经过如下步骤:

1、当 master 主服务器上的数据发生改变时,则将其改变写入二进制事件日志文件中;

2、salve 从服务器会在一定时间间隔内对 master 主服务器上的二进制日志进行探测,探测其是否发生过改变,如果探测到 master 主服务器的二进制事件日志发生了改变,则开始一个 I/O Thread 请求 master 二进制事件日志;

3、同时 master 主服务器为每个 I/O Thread 启动一个dump  Thread,用于向其发送二进制事件日志;

4、slave 从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中;

5、salve 从服务器将启动 SQL Thread 从中继日志中读取二进制日志,在本地重放,使得其数据和主服务器保持一致;

6、最后 I/O Thread 和 SQL Thread 将进入睡眠状态,等待下一次被唤醒;

canal 工作原理

1、canal 模拟 MySQL slave 的交互协议,把自己伪装为 MySQL slave,向 MySQL master 发送dump 协议;

2、MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即canal );

3、canal 解析 binary log 对象 (原始数据为byte流)

3、Canal使用场景

Canal是基于MySQL变更日志增量订阅和消费的组件,可以使用在如下一些一些应用场景:

数据库实时备份

业务cache刷新

search build

价格变化等重要业务消息

带业务逻辑的增量数据处理

跨数据库的数据备份(异构数据同步),

例如mysql => oracle,mysql=>mongo,mysql =>redis,

mysql => elasticsearch等;

当前canal 主要是支持源端 MySQL(也支持mariaDB),版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x;Canal搭建环境

1、准备好MySQL运行环境;

2、开启 MySQL的binlog写入功能,配置 binlog-format 为 ROW 模式,my.cnf中配置如下:

[mysqld]

log-bin=mysql-bin #开启 binlog

binlog-format=ROW #选择 ROW 模式

server_id=1 #配置MySQL replaction需要定义,不要和canal的 slaveId重复

3、授权canal连接MySQL账号具有作为MySQL slave的权限, 如果已有账户可直接 grant授权:

启动MySQL服务器;

登录mysql:./mysql -uroot -p -h127.0.0.1 -P3306

CREATE USER canal IDENTIFIED BY 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

4、下载 canal部署程序

Wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

tar -zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal.deployer-1.1.4

5、配置修改

vim conf/example/instance.properties

主要是修改配置文件中与自己的数据库配置相关的信息;

6、启动Canal

./startup.sh

7、查看进程:

ps -ef | grep canal

8、查看 server 日志

cat logs/canal/canal.log

9、查看 instance 的日志

vi logs/example/example.log

10、关闭Canal

./stop.sh

canal server的默认端口号为:11111,如果需要调整的话,可以去到\conf目录底下的canal.properties文件中进行修改;

相关命令:

#是否启用了日志

show variables like 'log_bin';

#怎样知道当前的日志

show master status;

#查看mysql binlog模式

show variables like 'binlog_format';

#获取binlog文件列表

show binary logs;

#查看当前正在写入的binlog文件

show master status\G

#查看指定binlog文件的内容

show binlog events in 'mysql-bin.000002';

注意binlog日志格式要求为row格式;

Binlog的三种基本类型分别为:

ROW模式 除了记录sql语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,但是会占用较多的空间,需要使用mysqlbinlog工具进行查看;

STATEMENT模式只记录了sql语句,但是没有记录上下文信息,在进行数据恢复的时候可能会导致数据的丢失情况;

MIX模式比较灵活的记录,例如说当遇到了表结构变更的时候,就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式;

启动了canal的server之后,便是基于java的客户端搭建了;

代码集成方式:

com.alibaba.otter

canal.client

1.1.4

package com.unwulian.search.engine.suggestion.service;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.protocol.CanalEntry.*;

import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;

import java.util.List;

/**

* canal测试

*

* @author shiye

* @create 2020-11-30 14:22

*/

public class CanalTest {

public static void main(String[] args) {

String ip = "192.168.2.165";

int port = 11111;

String destination = "example";

String username = "";

String password = "";

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, port), destination, username, password);

try {

connector.connect();

connector.subscribe(".*\\..*");

//跳转到上次进行读取日志的地方

connector.rollback();

while (true) {

//获取指定数量的数据

Message message = connector.getWithoutAck(1);

long id = message.getId();

int size = message.getEntries().size();

if (id == -1 || size == 0) {

//如果没有获取到数据就睡眠疫苗

Thread.sleep(1000);

} else {

System.out.println("messge id:" + id);

printEntry(message.getEntries());

}

//提交确认

connector.ack(id);

// connector.rollback(batchId); // 处理失败, 回滚数据

}

} catch (Exception e) {

e.printStackTrace();

} finally {

connector.disconnect();

}

}

private static void printEntry(List entrys) {

for (Entry entry : entrys) {

if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {

continue;

}

RowChange rowChage = null;

try {

rowChage = RowChange.parseFrom(entry.getStoreValue());

} catch (Exception e) {

throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);

}

EventType eventType = rowChage.getEventType();

System.out.println(String.format("================> binlog日志偏移量[%s:%s] , 库名,表名[%s,%s] , 操作类型 : %s",

entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),

entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),

eventType));

for (RowData rowData : rowChage.getRowDatasList()) {

if (eventType == EventType.DELETE) {

printColumn(rowData.getBeforeColumnsList());

} else if (eventType == EventType.INSERT) {

printColumn(rowData.getAfterColumnsList());

} else {

System.out.println("-------> before");

printColumn(rowData.getBeforeColumnsList());

System.out.println("-------> after");

printColumn(rowData.getAfterColumnsList());

}

}

}

}

private static void printColumn(List columns) {

for (Column column : columns) {

System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());

}

}

}

springboo集成canal# 阿里binlog canal配置

canal:

ip: 192.168.2.13   #192.168.2.165

subscribe: undev.t_bas_xxx1,undev.t_bas_xxx2#配置你要监听的表

port: 11111

destination: dev

username:

password:

package com.unwulian.search.engine.suggestion.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

import org.springframework.context.annotation.Configuration;

import java.io.Serializable;

/**

* binlog canal的配置

*

* @author shiye

* @create 2020-07-17 19:30

*/

@Configuration

@ConfigurationProperties(prefix = "canal")

public class CanalConfig implements Serializable {

/**

* ip

*/

private String ip;

/**

* mq监听表

*/

private String subscribe;

/**

* 端口

*/

private int port;

/**

* 目的地

*/

private String destination;

/**

* 用户名

*/

private String username = "";

/**

* 密码

*/

private String password;

public String getSubscribe() {

return subscribe;

}

public void setSubscribe(String subscribe) {

this.subscribe = subscribe;

}

public String getIp() {

return ip;

}

public void setIp(String ip) {

this.ip = ip;

}

public int getPort() {

return port;

}

public void setPort(int port) {

this.port = port;

}

public String getDestination() {

return destination;

}

public void setDestination(String destination) {

this.destination = destination;

}

public String getUsername() {

return username;

}

public void setUsername(String username) {

this.username = username;

}

public String getPassword() {

return password;

}

public void setPassword(String password) {

this.password = password;

}

}

package com.unwulian.search.engine.suggestion.schedule;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.protocol.CanalEntry;

import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;

import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;

import com.alibaba.otter.canal.protocol.Message;

import com.github.structlog4j.ILogger;

import com.github.structlog4j.SLoggerFactory;

import com.unwulian.search.engine.suggestion.config.CanalConfig;

import com.unwulian.search.engine.suggestion.service.CardService;

import com.unwulian.search.engine.suggestion.service.CommunityStructService;

import com.unwulian.search.engine.suggestion.service.HouseService;

import com.unwulian.search.engine.suggestion.service.RoomService;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

import java.util.List;

/**

* 项目启动的时候就初始化canal启动一个线程去监听canal server

*

* @author shiye

* @create 2020-11-30 15:11

*/

@Component

public class CanalTask implements InitializingBean {

private static final ILogger logger = SLoggerFactory.getLogger(CanalTask.class);

@Autowired

private CanalConfig canalConfig;

@Override

public void afterPropertiesSet() throws Exception {

/**

* 启动一下线程一直监听canal server

*/

new Thread(() -> {

logger.info("start Thread to listent canal success....");

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getIp(), canalConfig.getPort()),

canalConfig.getDestination(),

canalConfig.getUsername(),

canalConfig.getPassword());

connector.connect();

connector.subscribe(canalConfig.getSubscribe());

//跳转到上次进行读取日志的地方

connector.rollback();

try {

while (true) {

//获取指定数量的数据

Message message = connector.getWithoutAck(1);

long id = message.getId();

int size = message.getEntries().size();

if (id == -1 || size == 0) {

//如果没有获取到数据就睡眠1s

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

logger.error("sleep 1000ms error...." + e.getMessage());

}

} else {

//处理消息

//logger.info("messge id:" + id);

handlerEntry(message.getEntries());

}

//提交确认

connector.ack(id);

// connector.rollback(batchId); // 处理失败, 回滚数据

}

} finally {

//关闭

connector.disconnect();

}

}).start();

logger.info("start Thread to listent canal ....");

}

/**

* 处理消息

*

* @param entrys

*/

private void handlerEntry(List entrys) {

for (CanalEntry.Entry entry : entrys) {

if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {

//类型是事务开始事务结束不做处理

continue;

}

//库名

//String databaseName = entry.getHeader().getSchemaName();

//表名

String tableName = entry.getHeader().getTableName();

RowChange rowChage = null;

try {

rowChage = RowChange.parseFrom(entry.getStoreValue());

} catch (Exception e) {

logger.error("ERROR 数据转换异常, data:" + entry.toString(), e);

}

switch (tableName) {

case "t_bas_xxx1":

//进行你的业务处理

break;

case "t_bas_xxx2":

//进行你的业务处理

break;

default:

return;

}

}

}

private static void printColumn(List columns) {

for (CanalEntry.Column column : columns) {

System.out.println(column.getName() + " : " + column.getValue() + "    不做处理=" + column.getUpdated());

}

}

}

阿里mysql 二进制_Mysql binlog 之阿里canal相关推荐

  1. mysql 二进制订阅神器 alibaba 的 Canal

    第 1 章 Canal 入门 1.1 什么是 Canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费. 阿里巴巴 B2B 公司,因 ...

  2. MySQL二进制日志(binlog)开启、查看、截取

    mysql二进制日志记录了数据库所有变更类的操作日志主要有2个作用: MySQL主从配置 数据恢复 参数介绍 server_id=3 log_bin=/data/binlog/mysql-bin: / ...

  3. 阿里mysql笔记_MySQL学习笔记(一)

    MySQL的安装: 前提:因为目前企业流行的操作系统为Linux,所以我们采用CentOS作为我们测试环境,Windows下的安装我们一概不讲.Linux下我们讲两种安装方式.好,看一下我的测试环境: ...

  4. mysql 除号_MySql的运算符-阿里云开发者社区

    数据库中的表结构确立后,表中的数据代表的意义就已经确定.而通过MySQL运算符进行运算,就可以获取到表结构以外的另一种数据.例如,学生表中存在一个birth字段,这个字段表示学生的出生年份.而运用My ...

  5. 阿里云 mysql 日志_MySQL日志简介-阿里云开发者社区

    MySQL中的日志主要分为以下几种: 查询日志 慢查询日志 错误日志 二进制日志 中继日志 事务日志 说明: 支持本文实验使用的linux系统是CentOS7版本,使用的数据库是base源自带的Mar ...

  6. 阿里 mysql中间件_MySQL中间件ProxySQL介绍 -阿里云开发者社区

    ProxySQL作为一款强大的中间件为MySQL的架构提供了有力的支持. 目前可以很好的支持 Master Slave MGR PXC等,并提供连接池.读写分离.日志记录等功能,当然还有很多其他实用功 ...

  7. 光闸mysql同步_mysql跨网域canal数据同步

    需求:高密网和低密网之间的mysql文件落地同步.          分析:解决不同网段之间的数据通讯可以采用光闸或者网闸. 解决mysql之间的同步则可以采用canal.利用canal生成数据库变化 ...

  8. mysql临时开启二进制_关于MySQL二进制日志Binlog的认识

    MySQL的二进制日志可以说或是MySQL最重要的日志了,它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是失误安全型的. 在 ...

  9. 阿里mysql面试二面_阿里面试

    每一个互联网人心中都有一个大厂梦,百度.阿里巴巴.腾讯是很多互联网人梦寐以求的地方,而我也不例外.但是,BAT等一线互联网大厂并不是想进就能够进的,它对人才的技术能力和学历都是有一定要求的,所以除了学 ...

最新文章

  1. elasticsearch学习之路---Linux 下安装并启动elasticsearch
  2. Java 类的生命周期详解
  3. java 文件随机读取_Java 实现文件随机读写-RandomAccessFile
  4. SQL   PL/SQL   SQL*PLUS三者的区别
  5. 套接口学习(一)实现
  6. php裁剪图片白边,php缩略图填充白边的示例代码
  7. dubbo web工程示例_分布式开发-Zooker+dubbo入门-Demo
  8. ES6-改变对象的原型对象
  9. 禅道程序员的10条原则--转载--为了不忘
  10. mysql 输入密码后闪退_iPhone抹除还原后需要输入账号密码怎么办?
  11. Xilisoft iPad Magic Platinum for Mac一键下载在线视频并将其转换为 iPad?
  12. python安装第三方库太慢,很容易失败报错?教你如何提速
  13. IT项目失败的常见原因分析
  14. 数学中的全微分(方程),全导数(公式),偏微分(方程),梯度,斜率,导数,方向导数等
  15. vue 运行启动命令_如何使用Vue启动和运行
  16. 懂车帝与蛋蛋订车两大平台对比
  17. 2.股票入门课(新版)
  18. 删除单号中的符号并查询快递物流信息
  19. Python-opencv的指针检测、表盘识别算法案例分析
  20. 【Linux】进程概念 —— PCB

热门文章

  1. php科学计数法转string,php如何将科学计数法转数字
  2. linux命令帮助怎么看,Linux命令帮助
  3. python生成器迭代_python中的生成器和迭代器
  4. sdk是什么_人脸识别在美颜SDK中存在什么意义?
  5. java 自定义注解_Java注解
  6. ubuntu ip设置
  7. c语言内循环,C语言循环控制语句
  8. python安装vtk_python - 安装VTK for Python - SO中文参考 - www.soinside.com
  9. MySQL笔记——打开日志
  10. Android usb 权限广播,[Android]USB开发