目录

一、简介

二、生产者发送确认

三、消费者接收确认

四、示例

五、总结


一、简介

在RabbitMQ中,消息确认主要有生产者发送确认和消费者接收确认:

  • 生产者发送确认:指生产者发送消息后到RabbitMQ服务器,如果RabbitMQ服务器收到消息,则会给我们生产者一个应答,用于告诉生产者该条消息已经成功到达RabbitMQ服务器中。

  • 消费者接收确认:用于确认消费者是否成功消费了该条消息。

消息确认的实现方式主要有两种,一种是通过事务的方式(channel.txSelect()、channel.txCommit()、channel.txRollback()),另外一种是confirm确认机制。因为事务模式比较消耗性能,在实际工作中也用的不多,这里主要介绍通过confirm机制来实现消息的确认,保证消息的准确性。

二、生产者发送确认

在RabbitMQ中实现生产者发送确认的方法(本文使用springboot项目),主要有两点:

【a】配置文件中配置消息发送确认

spring.rabbitmq.publisher-confirms = true

【b】生产者实现 RabbitTemplate.ConfirmCallback接口,重写方法

confirm(CorrelationData correlationData, boolean isSendSuccess, String error)

当然也可以通过注入的方式自定义confirm listener.

@Component
public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback{@Overridepublic void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {.........}
}

三、消费者接收确认

为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。确认模式主要分为下面三种:

  • AcknowledgeMode.NONE:不确认
  • AcknowledgeMode.AUTO:自动确认
  • AcknowledgeMode.MANUAL:手动确认

注意:在springboot项目中通过在配置文件中指定消息确认的模式,如下指定手动确认模式:

spring.rabbitmq.listener.simple.acknowledge-mode = manual

手动确认与自动确认的区别:

  • 自动确认:这种模式下,当发送者发送完消息之后,它会自动认为消费者已经成功接收到该条消息。这种方式效率较高,当时如果在发送过程中,如果网络中断或者连接断开,将会导致消息丢失。
  • 手动确认:消费者成功消费完消息之后,会显式发回一个应答(ack信号),RabbitMQ只有成功接收到这个应答消息,才将消息从内存或磁盘中移除消息。这种方式效率较低点,但是能保证绝大部分的消息不会丢失,当然肯定还有一些小概率会发生消息丢失的情况。

手动确认主要使用的方法有下面几个:

  • public void basicAck(long deliveryTag, boolean multiple):deliveryTag 表示该消息的index(long类型的数字);multiple 表示是否批量(true:将一次性ack所有小于deliveryTag的消息);

如果成功消费消息,一般调用下面的代码用于确认消息成功处理完

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
public void basicNack(long deliveryTag, boolean multiple, boolean requeue):告诉服务器这个消息我拒绝接收,basicNack可以一次性拒绝多个消息。deliveryTag: 表示该消息的index(long类型的数字);multiple: 是否批量(true:将一次性拒绝所有小于deliveryTag的消息);requeue:指定被拒绝的消息是否重新回到队列;
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
public void basicReject(long deliveryTag, boolean requeue):也是用于拒绝消息,但是只能拒绝一条消息,不能同时拒绝多个消息。deliveryTag: 表示该消息的index(long类型的数字);requeue:指定被拒绝的消息是否重新回到队列;

四、示例

下面通过一个示例说明在RabbitMQ中实现发送确认和消费者确认的使用方法。

【a】引入pom.xml依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

【b】配置文件application.yml:

server:port: 6666
spring:application:name: mq-message-confirm2rabbitmq:host: 127.0.0.1virtual-host: /vhostusername: wshpassword: wshport: 5672#消息发送确认回调publisher-confirms: true#指定消息确认模式为手动确认listener:simple:acknowledge-mode: manual#发送返回监听回调publisher-returns: true

这里需要注意两点:

  • spring.rabbitmq.publisher-confirms = true
  • spring.rabbitmq.listener.simple.acknowledge-mode = manual

