http://blog.csdn.net/zhu_tianwei/article/details/53563311

在实际的业务中我们会遇见生产者产生的消息,不立即消费,而是延时一段时间在消费。RabbitMQ本身没有直接支持延迟队列功能,但是我们可以根据其特性Per-Queue Message TTL和 Dead Letter Exchanges实现延时队列。也可以通过改特性设置消息的优先级。

1.Per-Queue Message TTL
RabbitMQ可以针对消息和队列设置TTL(过期时间)。队列中的消息过期时间(Time To Live, TTL)有两种方法可以设置。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息进行单独设置,每条消息TTL可以不同。如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead message,消费者将无法再收到该消息。
2.Dead Letter Exchanges
当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。消息变成Dead Letter一向有以下几种情况:
消息被拒绝(basic.reject or basic.nack)并且requeue=false
消息TTL过期
队列达到最大长度
实际上就是设置某个队列的属性,当这个队列中有Dead Letter时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange中去,进而被路由到另一个队列,publish可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0.0以前支持的immediate参数中的向publish确认的功能。

一、在队列上设置TTL

1.建立delay.exchange

这里Internal设置为NO,否则将无法接受dead letter,YES表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定。

2.建立延时队列(delay queue)

如上配置延时5min队列(x-message-ttl=300000)

x-max-length:最大积压的消息个数,可以根据自己的实际情况设置,超过限制消息不会丢失,会立即转向delay.exchange进行投递

x-dead-letter-exchange:设置为刚刚配置好的delay.exchange,消息过期后会通过delay.exchange进行投递

这里不需要配置"dead letter routing key"否则会覆盖掉消息发送时携带的routingkey,导致后面无法路由为刚才配置的delay.exchange

3.配置延时路由规则

需要延时的消息到exchange后先路由到指定的延时队列

1)创建delaysync.exchange通过Routing key将消息路由到延时队列

2.配置delay.exchange 将消息投递到正常的消费队列

配置完成。

下面使用代码测试一下:

生产者:

[java] view plaincopy print?
  1. package cn.slimsmart.study.rabbitmq.delayqueue.queue;
  2. import java.io.IOException;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. public class Producer {
  7. private static String queue_name = "test.queue";
  8. public static void main(String[] args) throws IOException {
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("10.1.199.169");
  11. factory.setUsername("admin");
  12. factory.setPassword("123456");
  13. Connection connection = factory.newConnection();
  14. Channel channel = connection.createChannel();
  15. // 声明队列
  16. channel.queueDeclare(queue_name, true, false, false, null);
  17. String message = "hello world!" + System.currentTimeMillis();
  18. channel.basicPublish("delaysync.exchange", "deal.message", null, message.getBytes());
  19. System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());
  20. // 关闭频道和连接
  21. channel.close();
  22. connection.close();
  23. }
  24. }

消费者:

[java] view plaincopy print?
  1. package cn.slimsmart.study.rabbitmq.delayqueue.queue;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import com.rabbitmq.client.QueueingConsumer;
  6. public class Consumer {
  7. private static String queue_name = "test.queue";
  8. public static void main(String[] args) throws Exception {
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("10.1.199.169");
  11. factory.setUsername("admin");
  12. factory.setPassword("123456");
  13. Connection connection = factory.newConnection();
  14. Channel channel = connection.createChannel();
  15. // 声明队列
  16. channel.queueDeclare(queue_name, true, false, false, null);
  17. QueueingConsumer consumer = new QueueingConsumer(channel);
  18. // 指定消费队列
  19. channel.basicConsume(queue_name, true, consumer);
  20. while (true) {
  21. // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
  22. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  23. String message = new String(delivery.getBody());
  24. System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());
  25. }
  26. }
  27. }

二、在消息上设置TTL

实现代码:

生产者:

