文章目录

  • 1. 事务机制
  • 2. Confirm模式
    • 2.1 生产者
      • 2.1.1 普通Confirm模式
      • 2.1.2 批量Confirm模式
      • 2.1.3 异步Confirm模式
    • 2.2 消费者
  • 3. 其他

消费者如何确保消息一定能够消费成功呢?

由于在前面工作队列模式里面我们了解了应答模式,所以我们可以很自信的回答如上题目。

通过应答形式,默认自动应答,可以修改为手动应答来保证消息消费成功。

其实应答形式就是 RabbitMQ 消息确认机制的一种体现,我们再来看看问题的产生背景:

生产者发送消息出去之后,不知道到底有没有发送到 RabbitMQ 服务器, 默认是不知道的。而且有的时候我们在发送消息之后,后面的逻辑出问题了,我们不想要发送之前的消息了,需要撤回该怎么做。

两种解决方案:

  1. AMQP 事务机制
  2. Confirm 模式

1. 事务机制

事务机制分为三部分,开启事务,提交事务,事务回滚,如下:

  1. txSelect 将当前 channel 通道设置为 transaction 模式(开启事务)
  2. txCommit 提交当前事务
  3. txRollback 事务回滚

我们通过一个例子模拟消息生产者发送消息过程发生异常,进行事务回滚的过程。

public class Producer {/** 队列名称 */private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException {/** 1.获取连接 */Connection newConnection = MQConnectionUtils.newConnection();/** 2.创建通道 */Channel channel = newConnection.createChannel();/** 3.创建队列声明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 4.发送消息 */try {/** 4.1 开启事务 */channel.txSelect();String msg = "我是生产者生成的消息";System.out.println("生产者发送消息:"+msg);channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());/** 4.2 提交事务 - 模拟异常 */int i = 1/0;channel.txCommit();}catch (Exception e){e.printStackTrace();System.out.println("发生异常,我要进行事务回滚了!");/** 4.3 事务回滚 */channel.txRollback();}finally {channel.close();newConnection.close();}}}

打印结果:
生产者发送消息:我是生产者生成的消息
java.lang.ArithmeticException: / by zero at club.sscai.producer.Producer.main(Producer.java:37)
发生异常,我要进行事务回滚了!

2. Confirm模式

像上方这种采用 AMQP 事务机制来保证消息的准确到达,在一定程度上是消耗了性能的,所以我们再来看看 Confirm 模式。

Confirm 模式分为两块,一是生产者的 Confirm 模式,再就是消费者的 Confirm 模式。

2.1 生产者

通过生产者的确认模式我们是要保证消息准确达到客户端,而与 AMQP 事务不同的是 Confirm 是针对一条消息的,而事务是可以针对多条消息的。

Confirm 模式最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息。

Confirm 的三种实现方式:

  1. channel.waitForConfirms() 普通发送方确认模式;
  2. channel.waitForConfirmsOrDie() 批量确认模式;
  3. channel.addConfirmListener() 异步监听发送方确认模式
2.1.1 普通Confirm模式
public class Producer11 {/** 队列名称 */private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException {/** 1.获取连接 */Connection newConnection = MQConnectionUtils.newConnection();/** 2.创建通道 */Channel channel = newConnection.createChannel();/** 3.创建队列声明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 4.开启发送方确认模式 */channel.confirmSelect();/** 5.发送消息 */for (int i = 0; i < 5; i++) {channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, (" Confirm模式, 第" + (i + 1) + "条消息").getBytes());try {if (channel.waitForConfirms()) {System.out.println("发送成功");}else{System.out.println("进行消息重发");}} catch (InterruptedException e) {e.printStackTrace();}}/** 5.关闭通道、连接 */channel.close();newConnection.close();}
}

在推送消息之前,channel.confirmSelect() 声明开启发送方确认模式,再使用channel.waitForConfirms() 等待消息被服务器确认即可。

