消息有序指的是可以按照消息的发送顺序来消费。

RocketMQ可以严格的保证消息有序。但这个顺序,不是全局顺序,只是分区(queue)顺序。要全局顺序只能一个分区。

之所以出现你这个场景看起来不是顺序的,是因为发送消息的时候,消息发送默认是会采用轮询的方式发送到不通的queue(分区)。如图:

而消费端消费的时候,是会分配到多个queue的,多个queue是同时拉取提交消费。如图:

但是同一条queue里面,RocketMQ的确是能保证FIFO的。那么要做到顺序消息,应该怎么实现呢——把消息确保投递到同一条queue。

下面用订单进行示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

rocketmq消息生产端示例代码如下:

/*** Producer,发送顺序消息*/
public class Producer {public static void main(String[] args) throws IOException {try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("10.11.11.11:9876;10.11.11.12:9876");producer.start();String[] tags = new String[] { "TagA", "TagC", "TagD" };// 订单列表List<OrderDemo> orderList =  new Producer().buildOrders();Date date = new Date();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(date);for (int i = 0; i < 10; i++) {// 加个时间后缀String body = dateStr + " Hello RocketMQ " + orderList.get(i);Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, body.getBytes());SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Long id = (Long) arg;long index = id % mqs.size();return mqs.get((int)index);}}, orderList.get(i).getOrderId());//订单idSystem.out.println(sendResult + ", body:" + body);}producer.shutdown();} catch (MQClientException e) {e.printStackTrace();} catch (RemotingException e) {e.printStackTrace();} catch (MQBrokerException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}System.in.read();}/*** 生成模拟订单数据 */private List<OrderDemo> buildOrders() {List<OrderDemo> orderList = new ArrayList<OrderDemo>();OrderDemo orderDemo = new OrderDemo();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderDemo();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderDemo();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderDemo();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderDemo();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderDemo();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderDemo();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderDemo();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderDemo();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderDemo();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("完成");orderList.add(orderDemo);return orderList;}

输出:

从图中红色框可以看出,orderId等于15103111039的订单被顺序放入queueId等于7的队列。queueOffset同时在顺序增长。

发送时有序,接收(消费)时也要有序,才能保证顺序消费。如下这段代码演示了普通消费(非有序消费)的实现方式。

/*** 普通消息消费*/
public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");consumer.setNamesrvAddr("10.11.11.11:9876;10.11.11.12:9876");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerConcurrently() {Random random = new Random();@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );for (MessageExt msg: msgs) {System.out.println(msg + ", content:" + new String(msg.getBody()));}try {//模拟业务逻辑处理中...TimeUnit.SECONDS.sleep(random.nextInt(10));} catch (Exception e) {e.printStackTrace();}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}
}

输出:

可见,订单号为15103111039的订单被消费时顺序完成乱了。所以用MessageListenerConcurrently这种消费者是无法做到顺序消费的,采用下面这种方式就做到了顺序消费:

/*** 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)*/
public class ConsumerInOrder {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");consumer.setNamesrvAddr("10.11.11.11:9876;10.11.11.12:9876");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerOrderly() {Random random = new Random();@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );for (MessageExt msg: msgs) {System.out.println(msg + ", content:" + new String(msg.getBody()));}try {//模拟业务逻辑处理中...TimeUnit.SECONDS.sleep(random.nextInt(10));} catch (Exception e) {e.printStackTrace();}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}
}

输出:


MessageListenerOrderly能够保证顺序消费,从图中我们也看到了期望的结果。图中的输出是只启动了一个消费者时的输出,看起来订单号还是混在一起,但是每组订单号之间是有序的。因为消息发送时被分配到了三个队列(参见前面生产者输出日志),那么这三个队列的消息被这唯一消费者消费。

如果启动2个消费者呢?那么其中一个消费者对应消费2个队列,另一个消费者对应消费剩下的1个队列。

如果启动3个消费者呢?那么每个消费者都对应消费1个队列,订单号就区分开了。输出变为这样:

消费者1输出:

消费者2输出:

消费者3输出:

很完美,有木有?!

按照这个示例,把订单号取了做了一个取模运算再丢到selector中,selector保证同一个模的都会投递到同一条queue。即: 相同订单号的--->有相同的模--->有相同的queue。最后就会类似这样:

总结:

rocketmq的顺序消息需要满足2点:

1.Producer端保证发送消息有序,且发送到同一个队列。
2.consumer端保证消费同一个队列。

部分内容图片引用自https://www.zhihu.com/question/30195969

RocketMQ——顺序消息相关推荐

  1. 聊一聊顺序消息(RocketMQ顺序消息的实现机制)

