消息应答机制

消息应答机制是保证RabbitMQ能够把消息发送给消费者,但是消息发送给了消费者并不能代表消息能正确被消费,所以保证消息能够被消费者正确消费才能够保证业务和数据的完整。


       RabbitMQ为了能把消息正确发送给消费者,提供了一种消息应答机制,就是告诉RabbitMQ消息已经收到,消息应答分为自动应答手动应答

自动应答

RabbitMQ给消费者推送消息的时候,自动应答是默认打开的,在自动应答打开的时候,当RabbitMQ将消息发送给消费者之后,RabbitMQ就认为消息已经被传递成功了,RabbitMQ就会将消息从队列中删除掉,而不会去管消费者是不是真的将消息处理成功。

假如:消费者在处理消息的过程中,程序死掉了,消息其实是没有被正确消费的。或者消费者处理的速度很慢,但是能还是能够加收消息,RabbitMQ会一直给消费者发送消息的,这样就会出现消费者程序出现消息堆积阻塞的情况,也可能把消费者程序整死的。这样看来,自动应答是不安全的。所以在正常开发中不建议使用自动应答。

自动应答消费者代码

package rabbitmq.ced.ack;import com.rabbitmq.client.*;/*** 消息确认机制  自动应答 消费者代码** @author 崔二旦* @since now*/
public class AutoAckConsumer {public static void main(String[] args) {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置连接属性connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3. 从连接工厂中获取连接connection = connectionFactory.newConnection("消费者");// 3. 在连接中创建信道,RabbitMQ中的所有操作都是在信道中完成的channel = connection.createChannel();System.out.println("等待接收消息......");/** RabbitMQ推送给消费者消息回调接口,在该接口中用于编写如何对消息进行处理。* @param1 消费者注册到RabbitMQ之后,RabbitMQ给生成的一个该消费者的唯一标识* @param2 推送过来的消息的信息。其中包括真正的数据body(消息体),*         Properties(消息的属性信息),*         Envelope(包装信息),该对象里有*                  deliveryTag(消息的ID),*                  redeliver(是否重新投递,当RabbitMQ发现消费者无法应答的时候,*                            RabbitMQ会将消息重新编排,进行重新投递),*                  exchange(所使用的交换机),*                  routingKey(路由KEY)。*/DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());System.out.println("接收到队列发送来的消息:" + message);};/** rabbitmq取消该消费者对信道中队列的订阅时,调用的回调接口。* 当我们在RabbitMQ管理界面手动删除该队列时,就会调用该接口。* @param1 消费者注册到RabbitMQ之后,RabbitMQ给生成的一个该消费者的唯一标识*/CancelCallback cancelCallback = (consumerTag) -> {System.out.println("消息消费被中断" + consumerTag);};//自动应答boolean autoAck = true;/** 消费者订阅队列,在某个队列上注册消费者,在这个队列中有消息时,* 就会把消息转发给此Channel来处理,如果这个队列有多个消费者,* RabbitMQ则会采用轮转的方式将消息分发给各个消费者。** @param1 队列名称 将自己注册到那个队列中,以便消费那个队列的消息。* @param2 消息成功接收后是否要自动应答,当设置为true时,代表自动应答,false时代表手动应答。* @param3 当队列中有消息是,rabbitmq调用的回调接口,用于将消息传递过来。* @param4 rabbitmq取消该消费者对信道中队列的订阅时,调用的回调接口,*         如被订阅的队列被删除时,rabbitmq就会通过该接口通知消费者,*         以便消费者做相应处理。消费者可以根据consumerTag标识,*         主动调用channel.basicCancel(consumerTag)方法进行对RabbitMQ队列的注册关系进行解除。* @return 该方法的返回值是该消费者注册到RabbitMQ之后,RabbitMQ给生成的一个该消费者的唯一标识。*/String consumerTag = channel.basicConsume("hello", autoAck, deliverCallback, cancelCallback);System.out.println("注册到RabbitMQ中后,RabbitMQ给的唯一标识是:" + consumerTag);} catch (Exception e) {e.printStackTrace();System.out.println("消息接收异常");} finally {// 释放关闭连接信道与连接if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}

手动应答

手动应答就是我们需要通过设置,将消费者设置成为手动应答模式,手动应答的时候,只有当我们把消费正确消费掉之后,在告诉RabbitMQ该消息已经被成功接收,这时RabbitMQ才会将消息从队列中删除掉。这样就可以保证消息的安全了。手动应答和异步确认一样都是异步处理的。

上边说到手动应答是在消费者处理完消息之后给RabbitMQ返回的应答消息,所以手动应答设置应该被写到收到消息并处理完消息之后。

如下:

 DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());System.out.println("接收到队列发送来的消息:" + message);//------ 该位置为处理消息的业务逻辑。表示消息已经处理完成。------/** 手动应答* @param1 deliveryTag 消息应答标记,消息的ID* @param2 multiple:(false、只应答接收到的那个消息 true、应答所有传递过来的消息,批量应答)*/finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};

手动应答消费者代码

package rabbitmq.ced.ack;import com.rabbitmq.client.*;/*** 消息确认机制  手动应答 消费者代码** @author 崔二旦* @since now*/
public class ManualAckConsumer {public static void main(String[] args) {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置连接属性connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");Connection connection = null;Channel channel = null;try {// 3. 从连接工厂中获取连接connection = connectionFactory.newConnection("消费者");// 3. 在连接中创建信道,RabbitMQ中的所有操作都是在信道中完成的channel = connection.createChannel();System.out.println("等待接收消息......");//lambda表达式中使用的变量应该是final或final,所以重新给channel赋值final Channel finalChannel = channel;/** RabbitMQ推送给消费者消息回调接口,在该接口中用于编写如何对消息进行处理。* @param1 消费者注册到RabbitMQ之后,RabbitMQ给生成的一个该消费者的唯一标识* @param2 推送过来的消息的信息。其中包括真正的数据body(消息体),*         Properties(消息的属性信息),*         Envelope(包装信息),该对象里有*                  deliveryTag(消息的ID),*                  redeliver(是否重新投递,当RabbitMQ发现消费者无法应答的时候*                           (某种原因丢失连接,如:通道关闭,连接关闭,TCP连接丢失),*                            RabbitMQ会将消息重新编排入队,进行重新投递,*                            如果其它消费者可以处理,RabbitMQ会将消息重新投递给其它消费者),*                  exchange(所使用的交换机),*                  routingKey(路由KEY)。*/DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());System.out.println("接收到队列发送来的消息:" + message);//------ 该位置为处理消息的业务逻辑。表示消息已经处理完成。------/** 手动应答* @param1 deliveryTag 消息应答标记,消息的ID* @param2 multiple:(false、只应答接收到的那个消息 true、应答所有传递过来的消息,批量应答)*         假如有5,6,7,8四个消息被传递过来,当前消息为8*         false时:只会应答8这个消息,5,6,7三个消息不会进行应答*         true时:会将5,6,7,8这四个消息全部应答。*/finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};/** rabbitmq取消该消费者对信道中队列的订阅时,调用的回调接口。* 当我们在RabbitMQ管理界面手动删除该队列时,就会调用该接口。* @param1 消费者注册到RabbitMQ之后,RabbitMQ给生成的一个该消费者的唯一标识*/CancelCallback cancelCallback = (consumerTag) -> {System.out.println("消息消费被中断" + consumerTag);};//自动应答boolean autoAck = true;/** 消费者订阅队列,在某个队列上注册消费者,在这个队列中有消息时,* 就会把消息转发给此Channel来处理,如果这个队列有多个消费者,* RabbitMQ则会采用轮转的方式将消息分发给各个消费者。** @param1 队列名称 将自己注册到那个队列中,以便消费那个队列的消息。* @param2 消息成功接收后是否要自动应答,当设置为true时,代表自动应答,false时代表手动应答。* @param3 当队列中有消息是,rabbitmq调用的回调接口,用于将消息传递过来。* @param4 rabbitmq取消该消费者对信道中队列的订阅时,调用的回调接口,*         如被订阅的队列被删除时,rabbitmq就会通过该接口通知消费者,*         以便消费者做相应处理。消费者可以根据consumerTag标识,*         主动调用channel.basicCancel(consumerTag)方法进行对RabbitMQ队列的注册关系进行解除。* @return 该方法的返回值是该消费者注册到RabbitMQ之后,RabbitMQ给生成的一个该消费者的唯一标识。*/String consumerTag = channel.basicConsume("hello", autoAck, deliverCallback, cancelCallback);System.out.println("注册到RabbitMQ中后,RabbitMQ给的唯一标识是:" + consumerTag);} catch (Exception e) {e.printStackTrace();System.out.println("消息接收异常");} finally {// 释放关闭连接信道与连接if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}

注意

本次介绍的消息应答,代码中注释比较多,而且对于深入理解应答机制非常重要,所以要认真看代码中的注释。

崔二旦: 崔二旦用于记录学习的代码 - Gitee.com

RabbitMQ学习记录(六)-应答机制相关推荐

