1. 概述

在 RocketMQ 4.9.3 版本中,@Git-Yang 对延迟消息做了很大的优化,大幅度提升了延迟消息的性能。

其中,PR#3287 将原先用来启动周期性任务的 Timer 改为使用 ScheduledExecutorService,将多延迟等级下同时发送延迟消息的性能提升了 3+ 倍。

本文主要讲解的是另一个改动 PR#3458:支持延迟消息的异步投递。老版本中,延迟消息到期投递到 CommitLog 的动作是同步的,在 Dledger 模式下性能很差。新的改动将延迟消息的到期投递模式改为可配置,使用 BrokerConfig 的 enableScheduleAsyncDeliver 属性进行配置。改成异步投递后,在 Dledger 下的性能提升了 3 倍左右。

本文着重讲解定时消息异步投递的逻辑,老版本的延迟消息流程和源码解析可以看这篇文章:RocketMQ 延迟消息(定时消息)

2. 改动解析

2.1 将多延迟等级延迟消息扫描和投递的任务从单线程执行改为多线程

这个改动将延迟消息的任务调度器从 Timer 改为 ScheduledExecutorService

在老版本中,所有 18 个延迟等级的定时消息扫描和投递任务都是由一个 Timer 启动定时任务执行的。Timer 中所有定时任务都是由一个工作线程单线程处理的,如果某个任务处理慢了,后续有新的任务进来,会导致新的任务需要等待前一个任务执行结束。

改为 ScheduledExecutorService 线程池之后多线程处理任务,可以大幅度提高延迟消息处理速度,并且避免多延迟等级消息同时发送时造成的阻塞。


改动后的性能变化,出处:https://github.com/apache/rocketmq/issues/3286

  • 改动前,同时向 4 个延迟等级发送延迟消息,TPS: 657

  • 改动后,同时向4个延迟等级发送延迟消息,TPS: 2453

2.2 支持延迟消息异步投递,提升 Dledger 模式下的投递性能

原本的定时消息投递为单线程同步投递,在 DLedger 模式下存在性能瓶颈。

因为在 DLedger 模式下,主节点的角色会变为 SYNC_MASTER,同步复制。即需要足够多的从节点存储了该消息后,才会向主节点返回写入成功。

本次改动将延迟消息的写入改成可配置同步或异步写入,异步写入在 DLedger 模式下性能提升了 3 倍左右。

2.2.1 异步投递的注意点

异步投递的两个主要缺点是

  1. 无法保证消息投递的顺序
  2. 消息可能重复投递

异步投递的注意点

  • 需要做流控,当写入 TPS 过高时,页缓存可能会繁忙;甚至节点内存会被打爆。

  • 可能存在消息可能丢失的情况,比如投递时页缓存繁忙或者其他原因导致一次投递失败。这时候的处理是对失败消息进行重新投递,重试 3 次失败后,阻塞当前延迟等级对应的线程,直到重新投递成功。

2.2.2 异步投递逻辑

首先回顾一下同步投递的逻辑:每个延迟等级都分配一个线程,不断启动任务去扫描该等级对应的消费队列中是否有到期的消息。如果有则将到期的消息一个个同步投递,投递成功后更新该等级对应的 offset,下个任务从该 offset 开始扫描新的消息。


异步投递的逻辑相比于同步投递有一些不同:

异步投递采用了生产-消费模式,生产和消费的对象是异步投递的任务。生产者线程负责将到期的消息创建投递任务,消费者消费这些任务,根据任务的执行状态来更新 offset 或者重试。
这里引入了一个阻塞队列作为异步投递任务的容器,阻塞队列的大小可以配置,表示可以同时投递的消息数。当队列中投递任务满时触发流控。

将对应延迟等级的消息异步投递时,需要将异步投递的任务放入处理队列。此时,可能由于流控等原因,投递任务未能放入队列,那么等待一会后再次执行扫描-投递逻辑。

消息并不会直接投递成功,所以需要消费者线程从队列中消费并判断这些异步投递任务的状态。如果投递任务已完成,则更新 offset;如果投递异常,则等待一会后重新同步投递;投递成功则更新 offset,投递失败则继续重试。

3. 异步投递详解

延迟消息的投递逻辑全部在 ScheduleMessageService 类中。

