文章目录

  • 1. 延时消息
    • 1.1 使用限制
    • 1.2 示例
      • 1.2.1 延时消息的生产者
      • 1.2.2 延时消息的消费者
  • 2. 批量消息
    • 2.1 小于4MB的批量消息发送
    • 2.2 大于4MB的批量消息发送
    • 2.3 消费者代码

1. 延时消息

延时消息是指消费者延时消费,比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

它的实现与普通消息的发送和消费没多大区别,只多了一句话:

message.setDelayTimeLevel()

1.1 使用限制

现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

1.2 示例

1.2.1 延时消息的生产者

public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {// 创建 producer 实例,并设置生产者组名DefaultMQProducer producer = new DefaultMQProducer("delayGroup");// 设置 NameServer 地址producer.setNamesrvAddr("127.0.0.1:9876");// 启动生产者producer.start();for (int i = 0; i < 3; i++) {Message message = new Message("delayTopic", "delayTag", ("Hello Delay message-" + i).getBytes());// 设置延时等级2,这个消息将在5s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)message.setDelayTimeLevel(2);// 发送消息SendResult sendResult = producer.send(message);// 打印结果System.out.println(String.format("SendResult status:%s, queueId:%d",sendResult.getSendStatus(),sendResult.getMessageQueue().getQueueId()));}// 关闭生产者producer.shutdown();
}

打印结果:

SendResult status:SEND_OK, queueId:2
SendResult status:SEND_OK, queueId:3
SendResult status:SEND_OK, queueId:0

1.2.2 延时消息的消费者

public static void main(String[] args) throws MQClientException {// 实例化消费者,并设置组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delayGroup");// 设置 NameServer 地址consumer.setNamesrvAddr("127.0.0.1:9876");// 订阅 Topicconsumer.subscribe("delayTopic", "delayTag");// 注册消息监听者consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {// 打印消息内容和延时时间System.out.println("Receive message[msgBody=" + new String(message.getBody()) + "]");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();
}

在5秒钟之后就会收到延时消息,打印结果:

Receive message[msgBody=Hello Delay message-0]
Receive message[msgBody=Hello Delay message-1]
Receive message[msgBody=Hello Delay message-2]

2. 批量消息

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的 topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB

2.1 小于4MB的批量消息发送

public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {// 创建 producer 实例,并设置生产者组名DefaultMQProducer producer = new DefaultMQProducer("batchGroup");// 设置 NameServer 地址producer.setNamesrvAddr("127.0.0.1:9876");// 启动生产者producer.start();// 创建批量消息List<Message> messageList = new ArrayList<>();Message message1 = new Message("batchTopic", "batchTag", ("Hello Batch message-1").getBytes());Message message2 = new Message("batchTopic", "batchTag", ("Hello Batch message-2").getBytes());Message message3 = new Message("batchTopic", "batchTag", ("Hello Batch message-3").getBytes());messageList.add(message1);messageList.add(message2);messageList.add(message3);// 发送批量消息SendResult sendResult = producer.send(messageList);// 打印结果System.out.println(String.format("SendResult status:%s, queueId:%d",sendResult.getSendStatus(),sendResult.getMessageQueue().getQueueId()));// 关闭生产者producer.shutdown();
}

2.2 大于4MB的批量消息发送

要发送大于4MB的消息,我们只需要把大的消息分裂成若干个小的消息。首先创建一个拆分消息的工具类

public class ListSplitter implements Iterator<List<Message>> {private final int SIZE_LIMIT = 1024 * 1024 * 4;private final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Overridepublic boolean hasNext() {return currIndex < messages.size();}@Overridepublic List<Message> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);int tmpSize = message.getTopic().length() + message.getBody().length;Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();}// 增加日志的开销20字节tmpSize = tmpSize + 20;if (tmpSize > SIZE_LIMIT) {//单个消息超过了最大的限制//忽略,否则会阻塞分裂的进程if (nextIndex - currIndex == 0) {//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环nextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}List<Message> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}
}

然后在实现发送消息,和上面的区别在于在发送消息前,使用了工具类分隔消息

public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {// 创建 producer 实例,并设置生产者组名DefaultMQProducer producer = new DefaultMQProducer("batchGroup");// 设置 NameServer 地址producer.setNamesrvAddr("127.0.0.1:9876");// 启动生产者producer.start();// 创建批量消息List<Message> messageList = new ArrayList<>();Message message1 = new Message("batchTopic", "batchTag", ("Hello Batch message-1").getBytes());Message message2 = new Message("batchTopic", "batchTag", ("Hello Batch message-2").getBytes());Message message3 = new Message("batchTopic", "batchTag", ("Hello Batch message-3").getBytes());messageList.add(message1);messageList.add(message2);messageList.add(message3);//发送批量消息:把大的消息分裂成若干个小的消息ListSplitter splitter = new ListSplitter(messageList);while (splitter.hasNext()) {try {List<Message> listItem = splitter.next();SendResult sendResult = producer.send(listItem);System.out.println(String.format("SendResult status:%s, queueId:%d",sendResult.getSendStatus(),sendResult.getMessageQueue().getQueueId()));} catch (Exception e) {e.printStackTrace();//处理error}}// 关闭生产者producer.shutdown();
}

2.3 消费者代码

消费者代码与普通消费者代码实现一样