【c】自定义消息发送、返回回调监听类

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;/*** @Description 自定义消息发送确认的回调* @Author weishihuai* @Date 2019/6/27 10:42* <p>* 实现接口:implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback* ConfirmCallback:只确认消息是否正确到达交换机中,不管是否到达交换机,该回调都会执行;* ReturnCallback:如果消息从交换机未正确到达队列中将会执行,正确到达则不执行;*/
@Component
public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {private static final Logger logger = LoggerFactory.getLogger(CustomConfirmAndReturnCallback.class);@Autowiredprivate RabbitTemplate rabbitTemplate;/*** PostConstruct: 用于在依赖关系注入完成之后需要执行的方法上,以执行任何初始化.*/@PostConstructpublic void init() {//指定 ConfirmCallbackrabbitTemplate.setConfirmCallback(this);//指定 ReturnCallbackrabbitTemplate.setReturnCallback(this);}/*** 消息从交换机成功到达队列,则returnedMessage方法不会执行;* 消息从交换机未能成功到达队列,则returnedMessage方法会执行;*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {logger.info("returnedMessage回调方法>>>" + new String(message.getBody(), StandardCharsets.UTF_8) + ",replyCode:" + replyCode+ ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);}/*** 如果消息没有到达交换机,则该方法中isSendSuccess = false,error为错误信息;* 如果消息正确到达交换机,则该方法中isSendSuccess = true;*/@Overridepublic void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {logger.info("confirm回调方法>>>回调消息ID为: " + correlationData.getId());if (isSendSuccess) {logger.info("confirm回调方法>>>消息发送到交换机成功!");} else {logger.info("confirm回调方法>>>消息发送到交换机失败!,原因 : [{}]", error);}}}

注意这里我同时也实现了RabbitTemplate.ReturnCallback返回回调接口,并且重写了returnedMessage()方法,返回回调主要指的是:如果消息从交换机未正确到达队列中将会执行,正确到达则不执行returnedMessage()。

【d】RabbitMQ配置信息

/*** @Description: RabbitMQ配置信息,绑定交换器、队列、路由键设置* @author: weishihuai* @Date: 2019/6/27 10:38* <p>* 说明:* <p>* 1. 声明Exchange交换器;* 2. 声明Queue队列;* 3. 绑定:BindingBuilder绑定队列到交换器,并设置路由键;*/
@Component
public class RabbitMQConfig {private static final String EXCHANGE_NAME = "message_confirm_exchange";private static final String QUEUE_NAME = "message_confirm_queue";private static final String ROUTING_KEY = "user.#";@Beanprivate TopicExchange topicExchange() {return new TopicExchange(EXCHANGE_NAME);}@Beanprivate Queue queue() {return new Queue(QUEUE_NAME);}@Beanprivate Binding bindingDirect() {return BindingBuilder.bind(queue()).to(topicExchange()).with(ROUTING_KEY);}}

【e】发送者:这里发送了三条消息,如果三条消息中某条消息已经被拒绝过一次,那么触发basicNack()重新回到队列中,如果该消息再次被拒绝,那么消费者将会调用basicReject()直接拒绝该条消息,以后也不会再次接收该消息。

/*** @Description: 生产者* @author: weishihuai* @Date: 2019/6/27 10:39*/
@Component
public class Producer {private static final Logger logger = LoggerFactory.getLogger(Producer.class);private static final String EXCHANGE_NAME = "message_confirm_exchange";private static final String ROUTING_KEY = "user.add.submit";@Autowiredpublic RabbitTemplate rabbitTemplate;public void sendMessage() {for (int i = 1; i <= 3; i++) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());logger.info("【Producer】发送的消费ID = {}", correlationData.getId());String msg = "hello confirm message" + i;logger.info("【Producer】发送的消息 = {}", msg);rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, msg, correlationData);}}}

