MySql自动同步主库数据(Canal)

上篇文章介绍了MongoDB数据库模拟从库实现主从复制效果,不会影响线上数据,本文介绍MySql模拟从库实现主从复制。

  • Find Action Ctrl+Shift+A,在此对话框中输入想操作的英文立即出现想的操作

  • System.out.println输入sout即可

  • for循环N输入N.for即可

  • return n;输入n.return即可

  • 离线写博客

  • 导入导出Markdown文件

  • 丰富的快捷键


MySql主从复制原理

MySql主从复制指数据从一个MySql复制到一个或多个MySql服务器中,基于binlog采用异步复制直接IO,这样数据访问不用一直访问主库来完成,主库负责修改,从库负责读取,实现读写分离。

  • master会将变更数据存入binlog文件中。
  • slave连接主库时主库会开启一个dump线程发送binlog内容。
  • slave会启动一个I\O Thread请求主库发送binlog记录并保存到中继日志中。
  • 从库启动Sql Thread线程读取中继日志,本地重放,使得数据与主库保持一致,最后I/O Thread和SQL Thread将进入睡眠状态,等待下一次被唤醒。

总结:

  • 从库生成两个线程I/O Thread、Sql Thread。
  • I/O Thread请求主库binlog并写到中继日志(relay-log)中。
  • 主库生成一个dump线程,给从库传binlog。
  • Sql线程读取中继日志解析并执行成sql

Canal原理

canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议,MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流) —— [ 官网地址 ]

下载源码修改:conf/example/instance.properties文件

//设置主库IP和端口
canal.instance.master.address=IP:端口
//设置同步开始时间(毫秒),可忽略
canal.instance.master.timestamp=

启动canal:/bin/startup.sh

本文用的阿里云服务,不需要有多过设置,其它小伙伴参照官网配置一下即可。

代码

SimpleCanalClientExample.class

package canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.util.StringUtils;import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;/*** @Author: LailaiMonkey* @Description:* @Date:Created in 2021-01-04 11:53* @Modified By:*/
public class SimpleCanalClientExample {static Statement statement = null;public static void main(String args[]) {//获得存储sqlStatementgetStatement();// 创建Canal链接CanalConnector connector = getCanalConnector();int batchSize = 10000;while (true) {// 获取指定数量的数据Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {try {Thread.sleep(1000);} catch (InterruptedException ignored) {}} else {findEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}}private static CanalConnector getCanalConnector() {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp,11111), "example", "", "");connector.connect();connector.subscribe(".*\\..*");connector.rollback();return connector;}private static void getStatement() {//驱动程序名String driver = "com.mysql.cj.jdbc.Driver";// URL指向要访问的数据库名scutcsString url = "jdbc:mysql://从库IP:3306/数据库名?useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull";// MySQL配置时的用户名String user = "账号";// Java连接MySQL配置时的密码String password = "密码";try {// 加载驱动程序Class.forName(driver);// 连续数据库Connection conn = DriverManager.getConnection(url, user, password);if (!conn.isClosed()) {System.out.println("Succeeded connecting to the Database!");}// statement用来执行SQL语句statement = conn.createStatement();} catch (ClassNotFoundException | SQLException e) {e.printStackTrace();}}private static void findEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}CanalEntry.EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {String sql = "delete from " + entry.getHeader().getTableName();sql += deleteSql(rowData.getBeforeColumnsList());doSql(sql);} else if (eventType == CanalEntry.EventType.INSERT) {String sql = "replace into " + entry.getHeader().getTableName();sql += insertSql(rowData.getAfterColumnsList());doSql(sql);} else {String sql = "update " + entry.getHeader().getTableName();sql += updateSql(rowData.getAfterColumnsList());doSql(sql);}}}}private static String updateSql(List<Column> columns) {StringBuilder str = new StringBuilder(" set ");CanalField canalField = getField(columns);List<String> fields = canalField.getFields();List<String> values = canalField.getValues();for (int i = 0; i < fields.size(); i++) {str.append(fields.get(i)).append(" = ").append(values.get(i)).append(",");}str.deleteCharAt(str.length() - 1);//以主键为条件更新str.append(" where id = ").append(canalField.getId());return str.toString();}private static String insertSql(List<Column> columns) {StringBuilder str = new StringBuilder(" ( ");CanalField canalField = getField(columns);List<String> fields = canalField.getFields();List<String> values = canalField.getValues();str.append(String.join(",", fields)).append(") values (").append(String.join(",", values)).append(" ) ");return str.toString();}private static String deleteSql(List<Column> columns) {StringBuilder str = new StringBuilder(" where id = ");String id = "";for (Column column : columns) {if ("id".equals(column.getName())) {id = column.getValue();}}str.append(id);return str.toString();}private static CanalField getField(List<Column> columns) {CanalField canalField = new CanalField();List<String> field = new ArrayList<>();List<String> value = new ArrayList<>();for (Column column : columns) {if (StringUtils.isEmpty(column.getValue())) {continue;}//获得id(主键)字段值if ("id".equals(column.getName())) {canalField.setId(column.getValue());}//获得字段和值int sqlType = column.getSqlType();if (sqlType <= 8 && sqlType >= 2 || sqlType <= -5 && sqlType >= -7) {field.add("`" + column.getName() + "`");value.add(column.getValue());} else {field.add("`" + column.getName() + "`");value.add("'" + column.getValue() + "'");}}canalField.setFields(field);canalField.setValues(value);return canalField;}private static void doSql(String sql) {try {// 要执行的SQL语句statement.executeUpdate(sql);} catch (Exception e) {System.out.println("执行失败:" + sql);e.printStackTrace();}}}

CanalField.class

package canal;import java.util.List;/*** @Author: LailaiMonkey* @Description:* @Date:Created in 2021-01-05 14:03* @Modified By:*/
public class CanalField {private String id;private List<String> fields;private List<String> values;public String getId() {return id;}public void setId(String id) {this.id = id;}public List<String> getFields() {return fields;}public void setFields(List<String> fields) {this.fields = fields;}public List<String> getValues() {return values;}public void setValues(List<String> values) {this.values = values;}
}

需要先初始化从库表结构及数据,启动java程序实现自己同步。—— [ 源码地址 ]

MySql自动同步主库数据(Canal)相关推荐

