目录

8.1. 发布确认 springboot 版本

8.1.1. 确认机制方案

8.1.2. 代码架构图

8.1.3. 配置文件

8.1.4. 添加配置类

8.1.5. 消息生产者

8.1.6. 回调接口

8.1.7. 消息消费者

8.1.8. 结果分析

8.2. 回退消息

8.2.1. Mandatory 参数

8.2.2. 消息生产者代码

8.2.3. 回调接口

8.2.4. 结果分析

8.3. 备份交换机

8.3.1. 代码架构图

8.3.2. 修改配置类

8.3.3. 报警消费者

8.3.4. 测试注意事项

8.3.5. 结果分析


在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,

导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?

特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢

8.1. 发布确认 springboot 版本

8.1.1. 确认机制方案

8.1.2. 代码架构图

8.1.3. 配置文件

在配置文件当中需要添加

spring.rabbitmq.publisher-confirm-type=correlated

⚫ NONE

禁用发布确认模式,是默认值

⚫ CORRELATED

发布消息成功到交换器后会触发回调方法

⚫ SIMPLE

经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,

其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法

等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是

waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker

8.1.4. 添加配置类

package com.xingchen.mq.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;/*** @author xingchen* @version V1.0*  * 发布确认兜底方案  添加缓存测试* @Package com.xingchen.mq.config* @date 2022/12/7 11:52*/
@Configuration
public class ConfirmConfig {//交换机private static final String CONFIRM_EXCHANGE_NAME = "confirm-exchange";//队列private static final String CONFIRM_QUEUE_NAME = "confirm-queue";//ROUTING_KEYprivate static final String CONFIRM_ROUTING_KEY = "confirm-key";/*** 备份交换机*/private static final String BACKUP_EXCHANGE_NAME = "backup-exchange";/*** 备份队列*/private static final String BACKUP_QUEUE_NAME = "backup-queue";/*** 报警队列*/private static final String WARNING_QUEUE_NAME = "warning-queue";@Bean("confirmExchange")public DirectExchange directExchange() {/**确认交换机配置备份交换机 以确保宕机后将消息转发到备份交换机*/return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME).build();}@Bean("backupExchange")public FanoutExchange backupExchange() {return new FanoutExchange(BACKUP_EXCHANGE_NAME);}@Bean("confirmQueue")public Queue confirmQueue() {HashMap<String, Object> map = new HashMap<>(8);return new Queue(CONFIRM_QUEUE_NAME, false, false, false, map);}@Bean("backupQueue")public Queue backupQueue() {HashMap<String, Object> map = new HashMap<>(8);return new Queue(BACKUP_QUEUE_NAME, false, false, false, map);}@Bean("warningQueue")public Queue warningQueue() {HashMap<String, Object> map = new HashMap<>(8);return new Queue(WARNING_QUEUE_NAME, false, false, false, map);}@Beanpublic Binding queueConfirmBindingExchange(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs();}@Beanpublic Binding backupConfirmBindingExchange(@Qualifier("backupQueue") Queue queue,@Qualifier("backupExchange") FanoutExchange exchange) {return BindingBuilder.bind(queue).to(exchange);}@Beanpublic Binding warningConfirmBindingExchange(@Qualifier("warningQueue") Queue queue,@Qualifier("backupExchange") FanoutExchange exchange) {return BindingBuilder.bind(queue).to(exchange);}
}

8.1.5. 消息生产者

package com.xingchen.mq.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** @author xingchen* @version V1.0* @Package com.xingchen.mq.config* @date 2022/12/7 11:52*/
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ConfirmController {@Resourceprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendConfirm/{msg}")public void sendConfirmMessage(@PathVariable("msg") String msg) {/**声明回调的形参*/CorrelationData correlationData = new CorrelationData("1");rabbitTemplate.convertAndSend("confirm-exchange", "confirm-key", msg, correlationData);log.info("发送信息为:" + msg);}
}

