SpringBoot canal数据同步解决方案

一、需求

微服务多数据库情况下可以使用canal替代触发器,canal是应阿里巴巴跨机房同步的业务需求而提出的,canal基于数据库的日志解析,获取变更进行增量订阅&消费的业务。无论是canal实验需要还是为了增量备份、主从复制和恢复,都是需要开启mysql-binlog日志,数据目录设置到不同的磁盘分区可以降低io等待。

canal 工作原理

canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

canal 解析 binary log 对象(原始为 byte 流)

二、部署环境

1、登录mysql查看是否开启binlog,标红的log_bin默认是OFF关

mysql> show variables like 'log_%';

+----------------------------------------+-------------------------------------------------------+

| Variable_name | Value |

+----------------------------------------+-------------------------------------------------------+

| **log_bin | OFF** |

| log_bin_basename | |

| log_bin_index | |

| log_bin_trust_function_creators | OFF |

| log_bin_use_v1_row_events | OFF |

| log_builtin_as_identified_by_password | OFF |

| log_error | F:\tools\mysql-5.7.28-winx64\Data\DESKTOP-C1LU9IQ.err |

| log_error_verbosity | 3 |

| log_output | FILE |

| log_queries_not_using_indexes | OFF |

| log_slave_updates | OFF |

| log_slow_admin_statements | OFF |

| log_slow_slave_statements | OFF |

| log_statements_unsafe_for_binlog | ON |

| log_syslog | ON |

| log_syslog_tag | |

| log_throttle_queries_not_using_indexes | 0 |

| log_timestamps | UTC |

| log_warnings | 2 |

+----------------------------------------+-------------------------------------------------------+

19 rows in set (0.03 sec)

复制代码

2、编辑配置文件

[mysqld]

# 设置3306端口

port=3306

# 设置mysql的安装目录,按照个人的实际需要改

basedir=F:\\tools\\mysql-5.7.28-winx64 # 切记此处一定要用双斜杠\\,单斜杠我这里会出错,不过看别人的教程,有的是单斜杠。自己尝试吧

# 设置mysql数据库的数据的存放目录

datadir=F:\\tools\\mysql-5.7.28-winx64\\Data # 此处同上

# 允许最大连接数

max_connections=200

# 允许连接失败的次数。这是为了防止有人从该主机试图攻击数据库系统

max_connect_errors=10

# 服务端使用的字符集默认为UTF8

character-set-server=utf8

# 创建新表时将使用的默认存储引擎

default-storage-engine=INNODB

# 默认使用“mysql_native_password”插件认证

default_authentication_plugin=mysql_native_password

lower_case_table_names=2

sql_mode = STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION

max_connections=1000

#实验重点配置

# 开启 binlog

log-bin=mysql-bin

# 选择 ROW 模式

binlog-format=ROW

# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

server_id=1

[mysql]

# 设置mysql客户端默认字符集

default-character-set=utf8

[client]

# 设置mysql客户端连接服务端时默认使用的端口

port=3306

default-character-set=utf8

复制代码

3、创建MySQL slave 的权限canal账户并且进行远程连接授权

CREATE USER canal IDENTIFIED BY 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;

FLUSH PRIVILEGES;

复制代码

4、记得重启mysql服务

Linux:

systemctl restart mysqld

Window:

net stop mysql;

net start mysql;

复制代码

三、canal快速部署配置

1、修改配置conf/example/instance.properties

## mysql serverId

canal.instance.mysql.slaveId = 1234

#position info,需要改成自己的数据库信息

canal.instance.master.address = 127.0.0.1:3306

canal.instance.master.journal.name =

canal.instance.master.position =

canal.instance.master.timestamp =

#canal.instance.standby.address =

#canal.instance.standby.journal.name =

#canal.instance.standby.position =

#canal.instance.standby.timestamp =

#username/password,需要改成自己的数据库信息

canal.instance.dbUsername = canal

canal.instance.dbPassword = canal

canal.instance.defaultDatabaseName =

canal.instance.connectionCharset = UTF-8

#table regex

canal.instance.filter.regex = .\*\\\\..\*

复制代码

2、通过启动脚本运行:sh bin/startup.sh

3、查看 server 日志和instance 的日志

$ tail -f logs/canal/canal.log

2020-05-28 13:52:03.037 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler

2020-05-28 13:52:03.065 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations

2020-05-28 13:52:03.072 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.

2020-05-28 13:52:03.444 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.36.58.25(172.36.58.25):11111]

