rabbitmq(二):死信队列

1:死信队列的用途(3种情况)

先考虑,死信队列存在的意义。它就像是一个兜底队列,当message出现三种情况(无处可去)的时候,死信exchange就会收下他们。

Messages from a queue can be “dead-lettered”; that is, republished to an exchange when any of the following events occur:

  • The message is negatively acknowledged by a consumer using basic.reject or basic.nack with requeue parameter set to false.
  • The message expires due to per-message TTL; or
  • The message is dropped because its queue exceeded a length limit

来自官网:https://www.rabbitmq.com/dlx.html

死信队列,就和普通的队列一样,需要声明和绑定。只是作用上,略有区别。废话不多说,来一个小demo,再来一个场景应用下。

2:springboot-rabbitmq实现

2-1:application.yml

server:port: 8021
spring:#给项目来个名字application:name: rabbitmq-provider#配置rabbitMq 服务器rabbitmq:#    host: 172.2200.10.2host: 127.0.0.1port: 5672username: rabbitmqpassword: rabbitmq#虚拟host 可以不设置,使用server默认hostvirtual-host: /# 用来配置发布者异步确认,尚硅谷视频中说,queue持久+消息持久(rabbitTemplate 自动持久化消息)+生产者确认 可以实现不丢失消息publisher-confirm-type: correlatedredis:#    host: 172.20.10.2host: 127.0.0.1port: 5070

2-2:configuration

package com.example.springbootrabbitmq.configuration;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author xx* @date 2021/10/5 16:24*/
@Configuration
@Slf4j
public class RabbitmqConfig {@Autowiredprivate CachingConnectionFactory connectionFactory;//自动装配消息监听器所在的容器工厂配置类实例@Autowiredprivate SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;//    @Bean
//    public MessageConverter jsonMessageConverter() {//        return new Jackson2JsonMessageConverter();
//    }@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {//                log.info("发送成功");} else {//                correlationData.getReturned().getMessage().getMessageProperties().getMessageId();
//                log.info("发送失败");}});return rabbitTemplate;}/*** 针对不同的消费者,可以进行不同的容器配置,来实现多个消费者应用不同的配置。*/@Bean(name = "singleListenerContainer")public SimpleRabbitListenerContainerFactory listenerContainer() {//定义消息监听器所在的容器工厂SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();//设置容器工厂所用的实例factory.setConnectionFactory(connectionFactory);//设置消息在传输中的格式,在这里采用JSON的格式进行传输factory.setMessageConverter(new Jackson2JsonMessageConverter());
//        //设置并发消费者实例的初始数量。在这里为1个
//        factory.setConcurrentConsumers(1);
//        //设置并发消费者实例的最大数量。在这里为1个
//        factory.setMaxConcurrentConsumers(1);
//        //设置并发消费者实例中每个实例拉取的消息数量-在这里为1个
//        factory.setPrefetchCount(1);// 关闭自动应答factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 设置不公平分发,更改为每次读取1条消息,在消费者未回执确认之前,不在进行下一条消息的投送,而不是默认的轮询;// 也就是说,我处理完了,我再接受下一次的投递,属于消费者端的控制// 不设置的话,就是采用轮询的方法去监听队列,你一条我一条factory.setPrefetchCount(1);return factory;}
}
package com.example.springbootrabbitmq.configuration;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class DeadLetterConfig {public static final String DEAD_LETTER_QUEUE_NAME = "dead_letter_queue";public static final String DEAD_LETTER_EXCHANGE_NAME = "dead_letter_exchange";public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";// 业务队列1专门用来测试channel.basicNackpublic static final String BUSINESS1_QUEUE_NAME = "business1_queue";public static final String BUSINESS1_EXCHANGE_NAME = "business1_exchange_name";public static final String BUSINESS1_ROUTING_KEY = "busines1s_routing_key";// 业务队列2专门用来测试TTL,以及队列溢出public static final String BUSINESS2_QUEUE_NAME = "business2_queue";public static final String BUSINESS2_EXCHANGE_NAME = "business2_exchange_name";public static final String BUSINESS2_ROUTING_KEY = "business2_routing_key";/******************************************死信队列配置 start********************************************/@Beanpublic Queue deadLetterQueue() {return new Queue(DEAD_LETTER_QUEUE_NAME, true, false, false);}@Beanpublic FanoutExchange deadLetterExchange() {return new FanoutExchange(DEAD_LETTER_EXCHANGE_NAME, true, false);}@Beanpublic Binding bindingDeadLetterQueue() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());}/******************************************死信队列配置 end*************************************************//******************************************普通业务队列配置 start********************************************/@Beanpublic DirectExchange business1Exchange() {return new DirectExchange(BUSINESS1_EXCHANGE_NAME, true, false);}@Beanpublic Queue business1Queue() {// 将队列绑定到死信交换机上Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);return new Queue(BUSINESS1_QUEUE_NAME, true, false, false, args);}@Beanpublic Binding bindingBusiness1Queue() {return BindingBuilder.bind(business1Queue()).to(business1Exchange()).with(BUSINESS1_ROUTING_KEY);}// ------------------------------------------@Beanpublic DirectExchange business2Exchange() {return new DirectExchange(BUSINESS2_EXCHANGE_NAME, true, false);}@Beanpublic Queue business2Queue() {Map<String, Object> args = new HashMap<>();// 模拟队列溢出args.put("x-max-length", 3);// 将其绑定要死信交换机上args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);return new Queue(BUSINESS2_QUEUE_NAME, true, false, false, args);}@Beanpublic Binding bindingBusiness2Queue() {return BindingBuilder.bind(business2Queue()).to(business2Exchange()).with(BUSINESS2_ROUTING_KEY);}/******************************************普通业务队列配置 end********************************************/
}

