需求:高密网和低密网之间的mysql文件落地同步。          分析:解决不同网段之间的数据通讯可以采用光闸或者网闸。

解决mysql之间的同步则可以采用canal。利用canal生成数据库变化的sql落地成相应文件,让其被交换至相对网段的某个目录下。canal客户端读取该目录下的文件,执行目录下文件的sql完成数据同步。

1. canal简介:

canal是阿里巴巴的基于数据库增量日志解析,提供增量数据订阅&消费的一个开源项目。目前主要支持mysql、oracle数据库。

canal的工作原理:

原理相对比较简单:

canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议。

mysql master收到dump请求,开始推送binary log给slave(也就是canal)。

canal解析binary log对象(原始为byte流

2.canal的配置  canal分为canal客户端和服务端,canal服务端由阿里提供;

1.下载canal服务端 https://github.com/alibaba/canal/releases。   2.解压

3.配置canal-server。

4.开启mysql的binlog写入功能,建议配置binlog模式为row,修改数据库配置文件my.ini,Linux下为my.cnf。

[mysqld]

log-bin=mysql-bin

binlog-format=ROW

server_id=1

skip-name-resolve

expire_logs_days = 10

解释:

log-bin=mysql-bin  #开启binlog

binlog-format=ROW  #选择row模式

server_id=1  #配置mysql replaction需要定义,不能和canal的slaveId重复

skip-name-resolve  #默认安装的MySql开启了DNS的反向解析。如果禁用的话就不能在MySQL的授权表中使用主机名了而只能用ip格式,防止用127.0.0.1登录,mysql对ip反向解析后用localhost登录出现权限不足,拒绝登录的错误。

expire_logs_days = x  #二进制日志自动删除的天数。默认值为0,表示“没有自动删除”

修改完成后重启MySql服务,执行以下语句查看是否开启binlog写入功能。

SHOW VARIABLES LIKE "log_%";

如下图所示,若log_bin为ON,则binlog写入功能已打开。

5.为数据库添加canal用户,开启做为mysql slave的相关权限,执行以下语句。

CREATE USER canal IDENTIFIED BY 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION

CLIENT ON *.* TO 'canal'@'%';

-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;

FLUSH PRIVILEGES;

执行完成后,执行SELECT * FROM mysql.user;查询是否添加成功,如下图所示。

6.修改配置文件instance.properties,位于canal-server/conf/example/instance.properties,红色字体的部分需要修改。

说明:canal.instance.connectionCharset代表数据库的编码方式对应到java中的编码类型,比如UTF-8,GBK

, ISO-8859-1。

7.启动canal服务器端

执行canal-server\bin下的启动脚本,windows下为startup.bat,Linux为startup.sh。windows下正常启动页面如下:

8.启动后查看日志, canal-server\logs\canal\canal.log

具体instance的日志canal-server\logs\example \example.log

以上表示正常启动。

3.canal-client  参考https://github.com/alibaba/canal/wiki/ClientExample创建canal客户端:

package com.shu.hamal.canal.instance;

import java.io.File;

import java.io.FileNotFoundException;

import java.io.FileOutputStream;

import java.io.IOException;

import java.io.UnsupportedEncodingException;

import java.net.InetSocketAddress;

import java.util.List;

import java.util.Vector;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.protocol.CanalEntry.Entry;

import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;

import com.alibaba.otter.canal.protocol.CanalEntry.EventType;

import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;

import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

import com.alibaba.otter.canal.protocol.Message;

import com.shu.hamal.canal.common.CanalUtility;

import com.shu.hamal.canal.common.ConfigEntity;

import com.shu.hamal.canal.common.FileIndexUtil;

import com.shu.hamal.canal.common.InitConfig;

/**

*

* 生成SQL文件线程

*

* @author shu.xiaobai

*/

public class CanalClient implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(CanalClient.class);

// private Vector obj;

public CanalClient(Vector v) {

// this.obj = v;

}

public void run() {

// synchronized (obj) {

// 创建链接

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(CanalUtility.INET_SOCKET_ADDRESS,

CanalUtility.INET_SOCKET_PORT), CanalUtility.CANAL_DESTINATION, CanalUtility.CANAL_USERNAME, CanalUtility.CANAL_PASSWORD);

int batchSize = 1000;

try {

long now = System.currentTimeMillis();

while (true) {

try {

connector.connect();

connector.subscribe(".*\\..*");

connector.rollback();

LOGGER.info("连接canal服务端成功...");

break;

} catch (Exception e) {

LOGGER.error("连接canal服务端失败...10s后尝试下一次连接...");

Thread.sleep(10000);

}

if ((System.currentTimeMillis() - now) > 1000 * 60 * 10) {

LOGGER.error("10min中内连接canal服务端失败...程序退出...请启动canal服务端后重启客户端...");

System.exit(-1);

}

}

LOGGER.info("canal client is running...");

while (true) {

// if (obj.size() != 0) {

// obj.wait();

// }

Message message = connector.getWithoutAck(batchSize);

long batchId = message.getId();

int size = message.getEntries().size();

if (batchId == -1 || size == 0) {

LOGGER.debug("canal client is running...");

// obj.add(new String("canal"));

// obj.notify();

Thread.sleep(1000);

} else {

try {

printEntry(message.getEntries());

} catch (Exception e) {

LOGGER.error("PrintEntry Exception" + e);

}

}

connector.ack(batchId);

}

} catch (Exception e) {

LOGGER.error("线程休眠或唤醒异常:" + e);

} finally {

connector.disconnect();

}

// }

}

