rabbitmq(二):死信队列,springboot 实现3种情况
rabbitmq(二):死信队列
1:死信队列的用途(3种情况)
先考虑,死信队列存在的意义。它就像是一个兜底队列,当message出现三种情况(无处可去)的时候,死信exchange就会收下他们。
Messages from a queue can be “dead-lettered”; that is, republished to an exchange when any of the following events occur:
- The message is negatively acknowledged by a consumer using basic.reject or basic.nack with requeue parameter set to false.
- The message expires due to per-message TTL; or
- The message is dropped because its queue exceeded a length limit
来自官网:https://www.rabbitmq.com/dlx.html
死信队列,就和普通的队列一样,需要声明和绑定。只是作用上,略有区别。废话不多说,来一个小demo,再来一个场景应用下。
2:springboot-rabbitmq实现
2-1:application.yml
server:port: 8021
spring:#给项目来个名字application:name: rabbitmq-provider#配置rabbitMq 服务器rabbitmq:# host: 172.2200.10.2host: 127.0.0.1port: 5672username: rabbitmqpassword: rabbitmq#虚拟host 可以不设置,使用server默认hostvirtual-host: /# 用来配置发布者异步确认,尚硅谷视频中说,queue持久+消息持久(rabbitTemplate 自动持久化消息)+生产者确认 可以实现不丢失消息publisher-confirm-type: correlatedredis:# host: 172.20.10.2host: 127.0.0.1port: 5070
2-2:configuration
package com.example.springbootrabbitmq.configuration;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author xx* @date 2021/10/5 16:24*/
@Configuration
@Slf4j
public class RabbitmqConfig {@Autowiredprivate CachingConnectionFactory connectionFactory;//自动装配消息监听器所在的容器工厂配置类实例@Autowiredprivate SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;// @Bean
// public MessageConverter jsonMessageConverter() {// return new Jackson2JsonMessageConverter();
// }@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {// log.info("发送成功");} else {// correlationData.getReturned().getMessage().getMessageProperties().getMessageId();
// log.info("发送失败");}});return rabbitTemplate;}/*** 针对不同的消费者,可以进行不同的容器配置,来实现多个消费者应用不同的配置。*/@Bean(name = "singleListenerContainer")public SimpleRabbitListenerContainerFactory listenerContainer() {//定义消息监听器所在的容器工厂SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();//设置容器工厂所用的实例factory.setConnectionFactory(connectionFactory);//设置消息在传输中的格式,在这里采用JSON的格式进行传输factory.setMessageConverter(new Jackson2JsonMessageConverter());
// //设置并发消费者实例的初始数量。在这里为1个
// factory.setConcurrentConsumers(1);
// //设置并发消费者实例的最大数量。在这里为1个
// factory.setMaxConcurrentConsumers(1);
// //设置并发消费者实例中每个实例拉取的消息数量-在这里为1个
// factory.setPrefetchCount(1);// 关闭自动应答factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 设置不公平分发,更改为每次读取1条消息,在消费者未回执确认之前,不在进行下一条消息的投送,而不是默认的轮询;// 也就是说,我处理完了,我再接受下一次的投递,属于消费者端的控制// 不设置的话,就是采用轮询的方法去监听队列,你一条我一条factory.setPrefetchCount(1);return factory;}
}
package com.example.springbootrabbitmq.configuration;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class DeadLetterConfig {public static final String DEAD_LETTER_QUEUE_NAME = "dead_letter_queue";public static final String DEAD_LETTER_EXCHANGE_NAME = "dead_letter_exchange";public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";// 业务队列1专门用来测试channel.basicNackpublic static final String BUSINESS1_QUEUE_NAME = "business1_queue";public static final String BUSINESS1_EXCHANGE_NAME = "business1_exchange_name";public static final String BUSINESS1_ROUTING_KEY = "busines1s_routing_key";// 业务队列2专门用来测试TTL,以及队列溢出public static final String BUSINESS2_QUEUE_NAME = "business2_queue";public static final String BUSINESS2_EXCHANGE_NAME = "business2_exchange_name";public static final String BUSINESS2_ROUTING_KEY = "business2_routing_key";/******************************************死信队列配置 start********************************************/@Beanpublic Queue deadLetterQueue() {return new Queue(DEAD_LETTER_QUEUE_NAME, true, false, false);}@Beanpublic FanoutExchange deadLetterExchange() {return new FanoutExchange(DEAD_LETTER_EXCHANGE_NAME, true, false);}@Beanpublic Binding bindingDeadLetterQueue() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());}/******************************************死信队列配置 end*************************************************//******************************************普通业务队列配置 start********************************************/@Beanpublic DirectExchange business1Exchange() {return new DirectExchange(BUSINESS1_EXCHANGE_NAME, true, false);}@Beanpublic Queue business1Queue() {// 将队列绑定到死信交换机上Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);return new Queue(BUSINESS1_QUEUE_NAME, true, false, false, args);}@Beanpublic Binding bindingBusiness1Queue() {return BindingBuilder.bind(business1Queue()).to(business1Exchange()).with(BUSINESS1_ROUTING_KEY);}// ------------------------------------------@Beanpublic DirectExchange business2Exchange() {return new DirectExchange(BUSINESS2_EXCHANGE_NAME, true, false);}@Beanpublic Queue business2Queue() {Map<String, Object> args = new HashMap<>();// 模拟队列溢出args.put("x-max-length", 3);// 将其绑定要死信交换机上args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);return new Queue(BUSINESS2_QUEUE_NAME, true, false, false, args);}@Beanpublic Binding bindingBusiness2Queue() {return BindingBuilder.bind(business2Queue()).to(business2Exchange()).with(BUSINESS2_ROUTING_KEY);}/******************************************普通业务队列配置 end********************************************/
}
2-3:Business1ProducerController
package com.example.springbootrabbitmq.controller;import com.alibaba.fastjson.JSONObject;
import com.example.springbootrabbitmq.configuration.DeadLetterConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;@RestController
@Slf4j
@RequestMapping("/businessProducer")
public class Business1ProducerController {@Resourceprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMessage")public void sendMessage(String msg) {Map<String, String> msgMap = new HashMap<>();msgMap.put("message", msg);String messageJson = JSONObject.toJSONString(msgMap);Message message = MessageBuilder.withBody(messageJson.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();log.info("生产者发送:" + new String(message.getBody(), StandardCharsets.UTF_8));rabbitTemplate.convertAndSend(DeadLetterConfig.BUSINESS1_EXCHANGE_NAME, DeadLetterConfig.BUSINESS1_ROUTING_KEY, message);}@GetMapping("/sendMessageTTL")public void sendMessageTTL(String msg) {Map<String, String> msgMap = new HashMap<>();msgMap.put("message", msg);String messageJson = JSONObject.toJSONString(msgMap);Message message = MessageBuilder.withBody(messageJson.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)// 设置0s过期,为了检查是不是会直接入死信队列.setExpiration("0").setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();log.info("生产者发送:" + new String(message.getBody(), StandardCharsets.UTF_8));rabbitTemplate.convertAndSend(DeadLetterConfig.BUSINESS2_EXCHANGE_NAME, DeadLetterConfig.BUSINESS2_ROUTING_KEY, message);}@GetMapping("/sendMessageOverflow")public void sendMessageOverflow(String msg) {Map<String, String> msgMap = new HashMap<>();msgMap.put("message", msg);String messageJson = JSONObject.toJSONString(msgMap);Message message = MessageBuilder.withBody(messageJson.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();log.info("生产者发送:" + new String(message.getBody(), StandardCharsets.UTF_8));rabbitTemplate.convertAndSend(DeadLetterConfig.BUSINESS2_EXCHANGE_NAME, DeadLetterConfig.BUSINESS2_ROUTING_KEY, message);}
}
2-4:BusinessConsumer
package com.example.springbootrabbitmq.controller;import com.example.springbootrabbitmq.configuration.DeadLetterConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;@Component
@Slf4j
public class BusinessConsumer {// 测试reject情况@RabbitListener(queues = DeadLetterConfig.BUSINESS1_QUEUE_NAME, containerFactory = "singleListenerContainer")public void processMessage(Message message, Channel channel) throws Exception {String messageString = new String(message.getBody(), StandardCharsets.UTF_8);String messageId = message.getMessageProperties().getMessageId();log.info("业务consumer消费者接受:" + messageString);// 第一种情况,拒绝,不重新回归队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);}// // 测试队列满/TTL
// @RabbitListener(queues = DeadLetterConfig.BUSINESS2_QUEUE_NAME, containerFactory = "singleListenerContainer")
// public void processMessage2(Message message, Channel channel) throws Exception {// String messageString = new String(message.getBody(), StandardCharsets.UTF_8);
// String messageId = message.getMessageProperties().getMessageId();
//
// // 睡10s,更好的体现队列溢出的情况
// TimeUnit.SECONDS.sleep(10);
//
// log.info("consumer2消费者接受:" + messageString);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);;
// }// 监听死信队列@RabbitListener(queues = DeadLetterConfig.DEAD_LETTER_QUEUE_NAME, containerFactory = "singleListenerContainer")public void processMessage1(Message message, Channel channel) throws Exception {String messageString = new String(message.getBody(), StandardCharsets.UTF_8);String messageId = message.getMessageProperties().getMessageId();log.info("死信队列consumer消费者接受:" + messageString);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}
3: 测试场景
3-1:TTL过期
流程:将businessConsumer
中的processMessage2()方法注释掉,没有消费者,且ttl设置为0,势必不会被消费,message会直接过期,进而进入死信队列。
postman_request: http://localhost:8021/businessProducer/sendMessageTTL?msg=测试directExchange2021-10-13 10:03:24.083 INFO 18476 --- [nio-8021-exec-1] c.e.s.c.Business1ProducerController : 生产者发送:{"message":"测试directExchange"}
2021-10-13 10:03:24.097 INFO 18476 --- [nectionFactory1] c.e.s.configuration.RabbitmqConfig : 发送成功
2021-10-13 10:03:24.135 INFO 18476 --- [ntContainer#1-1] c.e.s.controller.Business1Consumer : 死信队列consumer消费者接受:{"message":"测试directExchange"}
3-2:队列满
流程:线程睡眠10s模拟业务处理情况。通过postman发送多次消息,使队列满。点5次,队列存3个,消费者消费1个,死信队列里面存一个。
postman_request:http://localhost:8021/businessProducer/sendMessageOverflow?msg=测试directExchange2021-10-13 10:41:11.233 INFO 20685 --- [nio-8021-exec-1] c.e.s.c.Business1ProducerController : 生产者发送:{"message":"测试directExchange"}
2021-10-13 10:41:12.309 INFO 20685 --- [nio-8021-exec-3] c.e.s.c.Business1ProducerController : 生产者发送:{"message":"测试directExchange"}
2021-10-13 10:41:13.095 INFO 20685 --- [nio-8021-exec-4] c.e.s.c.Business1ProducerController : 生产者发送:{"message":"测试directExchange"}
2021-10-13 10:41:13.791 INFO 20685 --- [nio-8021-exec-5] c.e.s.c.Business1ProducerController : 生产者发送:{"message":"测试directExchange"}
2021-10-13 10:41:14.643 INFO 20685 --- [nio-8021-exec-6] c.e.s.c.Business1ProducerController : 生产者发送:{"message":"测试directExchange"}
2021-10-13 10:41:14.648 INFO 20685 --- [ntContainer#2-1] c.e.s.controller.BusinessConsumer : 死信队列consumer消费者接受:{"message":"测试directExchange"}
2021-10-13 10:41:21.281 INFO 20685 --- [ntContainer#1-1] c.e.s.controller.BusinessConsumer : consumer2消费者接受:{"message":"测试directExchange"}
2021-10-13 10:41:31.285 INFO 20685 --- [ntContainer#1-1] c.e.s.controller.BusinessConsumer : consumer2消费者接受:{"message":"测试directExchange"}
2021-10-13 10:41:41.291 INFO 20685 --- [ntContainer#1-1] c.e.s.controller.BusinessConsumer : consumer2消费者接受:{"message":"测试directExchange"}
2021-10-13 10:41:51.299 INFO 20685 --- [ntContainer#1-1] c.e.s.controller.BusinessConsumer : consumer2消费者接受:{"message":"测试directExchange"}
3-3:basicNack,失败,并且不重新回队列
request: http://localhost:8021/businessProducer/sendMessage?msg=测试directExchange2021-10-13 10:55:16.692 INFO 20923 --- [nio-8021-exec-2] c.e.s.c.Business1ProducerController : 生产者发r送:{"message":"测试directExchange"}
2021-10-13 10:55:16.758 INFO 20923 --- [ntContainer#0-1] c.e.s.controller.BusinessConsumer : 业务consumer消费者接受:{"message":"测试directExchange"}
2021-10-13 10:55:16.762 INFO 20923 --- [ntContainer#2-1] c.e.s.controller.BusinessConsumer : 死信队列consumer消费者接受:{"message":"测试directExchange"}
4:应用场景
网上搜到的一共有两种场景:
- 订单超过多少分钟,自动取消,就是到了死信队列后再进行业务处理修改订单状态即可。
- 作为兜底队列,如果出现异常,可以dilivery进死信队列中,之后进行异常排查等。
5:bug记录
- 修改queue配置的时候,得先删除队列,不然会报错
2021-10-12 17:39:46.669 ERROR 15287 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'dead_letter_queue' in vhost '/': received none but current is the value 'dead_letter_exchange' of type 'longstr', class-id=50, method-id=10)
死信队列没有收到message。
原因:配置没正确,要在业务队列里面添加如下参数配置,通过加入参数即可实现绑定,不需要像业务队列一样,绑定进交换机里面。
@Beanpublic Queue business1Queue() {// 将队列绑定到死信交换机上Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);return new Queue(BUSINESS1_QUEUE_NAME, true, false, false, args);}
测试队列溢出的时候,没有效果,一直被consumer消费。
原因:没有配置,
factory.setPrefetchCount(1);
没有配置,通过web看队列,发现都是处于unack
状态,而不是ready。配置上了之后,当前消息处理完之后,consumer才会去从队列中拉去一条新的message。
6:小结
一共有3种情况,message会进入消息队列。
修改队列状态的时候应该先删除
上面的代码是都可以复现的,有问题可以留言,大家一起讨论,共同进步。
rabbitmq(二):死信队列,springboot 实现3种情况相关推荐
- RabbitMQ高级特性(五):RabbitMQ之死信队列DLX
一.死信队列简介 (1)死信队列 死信队列,英文缩写:DLX .Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就 ...
- RabbitMq(五) -- 死信队列和延迟队列
1. 死信 1.1 死信的概念 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue ...
- RabbitMQ:死信队列
✨ RabbitMQ:死信队列 1.死信队列 1.1死信队列基本介绍 1.2消息成为死信的三种情况 1.3死信队列结构图 1.4死信的处理方式 2.TTL消息过期时间 2.1基本介绍 2.2生产者 2 ...
- RabbitMQ 之死信队列
文章目录 什么是死信队列 如何配置死信队列 死信消息的变化 死信队列应用场景 总结 什么是死信队列 为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将 ...
- rabbitMQ学习-死信队列
死信队列 死信:顾名思义就是无法被消费的消息,一般情况下,product将消息投递到broker或者直接到queue里,consumer从queue取出消息,进行消费,但某些时候由于特定的原因导致qu ...
- RabbitMQ实现死信队列
目录 死信队列是什么 怎样实现一个死信队列 说明 实现过程 导入依赖 添加配置 编写mq配置类 添加业务队列的消费者 添加死信队列的消费者 添加消息发送者 添加消息测试类 测试 死信队列的应用场景 总 ...
- 6 RabbitMQ之死信队列
文章目录 1. 案例一:消息TTL过期 2. 案例二:队列达到最大长度 3. 案例三:消息被拒 死信就是无法被消费的消息成为死信.正常情况下,生产者生产的消息投递到交换机,交换机根据routingKe ...
- RabbitMQ的死信队列的应用
强烈推荐一个大神的人工智能的教程:http://www.captainbed.net/zhanghan [前言] 最近在项目中用到了RabbitMQ来做异步处理,自己将这块儿系统的搞了搞,下面主要记录 ...
- 消息中间件之rabbitMQ实战-死信队列
该篇文章内容较多,包括有rabbitMq相关的一些简单理论介绍,集成spring Boot,provider消息推送实例,consumer消息消费实例,Direct(直连类型交换机).Fanout(广 ...
最新文章
- redis服务器索引文件删除,Redis基本命令整理
- 系统的环境变量path的作用是什么
- js控制select大全
- 参数检验——当总体分布已知(如总体为正态分布),根据样本数据对总体分布的统计参数进行推断 非参数检验——利用样本数据对总体分布形态等进行推断的方法。...
- Vue使用better-scroll左右菜单联动
- wxWidgets:可用类概述
- 医疗数据治理——构建高质量医疗大数据智能分析数据基础
- Matplotlib学习---用matplotlib画箱线图(boxplot)
- PAT 00-自测1. 打印沙漏(20)
- go语言的特殊变量 iota
- java栈链_java实现链栈与队列详解
- 基于javaweb的驾校车辆教练预约系统ssm+Vue
- java聊天室类图怎么画,UML课程设计(java web网上聊天室附源码)
- Rails——migration
- 紫川猜想--第二十二卷第六章
- stm32作为spi的从机使用例程
- C:素数(质数)的判断以及输出
- Class Activation Mapping(CAM)介绍
- windows10忘记开机密码解决办法
- UltraISO 软碟通制作系统U盘