    本文来自:https://www.cnblogs.com/hzmark/p/orderly_message.html 当我们说顺序时,我们在说什么? 日常思维中,顺序大部分情况会和时间关联起来,即时间 ...

  2. 源码分析RocketMQ顺序消息消费实现原理

    本节目录 1.消息队列负载 2.消息拉取 3.消息顺序消息消费 3.1核心属性与构造函数 3.2 start方法 3.3 submitConsumeRequest 3.4 ConsumeMessage ...

  3. RocketMQ(九)RocketMQ顺序消息

    目录 一.什么是顺序消息? 二.顺序消息的原理 三.全局顺序消息 四.局部顺序消息 五.顺序消息缺陷 一.什么是顺序消息? 消息有序指的是,消费者端消费消息时,需按照消息的发送顺序来消费,即先发送的消 ...

  4. RocketMQ 顺序消息解析——图解、源码级解析

  5. rocketmq发送顺序消息(四)

    rocketmq怎么发送消息可参考我的上一篇博客:rocketmq发送第一条消息.此处我们讲解如何发送rocketmq顺序消息 producer public class ProducerOrder ...

  6. RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage

    文章目录 顺序消息的概念 顺序消费的原理 消费状态 演示 Producer Consumer 代码 顺序消息的概念 消息有序指的是可以按照消息的发送顺序来消费(FIFO). RocketMQ可以严格的 ...

  7. rocketmq 顺序消费_10 分钟看懂消息队列 RocketMQ

    RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于2017年9月25日成为 Apache 的顶级项目.作为经历过多次阿里巴巴双十一这种&qu ...

  8. 一文理清RocketMQ顺序消费、重复消费、消息丢失问题

    前言 在使用消息队列时不可避免的会遇到顺序消费.重复消费.消息丢失三个问题.在一次面试字节的时候,面试官问到如何保证顺序消费,当时回答不太准确,特意此文回顾如何解决顺序消费.重复消费.消息丢失三个问题 ...

  9. RocketMQ入门到入土(二)事务消息顺序消息

    精彩推荐 一百期Java面试题汇总 SpringBoot内容聚合 IntelliJ IDEA内容聚合 Mybatis内容聚合 接上一篇:RocketMQ入门到入土(一)新手也能看懂的原理和实战! 一. ...

最新文章

  1. Angular2封装拖拽指令
  2. QML WebEngineView简单用法和常用接口
  3. Linux多线程编程(一)---多线程基本编程
  4. Nginx URL重写(rewrite)配置及信息详解
  5. 在用dw.GetSqlSelect()获得到的Sql语句出现PBSELECT( VERSION的解决办法
  6. SpringBoot在自定义类中调用service层等Spring其他层
  7. Android官方开发文档Training系列课程中文版:OpenGL绘图之环境配置
  8. mysql linux32_Linux 配置 mysql 5.7.32 实操记录
  9. thinkphp5.0解决控制器驼峰命名时提示找不到类名
  10. 21. Upgrade-Insecure-Requests: 1
  11. 红旗Linux职称考试模块,计算机职称考试红旗Linux Desktop 6.0考试大纲
  12. linux安装gcc等程序包,Linux手动安装gcc-8.3.0
  13. 关键词组合工具终结版标题自由组合工具使用教程
  14. 【信息融合】基于BP神经网络和DS 证据理论实现不确定性信息融合问题附matlab代码
  15. Linux系统软件安装
  16. 【免费】各种hadoop版本对应的hadoop.dll和winutils.exe
  17. scl语言用plc脉冲做定时器_scl语言用plc脉冲做定时器_西门子PLC SCL语言开发学习笔记(二)...
  18. 跳一跳改分php源码,小游戏“跳一跳”居然可改分,微信小程序现漏洞
  19. VS2010如何安装MSComm控件
  20. proe服务器高速缓存位置,一招搞定Proe低版本打开高版本的问题 | 我爱分享网

热门文章

  1. python创建person类用printinfo方法_python学习(三)面向对象
  2. Mac安装软件时提示已损坏的解决方法
  3. Python3脚本抢票
  4. 半桥llc 增益 matlab程序,【我已收藏】很完整的LLC谐振半桥电路分析与计算
  5. 阿里云安全组设定(虚拟机端口打开)
  6. Tracup|减少压力和更多的成功,帮助您摆脱工作焦虑的利器
  7. 一步控制台编译java_在控制台运行一个 Java 程序 Test . class ,使用的命令正确的是( )_学小易找答案...
  8. JavaScript高级04 正则表达式
  9. 官方教程之短视频app源码接入openinstall实现免填邀请码功能
  10. 【OpenCV 例程200篇】20. 图像的按位运算(cv2.bitwise)