2020-05-28 13:52:04.604 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

$ tail -f logs/example/example.log

2020-05-28 13:52:04.238 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]

2020-05-28 13:52:04.264 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]

2020-05-28 13:52:04.265 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]

2020-05-28 13:52:04.568 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example

2020-05-28 13:52:04.572 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$

2020-05-28 13:52:04.573 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :

2020-05-28 13:52:04.577 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....

2020-05-28 13:52:04.616 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position

2020-05-28 13:52:04.616 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status

2020-05-28 13:52:06.556 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=4,serverId=1,gtid=,timestamp=1590644973000] cost : 1935ms , the next step is binlog dump

复制代码

四、初步监听实验

com.alibaba.otter

canal.client

1.1.0

复制代码import java.net.InetSocketAddress;

import java.util.List;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.common.utils.AddressUtils;

import com.alibaba.otter.canal.protocol.Message;

import com.alibaba.otter.canal.protocol.CanalEntry.Column;

import com.alibaba.otter.canal.protocol.CanalEntry.Entry;

import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;

import com.alibaba.otter.canal.protocol.CanalEntry.EventType;

import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;

import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

public class SimpleCanalClientExample {

public static void main(String args[]) {

// 创建链接

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),

11111), "example", "", "");

int batchSize = 1000;

int emptyCount = 0;

try {

connector.connect();

connector.subscribe(".*\\..*");

connector.rollback();

int totalEmptyCount = 120;

while (emptyCount < totalEmptyCount) {

Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据

long batchId = message.getId();

int size = message.getEntries().size();

if (batchId == -1 || size == 0) {

emptyCount++;

System.out.println("empty count : " + emptyCount);

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

}

} else {

emptyCount = 0;

// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);

printEntry(message.getEntries());

}

connector.ack(batchId); // 提交确认

// connector.rollback(batchId); // 处理失败, 回滚数据

}

System.out.println("empty too many times, exit");

} finally {

connector.disconnect();

}

}

private static void printEntry(List entrys) {

for (Entry entry : entrys) {

if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {

continue;

}

RowChange rowChage = null;

try {

rowChage = RowChange.parseFrom(entry.getStoreValue());

} catch (Exception e) {

throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),

e);

}

EventType eventType = rowChage.getEventType();

System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",

entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),

entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),

eventType));

for (RowData rowData : rowChage.getRowDatasList()) {

if (eventType == EventType.DELETE) {

printColumn(rowData.getBeforeColumnsList());

} else if (eventType == EventType.INSERT) {

printColumn(rowData.getAfterColumnsList());

} else {

System.out.println("-------> before");

printColumn(rowData.getBeforeColumnsList());

System.out.println("-------> after");

printColumn(rowData.getAfterColumnsList());

}

}

}

}

private static void printColumn(List columns) {

for (Column column : columns) {

System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());

}

}

}

复制代码

随便插入数据触发

INSERT INTO `demo`.`tb_ad`(`id`, `url`, `status`, `position`, `image`, `start_time`, `end_time`) VALUES (1, 'https://www.baidu.com/', '1', 'web_index_lb', 'https://pics1.baidu.com/feed/c83d70cf3bc79f3d5c30d358deb67a17738b29a6.jpeg?https://kins.oss-cn-shenzhen.aliyuncs.com/yhzb/2020-03-11/ca21b3b17d6f4757b991dd86b8cef3fa-VIP-680.jpeg', '2020-05-22 10:58:08', '2021-06-01 10:58:14');

复制代码

从控制台中看到

empty count : 66

empty count : 67

empty count : 68

empty count : 69

empty count : 70

================> binlog[mysql-bin.000001:355] , name[demo,tb_ad] , eventType : INSERT

id : 2 update=true

url : https://www.baidu.com/ update=true

status : 1 update=true

position : web_index_lb update=true

image : https://pics1.baidu.com/feed/c83d70cf3bc79f3d5c30d358deb67a17738b29a6.jpeg?https://kins.oss-cn-shenzhen.aliyuncs.com/yhzb/2020-03-11/ca21b3b17d6f4757b991dd86b8cef3fa-VIP-680.jpeg update=true

start_time : 2020-05-22 10:58:08 update=true

end_time : 2021-06-01 10:58:14 update=true

复制代码

五、数据监控微服务

top.javatool

canal-spring-boot-starter

1.2.1-RELEASE

复制代码

订阅数据库的增删改操作

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.stereotype.Component;

