1、消息可靠投递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式
  • return 退回模式

rabbitmq 整个消息投递的路径为:

  • producer—>rabbitmq broker—>exchange—>queue—>consumer

  • 消息从 producer 到 exchange 则会返回一个 confirmCallback 。

  • 消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。

  • 将利用这两个 callback 控制消息的可靠性投递

因SpringBoot 整合RabbitMQ 当队列或交换机不存在是,自动创建,所以可靠性检测的一般是服务是否宕机。与消费者是否接收/确认消息无无关

1.1、SpringBoot整合

生产端

  • yaml

    spring:rabbitmq:host: 192.168.0.134port: 5672username: adminpassword: adminvirtual-host: /admin# 开启publisher-confirm 有以下可选值# simple:同步等待confirm结果,直到超时# correlated:异步回调,定义ConfirmCallback。mq返回结果时会回调这个ConfirmCallback# NONE:默认不开启publisher-confirm-type: correlated# 开启publish-return功能。可以定义ReturnCallback# true:调用ReturnCallback# false:直接丢弃消息publisher-returns: true
    
  • 自定义Callback类

    
    /*** 消息推送确认机制配置文件* @author codinganhour*/
    @Component
    public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {@AutowiredRabbitTemplate rabbitTemplate;/*** 初始化方法*/@PostConstructpublic void initMethod() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {Integer receivedDelay = null;if(null != correlationData){correlationData.getReturned().getMessage().getMessageProperties().getReceivedDelay();}if (receivedDelay != null && receivedDelay > 0) {// 是一个延迟消息,忽略这个错误提示return;}if (ack) {System.out.println("消息已经送达Exchange,ack已发");} else {System.out.println("消息没有送达Exchange");}}@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("消息没有送到队列中");}
    }
    

2、手动ACK确认机制

在RabbitMQ中指代的是消费者收到消息后确认的一种行为,关注点在于消费者能否实际接收到MQ发送的消息。

  • 自动Ack时,消费者接收消息后立即ack,然后慢慢处理,重启消费者会丢失消息。
  • 手动Ack时,消费者接收消息后,消息状态为 Unacked,如果消费的时候没有手动ack,则mq中的消息总量Total不会减少。

RabbitMQ默认的消息确认机制是:自动确认的

队列分配消息给监听消费者时,该消息处于未确认状态,不会被删除;当接收到消费者的确认回复才会将消息移除。

其提供了三种确认方式:

  • 自动确认acknowledge=“none”:当消费者接收到消息的时候,就会自动给到RabbitMQ一个回执,告诉MQ我已经收到消息了,不在乎消费者接收到消息之后业务处理的成功与否。

  • 手动确认acknowledge=“manual”:当消费者收到消息后,不会立刻告诉RabbitMQ已经收到消息了,而是等待业务处理成功后,通过调用代码的方式手动向MQ确认消息已经收到。当业务处理失败,就可以做一些重试机制,甚至让MQ重新向消费者发送消息都是可以的。

  • 根据异常情况确认acknowledge=“auto”:该方式是通过抛出异常的类型,来做响应的处理(如重发、确认等)。这种方式比较麻烦。

1.1、SpringBoot 整合RabbitMQ ACK

消费端

manual方式

  • yaml配置文件

    spring:rabbitmq:host: 192.168.0.134port: 5672username: adminpassword: adminvirtual-host: /adminlistener:# 容器类型simple或direct 简单理解为一对一;direct理解为一对多个消费者simple:# ACK模式(none,auto,manual,默认为auto)acknowledge-mode: manual# 开启重试retry:# 是否开启重试机制enabled: true
    
  • 消费者

/*** @author*/
@Slf4j
@Component
public class DirectManualListener {/*** 消息最大重试次数*/private static final int MAX_RETRIES = 3;/*** 重试间隔(秒)*/private static final long RETRY_INTERVAL = 5;/*** 手动进入死信队列* RabbitListener中的参数用于表示监听的是哪一个队列*/@RabbitListener(queues = DirectManualConfig.DIRECT_QUEUE)public void manualListenerQueue(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception {// 重试次数int retryCount = 0;boolean success = false;// 消费失败并且重试次数<=重试上限次数while (!success && retryCount < MAX_RETRIES) {retryCount++;// 具体业务逻辑System.out.println("处理业务逻辑");// 如果失败则重试if (!success) {String errorTip = "第" + retryCount + "次消费失败" +((retryCount < 3) ? "," + RETRY_INTERVAL + "s后重试" : ",进入死信队列");log.error(errorTip);Thread.sleep(RETRY_INTERVAL * 1000);}}if (success) {// 消费成功,确认channel.basicAck(deliveryTag, false);log.info("创建订单数据消费成功");} else {// requeue:false 手动拒绝,进入抛弃或进入死信队列channel.basicNack(deliveryTag, false, false);log.info("创建订单数据消费失败");}}
}

auto方式

  • yaml配置文件

    spring:rabbitmq:host: 192.168.0.134port: 5672username: adminpassword: adminvirtual-host: /adminlistener:simple:# ACK模式(none,auto,manual,默认为auto)acknowledge-mode: auto# 开启重试retry:# 是否开启重试机制enabled: true# 最大重试次数,默认3max-attempts: 5# 重试间隔(ms) 默认1秒initial-interval: 500# 重试因子,默认是1。本次推送时间间隔 = 上一次间隔时间 * multipliermultiplier: 2# 最大间隔时间(ms),默认10秒maxInterval: 20000
    
  • 消费者

@Slf4j
@Component
public class DirectAutoListener {/*** auto手动抛出异常方式进入死信队列,yaml中max-attempts,initial-interval生效* RabbitListener中的参数用于表示监听的是哪一个队列*/@RabbitListener(queues = DirectManualConfig.DIRECT_QUEUE)public void autoListenerQueue(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception {log.info("消息信息:"+message+";消息deliveryTag="+deliveryTag);Thread.sleep(1000);if(deliveryTag != 8){throw new RuntimeException("操作异常");}else{log.info("消息Ack deliveryTag="+deliveryTag);channel.basicAck(deliveryTag, false);}}
}

3、延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

场景:

  1. 下单后,30分钟未支付,取消订单,回滚库存。

  2. 新用户注册成功7天后,发送短信问候。

实现方式:

  1. 定时器
    缺点:触发时,会扫描数据库,难以精确定位触发时间,数据量大时数据库承受压力过大;

  2. 延迟队列(TTL+死信队列组合实现延迟队列的效果)
    精确触发,触发时只查询单一数据即可

延迟队列


/*** 延迟队列* @author*/
@Slf4j
@Configuration
public class DirectTtlConfig {/*** direct路由模式-交换机*/public static final String  DIRECT_EXCHANGE = "direct_ttl_exchange";/*** direct路由模式-队列*/public static final String DIRECT_QUEUE = "direct_ttl_queue";/*** direct路由模式-路由键*/public static final String DIRECT_ROUTING = "direct.ttl.routing";/*** direct路由模式-死信交换机*/public static final String  DIRECT_DLX_EXCHANGE = "direct_ttl_dlx_exchange";/*** direct路由模式-死信队列*/public static final String DIRECT_DLX_QUEUE = "direct_ttl_dlx_queue";/*** direct路由模式-路由键*/public static final String DIRECT_DLX_ROUTING = "direct.ttl.dlx.routing";/*** 1、声明交换机* direct路由模式,默认持久化,非自动删除* @return*/@Bean(DIRECT_EXCHANGE)public Exchange directTtlExchange(){return ExchangeBuilder.directExchange(DIRECT_EXCHANGE).build();}/*** 2、声明队列* direct路由模式* @return*/@Bean(DIRECT_QUEUE)public Queue directTtlQueue(){// ttl:延迟队列时间,超时为消费则进入死信队列中// deadLetterExchange:绑定死信交换机// deadLetterRoutingKey:绑定死信路由return QueueBuilder.durable(DIRECT_QUEUE).ttl(1000).deadLetterExchange(DIRECT_DLX_EXCHANGE).deadLetterRoutingKey(DIRECT_DLX_ROUTING).build();}/*** 3、队列与交换机进行绑定* direct路由模式* @param queue @Qualifier 将 value 对应的bean 注入到参数中* @param exchange @Qualifier 将 value 对应的bean 注入到参数中* @return*/@Beanpublic Binding directTtlQueueExchange(@Qualifier(DIRECT_QUEUE) Queue queue, @Qualifier(DIRECT_EXCHANGE) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DIRECT_ROUTING).noargs();}/*** 1、声明死信交换机* direct路由模式,默认持久化,非自动删除* @return*/@Bean(DIRECT_DLX_EXCHANGE)public Exchange directDlxExchange(){return ExchangeBuilder.directExchange(DIRECT_DLX_EXCHANGE).build();}/*** 2、声明死信队列* direct路由模式* @return*/@Bean(DIRECT_DLX_QUEUE)public Queue directDlxQueue(){return QueueBuilder.durable(DIRECT_DLX_QUEUE).build();}/*** 3、死信队列与死信交换机进行绑定* direct路由模式* @param queue @Qualifier 将 value 对应的bean 注入到参数中* @param exchange @Qualifier 将 value 对应的bean 注入到参数中* @return*/@Beanpublic Binding directDlxQueueExchange(@Qualifier(DIRECT_DLX_QUEUE) Queue queue, @Qualifier(DIRECT_DLX_EXCHANGE) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DIRECT_DLX_ROUTING).noargs();}
}

消费者只需要监听死信队列中消息即可

4、死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

死信的三种情况:

  1. 队列消息长度到达限制;

  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;(手动ack(auto,manual)都可以触发)

  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

死信队列与延期队列实现方式一致,只是会监听2个消费者,正常队列采用ack(auto,manual)触发是否进入死信队列

QueueBuilder.durable(DIRECT_QUEUE).maxLength():队列中等待消费的数量大于maxLength的数量就会进入死信队列

5、消息幂等性保障

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

处理方式

  • 传递消息唯一值记录数据库中或者redis中,消费时判断,防止重复消费

  • 更新数据库时可以采用乐观锁方式,关键字段值发生变化则不消费

6、消息积压

  • 消费者宕机积压
  • 消费者消费能力不足积压
  • 发送者发流量太大

解决方案:上线更多的消费者,进行正常消费上线专门的队列消费服务,将消息先批量取出来,记录数据库,再慢慢处理

SpringBoot整合RabbitMQ 消息可靠投递、手动ack、延迟队列、死信队列、消息幂等性保障、消息积压相关推荐

  1. rabbit和mysql事务_分布式事务原理及SpringBoot整合RabbitMQ实现可靠事件,TCC事务模型及接口幂等性...

    分布式事务 我们知道在单数据库系统中,实现数据的一致性,通过数据库的事务来处理比较简单.在微服务或分布式系统中,各个独立的服务都会有自己的数据库,而不是在同一个数据库中,所以当一组事务(如商品交易中, ...

  2. RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列

    搭建SpringBoot项目,用于演示 springboot版本 <!-- spring boot --><dependency><groupId>org.spri ...

  3. SpringBoot整合RabbitMQ消息队列

    RabbitMQ 一.RabbitMQ介绍 1.1 现存问题 服务调用:两个服务调用时,我们可以通过传统的HTTP方式,让服务A直接去调用服务B的接口,但是这种方式是同步的方式,虽然可以采用Sprin ...

  4. Rabbitmq消息可靠投递和重复消费等问题解决方案

    消息的可靠性投递 在一些对数据一致性要求较高的业务场景里面,如果消息在发布和消费过程中出现了问题(消息丢失,消息重复消费),就会导致数据不一致,要做到消息的可靠性投递. 在RabbitMq里面提供了很 ...

  5. rabbitmq消息可靠投递(理论)

    消息的可靠投递 在使用Rabbit MQ的时候,在发送消息的时候我们希望消息不会投递失败,这个时候RabbitMQ为我们提供了两种方式用来控制消息的投递可靠性模式. config确认模式 return ...

  6. SpringBoot整合RabbitMQ-消息可靠性投递

    本系列是学习SpringBoot整合RabbitMQ的练手,包含服务安装,RabbitMQ整合SpringBoot2.x,消息可靠性投递实现等三篇博客. 学习路径:https://www.imooc. ...

  7. Springboot——整合Rabbitmq之Confirm和Return详解

    文章目录 前言 为什么会有Confirm Springboot 整合 Mq 实现 Confirm 监听机制 依赖引入 增加配置文件,设定连接信息 配置队列.交换机,以及对其进行绑定 编写mq消息发送服 ...

  8. SpringBoot整合RabbitMQ(包含生产者和消费者)

    生产者 创建一个SpringBoot项目springboot-producer,作为RabbitMQ的生产者. 在pom文件中引入相关的依赖坐标 <dependency><group ...

  9. Springboot 整合RabbitMq ,用心看完这一篇就够了

    该篇文章内容较多,包括有rabbitMq相关的一些简单理论介绍,provider消息推送实例,consumer消息消费实例,Direct.Topic.Fanout的使用,消息回调.手动确认等. (但是 ...

最新文章

  1. o oia ospf 路由优先_动态路由OSPF中注入默认路由,原来都是这么玩的,进去看看...
  2. svm学习之线性部分总结
  3. 支持OpenStack,红帽将开源进行到底
  4. opencv java水平投影_OpenCV实现图像在水平方向上投影
  5. 漫画:趣解鸿蒙 OS 如何实现跨平台?
  6. python 图形模块_Python图形模块
  7. robotframework安装_Robot Framework零基础入门教程
  8. 外部接口需求怎么写_怎么写财务工作报告?送你16套高逼格财务工作报告范文PPT模板,满足不同行业会计需求!...
  9. 大数据踩坑之旅: 从数据可视化到商业智能
  10. Civil2019程序安装及注意事项
  11. Android X86上运行基于ARMARM处理器的应用程序
  12. Android网上购物商城测试,Android 应用上架小米商城Monkey 测试不通过
  13. 从前后端分离到前后端整合的“退步”(二)pom.xml文件配置
  14. 电压的降额 Voltage Derating
  15. Redmi11T Pro +值不值得买 Redmi11T Pro +配置如何
  16. NGK韩国峰会完美落幕,共议区块链生态新发展
  17. 一个java随机数据的工具类
  18. 解决docker容器因报错无法启动的问题,检查、修复容器错误并重启
  19. 2017全国计算机ps版本,Adobe Photoshop v18.0.0 (PS CC 2017) 中文多语言版本 不断更新
  20. 三毛作品集——在线阅读

热门文章

  1. ADC VS参考电压在测量时和AD值的大小对应关系
  2. 无人机飞行表演编队的数据链方案介绍
  3. 第6章——马斯洛:需要层次理论
  4. 解决棘手SQL性能问题,我的SQLT使用心得
  5. Linux c语言 creat参数,C++_使用C语言操作文件的基本函数整理,C语言creat()函数:创建文件函 - phpStudy...
  6. 北航软件工程考研非全上岸经验贴
  7. Webstorm安装及使用
  8. 时间戳 和 时间的相互转换
  9. 【论文阅读笔记】Real-Time High-Resolution Background Matting
  10. 《洛克菲勒留给儿子的38封信》 第三封:天堂与地狱比邻