1.延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。很可惜,在RabbitMQ中并未提供延迟队列功能,但是我们有其他的方式可以实现延迟队列,方法就是TTL+死信队列,组合实现延迟队列的效果。

2.什么是TTL

TTL,全称Time To Live,消息过期时间设置。消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。

队列过期后,会将队列所有消息全部移除。 一个队列中某一个消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉),如果不在队列顶端,那么是无效的,过期时间有队列的过期时间判定。

如果队列设置了,消息也设置了,那么会取时间短的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。
我门一般通过设置消息的x-message-ttl属性来设置时间

3.死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

消息成为死信的三种情况:

1. 队列消息长度到达限制;

2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队 列,requeue=false;

3. 原队列存在消息过期设置,消息到达超时时间未被消费;

队列绑定死信交换机,给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key,就能成功绑定了

Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

4.实现延迟队列

延迟任务通过消息的TTL和Dead Letter Exchange来实现。我们需要建立2个队列,一个用于发送消息,一个用于消息过期后的转发目标队列。

这里模拟一个订单支付的功能,下单后30分钟如果没有付款,那么久自动取消订单并且回滚库存,使用的是spring框架,

以下是producer的spring配置文件中对交换机和队列的配置

   <!--
延迟队列:
1. 定义正常交换机(order_exchange)和队列(order_queue)
2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
3. 绑定,设置正常队列过期时间为30分钟
--><!-- 1. 定义正常交换机(order_exchange)和队列(order_queue)--><rabbit:queue id="order_queue" name="order_queue"><!-- 3. 绑定,设置正常队列过期时间为30分钟--><rabbit:queue-arguments><entry key="x-dead-letter-exchange" value="order_exchange_dlx" /><entry key="x-dead-letter-routing-key" value="dlx.order.cancel" /><entry key="x-message-ttl" value="20000" value-type="java.lang.Integer"/></rabbit:queue-arguments></rabbit:queue><rabbit:topic-exchange name="order_exchange"><rabbit:bindings><rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange><!-- 2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)--><rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue><rabbit:topic-exchange name="order_exchange_dlx"><rabbit:bindings><rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange>

为了测试方便,我就把延迟的时间设置为20s,

测试代码:

 @Testpublic void testDelay() throws InterruptedException {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");//1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息rabbitTemplate.convertAndSend("order_exchange", "order.msg", "订单信息: id=1,time=" + sdf.format(new Date()));}

consumer端配置

 <context:component-scan base-package="com.kkb.listener"/><!--加载配置文件--><context:property-placeholderlocation="classpath:rabbitmq.properties"/><!-- 定义rabbitmq connectionFactory --><rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.virtual-host}"/><rabbit:listener-container connection-factory="connectionFactory" auto-declare="true" acknowledge="manual" prefetch="1"><rabbit:listener ref="orderListener" queue-names="order_queue_dlx"/></rabbit:listener-container>

代码

package com.kkb.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;@Component
public class OrderListener implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1.接收转换消息SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");System.out.println(new String(message.getBody()));System.out.println("处理时间:"+sdf.format(new Date()));//2. 处理业务逻辑System.out.println("处理业务逻辑...");System.out.println("根据订单id查询其状态...");System.out.println("判断状态是否为支付成功");System.out.println("取消订单,回滚库存....");//3. 手动签收channel.basicAck(deliveryTag, true);} catch (Exception e) {//e.printStackTrace();System.out.println("出现异常,拒绝接受");//4.拒绝签收,不重回队列 requeue=falsechannel.basicNack(deliveryTag, true, false);}}
}

分别启动消费者端consumer,然后生产者producer发送一条消息过去

运行结果如下:

从图中可以看到,两个时间间隔正好是20s左右,由此实现了延迟队列的功能

