canal 监听同步指定数据库,所有表

canal 监听同步指定数据库,所有表

因为工作需求,需要用到数据库同步,又从网上找了一些发现都有些问题,所以自己弄好之后写一篇总结,及配置步骤吧

先将 MySQL配置成 bin-log模式

给MySQL配置canal用户

下载 canal工具(在这里我用的是:canal.deployer-1.1.5-SNAPSHOT)我会把工具上传到我的资源(免费的)

配置instance.properties 配置文件

编写java api

配置MySQL bin-log模式

先 window+R 唤出 输入 services.msc 然后点击确定

找到MySQL服务右键属性,找到MySQL地址,因为我这配置了默认的my.ini文件,没配置应该是一个MySQL的地址找到里面的my.ini文件

找到my.ini文件进入

添加配置

log-bin=mysql-bin

binlog-format=ROW

注意server-id之前有没有,没有就添加一个

server-id=1 然后保存,在刚才唤出的服务里重启MySQL服务

给MySQL配置canal用户

CREATE USER canal IDENTIFIED BY ‘canal’;

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’@’%’;

FLUSH PRIVILEGES;

下载canal 程序包 canal软件包下载地址 https://download.csdn.net/download/Angzush/12894366

4.配置 instance.properties文件,注意 红框的文件夹名后面配置会用到

canal.instance.defaultDatabaseName = 你默认监听的数据库

canal.instance.filter.regex = 正则配置的规则

我这里配置的是 data_resource_update_platform 数据库下所有的表

mysql 数据解析关注的表,Perl正则表达式.

多个正则之间以逗号(,)分隔,转义符需要双斜杠(\)

常见例子:

所有表:.* or .\…

canal schema下所有表: canal\…*

canal下的以canal打头的表:canal\.canal.*

canal schema下的一张表:canal.test1

多个规则组合使用:canal\…*,mysql.test1,mysql.test2 (逗号分隔)

注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解

析sql,所以无法准确提取tableName进行过滤)

我当时是这块出了问题,一直监听的是整个MySQL服务器,不是我配置的数据库

canal instance启动时,默认加载instance.properties的canal.instance.filter.regex参数,之后会根据conf/canal/meta.dat文件filter值更新过滤规则。当客户端调用CanalConnector.subscribe(String filter)方法时,instance再次用filter参数更新过滤规则。

所以当你只关心部分库表更新时,设置了canal.instance.filter.regex,一定不要在客户端调用CanalConnector.subscribe(".\…"),不然等于没设置canal.instance.filter.regex。

如果一定要调用CanalConnector.subscribe(".\…"),那么可以设置instance.properties的canal.instance.filter.black.regex参数添加黑名单,过滤非关注库表。

你需要将 meta.dat 文件中的 filter对应的正则 改成 你配置的那一个我的是:data_resource_update_platform\…*

然后再启动 bin下面的 startup.bat 文件

接着查看日志 这样就启动成功了

java API 代码

package org.bigdata.framework.utils;

import java.net.InetSocketAddress;

import java.util.List;

import javax.validation.constraints.NotNull;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.common.utils.AddressUtils;

import com.alibaba.otter.canal.protocol.CanalEntry.Column;

import com.alibaba.otter.canal.protocol.CanalEntry.Entry;

import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;

import com.alibaba.otter.canal.protocol.CanalEntry.EventType;

import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;

import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

import com.alibaba.otter.canal.protocol.Message;

/**

* @param

* @author zcs

* @date 2020/09/27

* @description

* @return

*/

public class ClientSample {

public static void main(String args[]) {

// 创建链接

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),

11111), "example", "canal", "canal");//这里的 example 是上文中提到的文件名

int batchSize = 1000;

int emptyCount = 0;

try {

connector.connect();

connector.subscribe(".*\\..*");

connector.rollback();

int totalEmtryCount = 1200;

while (emptyCount < totalEmtryCount) {

Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据

long batchId = message.getId();

System.out.println("batchId = "+batchId);

int size = message.getEntries().size();

System.out.println(size);

if (batchId == -1 || size == 0) {

emptyCount++;

System.out.println("empty count : " + emptyCount);

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

} else {

emptyCount = 0;

// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);

printEntry(message.getEntries());

}

connector.ack(batchId); // 提交确认

// connector.rollback(batchId); // 处理失败, 回滚数据

}

System.out.println("empty too many times, exit");

} finally {

connector.disconnect();

}

}