public static void main(String[] args) throws MQClientException {// 实例化消费者,并设置组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batchGroup");// 设置 NameServer 地址consumer.setNamesrvAddr("127.0.0.1:9876");// 订阅 Topicconsumer.subscribe("batchTopic", "batchTag");// 注册消息监听者consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {// 打印消息内容System.out.println("Receive message[msgBody=" + new String(message.getBody()) + "] ");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();
}

技 术 无 他, 唯 有 熟 尔。
知 其 然, 也 知 其 所 以 然。
踏 实 一 些, 不 要 着 急, 你 想 要 的 岁 月 都 会 给 你。


消息中间件RocketMQ的延时消息和批量消息相关推荐

  1. springboot+rocketmq(5):实现批量消息

    一.概述 1.批量发送消息: 批量发送消息能显著提高传递小消息的性能.限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息.此外,这一批消息的总大小不应 ...

  2. RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息

    文章目录 广播消息 广播消息概述 演示步骤 延时消息 概述 使用场景 延时机制 实现原理 示例 批量消息 批量消息概述 示例 代码 广播消息 广播消息概述 广播消息就是向所有用户发送消息. 如果我们希 ...

  3. 跟我学RocketMQ之批量消息发送源码解析

    上篇文章 跟我学RocketMQ之消息发送源码解析 中,我们已经对普通消息的发送流程进行了详细的解释,但是由于篇幅问题没有展开讲解批量消息的发送.本文中,我们就一起来集中分析一下批量消息的发送是怎样的 ...

  4. RocketMQ 消息结构和消息类型

    发送消息的一方称为生产者,负责生产消息,一般由业务系统负责生产消息.一个消息生产者会把业务应用系统里产生的消息发送到 broker 服务器.RocketMQ 提供多种发送方式,同步发送.异步发送.顺序 ...

  5. 消息中间件RocketMQ

    消息中间件RocketMQ   RocketMQ 是阿里巴巴开源的分布式消息中间件.支持事务消息.顺序消息.批量消息.延时消息.消息回溯等.它里面有几个区别于标准消息中件间的概念,如Group.Top ...

  6. rocktmq 消息延时清空_使用Kotlin+RocketMQ实现延时消息的示例代码

    一. 延时消息 延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 使用延时消息的典型场景,例如: 在电商系统中,用户下完订单30分钟内没支付, ...

  7. 消息中间件:RocketMQ 介绍(特性、术语、原理、优缺点、消息顺序、消息重复)

    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家.点击跳转到教程. 消息中间件的作用 1. 应用解耦 2. 异步处理 比如用户注册场景,注册主流程完成以后,需要调用邮件 ...

  8. 消息中间件学习总结(16)——17 个方面,综合对比 Kafka、RabbitMQ、RocketMQ、ActiveMQ 四个分布式消息队列

    本文将从,Kafka.RabbitMQ.ZeroMQ.RocketMQ.ActiveMQ 17 个方面综合对比作为消息队列使用时的差异. 一.资料文档 Kafka:中.有kafka作者自己写的书,网上 ...

  9. [RocketMQ]消息中间件—RocketMQ消息消费(一)

    2019独角兽企业重金招聘Python工程师标准>>> 文章摘要:在发送消息给RocketMQ后,消费者需要消费.消息的消费比发送要复杂一些,那么RocketMQ是如何来做的呢? 在 ...

最新文章

  1. 背景色透明,里面内容(图片、文字)不透明
  2. mac securecrt无法记住密码的解决方法
  3. Single-Shot Calibration:基于全景基础设施的多相机和多激光雷达之间的外参标定(ICRA2021)...
  4. 研发项目如何配置看板的任务流转
  5. 无源蜂鸣器c语言编程,无源蜂鸣器鸣叫
  6. word文档怎么批量解除锁定_word文档怎么解除锁定
  7. Arduino+SIM900A+继电器
  8. 金蝶套打文件放服务器还是本地,金蝶软件套打使用说明
  9. 认识常见的显卡外接口
  10. C/C++基础题045.PUM
  11. 精心整理!最全的100个Python精选库,建议收藏!
  12. “一切皆是映射” (光剑)
  13. CentOS上使用docker安装redis
  14. GCTA学习3 | GCTA的两篇NG:fast-LMM和fast-GLMM
  15. 非投机性的 Web3 用例
  16. NOJ - 2070 马尔扎哈的疑惑
  17. 字节算法题--N阶台阶,每次走一步或两步,计算共有多少种走法,并将每种走法打印出来。
  18. Sentinel 原理:滑动窗口
  19. IDEA安装lombok插件踩坑记录
  20. SQL中order by里面可以加条件

热门文章

  1. php提示Notice: Undefined index解决方法
  2. @NotNull 、@NotBlank、@NotEmpty区别和使用
  3. 霸王条款+管理不善:评深圳欢乐谷2005的经营诟病
  4. 新店速递|白玉兰(商务)酒店福州火车站西湖长冠店 正式上线
  5. 大学生考勤系统C语言代码,C++学生考勤系统(含源代码)
  6. matlab 求概率密度,MATLAB如何使用pdf函数计算指定分布的概率密度函数
  7. 思迈特软件Smartbi完成C轮融资,推动国产BI加速进入智能化时代
  8. clickhouse jdbc报错:Too many partitions for single INSERT block (more than 100)
  9. 【GPU基础】GPU状态监测 nvidia-smi 命令详解
  10. 做一个python的旅游系统_Python爬取13个旅游城市,告诉你新年大家最爱去哪玩?...