private void printEntry(List entrys) {

for (Entry entry : entrys) {

if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {

continue;

}

RowChange rowChage = null;

try {

rowChage = RowChange.parseFrom(entry.getStoreValue());

} catch (Exception e) {

throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);

}

for (RowData rowData : rowChage.getRowDatasList()) {

String sql = CanalUtility.constructSql(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), entry.getHeader()

.getTableName(), rowChage.getEventType());

String canalSql = CanalUtility.constructCanalSql(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), entry.getHeader()

.getTableName(), rowChage.getEventType());

// 将语句拼接写入文件

exportSql2File(canalSql, sql, rowChage.getEventType(), entry.getHeader().getTableName(), entry.getHeader().getSchemaName());

}

}

}

/**

* 导出sql到文件

*

* @param sql

*/

private void exportSql2File(String canalSql, String sql, EventType eventType, String tableName, String databaseName) {

ConfigEntity config = InitConfig.configMap.get(databaseName);

if (config != null) {

StringBuffer sb = new StringBuffer();

sb.append(databaseName);

sb.append(CanalUtility.minus);

sb.append(tableName);

sb.append(CanalUtility.minus);

sb.append(eventType);

sb.append(CanalUtility.minus);

sb.append(canalSql);

sb.append(CanalUtility.minus);

sb.append(sql);

if (InitConfig.listMap.get(databaseName).contains(sql)) {

InitConfig.listMap.get(databaseName).remove(sql);

return;

}

FileOutputStream fos = null;

File file = new File(config.getCanalSqlDirectory().replace("/", File.separator) + File.separator + databaseName

+ CanalUtility.CANAL_FILENAME_PREFIX + FileIndexUtil.readClientIndex(config.getClientFile()) + CanalUtility.CANAL_FILENAME_SUFFIX);

try {

if (file.exists()) {

file.delete();

} else {

file.getParentFile().mkdirs();

file.createNewFile();

file.setReadable(false);

}

fos = new FileOutputStream(file);

LOGGER.info("export [" + sb.toString() + "] to " + file.getCanonicalPath());

fos.write(sb.toString().getBytes("UTF-8"));

fos.flush();

file.setReadable(true);

LOGGER.info("SQL语句 [" + sb.toString() + "] 成功写至 " + file.getCanonicalPath());

} catch (FileNotFoundException e) {

LOGGER.error(file.getName() + "不存在:" + e);

} catch (UnsupportedEncodingException e) {

LOGGER.error("字符串转字节数组异常:" + e);

} catch (IOException e) {

LOGGER.error("字节写入文件" + file.getName() + "异常:" + e);

} finally {

if (null != fos)

try {

fos.close();

} catch (IOException e) {

LOGGER.error("流关闭异常:" + e);

}

}

}

}

}

我的完整代码:https://github.com/shuxiaoabi/xiaobaiRepositiry。

注:我的canal客户端构建场景为:canal分为两个线程一个线程用于连接监听canal服务端解析binlog日志生成对应的sql语句落地成文件,该文件会被数据交换交换至跨网的另外一端某个目录下,还有一个线程用于解析交换过来的sql文件并执行该文件使其数据库完成同步(跨网域之间的中间器可采用光闸或者网闸)。

运行

1,运行canal服务端startup.bat / startup.sh

2,运行客户端程序。

这是小白写的第一次分享,如文中有错误欢迎大家指出,大家相互学习共同成长,谢谢。