[java] view plaincopy print?
  1. package cn.slimsmart.study.rabbitmq.delayqueue.message;
  2. import java.io.IOException;
  3. import java.util.HashMap;
  4. import com.rabbitmq.client.AMQP;
  5. import com.rabbitmq.client.Channel;
  6. import com.rabbitmq.client.Connection;
  7. import com.rabbitmq.client.ConnectionFactory;
  8. public class Producer {
  9. private static String queue_name = "message_ttl_queue";
  10. public static void main(String[] args) throws IOException {
  11. ConnectionFactory factory = new ConnectionFactory();
  12. factory.setHost("10.1.199.169");
  13. factory.setUsername("admin");
  14. factory.setPassword("123456");
  15. Connection connection = factory.newConnection();
  16. Channel channel = connection.createChannel();
  17. HashMap<String, Object> arguments = new HashMap<String, Object>();
  18. arguments.put("x-dead-letter-exchange", "amq.direct");
  19. arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");
  20. channel.queueDeclare("delay_queue", true, false, false, arguments);
  21. // 声明队列
  22. channel.queueDeclare(queue_name, true, false, false, null);
  23. // 绑定路由
  24. channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");
  25. String message = "hello world!" + System.currentTimeMillis();
  26. // 设置延时属性
  27. AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
  28. // 持久性 non-persistent (1) or persistent (2)
  29. AMQP.BasicProperties properties = builder.expiration("300000").deliveryMode(2).build();
  30. // routingKey =delay_queue 进行转发
  31. channel.basicPublish("", "delay_queue", properties, message.getBytes());
  32. System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());
  33. // 关闭频道和连接
  34. channel.close();
  35. connection.close();
  36. }
  37. }

消费者:

[java] view plaincopy print?
  1. package cn.slimsmart.study.rabbitmq.delayqueue.message;
  2. import java.util.HashMap;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import com.rabbitmq.client.QueueingConsumer;
  7. public class Consumer {
  8. private static String queue_name = "message_ttl_queue";
  9. public static void main(String[] args) throws Exception {
  10. ConnectionFactory factory = new ConnectionFactory();
  11. factory.setHost("10.1.199.169");
  12. factory.setUsername("admin");
  13. factory.setPassword("123456");
  14. Connection connection = factory.newConnection();
  15. Channel channel = connection.createChannel();
  16. HashMap<String, Object> arguments = new HashMap<String, Object>();
  17. arguments.put("x-dead-letter-exchange", "amq.direct");
  18. arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");
  19. channel.queueDeclare("delay_queue", true, false, false, arguments);
  20. // 声明队列
  21. channel.queueDeclare(queue_name, true, false, false, null);
  22. // 绑定路由
  23. channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");
  24. QueueingConsumer consumer = new QueueingConsumer(channel);
  25. // 指定消费队列
  26. channel.basicConsume(queue_name, true, consumer);
  27. while (true) {
  28. // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
  29. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  30. String message = new String(delivery.getBody());
  31. System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());
  32. }
  33. }
  34. }

查看资料:

http://www.rabbitmq.com/ttl.html 
http://www.rabbitmq.com/dlx.html 
http://www.rabbitmq.com/maxlength.html
https://www.cloudamqp.com/docs/delayed-messages.html

转载于:https://www.cnblogs.com/telwanggs/p/7124687.html