import top.javatool.canal.client.annotation.CanalTable;

import top.javatool.canal.client.handler.EntryHandler;

@Component

@CanalTable(value = "t_user")

public class UserHandler implements EntryHandler {

private Logger logger = LoggerFactory.getLogger(UserHandler.class);

public void insert(User user) {

logger.info("insert message {}", user);

}

public void update(User before, User after) {

logger.info("update before {} ", before);

logger.info("update after {}", after);

}

public void delete(User user) {

logger.info("delete {}", user);

}

}

复制代码

启动数据监控微服务,修改user表,观察控制台输出。

2020-05-28 16:23:22.667 INFO 24284 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient : 获取消息 Message[id=23,entries=[header {

version: 1

logfileName: "mysql-bin.000001"

logfileOffset: 18380

serverId: 1

serverenCode: "UTF-8"

executeTime: 1590654201000

sourceType: MYSQL

schemaName: ""

tableName: ""

eventLength: 68

}

entryType: TRANSACTIONBEGIN

storeValue: " \025"

, header {

version: 1

logfileName: "mysql-bin.000001"

logfileOffset: 18505

serverId: 1

serverenCode: "UTF-8"

executeTime: 1590654201000

sourceType: MYSQL

schemaName: "demo"

tableName: "t_user"

eventLength: 88

eventType: UPDATE

props {

key: "rowsCount"

value: "1"

}

}

entryType: ROWDATA

storeValue: "\b\210\002\020\002P\000b\370\003\n\033\b\000\020\004\032\002id \001(\0000\000B\00221R\aint(11)\n*\b\001\020\f\032\tuser_name \000(\0000\000B\005ZeldaR\fvarchar(255)\n*\b\002\020\372\377\377\377\377\377\377\377\377\001\032\006gender \000(\0000\000B\0010R\ntinyint(4)\n\"\b\003\020\004\032\ncountry_id \000(\0000\000B\0011R\aint(11)\n&\b\004\020[\032\bbirthday \000(\0000\000B\n1998-04-18R\004date\n7\b\005\020]\032\vcreate_time \000(\0000\000B\0231991-01-10 05:45:50R\ttimestamp\022\033\b\000\020\004\032\002id \001(\0000\000B\00221R\aint(11)\022.\b\001\020\f\032\tuser_name \000(\0010\000B\tZelda1111R\fvarchar(255)\022*\b\002\020\372\377\377\377\377\377\377\377\377\001\032\006gender \000(\0000\000B\0010R\ntinyint(4)\022\"\b\003\020\004\032\ncountry_id \000(\0000\000B\0011R\aint(11)\022&\b\004\020[\032\bbirthday \000(\0000\000B\n1998-04-18R\004date\0227\b\005\020]\032\vcreate_time \000(\0000\000B\0231991-01-10 05:45:50R\ttimestamp"

, header {

version: 1

logfileName: "mysql-bin.000001"

logfileOffset: 18593

serverId: 1

serverenCode: "UTF-8"

executeTime: 1590654201000

sourceType: MYSQL

schemaName: ""

tableName: ""

eventLength: 31

}

entryType: TRANSACTIONEND

storeValue: "\022\0041574"

],raw=false,rawEntries=[]]

2020-05-28 16:23:22.668 INFO 24284 --- [xecute-thread-6] t.j.canal.example.handler.UserHandler : update before User{id=null, userName='Zelda', gender=null, countryId=null, birthday=null, createTime=null}

2020-05-28 16:23:22.668 INFO 24284 --- [xecute-thread-6] t.j.canal.example.handler.UserHandler : update after User{id=21, userName='Zelda1111', gender=0, countryId=1, birthday=Sat Apr 18 00:00:00 CST 1998, createTime=Thu Jan 10 05:45:50 CST 1991}

复制代码

关于找一找教程网

本站文章仅代表作者观点,不代表本站立场,所有文章非营利性免费分享。

本站提供了软件编程、网站开发技术、服务器运维、人工智能等等IT技术文章,希望广大程序员努力学习,让我们用科技改变世界。

[SpringBoot canal数据同步解决方案]http://www.zyiz.net/tech/detail-137645.html

