1.延迟消息的使用

Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
msg.setDelayTimeLevel(1);
SendResult sendResult = producer.send(msg);

只需要在创建消息的时候指定消息的延迟级别就可以了,默认有18个延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

2.延迟消息的实现原理

在RocketMQ中,发送一个消息我们都是需要指定消息投递到哪个topic,但是如果这个消息设置了消息的延迟级别,那么该消息投递的就不是目标topic的,而是一个叫SCHEDULE_TOPIC_XXXX的topic,然后会有一个定时任务定时地去检查该topic里面的消息对应的延时时间是否已经结束了,如果结束了就把该消息重新放回目标topic,那么消费者此时就可以消费到了

3.延迟消息源码实现

(1)commitlog写入延迟消息

org.apache.rocketmq.store.CommitLog#asyncPutMessage

调用链就不贴出来了,最终消息都是需要调用CommitLog的写入方法

if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// 条件成立:说明用户发送的是延时消息,设置了消息的延时级别if (msg.getDelayTimeLevel() > 0) {// 重置下延时级别,不能太大if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}// 延时消息投递的topic,SCHEDULE_TOPIC_XXXXtopic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;// 延时消息投递的queueId,delayLevel - 1queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// 备份下消息原本要投递的topic以及queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));// 设置延时消息投递的topic以及queueIdmsg.setTopic(topic);msg.setQueueId(queueId);}
}

由于该方法太长,所以这里只贴出关于处理延时消息的部分。可以看到,如果消息设置了延迟级别(默认等于0),首先会把原始投递的topic和queueId保存到自身属性中,然后会把原始topic换成SCHEDULE_TOPIC_XXXX这个topic,并且queueId为延迟级别-1,然后后面就会写入commitlog文件中

(2)consumequeue写入延迟消息

org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput

消息写入了commitlog之后,后台线程会异步地把消息的索引信息写入到consumequeue文件中

DispatchRequest dispatchRequest =DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);

org.apache.rocketmq.store.CommitLog#checkMessageAndReturnSize(java.nio.ByteBuffer, boolean, boolean)

......
{// 获取用户设置的消息延迟级别String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);// 这里基本都会成立,当发送延迟消息的时候,会先写入到commitlog中,并且会把消息的topic改成SCHEDULE_TOPIC_XXXXif (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {int delayLevel = Integer.parseInt(t);// 重置消息延迟级别if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();}if (delayLevel > 0) {// 根据延迟级别计算出消息延迟结束时间,也就是说对于延迟消息来说,在延迟时间还没结束之前,ConsumeQueueData中的tagCode记录的是延时结束时间tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,storeTimestamp);}}
}
......

所以可以看到对于延迟消息来说,在延迟队列对应的consumequeue中存储的条目数据其中tagCode这一块内容存储的并不是该消息的tagCode,而是该消息的延迟结束时间

(3)加载负责延迟消息组件---ScheduleMessageService

在broker启动的时候,会去加载各种文件中的数据到内存,其中需要把delayOffset.json文件加载到内存中,而这个工作就是ScheduleMessageService去做的

org.apache.rocketmq.store.schedule.ScheduleMessageService#load

public boolean load() {boolean result = super.load();// 初始化delayLevelTable表result = result && this.parseDelayLevel();return result;
}

调用父类的load方法,在父类的load方法中读取到文件内容之后,就会去转换成java对象,看decode方法

public void decode(String jsonString) {if (jsonString != null) {DelayOffsetSerializeWrapper delayOffsetSerializeWrapper =DelayOffsetSerializeWrapper.fromJson(jsonString, DelayOffsetSerializeWrapper.class);if (delayOffsetSerializeWrapper != null) {this.offsetTable.putAll(delayOffsetSerializeWrapper.getOffsetTable());}}
}
public class DelayOffsetSerializeWrapper extends RemotingSerializable {private ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =new ConcurrentHashMap<Integer, Long>(32);public ConcurrentMap<Integer, Long> getOffsetTable() {return offsetTable;}public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) {this.offsetTable = offsetTable;}
}

