canal数据同步工具
前言
使用canal之前要在linux中和windows中都安装数据库,linux安装数据库见博文
https://blog.csdn.net/weixin_45031570/article/details/125032004?csdn_share_tail=%7B%22type%22%3A%22blog%22%2C%22rType%22%3A%22article%22%2C%22rId%22%3A%22125032004%22%2C%22source%22%3A%22weixin_45031570%22%7D&ctrtid=tIbHy
一、mysql开启lob_bin
登录mysql
mysql -u root -p
加上你的密码
或者
用图形工具打开(前提开放端口)
检查是否开启
show variables like 'log_bin
在linux数据库
中检查是否开启 ,没有开启则继续向下浏览,否则进行(二、canal准备)
on
就是开启
如果没有开启
1.查看官方文档:
https://github.com/alibaba/canal/wiki/QuickStart
2.修改配置 vim /etc/my.cnf
log-bin=mysql-bin #binlog文件名
binlog_format=ROW #选择row模式
server_id=1 #mysql实例id,不能和canal的slaveId重复
3.重启
systemctl restart mysql.service
4.检查是否开启
show variables like 'log_bin
二、canal准备
下载
wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz
解压
tar -zxvf 文件名
修改配置文件
从 cd conf/example/
找到instance.properties
修改ip和端口
canal.instance.master.address=192.168.182.130:3306
地址为linux地址
修改用户和密码
canal.instance.dbUsername=xxxx和 canal.instance.dbPassword=xxxx
意思是可以远程连接的用户
和密码
修改同步表,看你选择哪个
canal.instance.filter.regex=.*\\..* #所有表都同步
# table black regex
canal.instance.filter.black.regex=a数据库.b表 #某个表同步(a数据库.b表)
启动canal
打开目录 cd bin/
启动命令./startup.sh
查看是否启动
ps -ef | grep canal
三、建工程
引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>commons-dbutils</groupId><artifactId>commons-dbutils</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId></dependency>
配置yml
server:port: 10001
spring:application:name: canal-clientprofiles:active: devdatasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/xxxxx?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=trueusername: xxxpassword: xxx
写组件
固定写法(Java直接复制代码即可)
其中端口号是11111是固定的,ip改成linux中的
,密码和账号改成之前**instance.properties**
中配置的用户和密码
package com.merchen.client;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;@Component
public class CanalClient {//sql队列private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();@Resourceprivate DataSource dataSource;/*** canal入库方法*/public void run() {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.182.130",11111), "example", "", "");int batchSize = 1000;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();try {while (true) {//尝试从master那边拉去数据batchSize条记录,有多少取多少Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {Thread.sleep(1000);} else {dataHandle(message.getEntries());}connector.ack(batchId);//当队列里面堆积的sql大于一定数值的时候就模拟执行if (SQL_QUEUE.size() >= 1) {executeQueueSql();}}} catch (InterruptedException e) {e.printStackTrace();} catch (InvalidProtocolBufferException e) {e.printStackTrace();}} finally {connector.disconnect();}}/*** 模拟执行队列里面的sql语句*/public void executeQueueSql() {int size = SQL_QUEUE.size();for (int i = 0; i < size; i++) {String sql = SQL_QUEUE.poll();System.out.println("[sql]----> " + sql);this.execute(sql.toString());}}/*** 数据处理** @param entrys*/private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {for (Entry entry : entrys) {if (EntryType.ROWDATA == entry.getEntryType()) {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());EventType eventType = rowChange.getEventType();if (eventType == EventType.DELETE) {saveDeleteSql(entry);} else if (eventType == EventType.UPDATE) {saveUpdateSql(entry);} else if (eventType == EventType.INSERT) {saveInsertSql(entry);}}}}/*** 保存更新语句** @param entry*/private void saveUpdateSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> newColumnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");for (int i = 0; i < newColumnList.size(); i++) {sql.append(" " + newColumnList.get(i).getName()+ " = '" + newColumnList.get(i).getValue() + "'");if (i != newColumnList.size() - 1) {sql.append(",");}}sql.append(" where ");List<Column> oldColumnList = rowData.getBeforeColumnsList();for (Column column : oldColumnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存删除语句** @param entry*/private void saveDeleteSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> columnList = rowData.getBeforeColumnsList();StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");for (Column column : columnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存插入语句** @param entry*/private void saveInsertSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> columnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");for (int i = 0; i < columnList.size(); i++) {sql.append(columnList.get(i).getName());if (i != columnList.size() - 1) {sql.append(",");}}sql.append(") VALUES (");for (int i = 0; i < columnList.size(); i++) {sql.append("'" + columnList.get(i).getValue() + "'");if (i != columnList.size() - 1) {sql.append(",");}}sql.append(")");SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 入库* @param sql*/public void execute(String sql) {Connection con = null;try {if(null == sql) return;con = dataSource.getConnection();QueryRunner qr = new QueryRunner();int row = qr.execute(con, sql);System.out.println("update: "+ row);} catch (SQLException e) {e.printStackTrace();} finally {DbUtils.closeQuietly(con);}}
}
写启动类
@SpringBootApplication
public class CanalMain10001 implements CommandLineRunner {@Autowiredprivate CanalClient canalClient;public static void main(String[] args) {SpringApplication.run(CanalMain10001.class, args);}@Overridepublic void run(String... args) throws Exception {//项目启动执行 canal客户监听canalClient.run();}
}
测试
当对linux中的mysql进行crud,本地端口的mysql就会进行同步操作
四、问题
java代码显示拒绝连接
查看linux中的canal日志
canal.log和example下的example.log日志,查看报错是否有 allowmemary 内存分配错误等字样是就是改虚拟机内存,增加值
启动不报错,更新表报错
可能是数据库版本不匹配
2边的数据库最好版本一样
canal数据同步工具相关推荐
- canal - 数据同步工具
一.应用场景 在前面Echarts - 实现图表显示中,我们使用了服务调用(统计表中的信息通过调用用户模块服务来获取)获取统计信息,这样耦合度高,效率相对较低,目前有另一种方法,通过实时同步数据库表的 ...
- k8s集群下搭建数据同步工具-canal:canal-admin篇
k8s集群下搭建数据同步工具-canal:canal-admin篇 前言 容器化 canal-admin 环境准备 k8s集群创建pod canal-admin 前言 本文使用v1.1.4版本的can ...
- 【硬刚大数据】大数据同步工具之FlinkCDC/Canal/Debezium对比
欢迎关注博客主页:微信搜:import_bigdata,大数据领域硬核原创作者_王知无(import_bigdata)_CSDN博客 欢迎点赞.收藏.留言 ,欢迎留言交流! 本文由[王知无]原创,首发 ...
- 数据同步工具Sqoop
大数据Hadoop之--数据同步工具Sqoop Sqoop基本原理及常用方法 1 概述 Apache Sqoop(SQL-to-Hadoop)项目旨在协助RDBMS(Relational Databa ...
- 数据同步工具的研究(实时)
数据同步工具的研究(实时同步): FlinkCDC.Canal.Maxwell.Debezium --2023年01月17日 --Yahui Di 1. 常用CDC方案比较 2. FlinkCDC F ...
- springboot实现增量备份_SpringBoot canal数据同步解决方案
SpringBoot canal数据同步解决方案 一.需求 微服务多数据库情况下可以使用canal替代触发器,canal是应阿里巴巴跨机房同步的业务需求而提出的,canal基于数据库的日志解析,获取变 ...
- Linux的rsync远程数据同步工具
Rsync(remote synchronize) 是一个远程数据同步工具,可以使用"Rsync算法"同步本地和远程主机之间的文件. rsync的好处是只同步两个文件不同的部分,相 ...
- ETL的数据同步工具调研(持续更新中)
扯白了,数据同步工具就是"导数据 "的 名称 社区响应 国内使用情况(以前程无忧为参考) SQOOP 更新缓慢,对于hbase2.x以上版本使用时需要老版本的jar包 9页 Dat ...
- etl数据抽取工具_数据同步工具ETL、ELT傻傻分不清楚?3分钟看懂两者区别
什么是数据同步工具(ETL.ELT) 数据同步工具ETL或者ELT的作用是将业务系统的数据经过抽取.清洗转换之后加载到数据仓库的过程,目的是将企业中的分散.零乱.标准不统一的数据整合到一起,为企业的决 ...
最新文章
- 优质手机APP开发公司的特点
- 高性能mysql主存架构
- javaweb里边的重定向与转发的区别
- mysql5.7.20非安装版_mysql5.7.20\5.7.21免安装版安装配置教程
- java input回车,用java怎样编写加减乘除,从键盘输入,例如:1+2按回车得到
- 8月8日白暨豚宣告灭绝
- 科学技术是对人类历史发展和现代国家兴亡起决定作用的一种力量
- react-native 适配问题
- Sonarlint代码规范改造实践及一些想法
- ARP协议-路由交换原理5-【HCNA笔记】
- 百度文库免费下载,精选六种方法!
- PLC控制系统设计的基本原则和主要内容
- 直观理解图像的分形维数附matlab实现
- vi不保存退出的命令
- Centos7 raid0
- 制作Excel图表背景
- 【审稿意见】科研菜鸟如何攥写审稿意见?万能模板!!!
- 关于数学计算机手抄报简单的,关于简单的数学手抄报图片大全
- win10 mail删除邮件服务器,win10系统删除Mail应用程序的方法
- 基于用户组织角色权限和资源的五要素