有两个数据库,并不是主从关系,但是需要同步某张表,可以通过binlog日志,进行同步,前提是这两个数据库的要同步的表,表名和字段名需要一致。

当前项目连接的数据库(需要同步的数据库):base_project

需要将数据同步到 base_project 的数据库(需要监听的数据库):test

一、下载canal

我整合的是1.1.4版本,所以下载也是下载的1.1.4版本

解压,打开 conf/example/instance.properties 文件

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0# enable gtid use true/false
canal.instance.gtidon=false# 要监听的数据库ip地址和端口号,ip地址用真实ip,不要用localhost或127.0.0.1
canal.instance.master.address=192.168.0.111:3306
# binlog的名称,canalv1.1.5不需要设置日志名称和偏移量,canal会自动识别
canal.instance.master.journal.name=binlog.000189
# 偏移量
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=# table meta tsdb info
canal.instance.tsdb.enable=false
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=# username/password,MySQL服务器授权的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regex
canal.instance.filter.regex=test.customer,test.fault
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

然后再打开 conf 下的 canal.properties 文件

# 找到下面这一句代码,这句代码默认是注释的,放开注释。如果不放开注释,可能在程序中就接收不到数据库的操作消息
# 如果放开了还是接收不到,可以试着把值调大一点
canal.instance.tsdb.snapshot.interval = 16

二、数据库配置

1、开启binlog

我的MySQL是8.0以上版本的,binlog是默认开启的,如果不知道是否开启的话,执行以下sql,value是ON说明是开启了,OFF是关闭状态,需要开启。

show variables like 'log_bin';

2、创建用户并授权

#创建用户canal,密码为canal,主机地址为192.168.0.111
create user canal@192.168.0.111 identified by 'canal';#SHOW VIEW 查看视图,SELECT 查询,REPLICATION SLAVE、REPLICATION CLIENT 复制,*.* 表示所有库
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@192.168.0.111;# 刷新
flush privileges;

3、设置binlog的模式为ROW

# 查看binlog的模式,如果value不是ROW,需要设置成ROW
show variables like 'binlog_format';
# 设置ROW
SET SESSION binlog_format = 'ROW';

三、启动canal

1、bin目录下,双击 startup.sh 启动

2、logs/canal 目录下,查看 canal.log,这个样子说明启动成功

3、logs/example 目录下,查看 example.log,这个样子没有报错就没问题


如果有 caching_sha2_password Auth failed 异常,则修改canal用户对应的身份验证插件为 mysql_native_password

java.net.ConnectException: Failed to connect to localhost/127.0.0.1 异常,需要将canal用户的主机localhost或127.0.0.1改为本机ip地址,配置文件的也要改。按照我的配置应该不会出现这个异常。

四、整合canal

1、pom.xml

<!-- 整合canal,监听数据库binlog日志,实现增量同步 -->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version><!-- 去掉guava依赖,否则启动报错 --><exclusions><exclusion><groupId>com.google.guava</groupId><artifactId>guava</artifactId></exclusion></exclusions>
</dependency>
<dependency><groupId>commons-dbutils</groupId><artifactId>commons-dbutils</artifactId><version>1.7</version>
</dependency>

2、yml

spring:datasource:url: jdbc:mysql://127.0.0.1:3306/base_project?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=trueusername: rootpassword: roottype: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Drivermybatis-plus:mapper-locations: classpath:mapper/*/*.xmltype-aliases-package: com.entity.sys,;com.common.baseglobal-config:db-config:id-type: autofield-strategy: NOT_EMPTYdb-type: MYSQLconfiguration:map-underscore-to-camel-case: truecall-setters-on-nulls: truelog-impl: org.apache.ibatis.logging.stdout.StdOutImplcanal-monitor-mysql:#要监听的数据库的主机地址,用具体的ip地址,不要用localhost或127.0.0.1hostname: "192.168.0.111"#canal端口号,这个是固定的:11111port: 11111#这个也是固定的example: "example"#要监听的数据库名和表名,这里我只监听用户表和部门表;指定多个表用逗号隔开#如果是监听数据库的全部表,用:test\\..*tableName: test.sys_user,test.sys_dept

3、CanalUtil

