写在前面

今天不学习,明天变垃圾。最近在学习如何使用canal监听binlog并且将数据同步到es,俗话说好记性不如烂笔头,所以写一篇文章记录一下,一是为了健忘的自己,二是为了恰好有此需求的小可爱(程序员都是小可爱)欢迎大家学习讨论。

一、开启mysql的binlog写入功能

1.在mysql的my.ini配置文件中加入下面的配置

log-bin=mysql-bin
binlog-format=ROW
server_id=1

2.重启mysql服务

查看是否开启log_bin,在mysql中执行:show variables like '%log_bin%';

看到login_bin为ON代表已经开启

3.创建一个有相关权限的mysql slave账号

# 创建账号
CREATE USER canal IDENTIFIED WITH MYSQL_NATIVE_PASSWORD BY 'canal';
# 给账号赋权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# 刷新
FLUSH PRIVILEGES;

二、下载canal.deployer

canal下载地址:Releases · alibaba/canal · GitHub

1.修改conf/canal.properties配置文件

# zookeeper集群的地址和端口
canal.zkServers = 127.0.0.1:2181
# 默认为tcp,我这里选择将监听到的消息发送到rocketMQ
canal.serverMode = rocketmq
# 当前server上部署的instance列表,默认为example
canal.destinations = example
# mq的地址和端口
canal.mq.servers = 127.0.0.1:9876
# 创建的mysql slave账号和密码
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
##################################################
#########           RocketMQ         #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false

2.修改conf/example/instance.properties配置文件

# mysql从服务Id,未被其他mysql服务使用即可
canal.instance.mysql.slaveId = 1234
# mysql主服务的地址和端口
canal.instance.master.address = 127.0.0.1:3306
# 指定要监听的数据库
canal.instance.defaultDatabaseName = test
# username/password 数据库的用户名和密码
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
# 需要监听的表的表达式,我这里只监听了test数据库下的binlog_test表
# 1.  所有表:.*   or  .*\\..*
# 2.  canal schema下所有表: canal\\..*
# 3.  canal下的以canal打头的表:canal\\.canal.*
# 4.  canal schema下的一张表:canal.test1
# 5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
canal.instance.filter.regex=test\\.binlog_test
# rocketMQ的topic,如果使用rabbitMQ此处配置rabbitMQ的routingkey
canal.mq.topic=ROCKET_TEST

三、启动canal服务

1.启动canal服务:\bin\startup.bat

2.启动zookeeper:\bin\zkServer.cmd

3.启动rocketMQ

这里给大家推荐一个rocketMQ的详细安装使用教程,我这里就不再赘述了,点击传送门

四、修改表中的数据,查看结果

表中原数据

修改其中一条数据然后保存

然后到rocketMQ的可视化界面查看,此时已经出现了一条消息,说明canal监听成功

点开消息详情可以看到具体的消息变更内容

五、编写rocketMQ的消费者,将数据添加到es

1.创建一个项目,添加相关依赖

<dependencies><!-- web依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
​<!-- rocketMQ依赖 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.1</version></dependency>
​<!-- lombok依赖 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
​<!-- es依赖 --><dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-elasticsearch</artifactId></dependency>
​<dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope><version>4.12</version></dependency>
</dependencies>

2.编写一个dto(数据传输对象)用来接收消息中的消息体,因为我们只需要拿到消息体

@Data
public class RocketConsumerDTO {private JSONArray data;private String database;private Long es;private JSONArray old;private String table;private Long ts;private String type;
}

3.编写es的文档对象

@Document(indexName = "user",type = "docs")
@Data
public class UserDoc {@Idprivate Long id;/** 用户名 **/@Field(type = FieldType.Text,analyzer = "ik_smart",searchAnalyzer = "ik_smart")private String name;/** 年龄 **/private Integer age;/** 状态 **/private Integer status;
}

4.编写rocketMQ的消费者

public void testRocketConsumer() throws MQClientException {DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("CANAL_TEST_GROUP");//设置name server 地址defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");//从开始位置消费defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//广播模式defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);//订阅defaultMQPushConsumer.subscribe("canal_test", "");//注册消息监听器defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext           consumeConcurrentlyContext) {try {for (MessageExt message : list) {// 获取消息体,将消息体转换为dtoRocketConsumerDTO rocketMq = JSON.parseObject(message.getBody(),                                            RocketConsumerDTO.class);// 获取data的数据,data中的数据就是数据库中修改过的新数据JSONArray data = rocketMq.getData();// 遍历data数组,拿到每一条数据for (int i = 0; i < data.size(); i++) {// 通过循环获取对象,一个对象对应数据库中的一条记录JSONObject obj = data.getJSONObject(i);// 将获取到的对象转换为文档对象UserDoc userDoc = JSON.parseObject(JSON.toJSONString(obj), UserDoc.class);// 通过repository添加到es中esRepository.save(userDoc);}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}catch (Exception e){return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}});defaultMQPushConsumer.start();// 这里我使用的spring的单元测试,让程序睡眠一会儿,不然消费者一创建就会死亡,没有消费时间try {Thread.sleep(10*1000);} catch (InterruptedException e) {e.printStackTrace();}
}

