点击上方“Java知音”,选择“置顶公众号”

技术文章第一时间送达!

上一期讲到了通过canal订阅mysql的binlog日志并且转换为对象,那么这一次我们将订阅来的对象通过RocketMQ发送消息,接收方接受消息之后同时存储到其他类型的数据源当中,完成一个简单的数据异构的过程。


什么是Java消息服务?

两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持JAVA应用程序开发。

在J2EE中,当两个应用程序使用JMS进行通信时,它们之间并不是直接相连的,而是通过一个共同的消息收发服务连接起来,可以达到解耦的效果,我们将会在接下来的教程中详细介绍。

jms的消息传送模型

常见的消息传送模型有以下两种:

点对点消息传送模型

在点对点消息传送模型中,应用程序由消息队列,发送者,接收者组成。每一个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息(除了被接收者消费掉的和过期的消息)。如下图所示:


发布订阅消息传送模型

在发布订阅模型中,消费者需要订阅相关的topic才能接收到生产者的信息。生产者会将信息传输到topic中,然后消费者只需要从topic中获取数据即可。如下图所示:


RocketMQ消息队列使用

这次使用的消息中间件为RocketMQ的使用场景。RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache基金会,并于2016年11月成为 Apache 孵化项目。

RocketMQ在使用之前,需要我们引入相关的依赖配置:

            <dependency>            <groupId>org.apache.rocketmqgroupId>            <artifactId>rocketmq-clientartifactId>            <version>${rocketmq.version}version>        dependency>

关于RocketMQ的安装在这里就不做过多的讲解了。

通过mq的方式来进行数据异构通常是比较简单的方案,首先我们需要在项目里面独立一个模块专门用于监听mysql的binlog日志,这个模块我暂且称之为datahandle-core模块


整个工程采用了springboot的结构来构建,主要的核心也是在core工程中。

首先是监听canal的日志状态模块了,采用了上一节中讲解到的客户端代码进行数据监听,并且将其转换为对象然后发送往mq中:

package com.sise.datahandle.core;

import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.protocol.Message;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.CommandLineRunner;import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

import static com.sise.datahandle.constants.CanalConstants.*;

/** * @author idea * @date 2019/10/20 */@Component@Slf4jpublic class CanalListener implements CommandLineRunner {

    @Autowired    private CanalClient canalClient;

    @Override    public void run(String... args) throws Exception {      log.info("=============canal监听器开启===============");        CanalConnector canalConnector = CanalConnectors.newSingleConnector(                new InetSocketAddress(SERVER_ADDRESS, PORT), DESTINATION, USERNAME, PASSWORD);        canalConnector.connect();        canalConnector.subscribe(".*\\..*");        canalConnector.rollback();        for (; ; ) {            Message message = canalConnector.getWithoutAck(100);            long batchId = message.getId();            if (batchId != -1) {                canalClient.entityHandle(message.getEntries());            }        }    }}

ps:这里面的CanalClient代码主要来自上一篇的canal客户端代码,文末会有完整项目代码链接,需要的读者可以前往查看。

在CanalClient里面,有一个函数是专门用于处理将订阅的数据发送到mq消息队列中:

package com.sise.datahandle.core;

import com.alibaba.fastjson.JSON;import com.alibaba.otter.canal.protocol.CanalEntry;import com.google.protobuf.InvalidProtocolBufferException;import com.sise.datahandle.handler.CanalDataHandler;import com.sise.datahandle.model.TypeDTO;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.remoting.exception.RemotingException;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;

import java.util.List;