import cn.hutool.core.lang.Console;
import cn.hutool.core.util.StrUtil;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import com.mapper.pwjk.SqlMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.List;/*** @author Admin* 监听数据库binlog日志,实现监听的数据库和当前数据库增删改同步*/
@Component
public class CanalUtil {/*** 要监听的数据库的主机地址*/@Value("${canal-monitor-mysql.hostname}")private String canalMonitorHost;/*** canal端口号,这个是固定的用:11111*/@Value("${canal-monitor-mysql.port}")private Integer canalMonitorPort;/*** canal的example,这个值是固定的用:example*/@Value("${canal-monitor-mysql.example}")private String canalExample;/*** 要监听的数据库名和表名*/@Value("${canal-monitor-mysql.tableName}")private String canalMonitorTableName;@Resourceprivate SqlMapper sqlMapper;/*** canal入库方法*/public void run() {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost,canalMonitorPort), canalExample, "", "");int batchSize = 1000;try {connector.connect();Console.log("数据库检测连接成功:" + canalMonitorTableName);connector.subscribe(canalMonitorTableName);connector.rollback();try {while (true) {//尝试从master那边拉去数据batchSize条记录,有多少取多少Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {//每隔一秒监听一次Thread.sleep(1000);} else {dataHandle(message.getEntries());}connector.ack(batchId);}} catch (InterruptedException e) {e.printStackTrace();} catch (InvalidProtocolBufferException e) {e.printStackTrace();}} finally {connector.disconnect();}}/*** 数据处理*/private void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {try {for (CanalEntry.Entry entry : entrys) {if (CanalEntry.EntryType.ROWDATA == entry.getEntryType()) {CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());CanalEntry.EventType eventType = rowChange.getEventType();if (eventType == CanalEntry.EventType.DELETE) {//删除,返回删除的sql语句,然后在mapper中,直接执行这句sqlString sql = saveDeleteSql(entry);if (StrUtil.isNotBlank(sql)){sqlMapper.dynamicsDelete(sql);}} else if (eventType == CanalEntry.EventType.UPDATE) {//更新,返回更新的sql语句,然后在mapper中,直接执行这句sqlString sql = saveUpdateSql(entry);if (StrUtil.isNotBlank(sql)){sqlMapper.dynamicsUpdate(sql);}} else if (eventType == CanalEntry.EventType.INSERT) {//新增,返回新增的sql语句,然后在mapper中,直接执行这句sqlString sql = saveInsertSql(entry);if (StrUtil.isNotBlank(sql)){sqlMapper.dynamicsInsert(sql);}}}}}catch (Exception e){return;}}/*** 保存更新语句*/private String saveUpdateSql(CanalEntry.Entry entry) {try {CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDataList) {List<CanalEntry.Column> newColumnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("update " +  entry.getHeader().getTableName() + " set ");for (int i = 0; i < newColumnList.size(); i++) {if (!newColumnList.get(i).getIsKey()) {sql.append(" " + newColumnList.get(i).getName() + " = '" + newColumnList.get(i).getValue() + "'");if (i != newColumnList.size() - 1) {sql.append(",");}}}sql.append(" where ");List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList();for (CanalEntry.Column column : oldColumnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}return sql.toString();}} catch (InvalidProtocolBufferException e) {return null;}return null;}/*** 保存删除语句*/private String saveDeleteSql(CanalEntry.Entry entry) {try {CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDataList) {List<CanalEntry.Column> columnList = rowData.getBeforeColumnsList();StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");for (CanalEntry.Column column : columnList) {if (column.getIsKey()) {sql.append(column.getName() + "=" + column.getValue());break;}}return sql.toString();}} catch (InvalidProtocolBufferException e) {return null;}return null;}/*** 保存插入语句*/private String saveInsertSql(CanalEntry.Entry entry) {try {CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDataList) {List<CanalEntry.Column> columnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("insert into "+entry.getHeader().getTableName() + " (");for (int i = 0; i < columnList.size(); i++) {sql.append(columnList.get(i).getName());if (i != columnList.size() - 1) {sql.append(",");}}sql.append(") VALUES (");for (int i = 0; i < columnList.size(); i++) {sql.append("'" + columnList.get(i).getValue() + "'");if (i != columnList.size() - 1) {sql.append(",");}}sql.append(")");return sql.toString();}} catch (InvalidProtocolBufferException e) {return null;}return null;}
}

4、启动类

import com.common.util.CanalUtil;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;import javax.annotation.Resource;@SpringBootApplication
@MapperScan({"com.mapper.sys"})
@EnableCaching
public class BaseProjectApplication implements CommandLineRunner {@Resourceprivate CanalUtil canalUtil;public static void main(String[] args) {SpringApplication.run(BaseProjectApplication.class, args);}@Overridepublic void run(String... args){canalUtil.run();}
}

五、增量同步

1、新增 SqlMapper

import org.apache.ibatis.annotations.Param;/*** @author Admin*/
public interface SqlMapper {void dynamicsInsert(@Param("paramSQL") String sql);void dynamicsUpdate(@Param("paramSQL") String sql);void dynamicsDelete(@Param("paramSQL") String sql);
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.mapper.sys.SqlMapper"><insert id="dynamicsInsert">${paramSQL}</insert><update id="dynamicsUpdate">${paramSQL}</update><delete id="dynamicsDelete">${paramSQL}</delete></mapper>

springboot整合canal,监听MySQL binlog日志,实现增量同步相关推荐