下面以一个延迟等级的处理为例,用图展示一下消息投递线程和任务更新线程的工作流程。

左边是定时消息到期投递线程,右边是投递过程状态更新线程。

3.1 定时消息投递线程

延迟消息投递服务中维护了一个 offset 表offsetTable,表示每个延迟等级当前投递的消息在 ConsumeQuque 中的逻辑 offset。
它用来在关机恢复时标明扫描开始位置,所以这个表会定期持久化到磁盘中,并且从节点会定期从主节点拉去该表的最新值。

延迟消息处理服务启动时会在 deliverExecutorService 线程池为每个延迟等级创建并执行一个 DeliverDelayedMessageTimerTask 任务,这个任务并不是周期性任务,而是在一个任务的末尾执行下一个任务。这个任务的 executeOnTimeup() 方法即消息投递的逻辑。上图展示的就是该方法中的逻辑。

  1. 获取该等级的 ConsumeQueue,依次扫描消息是否到期
  2. 如果消息到期,从 CommitLog 中查出该消息的完整信息,从属性中恢复它的真实 Topic 和 QueueId,然后投递。(根据配置同步或者异步投递,这里按异步讲解)
  3. 异步消息投递后,投递的过程被放入阻塞队列 deliverPendingTable
  4. 如果放入队列失败,表示此时出现流控或者阻塞,需要等待一会然后重新投递
  5. 如果全部投递成功,将 offset 更新为当前投递消息的 offset + 1,表示下一次从下一个 offset 开始扫描

3.2 投递过程状态更新线程

每个延迟等级在 handleExecutorService 线程池中启动了一个状态更新线程,每个线程执行 HandlePutResultTask 任务。同样,这个任务不是周期性任务,而是一个任务末尾启动一个新的任务。

HandlePutResultTask 任务不断从阻塞队列头部获取异步投递过程对象,判断其状态

  • 如果投递成功,更新 offset 和统计数据,并从队列中移除投递任务
  • 如果投递中,无动作
  • 如果投递错误,根据是否配置自动重试来执行重试或者直接跳过
  • 重试投递时采用同步投递,投递成功则更新 offset 和统计数据,然后移除;否则继续重新投递

全部任务扫描完毕后等待一会,执行新的HandlePutResultTask 任务。

4. 源码解析

4.1 定时消息投递任务

public void executeOnTimeup() {// 根据delayLevel查找对应的延迟消息ConsumeQueueConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));if (cq == null) {this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);return;}// 根据ConsumeQueue的有效延迟消息逻辑offset,获取所有有效的消息SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);if (bufferCQ == null) {long resetOffset;if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",this.offset, resetOffset, cq.getQueueId());} else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",this.offset, resetOffset, cq.getQueueId());} else {resetOffset = this.offset;}this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);return;}long nextOffset = this.offset;try {int i = 0;ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();// 遍历ConsumeQueue中的所有有效消息for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {// 获取ConsumeQueue索引的三个关键属性long offsetPy = bufferCQ.getByteBuffer().getLong();int sizePy = bufferCQ.getByteBuffer().getInt();long tagsCode = bufferCQ.getByteBuffer().getLong();if (cq.isExtAddr(tagsCode)) {if (cq.getExt(tagsCode, cqExtUnit)) {tagsCode = cqExtUnit.getTagsCode();} else {//can't find ext content.So re compute tags code.log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",tagsCode, offsetPy, sizePy);long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);}}// ConsumeQueue里面的tagsCode实际是一个时间点(投递时间点)long now = System.currentTimeMillis();long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);// 如果现在已经到了投递时间点,投递消息// 如果现在还没到投递时间点,继续创建一个定时任务,countdown秒之后执行long countdown = deliverTimestamp - now;if (countdown > 0) {this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);return;}MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);if (msgExt == null) {continue;}MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",msgInner.getTopic(), msgInner);continue;}// 重新投递消息到CommitLogboolean deliverSuc;if (ScheduleMessageService.this.enableAsyncDeliver) {// 异步投递deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);} else {// 同步投递deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);}// 投递失败(流控、阻塞、投递异常等原因),等待0.1s再次执行投递任务if (!deliverSuc) {this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);return;}}nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);} catch (Exception e) {log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);} finally {bufferCQ.release();}// 该条ConsumeQueue索引对应的消息如果未到投递时间,那么创建一个定时任务,到投递时间时执行// 如果有还未投递的消息,创建定时任务后直接返回this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}
private boolean asyncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,int sizePy) {Queue<PutResultProcess> processesQueue = ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);//Flow Control 流控,如果阻塞队列中元素数量大于阈值则触发流控int currentPendingNum = processesQueue.size();int maxPendingLimit = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().getScheduleAsyncDeliverMaxPendingLimit();if (currentPendingNum > maxPendingLimit) {log.warn("Asynchronous deliver triggers flow control, " +"currentPendingNum={}, maxPendingLimit={}", currentPendingNum, maxPendingLimit);return false;}//Blocked 阻塞,如果有一个投递任务重试 3 次以上,阻塞该延迟等级的消息投递,直到该任务投递成功PutResultProcess firstProcess = processesQueue.peek();if (firstProcess != null && firstProcess.need2Blocked()) {log.warn("Asynchronous deliver block. info={}", firstProcess.toString());return false;}PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, true);processesQueue.add(resultProcess);return true;
}