【f】消费者1:这里模拟了在处理消息的时候触发一个空指针异常,用于触发拒绝某个消息。

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;/*** @Description: 消费者1* @author: weishihuai* @Date: 2019/6/27 10:42*/
@Component
public class Consumer01 {private static final Logger logger = LoggerFactory.getLogger(Consumer01.class);@RabbitListener(queues = "message_confirm_queue")public void receiveMessage01(String msg, Channel channel, Message message) throws IOException {try {// 这里模拟一个空指针异常,String string = null;string.length();logger.info("【Consumer01成功接收到消息】>>> {}", msg);// 确认收到消息,只确认当前消费者的一个消息收到channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {logger.info("【Consumer01】消息已经回滚过,拒绝接收消息 : {}", msg);// 拒绝消息,并且不再重新进入队列//public void basicReject(long deliveryTag, boolean requeue)channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {logger.info("【Consumer01】消息即将返回队列重新处理 :{}", msg);//设置消息重新回到队列处理// requeue表示是否重新回到队列,true重新入队//public void basicNack(long deliveryTag, boolean multiple, boolean requeue)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}e.printStackTrace();}}}

【g】消费者2:该消费者为正常消费消息。

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;/*** @Description: 消费者2* @author: weishihuai* @Date: 2019/6/27 10:42*/
@Component
public class Consumer02 {private static final Logger logger = LoggerFactory.getLogger(Consumer02.class);@RabbitListener(queues = "message_confirm_queue")public void receiveMessage02(String msg, Channel channel, Message message) throws IOException {try {logger.info("【Consumer02成功接收到消息】>>> {}", msg);// 确认收到消息,只确认当前消费者的一个消息收到channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {logger.info("【Consumer02】消息已经回滚过,拒绝接收消息 : {}", msg);// 拒绝消息,并且不再重新进入队列//public void basicReject(long deliveryTag, boolean requeue)channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {logger.info("【Consumer02】消息即将返回队列重新处理 :{}", msg);//设置消息重新回到队列处理// requeue表示是否重新回到队列,true重新入队//public void basicNack(long deliveryTag, boolean multiple, boolean requeue)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}e.printStackTrace();}}}

【h】启动项目,查看运行结果

由控制台结果可见,hello confirm message1这条消息第一次被consumer1拒绝了一次,执行basicNack重新回到队列,第二次又被判断为该条消息已经回滚过,调用basicReject方法又被拒绝并且禁止重新回到队列,这样该条消息将不会被消费者重新消费。

hello confirm message2这条消息成功被consumer2消费掉,hello confirm message3这条消息第一次也被 consumer1拒绝了,但是在重新回到队列之后,被consumer2成功消费了。

同时可以看到,三条消息都正确从发送者到达交换机,所以都执行了 confirm(CorrelationData correlationData, boolean isSendSuccess, String error)回调方法。

因为消息都成功从交换机正确到达队列中,所有监听的returnCallback的returnedMessage()方法并没有被执行。下面我们测试一下,假设指定一个binding key不匹配的。修改下面的路由键,让消息无法从交换机正确路由到队列上:

首先在RabbitMQ管理控制台将之前的user.#绑定键解除绑定:

重新启动项目,查看控制台日志:

可见,三条消息都未能正确从交换机路由到队列,所以都执行了returnedMessage回调方法。

下面我们测试一下消息从发送者未能正确到达交换机的情况,这里主要修改一个不存在的交换机名称,这样消息就不能正确到达消费者监听队列所在的交换机message_confirm_exchange,从而触发confirmCallback中发送失败的情况,error为错误原因。

控制台日志:

2019-07-07 11:22:27.762  INFO 12528 --- [ 127.0.0.1:5672] c.w.s.s.c.CustomConfirmAndReturnCallback : confirm回调方法>>>回调消息ID为: 01ee6352-861e-4d96-8346-d808508ae3d2
2019-07-07 11:22:27.762  INFO 12528 --- [ 127.0.0.1:5672] c.w.s.s.c.CustomConfirmAndReturnCallback : confirm回调方法>>>消息发送到交换机失败!,原因 : [channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'message_confirm_exchange2' in vhost '/vhost', class-id=60, method-id=40)]
2019-07-07 11:22:27.762  INFO 12528 --- [ 127.0.0.1:5672] c.w.s.s.c.CustomConfirmAndReturnCallback : confirm回调方法>>>回调消息ID为: 0a6e426a-c93a-4f53-9f1d-0375b8f61a41
2019-07-07 11:22:27.762  INFO 12528 --- [ 127.0.0.1:5672] c.w.s.s.c.CustomConfirmAndReturnCallback : confirm回调方法>>>消息发送到交换机失败!,原因 : [channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'message_confirm_exchange2' in vhost '/vhost', class-id=60, method-id=40)]
2019-07-07 11:22:27.762  INFO 12528 --- [ 127.0.0.1:5672] c.w.s.s.c.CustomConfirmAndReturnCallback : confirm回调方法>>>回调消息ID为: d11ad471-3a4c-4bfe-804b-eda966c4df51
2019-07-07 11:22:27.762  INFO 12528 --- [ 127.0.0.1:5672] c.w.s.s.c.CustomConfirmAndReturnCallback : confirm回调方法>>>消息发送到交换机失败!,原因 : [channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'message_confirm_exchange2' in vhost '/vhost', class-id=60, method-id=40)]

五、总结

本文主要介绍了RabbitMQ中生产者消息发送确认和消费者接收确认,同时也扩展了监听返回回调returnCallback,在实际项目中,一般都用手动确认的方式,再加上一些补偿措施,这样可以保证绝大部分的消息不会出现丢失的情况。本文是笔者的一些学习总结,欢迎大家补充说明,希望能对大家有所参考,有所帮助。

RabbitMQ消息确认机制之Confirm模式总结相关推荐

