一 引言

最近在项目中发现消息的延迟消费是通过redis的过期消息监听,存在很大的安全问题,由于服务的宕机或其他问题会导致消息的丢失,本想系采用延迟队列和死信队列完成消息的延迟消费,但这种方案存在一定的局限性,当队列中第一个消息未过期时,后面过期了的消息也不会投递到死信队列中,这样会造成消息的阻塞,这种方案对时间精度要求不是很高时,可以采用,但时间精度要求比较高时就会存在一定的局限性
rabbitmq官方给我提供了rabbitmq_delayed_message_exchange用于解决消息阻塞问题,本文重点介绍基于自定义springboot组件扩展mq实现延迟消息的消费问题.自定义mq组件扩展可以参考笔者的这篇文章自定义springboot组件–基于模板模式对原生springboot的rabbitmq组件进行扩展

二 引入mq延迟插件

2.1 下载插件

登录mq管理平台查看对应的版本号

在 RabbitMQ 的 3.5.7 版本之后,提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列 ,同时需保证 Erlang/OPT 版本为 18.0 之后。
我这里 MQ 的版本是 3.9.11,现在去 GitHub 上根据版本号下载插件
点击下载插件
根据自己的版本号自行下载即可

2.2 制作包含延迟插件的镜像

将下载好的插件安装mq,这里我们采用Dockerfile制作相应的镜像,避免每次启动docker时都去拷贝插件进行相应的插件部署

FROM rabbitmq:management
COPY ./rabbitmq_delayed_message_exchange-3.9.0.ez /plugins
#开启插件
RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange
#开启webUI
RUN rabbitmq-plugins enable rabbitmq_management

发布到自己的镜像仓库

三 扩展mq组件,集成延迟插件

申明自定义元数据

package com.cncloud.cncloud.common.mq.metadata;import com.cncloud.cncloud.common.mq.common.ExchangeTypeEnum;/*** @author likun* @date 2022年11月21日 17:21*/
public abstract class CustomMessageMetadata implements MessageMetadata{@Overridepublic ExchangeTypeEnum getExchangeType() {return ExchangeTypeEnum.CUSTOM;}/*** 获取交换机名* @return*/public abstract String getExchange();
}

申明解析器

package com.cncloud.cncloud.common.mq.metadata.resolver;import com.cncloud.cncloud.common.mq.common.ExchangeTypeEnum;
import com.cncloud.cncloud.common.mq.metadata.CustomMessageMetadata;
import com.cncloud.cncloud.common.mq.metadata.MessageMetadata;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;import java.util.HashMap;
import java.util.Map;/*** @author likun* @date 2022年11月21日 17:23*/
public class CustomMessageMetadataResolver extends AbstractMessageMetadataResolver{public CustomMessageMetadataResolver(RabbitAdmin rabbitAdmin) {super(rabbitAdmin);}@Overridepublic boolean isSupport(MessageMetadata messageMetadata) {return ExchangeTypeEnum.CUSTOM.equals(messageMetadata.getExchangeType());}@Overridepublic boolean doResolve(MessageMetadata messageMetadata) {CustomMessageMetadata customMessageMetadata = (CustomMessageMetadata) messageMetadata;Map<String, Object> args = messageMetadata.getQueueArgs();if (args==null){args=new HashMap();args.put("x-delayed-type", "direct");}CustomExchange exchange = new CustomExchange(customMessageMetadata.getExchange(), "x-delayed-message", true, false, args);Queue queue = new Queue(customMessageMetadata.getQueue(), true, false, false, customMessageMetadata.getQueueArgs());Binding binding = BindingBuilder.bind(queue).to(exchange).with(messageMetadata.getQueue()).noargs();declareQueue(queue);declareExchange(exchange);declareBinding(binding);return true;}
}

申明消息发射器