可以看到里面存的数据就是每一个延迟队列对应的消息最大偏移量。但是仔细看load方法,在调用完父类的load方法之后,ScheduleMessageService还会调用一个parseDelayLevel方法,代码如下

/*** 初始化delayLevelTable表* @return*/
public boolean parseDelayLevel() {// 计算出每一个时间单位对应的毫秒数HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();timeUnitTable.put("s", 1000L);timeUnitTable.put("m", 1000L * 60);timeUnitTable.put("h", 1000L * 60 * 60);timeUnitTable.put("d", 1000L * 60 * 60 * 24);// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hString levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();try {String[] levelArray = levelString.split(" ");for (int i = 0; i < levelArray.length; i++) {String value = levelArray[i];// 获取到时间单位String ch = value.substring(value.length() - 1);// 获取该时间对应的毫秒数Long tu = timeUnitTable.get(ch);int level = i + 1;if (level > this.maxDelayLevel) {// 得到最大的延迟级别this.maxDelayLevel = level;}long num = Long.parseLong(value.substring(0, value.length() - 1));long delayTimeMillis = tu * num;this.delayLevelTable.put(level, delayTimeMillis);}} catch (Exception e) {log.error("parseDelayLevel exception", e);log.info("levelString String = {}", levelString);return false;}return true;
}

这个方法中做的事情也很简单,就是把配置中的每一个延迟级别都转换成对应的时间毫秒数,然后初始化delayLevelTable表

(4)启动ScheduleMessageService

org.apache.rocketmq.store.schedule.ScheduleMessageService#start

/*** 在消息存储服务启动的时候会被调用*/
public void start() {if (started.compareAndSet(false, true)) {this.timer = new Timer("ScheduleMessageTimerThread", true);for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {Integer level = entry.getKey();Long timeDelay = entry.getValue();Long offset = this.offsetTable.get(level);if (null == offset) {offset = 0L;}// 给每一个延迟级别都启动对应的TimeTask,延迟1s执行if (timeDelay != null) {this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);}}// 每10s把每一个延迟队列的最大消息偏移量写入到磁盘中this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {if (started.get()) ScheduleMessageService.this.persist();} catch (Throwable e) {log.error("scheduleAtFixedRate flush exception", e);}}}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());}
}

首先遍历delayLevelTable表,根据延迟级别从offsetTable表中找到该延迟队列中最大的消息偏移量,然后给每一个延迟级别都创建一个TimeTask,并且把对应的延迟级别和消息偏移量都传进TimeTask中,然后延迟1s执行。接着再创建一个定时任务每10s把每一个延迟队列的最大消息偏移量写入到delayOffset.json文件中

(5)执行每一个TimeTask

org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup

ConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));

根据延迟级别找到topic为SCHEDULE_TOPIC_XXXX的队列,然后根据offset得到对应的consumequeue文件,再返回该文件从起始偏移量到写入位点的所有ConsumeQueueData

// 消息的commitlog物理偏移量
long offsetPy = bufferCQ.getByteBuffer().getLong();
// 消息大小
int sizePy = bufferCQ.getByteBuffer().getInt();
// 延迟结束时间,在消息写入到commitlog之后会进行分发到consumequeue,而对于延迟消息来说,tagCode这个位置存储的是该消息的延迟到期时间
long tagsCode = bufferCQ.getByteBuffer().getLong();

获取到ConsumeQueueData中存储的值