2.1.2 批量Confirm模式
public class Producer22 {/** 队列名称 */private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {/** 1.获取连接 */Connection newConnection = MQConnectionUtils.newConnection();/** 2.创建通道 */Channel channel = newConnection.createChannel();/** 3.创建队列声明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 4.开启发送方确认模式 */channel.confirmSelect();/** 5.发送消息 */for (int i = 0; i < 5; i++) {channel.basicPublish("", QUEUE_NAME, null, (" Confirm模式, 第" + (i + 1) + "条消息").getBytes());}/** 6.直到所有信息都发布,只要有一个未确认就会IOException */channel.waitForConfirmsOrDie();System.out.println("全部执行完成");/** 5.关闭通道、连接 */channel.close();newConnection.close();}
}

channel.waitForConfirmsOrDie() 使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未被确认就会抛出 IOException 异常。

2.1.3 异步Confirm模式
public class Producer33 {/** 队列名称 */private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException {/** 1.获取连接 */Connection newConnection = MQConnectionUtils.newConnection();/** 2.创建通道 */Channel channel = newConnection.createChannel();/** 3.创建队列声明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 4.开启发送方确认模式 */channel.confirmSelect();for (int i = 0; i < 10; i++) {String message = "我是生产者生成的消息:" + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));}/** 5.发送消息 异步监听确认和未确认的消息 */channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("未确认消息,标识:" + deliveryTag);}@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));}});/** 6.关闭通道、连接 *//** channel.close();*//** newConnection.close();*/}}

异步模式的优点,就是执行效率高,不需要等待消息执行完,只需要监听消息即可,以上异步返回的信息如下:

可以看出,代码是异步执行的,消息确认有可能是批量确认的,是否批量确认在于返回的 multiple 的参数,此参数为 bool 值,如果 true 表示批量执行了 deliveryTag 这个值以前的所有消息,如果为 false 的话表示单条确认。

维持异步调用要求我们不能断掉连接,因此注释掉第6步。

2.2 消费者

为了保证消息从队列可靠地到达消费者,RabbitMQ 提供消息确认机制(message acknowledgment)。消费者在声明队列时,可以指定 noAck 参数,当 noAck=false 时, RabbitMQ 会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ 会在队列中消息被消费后立即删除它。

在消费者中 Confirm 模式又分为手动确认和自动确认。

关于两者的介绍:

自动确认: 在自动确认模式下,消息在发送后立即被认为是发送成功。 这种模式可以提高吞吐量(只要消费者能够跟上),不过会降低投递和消费者处理的安全性。 这种模式通常被称为“发后即忘”。 与手动确认模式不同,如果消费者的TCP连接或信道在成功投递之前关闭,该消息则会丢失。

手动确认: 使用自动确认模式时需要考虑的另一件事是消费者过载。 手动确认模式通常与有限的信道预取一起使用,限制信道上未完成(“进行中”)传送的数量。 然而,对于自动确认,根据定义没有这样的限制。 因此,消费者可能会被交付速度所压倒,可能积压在内存中,堆积如山,或者被操作系统终止。 某些客户端库将应用TCP反压(直到未处理的交付积压下降超过一定的限制时才停止从套接字读取)。 因此,只建议当消费者可以有效且稳定地处理投递时才使用自动投递方式。

综上:尽量选择手动确认方式。

主要实现代码:

// 手动确认消息
channel.basicAck(envelope.getDeliveryTag(), false);// 关闭自动确认
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

3. 其他

1、如果 RabbitMQ 服务器宕机了,消息会丢失吗?

不会丢失,RabbitMQ 服务器支持消息持久化机制,会把消息持久化到硬盘上。

2、如何确保消息正确地发送至RabbitMQ?

RabbitMQ 使用发送方确认模式,确保消息正确地发送到 RabbitMQ。

发送方确认模式:将信道设置成 confirm 模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一ID)。如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(not acknowledged,未确认)消息。

发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。


我创建了一个java相关的公众号,用来记录自己的学习之路,感兴趣的小伙伴可以关注一下微信公众号哈:niceyoo

