前言

使用canal之前要在linux中和windows中都安装数据库,linux安装数据库见博文

https://blog.csdn.net/weixin_45031570/article/details/125032004?csdn_share_tail=%7B%22type%22%3A%22blog%22%2C%22rType%22%3A%22article%22%2C%22rId%22%3A%22125032004%22%2C%22source%22%3A%22weixin_45031570%22%7D&ctrtid=tIbHy

一、mysql开启lob_bin

登录mysql

mysql -u root -p加上你的密码
或者
用图形工具打开(前提开放端口)

检查是否开启

show variables like 'log_binlinux数据库中检查是否开启 ,没有开启则继续向下浏览,否则进行(二、canal准备)
on就是开启

如果没有开启

1.查看官方文档:

https://github.com/alibaba/canal/wiki/QuickStart

2.修改配置 vim /etc/my.cnf

log-bin=mysql-bin #binlog文件名
binlog_format=ROW #选择row模式
server_id=1 #mysql实例id,不能和canal的slaveId重复

3.重启

systemctl restart mysql.service

4.检查是否开启

show variables like 'log_bin

二、canal准备

下载

wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz

解压

tar -zxvf 文件名

修改配置文件

cd conf/example/找到instance.properties

修改ip和端口

canal.instance.master.address=192.168.182.130:3306 地址为linux地址

修改用户和密码

canal.instance.dbUsername=xxxx和 canal.instance.dbPassword=xxxx意思是可以远程连接的用户密码

修改同步表,看你选择哪个

canal.instance.filter.regex=.*\\..*  #所有表都同步
# table black regex
canal.instance.filter.black.regex=a数据库.b表    #某个表同步(a数据库.b表)

启动canal

打开目录 cd bin/ 启动命令./startup.sh

查看是否启动

ps -ef | grep canal

三、建工程

引入依赖

     <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>commons-dbutils</groupId><artifactId>commons-dbutils</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId></dependency>

配置yml

server:port: 10001
spring:application:name: canal-clientprofiles:active: devdatasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/xxxxx?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=trueusername: xxxpassword: xxx

写组件

固定写法(Java直接复制代码即可)
其中端口号是11111是固定的,ip改成linux中的,密码和账号改成之前**instance.properties**中配置的用户和密码

package com.merchen.client;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;@Component
public class CanalClient {//sql队列private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();@Resourceprivate DataSource dataSource;/*** canal入库方法*/public void run() {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.182.130",11111), "example", "", "");int batchSize = 1000;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();try {while (true) {//尝试从master那边拉去数据batchSize条记录,有多少取多少Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {Thread.sleep(1000);} else {dataHandle(message.getEntries());}connector.ack(batchId);//当队列里面堆积的sql大于一定数值的时候就模拟执行if (SQL_QUEUE.size() >= 1) {executeQueueSql();}}} catch (InterruptedException e) {e.printStackTrace();} catch (InvalidProtocolBufferException e) {e.printStackTrace();}} finally {connector.disconnect();}}/*** 模拟执行队列里面的sql语句*/public void executeQueueSql() {int size = SQL_QUEUE.size();for (int i = 0; i < size; i++) {String sql = SQL_QUEUE.poll();System.out.println("[sql]----> " + sql);this.execute(sql.toString());}}/*** 数据处理** @param entrys*/private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {for (Entry entry : entrys) {if (EntryType.ROWDATA == entry.getEntryType()) {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());EventType eventType = rowChange.getEventType();if (eventType == EventType.DELETE) {saveDeleteSql(entry);} else if (eventType == EventType.UPDATE) {saveUpdateSql(entry);} else if (eventType == EventType.INSERT) {saveInsertSql(entry);}}}}/*** 保存更新语句** @param entry*/private void saveUpdateSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> newColumnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");for (int i = 0; i < newColumnList.size(); i++) {sql.append(" " + newColumnList.get(i).getName()+ " = '" + newColumnList.get(i).getValue() + "'");if (i != newColumnList.size() - 1) {sql.append(",");}}sql.append(" where ");List<Column> oldColumnList = rowData.getBeforeColumnsList();for (Column column : oldColumnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存删除语句** @param entry*/private void saveDeleteSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> columnList = rowData.getBeforeColumnsList();StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");for (Column column : columnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存插入语句** @param entry*/private void saveInsertSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> columnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");for (int i = 0; i < columnList.size(); i++) {sql.append(columnList.get(i).getName());if (i != columnList.size() - 1) {sql.append(",");}}sql.append(") VALUES (");for (int i = 0; i < columnList.size(); i++) {sql.append("'" + columnList.get(i).getValue() + "'");if (i != columnList.size() - 1) {sql.append(",");}}sql.append(")");SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 入库* @param sql*/public void execute(String sql) {Connection con = null;try {if(null == sql) return;con = dataSource.getConnection();QueryRunner qr = new QueryRunner();int row = qr.execute(con, sql);System.out.println("update: "+ row);} catch (SQLException e) {e.printStackTrace();} finally {DbUtils.closeQuietly(con);}}
}

写启动类

