深入理解RocketMQ延迟消息
延迟消息是实际开发中一个非常有用的功能,本文第一部分从整体上介绍秒级精度延迟消息的实现思路,在第二部分结合RocketMQ的延迟消息实现,进行细致的讲解,点出关键部分的源码。第三步介绍延迟消息与消息重试的关系。
1 延迟消息介绍
基本概念:延迟消息是指生产者发送消息发送消息后,不能立刻被消费者消费,需要等待指定的时间后才可以被消费。
场景案例:用户下了一个订单之后,需要在指定时间内(例如30分钟)进行支付,在到期之前可以发送一个消息提醒用户进行支付。
一些消息中间件的Broker端内置了延迟消息支持的能力,如:
NSQ:这是一个go语言的消息中间件,其通过内存中的优先级队列来保存延迟消息,支持秒级精度,最多2个小时延迟。Java中也有对应的实现,如ScheduledThreadPoolExecutor内部实际上也是使用了优先级队列。
QMQ:采用双重时间轮实现。可参考:任意时间延时消息原理讲解:设计与实现
RabbitMQ:需要安装一个rabbitmq_delayed_message_exchange插件。
RocketMQ:RocketMQ 开源版本延迟消息临时存储在一个内部主题中,不支持任意时间精度,支持特定的 level,例如定时 5s,10s,1m 等。
Broker端内置延迟消息处理能力,核心实现思路都是一样:将延迟消息通过一个临时存储进行暂存,到期后才投递到目标Topic中。如下图所示:
步骤说明如下:
producer要将一个延迟消息发送到某个Topic中
Broker判断这是一个延迟消息后,将其通过临时存储进行暂存。
Broker内部通过一个延迟服务(delay service)检查消息是否到期,将到期的消息投递到目标Topic中。这个的延迟服务名字为delay service,不同消息中间件的延迟服务模块名称可能不同。
消费者消费目标topic中的延迟投递的消息
显然,临时存储模块和延迟服务模块,是延迟消息实现的关键。上图中,临时存储和延迟服务都是在Broker内部实现,对业务透明。
此外, 还有一些消息中间件原生并不支持延迟消息,如Kafka。在这种情况下,可以选择对Kafka进行改造,但是成本较大。另外一种方式是使用第三方临时存储,并加一层代理。
第三方存储选型要求:
对于第三方临时存储,其需要满足以下几个特点:
高性能:写入延迟要低,MQ的一个重要作用是削峰填谷,在选择临时存储时,写入性能必须要高,关系型数据库(如Mysql)通常不满足需求。
高可靠:延迟消息写入后,不能丢失,需要进行持久化,并进行备份
支持排序:支持按照某个字段对消息进行排序,对于延迟消息需要按照时间进行排序。普通消息通常先发送的会被先消费,延迟消息与普通消息不同,需要进行排序。例如先发一条延迟10s的消息,再发一条延迟5s的消息,那么后发送的消息需要被先消费。
支持长时间保存:一些业务的延迟消息,需要延迟几个月,甚至更长,所以延迟消息必须能长时间保留。不过通常不建议延迟太长时间,存储成本比较大,且业务逻辑可能已经发生变化,已经不需要消费这些消息。
例如,滴滴开源的消息中间件DDMQ,底层消息中间件的基础上加了一层代理,独立部署延迟服务模块,使用rocksdb进行临时存储。rocksdb是一个高性能的KV存储,并支持排序。
此时对于延迟消息的流转如下图所示:
说明如下:
生产者将发送给producer proxy,proxy判断是延迟消息,将其投递到一个缓冲Topic中;
delay service启动消费者,用于从缓冲topic中消费延迟消息,以时间为key,存储到rocksdb中;
delay service判断消息到期后,将其投递到目标Topic中。
消费者消费目标topic中的数据
这种方式的好处是,因为delay service的延迟投递能力是独立于broker实现的,不需要对broker做任何改造,对于任意MQ类型都可以提供支持延迟消息的能力。例如DDMQ对RocketMQ、Kafka都提供了秒级精度的延迟消息投递能力,但是Kafka本身并不支持延迟消息,而RocketMQ虽然支持延迟消息,但不支持秒级精度。
事实上,DDMQ还提供了很多其他功能,仅仅从延迟消息的角度,完全没有必要使用这个proxy,直接将消息投递到缓冲Topic中,之后通过delay service完成延迟投递逻辑即可。
具体到delay service模块的实现上,也有一些重要的细节:
为了保证服务的高可用,delay service也是需要部署多个节点。
为了保证数据不丢失,每个delay service节点都需要消费缓冲Topic中的全量数据,保存到各自的持久化存储中,这样就有了多个备份,并需要以时间为key。不过因为是各自拉取,并不能保证强一致。如果一定要强一致,那么delay service就不需要内置存储实现,可以借助于其他支持强一致的存储。
为了避免重复投递,delay service需要进行选主,可以借助于zookeeper、etcd等实现。只有master可以通过生产者投递到目标Topic中,其他节点处于备用状态。否则,如果每个节点进行都投递,那么延迟消息就会被投递多次,造成消费重复。
master要记录自己当前投递到的时间到一个共享存储中,如果master挂了,从slave节点中选出一个新的master节点,从之前记录时间继续开始投递。
延迟消息的取消:一些延迟消息在未到期之前,可能希望进行取消。通常取消逻辑实现较为复杂,且不够精确。对于那些已经快要到期的消息,可能还未取消之前,已经发送出去了,因此需要在消费者端做检查,才能万无一失。
2 RocketMQ中的延迟消息
开源RocketMQ支持延迟消息,但是不支持秒级精度。默认支持18个level的延迟消息,这是通过broker端的messageDelayLevel配置项确定的,如下:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Broker在启动时,内部会创建一个内部主题:SCHEDULE_TOPIC_XXXX,根据延迟level的个数,创建对应数量的队列,也就是说18个level对应了18个队列。注意,这并不是说这个内部主题只会有18个队列,因为Broker通常是集群模式部署的,因此每个节点都有18个队列。
延迟级别的值可以进行修改,以满足自己的业务需求,可以修改/添加新的level。例如:你想支持2天的延迟,修改最后一个level的值为2d,这个时候依然是18个level;也可以增加一个2d,这个时候总共就有19个level。
可以看到这里并不支持秒级精度,按照《rocketmq developer guide》中的说法,是为了避免在broker对消息进行排序,造成性能影响。不过笔者考虑,之所以不支持,更多应该是商业上的考虑。
生产者发送延迟消息:
生产者在发送延迟消息非常简单,只需要设置一个延迟级别即可,注意不是具体的延迟时间,如:
Message msg=new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
//设置延迟level为5,对应延迟1分钟
msg.setDelayTimeLevel(5);
producer.send(msg);
如果设置的延迟level超过最大值,那么将会重置最最大值。
Broker端存储延迟消息:
延迟消息在RocketMQ Broker端的流转如下图所示:
可以看到,总共有6个步骤,下面会对这6个步骤进行详细的讲解:
修改消息Topic名称和队列信息
转发消息到延迟主题的CosumeQueue中
延迟服务消费SCHEDULE_TOPIC_XXXX消息
将信息重新存储到CommitLog中
将消息投递到目标Topic中
消费者消费目标topic中的数据
第一步:修改消息Topic名称和队列信息
RocketMQ Broker端在存储生产者写入的消息时,首先都会将其写入到CommitLog中。之后根据消息中的Topic信息和队列信息,将其转发到目标Topic的指定队列(ConsumeQueue)中。
由于消息一旦存储到ConsumeQueue中,消费者就能消费到,而延迟消息不能被立即消费,所以这里将Topic的名称修改为SCHEDULE_TOPIC_XXXX,并根据延迟级别确定要投递到哪个队列下。
同时,还会将消息原来要发送到的目标Topic和队列信息存储到消息的属性中。相关源码如下所示:
org.apache.rocketmq.store.CommitLog#putMessage
第二步:转发消息到延迟主题的CosumeQueue中
CommitLog中的消息转发到CosumeQueue中是异步进行的。在转发过程中,会对延迟消息进行特殊处理,主要是计算这条延迟消息需要在什么时候进行投递。
投递时间=消息存储时间(storeTimestamp) + 延迟级别对应的时间
需要注意的是,会将计算出的投递时间当做消息Tag的哈希值存储到CosumeQueue中,CosumeQueue单个存储单元组成结构如下图所示:
其中:
Commit Log Offset:记录在CommitLog中的位置。
Size:记录消息的大小
Message Tag HashCode:记录消息Tag的哈希值,用于消息过滤。特别的,对于延迟消息,这个字段记录的是消息的投递时间戳。这也是为什么java中hashCode方法返回一个int型,只占用4个字节,而这里Message Tag HashCode字段却设计成8个字节的原因。
相关源码参见:
CommitLog#checkMessageAndReturnSize
第三步:延迟服务消费SCHEDULE_TOPIC_XXXX消息
Broker内部有一个ScheduleMessageService类,其充当延迟服务,消费SCHEDULE_TOPIC_XXXX中的消息,并投递到目标Topic中。
ScheduleMessageService在启动时,其会创建一个定时器Timer,并根据延迟级别的个数,启动对应数量的TimerTask,每个TimerTask负责一个延迟级别的消费与投递。
相关源码如下所示:
ScheduleMessageService#start
需要注意的是,每个TimeTask在检查消息是否到期时,首先检查对应队列中尚未投递第一条消息,如果这条消息没到期,那么之后的消息都不会检查。如果到期了,则进行投递,并检查之后的消息是否到期。
第四步:将信息重新存储到CommitLog中
在将消息到期后,需要投递到目标Topic。由于在第一步已经记录了原来的Topic和队列信息,因此这里重新设置,再存储到CommitLog即可。此外,由于之前Message Tag HashCode字段存储的是消息的投递时间,这里需要重新计算tag的哈希值后再存储。
源码参见:DeliverDelayedMessageTimerTask的messageTimeup方法。
第五步:将消息投递到目标Topic中
这一步与第二步类似,不过由于消息的Topic名称已经改为了目标Topic。因此消息会直接投递到目标Topic的ConsumeQueue中,之后消费者即消费到这条消息。
3 延迟消息与消费重试的关系
RocketMQ提供了消息重试的能力,在并发模式消费消费失败的情况下,可以返回一个枚举值RECONSUME_LATER,那么消息之后将会进行重试。如:
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {//处理消息,失败,返回RECONSUME_LATER,进行重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});
重试默认会进行重试16次。使用过RocketMQ消息重试功能的用户,可能看到过以下这张图:
第几次重试 | 与上次重试的间隔时间 | 第几次重试 | 与上次重试的间隔时间 |
1 | 10 秒 | 9 | 7 分钟 |
2 | 30 秒 | 10 | 8 分钟 |
3 | 1 分钟 | 11 | 9 分钟 |
4 | 2 分钟 | 12 | 10 分钟 |
5 | 3 分钟 | 13 | 20 分钟 |
6 | 4 分钟 | 14 | 30 分钟 |
7 | 5 分钟 | 15 | 1 小时 |
8 | 6 分钟 | 16 | 2 小时 |
细心地的读者发现了,消息重试的16个级别,实际上是把延迟消息18个级别的前两个level去掉了。事实上,RocketMQ的消息重试也是基于延迟消息来完成的。在消息消费失败的情况下,将其重新当做延迟消息投递回Broker。
在投递回去时,会跳过前两个level,因此只重试16次。当然,消息重试还有一些其他的设计逻辑,在之后的文章将会进行分析。
长按二维码,关注我,加好友,进群交流
往期精彩
消息中间件的四种投递模式对比
RocketMQ NameServer深入剖析
深入理解RocketMQ消息查询机制
深入理解RocketMQ Rebalance机制
数据库中间件详解
异地多活场景下的数据同步之道
mysql binlog应用场景与原理深度剖析
InnoDB MVCC 机制
深入理解数据库编程中的超时设置
可靠消息一致性的奇淫技巧
分布式事务概述
详解HTTP 与TCP中Keep-Alive机制的区别
TCP粘包、拆包与通信协议详解
剖析Spring多数据源
深入理解RocketMQ延迟消息相关推荐
- 面试常问Rocketmq延迟消息原理
延迟消息在业务场景中使用的非常多,订单失效,过期通知等功能都可以借助延迟消息机制来实现.本文将从源码层面来分析Rocketmq的延迟消息实现原理机制. 一.延迟消息的使用 ...
- RocketMQ延迟消息的代码实战及原理分析
RocketMQ简介 RocketMQ是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的.高可靠.万亿级容量.灵活可伸缩的消息发布与订阅服务. 它前身是MetaQ,是阿里基于Kafka ...
- RocketMQ延迟消息
延时消息即消息发送后并不立即对消费者可见,而是在用户指定的时间投递给消费者.比如我们现在发送一条延时30秒的消息,消息发送后立即发送给服务器,但是服务器在30秒后才将该消息交给消费者. RocketM ...
- 【RocketMQ】延迟消息(延迟队列)
文章目录 1. 什么是延迟消息 1.1 延时消息的使用场景 2. 示例 3. 原理 参考 1. 什么是延迟消息 发送消息后,消费者要等待一定的时间才能消费到该消息. RocketMQ 不支持任意时间自 ...
- RocketMq发送延迟消息
什么是延迟消息? 对于消息中间件来说,producer将消息发送到mq的服务器,但并不期望这条消息马上被消费,而是推迟到当前时间点之后的某个时间点后再投递到queue中让consumer进行消费,延迟 ...
- RocketMQ 任意时间维度的延迟消息(秒级)
RocketMQ 任意时间维度的延迟消息(秒级) 基于开源版本固定等级的延迟消息实现原理的基础上进行扩展,将所有维度的延迟消息封装成任务添加到时间轮上,通过时间轮固定周期的扫描,检测任务是否到执行时间 ...
- RocketMQ源码解析之延迟消息实现原理
原创不易,转载请注明出处 文章目录 前言 1.延时消息的demo 2.实现的原理 前言 今天要谈论的话题其实非常轻松,但是我们有些业务场景是离不开它的,其实说到延迟消息,不知道大家有没有想到它的业务场 ...
- RocketMQ源码分析之延迟消息
文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...
- rocketmq广播消息为什么不能重试_几分钟带你看懂“消息队列和RocketMQ”的入门总结
消息队列扫盲 消息队列顾名思义就是存放消息的队列,队列我就不解释了,别告诉我你连队列都不知道似啥吧? 所以问题并不是消息队列是什么,而是 消息队列为什么会出现?消息队列能用来干什么?用它来干这些事会带 ...
最新文章
- asp.net 窗体关闭事件_VBA代码将强制执行:你的窗体上必须显示最大、最小化按钮...
- 按单词逆序句子(含标点)
- 单片机搭建环境烧录方法_单片机仿真器的工作原理解析
- 比尔·盖茨,让骆驼穿过针眼的人
- 体现临床实际基线疾病活动度的早期RA患者中, 治疗起效时间对临床和放射学的影响...
- swing Ctrl+S 保存配置
- loadGrid layui
- MySQL数据库所有知识点最详细讲解,内置官方文档、个人理解、代码演示,自学MySQL必备文档(一)
- matlab语法归纳
- 能上天的代码? NASA 火星无人机飞行控制系统开源了
- python 拼音性命按照姓出现的次数排序_Python按姓氏排序字典列表
- 2019新个税如何计算
- 基础 八大疑问词+翻译
- org.hibernate.TransientObjectException:The given object has a null identifier
- Spring Cloud Alibaba
- 数据中台架构与技术选型
- run fsck manually
- UDP多播:一对多数据收发
- AngularJS中的谷歌地图开发
- OpenPortal V5认证计费系统说明文档
热门文章
- 最近几年电力行业信息化发展趋势和奥运对电力信息化的推动
- 6N65-ASEMI高品质MOS管6N65
- Linux 之ubuntu 文本阅读器打开text 乱码
- 图神经网络系列-知识图谱Neo4j-图神经网络案例实战
- 班旗怎么用软件设计,微信朋友圈投票软件[必看]如何制作
- 【Pixhawk】PX4源码控制逻辑详解(以UGV小车为例)
- Easyx-----c语言实现斗地主
- 「Adobe国际认证」Adobe Photoshop使用选框工具进行选择
- vue3+ts+element 简单的登陆案例 (一)
- 【Android UI】Paint Gradient 渐变渲染 ① ( LinearGradient 线性渐变渲染 | 设置渲染方向 | 设置渲染颜色 | 设置渲染模式 | MIRROR )