4.2 异步投递过程状态更新任务

public void run() {LinkedBlockingQueue<PutResultProcess> pendingQueue =ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);PutResultProcess putResultProcess;// 循环获取队列中第一个投递任务,查看其执行状态并执行对应操作while ((putResultProcess = pendingQueue.peek()) != null) {try {switch (putResultProcess.getStatus()) {case SUCCESS:// 消息投递成功,从队列中移除该投递任务ScheduleMessageService.this.updateOffset(this.delayLevel, putResultProcess.getNextOffset());pendingQueue.remove();break;case RUNNING:// 正在投递,不做操作break;case EXCEPTION:// 投递出错if (!isStarted()) {log.warn("HandlePutResultTask shutdown, info={}", putResultProcess.toString());return;}log.warn("putResultProcess error, info={}", putResultProcess.toString());// onException 方法执行重试putResultProcess.onException();break;case SKIP:// 跳过,直接从队列中移除log.warn("putResultProcess skip, info={}", putResultProcess.toString());pendingQueue.remove();break;}} catch (Exception e) {log.error("HandlePutResultTask exception. info={}", putResultProcess.toString(), e);putResultProcess.onException();}}// 等待0.01s,继续下一次扫描if (isStarted()) {ScheduleMessageService.this.handleExecutorService.schedule(new HandlePutResultTask(this.delayLevel), DELAY_FOR_A_SLEEP, TimeUnit.MILLISECONDS);}
}
private void resend() {log.info("Resend message, info: {}", this.toString());// Gradually increase the resend interval.try {Thread.sleep(Math.min(this.resendCount++ * 100, 60 * 1000));} catch (InterruptedException e) {e.printStackTrace();}try {// 从 CommitLog 中查询消息完整信息MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(this.physicOffset, this.physicSize);// 如果查询失败,检查重试次数,如果到达 6 次则打印日志并跳过该消息if (msgExt == null) {log.warn("ScheduleMessageService resend not found message. info: {}", this.toString());this.status = need2Skip() ? ProcessStatus.SKIP : ProcessStatus.EXCEPTION;return;}MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);// 同步投递PutMessageResult result = ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);// 根据结果更新状态this.handleResult(result);if (result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK) {log.info("Resend message success, info: {}", this.toString());}} catch (Exception e) {this.status = ProcessStatus.EXCEPTION;log.error("Resend message error, info: {}", this.toString(), e);}
}