  1. maxwell deamon 监听mysql binlog 二进制文件实现数据同步到

    1:首先下载maxwell https://github.com/zendesk/maxwell/releases/download/v1.24.1/maxwell-1.24.1.tar.gz (这玩 ...

  2. 使用canal 监听mysql binlog获取增量数据

    配置mysql sudo vi /etc/my.cnf [mysqld] log-bin=/var/lib/mysql/mysql-bin #开启日志监控 binlog-format=ROW #监控模 ...

  3. Canal监听mysql的binlog日志实现数据同步

    Canal监听mysql的binlog日志实现数据同步 1. canal概述 1.1 canal简介 1.2 技术选型 1.3 原理分析 1.3.1 MySQL主备复制原理 1.3.2 canal原理 ...

  4. Canal监听MySQL

    Canal监听MySQL 1.Mysql数据库开启binlog模式 注意:Mysql容器,此处Mysql版本为5.7 #进入容器 docker exec -it mysql /bin/bash #进入 ...

  5. 【SpringBoot】35、SpringBoot整合Redis监听Key过期事件

    在实际的开发项目中,监听 key 的过期事件,应用非常广泛,例如:订单超时未支付,优惠券过期等等 一.说明 本篇文章是继: [SpringBoot]三十四.SpringBoot整合Redis实现序列化 ...

  6. Java监听mysql的binlog详解(mysql-binlog-connector)

    Java监听mysql的binlog详解(mysql-binlog-connector) 1. 需求概述 2. 技术选型 3. 方案设计 3.环境准备 3.1 查看是否开启binlog 3.2 mys ...

  7. java利用canal监听数据库

    springcloud如何使用canal监听mysql数据库操作 canal是阿里巴巴旗下的一款开源项目,纯Java开发.基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQ ...

  8. spring boot+kafka+canal实现监听MySQL数据库

    spring boot+kafka+canal实现监听MySQL数据库 一.zookeeper安装 kafka依赖于zookeeper,安装kafka前先安装zookeeper 下载地址:Apache ...

  9. 监听mysql表内容变化 使用canal,canal 监听同步指定数据库,所有表

    canal 监听同步指定数据库,所有表 canal 监听同步指定数据库,所有表 因为工作需求,需要用到数据库同步,又从网上找了一些发现都有些问题,所以自己弄好之后写一篇总结,及配置步骤吧 先将 MyS ...

最新文章

  1. python字典中找最小值_从包含元组值的字典中查找最小值和最大值
  2. springboot api版本控制_SpringBoot入门练习
  3. SQL Server优化50法
  4. 移动端H5页面返回并且刷新页面(BFcache)
  5. java 自动加载jar_JAVA 动态(手动)加载jar文件
  6. 一本通 1064:奥运奖牌计数--AC
  7. 杀掉php所有进程,杀死某个用户的所有进程
  8. 组装我的计算机社会实践活动,暑期电脑销售社会实践心得体会
  9. bzoj 1006: [HNOI2008]神奇的国度
  10. american fuzzy lop 介绍
  11. 性能测试概念点分析与过程讲解(一)
  12. 开发必学的验证码,教你从零写一个验证码
  13. 启用windows功能NetFx3时出错,终极方法
  14. 广东工业大学华立学院c语言试题,广东工业大学华立学院考试试卷《高频电子线路》-2015.doc...
  15. linux安装有道词典步骤,Ubuntu 16.04安装有道词典的方法
  16. 离散数学-----自然数系统
  17. usermod -a -G group user修改user用户信息,把user添加到组group中
  18. 阿尔卡特交换机配置_阿尔卡特交换机上常用命令
  19. Daily Scrum Meeting 11.13
  20. Python OpenCV 计算机视觉:1~5

热门文章

  1. 自行车运动模型及其线性化
  2. 2525道菜谱(让老婆学做菜不用到处找菜谱了)~~
  3. Vue使用Echarts绘画地图可视化
  4. 使用Dockerfile定制LNMP环境镜像
  5. aero远程桌面_通过远程桌面连接使用Windows Vista Aero
  6. iOS无处不在详解iOS集成第三方登录(SSO授权登录无需密码)
  7. 国外区块链技术部分应用场景(及其一周内相关资讯汇总)
  8. 知识图谱-你不知道的sparql路径查询
  9. 上海桂隆阀门-用心为您经营每一条管道
  10. 三、51单片机模块介绍