(转) RabbitMQ学习之延时队列相关推荐

  1. RabbitMQ介绍与延时队列

    RabbitMQ特性 消息可靠性:典型的生产者-消费者模型,发送端有消息发送确认机制,服务端有消息持久化方案,消费端有消息Ack机制 灵活的消息路由分发:多种多样的交换机 多语言客户端开发AMQP:只 ...

  2. Docker安装RabbitMQ并安装延时队列插件

    一.RabbitMQ简介 RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消 ...

  3. 【SpringBoot】43、SpringBoot中整合RabbitMQ实现延时队列(延时插件篇)

    死信队列实现篇,参考文章:[SpringBoot]60.SpringBoot中整合RabbitMQ实现延时队列(死信队列篇) 一.介绍 1.什么是延时队列? 延时队列即就是放置在该队列里面的消息是不需 ...

  4. Rabbitmq学习笔记(尚硅谷2021)

    Rabbitmq学习笔记 (尚硅谷) 1.MQ 的概念 1.1 什么是 MQ? 1.2 为什么要用 MQ? 削峰 解耦 异步 1.3 MQ 的分类 ActiveMQ Kafka RocketMQ Ra ...

  5. RabbitMQ 学习知识点总结

    一个简洁的博客网站:http://lss-coding.top,欢迎大家来访 学习娱乐导航页:http://miss123.top/ 1. 消息队列简介 1.1 什么是 MQ MQ(message q ...

  6. Rabbitmq学习笔记教程-尚硅谷

    Rabbitmq学习笔记 (尚硅谷) 尚硅谷 rabbitmq 教程 1.MQ 的概念 1.1 什么是 MQ? 存放消息的队列,互联网架构中常见的一种服务与服务之间通信的方式. 1.2 为什么要用 M ...

  7. RabbitMQ学习【尚硅谷】

    消息重新入队 如果消费者由于某些原因失去连接(如通道关闭 | tcp连接断掉)导致消息未发送ACK确认,==RabbitMQ了解到消息未完全处理,会将其重新排入到队列中. 消息手动应答代码 publi ...

  8. 延时队列的几种实现方式

    延时队列的几种实现方式 何为延迟队列? 顾名思义,首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费. 延时队列能做什么? 延时队列多用于需要 ...

  9. RabbitMQ通过TTL和DLX实现延时队列

    RabbitMQ实现延时队列 一.介绍 1.TTL 如何设置TTL(2种方式): 2.Dead Letter Exchanges 二.实现延时队列的思路 三.SpringBoot+RabbitMQ实现 ...

最新文章

  1. Java项目:垃圾分类查询管理系统(java+SSM+jsp+MySQL+bootstrap)
  2. [微信小程序]滚动选择器
  3. LunarPages空间500错误原因及解决办法
  4. window.onload,body onload=function(), document.onreadystatechange, httpRequest.onreadystatechang
  5. 六自由度机器人逆向运动学_【课程笔记】Notes for Robotics/机器人学 (Part1)
  6. 一个flash前后台开源框架的的站点
  7. 在项目中配置PageHelper插件时遇到类型转换异常
  8. Android/Linux boot time分析优化
  9. 一段平平无奇的秋招经历
  10. 有需要【JavaScript权威指南第七版、JavaScript高级程序设计第四版】的可以私信我哈
  11. 二维码生成(如何实现扫描二维码,实现网址自动跳转?):扫码直接进入网页,直接进入网址页面
  12. Delphi7 请求webservice 方法。
  13. linux下如何实现pgadmin备份,linux下pgAdmin4安装
  14. 百度下拉词目前用软件工具可以刷出来吗?
  15. 安装Windows系统提示Windows无法安装到这个磁盘,选中的磁盘具有MBR分区表。在EFI系统上,Windows只能安装到GPT分区
  16. 计算机人类的三大科学思维,什么是科学思维:科学思维可以分为理论、实验、计算思维...
  17. 计算机二级word奇偶页眉页脚,Word中设置奇偶页不同的页眉和页码的操作方法
  18. python修改文件夹下文件夹的名字
  19. java 聊天室 私聊_Java WebSocket实现网络聊天室(群聊+私聊)
  20. 计算机监控系统sacad,太阳能热泵多功能复合机(sahpm)计算机监控系统实现方法研究-机械电子工程专业论文.docx...

热门文章

  1. (32)FPGA米勒型状态机设计(三段式)(第7天)
  2. (14)Verilog数据类型-基本语法(二)(第3天)
  3. (26)System Verilog范围随机函数约束类内变量
  4. (47)FPGA面试技能提升篇(Aurora协议/接口)
  5. C++11六大函数(构造函数,移动构造函数,移动赋值操作符,复制构造函数,赋值操作符,析构函数)
  6. 8003.ros2创建win10工程
  7. 1004.串口收发数据集成bug
  8. 学历史能学计算机吗,历史专业学计算机好吗
  9. 埋点 神策小程序_第9讲. 神策数据获3000万美元 C+ 轮融资,A、B、C 轮资方全部跟投!...
  10. 百度android定位 602 key mcode不匹配,我的Android进阶之旅------百度地图学习:BDLocation.getLocType ( )值分析...