一、应用场景

在前面Echarts - 实现图表显示中,我们使用了服务调用(统计表中的信息通过调用用户模块服务来获取)获取统计信息,这样耦合度高,效率相对较低,目前有另一种方法,通过实时同步数据库表的方法实现。例如每天统计登录注册人数,我们只需要把用户表同步到统计库中,实现本地统计就可以了,这样效率更高,耦合度更低,canal就是一个数据库同步工具。也可以将mysql数据库中的数据同步到中间件。

canal是阿里巴巴旗下的一款开源项目,纯java开发。基于数据库增量日志解析,提供增量数据订阅与消费,目前主要支持MySQL。

二、Canal环境搭建

1、canal的原理是基于MySQL binlog技术,所以需要开启MySQL的binlog写入功能

show VARIABLES like 'log_bin';

2、如果log_bin没有开启显示OFF,则需要在MySQL的配置文件中添加配置信息后进行MySQL的重启。

log-bin=mysql-bin     #binlog文件名
binlog_format=ROW     #选择row模式
server_id=1           #mysql实例id,不能和canal的slaveId重复

3、安装canal数据同步工具

下载canal工具

下载后解压到安装的目录中

通过以下路径找到并修改配置文件

需要修改这三个地方,分别是本地mysql端口号,用户名与密码以及canal同步的正则表达式。

#需要改成自己的数据库信息
canal.instance.master.address=192.168.0.145:3306
#需要改成自己的数据库用户名与密码
canal.instance.dbUsername=root
canal.instance.dbPassword=root
#需要改成同步的数据库表规则,例如只是同步一下表
#canal.instance.filter.regex=.*..*
#指定某个库的某个表
canal.instance.filter.regex=canal_test.members

4、启动canal数据同步工具

需提前在虚拟机安装jdk

 在文件所在目录下bin目录中有startup.sh启动

三、客户端代码编写

1、创建canal模块

2、引入依赖

    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--    mysql    --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>commons-dbutils</groupId><artifactId>commons-dbutils</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId></dependency></dependencies>

3、创建配置文件

# 端口号
server:port: 10000
spring:application:# 应用名name: canal-clientprofiles:active: dev# 数据库连接datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/leader?serverTimezone=GMT%2B8username: rootpassword: root

4、创建canal客户端类,在启动类执行

@Component
public class CanalClient {//sql队列private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();@Resourceprivate DataSource dataSource;/*** canal入库方法*/public void run() {/*** hostname: 虚拟机ip地址* port:canal固定端口号11111* destination:查找虚拟机canal配置文件example* username 与 password 连接数据库*/CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.0.145",11111), "example", "root", "root");int batchSize = 1000;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();try {while (true) {//尝试从master那边拉去数据batchSize条记录,有多少取多少Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {Thread.sleep(1000);} else {dataHandle(message.getEntries());}connector.ack(batchId);//当队列里面堆积的sql大于一定数值的时候就模拟执行if (SQL_QUEUE.size() >= 1) {executeQueueSql();}}} catch (InterruptedException e) {e.printStackTrace();} catch (InvalidProtocolBufferException e) {e.printStackTrace();}} finally {connector.disconnect();}}/*** 模拟执行队列里面的sql语句*/public void executeQueueSql() {int size = SQL_QUEUE.size();for (int i = 0; i < size; i++) {String sql = SQL_QUEUE.poll();System.out.println("[sql]----> " + sql);this.execute(sql.toString());}}/*** 数据处理** @param entrys*/private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {for (Entry entry : entrys) {if (EntryType.ROWDATA == entry.getEntryType()) {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());EventType eventType = rowChange.getEventType();if (eventType == EventType.DELETE) {saveDeleteSql(entry);} else if (eventType == EventType.UPDATE) {saveUpdateSql(entry);} else if (eventType == EventType.INSERT) {saveInsertSql(entry);}}}}/*** 保存更新语句** @param entry*/private void saveUpdateSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> newColumnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");for (int i = 0; i < newColumnList.size(); i++) {sql.append(" " + newColumnList.get(i).getName()+ " = '" + newColumnList.get(i).getValue() + "'");if (i != newColumnList.size() - 1) {sql.append(",");}}sql.append(" where ");List<Column> oldColumnList = rowData.getBeforeColumnsList();for (Column column : oldColumnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存删除语句** @param entry*/private void saveDeleteSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> columnList = rowData.getBeforeColumnsList();StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");for (Column column : columnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存插入语句** @param entry*/private void saveInsertSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> columnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");for (int i = 0; i < columnList.size(); i++) {sql.append(columnList.get(i).getName());if (i != columnList.size() - 1) {sql.append(",");}}sql.append(") VALUES (");for (int i = 0; i < columnList.size(); i++) {sql.append("'" + columnList.get(i).getValue() + "'");if (i != columnList.size() - 1) {sql.append(",");}}sql.append(")");SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 入库** @param sql*/public void execute(String sql) {Connection con = null;try {if(null == sql) return;con = dataSource.getConnection();QueryRunner qr = new QueryRunner();int row = qr.execute(con, sql);System.out.println("update: "+ row);} catch (SQLException e) {e.printStackTrace();} finally {DbUtils.closeQuietly(con);}}
}
@SpringBootApplication
public class CanalApplication implements CommandLineRunner {@Resourceprivate CanalClient canalClient;public static void main(String[] args) {SpringApplication.run(CanalApplication.class, args);}@Overridepublic void run(String... args) throws Exception {canalClient.run();}
}

