前言

在开发中经常会遇到延时任务的需求,例如在12306购买车票,若生成订单30分钟未支付则自动取消;还有在线商城完成订单后48小时不评价 ,自动5星好评。像这类在某事件触发后一段时间内执行的需求任务我们称之为 延时任务

那么如何实现延迟任务呢?

第一反应是利用cron方案来实现:

启动一个cron定时任务,每隔一段时间执行一次,比如30分钟,找到那些超时的数据,直接更新状态,或者拿出来执行一些操作。如果数据量比较大,需要分页查询,分页update,这将是一个for循环更新操作。

cron方案是很常见的一种方案,但是常见的不一定是最好的,主要有以下几个问题:

  • 当数据量大的时候轮询效率低;
  • 时效性不够好,如果每小时轮询一次,最差的情况时间误差会达到1小时;
  • 如果通过增加cron轮询频率来减少时间误差,则会出现轮询低效和重复计算的问题;

既然cron方案不是很理想,那就请出我们今天的主角,使用RocketMQ的延时消息解决。在创建订单的时候发送一条延时消息到RocketMQ,30分钟后消费者消费消息去检查订单的状态,如果发现订单未支付则取消订单释放库存。

实现

RocketMQ延迟队列的核心思路是:所有的延迟消息由producer发出之后,都会存放到同一个topic(SCHEDULE_TOPIC_XXXX)下,不同的延迟级别会对应不同的队列序号,当延迟时间到之后,由定时线程读取转换为普通的消息存的真实指定的topic下,此时对于consumer端此消息才可见,从而被consumer消费。

注意:RocketMQ不支持任意时间的延时,只支持以下几个固定的延时等级private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

下面我们结合SprintBoot利用RocketMQ发送延时消息

  • 引入RocketMQ组件
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>

  • 增加RocketMQ的配置
rocketmq:name-server: 172.31.0.44:9876producer:group: delay-group

  • 编写生产者
@Component
@Slf4j
public class DelayProduce {@Autowiredprivate RocketMQTemplate rocketMQTemplatet;public void sendDelayMessage(String topic,String message,int delayLevel){SendResult sendResult = rocketMQTemplatet.syncSend(topic, MessageBuilder.withPayload(message).build(), 2000, delayLevel);log.info("sendtime is {}", DateTimeFormatter.ofPattern("yyyy年MM月dd日 HH:mm:ss").format(LocalDateTime.now()));log.info("sendResult is{}",sendResult);}
}

  • 编写消费者
