在实际的业务中我们会遇见生产者产生的消息,不立即消费,而是延时一段时间在消费。RabbitMQ本身没有直接支持延迟队列功能,但是我们可以根据其特性Per-Queue Message TTL和 Dead Letter Exchanges实现延时队列。也可以通过改特性设置消息的优先级。

1.Per-Queue Message TTL
RabbitMQ可以针对消息和队列设置TTL(过期时间)。队列中的消息过期时间(Time To Live, TTL)有两种方法可以设置。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息进行单独设置,每条消息TTL可以不同。如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead message,消费者将无法再收到该消息。
2.Dead Letter Exchanges
当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。消息变成Dead Letter一向有以下几种情况:
消息被拒绝(basic.reject or basic.nack)并且requeue=false
消息TTL过期
队列达到最大长度
实际上就是设置某个队列的属性,当这个队列中有Dead Letter时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange中去,进而被路由到另一个队列,publish可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0.0以前支持的immediate参数中的向publish确认的功能。

虽然 consumer 从来看不到过期的 message ,但是在过期 message 到达 queue 的头部时确实会被真正的丢弃(或者 dead-lettered )。当对每一个 queue 设置了 TTL 值时不会产生任何问题,因为过期的 message 总是会出现在 queue 的头部。当对每一条 message 设置了 TTL 时,过期的 message 可能会排队于未过期 message 的后面,直到这些消息被 consume 到或者过期了。在这种情况下,这些过期的 message 使用的资源将不会被释放,且会在 queue 统计信息中被计算进去(例如,queue 中存在的 message 的数量)。对于第一种设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去,而第二种方法里,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期时在即将投递到消费者之前判定的,为什么两者得处理方法不一致?因为第一种方法里,队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期消息即可,而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息,势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期,再进行删除。

一、在队列上设置TTL

1.建立delay.exchange

这里Internal设置为NO,否则将无法接受dead letter,YES表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定。

2.建立延时队列(delay queue)

如上配置延时5min队列(x-message-ttl=300000)

x-max-length:最大积压的消息个数,可以根据自己的实际情况设置,超过限制消息不会丢失,会立即转向delay.exchange进行投递

x-dead-letter-exchange:设置为刚刚配置好的delay.exchange,消息过期后会通过delay.exchange进行投递

这里不需要配置"dead letter routing key"否则会覆盖掉消息发送时携带的routingkey,导致后面无法路由为刚才配置的delay.exchange

3.配置延时路由规则

需要延时的消息到exchange后先路由到指定的延时队列

1)创建delaysync.exchange通过Routing key将消息路由到延时队列

2.配置delay.exchange 将消息投递到正常的消费队列

配置完成。

下面使用代码测试一下:

生产者:

  1. package cn.slimsmart.study.rabbitmq.delayqueue.queue;
  2. import java.io.IOException;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. public class Producer {
  7. private static String queue_name = "test.queue";
  8. public static void main(String[] args) throws IOException {
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("10.1.199.169");
  11. factory.setUsername("admin");
  12. factory.setPassword("123456");
  13. Connection connection = factory.newConnection();
  14. Channel channel = connection.createChannel();
  15. // 声明队列
  16. channel.queueDeclare(queue_name, true, false, false, null);
  17. String message = "hello world!" + System.currentTimeMillis();
  18. channel.basicPublish("delaysync.exchange", "deal.message", null, message.getBytes());
  19. System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());
  20. // 关闭频道和连接
  21. channel.close();
  22. connection.close();
  23. }
  24. }

消费者:

  1. package cn.slimsmart.study.rabbitmq.delayqueue.queue;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import com.rabbitmq.client.QueueingConsumer;
  6. public class Consumer {
  7. private static String queue_name = "test.queue";
  8. public static void main(String[] args) throws Exception {
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("10.1.199.169");
  11. factory.setUsername("admin");
  12. factory.setPassword("123456");
  13. Connection connection = factory.newConnection();
  14. Channel channel = connection.createChannel();
  15. // 声明队列
  16. channel.queueDeclare(queue_name, true, false, false, null);
  17. QueueingConsumer consumer = new QueueingConsumer(channel);
  18. // 指定消费队列
  19. channel.basicConsume(queue_name, true, consumer);
  20. while (true) {
  21. // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
  22. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  23. String message = new String(delivery.getBody());
  24. System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());
  25. }
  26. }
  27. }

二、在消息上设置TTL

实现代码:

