以下文章来源方志朋的博客,回复”666“获面试宝典

场景

开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期、订单定时关闭、微信支付2小时未支付关闭订单等等,都需要用到定时任务,但是定时任务本身有一个问题,一般来说我们都是通过定时轮询查询数据库来判断是否有任务需要执行。

也就是说不管怎么样,我们需要先查询数据库,而且有些任务对时间准确要求比较高的,需要每秒查询一次,对于系统小倒是无所谓,如果系统本身就大而且数据也多的情况下,这就不大现实了。所以需要其他方式的,当然实现的方式有多种多样的,比如Redis实现定时队列、基于优先级队列的JDK延迟队列、时间轮等。

因为我们项目中本身就使用到了Rabbitmq,所以基于方便开发和维护的原则,我们使用了Rabbitmq延迟队列来实现定时任务。

Rabbitmq延迟队列

Rabbitmq本身是没有延迟队列的,只能通过Rabbitmq本身队列的特性来实现,想要Rabbitmq实现延迟队列,需要使用Rabbitmq的死信交换机(Exchange)和消息的存活时间TTL(Time To Live)

死信交换机

一个消息在满足如下条件下,会进死信交换机,记住这里是交换机而不是队列,一个交换机可以对应很多队列。

  1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。

  2. 上面的消息的TTL到了,消息过期了。

  3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

死信交换机就是普通的交换机,只是因为我们把过期的消息扔进去,所以叫死信交换机,并不是说死信交换机是某种特定的交换机

消息TTL(消息存活时间)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。

如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);

可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。只是expiration字段是字符串参数,所以要写个int类型的字符串:

当上面的消息扔到队列中后,过了60秒,如果没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去

处理流程图

图片

创建交换机(Exchanges)和队列(Queues)

创建死信交换机

图片

如图所示,就是创建一个普通的交换机,这里为了方便区分,把交换机的名字取为:delay

创建自动过期消息队列

这个队列的主要作用是让消息定时过期的,比如我们需要2小时候关闭订单,我们就需要把消息放进这个队列里面,把消息过期时间设置为2小时

图片

创建一个一个名为delay_queue1的自动过期的队列,当然图片上面的参数并不会让消息自动过期,因为我们并没有设置x-message-ttl参数,如果整个队列的消息有消息都是相同的,可以设置,这里为了灵活,所以并没有设置。

另外两个参数x-dead-letter-exchange代表消息过期后,消息要进入的交换机,这里配置的是delay,也就是死信交换机,x-dead-letter-routing-key是配置消息过期后,进入死信交换机的routing-key,跟发送消息的routing-key一个道理,根据这个key将消息放入不同的队列

创建消息处理队列

这个队列才是真正处理消息的队列,所有进入这个队列的消息都会被处理

图片

消息队列的名字为delay_queue2

消息队列绑定到交换机

进入交换机详情页面,将创建的2个队列(delayqueue1和delayqueue2)绑定到交换机上面

图片

自动过期消息队列的routing key 设置为delay

绑定delayqueue2

图片

delayqueue2 的key要设置为创建自动过期的队列的x-dead-letter-routing-key参数,这样当消息过期的时候就可以自动把消息放入delay_queue2这个队列中了

绑定后的管理页面如下图:

图片

当然这个绑定也可以使用代码来实现,只是为了直观表现,所以本文使用的管理平台来操作

发送消息

String msg = "hello word";
MessageProperties messageProperties = new MessageProperties();messageProperties.setExpiration("6000");messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());Message message = new Message(msg.getBytes(), messageProperties);rabbitTemplate.convertAndSend("delay", "delay",message);

主要的代码就是

messageProperties.setExpiration("6000");

设置了让消息6秒后过期

注意:因为要让消息自动过期,所以一定不能设置delay_queue1的监听,不能让这个队列里面的消息被接受到,否则消息一旦被消费,就不存在过期了

接收消息

接收消息配置好delay_queue2的监听就好了

