RocketMQ(九)RocketMQ顺序消息
目录
一、什么是顺序消息?
二、顺序消息的原理
三、全局顺序消息
四、局部顺序消息
五、顺序消息缺陷
一、什么是顺序消息?
消息有序指的是,消费者端消费消息时,需按照消息的发送顺序来消费,即先发送的消息,需要先消费(FIFO)。
举个容易理解的例子:通常创建订单后,会经历一系列的操作:【订单创建 -> 订单支付 -> 订单发货 -> 订单配送 -> 订单完成】。在创建完订单后,会发送五条消息到MQ Broker中,消费的时候要按照【订单创建 -> 订单支付 -> 订单发货 -> 订单配送 -> 订单完成】这个顺序去消费,这样的订单才是有效的。
RocketMQ采用局部顺序一致性的机制,实现了单个队列中消息的有序性,使用FIFO顺序提供有序消息。简而言之,我们的消息要保证有序,就必须把一组消息存放在同一个队列,然后由Consumer进行逐一消费。
但是如果碰到高并发的情况,消息不就会阻塞了吗?
RocketMQ给的解决方案是按照业务去划分不同的队列,然后并行消费,提高消息的处理速度的同时避免消息堆积。
RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
- 全局有序:全局顺序时使用一个queue;
- 分区有序:局部顺序时多个queue并行消费;
二、顺序消息的原理
在默认的情况下,消息发送会采取Round Robin轮询方式把消息发送到不同的queue;而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序的。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
三、全局顺序消息
前面介绍到,全局顺序消息的话,我们需要将所有消息都发送到同一个队列,然后消费者端也订阅同一个队列,这样就能实现顺序消费消息的功能。下面通过一个示例说明如何实现全局顺序消息。
- (1)、生产者发送消息
public class OrderMQProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException, ExecutionException {// 创建DefaultMQProducer类并设定生产者名称DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test");// 设置NameServer地址,如果是集群的话,使用分号;分隔开mqProducer.setNamesrvAddr("10.0.90.86:9876");// 启动消息生产者mqProducer.start();for (int i = 0; i < 5; i++) {// 创建消息,并指定Topic(主题),Tag(标签)和消息内容Message message = new Message("GLOBAL_ORDER_TOPIC", "", ("全局有序消息" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 实现MessageQueueSelector,重写select方法,保证消息都进入同一个队列// send方法的第一个参数: 需要发送的消息Message// send方法的第二个参数: 消息队列选择器MessageQueueSelector// send方法的第三个参数: 消息将要进入的队列下标,这里我们指定消息都发送到下标为1的队列SendResult sendResult = mqProducer.send(message, new MessageQueueSelector() {@Override// select方法第一个参数: 指该Topic下有的队列集合// 第二个参数: 发送的消息// 第三个参数: 消息将要进入的队列下标,它与send方法的第三个参数相同public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {return mqs.get((Integer) arg);}}, 1);System.out.println("sendResult = " + sendResult);}// 如果不再发送消息,关闭Producer实例mqProducer.shutdown();}}
- (2)、消费者消费消息
public class OrderMQConsumer {public static void main(String[] args) throws MQClientException {// 创建DefaultMQPushConsumer类并设定消费者名称DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test");// 设置NameServer地址,如果是集群的话,使用分号;分隔开mqPushConsumer.setNamesrvAddr("10.0.90.86:9876");// 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费// 如果不是第一次启动,那么按照上次消费的位置继续消费mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息,如果订阅该主题下的所有tag,则使用*mqPushConsumer.subscribe("GLOBAL_ORDER_TOPIC", "*");/*** 与普通消费一样需要注册消息监听器,但是传入的不再是MessageListenerConcurrently* 而是需要传入MessageListenerOrderly的实现子类,并重写consumeMessage方法。*/// 顺序消费同一个队列的消息mqPushConsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(false);for (MessageExt msg : msgs) {System.out.println("消费线程=" + Thread.currentThread().getName() +", queueId=" + msg.getQueueId() + ", 消息内容:" + new String(msg.getBody()));}// 标记该消息已经被成功消费return ConsumeOrderlyStatus.SUCCESS;}});// 启动消费者实例mqPushConsumer.start();}
}
- (3)、启动生产者
如下,可看到,消息成功发送到Broker中,并且可以看到,5条消息选择的queueId都是1。
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E009213C818B4AAC28EAB71A70000, offsetMsgId=0A005A5600002A9F00000000000076AB, messageQueue=MessageQueue [topic=GLOBAL_ORDER_TOPIC, brokerName=broker-a, queueId=1], queueOffset=0]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E009213C818B4AAC28EAB71D20001, offsetMsgId=0A005A5600002A9F000000000000776B, messageQueue=MessageQueue [topic=GLOBAL_ORDER_TOPIC, brokerName=broker-a, queueId=1], queueOffset=1]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E009213C818B4AAC28EAB71D80002, offsetMsgId=0A005A5600002A9F000000000000782B, messageQueue=MessageQueue [topic=GLOBAL_ORDER_TOPIC, brokerName=broker-a, queueId=1], queueOffset=2]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E009213C818B4AAC28EAB71DE0003, offsetMsgId=0A005A5600002A9F00000000000078EB, messageQueue=MessageQueue [topic=GLOBAL_ORDER_TOPIC, brokerName=broker-a, queueId=1], queueOffset=3]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E009213C818B4AAC28EAB71E60004, offsetMsgId=0A005A5600002A9F00000000000079AB, messageQueue=MessageQueue [topic=GLOBAL_ORDER_TOPIC, brokerName=broker-a, queueId=1], queueOffset=4]
- (4)、启动消费者
如下,可看到,消费者也是按照发送消息的顺序消费消息。
消费线程=ConsumeMessageThread_1, queueId=1, 消息内容:全局有序消息0
消费线程=ConsumeMessageThread_1, queueId=1, 消息内容:全局有序消息1
消费线程=ConsumeMessageThread_1, queueId=1, 消息内容:全局有序消息2
消费线程=ConsumeMessageThread_1, queueId=1, 消息内容:全局有序消息3
消费线程=ConsumeMessageThread_1, queueId=1, 消息内容:全局有序消息4
四、局部顺序消息
下面用订单进行分区有序的示例。一个订单创建完成后,订单的状态流转大概是:【订单创建 -> 订单支付 -> 订单完成】,我们在创建MessageQueueSelector消息队列选择器的时候,需要根据业务唯一标识自定义队列选择算法,如本例中则可以使用orderId订单号去选择队列。这样的话,订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
大体过程如下图:
- (1)、生产者发送消息
public class OrderMQProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException, ExecutionException {// 创建DefaultMQProducer类并设定生产者名称DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test");// 设置NameServer地址,如果是集群的话,使用分号;分隔开mqProducer.setNamesrvAddr("10.0.90.86:9876");// 启动消息生产者mqProducer.start();List<Order> orderList = getOrderList();for (int i = 0; i < orderList.size(); i++) {String body = "【" + orderList.get(i) + "】订单状态变更消息";// 创建消息,并指定Topic(主题),Tag(标签)和消息内容Message msg = new Message("ORDER_STATUS_CHANGE", "", body.getBytes(RemotingHelper.DEFAULT_CHARSET));// MessageQueueSelector: 消息队列选择器,根据业务唯一标识自定义队列选择算法/*** msg:消息对象* selector:消息队列的选择器* arg:选择队列的业务标识,如本例中的orderId*/SendResult sendResult = mqProducer.send(msg, new MessageQueueSelector() {/*** @param mqs 队列集合* @param msg 消息对象* @param arg 业务标识的参数,对应send()方法传入的第三个参数arg* @return*/@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {//根据arg(实际上是订单id)选择消息发送的队列long index = (Long) arg % mqs.size();return mqs.get((int) index);}//mqProducer.send()方法第三个参数, 会传递到select()方法的arg参数}, orderList.get(i).getOrderId());System.out.println(String.format("消息发送状态:%s, orderId:%s, queueId:%d, body:%s",sendResult.getSendStatus(),orderList.get(i).getOrderId(),sendResult.getMessageQueue().getQueueId(),body));}// 如果不再发送消息,关闭Producer实例mqProducer.shutdown();}/*** 订单状态变更流程: ORDER_CREATE(订单创建) -> ORDER_PAYED(订单已支付) -> ORDER_COMPLETE(订单完成)*/public static List<Order> getOrderList() {List<Order> orderList = new ArrayList<>();Order orderDemo = new Order();orderDemo.setOrderId(1L);orderDemo.setOrderStatus("ORDER_CREATE");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(2L);orderDemo.setOrderStatus("ORDER_CREATE");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(1L);orderDemo.setOrderStatus("ORDER_PAYED");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(2L);orderDemo.setOrderStatus("ORDER_PAYED");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(2L);orderDemo.setOrderStatus("ORDER_COMPLETE");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(3L);orderDemo.setOrderStatus("ORDER_CREATE");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(4L);orderDemo.setOrderStatus("ORDER_CREATE");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(3L);orderDemo.setOrderStatus("ORDER_PAYED");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(1L);orderDemo.setOrderStatus("ORDER_COMPLETE");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(3L);orderDemo.setOrderStatus("ORDER_COMPLETE");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(4L);orderDemo.setOrderStatus("ORDER_PAYED");orderList.add(orderDemo);orderDemo = new Order();orderDemo.setOrderId(4L);orderDemo.setOrderStatus("ORDER_COMPLETE");orderList.add(orderDemo);return orderList;}}public class Order implements Serializable {/*** 订单ID*/private Long orderId;/*** 订单状态*/private String orderStatus;public Long getOrderId() {return orderId;}public void setOrderId(Long orderId) {this.orderId = orderId;}public String getOrderStatus() {return orderStatus;}public void setOrderStatus(String orderStatus) {this.orderStatus = orderStatus;}@Overridepublic String toString() {return "Order{" +"orderId=" + orderId +", orderStatus='" + orderStatus + '\'' +'}';}
}
- (2)、消费者消费消息
public class OrderMQConsumer {public static void main(String[] args) throws MQClientException {// 创建DefaultMQPushConsumer类并设定消费者名称DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test");// 设置NameServer地址,如果是集群的话,使用分号;分隔开mqPushConsumer.setNamesrvAddr("10.0.90.86:9876");// 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费// 如果不是第一次启动,那么按照上次消费的位置继续消费mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息,如果订阅该主题下的所有tag,则使用*mqPushConsumer.subscribe("ORDER_STATUS_CHANGE", "*");// 注册回调实现类来处理从broker拉取回来的消息// 注意:顺序消息注册的是MessageListenerOrderly监听器mqPushConsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext consumeOrderlyContext) {consumeOrderlyContext.setAutoCommit(true);for (MessageExt msg : msgList) {// 每个queue有唯一的consume线程来消费, 订单对每个queue都是分区有序System.out.println("消费线程=" + Thread.currentThread().getName() +", queueId=" + msg.getQueueId() + ", 消息内容:" + new String(msg.getBody()));}try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}// 标记该消息已经被成功消费return ConsumeOrderlyStatus.SUCCESS;}});// 启动消费者实例mqPushConsumer.start();}
}
- (3)、启动生产者
消息发送状态:SEND_OK, orderId:1, queueId:1, body:【Order{orderId=1, orderStatus='ORDER_CREATE'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:2, queueId:2, body:【Order{orderId=2, orderStatus='ORDER_CREATE'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:1, queueId:1, body:【Order{orderId=1, orderStatus='ORDER_PAYED'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:2, queueId:2, body:【Order{orderId=2, orderStatus='ORDER_PAYED'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:2, queueId:2, body:【Order{orderId=2, orderStatus='ORDER_COMPLETE'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:3, queueId:3, body:【Order{orderId=3, orderStatus='ORDER_CREATE'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:4, queueId:0, body:【Order{orderId=4, orderStatus='ORDER_CREATE'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:3, queueId:3, body:【Order{orderId=3, orderStatus='ORDER_PAYED'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:1, queueId:1, body:【Order{orderId=1, orderStatus='ORDER_COMPLETE'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:3, queueId:3, body:【Order{orderId=3, orderStatus='ORDER_COMPLETE'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:4, queueId:0, body:【Order{orderId=4, orderStatus='ORDER_PAYED'}】订单状态变更消息
消息发送状态:SEND_OK, orderId:4, queueId:0, body:【Order{orderId=4, orderStatus='ORDER_COMPLETE'}】订单状态变更消息
- (4)、启动消费者
消费线程=ConsumeMessageThread_2, queueId=1, 消息内容:【Order{orderId=1, orderStatus='ORDER_CREATE'}】订单状态变更消息
消费线程=ConsumeMessageThread_1, queueId=2, 消息内容:【Order{orderId=2, orderStatus='ORDER_CREATE'}】订单状态变更消息
消费线程=ConsumeMessageThread_3, queueId=3, 消息内容:【Order{orderId=3, orderStatus='ORDER_CREATE'}】订单状态变更消息
消费线程=ConsumeMessageThread_4, queueId=0, 消息内容:【Order{orderId=4, orderStatus='ORDER_CREATE'}】订单状态变更消息
消费线程=ConsumeMessageThread_2, queueId=1, 消息内容:【Order{orderId=1, orderStatus='ORDER_PAYED'}】订单状态变更消息
消费线程=ConsumeMessageThread_1, queueId=2, 消息内容:【Order{orderId=2, orderStatus='ORDER_PAYED'}】订单状态变更消息
消费线程=ConsumeMessageThread_3, queueId=3, 消息内容:【Order{orderId=3, orderStatus='ORDER_PAYED'}】订单状态变更消息
消费线程=ConsumeMessageThread_4, queueId=0, 消息内容:【Order{orderId=4, orderStatus='ORDER_PAYED'}】订单状态变更消息
消费线程=ConsumeMessageThread_1, queueId=2, 消息内容:【Order{orderId=2, orderStatus='ORDER_COMPLETE'}】订单状态变更消息
消费线程=ConsumeMessageThread_3, queueId=3, 消息内容:【Order{orderId=3, orderStatus='ORDER_COMPLETE'}】订单状态变更消息
消费线程=ConsumeMessageThread_2, queueId=1, 消息内容:【Order{orderId=1, orderStatus='ORDER_COMPLETE'}】订单状态变更消息
消费线程=ConsumeMessageThread_4, queueId=0, 消息内容:【Order{orderId=4, orderStatus='ORDER_COMPLETE'}】订单状态变更消息
从上面的结果,我们可以看出来实现了分区有序,即一个线程只完成唯一标识的订单消息。
五、顺序消息缺陷
- 消费顺序消息的并行度依赖于队列的数量 ;
- 队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题;
- 遇到消息失败的消息,无法跳过,当前队列消费暂停;
RocketMQ(九)RocketMQ顺序消息相关推荐
- rocketmq 重复消费_消息队列 RocketMQ
引言 本文整理了RocketMQ的相关知识,方便以后查阅. 功能介绍 简单来说,消息队列就是基础数据结构课程里"先进先出"的一种数据结构,但是如果要消除单点故障,保证消息传输的可靠 ...
- RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage
文章目录 顺序消息的概念 顺序消费的原理 消费状态 演示 Producer Consumer 代码 顺序消息的概念 消息有序指的是可以按照消息的发送顺序来消费(FIFO). RocketMQ可以严格的 ...
- rocketmq发送顺序消息(四)
rocketmq怎么发送消息可参考我的上一篇博客:rocketmq发送第一条消息.此处我们讲解如何发送rocketmq顺序消息 producer public class ProducerOrder ...
- 聊一聊顺序消息(RocketMQ顺序消息的实现机制)
本文来自:https://www.cnblogs.com/hzmark/p/orderly_message.html 当我们说顺序时,我们在说什么? 日常思维中,顺序大部分情况会和时间关联起来,即时间 ...
- RocketMQ入门到入土(二)事务消息顺序消息
精彩推荐 一百期Java面试题汇总 SpringBoot内容聚合 IntelliJ IDEA内容聚合 Mybatis内容聚合 接上一篇:RocketMQ入门到入土(一)新手也能看懂的原理和实战! 一. ...
- RocketMQ——顺序消息
消息有序指的是可以按照消息的发送顺序来消费. RocketMQ可以严格的保证消息有序.但这个顺序,不是全局顺序,只是分区(queue)顺序.要全局顺序只能一个分区. 之所以出现你这个场景看起来不是顺序 ...
- 源码分析RocketMQ顺序消息消费实现原理
本节目录 1.消息队列负载 2.消息拉取 3.消息顺序消息消费 3.1核心属性与构造函数 3.2 start方法 3.3 submitConsumeRequest 3.4 ConsumeMessage ...
- RocketMQ - 6 生产者,顺序消息
Producer类型 Producer主要职能就是生产消息,发送消息.它可以对多个主题发送消息,甚至可以通过Tag定义些简单的过滤.更复杂的过滤可以使用filter组件来进行相应的业务操作 Rocke ...
- RocketMQ(九):rocketMQ设计的全链路消息零丢失方案?+rocketmq消息中间件事务消息机制的底层实现原理?+half是什么?+half消息是如何对消费者不可见的?
前言: 目前rocketmq更新已经更新了11篇博客了,预计接下来的2-3篇是暂时的更新进度了,准备更新一下springboot或者是jvm,mysql相关的专题出来,后续更新完事后,再分享一些实战性 ...
最新文章
- VC6.0下调bug的流程
- libopencv_core.so: file not recognized: File format not recognized
- 2.13 向量化 Logistic 回归-深度学习-Stanford吴恩达教授
- mybatis中mysql流式读取_MyBatis读取大量数据(流式读取)
- python所有文件都能用文本方式打开_python 打开文件方式讲解、常用读写操作指令(全)以及读写常见问题...
- 幅度和幅值有区别吗_克拉克 (Clark) 变换中等幅值 (2/3) 和等功率 (sqrt(2/3)) 变换的公式推导...
- mybatis_user_guide(7) SQL语句构建器类
- QT @ VS2017的安装
- Mybatis 原始Dao层开发
- Pentium的指令系统(1)——Pentium的寻址方式
- 列表生成式的复习以及生成器的练习, 杨辉三角实例(非常巧妙)
- Linux环境下通过gstack命令查看进程的运行堆栈信息
- web 网站抢购并发
- 绝对式编码器的ssi协议 stm32 hal
- 漂白android软件,原本图片漂白软件
- 大话李白flash系列(在线看,全)
- msn一直登陆不上,没有办法只好启用meebo!
- 零基础学图形学(9) 几何知识——行向量和列向量
- HBuilderX 连接 微信开发者工具
- 计算机语言属于人类意识的客观内容,《2008年考研政治800题精解》世界的物质性和人的实践活动(5)...
热门文章
- 第一届『Citric杯』NOIP提高组模拟赛 题解
- 三国杀ol服务器维护时间 11月6日,三国杀ol11月6日更新一览 聚宝盆功能上线
- 思科交换机配置试题_cisco交换机配置简单教程.doc
- 剪辑视频怎么赚钱怎么做?一个人0资金怎么创业?
- 人脸识别方案(包含tcp ,http,socket 三者的区别)
- 花了10分钟终于明白矩阵的逆怎么用了!
- Oracle生成data patching脚本
- 微信小程序分页功能(上拉触底事件)
- Python爬虫11-Scrapy爬虫框架
- 华为云GaussDB(for Redis)GaussDB(for Redis)全面对比Codis