2-3:Business1ProducerController

package com.example.springbootrabbitmq.controller;import com.alibaba.fastjson.JSONObject;
import com.example.springbootrabbitmq.configuration.DeadLetterConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;@RestController
@Slf4j
@RequestMapping("/businessProducer")
public class Business1ProducerController {@Resourceprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMessage")public void sendMessage(String msg) {Map<String, String> msgMap = new HashMap<>();msgMap.put("message", msg);String messageJson = JSONObject.toJSONString(msgMap);Message message = MessageBuilder.withBody(messageJson.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();log.info("生产者发送:" + new String(message.getBody(), StandardCharsets.UTF_8));rabbitTemplate.convertAndSend(DeadLetterConfig.BUSINESS1_EXCHANGE_NAME, DeadLetterConfig.BUSINESS1_ROUTING_KEY, message);}@GetMapping("/sendMessageTTL")public void sendMessageTTL(String msg) {Map<String, String> msgMap = new HashMap<>();msgMap.put("message", msg);String messageJson = JSONObject.toJSONString(msgMap);Message message = MessageBuilder.withBody(messageJson.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)// 设置0s过期,为了检查是不是会直接入死信队列.setExpiration("0").setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();log.info("生产者发送:" + new String(message.getBody(), StandardCharsets.UTF_8));rabbitTemplate.convertAndSend(DeadLetterConfig.BUSINESS2_EXCHANGE_NAME, DeadLetterConfig.BUSINESS2_ROUTING_KEY, message);}@GetMapping("/sendMessageOverflow")public void sendMessageOverflow(String msg) {Map<String, String> msgMap = new HashMap<>();msgMap.put("message", msg);String messageJson = JSONObject.toJSONString(msgMap);Message message = MessageBuilder.withBody(messageJson.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();log.info("生产者发送:" + new String(message.getBody(), StandardCharsets.UTF_8));rabbitTemplate.convertAndSend(DeadLetterConfig.BUSINESS2_EXCHANGE_NAME, DeadLetterConfig.BUSINESS2_ROUTING_KEY, message);}
}

2-4:BusinessConsumer

