1、前言

工作中,我们很多时候需要根据某些状态的变化更新另一个业务的逻辑,比如订单的生成,成交等,需要更新或者通知其他的业务。我们通常的操作通过业务埋点、接口的调用或者中间件完成。

但是状态变化的入口比较多的时候,就很容易漏掉某些地方。代码维护起来也比较麻烦。今天介绍阿里出品的 【canal】中间件完成数据库字段的监听。

2、canal的简单介绍

canal详见介绍件官网:https://github.com/alibaba/canal

2.1 家族成员:

【canal.adapter】:客户端落地的适配以及功能

【canal.admin】:提供WebUI的管理界面

【canal.deployer】:canal服务

【canal.example】:客户端提供的demo

2.2 工作原理

3、 实践目标

使用canal监控mysql数据的变化,将变化的数据推送到kafka,并使用canal-admin动态管理需要监控的数据库表。

4、工具准备

其中kafka是依赖zookeeper的,所以也需要zookeeper。

5、配置并启动kafka

Kafka QuickStart

5.1 修改配置

vim config/server.properties

换成自己的IP

替换成自己zookeeper的地址

5.2 启动server

  • 启动zookeeper脚本

# bin/zkServer.sh start

  • 启动kafka脚本

# bin/kafka-server-start.sh -daemon config/server.properties &

  • 查看是否启动成功脚本

# jps -ml

此时kafka启动成功。

5.3 注意事项

值得注意的是官方文档中查看topic的命令,

# bin/kafka-topics.sh --list --zookeeper 192.168.1.110:2181

在心的kafka版本中已经改变,可移步kafka官方文档: Apache Kafka

新版本中使用bootstrap-server,如下

# bin/kafka-topics.sh --list --bootstrap-server localhost:9092 

6、启动canal-admin

6.1 修改配置

改成对应的ip

 6.2 执行 conf/canal.manage.sql

         该脚本是canal-admin的管理脚本。

 6.3 启动canal-admin

sh bin/startup.sh

 6.4 查看启动状态

6.5 访问页面

此时代表canal-admin已经启动成功,可以通过 http://127.0.0.1:8089/ 访问,

默认密码:admin/123456

7、启动canal-server

7.1 修改配置脚本

# vim conf/canal_local.properties

换成canal-admin的IP

7.2 启动服务 指定local

# sh bin/startup.sh local

7.3 查询启动状态

8、管理平台配置

8.1 查看canal服务的状态

8.2 配置实例

修改监听的数据库信息:

canal.instance.master.address=192.168.88.111:3306

canal.instance.dbUsername=***
canal.instance.dbPassword=***

#默认监听全库

canal.instance.filter.regex=test.test_user

#配置不可访问的库表

canal.instance.filter.black.regex=

#配置mq的主题/路由

canal.mq.topic=example

保存即可。

8.3 启动实例

9、编写客户端监听kafka的客户端

@Test
public void test01(){// 修改打印日志的级别,不然会不停的打印debug日志,影响阅读LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();Logger root = loggerContext.getLogger("root");root.setLevel(Level.INFO);//设置消费者属性Properties properties = new Properties();properties.put("bootstrap.servers","192.168.88.111:9092");//反序列化器,与生产者的序列化器相对应properties.put("key.deserializer", StringDeserializer.class);properties.put("value.deserializer", StringDeserializer.class);//设置消费者的消费者群组properties.put(ConsumerConfig.GROUP_ID_CONFIG,"example");// properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastestKafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);try {//消费者订阅主题(可以多个,支持正则表达式,进行模糊匹配)consumer.subscribe(Collections.singletonList("example"));System.out.println("-------------------------消费端准备就绪,等待消息接受------------------------------------");//kafka消费者是通过拉取的方式获得服务端消息while(true){//循环调用poll方法,获取数据。ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String, String> record:records){String topic = record.topic();String value = record.value();if (StringUtils.isNotEmpty(value)) {System.out.println(String.format("topic:%s;" + "value:%s", topic,value));}}}} finally {consumer.close();}
}

10、验证

修改数据库字段,可以接收到修改的信息