/** * canal监听客户端变化 * * @author idea * @date 2019/10/12 */@Slf4j@Servicepublic class CanalClient {

    @Autowired    private DefaultMQProducer rocketMqProducer;

    /**     * 处理binlog日志的监听     *     * @param entries     */    public void entityHandle(List entries) {        for (CanalEntry.Entry entry : entries) {            if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {                continue;            }            try {                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {                    switch (rowChange.getEventType()) {                        case INSERT:                            String tableName = entry.getHeader().getTableName();                            //测试选用t_type这张表进行映射处理                            if ("t_type".equals(tableName)) {                                TypeDTO typeDTO = CanalDataHandler.convertToBean(rowData.getAfterColumnsList(), TypeDTO.class);                                org.apache.rocketmq.common.message.Message message = new org.apache.rocketmq.common.message.Message();                                message.setTopic("canal-test-topic");                                message.setTags("canal-test-tag");                                String json = JSON.toJSONString(typeDTO);                                message.setBody(json.getBytes());                                SendResult sendResult = rocketMqProducer.send(message);                                log.info("[mq消息发送结果]----" + sendResult);                            }                            break;                        default:                            break;                    }                }            } catch (InvalidProtocolBufferException e) {                log.error("[CanalClient]监听数据过程出现异常,异常信息为{}", e);            } catch (InterruptedException | RemotingException | MQClientException | MQBrokerException e) {                log.error("[CanalClient] mq发送信息出现异常:{}", e);            }        }    }

}

这里面主要是监听binlog记录为插入数据事件的时候做发送mq操作。

接下来便是常见的mq配置了,本工程主要是一个模拟的简单案例,因此我将consumer和producer都放在了一起方便测试。


通过springboot自身的properties文件对mq进行参数初始化配置之后便可以构建一个基本的consumer和producer了。这里我们拿一个TypeDto类来进行树异构的测试,consumer端的核心代码为:

package com.sise.datahandle.mq.rocketmq.consumer;

import com.sise.datahandle.model.TypeDTO;import com.sise.datahandle.mq.rocketmq.producer.RocketMqMsgHandle;import com.sise.datahandle.redis.RedisService;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.message.MessageExt;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import org.springframework.util.CollectionUtils;

import java.util.List;

/** * @author idea * @date 2019/10/20 */@Component@Slf4jpublic class RocketMqConsumeMsgListenerProcessor implements MessageListenerConcurrently {

    @Autowired    private RedisService redisService;

    @Override    public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {if(CollectionUtils.isEmpty(msgs)){            log.info("接受到的消息为空,不处理,直接返回成功");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        }        MessageExt messageExt = msgs.get(0);        System.out.println("接受到的消息为:"+messageExt.toString());if("canal-test-topic".equals(messageExt.getTopic())){if("canal-test-tag".equals(messageExt.getTags())){                int reconsume = messageExt.getReconsumeTimes();if(reconsume ==3){//消息已经重试了3次,如果不需要再次消费,则返回成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;                }                TypeDTO typeDTO = RocketMqMsgHandle.parseMessage(messageExt,TypeDTO.class);//存储进入redis中                redisService.setObject("typeDTO-"+System.currentTimeMillis(),typeDTO);            }        }// 如果没有return success ,consumer会重新消费该消息,直到return successreturn ConsumeConcurrentlyStatus.CONSUME_SUCCESS;    }}

通过订阅mq的信息,读取相关的数据再次写入到redis里面,完成一个简单过程的数据异构。

整个迷你工程写下来,比较核心的地方就在于对binlog日志的解析器部分,如何将日志订阅之后转换为相应的对象进行处理。

通常采用mq的方式进行数据异构会相对简单,实际上是在监听binlog为写DB的同时去写一次MQ,但是这种方式不能够保证数据一致性,就是不能保证跨资源的事务。注:调用第三方远程RPC的操作一定不要放到事务中。

完整案例的代码链接如下(点击阅读原文直达):

https://gitee.com/IdeaHome_admin/wfw

推荐阅读(点击即可跳转阅读)

1.SpringBoot内容聚合

2.面试题内容聚合

3.设计模式内容聚合

4.Mybatis内容聚合

5.多线程内容聚合

觉得不错?欢迎转发分享给更多人

我知道你 “在看

canal 监听不到数据变化_数据的异构实战(二)手写迷你版同步工程相关推荐

  1. 数据的异构实战(二)手写迷你版同步工程

    上一期讲到了通过canal订阅mysql的binlog日志并且转换为对象,那么这一次我们将订阅来的对象通过RocketMQ发送消息,接收方接受消息之后同时存储到其他类型的数据源当中,完成一个简单的数据 ...

  2. Canal监听mysql的binlog日志实现数据同步

    Canal监听mysql的binlog日志实现数据同步 1. canal概述 1.1 canal简介 1.2 技术选型 1.3 原理分析 1.3.1 MySQL主备复制原理 1.3.2 canal原理 ...

  3. ios 监听数组个数的变化_【iOS】KVO方式监听数组的变化动态刷新tableView

    写作本文来由:   iOS默认不支持对数组的KVO,因为普通方式监听的对象的地址的变化,而数组地址不变,而是里面的值发生了改变 整个过程需要三个步骤 (与普通监听一致) /* *第一步建立观察者及观察 ...

  4. angular监听输入框值的变化_如何检测Angular中@Input()值何时发生变化?

    Actually, there are two ways of detecting and acting up on when an input changes in the child compon ...

  5. 监听mysql表内容变化 使用canal_2 监听mysql表内容变化,使用canal

    mysql本身是支持主从的(master slave),原理就是master产生的binlog日志记录了所有的增删改语句,将binlog发送到slave节点进行执行即可完成数据的同步. canal是阿 ...

  6. 监听mysql表内容变化 使用canal,canal 监听同步指定数据库,所有表

    canal 监听同步指定数据库,所有表 canal 监听同步指定数据库,所有表 因为工作需求,需要用到数据库同步,又从网上找了一些发现都有些问题,所以自己弄好之后写一篇总结,及配置步骤吧 先将 MyS ...

  7. 使用canal监听binlog将数据发送到RocketMQ同步到es

    写在前面 今天不学习,明天变垃圾.最近在学习如何使用canal监听binlog并且将数据同步到es,俗话说好记性不如烂笔头,所以写一篇文章记录一下,一是为了健忘的自己,二是为了恰好有此需求的小可爱(程 ...

  8. canal 监听同步指定数据库,刷新redis缓存

    最近工作中需要使用到缓存,但是由于在业务实现的时候刷新缓存总会出现一些缓存不一致问题.于是最终想采用canal监听来处理数据一致性问题. 查看mysql binlog日志是否开启: 1.配置mysql ...

  9. java利用canal监听数据库

    springcloud如何使用canal监听mysql数据库操作 canal是阿里巴巴旗下的一款开源项目,纯Java开发.基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQ ...

最新文章

  1. 智办事2.0APP全新发布,不可错过的3个亮点!
  2. Tomcat 详解 一
  3. mybatis 2 -常用数据操作
  4. high-speed A/D performance metrics and Amplifie...
  5. RabbitMQ, ZeroMQ, Kafka 是一个层级的东西吗, 相互之间有哪些优缺点
  6. ROS通过串口,读写STM32和HC-SR04超声波测距信息
  7. vscode保存代码,自动按照eslint规范格式化代码设置
  8. 米兔机器人第三代测评_米兔智能机器人——年轻人的第一台 31313?
  9. Typora配置smms图床
  10. 基于人脸识别技术实战开发人证比对访客系统
  11. 为什么unity中我的模型是红颜色的
  12. 用python爬取全国和全球疫情数据,并进行可视化分析(过程详细代码可运行)
  13. java软件工程师自我评价_java软件工程师自我评价
  14. python 中文分析句子成分_语文句子成分分析详解
  15. 【Linux】VM与Linux的安装
  16. 地图下面的标尺是什么意思_机油标尺怎么看图解,汽车机油标尺正常位置示意图...
  17. Linux文件或者文件内容搜索
  18. mybatis生成的Example实例函数使用及详解
  19. unity实现牧师与魔鬼问题回答
  20. 【工作时间打王者】eBest 首届“荣耀杯”比赛开幕啦

热门文章

  1. 矩阵在计算机程序中的应用
  2. 2019 好笔友-见字如面
  3. 大数据之数据库mysql优化实战(一)
  4. arm下如何烧写指定分区大小的内核和文件系统
  5. ORACLE10g安装
  6. 10个相似图片搜索以图找图的网站
  7. ie浏览器里面无法输入文字:
  8. 《央视-走遍中国》,没机会亲自去看,了解一下也好
  9. 一个c语言程序什么时候结束,新人求救,写了一个C语言程序,输入完数据后就结束了!!!...
  10. abap中的弹出窗口函数