业务背景

延时任务是非常普遍的业务场景之一,即系统某一动作触发后,经过一定时间的延时后再触发其他一个或多个动作。以订单系统为例:

  • 下单后10分钟未支付发送支付提醒
  • 下单30分钟内未支付订单自动取消

业界对延时任务的实现有不通过的解决方案,例如基于定时任务扫库/Redis ZSet/Rabbit MQ死信队列等等,本文对分布式延时任务的解决方案不做展开探讨,而是聚焦于基于RocketMQ的处理方案。

RocketMQ支持的延时消息机制允许生产者发送的消息在一定时间周期之后才能被消费者消费,基于该特性可以便捷的实现延时任务功能。

// 发送延时消息示例代码public void sendSheduleMessage() {    DefaultMQProducer producer = new DefaultMQProducer("Group");    producer.start();    Message message = new Message("topicname","content".getBytes());    // 设置延时级别为3,则消息在10秒之后投递给消费端    message.setDelayTimeLevel(3);    producer.send(message);}

延时消息的基本原理

RocketMQ延时消息实现的基本原理是:

如何保证延时消息发送到Broker后不会被立即消费?

Producer发送消息到达RockerMQ Broker,Broker根据延时级别判定是否为延时消息,如果是,则将该消息的原始目标Topic和目标队列备份到消息属性中,并将其替换为与该消息延时级别相对应的Topic和消费队列,然后执行消息存储的其他逻辑。由于消息投递的Topic和消费队列发生变化,所以该消息不会被客户端消费到。

如何保证达到延时周期后消费者能及时消费到消息?

RocketMQ内部定义了用于实现延时消息的统一Topic,并为每个延时级别定义各一个消费队列。同时,为每个延时级别初始化一个定时任务,通过定时任务扫描出满足投递条件的消息,然后将这些消息的目标Topic和消费队列变更为原来的Topic和消费队列并投递,此时,客户端就可以消费到延时消息了。

源码剖析

从延时消息的基本原理可以看出,实现延时消息涉及的几个关键因素:

  • 延时级别定义及初始化
  • 延时消息的Topic及消费队列转移
  • 延时消息的定时调度

延时级别定义及初始化

延时消息级别定义

// 类:org.apache.rocketmq.store.config.MessageStoreConfigprivate String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

内置延时级别的定义与加载

// org.apache.rocketmq.store.schedule.ScheduleMessageService// 延时级别和延时时间的映射关系:delayLevelTableprivate final ConcurrentMap delayLevelTable = new ConcurrentHashMap(32);public boolean parseDelayLevel() {    // 省略......    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();    // 按照空格分隔延时级别字符串    String[] levelArray = levelString.split(" ");        // 存入映射表    this.delayLevelTable.put(level, delayTimeMillis);}

Broker接收延时消息的处理逻辑:转存Topic/Queue

// CommitLog.putMessage()// Delay Deliveryif (msg.getDelayTimeLevel() > 0) {    // 如果发送的消息延时级别大于RocketMQ定义的最大值,则直接设置为最大值     if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {         msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());     }     // 获取内置的延时消息固定Topic:常量名SCHEDULE_TOPIC     topic = ScheduleMessageService.SCHEDULE_TOPIC;     // 根据延时级别获取消费队列ID:leve - 1     queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());      // 备份消息的原Topic和原队列ID      MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());      MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));      msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));      // 更改消息Topic为内置的延时消息Topic      msg.setTopic(topic);      // 更改消息队列为内置的延时消息队列      msg.setQueueId(queueId);}

根据延时级别和存储时间计算消息投递时间

// 根据延时级别和存储时间计算消息投递时间public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {        Long time = this.delayLevelTable.get(delayLevel);        if (time != null) {            return time + storeTimestamp;        }        return storeTimestamp + 1000;}

延时消息到期的自动投递:定时任务调度

延时队列定时任务初始化:

public void start() {    // 省略......    for (Map.Entry entry : this.delayLevelTable.entrySet()) {        if (timeDelay != null) {            // 为每个延时级别创建定时任务            this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);        }        // ......        // 为每个延时级别创建用于持久化的定时任务,默认10S持久化        this.timer.scheduleAtFixedRate(new TimerTask() {            @Override            public void run() {                if (started.get()) ScheduleMessageService.this.persist();            }        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());    }}

扫描是否满足投递条件

定时任务的主要处理类是DeliverDelayedMessageTimerTask

// DeliverDelayedMessageTimerTask// 定时任务的入口@Overridepublic void run() {   if (isStarted()) {       this.executeOnTimeup();   }}// 判定延时消息是否到期的主要逻辑public void executeOnTimeup() {    // 获取消费延时级别对应的消费队列    ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));     // ......     long now = System.currentTimeMillis();    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);    long countdown = deliverTimestamp - now;    if (countdown <= 0) {        MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( offsetPy, sizePy);    if (msgExt != null) {        try {         MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);        PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);        }      // ......

最后

延时消息是解决延时任务的方式之一,并不是所有的消息中间件都支持延时消息。RocketMQ的延时消息实现基于消息主题转换+定时调度实现,巧妙的利用了其自身的消息模型。另外,RocketMQ的延时级别是有限制的,默认支持18种,从1S到2H,实际项目使用时可能需要配合其他设计来完成具体的业务场景。

rocketmq原理_消息中间件漫谈:RocketMQ延时消息应用及原理剖析相关推荐

  1. RocketMQ-初体验RocketMQ(02)_单节点RocketMQ的安装

    文章目录 RocketMQ的安装(单节点) 安装虚拟机(optional) JDK 和 RocketMQ RocketMQ 版本及JDK的对应关系 版本选择 JDK1.8.0_221 安装 Rocke ...

  2. android qq功能实现原理,Android QQ、微信聊天消息界面设计原理与实现

     Android QQ.微信聊天消息界面设计原理与实现 原理:Android平台上,典型的以腾讯的QQ.微信这些聊天消息界面通常可以采用ListView设计与实现,需要使用ListView 适配器 ...

  3. rl滤波器原理_入门篇,层层讲解滤波电路工作原理

    在整流电路输出的电压是单向脉动性电压,不能直接给电子电路使用.所以要对输出的电压进行滤波, 消除电压中的交流成分,成为直流电后给电子电路使用.在滤波电路中,主要使用对交流电有特殊阻抗特性的器件,如:电 ...

  4. sentinel 阿里 原理_限流降级神器:哨兵(sentinel)原理分析

    文章较长,但是干货满满,建议收藏或关注后细读 Sentinel 是阿里中间件团队开源的,面向分布式服务架构的轻量级高可用流量控制组件,主要以流量为切入点,从流量控制.熔断降级.系统负载保护等多个维度来 ...

  5. 数字调制系统工作原理_水暖BA系统组成及各部工作原理

    原标题:水暖BA系统组成及各部工作原理 来源:机电人脉 如有侵权,请联系删除 01 暖通空调系统 系统监控功能: 智能中的空调系统是指空调机组.新风机组,变风量机组,风机盘管等设备.其控制主要是指温. ...

  6. l293d电机驱动原理_一文搞懂步进电机特性、原理及驱动器设计

    1.步进电机的概念 步进电机是将电脉冲信号,转变为角位移或线位移的开环控制电机,又称为脉冲电机.在非超载的情况下,电机的转速.停止的位置只取决于脉冲信号的频率和脉冲数,而不受负载变化的影响.当步进驱动 ...

  7. 相机成像原理_【亲子科学小实验】相机原理和小孔成像的秘密

    大家出去旅游的时候,都会带着照相机,拍下美景的同时,也给我们留下了永恒的回忆.但是,大家知不知道记录美好瞬间的照相机是如何工作成像的呢?下面就和小编一起来探究成像原理吧. 相机原理和小孔成像 材料 放 ...

  8. 红外测距模块工作原理_共享单车里的通讯模块,工作原理是啥呢?

    现在我们所看到了共享单车除了小黄车(OFO)没有配备GPS智能锁外,其他品牌的共享单车都有安装,那么这么高科技的东西具体是怎么工作的呢?下面由我给大家讲解下其中的奥秘. 其实这个东西也谈不上太多高科技 ...

  9. python解析原理_代码详解:Python虚拟环境的原理及使用

    Python的虚拟环境极大地方便了人们的生活.本指南先介绍虚拟环境的基础知识以及使用方法,然后再深入介绍虚拟环境背后的工作原理. 注意:本指南在macOS Mojave系统上使用最新版本的Python ...

最新文章

  1. 【学术快报】韩世辉课题组在《eLife》发表论文揭示群体冲突中复仇的神经生物学机制...
  2. 学习NGUI前的准备NGUI的相关信息
  3. Betriebssystem I 操作系统课件 01. Evolution von Rechnersystemen 计算机操作系统的演变
  4. pandas python csv_python:pandas合并csv文件的方法(图书数据集成)
  5. python第三方库文件传输_python3 post方式上传文件。
  6. html5通过api调数据库,使用HTML5数据库API [关闭](Using HTML5 Database API [closed])
  7. 如何知道交换机的缓存大小_网络基本功之细说交换机
  8. 用python画图代码意思_Python科学画图代码分享
  9. Vision Transformer太火!这门开源课也火了!十小时现场coding带你玩转ViT 爆款SOTA算法!...
  10. 数据库与Excel表格链接PHP,php读取Excel表格(Excel也可以做数据库)调用phpExcel类库...
  11. linux tar 基本格式、常用选项、压缩与解压缩
  12. paip.2013年技术趋势以及热点 v2.0 cae
  13. 一年工作经验的大专生程序员(java后台)
  14. 程序员学金融-金融科普(4)-净资产收益率
  15. 计算机系学生橱窗分析结果怎么写,2015年高职学生职业生涯规划书
  16. 《捉妖记》的命格解析
  17. 系统服务器算固定资产吗,服务器操作系统算固定资产
  18. 中原工学院计算机二级证书,中原工学院@计算机等级考试二级MS_Office基础知识(常考知识点记忆).doc...
  19. Windows10系统安装好用的截图软件--snipaste
  20. 计算机英语教案模板,英语教案模板范文

热门文章

  1. Goods:查询某个用户的购物车条目以及添加购物车条目
  2. 华为2014软件研发实习生面试经历
  3. 【求助】Android开发中的数据持久化
  4. ad中装配图如何导出_如何把endnote中的research note和title等一起导出成表格或者txt?...
  5. java中if条件中删除此行代码_Java中我如何去除if...else...语句?
  6. protected的继承方式有什么特点_草莓的授粉方式有哪些?各有什么特点
  7. android渠道校验,Android渠道版本自动化校验
  8. 制备石墨烯流程图_科研人员制备出小扭转角度双层石墨烯
  9. python计数循环,python - Python中的密码求解器循环计数 - SO中文参考 - www.soinside.com...
  10. caused by: java.lang.outofmemory_Caused by: java.lang.OutOfMemoryError: PermGen space