package wang.raye.rabbitmq.demo1;import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DelayQueue {/** 消息交换机的名字*/public static final String EXCHANGE = "delay";/** 队列key1*/public static final String ROUTINGKEY1 = "delay";/** 队列key2*/public static final String ROUTINGKEY2 = "delay_key";/*** 配置链接信息* @return*/@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672);connectionFactory.setUsername("kberp");connectionFactory.setPassword("kberp");connectionFactory.setVirtualHost("/");connectionFactory.setPublisherConfirms(true); // 必须要设置return connectionFactory;}/**  * 配置消息交换机* 针对消费者配置  FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念  HeadersExchange :通过添加属性key-value匹配  DirectExchange:按照routingkey分发到指定队列  TopicExchange:多关键字匹配  */  @Bean  public DirectExchange defaultExchange() {  return new DirectExchange(EXCHANGE, true, false);} /*** 配置消息队列2* 针对消费者配置  * @return*/@Beanpublic Queue queue() {  return new Queue("delay_queue2", true); //队列持久  }/*** 将消息队列2与交换机绑定* 针对消费者配置  * @return*/@Bean  @Autowiredpublic Binding binding() {  return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);  } /*** 接受消息的监听,这个监听会接受消息队列1的消息* 针对消费者配置  * @return*/@Bean  @Autowiredpublic SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {  SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());  container.setQueues(queue());  container.setExposeListenerChannel(true);  container.setMaxConcurrentConsumers(1);  container.setConcurrentConsumers(1);  container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认  container.setMessageListener(new ChannelAwareMessageListener() {public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {byte[] body = message.getBody();  System.out.println("delay_queue2 收到消息 : " + new String(body));  channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费  }  });  return container;  }  }

在消息监听中处理需要定时处理的任务就好了,因为Rabbitmq能发送消息,所以可以把任务特征码发过来,比如关闭订单就把订单id发过来,这样就避免了需要查询一下那些订单需要关闭而加重MySQL负担了,毕竟一旦订单量大的话,查询本身也是一件很费IO的事情

总结

基于Rabbitmq实现定时任务,就是将消息设置一个过期时间,放入一个没有读取的队列中,让消息过期后自动转入另外一个队列中,监控这个队列消息的监听处来处理定时任务具体的操作。另外,欢迎关注公众号Java笔记虾,后台回复“后端面试”,送你一份面试题宝典!

(感谢阅读,希望对你所有帮助)

来源:blog.csdn.net/wantnrun/article/details/80401641

热门内容:世界排名第一的内存数据库,300分钟就能撸出来!
硬核分享,靠这个技术过了阿里二面!
5种分布式事务最终一致性解决方案,一次性说清了!
拜访了这位小哥的GitHub后,我失眠了!
我们已经不用AOP做操作日志了!
强烈不建议你用 a.equals(b) 判断对象相等!最近面试BAT,整理一份面试资料《Java面试BAT通关手册》,覆盖了Java核心技术、JVM、Java并发、SSM、微服务、数据库、数据结构等等。获取方式:点“在看”,关注公众号并回复 666 领取,更多内容陆续奉上。

明天见(。・ω・。)ノ♡