8.1.6. 回调接口

package com.xingchen.mq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;/*** @author xingchen* @version V1.0* @Package com.xingchen.mq.config* @date 2022/12/7 11:52*/
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 该注解会在其他注解执行完毕之后,进行一个属性的注入,必须将该类注入到rabbitTemplate的内部类中* 内部类就是这个ConfirmCallback*/@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);/**同时需要注入队列回退接口*/rabbitTemplate.setReturnCallback(this);}/*** @param correlationData 包含了消息的ID和其他数据信息 这个需要在发送方创建,否则没有* @param ack             返回的一个交换机确认状态 true 为确认 false 为未确认* @param cause           未确认的一个原因,如果ack为true的话,此值为null*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {log.info("消息发送成功,id 是{} ", id);} else {log.info("消息发送失败,原因 是{} id 为{}", cause, id);}}/*** 可以在消息传递过程中,如果交换机遇到不可路由的情况,会将消息返回给生产者** @param message    消息* @param replyCode  回复状态码* @param replyText  退回原因* @param exchange   交换机* @param routingKey 路由Key*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.error("消息{},被交换机{}退回,路由Key是{},退回原因是{}", new String(message.getBody()), exchange, routingKey, replyText);}
}

8.1.7. 消息消费者

package com.xingchen.mq.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author xingchen* @version V1.0* @Package com.xingchen.mq.config* @date 2022/12/7 11:52*/
@Component
@Slf4j
public class ConfirmConsumer {@RabbitListener(queues = {"confirm-queue"})public void receiveMsg(Message message) {log.info("接收到的消息为: " + new String(message.getBody()));}
}

8.1.8. 结果分析

可以看到,发送了两条消息,第一条消息的 RoutingKey 为 "key1",第二条消息的 RoutingKey 为

"key2",两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为

第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条

消息被直接丢弃了。

8.2. 回退消息

8.2.1. Mandatory 参数

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息

果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何

让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参

数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

8.2.2. 消息生产者代码

