7.RabbitMQ可靠性投递

为了保证信息不丢失, 可靠抵达,引入确认机制

消息从生产者传递到消费者的过程中, 不同的阶段使用不同的确认方式.

7.0.准备请求

一次性发送10 个消息 通过 new.exchange.direct交换机 接收消息, 使用 new.admin路由键new.admin队列 发送消息.

@Autowired
private RabbitTemplate rabbitTemplate;@RequestMapping("/sender/test2")
public String test2(){String msg = "Mode Batch Confirm, Rabbit MQ";for (int i = 0; i < 10; i++) {String cd = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(cd);//   交换机, 路由键, 消息, 消息idrabbitTemplate.convertAndSend("new.exchange.direct","new.admin", msg +":" + i , correlationData);}return msg;
}

7.1.消息发送到borker消息代理

从 Producer 生产者到 borker消息代理时, 有两种方式: Transaction(事务)模式Confirm(确认)模式

其中 Transaction(事务)模式是使用阻塞模式, 效率低, 官方说法是性能下降 270 倍. 所以通常使用的是 Confirm(确认)模式

7.1.1.配置启动代理borker确认

# 启动 代理borker 确认
##spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated

spring.rabbitmq.publisher-confirms=true 是旧版的写法

spring.rabbitmq.publisher-confirm-type

NONE : 禁用发布确认模式,是默认值

CORRELATED : 发布消息成功到交换器后会触发回调方法

SIMPLE : 经测试有两种效果,

​ 其一效果和CORRELATED值一样会触发回调方法,

​ 其二在发布消息成功后使用rabbitTemplate调用waitForConfirms()或waitForConfirmsOrDie()方法等待broker节点返回发送结果,

​ 根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie()方法如果返回false则会关闭channel,则接下来无法发送消息到broker;

7.1.2.增加 回调函数

建立配置类, 并为 RabbitTemplate增加回调函数 ConfirmCallback()

RabbitTemplate只允许设置一个callback方法,可以将RabbitTemplate的bean设为单例然后设置回调,但是这样有个缺点是使用RabbitTemplate的地方都会执行这个回调,如果直接在别的地方设置,会报如下错误

only one ConfirmCallback is supported by each RabbitTemplate

可以通过将RabbitTemplate的作用域设为@Scope,每次bean都是新的,来解决这个问题

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;@Configuration
public class RabbitMessageConfig {@Bean@Scope("prototype")public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 设置代理borker接收确认回调函数rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** @param correlationData   消息id* @param ack     消息是否成功收到, 投递到代理borker 返回 true , 失败返回 false* @param cause   失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("ack = " + ack);if (!ack) {System.out.println("发送消息到 [代理borker] 失败{ correlationData : " + correlationData + ", cause : " + cause + "}");}else{System.out.println("发送消息到 [代理borker] 成功{ correlationData : " + correlationData + "}");}}});return rabbitTemplate;}
}

这样当 代理borker接收到消息时, 会自动调用方法

7.1.3.发请求测试

在 控制台输出

ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=fc39288a-86e3-4122-9c68-50d084e483f5]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=09d765f9-8317-4c87-8e7f-4e66ef382bca]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=0d9a414c-b5ae-4cf5-a825-6c14f4030643]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=55c0fe51-18d3-4d4c-b221-114341a69ad3]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=1d45b2e0-6bec-40d4-90ab-da52a6252d28]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=fb4142a4-7f98-4517-bc74-b8ed0d048741]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=97ee4871-735d-4264-be62-bc7749804504]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=637e57a7-a440-45a9-bd90-87963e4108f9]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=e6260609-a118-4ad1-a864-a6777583826a]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=b9e2db4f-c259-42dd-81a9-3dbfc33755d7]}

rabbit控制台, 收到10条消息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4wtFbj8g-1679806873943)(RabbitMQ.assets/image-20230323104854513.png)]

7.2.消息发送到queue队列

第二个环节就是从交换机Exchange路由到队列queue。

消息无法路由到正确的队列的原因有 1)、路由键错误 2)、队列不存在。
有两种方式处理无法路由的消息,一种是让服务器重发给生产者,一种是让交换机路由到另一个备份的交换机。

7.2.1.配置启动队列queue确认

# 启动 队列queue 确认
spring.rabbitmq.publisher-returns=true
# 只要抵达队列, 以异步发送优先回调我们这个returnconfirm
spring.rabbitmq.template.mandatory=true

7.2.2.增加 回调函数

依然在 RabbitTemplate增加回调函数 ReturnCallback()

这个回调只有在队列接收失败时才会被调用

同时注意要 加上 rabbitTemplate.setMandatory(true); 的设置

