目录

一、什么是顺序消息?

二、顺序消息的原理

三、全局顺序消息

四、局部顺序消息

五、顺序消息缺陷


一、什么是顺序消息?

消息有序指的是,消费者端消费消息时,需按照消息的发送顺序来消费,即先发送的消息,需要先消费(FIFO)。

举个容易理解的例子:通常创建订单后,会经历一系列的操作:【订单创建 -> 订单支付 -> 订单发货 -> 订单配送 -> 订单完成】。在创建完订单后,会发送五条消息到MQ Broker中,消费的时候要按照【订单创建 -> 订单支付 -> 订单发货 -> 订单配送 -> 订单完成】这个顺序去消费,这样的订单才是有效的。

RocketMQ采用局部顺序一致性的机制,实现了单个队列中消息的有序性,使用FIFO顺序提供有序消息。简而言之,我们的消息要保证有序,就必须把一组消息存放在同一个队列,然后由Consumer进行逐一消费。

但是如果碰到高并发的情况,消息不就会阻塞了吗?

RocketMQ给的解决方案是按照业务去划分不同的队列,然后并行消费,提高消息的处理速度的同时避免消息堆积。

RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

  • 全局有序:全局顺序时使用一个queue;
  • 分区有序:局部顺序时多个queue并行消费;

二、顺序消息的原理

在默认的情况下,消息发送会采取Round Robin轮询方式把消息发送到不同的queue;而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序的。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

三、全局顺序消息

前面介绍到,全局顺序消息的话,我们需要将所有消息都发送到同一个队列,然后消费者端也订阅同一个队列,这样就能实现顺序消费消息的功能。下面通过一个示例说明如何实现全局顺序消息。

  • (1)、生产者发送消息
public class OrderMQProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException, ExecutionException {// 创建DefaultMQProducer类并设定生产者名称DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test");// 设置NameServer地址,如果是集群的话,使用分号;分隔开mqProducer.setNamesrvAddr("10.0.90.86:9876");// 启动消息生产者mqProducer.start();for (int i = 0; i < 5; i++) {// 创建消息,并指定Topic(主题),Tag(标签)和消息内容Message message = new Message("GLOBAL_ORDER_TOPIC", "", ("全局有序消息" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 实现MessageQueueSelector,重写select方法,保证消息都进入同一个队列// send方法的第一个参数: 需要发送的消息Message// send方法的第二个参数: 消息队列选择器MessageQueueSelector// send方法的第三个参数: 消息将要进入的队列下标,这里我们指定消息都发送到下标为1的队列SendResult sendResult = mqProducer.send(message, new MessageQueueSelector() {@Override// select方法第一个参数: 指该Topic下有的队列集合// 第二个参数: 发送的消息// 第三个参数: 消息将要进入的队列下标,它与send方法的第三个参数相同public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {return mqs.get((Integer) arg);}}, 1);System.out.println("sendResult = " + sendResult);}// 如果不再发送消息,关闭Producer实例mqProducer.shutdown();}}
  • (2)、消费者消费消息
public class OrderMQConsumer {public static void main(String[] args) throws MQClientException {// 创建DefaultMQPushConsumer类并设定消费者名称DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test");// 设置NameServer地址,如果是集群的话,使用分号;分隔开mqPushConsumer.setNamesrvAddr("10.0.90.86:9876");// 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费// 如果不是第一次启动,那么按照上次消费的位置继续消费mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息,如果订阅该主题下的所有tag,则使用*mqPushConsumer.subscribe("GLOBAL_ORDER_TOPIC", "*");/*** 与普通消费一样需要注册消息监听器,但是传入的不再是MessageListenerConcurrently* 而是需要传入MessageListenerOrderly的实现子类,并重写consumeMessage方法。*/// 顺序消费同一个队列的消息mqPushConsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(false);for (MessageExt msg : msgs) {System.out.println("消费线程=" + Thread.currentThread().getName() +", queueId=" + msg.getQueueId() + ", 消息内容:" + new String(msg.getBody()));}// 标记该消息已经被成功消费return ConsumeOrderlyStatus.SUCCESS;}});// 启动消费者实例mqPushConsumer.start();}
}
  • (3)、启动生产者

如下,可看到,消息成功发送到Broker中,并且可以看到,5条消息选择的queueId都是1。

sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E009213C818B4AAC28EAB71A70000, offsetMsgId=0A005A5600002A9F00000000000076AB, messageQueue=MessageQueue [topic=GLOBAL_ORDER_TOPIC, brokerName=broker-a, queueId=1], queueOffset=0]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E009213C818B4AAC28EAB71D20001, offsetMsgId=0A005A5600002A9F000000000000776B, messageQueue=MessageQueue [topic=GLOBAL_ORDER_TOPIC, brokerName=broker-a, queueId=1], queueOffset=1]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E009213C818B4AAC28EAB71D80002, offsetMsgId=0A005A5600002A9F000000000000782B, messageQueue=MessageQueue [topic=GLOBAL_ORDER_TOPIC, brokerName=broker-a, queueId=1], queueOffset=2]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E009213C818B4AAC28EAB71DE0003, offsetMsgId=0A005A5600002A9F00000000000078EB, messageQueue=MessageQueue [topic=GLOBAL_ORDER_TOPIC, brokerName=broker-a, queueId=1], queueOffset=3]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E009213C818B4AAC28EAB71E60004, offsetMsgId=0A005A5600002A9F00000000000079AB, messageQueue=MessageQueue [topic=GLOBAL_ORDER_TOPIC, brokerName=broker-a, queueId=1], queueOffset=4]
  • (4)、启动消费者