package com.cncloud.cncloud.common.mq.producer;import com.alibaba.fastjson.JSON;
import com.cncloud.cncloud.admin.api.entity.MqSendLog;
import com.cncloud.cncloud.admin.api.feign.RemoteUserService;
import com.cncloud.cncloud.common.core.constant.SecurityConstants;
import com.cncloud.cncloud.common.core.message.base.DelayMessage;
import com.cncloud.cncloud.common.core.util.SpringContextHolder;
import com.cncloud.cncloud.common.core.util.UniqueIdUtil;
import com.cncloud.cncloud.common.mq.common.Constant;
import com.cncloud.cncloud.common.mq.common.ExchangeTypeEnum;
import com.cncloud.cncloud.common.mq.metadata.CustomMessageMetadata;
import com.cncloud.cncloud.common.mq.metadata.MessageMetadata;
import com.cncloud.cncloud.common.mq.metadata.resolver.MessageMetadataResolver;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.lang.NonNull;import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;/*** @author likun* @date 2021/6/23 15:22*/
public class CustomMessageDelivery extends AbstractMessageDelivery<CustomMessageMetadata> {public CustomMessageDelivery(@NonNull MessageMetadataResolver messageMetadataResolver, @NonNull RabbitTemplate rabbitTemplate) {super(messageMetadataResolver, rabbitTemplate);}public CustomMessageDelivery(@NonNull MessageMetadataResolver messageMetadataResolver, @NonNull RabbitTemplate rabbitTemplate, @NonNull MessageNotDeliveredCallback messageNotDeliveredCallback, boolean autoDeclareMQ) {super(messageMetadataResolver, rabbitTemplate, messageNotDeliveredCallback, autoDeclareMQ);}@Overridepublic boolean isSupport(MessageMetadata messageMetadata) {return ExchangeTypeEnum.CUSTOM.equals(messageMetadata.getExchangeType());}@Overrideprotected <T> boolean doDeliver(CustomMessageMetadata messageMetadata, T data){// 保存消息投递记录Message message = createMessage(data);String msgClsName = data.getClass().getName();String msgContent = JSON.toJSONString(data);MqSendLog mqSendLog = new MqSendLog();mqSendLog.setId(UniqueIdUtil.genId());mqSendLog.setExchange(messageMetadata.getExchange());mqSendLog.setMsgContent(msgContent);mqSendLog.setMsgClsName(msgClsName);mqSendLog.setMsgid(message.getMessageProperties().getCorrelationId());mqSendLog.setQueue(messageMetadata.getQueue());Date date = new Date(System.currentTimeMillis() + 1000 * 60 * Constant.MSG_TIMEOUT);Instant instant = date.toInstant();ZoneId zoneId = ZoneId.systemDefault();LocalDateTime localDateTime = instant.atZone(zoneId).toLocalDateTime();mqSendLog.setTrytime(localDateTime);RemoteUserService remoteUserService = SpringContextHolder.getBean(RemoteUserService.class);remoteUserService.saveMqSendLog(mqSendLog, SecurityConstants.FROM_IN);rabbitTemplate.convertAndSend(messageMetadata.getExchange(), messageMetadata.getQueue(),message,new CorrelationData(message.getMessageProperties().getCorrelationId()));return true;}@Overridepublic Message createMessage(Object data){String msgId = String.valueOf(UniqueIdUtil.genId());SimpleMessageConverter simpleMessageConverter = new SimpleMessageConverter();MessageProperties messageProperties = new MessageProperties();messageProperties.setCorrelationId(msgId);// 自定义MessageProperties扩展if (data instanceof DelayMessage){DelayMessage delayMessage = (DelayMessage) data;Message message = simpleMessageConverter.toMessage(data, messageProperties);message.getMessageProperties().setHeader("x-delay", delayMessage==null?0L:delayMessage.getDelayTime());return message;}Message message = simpleMessageConverter.toMessage(data, messageProperties);return message;}
}

完善相应的静态代理

四 客户端实现消息的延迟消费

申明自定义message,必须实现DelayMessage, Serializable这两个接口

DelayMessage 用于组件中获得延迟时间,Serializable接口便于消息投递时消息的序列化

package com.cncloud.cncloud.common.core.message.base;/*** @author likun* @date 2022年11月21日 17:43*/
public interface DelayMessage {/*** 获得延迟执行时间* @return*/Long getDelayTime();
}
package com.cncloud.cncloud.common.core.message.dto;import com.cncloud.cncloud.common.core.message.base.DelayMessage;
import lombok.Data;import java.io.Serializable;/*** @author likun* @date 2022年11月21日 17:47*/
@Data
public class TestDelayMessage implements DelayMessage, Serializable {private static final long serialVersionUID = -516414641899843714L;/*** 消息*/private String msg;/*** 延迟时间*/private Long millisecond;@Overridepublic Long getDelayTime() {return this.millisecond;}
}

申明队列的元数据中响应的配置

public class DelayMessageMetadata extends CustomMessageMetadata {@Overridepublic String getExchange() {return MessageMetadataConstant.DelayMessageMetadataConstant.EXCHANGE;}@Overridepublic String getQueue() {return MessageMetadataConstant.DelayMessageMetadataConstant.QUEUE;}
}
package com.cncloud.cncloud.admin.message.config;import com.cncloud.cncloud.admin.producer.matadata.CallDeadMessageMetadata;
import com.cncloud.cncloud.admin.producer.matadata.CallTtlMessageMetadata;
import com.cncloud.cncloud.admin.producer.matadata.DelayMessageMetadata;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author likun* @date 2021/6/23 16:14*/
@Configuration
public class MessageConfig {/*** 通话延迟队列元数据* @return*/@Beanpublic CallTtlMessageMetadata callTtlMessageMetadata(){return new CallTtlMessageMetadata();}/*** 通话死信队列元数据* @return*/@Beanpublic CallDeadMessageMetadata callDeadMessageMetadata(){return new CallDeadMessageMetadata();}@Beanpublic DelayMessageMetadata delayMessageMetadata(){return new DelayMessageMetadata();}
}

申明消息监听器

package com.cncloud.cncloud.admin.message.consum;import com.cncloud.cncloud.admin.producer.matadata.DelayMessageMetadata;
import com.cncloud.cncloud.common.core.message.dto.TestDelayMessage;
import com.cncloud.cncloud.common.mq.consumer.SimpleDynamicMessageListener;
import com.cncloud.cncloud.common.mq.metadata.resolver.MessageMetadataResolver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** @author likun* @date 2022年11月21日 17:54*/
@Component
@Slf4j
public class DelayMessageListener extends SimpleDynamicMessageListener<TestDelayMessage> {public DelayMessageListener(MessageMetadataResolver messageMetadataResolver) {super(new DelayMessageMetadata(), messageMetadataResolver);}@Overridepublic void onMessage(TestDelayMessage testDelayMessage) {log.info("监听到客户端消息:{},延迟时间为:{}",testDelayMessage.getMsg(),testDelayMessage.getDelayTime());}
}

五 延迟消息测试

package com.cncloud.cncloud.admin.controller;import com.cncloud.cncloud.admin.producer.matadata.DelayMessageMetadata;
import com.cncloud.cncloud.common.core.message.dto.TestDelayMessage;
import com.cncloud.cncloud.common.core.util.R;
import com.cncloud.cncloud.common.mq.producer.MessageDelivery;
import com.cncloud.cncloud.common.security.annotation.Inner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/demo/message")
public class MessageDemoController {@Autowiredprivate MessageDelivery messageDelivery;@Autowiredprivate DelayMessageMetadata delayMessageMetadata;@GetMapping("/message")@Inner(value = false)public R message(@RequestParam("message") String message) throws ClassNotFoundException {TestDelayMessage testDelayMessage1 = new TestDelayMessage();testDelayMessage1.setMsg("数据测试1");testDelayMessage1.setMillisecond(10000L);TestDelayMessage testDelayMessage2 = new TestDelayMessage();testDelayMessage2.setMsg("数据测试2");testDelayMessage2.setMillisecond(5000L);messageDelivery.deliver(delayMessageMetadata,testDelayMessage1);messageDelivery.deliver(delayMessageMetadata,testDelayMessage2);return R.ok();}
}

测试结果:

10s的延迟消息先投递5s的延迟消息后投递,但是客户端先收到5s的延迟消息,说明消息并没有发生相应的阻塞

扩展自定义mq组件,使用rabbitmq_delayed_message_exchange延迟组件,完善消息延迟消息精度问题相关推荐