RocketMQ 延迟消息(定时消息)4.9.3 版本优化 异步投递支持相关推荐

  1. ActiveMQ—消息特性(延迟和定时消息投递)

    ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article ...

  2. telegram定时消息_ActiveMQ(18):Message之延迟和定时消息投递

    一.简介 延迟和定时消息投递(Delay and Schedule Message Delivery) 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消 ...

  3. rocketmq 重复消费_消息队列 RocketMQ

    引言 本文整理了RocketMQ的相关知识,方便以后查阅. 功能介绍 简单来说,消息队列就是基础数据结构课程里"先进先出"的一种数据结构,但是如果要消除单点故障,保证消息传输的可靠 ...

  4. 阿里云ONS / RocketMQ的定时消息 / 延时消息

    考虑延时和定时消息,是因为遇到了一个业务场景: 前置任务完成时发送消息,但因为一些业务原因,不希望消息马上被消费,因此需要设置延时. 文章目录 几种解决思路 实现方案 ONS延迟消息 RocketMQ ...

  5. 深入理解RocketMQ延迟消息

    延迟消息是实际开发中一个非常有用的功能,本文第一部分从整体上介绍秒级精度延迟消息的实现思路,在第二部分结合RocketMQ的延迟消息实现,进行细致的讲解,点出关键部分的源码.第三步介绍延迟消息与消息重 ...

  6. 记录Rocketmq定时消息不消费问题的排查过程

    记录Rocketmq定时消息不消费问题的排查过程 写在前面 问题根源初步确认 问题根源再次确认. 了解rocketmq定时消息原理以及处理过程. 源码调试 结论求证 后记 写在前面 此本记录了一个项目 ...

  7. RocketMQ延迟消息

    延时消息即消息发送后并不立即对消费者可见,而是在用户指定的时间投递给消费者.比如我们现在发送一条延时30秒的消息,消息发送后立即发送给服务器,但是服务器在30秒后才将该消息交给消费者. RocketM ...

  8. 面试常问Rocketmq延迟消息原理

    延迟消息在业务场景中使用的非常多,订单失效,过期通知等功能都可以借助延迟消息机制来实现.本文将从源码层面来分析Rocketmq的延迟消息实现原理机制. 一.延迟消息的使用    ​    ​    ​ ...

  9. 【RocketMQ工作原理】消息堆积与消费延迟

    概念 消息处理流程中,如果Consumer的消费速度跟不上Producer的发送速度,MQ中未处理的消息会越来 越多(进的多出的少),这部分消息就被称为堆积消息.消息出现堆积进而会造成消息的消费延迟. ...

  10. 厚积薄发--一文带您了解阿里云 RocketMQ 轻量版消息队列(MNS)

    作者: 周新宇&陈涛&李凯 阿里云 RocketMQ 轻量版(MNS)消息队列是一个轻量.可靠.可扩展且完全托管的分布式消息队列服务.MNS 能够帮助应用开发者在他们应用的分布式组件上 ...

最新文章

  1. 辞去美国终身教职回国的帅教授,拟增列为顶尖985大学博导
  2. 你的简历写了 “熟悉” zookeeper ?那这些你会吗?
  3. 从洗牌算法谈起--Python的random.shuffle函数实现原理
  4. css涟漪光圈扩散_CSS动画实例:圆的涟漪扩散
  5. 信息系统项目管理师历年论文题目
  6. javascript --- 手写Promise、快排、冒泡、单例模式+观察者模式
  7. 11. IDEA 在同一工作空间创建多个项目
  8. node.js npm常用命令
  9. python编程考试_《Python程序设计》试题库
  10. MPLS ××× Carrier Supporting Carrier Option AB(二)
  11. Material Design基础
  12. 103-PHP定义一个类
  13. [网络安全自学篇] 八十八.基于机器学习的恶意代码检测技术详解
  14. 小红书用户画像分析_棋牌游戏如何做好用户画像分析?
  15. GBK 与GB2312 互查 区位码
  16. 用python画机器猫--哆啦A梦,开干!
  17. git推送不能完全退出错误
  18. UESTC ACM训练题二
  19. toAppendStream doesn‘t support consuming update and delete changes which is produced by node XXX
  20. 华为鸿蒙删除视频报错rm: local.mp4: Owner died

热门文章

  1. 华为FusionCompute之计算虚拟化
  2. MSP430F149--点亮LED灯程序代码
  3. Git通过SSH拉取报错kex_exchange_identification
  4. 【LeetCode】18. 4Sum 四数之和
  5. 1004 成绩排名 (20 分)
  6. 戳这里,誉天胡老师教你如何备考 HCIE-Datacom!考试通关so easy!
  7. bat命令修改文件夹的后缀名
  8. mysql结构改写为hbase表_sqlite数据库修改表结构
  9. 什么是自然语言处理(NLP)?
  10. 51单片机的定时器/计数器