package com.xingchen.mq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;/*** @author xingchen* @version V1.0* @Package com.xingchen.mq.config* @date 2022/12/7 11:52*/
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 该注解会在其他注解执行完毕之后,进行一个属性的注入,必须将该类注入到rabbitTemplate的内部类中* 内部类就是这个ConfirmCallback*/@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);/**同时需要注入队列回退接口*/rabbitTemplate.setReturnCallback(this);}/*** @param correlationData 包含了消息的ID和其他数据信息 这个需要在发送方创建,否则没有* @param ack             返回的一个交换机确认状态 true 为确认 false 为未确认* @param cause           未确认的一个原因,如果ack为true的话,此值为null*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {log.info("消息发送成功,id 是{} ", id);} else {log.info("消息发送失败,原因 是{} id 为{}", cause, id);}}/*** 可以在消息传递过程中,如果交换机遇到不可路由的情况,会将消息返回给生产者** @param message    消息* @param replyCode  回复状态码* @param replyText  退回原因* @param exchange   交换机* @param routingKey 路由Key*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.error("消息{},被交换机{}退回,路由Key是{},退回原因是{}", new String(message.getBody()), exchange, routingKey, replyText);}
}

8.2.3. 回调接口

package com.xingchen.mq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;/*** @author xingchen* @version V1.0* @Package com.xingchen.mq.config* @date 2022/12/7 11:52*/
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 该注解会在其他注解执行完毕之后,进行一个属性的注入,必须将该类注入到rabbitTemplate的内部类中* 内部类就是这个ConfirmCallback*/@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);/**同时需要注入队列回退接口*/rabbitTemplate.setReturnCallback(this);}/*** @param correlationData 包含了消息的ID和其他数据信息 这个需要在发送方创建,否则没有* @param ack             返回的一个交换机确认状态 true 为确认 false 为未确认* @param cause           未确认的一个原因,如果ack为true的话,此值为null*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {log.info("消息发送成功,id 是{} ", id);} else {log.info("消息发送失败,原因 是{} id 为{}", cause, id);}}/*** 可以在消息传递过程中,如果交换机遇到不可路由的情况,会将消息返回给生产者** @param message    消息* @param replyCode  回复状态码* @param replyText  退回原因* @param exchange   交换机* @param routingKey 路由Key*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.error("消息{},被交换机{}退回,路由Key是{},退回原因是{}", new String(message.getBody()), exchange, routingKey, replyText);}
}

8.2.4. 结果分析

8.3. 备份交换机

有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息

无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然

后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者

所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增

加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的

复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些

处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。

在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份

交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就

是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备

份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定

的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进

入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

8.3.1. 代码架构图

8.3.2. 修改配置类

package com.xingchen.mq.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;/*** @author xingchen* @version V1.0*  * 发布确认兜底方案  添加缓存测试* @Package com.xingchen.mq.config* @date 2022/12/7 11:52*/
@Configuration
public class ConfirmConfig {//交换机private static final String CONFIRM_EXCHANGE_NAME = "confirm-exchange";//队列private static final String CONFIRM_QUEUE_NAME = "confirm-queue";//ROUTING_KEYprivate static final String CONFIRM_ROUTING_KEY = "confirm-key";/*** 备份交换机*/private static final String BACKUP_EXCHANGE_NAME = "backup-exchange";/*** 备份队列*/private static final String BACKUP_QUEUE_NAME = "backup-queue";/*** 报警队列*/private static final String WARNING_QUEUE_NAME = "warning-queue";@Bean("confirmExchange")public DirectExchange directExchange() {/**确认交换机配置备份交换机 以确保宕机后将消息转发到备份交换机*/return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME).build();}@Bean("backupExchange")public FanoutExchange backupExchange() {return new FanoutExchange(BACKUP_EXCHANGE_NAME);}@Bean("confirmQueue")public Queue confirmQueue() {HashMap<String, Object> map = new HashMap<>(8);return new Queue(CONFIRM_QUEUE_NAME, false, false, false, map);}@Bean("backupQueue")public Queue backupQueue() {HashMap<String, Object> map = new HashMap<>(8);return new Queue(BACKUP_QUEUE_NAME, false, false, false, map);}@Bean("warningQueue")public Queue warningQueue() {HashMap<String, Object> map = new HashMap<>(8);return new Queue(WARNING_QUEUE_NAME, false, false, false, map);}@Beanpublic Binding queueConfirmBindingExchange(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs();}@Beanpublic Binding backupConfirmBindingExchange(@Qualifier("backupQueue") Queue queue,@Qualifier("backupExchange") FanoutExchange exchange) {return BindingBuilder.bind(queue).to(exchange);}@Beanpublic Binding warningConfirmBindingExchange(@Qualifier("warningQueue") Queue queue,@Qualifier("backupExchange") FanoutExchange exchange) {return BindingBuilder.bind(queue).to(exchange);}
}

8.3.3. 报警消费者

package com.xingchen.mq.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author xingchen* @version V1.0* @Package com.xingchen.mq.config* @date 2022/12/7 11:52*/
@Component
@Slf4j
public class WarningConsumer {@RabbitListener(queues = {"warning-queue"})public void receiveWarningMsg(Message message) {log.warn("出现不可路由消息:", message);}
}

8.3.4. 测试注意事项

重新启动项目的时候需要把原来的confirm.exchange 删除因为我们修改了其绑定属性,不然报以下错:

8.3.5. 结果分析

mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先

级高,经过上面结果显示答案是备份交换机优先级高