  1. php 小程序自定义图,微信小程序之如何使用自定义组件封装原生 image 组件

    零.问题的由来 一般在前端展示图片时都会碰到这两个常见的需求: 图片未加载完成时先展示占位图,等到图片加载完毕后再展示实际的图片. 假如图片链接有问题(比如 404),依然展示占位图.甚至你还可以增加 ...

  2. 组件分享之后端组件——一个简单且高度可扩展的分布式文件系统seaweedfs

    组件分享之后端组件--一个简单且高度可扩展的分布式文件系统seaweedfs 背景 近期正在探索前端.后端.系统端各类常用组件与工具,对其一些常见的组件进行再次整理一下,形成标准化组件专题,后续该专题 ...

  3. layui树组件(扩展为带有图标的layui树组件)

    项目场景: 说起来layui前端框架,做后端开发的也是再熟悉不过了,模块开发,友好的UI界面,对前端后端人员都十分友好,当然,文档和示例也算是相当完备了.但是在开发项目时会遇到这样一种情况,就是想实现 ...

  4. 【Android 逆向】启动 DEX 字节码中的 Activity 组件 ( 使用 DexClassLoader 获取组件类失败 | 失败原因分析 | 自定义类加载器没有加载组件类的权限 )

    文章目录 一.使用 DexClassLoader 获取组件类失败报错 二.失败原因分析 一.使用 DexClassLoader 获取组件类失败报错 在上一篇博客 [Android 逆向]启动 DEX ...