  1. RabbitMQ学习总结(六)之消息应答

    作者处于学习阶段,刚刚完成RabbitMQ的学习,作为学生,我会用更通俗的说法,来叙述自己对RabbitMQ的了解.愿各位大佬看到有见解错误的地方和叙述不好的地方,能够帮忙纠正.来帮助大家更加深入的了 ...

  2. gRPC学习记录(六)--客户端连接池

    对于客户端来说建立一个channel是昂贵的,因为创建channel需要连接,但是建立一个stub是很简单的,就像创建一个普通对象,因此Channel就需要复用,也就是说需要实现一个连接池应用.本文使 ...

  3. 1.Rabbitmq学习记录《本质介绍,协议AMQP分析》

    1.RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现. RabbitMQ的优势-: 除了Qpid,RabbitMQ是唯一一个实现了AMQP ...

  4. RabbitMQ学习记录 - Direct之Routing模式

    (内容均来自RabbitMQ官网:https://www.rabbitmq.com/tutorials/tutorial-four-java.html) 前面几篇学习了下RabbitMQ的" ...

  5. 《你好,放大器》----学习记录(六)

    6 仪器使用.焊接.调试和撰写报告 6.1 仪器使用基础 6.1.1 正确连接仪器和电路板 直流稳压电源.信号源.示波器以及实验用电路板的一种参考接线方式,如图所示: 图片来源于<你好,放大器& ...

  6. SpringBoot 微信点餐系统学习记录六-订单表和订单详情表的后端开发

    在dataobject文件夹创建订单表和订单详情表的实体类 package com.imooc.dataobject;import com.imooc.enums.OrderStatusEnum; i ...

  7. RabbitMQ学习记录

    什么是MQ 消息中间件MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法. 在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而通过这种异步处理的方式可大 ...

  8. 用python画糖葫芦_python学习记录六

    返回函数 def calc_sum(*args): ax=0for n inargs: ax= ax +nreturn ax def lazy_sum(*args): def sum(): ax = ...

  9. Rabbitmq学习笔记(尚硅谷2021)

    Rabbitmq学习笔记 (尚硅谷) 1.MQ 的概念 1.1 什么是 MQ? 1.2 为什么要用 MQ? 削峰 解耦 异步 1.3 MQ 的分类 ActiveMQ Kafka RocketMQ Ra ...

最新文章

  1. rrdtool的完整例子
  2. 别以为真懂Openstack: 虚拟机创建的50个步骤和100个知识点(2)
  3. mysql pom依赖关系_常用的POM依赖
  4. java内存泄漏笔记
  5. 怎样把颜色转换为字符串
  6. 世界三大顶级音响_世界三大汽车赛事是什么?一起来了解一下
  7. Poj - 3254 Corn Fields (状压DP)(入门)
  8. 搭建基础架构-Order
  9. 移动web开发之rem布局(rem基础、媒体查询、 less 基础、rem适配方案)
  10. PKU 学生反馈 2009 - 4
  11. Elasticsearch--入门-删除数据bulk批量操作导入样本测试数据---全文检索引擎ElasticSearch工作笔记008
  12. 斯坦福机器学习课程 Exercise 习题三
  13. [其他]Ubuntu安装genymotion后unable to load VirtualBox engine
  14. flink watermark 生成机制与总结
  15. 首先,打破一切常规 学习笔记 之五
  16. 构建 Java 镜像的 10 个最佳实践
  17. 机器学习课程讲义·第二章,线性模型系列
  18. 使用github免费搭建个人网站详细教程
  19. Docker Nginx 如何重新加载配置
  20. 量化投资与数据分析一: 如何用PYTHON下载WIND数据并转化成dataframe格式 分享

热门文章

  1. 计算机常用软件考试试题,计算机常用工具软件试卷试题.docx
  2. Python:编写缩写词,由一个短语中每个单词的第一个字母组成,均为大写。
  3. Pursue a Post-graduate Degree
  4. 意法半导体推出首款8引脚STM32微控制器,可适用于简单应用
  5. 下载github包慢
  6. JEECMS v8 发布,java 开源 CMS 系统
  7. python 打卡程序_基于python35,使用基于Linux的任务命令,CNIC自动打卡程序
  8. Python中的数学运算操作符使用进阶
  9. Devops 基础介绍
  10. 评分模板html,小程序模板-评分星星