SpringAMQP对RabbitMQ消息的确认(消费)

之前已经简单介绍了基本是从发送方去确认的,我们需要在配置文件当中开启发送方确认模式,共育两种,一种是相对于交换机一个是相对于队列。

本次的介绍是基于消费者对消息的确认,也就是基本的逻辑是消费者对消息处理的确认。

基本上生产者这边的代码是不需要去改变的,但是我们需要让消费者去正确的人发送到消息。我们按照什么形式都可以,确认与不确认都可以,因为本次主要是为了测试消费端对消息的处理确认。

首先生产者的配置和相关的代码

spring:
#  profiles:
#    active: devrabbitmq:host:  #远程主机外网地址username: shabi #远程用户名password:  #密码virtual-host: shabi #虚拟机名称port: 5672 #远程主机端口名称publisher-confirm-type: correlated #开启确认模式publisher-returns: true

然后就是之前我们在测试类当中写的一些发送的各种模式,包括一般的默认发送,以及发送者确认,以及发送者回执。
然后具体的配置类就是真不要进行了队列和交换机的声明和创建,然后进行了具体绑定。

package com.jgdabc.rabbitconfig;import com.rabbitmq.client.ConnectionFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {//交换机public static final String Exchange_Name = "boot_rabbit_topic_ee";public static final String Queue_Name = "boot_rabbit_topic_qqq";@Bean("bootExchange") //交换机的创建public Exchange bootExchange(){return ExchangeBuilder.topicExchange(Exchange_Name).durable(true).build(); //绑定一个topic类型的交换机,持久化并构建}@Bean("bootQueue") //队列的创建public Queue bootQueue(){return QueueBuilder.durable(Queue_Name).build();}
//    队列和交换机的绑定关系
//    哪个队列
//  哪个交换机
//    routing key
//    这里不写的话会按照方法名注入@Beanpublic Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();}}
package com.jgdabc;import com.jgdabc.rabbitconfig.RabbitConfig;import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.junit4.SpringRunner;import java.util.*;
import java.util.stream.IntStream;
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class DemoApplicationTests {//    注入RabbitTemplate@Autowiredprivate RabbitTemplate template;@Testpublic void testSend() {template.convertAndSend(RabbitConfig.Exchange_Name, "boot.haha", "hi");}/*** 在yml配置文件当中开启去人模式* 在RabbitTemplate定义ConfirmCallBack回调函数*/@Testpublic void testConfirm() {//定义回调template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println(b);System.out.println("confirm 方法被执行了");if (!b) {//接收成功System.out.println("消息成功接收");} else {System.out.println("消息接受失败," + b);}}});//发送一条消息template.convertAndSend(RabbitConfig.Exchange_Name, "boot.haha", "你好,我的小宝贝");}
//    回退模式,当消息发送给Exchange后,Exchange路由到Queue失败后才会执行ReturnCallBack/*** 回退模式* 1:在yml文件当中开启回退模式* 2:设置ReturnCallBack* 3:设置Exchange处理消息的模式* <1:如果消息没有路由到Queue,那么丢弃掉消息(默认)* <2:如果路由没有回退到Queue,返回给消息发送方*/@Testpublic void testReturn() {//        设置交换机处理消息的模式template.setMandatory(true);//设置为true交换机会将路由到队列失败的消息再返回给发送者template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("消息对象:" + returnedMessage.getMessage());System.out.println("错误码:" + returnedMessage.getReplyCode());System.out.println("错误信息:" + returnedMessage.getReplyText());System.out.println("交换机:" + returnedMessage.getExchange());System.out.println("路由键:" + returnedMessage.getRoutingKey());System.out.println("return执行了...");}});template.convertAndSend(RabbitConfig.Exchange_Name, "boot.haha", "hi");}}

然后是这次主要介绍的消费端。

先看配置

spring:rabbitmq:host: username: password: virtual-host:port: 5672
#    publisher-confirm-type: correlated
#    publisher-returns: true
#    开启ack也就是手动消息确认listener:
#      设置手动确认simple:acknowledge-mode: manual

具体的类,

package com.jgdabc.boot_rabbit_consumer;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.List;/*** consumer ack 机制* 设置手动签收,acknowledge = “manual”* 如果消息成功处理,则调用channel的basicAck签收* // * 如果消息处理失败,则调用channel的basicNack拒绝签收,broker重新发送给consumer*/
@Component
public class ConsumerSpringbootApplication implements ChannelAwareMessageListener {@RabbitListener(queues = "boot_rabbit_topic_qqq") //指定要消费消息的队public void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("接收转换消息:" + new String(message.getBody()));
//            手动签收channel.basicAck(deliveryTag, true);} catch (IOException e) {channel.basicNack(deliveryTag, true, true);}
//        第二个参数代表运行多条消息被签收
//        拒绝签收,第三个参数重回队列,如果设置为true,则消息重新回到队列}public void onMessage(Message message) {//        System.out.println(message);}}

这个方法具体没有用,之所以写上,是因为我实现上边那个类的时候,如果不实现这个方法的话,那么启动就会报错。所以就写上了。

然后主要在说明一些参数

long deliveryTag = message.getMessageProperties().getDeliveryTag();
message.getMessageProperties ().getMessageId () 获取 MessageID,获取的 MessageID 可以用来判断是否已经被消费者消费过了,如果已经消费则取消再次消费。

下面这里加了一个异常的捕获,因为可能消费者这个处理消息出错,所以进行了异常的捕获。首先一定是接收了具体的消息。然后会进行一个签收

channel.basicAck (long deliveryTag, boolean multiple)为消息确认,参数1:消息的id;参数2:是否批量应答。

basic.nack方法为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue,与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到。

try {System.out.println("接收转换消息:" + new String(message.getBody()));
//            手动签收channel.basicAck(deliveryTag, true);} catch (IOException e) {channel.basicNack(deliveryTag, true, true);}

这里只是列举一些方法的使用,当然还有其他的方法,后面慢慢来熟悉好了。打开这个管理面板,可以看到没有队列,这里提前已经删除掉之前的创建好的队列和交换机了,为的是为了是运行展示后的效果比较明显一些。
交换机和队列都是可以在程序中创建和绑定的。

现在我们在生产者测试类去生产一条消息。可以随便去用一个方法就可以了。
我们就运行这个方法

因为没有做错误,所以不会有错误信息输出的。

现在我们去面板看,可以看到这里就自动创建出来队列和生产了一条消息,当然交换机的创建和队列的绑定也是执行了。


现在我们在消费者去消费,执行的话,我们就去执行启动类就好。

因为我们这个类加上了这个注解,其实就是已经实例化给spring了。表明了已经成为spring的一个组件,所以直接去启动启动运行类就好了。

你看这里就接收到消息了,并且会处于一个持续运行的等待过程。

同时消费处理成功验证。

现在我们可以去让程序出错,来验证消息处理失败情况。
我们在签收之前让代码出一个错。

哦对了,这个异常是算数异常,我们之前捕获一个大的异常算了。

下面那段改成这样。

现在重新开始之前的步骤。然后这里器是会一直打印这段话,主要是因为我们设置basic.nack方法为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue。我们这里出现异常,第二个参数为true,代表不确认,第三个代表重新让它回到队列,设置为true该行消息重新回到队列,但是我们这里会持续接收进行接收消费,于是来来回回就形成了死循环。


同时验证我们这里设置的重回队列确实生效。

大概就是这样的一个模式,当热这种处理模式并不是合适的,主要是举个例子,其他的方法处理模式顺着这个模板来就行了。

主要是为了忘记后好回顾,必要的时候直接就地取材。

SpringACK对RabbitMQ消息的确认(消费)相关推荐

  1. RabbitMq 消息接收确认(可靠消费)

    RabbitMq 消息接收确认(可靠消费) 一.消息接收确认是什么: 是RabbitMq确认消息是否成功被消费的一种机制. 有三种消息确认方式: 1.none代表不确认:该模式下,只要队列获取到了消息 ...

  2. RabbitMq 消息发送确认(可靠生产和推送确认)

    RabbitMq 消息发送确认(可靠生产和推送确认) 此文档只是本人在项目中碰到的一些问题而产生的个人相关总结,实际上的消息确认机制可以做得更多(比如分布式事务等,但此处不做阐述). 一.消息发送确认 ...

  3. RabbitMQ消息的确认模式

    确认模式 包括两种自动确认.手动确认 自动确认 只要消息从队列中获取,无论消费者获取到消息后, 是否执行成功,都认为是消息已经成功消费. 手动确认 消费者从队列中获取消息后,服务器会将该消息标记为不可 ...

  4. rabbitmq消息ACK确认机制及发送失败处理

    rabbitmq为确保消息发送和接收成功,采用ack机制. (1)生产者producter发送消息到mq时,mq会发送ack给producter告知消息是否投递成功: (2)消费者consumer接收 ...

  5. RabbitMQ消息confirm确认机制

  6. rabbitmq消息重回队列

    什么是消息的ACK 不管是哪种类型的消息中间件,都有一一种机制,即consumer端的消息ACK,通俗来讲,就是消息的确认消费机制,为什么会有这个ACK机制呢?这个和消息中间件的架构设计有关 下面是关 ...

  7. rabbitmq 启动异常_RabbitMQ:消息发送确认 与 消息接收确认(ACK)

    默认情况下如果一个 Message 被消费者所正确接收则会被从 Queue 中移除 如果一个 Queue 没被任何消费者订阅,那么这个 Queue 中的消息会被 Cache(缓存),当有消费者订阅时则 ...

  8. RabbitMQ消息重复消费问题

    业务背景 消息队列在数据传输的过程中,为了保证消息传递的可靠性,一般会对消息采用ack确认机制,如果消息传递失败,消息队列会进行重试,此时便可能存在消息重复消费的问题. 比如,用户到银行取钱后会收到扣 ...

  9. RabbitMQ消息确认机制

    文章目录 1. 事务机制 2. Confirm模式 2.1 生产者 2.1.1 普通Confirm模式 2.1.2 批量Confirm模式 2.1.3 异步Confirm模式 2.2 消费者 3. 其 ...

最新文章

  1. Mysql如何创建短索引(前缀索引)
  2. python queue 查询是否在队列中_python队列Queue的详解
  3. 打开和保存文件的对话框
  4. SAP CRM BSPWDApplication.do
  5. python sort 多级排序_为什么在python中使用排序功能进行多级排序...
  6. 第5章 函数与函数式编程
  7. oracle电梯案例,Oracle技术嘉年华的一个案例,redo的那些事,连载一
  8. 超大Sql文件_超大文件_mysql数据导入到mycat数据库_亲测好用---Linux运维工作笔记053
  9. CF1110E Magic Stones(构造题)
  10. IT不是技术,IT是一个世界
  11. ApacheCN 翻译活动进度公告 2019.3.3
  12. c语言添加vmp保护代码,易语言使用vmp加壳保护程序
  13. html怎么创建表格,html怎么做表格
  14. Visual Studio 调试时右侧诊断工具窗口如何显示
  15. tensorflow sigmoid 如何计算训练数据的正确率_逻辑回归算法!数据产品经理必看...
  16. C#学习笔记五——选择文件、文件夹操作
  17. python爬虫爬取视频
  18. word2010设置护眼背景
  19. chkdsk 停滞_职业停滞–早期发现和治疗
  20. 当上领导以后才明白的事情

热门文章

  1. 什么是Divi主题生成器?
  2. 流程体系 - 变更管理
  3. Flutter使用插件flutter_staggered_grid_view实现分页瀑布流效果
  4. 面试官问:前后端分离项目,有什么优缺点?我说:没
  5. springboot 多模块项目构建【创建√ + 启动√ 】
  6. 感谢C语言吧吧友奉上的C语言小程序练习---初学者练手
  7. 头插法和尾插法建立单链表详解与实现
  8. 用友erp同步输出文件服务器拒绝,用友系统备份帐套,文件拒绝访问怎么办?
  9. JPG图片中的文字或表格怎么转成Word文档?
  10. 除了北上广深杭,还有哪些城市移动互联网发展的不错