RabbitMQ之发布确认相关推荐

  1. springboot整合rabbitmq的发布确认,消费者手动返回ack,设置备用队列,以及面试题:rabbitmq确保消息不丢失

    目录 1.生产者发消息到交换机时候的消息确认 2.交换机给队列发消息时候的消息确认 3.备用队列 3.消费者手动ack rabbitmq的发布确认方式,可以有效的保证我们的数据不丢失. 消息正常发送的 ...

  2. RabbitMQ异步发布确认

    异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是 ...

  3. RabbitMQ单个发布确认

    这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布, waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候 ...

  4. RabbitMQ(三)发布确认

    4.1 发布确认原理 生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队 ...

  5. RabbitMQ(三)发布确认 Publisher Confirms

    代码仓库:github:https://github.com/stopping5/RabbitMq-Operation-Record.git 本代码示例需要引入rabbitmq依赖 <!-- r ...

  6. 十、RabbitMQ发布确认高级

    RabbitMQ发布确认高级 发布确认SpringBoot版本 发布确认Springboot版本 简单的发布确认机制在应答与签收已经介绍,本内容将介绍整合了 SpringBoot 的发布确认机制. 介 ...

  7. RabbitMQ 从入门到精通 消息应答 持久化 交换机 队列 发布确认 集群 等

    RabbitMQ消息队列 RabbitMQ 的概念 RabbitMQ 是一个消息中间件:它接受并转发消息.你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快 ...

  8. RabbitMQ入门(三)消息应答与发布确认

    前言: 消息应答与发布确认都是保证消息不丢失.而重复消费问题则是消息幂等性.(之后会说幂等性) 消息应答: 应答功能属于消费者,消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理 ...

  9. RabbitMQ 消费者回执和发布确认

    为了保证数据安全,消费者和生产者的回执(ack)都是非常重要的. 由于我们无法保证消息都能像我们期望的那样,正常到达另一端或者被 Consumer 消费成功.因此,publisher 和 consum ...

  10. RabbitMQ之消息确认机制(事务+Confirm)

    概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达brok ...

最新文章

  1. vmoptions默认配置_AndroidStudio基本配置
  2. 概率整形 Peobabilistic Shaping PS
  3. 【Python之路Day12】网络篇之Python操作RabbitMQ
  4. Android之添加快捷方式(Shortcut)到手机桌面
  5. java基础—Runtime类使用
  6. 集合类 List 的那些坑
  7. 2019-06-04 Sublime Text 中文输入法的问题
  8. Linux之ls命令
  9. H5自带的type=date或者month等日期控件移动端显示placeholder
  10. C++ 常见错误(02) —— 将dll(用c++写的)处理的结果展示在界面上
  11. LightOJ - 1050 (唯一分解+推公式+乘法逆元)
  12. Problem:服务器超过最大中终连接数
  13. CSS3+JS制作的一款图标任意拖动,并且可以放在文件夹中
  14. php对接抖音快手小红书短视频去水印接口的代码
  15. 2021年12月电子学会图形化四级编程题解析含答案:棕熊大战
  16. 维山VS073高拍仪技术规格说明书
  17. 球形/PH响应性的树枝状聚合物:树枝状聚酰胺PAMAM/His-PAMAM/DNA 聚酰胺偶联组氨酸/的制备流程
  18. python读取mac地址_python - 获取MAC地址
  19. php.ini中文翻译版--转载
  20. python分析北京租房现状,最后的价格分布地图亮了

热门文章

  1. C4D渲染保存多通道psd格式,图层都是线性减淡模式,oc多通道图层都是线性简单模式
  2. 如何选用GPU云服务器?
  3. 惠普179fnw打印机使用说明_惠普HP Color Laser MFP 179fnw 一体打印机驱动下载
  4. 每个人都是雕刻自己的艺术家,生活是你的背景
  5. android 控制空调,控制精灵空调遥控器
  6. 不积跬步 无以至千里
  7. week 5 session and cookie
  8. PCB需要清洗的技巧
  9. 豆果美食 php,#一道菜表白豆果美食# 比蛋糕还好吃的咕咕霍夫面包
  10. javaEE 后台框架 SpringMVC Mybatis Shiro druid