private static void printEntry(@NotNull List entrys) {

for (Entry entry : entrys) {

if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {

continue;

}

RowChange rowChage = null;

try {

rowChage = RowChange.parseFrom(entry.getStoreValue());

} catch (Exception e) {

throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),

e);

}

EventType eventType = rowChage.getEventType();

System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",

entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),

entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),

eventType));

for (RowData rowData : rowChage.getRowDatasList()) {

if (eventType == EventType.DELETE) {

printColumn(rowData.getBeforeColumnsList());

} else if (eventType == EventType.INSERT) {

printColumn(rowData.getAfterColumnsList());

} else {

System.out.println("-------> before");

printColumn(rowData.getBeforeColumnsList());

System.out.println("-------> after");

printColumn(rowData.getAfterColumnsList());

}

}

}

}

private static void printColumn(@NotNull List columns) {

for (Column column : columns) {

System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());

}

}

}

canal 监听同步指定数据库,所有表相关教程

2020/09/28 react函数式组件 useEffect 监听滚动事件 获取滚动高

2020/09/28 react函数式组件 useEffect 监听滚动事件 获取滚动高度 目标效果,以圆通速递官网为例 –滑动一定高度后 代码 –代码块 const reductionStyle = { position:absolute, top:3.75rem, width:80%, borderRadius:40px, marginLeft:10%, zIndex:9999}fu

使goroutine同步的方法总结

使goroutine同步的方法总结 原文作者:xiaoxlm 使goroutine同步的方法总结 前言: 在前面并发性能对比的文章中,我们可以看到Golang处理大并发的能力十分强劲,而且开发也特别方便,只需要用go关键字即可开启一个新的协程。 但当多个goroutine同时进行处理的时

使用dropbox定时同步_使用Dropbox免费同步Rainlendar日历

使用dropbox定时同步_使用Dropbox免费同步Rainlendar日历 使用dropbox定时同步 Do you use Rainlendar Lite on multiple computers, or would you like to share your calendars with others? Here’s how you can keep your calendars synced and shared fo

Dubbo同步调用和超时源码

Dubbo同步调用和超时源码 同步调用 同步调用是一种阻塞式的调用方式,即 Consumer 端代码一直阻塞等待,直到 Provider 端返回为止; dubbo默认的协议是netty, Netty 是NIO 异步通讯机制,那么服务调用是怎么转化为同步的呢? 下面看源码: 省略一部分调用链

mysql的半同步复制

mysql的半同步复制 MySQL通过复制(Replication)实现存储系统的高可用。目前,MySQL支持的复制方式有: 异步复制 (Asynchronous Replication):原理最简单,性能最好。但是主备之间数据不一致的概率很大。 半同步复制 (Semi-synchronous Replication):

itunes ios_如何停止iTunes与iOS设备自动同步

itunes ios_如何停止iTunes与iOS设备自动同步 itunes ios ( Stop a Specific Device from Syncing with iTunes ) If you only want to stop a specific iOS device from syncing with iTunes, you must first connect that device to your computer and open

DMHS单向同步

DMHS单向同步 DMHS单向同步 DMHS是一款基于日志分析的数据库实时同步系统,DMHS的捕获功能(捕获器)是对源端的在线日志或归档日志进行解析并发送出去;DMHS的转发功能(转发器)是对基于本地文件、网络、FTP的数据进行过滤机应用层传输;DMHS的路由功能(路由器)

Linux时间同步

Linux时间同步 ####时间同步#### 1.服务端 yum install chrony -y ##安装服务 vim /etc/chrony.conf ##主配置文件 21 # Allow NTP client access from local network. 22 allow 172.25.0.0/24 ##允许谁去同步我的时间 27 # Serve time even if not synchroni