  1. mysql主从同步部分表_Mysql入门MySQL 主从同步部分数据表

    <Mysql入门MySQL 主从同步部分数据表>要点: 本文介绍了Mysql入门MySQL 主从同步部分数据表,希望对您有用.如果有疑问,可以联系我们. 导读:在配置MySQL主从同步的时 ...

  2. mysql同步到redis_通过mysql自动同步redis

    在服务端开发过程中,一般会使用MySQL等关系型数据库作为最终的存储引擎,Redis其实也可以作为一种键值对型的数据库,但在一些实际场景中,特别是关系型结构并不适合使用Redis直接作为数据库.这俩家 ...

  3. 数字化外协生产综合管理系统,实现信息自动同步,数据自动统计分析!

    随着市场经济的不断发展,制造生产行业竞争不断加剧,精细化.无纸化办公已成为生产企业生存和发展的基本条件.要想将企业内部管理做的更精更细,就必须借助于现代先进的企业管理手段和工具,如企业资源计划系统.生 ...

  4. mysql自动同步数据_MySQL数据库实现双向自动同步

    [IT168 技术]本文将探讨如何通过MySQL数据库的高级特性,实现数据库的双向自动同步,确保数据的冗余与完整性.通过以往真实的项目实战与经验,把操作实施过程全部记录下来,主要有以下几个主要内容. ...

  5. solr mysql 自动同步_MongoDB和Solr的整合以及实现数据同步功能

    使用mongo-connector实现mongodb与solr数据同步: 1.solr搭建.这个我有记录,可以去找,这里不说了.(此次采用solr版本为4.7) 2.mongo搭建,要搭建集群,就是副 ...

  6. mysql主从同步当天数据,mysql主从数据同步

    一,安装好主数据库和从数据库,此处省略 我已经准备好了以下数据库 主: centos 7 mysql 5.7 ip 192.168.1.2 从: centos 7 mysql 5.7 ip 192.1 ...

  7. MYSQL主从同步(主库服务器为Linux,从库为Windows)

    [以下操作使用root用户进行] 一.主从库系统环境 1.主库系统:CentOS Linux release 7.6.1810 (Core) 2.从库系统:Microsoft Windows 10 家 ...

  8. mysql nosql 同步_使用canal和canal_mysql_nosql_sync同步mysql数据

    场景: 有两个独立的项目A和B,都使用mysql做数据库, 其中项目A中有一个表存储新闻资讯,字段有新闻id,标题title,类型type,内容data. 后来项目B也需要这个表的数据,但项目B用了两 ...

  9. mysql增量同步kafka_MySQL数据实时增量同步到Kafka - Flume

    写在前面的话 需求,将MySQL里的数据实时增量同步到Kafka.接到活儿的时候,第一个想法就是通过读取MySQL的binlog日志,将数据写到Kafka.不过对比了一些工具,例如:Canel,Dat ...

  10. 让两个 mysql 自动同步_实现两个Mysql数据库之间同步的方案

    实现两个Mysql MySQL 为了实现replication 必须打开bin-log 项,也是打开二进制的MySQL 日志记录选项.MySQL 的bin log 二 进制日志,可以记录所有影响到数据 ...

最新文章

  1. 虚幻争霸服务器维护,《虚幻争霸》将于4月停止运营 玩家可全额退款
  2. 常见虚拟主机目录对照及星外提权目录
  3. cmd命令:在ftp下载文件运行
  4. Docker 方式安装 RabbitMQ (ribbitmq linux 部署)
  5. oracle form执行后左上角没出现oracle标记,oracle form学习笔记
  6. mpython掌控板作品_第1课 Arduino micro:bit 掌控板 创客教育常用的3类主控板
  7. 花了10块钱,我在朋友圈成为了富豪...
  8. Java核心知识点学习----多线程中的阻塞队列,ArrayBlockingQueue介绍
  9. mybatis使用和分析
  10. 405.数字转换为十六进制数
  11. 国家地理相关资源数据库
  12. 中国知网如何下载外文文献
  13. 2019各个省会城市全新DNS大全一
  14. 全球时区 简称 缩写 简介 PST EST GMT CST EDT UTC 等
  15. 利用REmap绘制百度迁徙图
  16. 中国联通广州软件研究院 软件开发岗二面(技术面)
  17. ios关联启动_部落冲突电脑版与IOS设备关联教程
  18. 中国的部分家长应该扪心自问一下了
  19. 在搭建分布式事务txlcn-tm服务端时遇到坑及解决方案
  20. 2023年产品经理需要考的证书,NPDP含金量真高

热门文章

  1. php中html插入图片,html插入图片的示例代码详解(图)
  2. Rendezvous: A Search Engine for Binary Code
  3. clone远程代码 在不同电脑上git_Git 同一电脑配置多个远程仓库
  4. QT的QScrollArea使用详解
  5. 基站定位(Google API)
  6. matlab做图片提取骨架,图片骨架提取
  7. css深入理解flex布局中的剩余空间分配规则——flex-grow,flex-shrink和flex-basis
  8. 个人理财通Android手机测试,基于Android的个人理财管理系统
  9. 空城旧梦,相逢不必邂逅
  10. pagefile.sys 分页文件貌似不能放在移动硬盘上