执行消费者后,可以在kibana中看到数据已经被同步到es中了

六、结语

千山万水总是情,给我点赞行不行,人间自有真情在,一键三连会更帅。初写文章,不善表达,多多包涵,本篇文章到这里就结束了,欢迎大家学习讨论。

使用canal监听binlog将数据发送到RocketMQ同步到es相关推荐

  1. canal 监听不到数据变化_数据的异构实战(二)手写迷你版同步工程

    点击上方"Java知音",选择"置顶公众号" 技术文章第一时间送达! 上一期讲到了通过canal订阅mysql的binlog日志并且转换为对象,那么这一次我们将 ...

  2. 单机canal监听binlog

    文章目录 简介 环境部署 MySQL Canal安装 Canal客户端开发 创建client_demo项目 Maven依赖 在canal_demo模块创建包结构 开发步骤 Canal消息格式 转换为J ...

  3. [Spring cloud 一步步实现广告系统] 15. 使用开源组件监听Binlog 实现增量索引准备...

    MySQL Binlog简介 什么是binlog? 一个二进制日志,用来记录对数据发生或潜在发生更改的SQL语句,并以而进行的形式保存在磁盘中. binlog 的作用? 最主要有3个用途: 数据复制( ...

  4. Canal监听mysql的binlog日志实现数据同步

    Canal监听mysql的binlog日志实现数据同步 1. canal概述 1.1 canal简介 1.2 技术选型 1.3 原理分析 1.3.1 MySQL主备复制原理 1.3.2 canal原理 ...

  5. 监听mysql表内容变化 使用canal,canal 监听同步指定数据库,所有表

    canal 监听同步指定数据库,所有表 canal 监听同步指定数据库,所有表 因为工作需求,需要用到数据库同步,又从网上找了一些发现都有些问题,所以自己弄好之后写一篇总结,及配置步骤吧 先将 MyS ...

  6. java利用canal监听数据库

    springcloud如何使用canal监听mysql数据库操作 canal是阿里巴巴旗下的一款开源项目,纯Java开发.基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQ ...

  7. Canal监听MySQL

    Canal监听MySQL 1.Mysql数据库开启binlog模式 注意:Mysql容器,此处Mysql版本为5.7 #进入容器 docker exec -it mysql /bin/bash #进入 ...

  8. canal 监听同步指定数据库,刷新redis缓存

    最近工作中需要使用到缓存,但是由于在业务实现的时候刷新缓存总会出现一些缓存不一致问题.于是最终想采用canal监听来处理数据一致性问题. 查看mysql binlog日志是否开启: 1.配置mysql ...

  9. Canal监听阿里云RDS Mysql踩坑

    Canal监听阿里云RDS Mysql中间的坑 canal下载安装,mysql binlog开启常规操作,照着github上的指导就完事了 1)需要保证账号有如下权限,让管理员大大开一下,否则报权限错 ...

最新文章

  1. Linux之查看目录命令
  2. python ipython notebook或者 jupyter notebook 的安装
  3. LogMiner学习笔记
  4. superset可视化-country map
  5. 学号20145209《信息安全系统设计基础》第11周学习总结
  6. Codeforces Round #245 (Div. 1) E. Points and Segments 欧拉回路 + 建模
  7. linux 的间隔定时器函数setitimer
  8. feign传递多个对象_面向对象
  9. 《Total Commander:万能文件管理器》——第4.5节.其他补充
  10. 仿小米商城html网页源码
  11. 从生活中领悟设计模式(Python)
  12. 【树莓派 + 深度学习 + Python】从零开始做一个你画AI猜的小游戏
  13. 计算机英语阅读路线,计算机经典英语短文阅读
  14. $(origin variable;)
  15. 如何使用虚拟专用网络登录网络设备?
  16. man fgetc fgets getc getchar ungetc
  17. Bookmark Sentry – 检查重复、删除死链书签 Chrome扩展
  18. gps android 卫星位置,Android手机GPS获取卫星数量不正确问题
  19. python微信好友分析源代码_Python简单分析微信好友
  20. C# 添加、删除PPT水印

热门文章

  1. mysql合并2个sql的查询结果
  2. 远程调试weinre的使用
  3. 在cocos2d-x中使用DragonBones实现骨骼动画
  4. Java 生成本文文件的时候,Dos格式转成Unix格式
  5. FCM聚类算法详解(Python实现iris数据集)
  6. 安卓CPU架构梳理 应用商店64位APP上架适配
  7. Access-Control-Expose-Headers
  8. 操作系统---IO控制方式
  9. 天津天地伟业程序员怎么样_天津最强民营企业排行(2020年6月)
  10. php curl nginx 报错,【Docker】docker,nginx,php使用curl报错?