//将消息退回给 producer 。并执行回调函数returnedMessage。
rabbitTemplate.setMandatory(true);// 设置Queue队列接收确认回调函数
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){/*** 这个方法在投递到队列queue [失败] 时才会执行* @param message  投递失败的消息的详细信息* @param replyCode  回复的状态码* @param replyText  回复的文本内容* @param exchange   交换机信息* @param routingKey   路由键*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey){System.out.print("发送消息到 [队列Queue] 失败, 回发的消息:{ ");System.out.print("replyCode: "+replyCode);System.out.print(", replyText: "+replyText);System.out.print(", exchange: "+exchange);System.out.print(", routingKey: "+routingKey);System.out.println( " }");}
});

7.2.3.测试

7.2.3.1.修改请求

加入一个不存在请求

@RequestMapping("/sender/test2")
public String test2(){String msg = "Mode Batch Confirm, Rabbit MQ";for (int i = 0; i < 10; i++) {String cd = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(cd);if(i%2==0){//   交换机, 路由键, 消息, 消息idrabbitTemplate.convertAndSend("new.exchange.direct","new.admin", msg +":" + i , correlationData);}else{//   写了一个不存在的路由键rabbitTemplate.convertAndSend("new.exchange.direct","1234567890", msg +":" + i , correlationData);}}return msg;
}

7.2.3.2.控制台输出

其中一部分, 消息成功发到broker代理, 但由于错误的路由键, 所以不能发到队列里

ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=7310b457-a6b2-4c76-b28b-d854fcccaefd]}
发送消息到 [队列Queue] 失败, 回发的消息:{ replyCode: 312, replyText: NO_ROUTE, exchange: new.exchange.direct, routingKey: 1234567890 }
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=06478d12-9e5d-432b-ad8e-e244e543b0ee]}
发送消息到 [队列Queue] 失败, 回发的消息:{ replyCode: 312, replyText: NO_ROUTE, exchange: new.exchange.direct, routingKey: 1234567890 }
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=24c8fed0-617b-4a00-8dec-98d71dace4a2]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=bc565b4f-c98c-4412-ad49-7fce3b233521]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=fd87a581-931b-44e7-9bbf-d3879e26da20]}
发送消息到 [队列Queue] 失败, 回发的消息:{ replyCode: 312, replyText: NO_ROUTE, exchange: new.exchange.direct, routingKey: 1234567890 }
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=b45113af-1a04-4e2b-8d1b-b1bfd78e8404]}
发送消息到 [队列Queue] 失败, 回发的消息:{ replyCode: 312, replyText: NO_ROUTE, exchange: new.exchange.direct, routingKey: 1234567890 }
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=3ef73907-f217-4676-aac8-cb3dfe30eea0]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=1e4ce655-2f4c-43f3-8f2e-c7d2d7d46b68]}
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=5ebb5f26-6cb0-4952-a00b-de218b1a144a]}
发送消息到 [队列Queue] 失败, 回发的消息:{ replyCode: 312, replyText: NO_ROUTE, exchange: new.exchange.direct, routingKey: 1234567890 }
ack = true
发送消息到 [代理borker] 成功{ correlationData : CorrelationData [id=9a930da2-12a0-4911-b219-2c5ff9407fac]}

7.3.消费者接收消息

如果 消息不被消费会一直存储在MQ里 , 直到被消费

但自动消费模式下, 如果 多条消息只有一条被消费, 其它的消息也被从队列中清除, 所以要改为手动消费

7.3.1.配置启动手动消费

# 将 消息消费确认 修改为手动模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual

7.3.2.增加消息消费手动确认

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class ReceiverServiceImpl {// 监听队列@RabbitListener(queues = "new.admin")@RabbitHandlerpublic void goodsProcess(Message message, Channel channel) {System.out.println("new.admin 队列 接收消息 : " + message);// DeliveryTag 是 channel 内顺序号 , 自增形式long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("deliveryTag = " + deliveryTag);try {if (deliveryTag%2 == 0) {// 确认消费, 第二个参数是 是否批量channel.basicAck(deliveryTag, false);System.out.println("签收了消息>>> = " + deliveryTag);}else {// long deliveryTag(序号), boolean multiple(是否批量签收), boolean requeue(是否重新入队)// 重新入队的消息会 , 重新从队列投递给消费者channel.basicNack(deliveryTag, false, true);System.out.println("没有签收到消息<<< = " + deliveryTag);}} catch (Exception e) {e.printStackTrace();}}}

这里使用 deliveryTag%2==0 来 模拟 有的消息 确认消费, 有的消息不能确认消费

通过 channel.basicNack(deliveryTag, false, true); 来处理 不能确认消费的消息

其中第三个参数, 代表是否重新进入队列, true为重新进入, 这样消息会重新发送到消费者这里

RabbitMQ 入门到应用 ( 六 ) 消息可靠性相关推荐

  1. RabbitMQ入门(三)消息应答与发布确认

    前言: 消息应答与发布确认都是保证消息不丢失.而重复消费问题则是消息幂等性.(之后会说幂等性) 消息应答: 应答功能属于消费者,消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理 ...

  2. RabbitMQ系列【8】消息可靠性之ACK机制

    有道无术,术尚可求,有术无道,止于术. 文章目录 前言 自动确认 1. 配置 2. 演示 手动确认 1. 配置 2. 代码 3. 测试 前言 在之前分析了对于生产者来说,可以使用消息发布确认及退回机制 ...

  3. RabbitMQ高级之如何保证消息可靠性?

    楔子 本篇是消息队列RabbitMQ的第四弹. RabbitMQ我已经写了三篇了,基础的收发消息和基础的概念我都已经写了,学任何东西都是这样,先基础的上手能用,然后遇到问题再去解决,无法理解就去深入源 ...

  4. RabbitMQ入门学习系列(三).消息发送接收

    快速阅读 用Rabitmq的队列管理,以及如何保证消息在队列中不丢失.通过ack的消息确认和持久化进行操作.以及Rabbit中如何用Web面板进行管理队列.消费者如何处理耗时的任务 生产者代码创建链接 ...

  5. RabbitMQ入门学习系列(六) Exchange的Topic类型

    快速阅读 介绍exchange的topic类型,和Direct类型相似,但是增加了"."和"#"的匹配.比Direct类型灵活 Topic消息类型 特点是:to ...

  6. RabbitMQ入门教程(十一):消息属性Properties

    分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 简介 发送消息可以为消息指定一些参数 Delivery mode: 是否持久化,1 - Non-persistent,2 ...

  7. 干货!消息队列RabbitMQ入门教程

    ​写在前面:全文12000多字,从为什么需要用消息队列,到rabbitMQ安装使用,如何使用JavaAPI生产消费消息,以及使用消息队列带来的一些常见问题.绝对很适合新手入门学习. 为什么需要消息队列 ...

  8. RabbitMQ 入门系列(10)— RabbitMQ 消息持久化、不丢失消息

    消息要保持"持久化",即不丢失,必须要使得消息.交换器.队列,必须全部 "持久化". 1. 生产者怎么确认 RabbitMQ 已经收到了消息? # 打开通道的确 ...

  9. rabbitmq可靠性投递_解决RabbitMQ消息丢失问题和保证消息可靠性(一)

    工作中经常用到消息中间件来解决系统间的解耦问题或者高并发消峰问题,但是消息的可靠性如何保证一直是个很大的问题,什么情况下消息就不见了?如何防止消息丢失?下面通过这篇文章,我们就聊聊RabbitMQ 消 ...

最新文章

  1. 黑盒测试方法之边界值分析法
  2. R画月亮阴晴圆缺:corrplot绘图相关系数矩阵
  3. Linux中pthread源码在哪,pthread - 源码下载|系统编程|Linux/Unix编程|源代码 - 源码中国...
  4. Android的Gson的使用方法,实现Json结构间相互转换
  5. .net core 并发下的线程安全问题
  6. python缓冲区_如何在Python中使用Google的协议缓冲区
  7. 这个天气怎么就这么热啊,哪里还有心情写代码呀。
  8. python输出命令_Python中的命令输出解析
  9. 【机器学习】K近邻(KNN)算法详解
  10. DB2表空间状态列表
  11. 谷歌关闭SameSite功能
  12. 微信团队分享:视频图像的超分辨率技术原理和应用场景
  13. 计算机教师个人诊改总结,完整版)教师个人诊改报告
  14. HTML基础,CSS基础
  15. 微信小程序打开微信公众号中的文章实战教程
  16. TensorFlow-gpuCould not load dynamic library ‘cudart64_102.dll‘; dlerror: cudart64_102.dll not found
  17. CAJ转PDF文件,这两个免费方法非常好用!
  18. PROJECT2: 华为云 >> 企业云平台完整架构实例应用分解(第一部分Web端)
  19. fpga的jtag接口扫不到器件_FPGA中AS和JTAG接口的使用
  20. 电路基础(第一章电路模型和电路定律)

热门文章

  1. Towards Better Understanding of Self-Supervised Representations / Q-Score
  2. LeetCode: 953. 验证外星语词典
  3. Linux手动安装和部署github
  4. 软件工程——软件测试方法
  5. sql语句的各种模糊查询
  6. DCloud使用小结
  7. echo回音消除方案
  8. Dcloud课程2 什么是Dcloud
  9. php怎么读取word文档
  10. 元胞自动机在交通系统中的应用之一【元胞自动机的基础知识】