RabbitMQ消息确认机制相关推荐

  1. rabbitmq消息确认机制及死信队列的使用

    关于rabbitmq的基本概念和相关的理论这里就不做过多介绍了,在之前的篇幅中有过相应的介绍,也可以查询一些资料详细了解一下rabbitmq的基础知识,下面要介绍的点主要包括两个方面, 1.rabbi ...

  2. RabbitMQ消息确认机制和消息重发机制

    一.机制 首先我们要知道一条消息的传递过程. 生产者 -> 交换机 ->  队列 我们的生产者生产消息,生产完成的消息发送到交换机,由交换机去把这个消息转发到对应的队列上.这其中我们可能在 ...

  3. RabbitMQ消息确认机制-可靠抵达

    消息发送到被消费的流程: JAVA的生产端的发送数据----->Broker(消息服务器)-------->达到Exchange交换机------------->通过路由键到达Que ...

  4. RabbitMQ消息确认机制-07

    在 Rabbitmq 中我们可以通过持久化来解决因为服务器异常而导致丢失的问题, 除此之外我们还会遇到一个问题:生产者将消息发送出去之后,消息到底有没有正确到达 Rabbit 服务器呢?如果不做出处理 ...

  5. RabbitMQ 消息确认机制 以及 原理解析

    https://www.cnblogs.com/DBGzxx/p/10091070.html

  6. RabbitMQ 消息确认机制confirm代码编写

  7. springboot + rabbitmq 用了消息确认机制,感觉掉坑里了

    最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI.不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人 ...

  8. RabbitMQ 基本消息模型和消息确认机制

    ​01 前言 关于 RabbitMQ 服务器的安装,本章节不做介绍,请培养个人动手能力,自行百度解决.RabbitMQ 成功安装后(win 版),浏览器输入:localhost:15672,则可以进入 ...

  9. RabbitMQ ACK消息确认机制 快速入门

    RabbitMQ 消息确认机制ACK ack机制保证的是broker和消费者之间的可靠性 ack表示的是消费端收到消息后的确认方式,有三种确认方式 自动确认:acknowledge="non ...

最新文章

  1. Chapter 4 Invitations——4
  2. Linux安装程序Anaconda分析
  3. 2020年网站优化思路从哪着手?
  4. jmeter获取mysql数据并作为请求参数使用
  5. 动漫风格迁移——AnimeGANv2的实现【复现】
  6. java8获取当前时间并格式化
  7. pythonpandas读取csv文件最后一行_简单小案例(一):使用Pandas在Python中读取和写入CSV文件...
  8. Mybatis简单入门及配置文件标签详情
  9. DBParameter比拼接字符串慢的解决办法
  10. Struct1中 Form表单提交的几种方式以及无刷新提交的方式
  11. 电子通信计算机行业分类,电子信息产业行业分类目录
  12. VC++键盘钩子demo
  13. tongweb自动部署_用apache配置TongWeb集群
  14. 卷积码主要是对抗_采用卷积编码的原因和优势 浅析卷积码之特点
  15. 公众号开发精品教程(4)——生成带参数的二维码及合成海报
  16. SQL :Date 函数
  17. 微信公众号模板消息管理
  18. 如何使用苹果官方文档
  19. 如何获取微信小程序包
  20. windows下vue-cli及webpack 构建网站(三)使用组件

热门文章

  1. [css] css中的选择器、属性、属性值区分大小写吗?
  2. [css] 举例说明CSS特性检测的方式有哪些?
  3. 工作77::配置id传值地址
  4. “约见”面试官系列之常见面试题之第九十九篇之router和route(建议收藏)
  5. “约见”面试官系列之常见面试题之第八十六篇之nexttick(建议收藏)
  6. 前端学习(1390):多人管理项目10服务器认证
  7. shiro学习(4):shiro认证流程
  8. 第十一期:30秒内便能学会的30个实用Python代码片段
  9. 玩转oracle 11g(24):数据文件设置自扩展和监听日志文件过大处理
  10. python之lambda