如下,可看到,消费者也是按照发送消息的顺序消费消息。

消费线程=ConsumeMessageThread_1, queueId=1, 消息内容:全局有序消息0
消费线程=ConsumeMessageThread_1, queueId=1, 消息内容:全局有序消息1
消费线程=ConsumeMessageThread_1, queueId=1, 消息内容:全局有序消息2
消费线程=ConsumeMessageThread_1, queueId=1, 消息内容:全局有序消息3
消费线程=ConsumeMessageThread_1, queueId=1, 消息内容:全局有序消息4

四、局部顺序消息

下面用订单进行分区有序的示例。一个订单创建完成后,订单的状态流转大概是:【订单创建 -> 订单支付 -> 订单完成】,我们在创建MessageQueueSelector消息队列选择器的时候,需要根据业务唯一标识自定义队列选择算法,如本例中则可以使用orderId订单号去选择队列。这样的话,订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

大体过程如下图:

  • (1)、生产者发送消息
public class OrderMQProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException, ExecutionException {// 创建DefaultMQProducer类并设定生产者名称DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test");// 设置NameServer地址,如果是集群的话,使用分号;分隔开mqProducer.setNamesrvAddr("10.0.90.86:9876");// 启动消息生产者mqProducer.start();List<Order> orderList = getOrderList();for (int i = 0; i < orderList.size(); i++) {String body = "【" + orderList.get(i) + "】订单状态变更消息";// 创建消息,并指定Topic(主题),Tag(标签)和消息内容Message msg = new Message("ORDER_STATUS_CHANGE", "", body.getBytes(RemotingHelper.DEFAULT_CHARSET));// MessageQueueSelector: 消息队列选择器,根据业务唯一标识自定义队列选择算法/*** msg:消息对象* selector:消息队列的选择器* arg:选择队列的业务标识,如本例中的orderId*/SendResult sendResult = mqProducer.send(msg, new MessageQueueSelector() {/*** @param mqs 队列集合* @param msg 消息对象* @param arg 业务标识的参数,对应send()方法传入的第三个参数arg* @return*/@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {//根据arg(实际上是订单id)选择消息发送的队列long index = (Long) arg % mqs.size();return mqs.get((int) index);}//mqProducer.send()方法第三个参数, 会传递到select()方法的arg参数}, orderList.get(i).getOrderId());System.out.println(String.format("消息发送状态:%s, orderId:%s, queueId:%d, body:%s",sendResult.getSendStatus(),orderList.get(i).getOrderId(),sendResult.getMessageQueue().getQueueId(),body));}// 如果不再发送消息,关闭Producer实例mqProducer.shutdown();}/*** 订单状态变更流程: ORDER_CREATE(订单创建) -> ORDER_PAYED(订单已支付) -> ORDER_COMPLETE(订单完成)*/public static List<Order> getOrderList() {List<Order> orderList = new ArrayList<>();Order orderDemo = new Order();orderDemo.setOrderId(1L);orderDemo.setOrderStatus("ORDER_CREATE");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(2L);orderDemo.setOrderStatus("ORDER_CREATE");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(1L);orderDemo.setOrderStatus("ORDER_PAYED");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(2L);orderDemo.setOrderStatus("ORDER_PAYED");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(2L);orderDemo.setOrderStatus("ORDER_COMPLETE");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(3L);orderDemo.setOrderStatus("ORDER_CREATE");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(4L);orderDemo.setOrderStatus("ORDER_CREATE");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(3L);orderDemo.setOrderStatus("ORDER_PAYED");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(1L);orderDemo.setOrderStatus("ORDER_COMPLETE");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(3L);orderDemo.setOrderStatus("ORDER_COMPLETE");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(4L);orderDemo.setOrderStatus("ORDER_PAYED");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(4L);orderDemo.setOrderStatus("ORDER_COMPLETE");orderList.add(orderDemo);return orderList;}}public class Order implements Serializable {/*** 订单ID*/private Long orderId;/*** 订单状态*/private String orderStatus;public Long getOrderId() {return orderId;}public void setOrderId(Long orderId) {this.orderId = orderId;}public String getOrderStatus() {return orderStatus;}public void setOrderStatus(String orderStatus) {this.orderStatus = orderStatus;}@Overridepublic String toString() {return "Order{" +"orderId=" + orderId +", orderStatus='" + orderStatus + '\'' +'}';}
}
  • (2)、消费者消费消息
