为什么80%的码农都做不了架构师?>>>   

何为延迟队列?

顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。

延迟消费是延迟队列最为常用的使用模式。如下图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。通过RabbitMQ提供的TTL扩展,这些消息会被设置过期时间,也就是延迟消费的时间。等消息过期之后,这些消息会通过配置好的DLX转发到实际消费队列(图中蓝色队列),以此达到延迟消费的效果。

一、编写代码

1、编写常量类RabbitDeadQueueConstant

package com.lvgang.springbootrabbitmq.deadqueue;/*** @author lvgang*/
public class RabbitDeadQueueConstant {/*** 死信队列*/public  static final  String DL_QUEUQ = "QUEUE_DL";/*** 转发队列*/public static final String REDIRECT_QUEUE = "QUEUE_REDIRECT";/*** 死信EXCHANGE*/public static final String DL_EXCHANGE = "EXCHANGE_DL";/***  发送死信队列KEY*/public static final String DL_QUEUQ_KEY ="KEY_DL";/*** 发送到转发队列KEY*/public static final String REDIRECT_QUEUE_KEY ="KEY_R";}

2、编写配置类RabbitDeadQueueConfig

package com.lvgang.springbootrabbitmq.deadqueue;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @author lvgang*/
@Configuration
public class RabbitDeadQueueConfig {private static Logger logger = LoggerFactory.getLogger(RabbitDeadQueueConfig.class);/*** Queue 可以有4个参数*      1.队列名*      2.durable       持久化消息队列 ,rabbitmq重启的时候不需要创建新的队列 默认true*      3.auto-delete   表示消息队列没有在使用时将被自动删除 默认是false*      4.exclusive     表示该消息队列是否只在当前connection生效,默认是false*//*** 死信队列跟交换机类型没有关系 不一定为directExchange  不影响该类型交换机的特性.** @return the exchange*/@Beanpublic Exchange deadLetterExchange() {logger.info("创建deadLetterExchange成功");return ExchangeBuilder.directExchange(RabbitDeadQueueConstant.DL_EXCHANGE).durable(true).build();}/*** 声明一个死信队列.* x-dead-letter-exchange   对应  死信交换机* x-dead-letter-routing-key  对应 死信队列** @return the queue*/@Beanpublic Queue deadLetterQueue() {logger.info("创建deadLetterQueue成功");Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    声明  死信交换机args.put("x-dead-letter-exchange", RabbitDeadQueueConstant.DL_EXCHANGE);
//       x-dead-letter-routing-key    声明 死信路由键args.put("x-dead-letter-routing-key", RabbitDeadQueueConstant.REDIRECT_QUEUE_KEY);return QueueBuilder.durable(RabbitDeadQueueConstant.DL_QUEUQ).withArguments(args).build();}/*** 定义死信队列转发队列.** @return the queue*/@Beanpublic Queue redirectQueue() {logger.info("创建redirectQueue成功");return QueueBuilder.durable(RabbitDeadQueueConstant.REDIRECT_QUEUE).build();}/*** 死信路由通过 DL_KEY 绑定键绑定到死信队列上.* @return the binding*/@Beanpublic Binding deadLetterBinding() {logger.info("绑定deadLetterQueue到deadLetterExchange成功");return new Binding(RabbitDeadQueueConstant.DL_QUEUQ, Binding.DestinationType.QUEUE,RabbitDeadQueueConstant.DL_EXCHANGE, RabbitDeadQueueConstant.DL_QUEUQ_KEY, null);}/*** 死信路由通过 KEY_R 绑定键绑定到最终处理队列上.* @return the binding*/@Beanpublic Binding redirectBinding() {logger.info("绑定redirectQueue到deadLetterExchange成功");return new Binding(RabbitDeadQueueConstant.REDIRECT_QUEUE, Binding.DestinationType.QUEUE,RabbitDeadQueueConstant.DL_EXCHANGE, RabbitDeadQueueConstant.REDIRECT_QUEUE_KEY, null);}}

3、编写消息生产者DeadQueueSender

package com.lvgang.springbootrabbitmq.deadqueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.UUID;/*** @author lvgang*/
@Component
public class DeadQueueSender implements  RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback {private static Logger logger = LoggerFactory.getLogger(DeadQueueReceiver.class);@Autowiredprivate RabbitTemplate rabbitTemplate;public void send() {//设置回调对象this.rabbitTemplate.setConfirmCallback(this);this.rabbitTemplate.setReturnCallback(this);CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());String content = "DeadQueue= " + new Date() + ", content= " + UUID.randomUUID().toString();MessagePostProcessor messagePostProcessor = message -> {MessageProperties messageProperties = message.getMessageProperties();
//            设置编码messageProperties.setContentEncoding("utf-8");
//            设置过期时间10*1000毫秒messageProperties.setExpiration("10000");return message;};
//         向DL_QUEUE 发送消息  10*1000毫秒后过期 形成死信rabbitTemplate.convertAndSend(RabbitDeadQueueConstant.DL_EXCHANGE, RabbitDeadQueueConstant.DL_QUEUQ_KEY, content, messagePostProcessor, correlationData);logger.info("Send ok,"+new Date()+","+content);}/*** 消息回调确认方法* 如果消息没有到exchange,则confirm回调,ack=false* 如果消息到达exchange,则confirm回调,ack=true* @param*/@Overridepublic void confirm(CorrelationData correlationData, boolean isSendSuccess, String s) {//logger.info("confirm--message:回调消息ID为: " + correlationData.getId());if (isSendSuccess) {//logger.info("confirm--message:消息发送成功");} else {logger.info("confirm--message:消息发送失败" + s);}}/*** exchange到queue成功,则不回调return* exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {logger.info("return--message:" + new String(message.getBody()) + ",replyCode:" + replyCode+ ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);}
}

4、编写消息消费者  DeadQueueReceiver

package com.lvgang.springbootrabbitmq.deadqueue;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;
import java.util.Date;/*** @author lvgang*/
@Component
@RabbitListener(queues = RabbitDeadQueueConstant.REDIRECT_QUEUE)
public class DeadQueueReceiver {private static Logger logger = LoggerFactory.getLogger(DeadQueueReceiver.class);@RabbitHandlerpublic void process(String hello, Message message, Channel channel) {try {//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);logger.info("消息消费成功!");} catch (Exception e) {logger.error("消息消费失败:"+e.getMessage(),e);//丢弃这条消息//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);}logger.info("Receiver : " + hello +","+ new Date());}
}

二、测试结果

1、编写测试类TopicTests

package com.lvgang.springbootrabbitmq.deadqueue;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/*** @author lvgang*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class DeadQueueTests {@Autowiredprivate DeadQueueSender deadQueueSender;@Testpublic void hello() {int i=1;while (true) {try {if(i==1) {deadQueueSender.send();}i++;Thread.sleep(1000);} catch (Exception e) {;}}}
}

2、执行测试类,并查看结果

通过执行测试类,查看到了消息消费的情况,生产者共计生产了1个消息,被消费者消费了一次,但发送消息时间及实际消费时间差10秒钟。

转载于:https://my.oschina.net/sdlvzg/blog/3045834

SpringBoot RabbitMQ 集成 七 延迟队列相关推荐

  1. 第四十六章:SpringBoot RabbitMQ完成消息延迟消费

    在2018-3-1日SpringBoot官方发版了2.0.0.RELEASE最新版本,新版本完全基于Spring5.0来构建,JDK最低支持也从原来的1.6也改成了1.8,不再兼容1.8以下的版本,更 ...

  2. RabbitMQ如何实现延迟队列?

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  3. SpringBoot整合Redisson实现延迟队列

    SpringBoot整合Redisson实现延迟队列 技术选型 引入 Redisson 依赖 配置项 编写工具类 延迟队列执行器 业务消费类枚举 加载消费队列 消费者类 测试类 测试结果 技术选型 关 ...

  4. RabbitMQ如何实现延迟队列

    1.延迟队列 延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费.很可惜,在RabbitMQ中并未提供延迟队列功能,但是我们有其他的方式可以实现延迟队列,方法就是TTL+死信队列 ...

  5. Rabbitmq超级详细的笔记,包括安装,基本命令,rabbitmq的七种消息模式,以及死信队列,延迟队列,优先级队列和惰性队列的介绍

    RabbitMQ 文章目录 RabbitMQ 1 RabbitMQ介绍 1.1 基本介绍 1.2 RabbitMQ的安装 1.2.1 ubuntu20.04 安装rabbitmq 1.2.2 cent ...

  6. RabbitMQ 延迟队列详解

    一.延迟队列概念 延迟队列存储的对象是对应的延迟消息,所谓"延迟消息"是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费. 二.延 ...

  7. 【RabbitMQ】一文带你搞定RabbitMQ延迟队列

    本文口味:鱼香肉丝   预计阅读:10分钟 0|1一.说明 在上一篇中,介绍了RabbitMQ中的死信队列是什么,何时使用以及如何使用RabbitMQ的死信队列.相信通过上一篇的学习,对于死信队列已经 ...

  8. 面试官:RabbitMQ本身不支持延迟队列,那你给我实现一个?

    以下文章来源方志朋的博客,回复"666"获面试宝典 RabbitMQ本身没有延迟队列的支持,但是基于其本身的一些特性,可以做到类似延迟队列的效果:基于死信交换器+TTL. 以下介绍 ...

  9. rabbitmq延迟队列实现

    延迟队列 Rabbitmq并没有延迟队列 但是:死信队列+消息时间设置过期时间可以 达成我们想要的延迟队列效果 例如下单5分钟之内未支付就会取消订单,那么设置下单支付时间为5分钟后过期然后进入死信队列 ...

最新文章

  1. 手动添加Cookie
  2. Rails工作效率和Java运行平台
  3. 【Web安全】一款功能强大的Web身份认证测试框架
  4. 大数据时代,一名优秀的开发者应具备怎样的特质?
  5. win8: hello gril
  6. php5.*.* iis 安装
  7. HDU 1317 XYZZY(floyd+bellman_ford判环)
  8. Linux进程管理工具的使用
  9. linux下zookeeper启动命令,For Linux Zookeeper客户端命令行操作指令
  10. java中输出进程的映像名称,怎么修改tomcat进程的名称(windows)
  11. 华为服务器麒麟系统,麒麟服务器
  12. 敏捷开发(scrum)简介
  13. 苹果6s微信提示未连接服务器,苹果6s微信无法打开,一直显示正在载入怎么处理?...
  14. 如果我们遇上得州寒潮,会不会「悲剧」?
  15. 基于Python的飞机票销售系统的设计和实现
  16. HTML学习笔记及案例(第五周 第1次)
  17. Eclipse小技巧--修改@auther和去掉//TODO
  18. uniapp和vue课程表实现、会议预约实现[表格table相同内容行的合并]
  19. 如何将带Dxperience组件的Asp.net 2.0网站部署到服务器(转载)
  20. java实现数字黑洞

热门文章

  1. 【洛谷1361】 小M的作物(最小割)
  2. 11(maven+SSH)网上商城项目实战之Freemarker 页面静态化
  3. 进行SEPM的灾难恢复时导入数据库后,Symantec Endpoint Protection Manager(SEPM)无法登陆...
  4. 自己定义WinXP的时间校正服务器
  5. XXX管理平台系统——架构
  6. 计算机四级信息安全题,2014年计算机四级考试信息安全工程精选真题
  7. MySQL高级or索引失效情况
  8. 初识Docker-Docker的安装
  9. ConcurrentHashMap的源码分析-CounterCells解释
  10. Redis 为什么是单线程的?