光闸mysql同步_mysql跨网域canal数据同步相关推荐

  1. 跨时钟域的数据同步,亚稳态,和相关电路设计方法

    目录 同步时钟: 亚稳态: 双锁存器同步电路 单bit信号跨时钟域传播 边沿检测同步电路(慢时钟到快时钟) 脉冲同步器电路(快时钟到慢时钟) 结绳法1 结绳法2 结绳法3 多bit信号跨时钟域传播 多 ...

  2. 外网数据同步到内网方案_数据同步之解决方案

    关于数据同步的需求,想必是开发人员都可能遇到!下面就聊聊关于数据同步的解决方案: 一.使用中间表:数据生产者将数据放在一个中间库,数据消费方定时的去这个中间库取数据,用来消费这些数据, 但是这中方案并 ...

  3. springboot实现增量备份_SpringBoot canal数据同步解决方案

    SpringBoot canal数据同步解决方案 一.需求 微服务多数据库情况下可以使用canal替代触发器,canal是应阿里巴巴跨机房同步的业务需求而提出的,canal基于数据库的日志解析,获取变 ...

  4. canal - 数据同步工具

    一.应用场景 在前面Echarts - 实现图表显示中,我们使用了服务调用(统计表中的信息通过调用用户模块服务来获取)获取统计信息,这样耦合度高,效率相对较低,目前有另一种方法,通过实时同步数据库表的 ...

  5. 赶集网CDC案例-蔡峰:赶集网CDC异构数据同步方案实践-IT168 信息化专区

    赶集网CDC案例-蔡峰:赶集网CDC异构数据同步方案实践-IT168 信息化专区 赶集网CDC案例-蔡峰:赶集网CDC异构数据同步方案实践-IT168 信息化专区 posted on 2015-07- ...

  6. 阿里开源数据同步神器DataX异构数据源间数据同步同步MySQL与HDFS相互实战

    Datax 实战使用 继上一篇 阿里开源数据同步神器DataX异构数据源间数据同步基础介绍与快速入门之后的实战篇 1.MySQL-To-HDFS 环境 & 准备说明: 描述: 为了快速搭建测试 ...

  7. 阿里mysql 二进制_Mysql binlog 之阿里canal

    1.What is Canal? canal [kə'næl],中文翻译为 水道/管道/沟渠/运河,主要用途是用于 MySQL 数据库增量日志数据的订阅.消费和解析,是阿里巴巴开发并开源的,采用Jav ...

  8. 源码安装mysql主从_mysql源码安装和主从同步配置

    mysql源码安装和主从同步配置 mysql介绍: MySQL 是一种关联数据库管理系统,关联数据库将数据保存在不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性.MySQL ...

  9. Canal数据同步策略

    缓存数据同步的常见方式有三种: 设置有效期:给缓存设置有效期,到期后自动删除.再次查询时更新 优势:简单.方便 缺点:时效性差,缓存过期之前可能不一致 场景:更新频率较低,时效性要求低的业务 同步双写 ...

最新文章

  1. 【Android】SlidingMenu属性详解
  2. 计算机二进制加减符号,(带符号的二进制数的表示方法及加减法运算).ppt
  3. 程序员版本管理知识 Git 详细整理
  4. 【数据结构与算法】最小生成树--Kruskal算法 Prim算法
  5. 14怎么敷铜不了_YEO护肤课堂:敷面膜有什么误区?我们应该如何选择面膜?
  6. js用户密码强度验证函数
  7. Visual Studio 2017在编译OpenCV 4.2.0时出现编译器错误C2001:常量中有换行符
  8. java 多线程 举例,Java多线程简单举例
  9. Highlighting高亮插件使用说明
  10. pip或者python安装jpype总是报错----Boilerpipe使用
  11. 卡巴斯基2017免费版发布下载:文件/网页杀毒、自动更新/保护
  12. 打开cmd 的方式和常用的cmd快捷键
  13. SpringCloud系列之服务总线(Bus)
  14. 读高明之《帛書老子校注》
  15. Unity功能点---模拟枪械射击时的后坐力
  16. 在WindowXP中显示找不到服务器或者DNS错误
  17. JavaEE的RESTful标准技术JAX-RS,jersey-client客户端使用介绍【享学Java】
  18. vue-video播放器
  19. matlab行向量,列向量
  20. 基于多智能体模型的街道步行空间量化研究

热门文章

  1. 计算机一级 像素题目,2016计算机一级公共基础练习题
  2. 朝花夕拾:HSR/PRP冗余协议(一)
  3. Python与C语言对比大全(持续更新中)
  4. 计算机应用基础名言名句大全,计算机应用基础实训指导书-上.doc
  5. Fight against involution | 2020ICPC济南D
  6. 2023的网安玩家,会和布洛芬退烧一样“凉”得快吗?
  7. EAGAIN、EWOULDBLOCK、EINTR与非阻塞的理解
  8. html页面弹窗代码
  9. 金山网盾V3.5产品档案
  10. C++ 字符串转int