public class OrderMQConsumer {public static void main(String[] args) throws MQClientException {// 创建DefaultMQPushConsumer类并设定消费者名称DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test");// 设置NameServer地址,如果是集群的话,使用分号;分隔开mqPushConsumer.setNamesrvAddr("10.0.90.86:9876");// 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费// 如果不是第一次启动,那么按照上次消费的位置继续消费mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息,如果订阅该主题下的所有tag,则使用*mqPushConsumer.subscribe("ORDER_STATUS_CHANGE", "*");// 注册回调实现类来处理从broker拉取回来的消息// 注意:顺序消息注册的是MessageListenerOrderly监听器mqPushConsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext consumeOrderlyContext) {consumeOrderlyContext.setAutoCommit(true);for (MessageExt msg : msgList) {// 每个queue有唯一的consume线程来消费, 订单对每个queue都是分区有序System.out.println("消费线程=" + Thread.currentThread().getName() +", queueId=" + msg.getQueueId() + ", 消息内容:" + new String(msg.getBody()));}try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}// 标记该消息已经被成功消费return ConsumeOrderlyStatus.SUCCESS;}});// 启动消费者实例mqPushConsumer.start();}
}
  • (3)、启动生产者
消息发送状态:SEND_OK, orderId:1, queueId:1, body:【Order{orderId=1, orderStatus='ORDER_CREATE'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:2, queueId:2, body:【Order{orderId=2, orderStatus='ORDER_CREATE'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:1, queueId:1, body:【Order{orderId=1, orderStatus='ORDER_PAYED'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:2, queueId:2, body:【Order{orderId=2, orderStatus='ORDER_PAYED'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:2, queueId:2, body:【Order{orderId=2, orderStatus='ORDER_COMPLETE'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:3, queueId:3, body:【Order{orderId=3, orderStatus='ORDER_CREATE'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:4, queueId:0, body:【Order{orderId=4, orderStatus='ORDER_CREATE'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:3, queueId:3, body:【Order{orderId=3, orderStatus='ORDER_PAYED'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:1, queueId:1, body:【Order{orderId=1, orderStatus='ORDER_COMPLETE'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:3, queueId:3, body:【Order{orderId=3, orderStatus='ORDER_COMPLETE'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:4, queueId:0, body:【Order{orderId=4, orderStatus='ORDER_PAYED'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:4, queueId:0, body:【Order{orderId=4, orderStatus='ORDER_COMPLETE'}】订单状态变更消息
  • (4)、启动消费者
消费线程=ConsumeMessageThread_2, queueId=1, 消息内容:【Order{orderId=1, orderStatus='ORDER_CREATE'}】订单状态变更消息
消费线程=ConsumeMessageThread_1, queueId=2, 消息内容:【Order{orderId=2, orderStatus='ORDER_CREATE'}】订单状态变更消息
消费线程=ConsumeMessageThread_3, queueId=3, 消息内容:【Order{orderId=3, orderStatus='ORDER_CREATE'}】订单状态变更消息
消费线程=ConsumeMessageThread_4, queueId=0, 消息内容:【Order{orderId=4, orderStatus='ORDER_CREATE'}】订单状态变更消息
消费线程=ConsumeMessageThread_2, queueId=1, 消息内容:【Order{orderId=1, orderStatus='ORDER_PAYED'}】订单状态变更消息
消费线程=ConsumeMessageThread_1, queueId=2, 消息内容:【Order{orderId=2, orderStatus='ORDER_PAYED'}】订单状态变更消息
消费线程=ConsumeMessageThread_3, queueId=3, 消息内容:【Order{orderId=3, orderStatus='ORDER_PAYED'}】订单状态变更消息
消费线程=ConsumeMessageThread_4, queueId=0, 消息内容:【Order{orderId=4, orderStatus='ORDER_PAYED'}】订单状态变更消息
消费线程=ConsumeMessageThread_1, queueId=2, 消息内容:【Order{orderId=2, orderStatus='ORDER_COMPLETE'}】订单状态变更消息
消费线程=ConsumeMessageThread_3, queueId=3, 消息内容:【Order{orderId=3, orderStatus='ORDER_COMPLETE'}】订单状态变更消息
消费线程=ConsumeMessageThread_2, queueId=1, 消息内容:【Order{orderId=1, orderStatus='ORDER_COMPLETE'}】订单状态变更消息
消费线程=ConsumeMessageThread_4, queueId=0, 消息内容:【Order{orderId=4, orderStatus='ORDER_COMPLETE'}】订单状态变更消息

从上面的结果,我们可以看出来实现了分区有序,即一个线程只完成唯一标识的订单消息。

五、顺序消息缺陷

  1. 消费顺序消息的并行度依赖于队列的数量 ;
  2. 队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题;
  3. 遇到消息失败的消息,无法跳过,当前队列消费暂停;

RocketMQ(九)RocketMQ顺序消息相关推荐

  1. rocketmq 重复消费_消息队列 RocketMQ

    引言 本文整理了RocketMQ的相关知识,方便以后查阅. 功能介绍 简单来说,消息队列就是基础数据结构课程里"先进先出"的一种数据结构,但是如果要消除单点故障,保证消息传输的可靠 ...

  2. RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage

    文章目录 顺序消息的概念 顺序消费的原理 消费状态 演示 Producer Consumer 代码 顺序消息的概念 消息有序指的是可以按照消息的发送顺序来消费(FIFO). RocketMQ可以严格的 ...

  3. rocketmq发送顺序消息(四)

    rocketmq怎么发送消息可参考我的上一篇博客:rocketmq发送第一条消息.此处我们讲解如何发送rocketmq顺序消息 producer public class ProducerOrder ...

  4. 聊一聊顺序消息(RocketMQ顺序消息的实现机制)

    本文来自:https://www.cnblogs.com/hzmark/p/orderly_message.html 当我们说顺序时,我们在说什么? 日常思维中,顺序大部分情况会和时间关联起来,即时间 ...

  5. RocketMQ入门到入土(二)事务消息顺序消息

    精彩推荐 一百期Java面试题汇总 SpringBoot内容聚合 IntelliJ IDEA内容聚合 Mybatis内容聚合 接上一篇:RocketMQ入门到入土(一)新手也能看懂的原理和实战! 一. ...

  6. RocketMQ——顺序消息

    消息有序指的是可以按照消息的发送顺序来消费. RocketMQ可以严格的保证消息有序.但这个顺序,不是全局顺序,只是分区(queue)顺序.要全局顺序只能一个分区. 之所以出现你这个场景看起来不是顺序 ...

  7. 源码分析RocketMQ顺序消息消费实现原理

    本节目录 1.消息队列负载 2.消息拉取 3.消息顺序消息消费 3.1核心属性与构造函数 3.2 start方法 3.3 submitConsumeRequest 3.4 ConsumeMessage ...

  8. RocketMQ - 6 生产者,顺序消息

    Producer类型 Producer主要职能就是生产消息,发送消息.它可以对多个主题发送消息,甚至可以通过Tag定义些简单的过滤.更复杂的过滤可以使用filter组件来进行相应的业务操作 Rocke ...

  9. RocketMQ(九):rocketMQ设计的全链路消息零丢失方案?+rocketmq消息中间件事务消息机制的底层实现原理?+half是什么?+half消息是如何对消费者不可见的?

    前言: 目前rocketmq更新已经更新了11篇博客了,预计接下来的2-3篇是暂时的更新进度了,准备更新一下springboot或者是jvm,mysql相关的专题出来,后续更新完事后,再分享一些实战性 ...

最新文章

  1. VC6.0下调bug的流程
  2. libopencv_core.so: file not recognized: File format not recognized
  3. 2.13 向量化 Logistic 回归-深度学习-Stanford吴恩达教授
  4. mybatis中mysql流式读取_MyBatis读取大量数据(流式读取)
  5. python所有文件都能用文本方式打开_python 打开文件方式讲解、常用读写操作指令(全)以及读写常见问题...
  6. 幅度和幅值有区别吗_克拉克 (Clark) 变换中等幅值 (2/3) 和等功率 (sqrt(2/3)) 变换的公式推导...
  7. mybatis_user_guide(7) SQL语句构建器类
  8. QT @ VS2017的安装
  9. Mybatis 原始Dao层开发
  10. Pentium的指令系统(1)——Pentium的寻址方式
  11. 列表生成式的复习以及生成器的练习, 杨辉三角实例(非常巧妙)
  12. Linux环境下通过gstack命令查看进程的运行堆栈信息
  13. web 网站抢购并发
  14. 绝对式编码器的ssi协议 stm32 hal
  15. 漂白android软件,原本图片漂白软件
  16. 大话李白flash系列(在线看,全)
  17. msn一直登陆不上,没有办法只好启用meebo!
  18. 零基础学图形学(9) 几何知识——行向量和列向量
  19. HBuilderX 连接 微信开发者工具
  20. 计算机语言属于人类意识的客观内容,《2008年考研政治800题精解》世界的物质性和人的实践活动(5)...

热门文章

  1. 第一届『Citric杯』NOIP提高组模拟赛 题解
  2. 三国杀ol服务器维护时间 11月6日,三国杀ol11月6日更新一览 聚宝盆功能上线
  3. 思科交换机配置试题_cisco交换机配置简单教程.doc
  4. 剪辑视频怎么赚钱怎么做?一个人0资金怎么创业?
  5. 人脸识别方案(包含tcp ,http,socket 三者的区别)
  6. 花了10分钟终于明白矩阵的逆怎么用了!
  7. Oracle生成data patching脚本
  8. 微信小程序分页功能(上拉触底事件)
  9. Python爬虫11-Scrapy爬虫框架
  10. 华为云GaussDB(for Redis)GaussDB(for Redis)全面对比Codis