package com.example.springbootrabbitmq.controller;import com.example.springbootrabbitmq.configuration.DeadLetterConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;@Component
@Slf4j
public class BusinessConsumer {// 测试reject情况@RabbitListener(queues = DeadLetterConfig.BUSINESS1_QUEUE_NAME, containerFactory = "singleListenerContainer")public void processMessage(Message message, Channel channel) throws Exception {String messageString = new String(message.getBody(), StandardCharsets.UTF_8);String messageId = message.getMessageProperties().getMessageId();log.info("业务consumer消费者接受:" + messageString);// 第一种情况,拒绝,不重新回归队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);}//    // 测试队列满/TTL
//    @RabbitListener(queues = DeadLetterConfig.BUSINESS2_QUEUE_NAME, containerFactory = "singleListenerContainer")
//    public void processMessage2(Message message, Channel channel) throws Exception {//        String messageString = new String(message.getBody(), StandardCharsets.UTF_8);
//        String messageId = message.getMessageProperties().getMessageId();
//
//        // 睡10s,更好的体现队列溢出的情况
//        TimeUnit.SECONDS.sleep(10);
//
//        log.info("consumer2消费者接受:" + messageString);
//        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);;
//    }// 监听死信队列@RabbitListener(queues = DeadLetterConfig.DEAD_LETTER_QUEUE_NAME, containerFactory = "singleListenerContainer")public void processMessage1(Message message, Channel channel) throws Exception {String messageString = new String(message.getBody(), StandardCharsets.UTF_8);String messageId = message.getMessageProperties().getMessageId();log.info("死信队列consumer消费者接受:" + messageString);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}

3: 测试场景

3-1:TTL过期

流程:将businessConsumer中的processMessage2()方法注释掉,没有消费者,且ttl设置为0,势必不会被消费,message会直接过期,进而进入死信队列。

postman_request: http://localhost:8021/businessProducer/sendMessageTTL?msg=测试directExchange2021-10-13 10:03:24.083  INFO 18476 --- [nio-8021-exec-1] c.e.s.c.Business1ProducerController      : 生产者发送:{"message":"测试directExchange"}
2021-10-13 10:03:24.097  INFO 18476 --- [nectionFactory1] c.e.s.configuration.RabbitmqConfig       : 发送成功
2021-10-13 10:03:24.135  INFO 18476 --- [ntContainer#1-1] c.e.s.controller.Business1Consumer       : 死信队列consumer消费者接受:{"message":"测试directExchange"}

3-2:队列满

流程:线程睡眠10s模拟业务处理情况。通过postman发送多次消息,使队列满。点5次,队列存3个,消费者消费1个,死信队列里面存一个。

postman_request:http://localhost:8021/businessProducer/sendMessageOverflow?msg=测试directExchange2021-10-13 10:41:11.233  INFO 20685 --- [nio-8021-exec-1] c.e.s.c.Business1ProducerController      : 生产者发送:{"message":"测试directExchange"}
2021-10-13 10:41:12.309  INFO 20685 --- [nio-8021-exec-3] c.e.s.c.Business1ProducerController      : 生产者发送:{"message":"测试directExchange"}
2021-10-13 10:41:13.095  INFO 20685 --- [nio-8021-exec-4] c.e.s.c.Business1ProducerController      : 生产者发送:{"message":"测试directExchange"}
2021-10-13 10:41:13.791  INFO 20685 --- [nio-8021-exec-5] c.e.s.c.Business1ProducerController      : 生产者发送:{"message":"测试directExchange"}
2021-10-13 10:41:14.643  INFO 20685 --- [nio-8021-exec-6] c.e.s.c.Business1ProducerController      : 生产者发送:{"message":"测试directExchange"}
2021-10-13 10:41:14.648  INFO 20685 --- [ntContainer#2-1] c.e.s.controller.BusinessConsumer        : 死信队列consumer消费者接受:{"message":"测试directExchange"}
2021-10-13 10:41:21.281  INFO 20685 --- [ntContainer#1-1] c.e.s.controller.BusinessConsumer        : consumer2消费者接受:{"message":"测试directExchange"}
2021-10-13 10:41:31.285  INFO 20685 --- [ntContainer#1-1] c.e.s.controller.BusinessConsumer        : consumer2消费者接受:{"message":"测试directExchange"}
2021-10-13 10:41:41.291  INFO 20685 --- [ntContainer#1-1] c.e.s.controller.BusinessConsumer        : consumer2消费者接受:{"message":"测试directExchange"}
2021-10-13 10:41:51.299  INFO 20685 --- [ntContainer#1-1] c.e.s.controller.BusinessConsumer        : consumer2消费者接受:{"message":"测试directExchange"}

3-3:basicNack,失败,并且不重新回队列

request: http://localhost:8021/businessProducer/sendMessage?msg=测试directExchange2021-10-13 10:55:16.692  INFO 20923 --- [nio-8021-exec-2] c.e.s.c.Business1ProducerController      : 生产者发r送:{"message":"测试directExchange"}
2021-10-13 10:55:16.758  INFO 20923 --- [ntContainer#0-1] c.e.s.controller.BusinessConsumer        : 业务consumer消费者接受:{"message":"测试directExchange"}
2021-10-13 10:55:16.762  INFO 20923 --- [ntContainer#2-1] c.e.s.controller.BusinessConsumer        : 死信队列consumer消费者接受:{"message":"测试directExchange"}

4:应用场景

网上搜到的一共有两种场景:

  • 订单超过多少分钟,自动取消,就是到了死信队列后再进行业务处理修改订单状态即可。
  • 作为兜底队列,如果出现异常,可以dilivery进死信队列中,之后进行异常排查等。

5:bug记录

  • 修改queue配置的时候,得先删除队列,不然会报错
2021-10-12 17:39:46.669 ERROR 15287 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'dead_letter_queue' in vhost '/': received none but current is the value 'dead_letter_exchange' of type 'longstr', class-id=50, method-id=10)
  • 死信队列没有收到message。

    • 原因:配置没正确,要在业务队列里面添加如下参数配置,通过加入参数即可实现绑定,不需要像业务队列一样,绑定进交换机里面。

          @Beanpublic Queue business1Queue() {// 将队列绑定到死信交换机上Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);return new Queue(BUSINESS1_QUEUE_NAME, true, false, false, args);}
      
  • 测试队列溢出的时候,没有效果,一直被consumer消费。

    原因:没有配置,factory.setPrefetchCount(1); 没有配置,通过web看队列,发现都是处于unack状态,而不是ready。配置上了之后,当前消息处理完之后,consumer才会去从队列中拉去一条新的message。

6:小结

  • 一共有3种情况,message会进入消息队列。

  • 修改队列状态的时候应该先删除

  • 上面的代码是都可以复现的,有问题可以留言,大家一起讨论,共同进步。

rabbitmq(二):死信队列,springboot 实现3种情况相关推荐

  1. RabbitMQ高级特性(五):RabbitMQ之死信队列DLX

    一.死信队列简介 (1)死信队列 死信队列,英文缩写:DLX .Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就 ...

  2. RabbitMq(五) -- 死信队列和延迟队列

    1. 死信 1.1 死信的概念 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue ...

  3. RabbitMQ:死信队列

    ✨ RabbitMQ:死信队列 1.死信队列 1.1死信队列基本介绍 1.2消息成为死信的三种情况 1.3死信队列结构图 1.4死信的处理方式 2.TTL消息过期时间 2.1基本介绍 2.2生产者 2 ...

  4. RabbitMQ 之死信队列

    文章目录 什么是死信队列 如何配置死信队列 死信消息的变化 死信队列应用场景 总结 什么是死信队列 为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将 ...

  5. rabbitMQ学习-死信队列

    死信队列 死信:顾名思义就是无法被消费的消息,一般情况下,product将消息投递到broker或者直接到queue里,consumer从queue取出消息,进行消费,但某些时候由于特定的原因导致qu ...

  6. RabbitMQ实现死信队列

    目录 死信队列是什么 怎样实现一个死信队列 说明 实现过程 导入依赖 添加配置 编写mq配置类 添加业务队列的消费者 添加死信队列的消费者 添加消息发送者 添加消息测试类 测试 死信队列的应用场景 总 ...

  7. 6 RabbitMQ之死信队列

    文章目录 1. 案例一:消息TTL过期 2. 案例二:队列达到最大长度 3. 案例三:消息被拒 死信就是无法被消费的消息成为死信.正常情况下,生产者生产的消息投递到交换机,交换机根据routingKe ...

  8. RabbitMQ的死信队列的应用

    强烈推荐一个大神的人工智能的教程:http://www.captainbed.net/zhanghan [前言] 最近在项目中用到了RabbitMQ来做异步处理,自己将这块儿系统的搞了搞,下面主要记录 ...

  9. 消息中间件之rabbitMQ实战-死信队列

    该篇文章内容较多,包括有rabbitMq相关的一些简单理论介绍,集成spring Boot,provider消息推送实例,consumer消息消费实例,Direct(直连类型交换机).Fanout(广 ...

最新文章

  1. redis服务器索引文件删除,Redis基本命令整理
  2. 系统的环境变量path的作用是什么
  3. js控制select大全
  4. 参数检验——当总体分布已知(如总体为正态分布),根据样本数据对总体分布的统计参数进行推断 非参数检验——利用样本数据对总体分布形态等进行推断的方法。...
  5. Vue使用better-scroll左右菜单联动
  6. wxWidgets:可用类概述
  7. 医疗数据治理——构建高质量医疗大数据智能分析数据基础
  8. Matplotlib学习---用matplotlib画箱线图(boxplot)
  9. PAT 00-自测1. 打印沙漏(20)
  10. go语言的特殊变量 iota
  11. java栈链_java实现链栈与队列详解
  12. 基于javaweb的驾校车辆教练预约系统ssm+Vue
  13. java聊天室类图怎么画,UML课程设计(java web网上聊天室附源码)
  14. Rails——migration
  15. 紫川猜想--第二十二卷第六章
  16. stm32作为spi的从机使用例程
  17. C:素数(质数)的判断以及输出
  18. Class Activation Mapping(CAM)介绍
  19. windows10忘记开机密码解决办法
  20. UltraISO 软碟通制作系统U盘

热门文章

  1. Android 蓝牙知识
  2. 微信企业号开发七:JSAPI模式
  3. 编辑器下运行exe或bat run exe or bat in editor
  4. 交替打印A1B2C3-Java多线程实现方式
  5. 基于拉丁超立方抽样与自适应策略的改进鲸鱼优化算法
  6. 微信群控系统的实现原理,微信群控系统源码的核心实现代码
  7. node-opcua的使用 --- [1] 简单server
  8. Python 分批次处理数据示例
  9. Request Headers 和Response Headers——请求头和响应头
  10. android oreo_您的手机何时将获得Android Oreo?