RabbitMQ 延迟队列实现定时任务的正确姿势,你学会了么?相关推荐

  1. rabbitmq 延迟队列_Delayed Message 插件实现 RabbitMQ 延迟队列

    延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行. DLX + TTL 方式存在的时序问题 对于延迟队列不管是 AMQP 协议或者 RabbitMQ 本身是不支持的,之前有 ...

  2. Delayed Message 插件实现 RabbitMQ 延迟队列

    延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行. DLX + TTL 方式存在的时序问题 对于延迟队列不管是 AMQP 协议或者 RabbitMQ 本身是不支持的,之前有 ...

  3. RabbitMQ 延迟队列-对于入门来说可以快速上手

    RabbitMQ 延迟队列-非常非常实用 RabbitMQ 延迟队列-非常非常实用 一.使用场景 二.消息延迟推送的实现 三.项目具体实现 RabbitMQ 延迟队列-非常非常实用 一.使用场景 ​ ...

  4. 谷粒商城笔记+踩坑(22)——库存自动解锁。RabbitMQ延迟队列

    导航: 谷粒商城笔记+踩坑汇总篇 目录 1 业务流程,订单失败后自动回滚解锁库存 可靠消息+最终一致性方案 2[仓库服务]RabbitMQ环境准备 2.1 导入依赖 2.2 yml配置RabbitMQ ...

  5. 【RabbitMQ】一文带你搞定RabbitMQ延迟队列

    本文口味:鱼香肉丝   预计阅读:10分钟 0|1一.说明 在上一篇中,介绍了RabbitMQ中的死信队列是什么,何时使用以及如何使用RabbitMQ的死信队列.相信通过上一篇的学习,对于死信队列已经 ...

  6. RabbitMQ 延迟队列详解

    一.延迟队列概念 延迟队列存储的对象是对应的延迟消息,所谓"延迟消息"是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费. 二.延 ...

  7. C#实现rabbitmq 延迟队列功能

    最近在研究rabbitmq,项目中有这样一个场景:在用户要支付订单的时候,如果超过30分钟未支付,会把订单关掉.当然我们可以做一个定时任务,每个一段时间来扫描未支付的订单,如果该订单超过支付时间就关闭 ...

  8. Spring boot + RabbitMQ延迟队列实战

    一.背景 延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费. 那么,为什么需要延迟消费呢?我们来看以下的场景: 订单业务: 在电商/点餐中,都有下单后 30 ...

  9. RabbitMQ 延迟队列,太实用了!

    点击关注公众号,Java干货及时送达 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付 ...

最新文章

  1. 从零开始学习「张氏相机标定法」
  2. 深聪智能朱澄宇:自研 AI 芯片不是赶时髦 | CCF-GAIR 2019
  3. am5728 是否支持aarch64_am5728开启uart0接口通讯
  4. 【数据结构与算法】之深入解析“下一个更大元素II”的求解思路与算法示例
  5. 使用myeclipse建立maven项目(重要)
  6. C语言多文件编译链接为1个可执行文件的简单原理
  7. vmware虚拟机中ubuntu上网问题
  8. Codeforces Round #500 (Div. 2) C.Photo of The Sky
  9. Docker学习文档之二 搭建环境-Windows环境
  10. 基于HTTP的QQ协议(转)
  11. c 获取char*的长度_C/C++编程笔记:C语言字符串比较函数,超详细,值得收藏!...
  12. beego mysql增删改查_5-BeegoORM增删改查-Go语言中文社区
  13. pdfjs实现pdf预览
  14. 机器视觉——光源选型
  15. 【Leetcode刷题】:Python:347. 前 K 个高频元素
  16. html把图片色调一致,ps怎样让两张不同的图片色调统一
  17. 语言模型(五)—— Seq2Seq、Attention、Transformer学习笔记
  18. CentOS7 NVIDIA显卡驱动安装教程(亲测有效)
  19. 安卓数据恢复_精心整理20款数据恢复软件(含电脑端,安卓,苹果)
  20. 关于抽象类,接口的题目

热门文章

  1. 论文《一种金融市场预测的深度学习模型:FEPA》(3)--EMD+PCA
  2. facebook maskrcnn 安装笔记
  3. ehcache导致Tomcat重启出错
  4. Ceilometer Polling Performance Improvement
  5. Debug模式下加载文件,运行程序异常的慢
  6. strtok和strtok_r
  7. SET QUOTED_IDENTIFIER OFF语句的作用
  8. 抽象工厂与工厂模式例子
  9. 【青少年编程】【蓝桥杯】水仙花数
  10. 谢文睿:西瓜书 + 南瓜书 吃瓜系列 4. 二分类线性判别分析