  5. [vue] vue自定义事件中父组件怎么接收子组件的多个参数?

    [vue] vue自定义事件中父组件怎么接收子组件的多个参数? 子组件传递多个参数,父组件用展开运算符获取 个人简介 我是歌谣,欢迎和大家一起交流前后端知识.放弃很容易, 但坚持一定很酷.欢迎大家一起 ...

  6. 微信小程序下拉框插件_微信小程序自定义select下拉选项框组件的实现代码_清玖_前端开发者...

    知识点:组件,animation,获取当前点击元素的索引与内容 微信小程序中没有select下拉选项框,所以只有自定义.自定义的话,可以选择模板的方式,也可以选择组件的方式来创建. 这次我选择了组件, ...

  7. 微信小程序(组件:路由、表单、媒体、自定义组件,插槽、组件通讯、侦听器、生命周期)

    三.微信小程序组件 组件就是小程序页面的组成结构,与html在web网页开发中的作用一样,铺设页面.可以参考其他UI库,像elementUI,vantUI组件 组件是视图层的基本组成单元. 组件自带一 ...

  8. 微信小程序自定义组件,和 父子组件 之间的传值

    自定义组件 父组件 向 子组件 传参 子组件 向 父组件 传参

  9. 微信小程序—自定义(城市选择)弹窗组件,将弹窗组件的值传给调用页面并显示(图文)

    微信小程序-自定义(城市选择)弹窗组件,并传值 1.新建component文件夹用来存放自定义组件,并在其文件夹内新建cityModal Component,注意不是新建Page 新建完成之后出现四个 ...

最新文章

  1. 公众号第三方平台开发 获取 component_verify_ticket
  2. WINCE---内核(kernel)---内存架构(memory archtitecture)学习
  3. openssl aes加解密的使用
  4. 笔记-高项案例题-2017年下-整体管理-变更管理
  5. 简单记录js中的this关键字
  6. oracle rtrim(),Oracle ltrim() rtrim() 函数详细用法
  7. pycharm appiunm 公众号测试_知道答案公众号_知到APP笔尖上的艺术——书法基础与赏析单元测试答案_知道答...
  8. 克鲁赛德战记服务器无响应,克鲁赛德战记闪退黑屏登不上怎么办 解决方法
  9. python设置excel表样式
  10. 人工智能会用绝对的力量,用更高层次的方式直接进入你的工作行业
  11. workman实现websocket聊天nginx的配置
  12. 领带的打法10种——男士必看(图)
  13. jquery $.each(data, function (index, value) {
  14. Spring Data JPA REST Query QueryDSL
  15. 主播直播间抖音电商脚本运营话术模板表格方案管理计划
  16. JDK 18 / Java 18 正式发布:九项 JDK 增强
  17. 物联网和android有关系吗,基于Android平台的物联网网关有什么优势?
  18. Arduino使用 旋转电位器
  19. 《新白发魔女传》之自创歌曲 《绝缘》
  20. Qt股票软件企业级源码

热门文章

  1. html5 手机页面 允许放大,阻止移动设备(手机、pad)浏览器双击放大网页的方法
  2. 袋鼠云数据中台专栏2.0 | 企业数字化(数据界面)整体架构
  3. 放下助人情结,尊重他人命运
  4. oppo手机蓝牙连不上Airpods怎么破?
  5. 多个pdf怎么合并成一个pdf
  6. 综合能效管理平台在学校园区管理中的应用
  7. req和resp的作用及常用方法
  8. ipad和ipod的区别
  9. javascript和java两种方式实现生成二维码功能
  10. 苏联27.5万亿美元消失的真相