// 获取当前时间
long now = System.currentTimeMillis();
// 得到正确的到期时间
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);// 这里计算出的偏移位是当前遍历的前一个ConsumeQueueData的位点,作用是当延迟消息投递到原始的topic失败的时候会根据这个偏移位点去重新执行投递
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);long countdown = deliverTimestamp - now;// 条件成立: 说明延迟时间已经结束了
if (countdown <= 0) {// 根据commitlog物理偏移量找到msgMessageExt msgExt =ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);if (msgExt != null) {try {// cp创建一个新的msg对象,该msg对象的topic以及queueId都回到了原始的值MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",msgInner.getTopic(), msgInner);continue;}// 写入到commitlog中PutMessageResult putMessageResult =ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);// 写入成功,跳过该次循环判断下一条延迟消息是否达到到期时间if (putMessageResult != null&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {continue;} else {// XXX: warn and notify melog.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",msgExt.getTopic(), msgExt.getMsgId());ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,nextOffset), DELAY_FOR_A_PERIOD);ScheduleMessageService.this.updateOffset(this.delayLevel,nextOffset);return;}} catch (Exception e) {/** XXX: warn and notify me*/log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="+ offsetPy + ",sizePy=" + sizePy, e);}}
}
// 条件成立: 说明此时队列中的延迟时间还未结束
else {// 重新提交一个TimeTask,并且设置的延迟执行时间为消息剩余的延时时间ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown);// 更新延迟队列已消费的消息偏移量ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;
}

然后就会去判断该消息的延迟时间是否已经结束了,如果结束了,那么就把消息重新投递到原来的topic的队列中(原始的topic和queueId存储在消息属性中),然后再去遍历下一个ConsumeQueueData,否则如果这个ConsumeQueueData中的延迟时间还未结束,就重新提交一个TimeTask,该TimeTask延迟执行的时间是这个ConsumeQueueData剩余的延时时间(当这个TimeTask执行的时候,这个ConsumeQueueData肯定延时时间肯定就已经结束了),然后再更新该延迟队列已消费的消息偏移量

if (putMessageResult != null&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {continue;
} else {// XXX: warn and notify melog.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",msgExt.getTopic(), msgExt.getMsgId());ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,nextOffset), DELAY_FOR_A_PERIOD);ScheduleMessageService.this.updateOffset(this.delayLevel,nextOffset);return;
}

但是如果写入延迟消息失败的话,就需要从上一个ConsumeQueue的偏移量开始,延迟0.1s重新执行TimeTask,并且把上一个ConsumeQueueData的偏移量更新到该延迟队列的最大已消费偏移量

// 如果已经遍历完了延迟队列中的所有消息了,那么就计算出此时最后一个消息的偏移位,然后根据这个偏移位再次发起一个TimeTask
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
// 更新延迟队列已消费的消息偏移量
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;

如果消息都遍历完了,那么就延迟0.1s再发起一个TimeTask重新遍历该consumequeue文件,并且此时会更新延迟队列已消费的消息偏移量

ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,failScheduleOffset), DELAY_FOR_A_WHILE);

如果是一开始ScheduleMessageService启动的时候,此时延迟队列是还没有对应的consumequeue文件的,所以此时会延迟0.1s重新发起一个TimeTask