RabbitMQ如何实现延迟队列相关推荐

  1. RabbitMQ如何实现延迟队列?

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  2. SpringBoot RabbitMQ 集成 七 延迟队列

    为什么80%的码农都做不了架构师?>>>    何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列.而一般的队列,消息一旦入队了之后就会被消费者马上消费. 延迟 ...

  3. 面试官:RabbitMQ本身不支持延迟队列,那你给我实现一个?

    以下文章来源方志朋的博客,回复"666"获面试宝典 RabbitMQ本身没有延迟队列的支持,但是基于其本身的一些特性,可以做到类似延迟队列的效果:基于死信交换器+TTL. 以下介绍 ...

  4. 【RabbitMQ】一文带你搞定RabbitMQ延迟队列

    本文口味:鱼香肉丝   预计阅读:10分钟 0|1一.说明 在上一篇中,介绍了RabbitMQ中的死信队列是什么,何时使用以及如何使用RabbitMQ的死信队列.相信通过上一篇的学习,对于死信队列已经 ...

  5. rabbitmq延迟队列实现

    延迟队列 Rabbitmq并没有延迟队列 但是:死信队列+消息时间设置过期时间可以 达成我们想要的延迟队列效果 例如下单5分钟之内未支付就会取消订单,那么设置下单支付时间为5分钟后过期然后进入死信队列 ...

  6. RabbitMq(五) -- 死信队列和延迟队列

    1. 死信 1.1 死信的概念 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue ...

  7. RabbitMQ 延迟队列详解

    一.延迟队列概念 延迟队列存储的对象是对应的延迟消息,所谓"延迟消息"是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费. 二.延 ...

  8. rabbitMq实现延迟队列

    文章目录 业务场景: 1 安装rabbitMq 2 添加maven依赖 3 在application.properties配置 4 具体的实现 4.1 Dead Letter Exchanges 4. ...

  9. Rabbitmq超级详细的笔记,包括安装,基本命令,rabbitmq的七种消息模式,以及死信队列,延迟队列,优先级队列和惰性队列的介绍

    RabbitMQ 文章目录 RabbitMQ 1 RabbitMQ介绍 1.1 基本介绍 1.2 RabbitMQ的安装 1.2.1 ubuntu20.04 安装rabbitmq 1.2.2 cent ...

最新文章

  1. CSS与HTML结合
  2. 【Pthon入门学习】多级菜单小例子
  3. Go语言 gRPC 实践(一)
  4. mysql索引有字符集_07. 类型、字符集、引擎和索引
  5. tree(nyoj)
  6. 数据切分——MySql表分区概述
  7. 美国甲骨文的CEO拉里.埃利森耶鲁大学演讲(附英文)
  8. 根据MAC地质反查IP工具-LanHelper
  9. 精选目标检测3——yolo1、yolo2、yolo3和SSD的网络结构汇总对比
  10. 详解 git cherry-pick用法
  11. php自动生成phpunit,PHP单元测试利器 PHPUNIT深入用法(三)
  12. 字体粗细怎么设置 html,html中字体的粗细怎么设置?字体大小是font-size,那粗细怎么设置的?...
  13. 怎样和控制欲很强的家人相处-不受他人影响
  14. 深入浅出matplotlib(10):构造圆弧示意图
  15. C语言从入门到精通——进阶6 C语言文件操作
  16. PAT甲级 1081 Rational Sum 分数相加的和
  17. 返回一个子字符串在主字符串出现的次数
  18. excel量化交易接口系统程序怎样进行数据预处理?
  19. 城堡战法--城堡分类、指标
  20. 查找算法之三:斐波那契查找(黄金分割法)

热门文章

  1. Hive分桶表创建clustered by()
  2. 2020.04.07网易笔试
  3. 理解Dilation convolution
  4. IE浏览器缓存问题解决方法整理
  5. STM32F103ZET6的时钟系统RCC配置
  6. Transactional注解详解
  7. Jmeter-多用户并发文件上传
  8. [JavaWeb]—前端篇
  9. 淘宝:皇冠店导航 省心省时又省钱了
  10. “玄学问题”解决汇总