  1. RabbitMQ消息确认机制

    文章目录 1. 事务机制 2. Confirm模式 2.1 生产者 2.1.1 普通Confirm模式 2.1.2 批量Confirm模式 2.1.3 异步Confirm模式 2.2 消费者 3. 其 ...

  2. Java短信确认机制_JAVA 消息确认机制之 ACK 模式

    JAVA 消息确认机制之 ACK 模式 CLIENT_ACKNOWLEDGE : 客户端手动确认, 这就意味着 AcitveMQ 将不会 "自作主张" 的为你 ACK 任何消息, ...

  3. RabbitMQ消息确认机制和消息重发机制

    一.机制 首先我们要知道一条消息的传递过程. 生产者 -> 交换机 ->  队列 我们的生产者生产消息,生产完成的消息发送到交换机,由交换机去把这个消息转发到对应的队列上.这其中我们可能在 ...

  4. RabbitMQ消息确认机制-可靠抵达

    消息发送到被消费的流程: JAVA的生产端的发送数据----->Broker(消息服务器)-------->达到Exchange交换机------------->通过路由键到达Que ...

  5. rabbitmq消息确认机制及死信队列的使用

    关于rabbitmq的基本概念和相关的理论这里就不做过多介绍了,在之前的篇幅中有过相应的介绍,也可以查询一些资料详细了解一下rabbitmq的基础知识,下面要介绍的点主要包括两个方面, 1.rabbi ...

  6. RabbitMQ消息确认机制-07

    在 Rabbitmq 中我们可以通过持久化来解决因为服务器异常而导致丢失的问题, 除此之外我们还会遇到一个问题:生产者将消息发送出去之后,消息到底有没有正确到达 Rabbit 服务器呢?如果不做出处理 ...

  7. RabbitMQ 消息确认机制confirm代码编写

  8. RabbitMQ 消息确认机制 以及 原理解析

    https://www.cnblogs.com/DBGzxx/p/10091070.html

  9. springboot + rabbitmq 用了消息确认机制,感觉掉坑里了

    最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人 ...

  10. RabbitMQ 基本消息模型和消息确认机制

    ​01 前言 关于 RabbitMQ 服务器的安装,本章节不做介绍,请培养个人动手能力,自行百度解决.RabbitMQ 成功安装后(win 版),浏览器输入:localhost:15672,则可以进入 ...

最新文章

  1. STM32最小系统电路
  2. CEO 赠书 | 节省 50% 的人生,终止“瞎忙”式勤奋
  3. 细节决定成败,做网站也是如此
  4. 4.Android的学习(了解代码与部分配置)
  5. java.util.logging.Logger基础教程
  6. WindowsPhone 7 页面导航和虚拟路径、导航传值
  7. 最全的Pycharm debug技巧
  8. java jquery easyui_java中用jquery-easyui插件做可编辑datagird列表
  9. [转载] 使用hexo+github搭建免费个人博客详细教程
  10. 自然语言处理NLP知识结构
  11. 爬取中国天气网获取全国城市编码并存入mysql数据库
  12. AutoCAD中禁用shift+鼠标中键组合作为动态观察的功能
  13. 产业AI公司的简单调研
  14. 海湾主机汉字注释表打字出_海湾消防主机字根表-海湾消防主机
  15. Short Pairing-based Non-interactive Zero-Knowledge Arguments
  16. 各种门平面图画法_关于CAD各种门怎么画平面图就行 CAD铝合金门窗
  17. mininet-ovs转发行为与流表不对应
  18. 并发编程:进程+线程+协程
  19. MySQL中的ROWNUM的实现 阿星小栈
  20. 开展计算机课程的目的,信息工程学院开展“计算机应用基础”课程教学研讨会...

热门文章

  1. Pyspark:NLP(文本分类)
  2. C/C++[入门最后两题]
  3. Mac OS开启黑暗模式
  4. android.mk 依赖关系,Android NDK学习(二):编译脚本语法Android.mk和Application.mk
  5. 尼奥智能陪伴机器人如何绑定设备_巴巴腾 智能陪护儿童机器人A3,为儿童专业定制的小伙伴...
  6. presto安装及使用 1
  7. 393.UTF-8编码验证
  8. 类的可访问性(C++)
  9. iis启动服务时提示在本地计算机 无法启动iis admin服务,无法启动IIS Express Web服务器...
  10. 为什么年龄大了近视还增加_都是做近视手术,为什么价格区别这么大?