生产者:

  1. package cn.slimsmart.study.rabbitmq.delayqueue.message;
  2. import java.io.IOException;
  3. import java.util.HashMap;
  4. import com.rabbitmq.client.AMQP;
  5. import com.rabbitmq.client.Channel;
  6. import com.rabbitmq.client.Connection;
  7. import com.rabbitmq.client.ConnectionFactory;
  8. public class Producer {
  9. private static String queue_name = "message_ttl_queue";
  10. public static void main(String[] args) throws IOException {
  11. ConnectionFactory factory = new ConnectionFactory();
  12. factory.setHost("10.1.199.169");
  13. factory.setUsername("admin");
  14. factory.setPassword("123456");
  15. Connection connection = factory.newConnection();
  16. Channel channel = connection.createChannel();
  17. HashMap<String, Object> arguments = new HashMap<String, Object>();
  18. arguments.put("x-dead-letter-exchange", "amq.direct");
  19. arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");
  20. channel.queueDeclare("delay_queue", true, false, false, arguments);
  21. // 声明队列
  22. channel.queueDeclare(queue_name, true, false, false, null);
  23. // 绑定路由
  24. channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");
  25. String message = "hello world!" + System.currentTimeMillis();
  26. // 设置延时属性
  27. AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
  28. // 持久性 non-persistent (1) or persistent (2)
  29. AMQP.BasicProperties properties = builder.expiration("300000").deliveryMode(2).build();
  30. // routingKey =delay_queue 进行转发
  31. channel.basicPublish("", "delay_queue", properties, message.getBytes());
  32. System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());
  33. // 关闭频道和连接
  34. channel.close();
  35. connection.close();
  36. }
  37. }

消费者:

  1. package cn.slimsmart.study.rabbitmq.delayqueue.message;
  2. import java.util.HashMap;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import com.rabbitmq.client.QueueingConsumer;
  7. public class Consumer {
  8. private static String queue_name = "message_ttl_queue";
  9. public static void main(String[] args) throws Exception {
  10. ConnectionFactory factory = new ConnectionFactory();
  11. factory.setHost("10.1.199.169");
  12. factory.setUsername("admin");
  13. factory.setPassword("123456");
  14. Connection connection = factory.newConnection();
  15. Channel channel = connection.createChannel();
  16. HashMap<String, Object> arguments = new HashMap<String, Object>();
  17. arguments.put("x-dead-letter-exchange", "amq.direct");
  18. arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");
  19. channel.queueDeclare("delay_queue", true, false, false, arguments);
  20. // 声明队列
  21. channel.queueDeclare(queue_name, true, false, false, null);
  22. // 绑定路由
  23. channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");
  24. QueueingConsumer consumer = new QueueingConsumer(channel);
  25. // 指定消费队列
  26. channel.basicConsume(queue_name, true, consumer);
  27. while (true) {
  28. // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
  29. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  30. String message = new String(delivery.getBody());
  31. System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());
  32. }
  33. }
  34. }

spring-rabbit整合教程

maven依赖:

  1. <dependency>
  2. <groupId>org.springframework.amqp</groupId>
  3. <artifactId>spring-rabbit</artifactId>
  4. <version>1.4.6.RELEASE</version>
  5. </dependency>

spring配置文件(在文件头部引入rabbit的命名空间和约束文件):

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:aop="http://www.springframework.org/schema/aop"
  4. xmlns:context="http://www.springframework.org/schema/context"
  5. xmlns:mvc="http://www.springframework.org/schema/mvc"
  6. xmlns:task="http://www.springframework.org/schema/task"
  7. xmlns:tx="http://www.springframework.org/schema/tx"
  8. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  9. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  10. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  11. http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
  12. http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
  13. http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
  14. http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
  15. http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
  16. http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">
  17. <!-- 定义Rabbit,指定连接工厂 -->
  18. <rabbit:connection-factory id="connectionFactory" host="你的rabbitMQ服务的ip" virtual-host="/vhost名称" username="用户名" password="密码" port="5672" />
  19. <!-- MQ的管理,包括队列、交换器等 -->
  20. <rabbit:admin connection-factory="connectionFactory"/>
  21. <!-- 定义Rabbit模板,指定连接工厂以及定义exchange -->
  22. <rabbit:template id="amqpTemplate" exchange="my_exchange" connection-factory="connectionFactory" />
  23. <!-- queue 队列声明 -->
  24. <!-- durable 是否持久化 ,exclusive 仅创建者可以使用的私有队列,断开后自动删除 ,auto-delete 当所有消费端连接断开后,是否自动删除队列 -->
  25. <rabbit:queue name="my_queue" durable="true" auto-delete="false" exclusive="false"/>
  26. <!-- 交换机定义 -->
  27. <!-- direct-exchange 模式:消息与一个特定的路由器完全匹配,才会转发; topic-exchange 模式:按规则转发消息,最灵活 -->
  28. <rabbit:topic-exchange name="my_exchange" durable="true" auto-delete="false">
  29. <rabbit:bindings>
  30. <!-- 设置消息Queue匹配的pattern (direct模式为key) -->
  31. <rabbit:binding queue="my_queue" pattern="my_patt"/>
  32. </rabbit:bindings>
  33. </rabbit:topic-exchange>
  34. <!-- 引入消费者 -->
  35. <bean id="rabbitmqService" class="com.group.service.RabbitmqService" />
  36. <!-- 配置监听 消费者 acknowledeg = manual,auto,none -->
  37. <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" >
  38. <!-- queues 监听队列,多个用逗号分隔; ref 监听器 -->
  39. <rabbit:listener queue-names="my_queue" ref="rabbitmqService" method="test"/>
  40. </rabbit:listener-container>
  41. </beans>

