1. 延时消息的使用场景

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

2. 延时消息的使用限制

// org/apache/rocketmq/store/config/MessageStoreConfig.javaprivate String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码 SendMessageProcessor.java

3. 配置延时级别

在服务器端(rocketmq-broker端)的属性配置文件中加入如下行:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

描述了各级别与延时时间的对应映射关系。

  1. 这个配置项配置了从1级开始,各级延时的时间,能够修改这个指定级别的延时时间;
  2. 时间单位支持:s、m、h、d,分别表示秒、分、时、天;
  3. 默认值就是上面声明的,可手工调整;
  4. 默认值已够用,不建议修改这个值。

4. 延时消息样例

4.1 启动消费者等待传入订阅消息


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;public class ScheduledMessageConsumer {public static void main(String[] args) throws Exception {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");// 订阅Topicsconsumer.subscribe("TestTopic", "*");// 注册消息监听者consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {// Print approximate delay time periodSystem.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();}
}

4.2 发送延时消息


import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;public class ScheduledMessageProducer {public static void main(String[] args) throws Exception {// 实例化一个生产者来产生延时消息DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");// 启动生产者producer.start();int totalMessagesToSend = 100;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)message.setDelayTimeLevel(3);// 发送消息producer.send(message);}// 关闭生产者producer.shutdown();}
}

4.3 Springboot 延时消息样例

  1. 发送延时消息
rocketmq.producer.topic.test=topic_test
rocketmq.producer.tag.simple=simple
rocketmq.producer.tag.delay=delay
/*** 发送延时消息** @param topic* @param orderAutoClose*/
public void sendDelayMsg(String topic, OrderAutoClose orderAutoClose) {Message<String> message = MessageBuilder.withPayload(JSON.toJSONString(orderAutoClose)).build();// private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";// 指定发送超时时间(毫秒)和延迟等级this.rocketMQTemplate.syncSend(topic, message, 1000, 3);
}
@Value("${rocketmq.producer.topic.test}")
private String topic;
@Value("${rocketmq.producer.tag.delay}")
private String delayTag;@Test
void sendDelayMsg() {OrderAutoClose orderAutoClose = new OrderAutoClose();orderAutoClose.setOrderNo("D202111231023141054334");orderAutoClose.setCreateTime(LocalDateTime.now());this.rocketmqSimpleProducer.sendDelayMsg(topic + ":" + delayTag, orderAutoClose);log.info("发送延时消息, end...");
}
  1. 消费者
rocketmq.name-server=192.168.0.24:9876
rocketmq.consumer.topic.test=topic_test
rocketmq.consumer.tag.delay=delay
rocketmq.consumer.group.delay=group_${spring.profiles.active}_delay
/*** 延时消息监听** @author gaoyang* @date 2021-12-08 10:04*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic.test}",selectorExpression = "${rocketmq.consumer.tag.delay}",consumerGroup = "${rocketmq.consumer.group.delay}")
public class RocketmqDelayListener implements RocketMQListener<String> {@Overridepublic void onMessage(String msg) {OrderAutoClose orderAutoClose = JSON.parseObject(msg, OrderAutoClose.class);log.info("RocketmqDelayListener - orderNo: {}, createTime: {}", orderAutoClose.getOrderNo(), orderAutoClose.getCreateTime());}
}

RocketMQ 延时消息的使用和延时级别的配置相关推荐

  1. RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息

    文章目录 广播消息 广播消息概述 演示步骤 延时消息 概述 使用场景 延时机制 实现原理 示例 批量消息 批量消息概述 示例 代码 广播消息 广播消息概述 广播消息就是向所有用户发送消息. 如果我们希 ...

  2. rocketmq原理_消息中间件漫谈:RocketMQ延时消息应用及原理剖析

    业务背景 延时任务是非常普遍的业务场景之一,即系统某一动作触发后,经过一定时间的延时后再触发其他一个或多个动作.以订单系统为例: 下单后10分钟未支付发送支付提醒 下单30分钟内未支付订单自动取消 业 ...

  3. 阿里云ONS / RocketMQ的定时消息 / 延时消息

    考虑延时和定时消息,是因为遇到了一个业务场景: 前置任务完成时发送消息,但因为一些业务原因,不希望消息马上被消费,因此需要设置延时. 文章目录 几种解决思路 实现方案 ONS延迟消息 RocketMQ ...

  4. 消息中间件RocketMQ的延时消息和批量消息

    文章目录 1. 延时消息 1.1 使用限制 1.2 示例 1.2.1 延时消息的生产者 1.2.2 延时消息的消费者 2. 批量消息 2.1 小于4MB的批量消息发送 2.2 大于4MB的批量消息发送 ...

  5. 计算发送延时与传播延迟_消息队列——延时消息应用解析及实践

    简介:在大部分场景下业务系统如果只需要实现异步解耦.削峰填谷等能力,常规的普通消息就可以满足此类需求.除此之外,在某些特殊的业务场景中,普通消息类型存在无法满足需求的情况.这就需要消息队列服务本身支持 ...

  6. rocktmq 消息延时清空_使用Kotlin+RocketMQ实现延时消息的示例代码

    一. 延时消息 延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 使用延时消息的典型场景,例如: 在电商系统中,用户下完订单30分钟内没支付, ...

  7. rocketmq延时消息自定义配置;topic下tag使用

    概述 使用的是开源版本的rocketmq4.9.4 rocketmq也是支持延时消息的. rocketmq一般是4个部分: nameserver:保存路由信息 broker:保存消息 生产者:生产消息 ...

  8. springboot整合rocketMQ记录 实现发送普通消息,延时消息

    一.为什么选择RocketMQ消息队列?(可跳过看三的整合代码实例) 首先RocketMQ是阿里巴巴自研出来的,也已开源.其性能和稳定性从双11就能看出来,借用阿里的一句官方介绍:历年双 11 购物狂 ...

  9. 构建企业级业务高可用的延时消息中台

    来自:架构之美 1.业务场景剖析 公司业务系统(比如:电商系统)中有大量涉及定时任务的业务场景,例如:实现买卖双方在线沟通的IM系统,为了确保接收方能够收到消息,服务端一般都会有重试策略,即服务端在消 ...

最新文章

  1. 人工智能开始应用于美国金融业 但在中国遭遇滑铁卢
  2. python3.7.2安装-CentOS 7中Python3.7.2的安装
  3. tcpreplay工具安装使用
  4. 数字图像处理(一) 绪论
  5. 持续集成jenkins工具介绍(一)
  6. 客户管理软件系统源码
  7. RGB VGA显示时序
  8. 深度强化学习-策略梯度算法深入理解
  9. Javasctipt面试题整理
  10. linux安装nginx、php、mysql搭建网站
  11. UE4--用插件加载第三方库lib/dll(lsl)
  12. ISO26262功能安全 安全等级和量化指标
  13. c,c++代码格式规范
  14. 由于超过32位java限制_Java 32位Xmx vs java 64位Xmx
  15. 安徽大学计算机学院高亮,计算机学院关于智能计算的大规模优化学术报告圆满结束...
  16. superset设置起止时间为明天
  17. 深度学习01——入门基础 基于Python
  18. 寄存器寻址和寄存器间接寻址的区别
  19. 视觉SLAM十四讲第二章学习与课后题与随笔日记
  20. 【高并发专题】-高并发下前后端常用解决方案总结(全套)

热门文章

  1. 64位CentOS源码编译方式安装wine
  2. asp.net + mysql
  3. 字节跳动杯2018中国大学生程序设计竞赛-女生专场题解
  4. 《程序设计技术》第五章例程
  5. Online Judge for ACM-ICPC etc.
  6. UVA1368 UVALive3602 ZOJ3132 DNA Consensus String【贪心】
  7. IE、Chrome、Firefox 三大浏览器对比
  8. 编码 —— 差错检验
  9. macos 开发环境配置
  10. Matlab Tricks(十)—— padarray 的实现