springboot实现增量备份_SpringBoot canal数据同步解决方案相关推荐

  1. springboot实现增量备份_SpringBoot几种定时任务的实现方式

    定时任务实现的几种方式 Timer:这是java自带的java.util.Timer类,这个类允许你调度一个java.util.TimerTask任务.使用这种方式可以让你的程序按照某一个频度执行,但 ...

  2. canal - 数据同步工具

    一.应用场景 在前面Echarts - 实现图表显示中,我们使用了服务调用(统计表中的信息通过调用用户模块服务来获取)获取统计信息,这样耦合度高,效率相对较低,目前有另一种方法,通过实时同步数据库表的 ...

  3. springboot实现增量备份_增量同步-spring batch(6)动态参数绑定与增量同步

    tags:springbatch 1.引言 上一篇<便捷的数据读写-spring batch(5)结合beetlSql进行数据读写>中使用Spring Batch及BeetlSql,对数据 ...

  4. 光闸mysql同步_mysql跨网域canal数据同步

    需求:高密网和低密网之间的mysql文件落地同步.          分析:解决不同网段之间的数据通讯可以采用光闸或者网闸. 解决mysql之间的同步则可以采用canal.利用canal生成数据库变化 ...

  5. Canal数据同步策略

    缓存数据同步的常见方式有三种: 设置有效期:给缓存设置有效期,到期后自动删除.再次查询时更新 优势:简单.方便 缺点:时效性差,缓存过期之前可能不一致 场景:更新频率较低,时效性要求低的业务 同步双写 ...

  6. Oracle全备增量备份脚本,oracle数据全备份与增量备份脚本

    1.脚本名称是rman_bk_0.sh 此脚本是数据库全备脚本 设定一周执行一次 set ORACLE_BASE=/oracle (设定 oracle的 安装基目录) set ORACLE_HOME= ...

  7. Oracle数据同步解决方案之databus

    [list][*][b]概述[/b][/list] 目前了解到基于Oracle的开源数据同步项目有yugong.databus.SymmetricDS,之前尝试了yugong,很容易上手.使用时需要注 ...

  8. redis与mysql数据同步_Redis与MySQL数据同步解决方案

    数据库同步到Redis 我们大多倾向于使用这种方式,也就是将数据库中的变化同步到Redis,这种更加可靠.Redis在这里只是做缓存. 方案1 做缓存,就要遵循缓存的语义规定: 读:读缓存redis, ...

  9. 两台SQL Server数据同步解决方案

    复制的概念 复制是将一组数据从一个数据源拷贝到多个数据源的技术,是将一份数据发布到多个存储站点上的有效方式.使用复制技术,用户可以将一份数据发布到多台服务器上,从而使不同的服务器用户都可以在权限的许可 ...

最新文章

  1. DB2的一些精品文章地址
  2. sat2 计算机科目,2019-2020年SAT2考试时间及Top100大学要求
  3. 《循序渐进学Spark》一1.7 本章小结
  4. 双目立体视觉建立深度图_单目视觉深度估计测距的前生今世
  5. SVN工具将本地代码导入SVN资源库
  6. stm32实验报告心得体会_STM32单片机红外遥控MP3实验报告
  7. 视频号算法推荐机制! 微信视频号怎么上热门?
  8. linux vim命令详解 编辑文件 保存 退出
  9. windows10 安装office2021(预装正版)经验分享
  10. Ubuntu linux下运行xv6
  11. Python类与对象最全总结大全(类、实例、属性方法、继承、派生、多态、内建函数)
  12. Word 如何更新全文或某一段的field(域),如何更新参考文献或自动编号
  13. 【Base】ping-pong buffer
  14. Windows Server 2008 R2 SP1升级补丁
  15. 区块链的数据结构(一)——区块、链
  16. 深度图像转点云数据(激光雷达数据)
  17. 前景与挑战并存 餐饮信息化当突围
  18. ARM Cortex A7 架构简介
  19. 基于Ardupilot/PX4固件,APM/PIXhawk硬件的VTOL垂直起降固定翼软硬件参数调试(第二篇)软硬件参数调试
  20. android智能家居ppt,智能家居介绍.ppt

热门文章

  1. Vue3 的学习教程汇总、源码解释项目、支持的 UI 组件库、优质实战项目
  2. spring boot 整合security
  3. python django model定义
  4. kafka可视化客户端工具(Kafka Tool)的基本使用
  5. veth-pair技术在docker中的应用(docker网络通信)及tomcat Dockerfile示例
  6. scala伴生类和半生对象详解
  7. 为什么阿里巴巴Java开发手册中不允许用Executors去创建线程池?
  8. Spring Cloud Stream整合RabbitMQ
  9. 阿里OSS图片存储java代码示例
  10. Java实现话术词槽匹配_知识图谱与KBQA——槽填充