@Slf4j
@Component
@RocketMQMessageListener(topic = "delay-topic",consumerGroup = "delay-group"
)
public class DelayConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("received message time is {}", DateTimeFormatter.ofPattern("yyyy年MM月dd日 HH:mm:ss").format(LocalDateTime.now()));log.info("received message is {}",message);}
}

  • 测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayProduceTest {@Autowiredprivate DelayProduce delayProduce;@Testpublic void sendDelayMessage() {delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知录",5);}
}

这里delayLevel设置成5,对应RocketMQ的延时等级就是1分钟后投递消息。

  • 运行结果

发送时间

消费时间

修改延时级别

RocketMQ的延迟等级可以进行修改,以满足自己的业务需求,可以修改/添加新的level。例如:你想支持1天的延迟,修改最后一个level的值为1d,这个时候依然是18个level;也可以增加一个1d,这个时候总共就有19个level。

  • 打开RocketMQ的配置文件,修改 messageDelayLevel 属性
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
storePathRootDir = /app/rocketmq/data
messageDelayLevel=90s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

这次将延时等级1修改成了90s,生产者发送消息后需要90s后再进行消息投递。修改完成后重启RocketMQ。nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &

  • 使用延时等级1发送消息
public void sendDelayMessage() {delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知录",1);
}

  • 测试

发送时间

消费时间

通过比对发送时间与消费时间证明延时等级修改生效。

RocketMQ 相关文章

  • RocketMQ 入门基础 - 环境 & 整合
  • RocketMQ进阶-事务消息

for循环延时_RocketMQ进阶-延时消息相关推荐

  1. 两个service事务统一_RocketMQ进阶 - 事务消息

    分布式消息选型的时候是否支持事务消息是一个很重要的考量点,而目前只有RocketMQ对事务消息支持的最好.今天我们来唠唠如何实现RocketMQ的事务消息! Apache RocketMQ在4.3.0 ...

  2. 计算发送延时与传播延迟_消息队列——延时消息应用解析及实践

    简介:在大部分场景下业务系统如果只需要实现异步解耦.削峰填谷等能力,常规的普通消息就可以满足此类需求.除此之外,在某些特殊的业务场景中,普通消息类型存在无法满足需求的情况.这就需要消息队列服务本身支持 ...

  3. 嵌入式_常见延时方式的差异与选择(for循环延时、定时器延时、汇编延时....)

    嵌入式_常见延时方式的差异与选择(for循环延时.定时器延时.汇编延时-) 这里整理几种常见的延时方式,并做简单测试供大家参考,如果有什么不对的地方,欢迎指正,共同探讨. 文章目录 嵌入式_常见延时方 ...

  4. matlab怎样编写延时函数,编写延时函数的简单方法

    原标题:编写延时函数的简单方法 如果从keil里看了c语言的反汇编代码然后根据晶振和指令计算延时的时间这样虽然非常的准确但是相当的麻烦而且容易搞错,我这里介绍一个最简单的方法.可以验证你的 这里用一个 ...

  5. c语言不用死等的延时函数,matlab延时函数怎么写

    1. c语言延时函数delay,怎么算延时 下面是delay 函延迟函数里执行的都是空语句,也就是说通过循环执行空语句来达到延迟的目的.每执行一条语句,即使是空语句都要耗费电脑一些处理时间的,就是因为 ...

  6. 单片机硬件和软件延时、RTOS相对延时和绝对延时

    已剪辑自: https://mp.weixin.qq.com/s/-RPLQn4KO9Aqu1fpfZeOKA 前不久有个读者在问关于延时的问题,大概就是问:软件延时和硬件延时是啥意思?做项目时他俩有 ...

  7. FreeRTOS中相对延时和绝对延时的区别

    在公众号列表中,长按我的公众号,置顶公众号,就可以随时看到我. 相信许多朋友都有过这么一个需求:固定一个时间(周期)去处理某一件事情. 比如:固定间隔10ms去采集传感器的数据,然后通过一种算法计算出 ...

  8. 【SpringBoot】43、SpringBoot中整合RabbitMQ实现延时队列(延时插件篇)

    死信队列实现篇,参考文章:[SpringBoot]60.SpringBoot中整合RabbitMQ实现延时队列(死信队列篇) 一.介绍 1.什么是延时队列? 延时队列即就是放置在该队列里面的消息是不需 ...

  9. RTOS中相对延时和绝对延时的区别

    关注+星标公众号,不错过精彩内容 作者 | strongerHuang 微信公众号 | 嵌入式专栏 相信许多朋友都有过这么一个需求:固定一个时间(周期)去处理某一件事情. 比如:固定间隔10ms去采集 ...

最新文章

  1. 解读2016全球ICT 50强榜单:谁动了传统通信产业的奶酪?
  2. 菜鸟学Linux 第050篇笔记 dhcp
  3. 《iOS 9 开发指南》——第6章,第6.4节 Interface Builder中的故事板——Storyboarding...
  4. Selenium3自动化测试——14.操作Cookie
  5. mysql 1100_mysql数据库选择,有1100个用户,每个用户每月生成一张表,使用中该表内每秒上传一条数据,数据量很大...
  6. java方法重载实事例_零基础java入门教程函数重载function实例化格式案例
  7. 【Spring】IOC
  8. SDP在SIP协议中的应用
  9. 企业级大数据架构设计及规划方案.ppt
  10. 大数据下的图片类别以及图片爬取详细的过程(一)
  11. 计算机软件师倾斜怎么能摆正,ps中怎么把倾斜图案拉直
  12. android手机内存什么东西,清理手机必须要知道,这些文件夹里都装了些什么?...
  13. 【毕业设计】基于人脸登录的大学生快递系统
  14. iOS 权限设置判断和跳转 - 最全最详细
  15. 旷视研究院张祥雨:3年看1800篇论文,28岁掌舵旷视基础模型研究
  16. 工作流程管理系统,表结构与运行机制
  17. 存储新图谱:DNA存储的边界与天地
  18. 2022年计算机一级MS Office模拟冲刺题及答案
  19. 基于中文维基百科的词向量构建及可视化
  20. windows 10 文件夹无法移动和重命名,提示找不到指定文件

热门文章

  1. 感受学生考勤“智慧化”变革 签到荚让校园更智慧
  2. python list操作复杂度
  3. maven项目部署打包
  4. vsftpd的不同安装方式及服务控制脚本
  5. 【转】EXC_BAD_ACCESS问题在xode4下的调试技巧
  6. Power Designer的使用
  7. C# List泛型集合中的GroupBy用法
  8. transactionscope报“此操作对该事务的状态无效”问题
  9. 实践SQLServer Tuning
  10. VSS(2005)中如何强行签入文件