RocketMQ延迟消息的底层实现源码解析相关推荐

  1. Spring源码深度解析(郝佳)-学习-Spring消息-整合RabbitMQ及源码解析

      我们经常在Spring项目中或者Spring Boot项目中使用RabbitMQ,一般使用的时候,己经由前人将配置配置好了,我们只需要写一个注解或者调用一个消息发送或者接收消息的监听器即可,但是底 ...

  2. 集合深度学习07—Set、HashSet、LinkedHashSet、TreeSet、 底层原理 源码解析

    一.Set接口 特点: 唯一 无序(相对List接口部分来说的,无序不等于随机) 没有索引相关的方法 遍历方式: 迭代器 增强 for 循环(底层还是 Itr 迭代器) 二.HashSet 1. Ha ...

  3. Spring事务管理的底层逻辑—源码解析

    本文代码为spring 5.1.2 spring是如何控制事务的提交和回滚 加上@Transactional注解之后,Spring可以启到事务控制的功能了,再正式执行方法前它会做一些操作,我们来看看 ...

  4. RocketMQ源码解析之消息消费者(consume Message)

    原创不易,转载请注明出处 文章目录 前言 1.消息流程介绍 2.源码解析 2.1 并发消费 2.2 顺序消费 前言 我们在<RocketMQ源码解析之消息消费者(pullMessage)> ...

  5. RocketMQ源码解析之broker文件清理

    原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...

  6. 跟我学RocketMQ之批量消息发送源码解析

    上篇文章 跟我学RocketMQ之消息发送源码解析 中,我们已经对普通消息的发送流程进行了详细的解释,但是由于篇幅问题没有展开讲解批量消息的发送.本文中,我们就一起来集中分析一下批量消息的发送是怎样的 ...

  7. 面试常问Rocketmq延迟消息原理

    延迟消息在业务场景中使用的非常多,订单失效,过期通知等功能都可以借助延迟消息机制来实现.本文将从源码层面来分析Rocketmq的延迟消息实现原理机制. 一.延迟消息的使用    ​    ​    ​ ...

  8. Colly源码解析——结合例子分析底层实现

    通过<Colly源码解析--框架>分析,我们可以知道Colly执行的主要流程.本文将结合http://go-colly.org上的例子分析一些高级设置的底层实现.(转载请指明出于break ...

  9. Handler消息机制(九):IntentService源码解析

    作者:jtsky 链接:https://www.jianshu.com/p/0a150ec09a32 简介 首先我们先来了解HandlerThread和IntentService是什么,以及为什么要将 ...

  10. RocketMQ源码解析:Filtersrv

    ???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...

最新文章

  1. java shiro实例_Apache Shiro入门实例
  2. 电脑端京东的我的订单html+css页面_什么是前端和后端开发?写给即将迈入前端开发领域的朋友...
  3. 了解区块链,从挖矿开始
  4. J2EE中使用jstl报http://java.sun.com/jsp/jstl/core cannot be resolved in either web.xml or the jar错...
  5. 读者看《赢道:成功创业者的28条戒律》
  6. 汪子熙微信公众号的写作计划
  7. c++类指针赋值表达式必须是可修改的左值_C++学习刷题8--复制构造函数和赋值运算符重载函数...
  8. 树莓派与阿里云服务器之间的无线通信(非局域网)
  9. 你们都被电视剧版的 《西游记》给骗了!| 今日趣图
  10. 销售员所做的一切工作最终目的就是为了成交
  11. c语言课程设计2018,C语言课程设计报告(2018)——学生管理系统(17页)-原创力文档...
  12. 【亚伦博客】反方观点: 下载不是偷窃
  13. 从零开始学前端:弹性盒模型(flex布局) --- 今天你学习了吗?(CSS:Day19)
  14. java 确定对象的引用_JVM学习笔记之了解对象存活判断和4种引用【三】
  15. java 的记住用户名和密码,JAVA--高级基础开发Cookie实现记住用户名和密码
  16. 身份认证 对称密钥的认证协议 公开密钥的认证协议 公钥基础设施PKI
  17. jquery设置禁止浏览器刷新
  18. DeepFool论文阅读
  19. [linux]scp与服务器互传文件
  20. 同一样商品,不同颜色和尺码的批量新增

热门文章

  1. 【python】字符串转换整数 (atoi) - String
  2. Kaggle Future Sales“”竞赛 XGB_model_final
  3. 180.连续出现的数字
  4. 138.复制带随机指针的链表
  5. tensorflow安装中踩到的坑protobuf、h5py、tensorboard、werkzeug
  6. python join函数的作用_Python join()函数原理及使用方法
  7. modelsim安装_XLINUXFPGA开发工具篇modelsim的安装
  8. 利用图片指纹检测高相似度图片--相似图片搜索的原理
  9. 转录组拼接软件Trinity使用安装报错锦集
  10. 最新nodejs的开发学习实战(1)从一个博客开始