@SpringBootApplication
public class CanalMain10001 implements CommandLineRunner {@Autowiredprivate CanalClient canalClient;public static void main(String[] args) {SpringApplication.run(CanalMain10001.class, args);}@Overridepublic void run(String... args) throws Exception {//项目启动执行 canal客户监听canalClient.run();}
}

测试

当对linux中的mysql进行crud,本地端口的mysql就会进行同步操作

四、问题

java代码显示拒绝连接

查看linux中的canal日志

canal.log和example下的example.log日志,查看报错是否有 allowmemary 内存分配错误等字样是就是改虚拟机内存,增加值

启动不报错,更新表报错

可能是数据库版本不匹配
2边的数据库最好版本一样

canal数据同步工具相关推荐

  1. canal - 数据同步工具

    一.应用场景 在前面Echarts - 实现图表显示中,我们使用了服务调用(统计表中的信息通过调用用户模块服务来获取)获取统计信息,这样耦合度高,效率相对较低,目前有另一种方法,通过实时同步数据库表的 ...

  2. k8s集群下搭建数据同步工具-canal:canal-admin篇

    k8s集群下搭建数据同步工具-canal:canal-admin篇 前言 容器化 canal-admin 环境准备 k8s集群创建pod canal-admin 前言 本文使用v1.1.4版本的can ...

  3. 【硬刚大数据】大数据同步工具之FlinkCDC/Canal/Debezium对比

    欢迎关注博客主页:微信搜:import_bigdata,大数据领域硬核原创作者_王知无(import_bigdata)_CSDN博客 欢迎点赞.收藏.留言 ,欢迎留言交流! 本文由[王知无]原创,首发 ...

  4. 数据同步工具Sqoop

    大数据Hadoop之--数据同步工具Sqoop Sqoop基本原理及常用方法 1 概述 Apache Sqoop(SQL-to-Hadoop)项目旨在协助RDBMS(Relational Databa ...

  5. 数据同步工具的研究(实时)

    数据同步工具的研究(实时同步): FlinkCDC.Canal.Maxwell.Debezium --2023年01月17日 --Yahui Di 1. 常用CDC方案比较 2. FlinkCDC F ...

  6. springboot实现增量备份_SpringBoot canal数据同步解决方案

    SpringBoot canal数据同步解决方案 一.需求 微服务多数据库情况下可以使用canal替代触发器,canal是应阿里巴巴跨机房同步的业务需求而提出的,canal基于数据库的日志解析,获取变 ...

  7. Linux的rsync远程数据同步工具

    Rsync(remote synchronize) 是一个远程数据同步工具,可以使用"Rsync算法"同步本地和远程主机之间的文件. rsync的好处是只同步两个文件不同的部分,相 ...

  8. ETL的数据同步工具调研(持续更新中)

    扯白了,数据同步工具就是"导数据 "的 名称 社区响应 国内使用情况(以前程无忧为参考) SQOOP 更新缓慢,对于hbase2.x以上版本使用时需要老版本的jar包 9页 Dat ...

  9. etl数据抽取工具_数据同步工具ETL、ELT傻傻分不清楚?3分钟看懂两者区别

    什么是数据同步工具(ETL.ELT) 数据同步工具ETL或者ELT的作用是将业务系统的数据经过抽取.清洗转换之后加载到数据仓库的过程,目的是将企业中的分散.零乱.标准不统一的数据整合到一起,为企业的决 ...

最新文章

  1. 优质手机APP开发公司的特点
  2. 高性能mysql主存架构
  3. javaweb里边的重定向与转发的区别
  4. mysql5.7.20非安装版_mysql5.7.20\5.7.21免安装版安装配置教程
  5. java input回车,用java怎样编写加减乘除,从键盘输入,例如:1+2按回车得到
  6. 8月8日白暨豚宣告灭绝
  7. 科学技术是对人类历史发展和现代国家兴亡起决定作用的一种力量
  8. react-native 适配问题
  9. Sonarlint代码规范改造实践及一些想法
  10. ARP协议-路由交换原理5-【HCNA笔记】
  11. 百度文库免费下载,精选六种方法!
  12. PLC控制系统设计的基本原则和主要内容
  13. 直观理解图像的分形维数附matlab实现
  14. vi不保存退出的命令
  15. Centos7 raid0
  16. 制作Excel图表背景
  17. 【审稿意见】科研菜鸟如何攥写审稿意见?万能模板!!!
  18. 关于数学计算机手抄报简单的,关于简单的数学手抄报图片大全
  19. win10 mail删除邮件服务器,win10系统删除Mail应用程序的方法
  20. 基于用户组织角色权限和资源的五要素

热门文章

  1. mac装win10后快捷键失灵
  2. Ajax 二级联动
  3. 安卓高通机型的基带移植 修改 编译的相关 增加信号 支持5G等
  4. ARC142E Pairing Wizards
  5. 基于迁移深度学习的遥感图像场景分类
  6. 恭喜郭霖成为GDE(Google开发者专家)!
  7. 社区拼团小程序,社区拼团电商小程序系统开发解决方案
  8. SDL农场游戏开发 11.总结
  9. jetson tx2内核编译步骤与方法
  10. 赋予角色生命的游戏配音技巧