一、延时消息的使用

使用比较简单,指定message的DelayTimeLevel即可。示例代码如下:

Message msg = new Message("DelayTopicTest","TagA",("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET) );

//设置延迟级别,注意这里的3不是代表延迟3s

msg.setDelayTimeLevel(3);

SendResult sendResult = producer.send(msg);

目前rockatmq支持的延迟时间有:

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

以上支持的延迟时间在msg.setDelayTimeLevel对应的级别依次是1,2,3。。。。

二、实现原理

延迟队列的核心思路

所有的延迟消息由producer发出之后,都会存放到同一个topic(SCHEDULE_TOPIC_XXXX)下,不同的延迟级别会对应不同的队列序号,当延迟时间到之后,由定时线程读取转换为普通的消息存的真实指定的topic下,此时对于consumer端此消息才可见,从而被consumer消费。

延迟消息存放的结构

consumequeue

├── SCHEDULE_TOPIC_XXXX

│ ├── 0

│ │ └── 00000000000000000000

│ ├── 1

│ │ └── 00000000000000000000

│ ├── 2

│ │ └── 00000000000000000000

│ ├── 3

│ │ └── 00000000000000000000

│ ├── 4

│ │ └── 00000000000000000000

.....

.....

├── DelayTopicTest

│ ├── 0

│ │ └── 00000000000000000000

│ ├── 1

│ │ └── 00000000000000000000

│ ├── 2

│ │ └── 00000000000000000000

│ └── 3

│ └── 00000000000000000000

其中不同的延迟级别放在不同的队列序号下(queueId=delayLevel-1)。每一个延迟级别对应的延迟消息转换为普通消息的位置标识存放在~/store/config/delayOffset.json文件内。

key为对应的延迟级别,value对应不同延迟级别转换为普通消息的offset值。

{

"offsetTable":{1:1,2:1,3:11,4:1,5:1,6:1,7:1,8:1,9:1,10:1,11:1,12:1,13:1,14:1,15:0,16:0,17:0,18:0}

}

三、源码分析

入口:ScheduleMessageService.start

broker启动的时候会调用此方法。

通过jdk自带的Timer类开启一个timer定时器,在这个timer类添加了多个TimeTask。其中不同的延迟级别都对应DeliverDelayedMessageTimerTask的不同实例。

TimeTask分为两类:DeliverDelayedMessageTimerTask(每秒执行1次)和 ScheduleMessageService.this.persist()(每10秒是执行一次)

每一个延迟级别对应一个offset,这个offset是干嘛的呢?(先抛结论:这个offset的值代表每个级别的延迟队列已经转换为普通消息的位置)

public void start() {

//1. 根据支持的各种延迟级别,添加不同延迟时间的TimeTask

for (Map.Entry entry : this.delayLevelTable.entrySet()) {

Integer level = entry.getKey();

Long timeDelay = entry.getValue();

//每一个延迟级别对应已经读取为普通消息的offset值

Long offset = this.offsetTable.get(level);

if (null == offset) {

offset = 0L;

}

if (timeDelay != null) {

this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), 1000L);

}

}

//2. 添加一个10s执行一次的TimeTask

this.timer.scheduleAtFixedRate(new TimerTask() {

@Override

public void run() {

try {

ScheduleMessageService.this.persist();

} catch (Throwable e) {

log.error("scheduleAtFixedRate flush exception", e);

}

}

}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());

}

两类TimeTask的作用

DeliverDelayedMessageTimerTask

扫描延迟消息队列(SCHEDULE_TOPIC_XXXX)的消息,将该延迟消息转换为指定的topic的消息。

核心代码:ScheduleMessageService.executeOnTimeup

读取不同延迟级别对应的延迟消息

取得对应延迟级别读取的开始位置offset

将延迟消息转换为指定topic的普通消息并存放起来

修改下一次读取的offset值(修改的只是缓存),并指定下一次转换延迟消息的timetask

ScheduleMessageService.this.persist()

将延迟队列扫描处理的进度offset持久化到delayOffset.json文件中

public void executeOnTimeup() {

//读取队列SCHEDULE_TOPIC_XXXX,其中不同的延迟级别对应不同的队列id(queueId=delayLevel-1)

ConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(“SCHEDULE_TOPIC_XXXX”,delayLevel2QueueId(delayLevel));

long failScheduleOffset = offset;

if (cq != null) {

SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);

if (bufferCQ != null) {

try {

long nextOffset = offset;

int i = 0;

ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();

//循环读取延迟消息

for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {

long offsetPy = bufferCQ.getByteBuffer().getLong();

nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

long countdown = this.correctDeliverTimestamp(now, tagsCode) - now;

//只有当延迟消息发送的时间在当前时间之前才处理,否则此消息应该延迟后再处理

if (countdown <= 0) {

//根据offset值读取SCHEDULE_TOPIC_XXXX队列的消息

MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);

if (msgExt != null) {

try {

//将读取的消息转换为真实topic的消息(也就是普通消息)

MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);

//存放此消息

PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);

} catch (Exception e) {

}

}

} else {

ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown);

ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);

return;

}

}

//计算下一次读取延迟队列的offset,是定时任务下一次从该位置读取延时消息

nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);

//将下一次读取延迟队列的offset存放到一个缓存map中

ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);

return;

}

}

else {

long cqMinOffset = cq.getMinOffsetInQueue();

if (offset < cqMinOffset) {

failScheduleOffset = cqMinOffset;

}

}

}

ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,failScheduleOffset), DELAY_FOR_A_WHILE);

}