四、最终测试

启动canal,启动CanalApplication

通过向虚拟机中的表插入一行数据后,查看本地MySQL是否同步更新此数据。

虚拟机数据库中有此信息

本机数据库中也有此信息

测试成功。


以上

canal - 数据同步工具相关推荐

  1. canal数据同步工具

    前言 使用canal之前要在linux中和windows中都安装数据库,linux安装数据库见博文 https://blog.csdn.net/weixin_45031570/article/deta ...

  2. k8s集群下搭建数据同步工具-canal:canal-admin篇

    k8s集群下搭建数据同步工具-canal:canal-admin篇 前言 容器化 canal-admin 环境准备 k8s集群创建pod canal-admin 前言 本文使用v1.1.4版本的can ...

  3. 【硬刚大数据】大数据同步工具之FlinkCDC/Canal/Debezium对比

    欢迎关注博客主页:微信搜:import_bigdata,大数据领域硬核原创作者_王知无(import_bigdata)_CSDN博客 欢迎点赞.收藏.留言 ,欢迎留言交流! 本文由[王知无]原创,首发 ...

  4. 数据同步工具Sqoop

    大数据Hadoop之--数据同步工具Sqoop Sqoop基本原理及常用方法 1 概述 Apache Sqoop(SQL-to-Hadoop)项目旨在协助RDBMS(Relational Databa ...

  5. 数据同步工具的研究(实时)

    数据同步工具的研究(实时同步): FlinkCDC.Canal.Maxwell.Debezium --2023年01月17日 --Yahui Di 1. 常用CDC方案比较 2. FlinkCDC F ...

  6. springboot实现增量备份_SpringBoot canal数据同步解决方案

    SpringBoot canal数据同步解决方案 一.需求 微服务多数据库情况下可以使用canal替代触发器,canal是应阿里巴巴跨机房同步的业务需求而提出的,canal基于数据库的日志解析,获取变 ...

  7. Linux的rsync远程数据同步工具

    Rsync(remote synchronize) 是一个远程数据同步工具,可以使用"Rsync算法"同步本地和远程主机之间的文件. rsync的好处是只同步两个文件不同的部分,相 ...

  8. ETL的数据同步工具调研(持续更新中)

    扯白了,数据同步工具就是"导数据 "的 名称 社区响应 国内使用情况(以前程无忧为参考) SQOOP 更新缓慢,对于hbase2.x以上版本使用时需要老版本的jar包 9页 Dat ...

  9. etl数据抽取工具_数据同步工具ETL、ELT傻傻分不清楚?3分钟看懂两者区别

    什么是数据同步工具(ETL.ELT) 数据同步工具ETL或者ELT的作用是将业务系统的数据经过抽取.清洗转换之后加载到数据仓库的过程,目的是将企业中的分散.零乱.标准不统一的数据整合到一起,为企业的决 ...

最新文章

  1. 第十五届全国大学生智能车竞赛华南赛区成绩与奖项
  2. 编码 data:text/html;c,iOS 用TFHpple抓取GB-2312编码的html页面,页面返回编码错误
  3. 地图漫游功能的具体体现_骏谷科技|数据中心三维可视化管理系统功能亮点
  4. 003 PECompact 2.55
  5. python中if的效率_Python算法效率和增长量级,经典题目回顾
  6. 前端优秀博客网站收集
  7. 求矩形中心点坐标编程c语言,c语言编程序求矩形面积 我是新手,很多不懂,初学...
  8. 网易云推出了一组程序猿の真实写照【文末有彩蛋】
  9. SpringBoot的配置详解application
  10. 【转】蓝牙技术及其系统原理
  11. CSRFGuard工具介绍
  12. ArcGIS Engine 10.2开发环境搭建
  13. 1190. 反转每对括号间的子串 golang反转字符串
  14. git查看之前的提交日志
  15. java http连接es_连接es
  16. MongoDB配置主从同步(二)
  17. c++调用栈库函数_大华 | C/C++ 校招笔试题
  18. 在Windows系统下安装RabbitMQ
  19. wuzhicms内的全局函数--load_class()
  20. JZOJ 4230. 淬炼神体

热门文章

  1. 短域名生成java_腾讯短链接url生成接口/腾讯短网址在线生成/新浪微博短链接生成器的分享...
  2. Fragment中getContext得到的context从哪来?
  3. Revit API之BoundingBoxXYZ的用法和剖面框(Section Box)
  4. 四十种 智能合约 支持平台
  5. python图片切割以及识别图片中的文字
  6. 【NLP】⚠️学不会打我! 半小时学会基本操作 2⚠️词向量模型简介
  7. 词袋模型和词向量模型
  8. 用CreateToolhelp32Snapshot、Process32First、Process32Next枚举进程(BCB)
  9. window.print()手动设置纸张的宽高
  10. 【HTML基础】HTML文字效果标签+超齐全颜色表(可直接复制使用)