RocketMQ 延时消息的使用和延时级别的配置
1. 延时消息的使用场景
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
2. 延时消息的使用限制
// org/apache/rocketmq/store/config/MessageStoreConfig.javaprivate String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码 SendMessageProcessor.java
3. 配置延时级别
在服务器端(rocketmq-broker端)的属性配置文件中加入如下行:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
描述了各级别与延时时间的对应映射关系。
- 这个配置项配置了从1级开始,各级延时的时间,能够修改这个指定级别的延时时间;
- 时间单位支持:s、m、h、d,分别表示秒、分、时、天;
- 默认值就是上面声明的,可手工调整;
- 默认值已够用,不建议修改这个值。
4. 延时消息样例
4.1 启动消费者等待传入订阅消息
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;public class ScheduledMessageConsumer {public static void main(String[] args) throws Exception {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");// 订阅Topicsconsumer.subscribe("TestTopic", "*");// 注册消息监听者consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {// Print approximate delay time periodSystem.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();}
}
4.2 发送延时消息
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;public class ScheduledMessageProducer {public static void main(String[] args) throws Exception {// 实例化一个生产者来产生延时消息DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");// 启动生产者producer.start();int totalMessagesToSend = 100;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)message.setDelayTimeLevel(3);// 发送消息producer.send(message);}// 关闭生产者producer.shutdown();}
}
4.3 Springboot 延时消息样例
- 发送延时消息
rocketmq.producer.topic.test=topic_test
rocketmq.producer.tag.simple=simple
rocketmq.producer.tag.delay=delay
/*** 发送延时消息** @param topic* @param orderAutoClose*/
public void sendDelayMsg(String topic, OrderAutoClose orderAutoClose) {Message<String> message = MessageBuilder.withPayload(JSON.toJSONString(orderAutoClose)).build();// private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";// 指定发送超时时间(毫秒)和延迟等级this.rocketMQTemplate.syncSend(topic, message, 1000, 3);
}
@Value("${rocketmq.producer.topic.test}")
private String topic;
@Value("${rocketmq.producer.tag.delay}")
private String delayTag;@Test
void sendDelayMsg() {OrderAutoClose orderAutoClose = new OrderAutoClose();orderAutoClose.setOrderNo("D202111231023141054334");orderAutoClose.setCreateTime(LocalDateTime.now());this.rocketmqSimpleProducer.sendDelayMsg(topic + ":" + delayTag, orderAutoClose);log.info("发送延时消息, end...");
}
- 消费者
rocketmq.name-server=192.168.0.24:9876
rocketmq.consumer.topic.test=topic_test
rocketmq.consumer.tag.delay=delay
rocketmq.consumer.group.delay=group_${spring.profiles.active}_delay
/*** 延时消息监听** @author gaoyang* @date 2021-12-08 10:04*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic.test}",selectorExpression = "${rocketmq.consumer.tag.delay}",consumerGroup = "${rocketmq.consumer.group.delay}")
public class RocketmqDelayListener implements RocketMQListener<String> {@Overridepublic void onMessage(String msg) {OrderAutoClose orderAutoClose = JSON.parseObject(msg, OrderAutoClose.class);log.info("RocketmqDelayListener - orderNo: {}, createTime: {}", orderAutoClose.getOrderNo(), orderAutoClose.getCreateTime());}
}
RocketMQ 延时消息的使用和延时级别的配置相关推荐
- RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息
文章目录 广播消息 广播消息概述 演示步骤 延时消息 概述 使用场景 延时机制 实现原理 示例 批量消息 批量消息概述 示例 代码 广播消息 广播消息概述 广播消息就是向所有用户发送消息. 如果我们希 ...
- rocketmq原理_消息中间件漫谈:RocketMQ延时消息应用及原理剖析
业务背景 延时任务是非常普遍的业务场景之一,即系统某一动作触发后,经过一定时间的延时后再触发其他一个或多个动作.以订单系统为例: 下单后10分钟未支付发送支付提醒 下单30分钟内未支付订单自动取消 业 ...
- 阿里云ONS / RocketMQ的定时消息 / 延时消息
考虑延时和定时消息,是因为遇到了一个业务场景: 前置任务完成时发送消息,但因为一些业务原因,不希望消息马上被消费,因此需要设置延时. 文章目录 几种解决思路 实现方案 ONS延迟消息 RocketMQ ...
- 消息中间件RocketMQ的延时消息和批量消息
文章目录 1. 延时消息 1.1 使用限制 1.2 示例 1.2.1 延时消息的生产者 1.2.2 延时消息的消费者 2. 批量消息 2.1 小于4MB的批量消息发送 2.2 大于4MB的批量消息发送 ...
- 计算发送延时与传播延迟_消息队列——延时消息应用解析及实践
简介:在大部分场景下业务系统如果只需要实现异步解耦.削峰填谷等能力,常规的普通消息就可以满足此类需求.除此之外,在某些特殊的业务场景中,普通消息类型存在无法满足需求的情况.这就需要消息队列服务本身支持 ...
- rocktmq 消息延时清空_使用Kotlin+RocketMQ实现延时消息的示例代码
一. 延时消息 延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 使用延时消息的典型场景,例如: 在电商系统中,用户下完订单30分钟内没支付, ...
- rocketmq延时消息自定义配置;topic下tag使用
概述 使用的是开源版本的rocketmq4.9.4 rocketmq也是支持延时消息的. rocketmq一般是4个部分: nameserver:保存路由信息 broker:保存消息 生产者:生产消息 ...
- springboot整合rocketMQ记录 实现发送普通消息,延时消息
一.为什么选择RocketMQ消息队列?(可跳过看三的整合代码实例) 首先RocketMQ是阿里巴巴自研出来的,也已开源.其性能和稳定性从双11就能看出来,借用阿里的一句官方介绍:历年双 11 购物狂 ...
- 构建企业级业务高可用的延时消息中台
来自:架构之美 1.业务场景剖析 公司业务系统(比如:电商系统)中有大量涉及定时任务的业务场景,例如:实现买卖双方在线沟通的IM系统,为了确保接收方能够收到消息,服务端一般都会有重试策略,即服务端在消 ...
最新文章
- 人工智能开始应用于美国金融业 但在中国遭遇滑铁卢
- python3.7.2安装-CentOS 7中Python3.7.2的安装
- tcpreplay工具安装使用
- 数字图像处理(一) 绪论
- 持续集成jenkins工具介绍(一)
- 客户管理软件系统源码
- RGB VGA显示时序
- 深度强化学习-策略梯度算法深入理解
- Javasctipt面试题整理
- linux安装nginx、php、mysql搭建网站
- UE4--用插件加载第三方库lib/dll(lsl)
- ISO26262功能安全 安全等级和量化指标
- c,c++代码格式规范
- 由于超过32位java限制_Java 32位Xmx vs java 64位Xmx
- 安徽大学计算机学院高亮,计算机学院关于智能计算的大规模优化学术报告圆满结束...
- superset设置起止时间为明天
- 深度学习01——入门基础 基于Python
- 寄存器寻址和寄存器间接寻址的区别
- 视觉SLAM十四讲第二章学习与课后题与随笔日记
- 【高并发专题】-高并发下前后端常用解决方案总结(全套)