光闸mysql同步_mysql跨网域canal数据同步
需求:高密网和低密网之间的mysql文件落地同步。 分析:解决不同网段之间的数据通讯可以采用光闸或者网闸。
解决mysql之间的同步则可以采用canal。利用canal生成数据库变化的sql落地成相应文件,让其被交换至相对网段的某个目录下。canal客户端读取该目录下的文件,执行目录下文件的sql完成数据同步。
1. canal简介:
canal是阿里巴巴的基于数据库增量日志解析,提供增量数据订阅&消费的一个开源项目。目前主要支持mysql、oracle数据库。
canal的工作原理:
原理相对比较简单:
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议。
mysql master收到dump请求,开始推送binary log给slave(也就是canal)。
canal解析binary log对象(原始为byte流
2.canal的配置 canal分为canal客户端和服务端,canal服务端由阿里提供;
1.下载canal服务端 https://github.com/alibaba/canal/releases。 2.解压
3.配置canal-server。
4.开启mysql的binlog写入功能,建议配置binlog模式为row,修改数据库配置文件my.ini,Linux下为my.cnf。
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
skip-name-resolve
expire_logs_days = 10
解释:
log-bin=mysql-bin #开启binlog
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
skip-name-resolve #默认安装的MySql开启了DNS的反向解析。如果禁用的话就不能在MySQL的授权表中使用主机名了而只能用ip格式,防止用127.0.0.1登录,mysql对ip反向解析后用localhost登录出现权限不足,拒绝登录的错误。
expire_logs_days = x #二进制日志自动删除的天数。默认值为0,表示“没有自动删除”
修改完成后重启MySql服务,执行以下语句查看是否开启binlog写入功能。
SHOW VARIABLES LIKE "log_%";
如下图所示,若log_bin为ON,则binlog写入功能已打开。
5.为数据库添加canal用户,开启做为mysql slave的相关权限,执行以下语句。
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION
CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
执行完成后,执行SELECT * FROM mysql.user;查询是否添加成功,如下图所示。
6.修改配置文件instance.properties,位于canal-server/conf/example/instance.properties,红色字体的部分需要修改。
说明:canal.instance.connectionCharset代表数据库的编码方式对应到java中的编码类型,比如UTF-8,GBK
, ISO-8859-1。
7.启动canal服务器端
执行canal-server\bin下的启动脚本,windows下为startup.bat,Linux为startup.sh。windows下正常启动页面如下:
8.启动后查看日志, canal-server\logs\canal\canal.log
具体instance的日志canal-server\logs\example \example.log
以上表示正常启动。
3.canal-client 参考https://github.com/alibaba/canal/wiki/ClientExample创建canal客户端:
package com.shu.hamal.canal.instance;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
import com.shu.hamal.canal.common.CanalUtility;
import com.shu.hamal.canal.common.ConfigEntity;
import com.shu.hamal.canal.common.FileIndexUtil;
import com.shu.hamal.canal.common.InitConfig;
/**
*
* 生成SQL文件线程
*
* @author shu.xiaobai
*/
public class CanalClient implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(CanalClient.class);
// private Vector obj;
public CanalClient(Vector v) {
// this.obj = v;
}
public void run() {
// synchronized (obj) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(CanalUtility.INET_SOCKET_ADDRESS,
CanalUtility.INET_SOCKET_PORT), CanalUtility.CANAL_DESTINATION, CanalUtility.CANAL_USERNAME, CanalUtility.CANAL_PASSWORD);
int batchSize = 1000;
try {
long now = System.currentTimeMillis();
while (true) {
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
LOGGER.info("连接canal服务端成功...");
break;
} catch (Exception e) {
LOGGER.error("连接canal服务端失败...10s后尝试下一次连接...");
Thread.sleep(10000);
}
if ((System.currentTimeMillis() - now) > 1000 * 60 * 10) {
LOGGER.error("10min中内连接canal服务端失败...程序退出...请启动canal服务端后重启客户端...");
System.exit(-1);
}
}
LOGGER.info("canal client is running...");
while (true) {
// if (obj.size() != 0) {
// obj.wait();
// }
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
LOGGER.debug("canal client is running...");
// obj.add(new String("canal"));
// obj.notify();
Thread.sleep(1000);
} else {
try {
printEntry(message.getEntries());
} catch (Exception e) {
LOGGER.error("PrintEntry Exception" + e);
}
}
connector.ack(batchId);
}
} catch (Exception e) {
LOGGER.error("线程休眠或唤醒异常:" + e);
} finally {
connector.disconnect();
}
// }
}
private void printEntry(List entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
for (RowData rowData : rowChage.getRowDatasList()) {
String sql = CanalUtility.constructSql(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), entry.getHeader()
.getTableName(), rowChage.getEventType());
String canalSql = CanalUtility.constructCanalSql(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList(), entry.getHeader()
.getTableName(), rowChage.getEventType());
// 将语句拼接写入文件
exportSql2File(canalSql, sql, rowChage.getEventType(), entry.getHeader().getTableName(), entry.getHeader().getSchemaName());
}
}
}
/**
* 导出sql到文件
*
* @param sql
*/
private void exportSql2File(String canalSql, String sql, EventType eventType, String tableName, String databaseName) {
ConfigEntity config = InitConfig.configMap.get(databaseName);
if (config != null) {
StringBuffer sb = new StringBuffer();
sb.append(databaseName);
sb.append(CanalUtility.minus);
sb.append(tableName);
sb.append(CanalUtility.minus);
sb.append(eventType);
sb.append(CanalUtility.minus);
sb.append(canalSql);
sb.append(CanalUtility.minus);
sb.append(sql);
if (InitConfig.listMap.get(databaseName).contains(sql)) {
InitConfig.listMap.get(databaseName).remove(sql);
return;
}
FileOutputStream fos = null;
File file = new File(config.getCanalSqlDirectory().replace("/", File.separator) + File.separator + databaseName
+ CanalUtility.CANAL_FILENAME_PREFIX + FileIndexUtil.readClientIndex(config.getClientFile()) + CanalUtility.CANAL_FILENAME_SUFFIX);
try {
if (file.exists()) {
file.delete();
} else {
file.getParentFile().mkdirs();
file.createNewFile();
file.setReadable(false);
}
fos = new FileOutputStream(file);
LOGGER.info("export [" + sb.toString() + "] to " + file.getCanonicalPath());
fos.write(sb.toString().getBytes("UTF-8"));
fos.flush();
file.setReadable(true);
LOGGER.info("SQL语句 [" + sb.toString() + "] 成功写至 " + file.getCanonicalPath());
} catch (FileNotFoundException e) {
LOGGER.error(file.getName() + "不存在:" + e);
} catch (UnsupportedEncodingException e) {
LOGGER.error("字符串转字节数组异常:" + e);
} catch (IOException e) {
LOGGER.error("字节写入文件" + file.getName() + "异常:" + e);
} finally {
if (null != fos)
try {
fos.close();
} catch (IOException e) {
LOGGER.error("流关闭异常:" + e);
}
}
}
}
}
我的完整代码:https://github.com/shuxiaoabi/xiaobaiRepositiry。
注:我的canal客户端构建场景为:canal分为两个线程一个线程用于连接监听canal服务端解析binlog日志生成对应的sql语句落地成文件,该文件会被数据交换交换至跨网的另外一端某个目录下,还有一个线程用于解析交换过来的sql文件并执行该文件使其数据库完成同步(跨网域之间的中间器可采用光闸或者网闸)。
运行
1,运行canal服务端startup.bat / startup.sh
2,运行客户端程序。
这是小白写的第一次分享,如文中有错误欢迎大家指出,大家相互学习共同成长,谢谢。
光闸mysql同步_mysql跨网域canal数据同步相关推荐
- 跨时钟域的数据同步,亚稳态,和相关电路设计方法
目录 同步时钟: 亚稳态: 双锁存器同步电路 单bit信号跨时钟域传播 边沿检测同步电路(慢时钟到快时钟) 脉冲同步器电路(快时钟到慢时钟) 结绳法1 结绳法2 结绳法3 多bit信号跨时钟域传播 多 ...
- 外网数据同步到内网方案_数据同步之解决方案
关于数据同步的需求,想必是开发人员都可能遇到!下面就聊聊关于数据同步的解决方案: 一.使用中间表:数据生产者将数据放在一个中间库,数据消费方定时的去这个中间库取数据,用来消费这些数据, 但是这中方案并 ...
- springboot实现增量备份_SpringBoot canal数据同步解决方案
SpringBoot canal数据同步解决方案 一.需求 微服务多数据库情况下可以使用canal替代触发器,canal是应阿里巴巴跨机房同步的业务需求而提出的,canal基于数据库的日志解析,获取变 ...
- canal - 数据同步工具
一.应用场景 在前面Echarts - 实现图表显示中,我们使用了服务调用(统计表中的信息通过调用用户模块服务来获取)获取统计信息,这样耦合度高,效率相对较低,目前有另一种方法,通过实时同步数据库表的 ...
- 赶集网CDC案例-蔡峰:赶集网CDC异构数据同步方案实践-IT168 信息化专区
赶集网CDC案例-蔡峰:赶集网CDC异构数据同步方案实践-IT168 信息化专区 赶集网CDC案例-蔡峰:赶集网CDC异构数据同步方案实践-IT168 信息化专区 posted on 2015-07- ...
- 阿里开源数据同步神器DataX异构数据源间数据同步同步MySQL与HDFS相互实战
Datax 实战使用 继上一篇 阿里开源数据同步神器DataX异构数据源间数据同步基础介绍与快速入门之后的实战篇 1.MySQL-To-HDFS 环境 & 准备说明: 描述: 为了快速搭建测试 ...
- 阿里mysql 二进制_Mysql binlog 之阿里canal
1.What is Canal? canal [kə'næl],中文翻译为 水道/管道/沟渠/运河,主要用途是用于 MySQL 数据库增量日志数据的订阅.消费和解析,是阿里巴巴开发并开源的,采用Jav ...
- 源码安装mysql主从_mysql源码安装和主从同步配置
mysql源码安装和主从同步配置 mysql介绍: MySQL 是一种关联数据库管理系统,关联数据库将数据保存在不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性.MySQL ...
- Canal数据同步策略
缓存数据同步的常见方式有三种: 设置有效期:给缓存设置有效期,到期后自动删除.再次查询时更新 优势:简单.方便 缺点:时效性差,缓存过期之前可能不一致 场景:更新频率较低,时效性要求低的业务 同步双写 ...
最新文章
- 【Android】SlidingMenu属性详解
- 计算机二进制加减符号,(带符号的二进制数的表示方法及加减法运算).ppt
- 程序员版本管理知识 Git 详细整理
- 【数据结构与算法】最小生成树--Kruskal算法 Prim算法
- 14怎么敷铜不了_YEO护肤课堂:敷面膜有什么误区?我们应该如何选择面膜?
- js用户密码强度验证函数
- Visual Studio 2017在编译OpenCV 4.2.0时出现编译器错误C2001:常量中有换行符
- java 多线程 举例,Java多线程简单举例
- Highlighting高亮插件使用说明
- pip或者python安装jpype总是报错----Boilerpipe使用
- 卡巴斯基2017免费版发布下载:文件/网页杀毒、自动更新/保护
- 打开cmd 的方式和常用的cmd快捷键
- SpringCloud系列之服务总线(Bus)
- 读高明之《帛書老子校注》
- Unity功能点---模拟枪械射击时的后坐力
- 在WindowXP中显示找不到服务器或者DNS错误
- JavaEE的RESTful标准技术JAX-RS,jersey-client客户端使用介绍【享学Java】
- vue-video播放器
- matlab行向量,列向量
- 基于多智能体模型的街道步行空间量化研究
热门文章
- 计算机一级 像素题目,2016计算机一级公共基础练习题
- 朝花夕拾:HSR/PRP冗余协议(一)
- Python与C语言对比大全(持续更新中)
- 计算机应用基础名言名句大全,计算机应用基础实训指导书-上.doc
- Fight against involution | 2020ICPC济南D
- 2023的网安玩家,会和布洛芬退烧一样“凉”得快吗?
- EAGAIN、EWOULDBLOCK、EINTR与非阻塞的理解
- html页面弹窗代码
- 金山网盾V3.5产品档案
- C++ 字符串转int