RocketMQ——顺序消息
消息有序指的是可以按照消息的发送顺序来消费。
RocketMQ可以严格的保证消息有序。但这个顺序,不是全局顺序,只是分区(queue)顺序。要全局顺序只能一个分区。
之所以出现你这个场景看起来不是顺序的,是因为发送消息的时候,消息发送默认是会采用轮询的方式发送到不通的queue(分区)。如图:
而消费端消费的时候,是会分配到多个queue的,多个queue是同时拉取提交消费。如图:
但是同一条queue里面,RocketMQ的确是能保证FIFO的。那么要做到顺序消息,应该怎么实现呢——把消息确保投递到同一条queue。
下面用订单进行示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
rocketmq消息生产端示例代码如下:
/*** Producer,发送顺序消息*/
public class Producer {public static void main(String[] args) throws IOException {try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("10.11.11.11:9876;10.11.11.12:9876");producer.start();String[] tags = new String[] { "TagA", "TagC", "TagD" };// 订单列表List<OrderDemo> orderList = new Producer().buildOrders();Date date = new Date();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(date);for (int i = 0; i < 10; i++) {// 加个时间后缀String body = dateStr + " Hello RocketMQ " + orderList.get(i);Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, body.getBytes());SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Long id = (Long) arg;long index = id % mqs.size();return mqs.get((int)index);}}, orderList.get(i).getOrderId());//订单idSystem.out.println(sendResult + ", body:" + body);}producer.shutdown();} catch (MQClientException e) {e.printStackTrace();} catch (RemotingException e) {e.printStackTrace();} catch (MQBrokerException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}System.in.read();}/*** 生成模拟订单数据 */private List<OrderDemo> buildOrders() {List<OrderDemo> orderList = new ArrayList<OrderDemo>();OrderDemo orderDemo = new OrderDemo();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderDemo();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderDemo();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderDemo();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderDemo();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderDemo();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderDemo();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderDemo();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderDemo();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderDemo();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("完成");orderList.add(orderDemo);return orderList;}
输出:
从图中红色框可以看出,orderId等于15103111039的订单被顺序放入queueId等于7的队列。queueOffset同时在顺序增长。
发送时有序,接收(消费)时也要有序,才能保证顺序消费。如下这段代码演示了普通消费(非有序消费)的实现方式。
/*** 普通消息消费*/
public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");consumer.setNamesrvAddr("10.11.11.11:9876;10.11.11.12:9876");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerConcurrently() {Random random = new Random();@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );for (MessageExt msg: msgs) {System.out.println(msg + ", content:" + new String(msg.getBody()));}try {//模拟业务逻辑处理中...TimeUnit.SECONDS.sleep(random.nextInt(10));} catch (Exception e) {e.printStackTrace();}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}
}
输出:
可见,订单号为15103111039的订单被消费时顺序完成乱了。所以用MessageListenerConcurrently这种消费者是无法做到顺序消费的,采用下面这种方式就做到了顺序消费:
/*** 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)*/
public class ConsumerInOrder {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");consumer.setNamesrvAddr("10.11.11.11:9876;10.11.11.12:9876");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerOrderly() {Random random = new Random();@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );for (MessageExt msg: msgs) {System.out.println(msg + ", content:" + new String(msg.getBody()));}try {//模拟业务逻辑处理中...TimeUnit.SECONDS.sleep(random.nextInt(10));} catch (Exception e) {e.printStackTrace();}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}
}
输出:
MessageListenerOrderly能够保证顺序消费,从图中我们也看到了期望的结果。图中的输出是只启动了一个消费者时的输出,看起来订单号还是混在一起,但是每组订单号之间是有序的。因为消息发送时被分配到了三个队列(参见前面生产者输出日志),那么这三个队列的消息被这唯一消费者消费。
如果启动2个消费者呢?那么其中一个消费者对应消费2个队列,另一个消费者对应消费剩下的1个队列。
如果启动3个消费者呢?那么每个消费者都对应消费1个队列,订单号就区分开了。输出变为这样:
消费者1输出:
消费者2输出:
消费者3输出:
很完美,有木有?!
按照这个示例,把订单号取了做了一个取模运算再丢到selector中,selector保证同一个模的都会投递到同一条queue。即: 相同订单号的--->有相同的模--->有相同的queue。最后就会类似这样:
总结:
rocketmq的顺序消息需要满足2点:
1.Producer端保证发送消息有序,且发送到同一个队列。
2.consumer端保证消费同一个队列。
部分内容图片引用自https://www.zhihu.com/question/30195969
RocketMQ——顺序消息相关推荐
- 聊一聊顺序消息(RocketMQ顺序消息的实现机制)
本文来自:https://www.cnblogs.com/hzmark/p/orderly_message.html 当我们说顺序时,我们在说什么? 日常思维中,顺序大部分情况会和时间关联起来,即时间 ...
- 源码分析RocketMQ顺序消息消费实现原理
本节目录 1.消息队列负载 2.消息拉取 3.消息顺序消息消费 3.1核心属性与构造函数 3.2 start方法 3.3 submitConsumeRequest 3.4 ConsumeMessage ...
- RocketMQ(九)RocketMQ顺序消息
目录 一.什么是顺序消息? 二.顺序消息的原理 三.全局顺序消息 四.局部顺序消息 五.顺序消息缺陷 一.什么是顺序消息? 消息有序指的是,消费者端消费消息时,需按照消息的发送顺序来消费,即先发送的消 ...
- RocketMQ 顺序消息解析——图解、源码级解析
- rocketmq发送顺序消息(四)
rocketmq怎么发送消息可参考我的上一篇博客:rocketmq发送第一条消息.此处我们讲解如何发送rocketmq顺序消息 producer public class ProducerOrder ...
- RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage
文章目录 顺序消息的概念 顺序消费的原理 消费状态 演示 Producer Consumer 代码 顺序消息的概念 消息有序指的是可以按照消息的发送顺序来消费(FIFO). RocketMQ可以严格的 ...
- rocketmq 顺序消费_10 分钟看懂消息队列 RocketMQ
RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于2017年9月25日成为 Apache 的顶级项目.作为经历过多次阿里巴巴双十一这种&qu ...
- 一文理清RocketMQ顺序消费、重复消费、消息丢失问题
前言 在使用消息队列时不可避免的会遇到顺序消费.重复消费.消息丢失三个问题.在一次面试字节的时候,面试官问到如何保证顺序消费,当时回答不太准确,特意此文回顾如何解决顺序消费.重复消费.消息丢失三个问题 ...
- RocketMQ入门到入土(二)事务消息顺序消息
精彩推荐 一百期Java面试题汇总 SpringBoot内容聚合 IntelliJ IDEA内容聚合 Mybatis内容聚合 接上一篇:RocketMQ入门到入土(一)新手也能看懂的原理和实战! 一. ...
最新文章
- Angular2封装拖拽指令
- QML WebEngineView简单用法和常用接口
- Linux多线程编程(一)---多线程基本编程
- Nginx URL重写(rewrite)配置及信息详解
- 在用dw.GetSqlSelect()获得到的Sql语句出现PBSELECT( VERSION的解决办法
- SpringBoot在自定义类中调用service层等Spring其他层
- Android官方开发文档Training系列课程中文版:OpenGL绘图之环境配置
- mysql linux32_Linux 配置 mysql 5.7.32 实操记录
- thinkphp5.0解决控制器驼峰命名时提示找不到类名
- 21. Upgrade-Insecure-Requests: 1
- 红旗Linux职称考试模块,计算机职称考试红旗Linux Desktop 6.0考试大纲
- linux安装gcc等程序包,Linux手动安装gcc-8.3.0
- 关键词组合工具终结版标题自由组合工具使用教程
- 【信息融合】基于BP神经网络和DS 证据理论实现不确定性信息融合问题附matlab代码
- Linux系统软件安装
- 【免费】各种hadoop版本对应的hadoop.dll和winutils.exe
- scl语言用plc脉冲做定时器_scl语言用plc脉冲做定时器_西门子PLC SCL语言开发学习笔记(二)...
- 跳一跳改分php源码,小游戏“跳一跳”居然可改分,微信小程序现漏洞
- VS2010如何安装MSComm控件
- proe服务器高速缓存位置,一招搞定Proe低版本打开高版本的问题 | 我爱分享网
热门文章
- python创建person类用printinfo方法_python学习(三)面向对象
- Mac安装软件时提示已损坏的解决方法
- Python3脚本抢票
- 半桥llc 增益 matlab程序,【我已收藏】很完整的LLC谐振半桥电路分析与计算
- 阿里云安全组设定(虚拟机端口打开)
- Tracup|减少压力和更多的成功,帮助您摆脱工作焦虑的利器
- 一步控制台编译java_在控制台运行一个 Java 程序 Test . class ,使用的命令正确的是( )_学小易找答案...
- JavaScript高级04 正则表达式
- 官方教程之短视频app源码接入openinstall实现免填邀请码功能
- 【OpenCV 例程200篇】20. 图像的按位运算(cv2.bitwise)