基于Canal+kafka监听数据库变化的最佳实践相关推荐

  1. 【做一下1】python 监听数据库变化

    前言 用的yolov5,作者自己写的loadStream函数就是依据 streams.txt里面的rtsp流地址列表来新建线程,然后实现多路监控的. 大体就是这个图里面说的,我已经是为了个整体业务,去 ...

  2. linkedin databus介绍——监听数据库变化,有新数据到来时通知其他消费者app,新数据存在内存里,多份快照...

    概要结构如下图. 图中显示:Search Index和Read Replicas等系统是Databus的消费者.当主OLTP数据库发生写操作时,连接其上的中继系统会将数据拉到中继中.签入在Search ...

  3. 01 canal监听数据库变化

    1.cannal 1.1 canal简介 canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据. canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的 ...

  4. springboot整合canal,监听MySQL binlog日志,实现增量同步

    有两个数据库,并不是主从关系,但是需要同步某张表,可以通过binlog日志,进行同步,前提是这两个数据库的要同步的表,表名和字段名需要一致. 当前项目连接的数据库(需要同步的数据库):base_pro ...

  5. Java实现监听文件变化的三种方法,推荐第三种

    背景 在研究规则引擎时,如果规则以文件的形式存储,那么就需要监听指定的目录或文件来感知规则是否变化,进而进行加载.当然,在其他业务场景下,比如想实现配置文件的动态加载.日志文件的监听.FTP文件变动监 ...

  6. spring boot+kafka+canal实现监听MySQL数据库

    spring boot+kafka+canal实现监听MySQL数据库 一.zookeeper安装 kafka依赖于zookeeper,安装kafka前先安装zookeeper 下载地址:Apache ...

  7. java利用canal监听数据库

    springcloud如何使用canal监听mysql数据库操作 canal是阿里巴巴旗下的一款开源项目,纯Java开发.基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQ ...

  8. 基于Golang的监听读取配置文件的程序包开发——simpleConfig_v1

    基于Golang的监听&读取配置文件的程序包开发--simpleConfig_v1 [阅读时间:约10分钟] 一.配置文件概述 二.系统环境&项目介绍 1.系统环境 2.项目的任务要求 ...

  9. kafka监听topic消费_分布式专题|最近一直死磕kafka设计原理,都肝吐了

    kafka架构图 kafka核心控制器 定义 在kafka集群中,会选举出一个broker作为控制器(controller),负责管理集群中所有的分区和副本的状态: 职责 监听broker变化,通过监 ...

最新文章

  1. 四层和七层负载均衡的区别
  2. Jupyter Notebook已出现“返祖现象”,这款工具让你在终端里使用它
  3. 删除的页面怎么恢复_手机短信删除了怎么恢复?手机短信恢复最新教程
  4. 计算机视觉应用的简称,计算机视觉应用之图像检索任务简单介绍
  5. CVPR 2017 ECO:《ECO: Efficient Convolution Operators for Tracking》论文笔记
  6. UML中的6大关系(关联、依赖、聚合、组合、泛化、实现)
  7. 一个适合于Python初学者的入门练手项目
  8. Cesium:鼠标监听事件绑定
  9. mysql监控优化(二)主从复制
  10. 基于Paddle Serving百度智能边缘BIE的边缘AI解决方案
  11. SQL从数据库导出数据到EXCEL换行的问题解决方法
  12. 和风天气更新数据失败原因分析
  13. 二、C++反作弊对抗实战 (进阶篇 —— 2.作弊器中常见断链隐藏DLL方法)
  14. 元素水平垂直居中的六种方式
  15. 先帝爷下南阳御驾三请,联东吴灭曹魏鼎足三分~~~
  16. 阿里云 centos7静默安装oracle12c,使用navicat 连接oracle数据库
  17. 无法识别 移动固态硬盘_M.2接口的固态硬盘无法识别?其实解决方法非常简单!...
  18. 根据c51程序改写汇编语言,Keil C51编译及连接技术
  19. Linux系统进入紧急模式(emergency mode)的处理方法
  20. 录音 voice record

热门文章

  1. 嵌入式系统求职回忆录1
  2. ZooKeeper Commands: The Four Letter Words
  3. mysql中如何根据表名找到库名
  4. window 10 安装node.js时遇到2502 2503错误解决方法
  5. 使用GitHub Actions通过CI提高代码质量
  6. Dev-C++5.11实现愤怒的小鸟
  7. Linux下集群的搭建
  8. 一直都在用的rgba与hex颜色系统你了解过吗?
  9. pip 查看可安装版本
  10. Oracle查询数据提示ORA-00942:表或视图不存在