public synchronized void persist() {

//读取offsetTable缓存的延迟队列的值

String jsonString = this.encode(true);

if (jsonString != null) {

//读取delayOffset.json的文件地址

String fileName = this.configFilePath();

try {

//持久化到delayOffset.json文件中

MixAll.string2File(jsonString, fileName);

} catch (IOException e) {

log.error("persist file " + fileName + " exception", e);

}

}

}

四、总结

通过源码分析我们其实明白了,延迟消息相比普通消息只不过是在broker多了一层消息topic的转换,对于消息的发送和消费和普通消息没有什么差异。

但这里有一点要注意

RocketMQ的延迟消息本身有一个很大的缺点,熟悉java自带的Timer类的小伙伴应该知道一个timer对应只有一个线程,然后来处理不同的timeTask,而RockerMQ本身也确实只new了一个Timer,也就是说当同时发送的延迟消息过多的时候一个线程处理速度一定是有瓶颈的,因此在实际项目中使用延迟消息一定不要过多依赖,只能作为一个辅助手段。

rocktmq 消息延时清空_RocketMQ-延时消息相关推荐

  1. rocktmq 消息延时清空_使用Kotlin+RocketMQ实现延时消息的示例代码

    一. 延时消息 延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 使用延时消息的典型场景,例如: 在电商系统中,用户下完订单30分钟内没支付, ...

  2. 计算发送延时与传播延迟_消息队列——延时消息应用解析及实践

    简介:在大部分场景下业务系统如果只需要实现异步解耦.削峰填谷等能力,常规的普通消息就可以满足此类需求.除此之外,在某些特殊的业务场景中,普通消息类型存在无法满足需求的情况.这就需要消息队列服务本身支持 ...

  3. 消息队列面试 - 如何解决消息队列的延时以及过期失效问题?

    消息队列面试 - 如何解决消息队列的延时以及过期失效问题? 面试题 如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决? 面试官心理分析 你看 ...

  4. RocketMQ 延时消息的使用和延时级别的配置

    1. 延时消息的使用场景 比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存. 2. 延时消息的使用限制 // org/apache/roc ...

  5. 分享笔记RabbitMQ高级之消息限流与延时队列

    楔子 本篇是消息队列RabbitMQ的第五弹. 上篇本来打算讲述RabbitMQ的一些高级用法: 如何保证消息的可靠性? 消息队列如何进行限流? 如何设置延时队列进行延时消费? 最终因为篇幅缘故,上篇 ...

  6. 微信app清空群聊天消息的方法

    微信app是一款非常好用的社交软件,这款软件给用户提供了很多实用的功能,包括聊天功能.生活缴费.朋友圈等,极大的方便了我们的生活.我们在使用这款软件的时候,常常会加入一些群聊,并且在群中进行聊天,这样 ...

  7. tcp前4字节消息长度_RocketMQ的消息存储格式

    总体代码 我们可以通过阅读RocketMQ的消息存储代码来了解RocketMQ的消息存储格式,消息的存储入口是DefaultMessageStore,我们可以通过DefaultMessageStore ...

  8. mq日志怎么看_RocketMQ的消息是怎么丢失的

    前言 通过之前文章的阅读,有关RocketMQ的底层原理相信小伙伴们已经有了一个比较清晰的认识. 那么接下来王子想跟大家讨论一个话题,如果我们的项目中引入了MQ,势必要面对的一个问题,就是消息丢失问题 ...

  9. rocketmq 消息 自定义_RocketMQ的消息发送及消费

    RocketMQ消息支持的模式: 消息支持的模式分为三种:NormalProducer(普通同步),消息异步发送,OneWay. 消息同步发送: 普通消息的发送和接收在前面已经演示过了,在前面的案例中 ...

最新文章

  1. python第三方包安装方法(两种方法)
  2. Android 6.0 Changes
  3. JavaSE(八)——StringBuffer类、Arrays类、数组排序
  4. ubuntu16.04 远程控制win10
  5. 位图索引,数据库索引浅浅的学习
  6. java 检测 类型_[Java教程]javascript类型与类型检测
  7. python枚举函数_python dict函数枚举对象
  8. C#/.Net 的托管堆和垃圾回收
  9. 淘宝双11促销背后高并发处理之淘宝网采用什么技术架构来实现网站高负载
  10. 速腾(RoboSense)16线激光雷达调试出点云图(Ubuntu1804和windows系统都已经显示点云),包含各种遇见的坑【避坑指南】{[driver][socket]Rslidar poll}
  11. 混合罚函数c语言程序,混合惩罚函数法.ppt
  12. 性能退化评估 matlab,LED驱动电源性能退化参数监测及寿命预测方法研究
  13. Tensorrt7踩坑记录
  14. 挚文集团2021年Q3净营收37.592亿元 环比增长2.4%
  15. 使用PS给PDF文件加水印
  16. kali linux 通过粘贴板攻击对方服务器
  17. Proxmox VE
  18. 三年级语文计算机之父教学反思,三年级语文教学反思15篇
  19. 项目一之绘制小王八爬行
  20. 第四范式上市更进一步:再募资7亿美元,AI独角兽们陷亏损泥潭

热门文章

  1. 【windows7】解决IIS 80端口占用问题(亲测)
  2. 前端入门--解决问题的一些方法
  3. PagerHelper-分页类
  4. 014箱子开合并移动
  5. 看看你能认出多少种编程语言
  6. qqkey获取原理_获取QQKEY源码[C++版]
  7. php openssl des ecb,PHP7 OpenSSL DES-EDE-CBC加解密
  8. matlab如何响两声,matlab发出声音
  9. 聚类(上)K-mean算法
  10. 数据分析从零开始,新手小白如何入门?