那么在项目中装配amqpTemplate中就可以发送消息了

java实现rabbitMQ延时队列详解以及spring-rabbit整合教程相关推荐

  1. 转:Java 7 种阻塞队列详解

    转自: Java 7 种阻塞队列详解 - 云+社区 - 腾讯云队列(Queue)是一种经常使用的集合.Queue 实际上是实现了一个先进先出(FIFO:First In First Out)的有序表. ...

  2. RabbitMQ 延迟队列详解

    一.延迟队列概念 延迟队列存储的对象是对应的延迟消息,所谓"延迟消息"是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费. 二.延 ...

  3. RabbitMQ 死信队列详解

    一.死信的概念 死信,顾名思义就是无法被消费的消息.一般来说,Producer 将消息投递到 Broker 或者直接到 Queue 里了,Consumer 从 Queue 取出消息进行消费,但某些时候 ...

  4. rabbitmq死信队列详解与使用

    关注架构师高级俱乐部 开启架构之路 不定期福利发放哦~ 什么是死信队列 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到b ...

  5. php循环取redis队列,详解Redis和队列

    下面由Redis教程栏目给大家详解Redis和队列,希望对需要的朋友有所帮助! 概要 Redis不仅可作为缓存服务器,还可用作消息队列.它的列表类型天生支持用作消息队列.如下图所示: 由于Redis的 ...

  6. MQ消息队列详解、四大MQ的优缺点分析

    MQ消息队列详解.四大MQ的优缺点分析 前言 面试题切入 面试官心理分析 面试题剖析 ①为什么要使用MQ 系统解耦 异步调用 流量削峰 消息队列的优缺点 四大主流MQ(kafka.ActiveMQ.R ...

  7. RabbitMQ基础知识详解

    RabbitMQ基础知识详解 2017年08月28日 20:42:57 dreamchasering 阅读数:41890 标签: RabbitMQ 什么是MQ? MQ全称为Message Queue, ...

  8. Java并发编程最佳实例详解系列

    Java并发编程最佳实例详解系列: Java并发编程(一)线程定义.状态和属性 Java并发编程(一)线程定义.状态和属性 线程是指程序在执行过程中,能够执行程序代码的一个执行单元.在java语言中, ...

  9. Java多线程之线程池详解

    Java多线程之线程池详解 目录: 线程池使用及优势 线程池3个常用方式 线程池7大参数深入介绍 线程池底层工作原理 1. 线程池使用及优势 线程池做的工作主要是控制运行的线程的数量,处理过程中将任务 ...

最新文章

  1. MAX487制作RS485总线接口模块
  2. eclipse自动关闭的原因
  3. Luogu P4707 重返现世 (拓展Min-Max容斥、DP)
  4. cocos2dx标准容器_Cocos2d-x3.0模版容器详解之三:cocos2d::Value
  5. textFiled输入字数的控制问题之—把带输入的拼音也判断了
  6. php封装公共方法,TP框架下封装公共函数详解
  7. 构造函数的五种继承方法
  8. binlog日志_mysql的binlog日志的自动定时清理
  9. ldaptemplate 分页_UI设计干货分享:设计语言 - 侧边导航栏/分页
  10. 物联网大数据平台具备哪些功能
  11. Egret入门学习日记 --- 第八篇(书中 2.0~2.6节 内容)
  12. k8s设计-多容器pod设计模式
  13. 2021年第四届“安洵杯”网络安全挑战赛Writeup
  14. linux查看当前账号权限,Linux账号权限管理
  15. tiny-emitter 源码解析
  16. Kafka监控eagle
  17. python实现小游戏-猜年龄
  18. Google Bigtable 中文版
  19. echarts-liquidfill 水滴图/水位图/水球 下载地址
  20. Nvidia Maxine 精讲(一)AR-SDK安装使用——BodyTrack 【非官方全网首发】

热门文章

  1. Python中使用中文
  2. Qt 翻译文件的加载
  3. 软件质量包括哪些特性?软件质量保证的主要任务是什么?
  4. 1003. 检查替换后的词是否有效
  5. C# string转double,double转string
  6. sqlmap使用方法
  7. 关于事件委托的整理 ,另附bind,live,delegate,on区别
  8. STM32之JScope调试
  9. Cortex-M3复位序列
  10. 这个没去大厂的程序猿,用 4 年时间证明自己做对了!