在开发中经常会遇到延时任务的需求,例如在12306购买车票,若生成订单30分钟未支付则自动取消;还有在线商城完成订单后48小时不评价 ,自动5星好评。像这类在某事件触发后一段时间内执行的需求任务我们称之为 延时任务。

那么如何实现延迟任务呢?

第一反应是利用cron方案来实现:

启动一个cron定时任务,每隔一段时间执行一次,比如30分钟,找到那些超时的数据,直接更新状态,或者拿出来执行一些操作。如果数据量比较大,需要分页查询,分页update,这将是一个for循环更新操作。

cron方案是很常见的一种方案,但是常见的不一定是最好的,主要有以下几个问题:

  • 当数据量大的时候轮询效率低;
  • 时效性不够好,如果每小时轮询一次,最差的情况时间误差会达到1小时;
  • 如果通过增加cron轮询频率来减少时间误差,则会出现轮询低效和重复计算的问题;

既然cron方案不是很理想,那就请出我们今天的主角,使用RocketMQ的延时消息解决。在创建订单的时候发送一条延时消息到RocketMQ,30分钟后消费者消费消息去检查订单的状态,如果发现订单未支付则取消订单释放库存。

实现

RocketMQ延迟队列的核心思路是:所有的延迟消息由producer发出之后,都会存放到同一个topic(SCHEDULE_TOPIC_XXXX)下,不同的延迟级别会对应不同的队列序号,当延迟时间到之后,由定时线程读取转换为普通的消息存的真实指定的topic下,此时对于consumer端此消息才可见,从而被consumer消费。

注意: RocketMQ不支持任意时间的延时,只支持以下几个固定的延时等级

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

下面我们结合SprintBoot利用RocketMQ发送延时消息

  • 引入RocketMQ组件
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId></dependency>
  • 增加RocketMQ的配置

rocketmq:

name-server: 172.31.0.44:9876

producer:

group: delay-group

  • 编写生产者
@Component
@Slf4j
public class DelayProduce {   @Autowired    private RocketMQTemplate rocketMQTemplatet;  public void sendDelayMessage(String topic,String message,int delayLevel{       SendResult sendResult = rocketMQTemplatet.syncSend(topic, MessageBuilder.withPayload(message).build(), 2000, delayLevel);        log.info("sendtime is {}", DateTimeFormatter.ofPattern("yyyy年MM月dd 日 HH:mm:ss").format(LocalDateTime.now()));        log.info("sendResult is{}",sendResult);
}
}
  • 编写消费者
@Slf4j@Component@RocketMQMessageListener(        topic = "delay-topic",        consumerGroup = "delay-group")public class DelayConsumer implements RocketMQListener<String> {    @Override    public void onMessage(String message) {        log.info("received message time is {}", DateTimeFormatter.ofPattern("yyyy年MM月dd日 HH:mm:ss").format(LocalDateTime.now()));        log.info("received message is {}",message);    }}
  • 测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayProduceTest {
@Autowired
private DelayProduce delayProduce;
@Test
public void sendDelayMessage() {
delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知录",5);
}}

这里delayLevel设置成5,对应RocketMQ的延时等级就是1分钟后投递消息。

  • 运行结果

发送时间

消费时间

修改延时级别

RocketMQ的延迟等级可以进行修改,以满足自己的业务需求,可以修改/添加新的level。例如:你想支持1天的延迟,修改最后一个level的值为1d,这个时候依然是18个level;也可以增加一个1d,这个时候总共就有19个level。

  • 打开RocketMQ的配置文件,修改messageDelayLevel 属性

brokerClusterName = DefaultClusterbrokerName = broker-abrokerId = 0deleteWhen = 04fileReservedTime = 48brokerRole = ASYNC_MASTERflushDiskType = ASYNC_FLUSHstorePathRootDir = /app/rocketmq/datamessageDelayLevel=90s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

这次将延时等级1修改成了90s,生产者发送消息后需要90s后再进行消息投递。修改完成后重启RocketMQ。

nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &

  • 使用延时等级1发送消息

public void sendDelayMessage() { delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知录",1);}

  • 测试

发送时间

消费时间

通过比对发送时间与消费时间证明延时等级修改生效。

MQ延迟队列实现延迟消息相关推荐

  1. RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列

    搭建SpringBoot项目,用于演示 springboot版本 <!-- spring boot --><dependency><groupId>org.spri ...

  2. 【重难点】【RabbitMQ 01】消息队列的作用、主流的消息队列、RabbitMQ 基于什么传输消息、RabbitMQ 模型架构、死信队列和延迟队列

    [重难点][RabbitMQ 01]消息队列的作用.主流的消息队列.RabbitMQ 基于什么传输消息.RabbitMQ 模型架构.死信队列和延迟队列 文章目录 [重难点][RabbitMQ 01]消 ...

  3. SpringBoot整合RabbitMQ 消息可靠投递、手动ack、延迟队列、死信队列、消息幂等性保障、消息积压

    1.消息可靠投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 ...

  4. RabbitMq(十二) 借用死信交换机实现延迟队列

    概述:延迟队列即在消息发送后延迟固定时间后再去接受处理,做相应的一些相应. 应用场景举例:在电商购物后,订单支付前发送消息信息,在三分钟之后检查订单是否支付成功,如果支付,则取消订单并库存数量恢复:或 ...

  5. 谷粒商城笔记+踩坑(22)——库存自动解锁。RabbitMQ延迟队列

    导航: 谷粒商城笔记+踩坑汇总篇 目录 1 业务流程,订单失败后自动回滚解锁库存 可靠消息+最终一致性方案 2[仓库服务]RabbitMQ环境准备 2.1 导入依赖 2.2 yml配置RabbitMQ ...

  6. 面试官:RabbitMQ本身不支持延迟队列,那你给我实现一个?

    以下文章来源方志朋的博客,回复"666"获面试宝典 RabbitMQ本身没有延迟队列的支持,但是基于其本身的一些特性,可以做到类似延迟队列的效果:基于死信交换器+TTL. 以下介绍 ...

  7. rabbitmq利用死信队列+TTL 实现延迟队列

    2019独角兽企业重金招聘Python工程师标准>>> 适用场景:订单超时未支付,倘若适用定时器的话,那么数据量大的话,轮询查询数据,首先IO开销大,其次任务时间要求高,扫描越频繁性 ...

  8. redis延迟队列 实现_php使用redis的有序集合zset实现延迟队列

    延迟队列就是个带延迟功能的消息队列,相对于普通队列,它可以在指定时间消费掉消息. 延迟队列的应用场景: 1.新用户注册,10分钟后发送邮件或站内信. 2.用户下单后,30分钟未支付,订单自动作废. 我 ...

  9. redis延迟队列 实现_灵感来袭,基于Redis的分布式延迟队列

    一.延迟队列 延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费.比如1分钟之后发送短信,发送邮件,检测数据状态等. 二.Redisson Delayed Queue 如果你项目中使 ...

最新文章

  1. python入门编程题库-Python经典基础编程练习题(六)——每日10题
  2. python 全部缩进一行_Python(48)语言参考2:词法分析
  3. Java 多线程(一) 基础知识与概念
  4. 【C语言】(数组方式)输出一组成绩中的最高分与最低分
  5. 剑指offer-JZ30 包含min函数的栈(C++,附自己的分析)
  6. 关于String,StringBuffer和StringBuilder之间的区别和联系
  7. centos解压分卷rar_centos解压和压缩rar格式文件
  8. 等高线地图_高中地理——每日讲1题(北美洲的气候、等高线的阅读)
  9. Android学习笔记之AndroidManifest.xml文件解析(摘自皮狼的博客)
  10. 关于asp.net C# 导出Excel文件 打开Excel文件格式与扩展名指定格式不一致的解决办法...
  11. 指数基金之父Bogle为指数基金敲响了警钟
  12. 推荐VScode十大实用插件
  13. java 导出word模板
  14. java堆和栈 常量池_Java中栈、堆和常量池
  15. 开发APP软件需要多少钱?
  16. 学游戏设计好就业吗?有“钱”途吗?
  17. jenkins配置qq邮箱
  18. 纯js版本网页连连看原理分析和实现
  19. java数据类型_Java数据类型
  20. M-LAG—跨设备链路聚合组

热门文章

  1. 初识GeneXus产品
  2. 洛谷1781 宇宙总统
  3. opengl SwapBuffers的等待,虚伪的FPS
  4. axios 和洋葱模型中间件
  5. 论文精读:Mask R-CNN
  6. 南邮ctf nctf CG-CTF web题writeup
  7. draw.io绘图工具
  8. 更好用的HTTP客户端工具,跟SpringBoot绝配
  9. 解决uni-app微信小程序input输入框在底部时,键盘弹起页面整体上移问题
  10. Hexo Next 主题中添加本地搜索功能