监听mysql表内容变化 使用canal,canal 监听同步指定数据库,所有表相关推荐

  1. 监听mysql表内容变化 使用canal_2 监听mysql表内容变化,使用canal

    mysql本身是支持主从的(master slave),原理就是master产生的binlog日志记录了所有的增删改语句,将binlog发送到slave节点进行执行即可完成数据的同步. canal是阿 ...

  2. iOS 自定义搜索框实时监听输入的内容变化

    产品需求:点击搜索框,弹出键盘,当输入内容发生变化时,需要实时匹配与输入内容相关的产品,列表展示,让用户去选择:类似于淘宝和京东的搜索功能. 拿到需求的时候觉得这个应该挺简单的啊,苹果这么强大,这些功 ...

  3. jq 检测元素内html变化,jq 监听 textarea 元素内容变化的方法

    在前台是可以限制 textarea 元素内输入内容的长度的,当然是用 jquery 代码实现起来是最简单,方便的,JQ脚本通过对 textarea 元素内容变化的检测来判断内容是否超出指定的长度. j ...

  4. canal 监听同步指定数据库,刷新redis缓存

    最近工作中需要使用到缓存,但是由于在业务实现的时候刷新缓存总会出现一些缓存不一致问题.于是最终想采用canal监听来处理数据一致性问题. 查看mysql binlog日志是否开启: 1.配置mysql ...

  5. mysql同步表到本地_sql 同步远程数据库(表)到本地

    一)在同一个数据库服务器上面进行数据表间的数据导入导出: 1. 如果表tb1和tb2的结构是完全一样的,则使用以下的命令就可以将表tb1中的数据导入到表tb2中: insert into db2.tb ...

  6. ios 监听TextField中内容变化

    本篇文章只为帮助跟多的人.适合初学者. 在这里我介绍3种监听UITextField的方法.并在最后写了一个小的demo 提供参考. -------请不要纠结小编的命名方式规不规范,一切只为共同学习,共 ...

  7. oracle定时向mysql取数据_Oracle中通过Job实现定时同步两个数据表之间的数据

    摘要:之前项目中用的触发器来实现数据解析,但是最近客户反应,会报错,所以我们从新设计了一下,通过Oracle里面的Job来解决这一问题,这样就不会对原来的数据表做操作,只对临时表操作,就不会对客户那边 ...

  8. MySQL报错、错误代码: 1005 Can‘t create table ‘数据库名.表名‘ (errno: 150) ?

    在创建数据库表时出现创建表失败的情况,如下: # 创建部门表 CREATE TABLE department(depid INT UNSIGNED NOT NULL PRIMARY KEY AUTO_ ...

  9. php连接mysql指定表名_php mysql获取指定数据库所有表名_PHP教程

    $cn = mysql_connect('localhost','root','root'); mysql_select_db('test',$cn); print_r(get_tables()); ...

最新文章

  1. mac怎么用python3.0_怎么在mac中使用python
  2. 怎么判断模式窗体打开的窗体名_QtitanRibbon是什么?该怎么用?
  3. python培训班那家好-Python培训机构去哪好
  4. php贝叶斯,php – 将单个概率与朴素贝叶斯垃圾邮件过滤相结合
  5. 计算机在财务核算中的应用,浅谈计算机在财务核算和财务管理工作中的辅助应用...
  6. python语言特点依赖平台吗_python语言的特点
  7. 详解struts2中struts.properties
  8. WPF遍历当前容器中某种控件的方法
  9. 分羊(区间dp:分治与决策单调性优化)
  10. 话费直充/三网直冲/联通直充/电信直充/移动直充/系统源码
  11. SpringMVC 中整合JSON、XML视图二
  12. kepware怎么读modbus/tcp数据_第479期丨看看这国企9000+的面试题目;国产PLC工控板质量怎么样?...
  13. vue 自己捣鼓周日程日历组件(WSchedule)
  14. 信息学奥赛一本通C++版
  15. 在页面中使用Flowplayer播放器
  16. 计算机32还是64位操作系统,电脑系统32位好还是64位好 哪个快?
  17. MySql8.0安装教程,细致教学
  18. 【Python 3.7】电影票:有家电影院根据观众的年龄收取不同的票价:不到3岁的观众免费; 3~12岁的观众为10美元;超过12岁的观众为15美元。请编写一个循环,在其中询问用户的年龄,指出其票价。
  19. python考研成绩查询_2020-09-08考研成绩预测模型
  20. Ubuntu18.04安装交叉编译工具链gcc-linaro-7.4.1-2019.02-x86_64_arm-linux-gnueabihf

热门文章

  1. 函数提升与变量提升的优先级
  2. 移动硬盘行货检测以东芝为例
  3. 人在旅途——》2018年10月6日上海欢乐谷
  4. 文本相似度算法Jaccard相似度(杰卡德相似度)java实现
  5. 微信小程序canvas绘图 绘图完成保存图片 附带代码和效果图
  6. mysql2012安装密钥_Win2012R2 x64 安装MySQL5.7.14压缩版
  7. 数据挖掘与数据分析应用
  8. C语言的文件IO操作,非常详细!!
  9. Java 命令行参数
  10. python命令行参数 空格_Python - 命令行参数