前序
问题引出:
假设现在有这么一个业务,上游系统通过消息队列,发送一个订单的状态操作信息,上游先发生操作订单价格乘以2倍,再操作订单价格减去10元,但是系统发送的两条消息时间相差不大,下游系统如何接受消息,并且需要严格保证订单状态的操作顺序性?
针对这个问题,我们来介绍下rocketmq的消息机制中的顺序消费实现。
一、rocketMq的消费模式划分
 1.并发消费模式
当同一类消息(一般指同一个topic)业务设计上不需要有序的消费信息,这时候我们可以使用高性能的并发消费模式。并发消费模式下,一个消费端可以同时消费多个数据队列(queue)的多个消息(批量拉取默认32条),消费端中的每个线程也可以一次消费多条消息。
2.顺序消费模式
顺序消费,分为全局顺序消费,和局部顺序消费。
全局顺序消费:只能有一个数据队列(queue),和一个消费者实例。原因是RocketMQ只提供在单个queue上使用FIFO顺序的有序消息。多个queue之间并不能保证消息的严格先后性。
局部顺序消费:通常在实际应用中,我们需要将同一个订单号的相关操作,按照规则(可以是hash或取模等)发送到同一个queue上(使用MessageQueueSelector ),然后消费者实例,使用顺序消费模式消费消息(使用MessageListenerOrderly)。
二、顺序消费代码实例
1.生产端demo
public class OrderedProducer {
public static void main(String[] args) throws Exception {
//初始化生成端
MQProducer producer = new DefaultMQProducer("example_group_name");
//启动
producer.start();
//对消息进行 tag标签划分
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
//创建一个消息实例,指定主题、标签和消息主体
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//借助MessageQueueSelector 选择queue发送
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
//关闭生成端
producer.shutdown();
}
}
2.消费端demo
public class OrderedConsumer {
public static void main(String[] args) throws Exception {
//声明一个推模式的消费端(rocketmq分为 服务器broker  push推消息模式,和生产端主动pull拉模式)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
//设置消费起点: CONSUME_FROM_FIRST_OFFSET
//一个新的订阅组第一次启动从队列的最后位置开始消费<br>* 后续再启动接着上次消费的进度开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//这里的这个实例 只消费 tag a,c,d 的消息
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
//采用顺序消费模式 MessageListenerOrderly
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
context.setAutoCommit(false);
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
} else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
//消费开启
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
三.局部有序消费详细流程说明
1.生产端
多线程发送的消息无法保证有序性,因此,需要业务方在发送时,针对同一个业务编号(如同一笔订单)的消息需要保证在一个线程内顺序发送,在上一个消息发送成功后,在进行下一个消息的发送。对应到mq中,消息发送方法就得使用同步发送,异步发送无法保证顺序性,而且rocketmq的每个topic下会存在多个queue,要保证消息的顺序性,同一个订单相关的操作消息需要被发送到同一个queue中。代码里,需要使用MessageQueueSelector来选择要发送的queue
2.服务端
mq broker端的每个queue都是采用的FIFO模式,即采用的队列存储,可以保证顺行性。
3.消费端
rocketmq默认消费逻辑:
负载均衡,指定消费者负责某些队列;当前消费者开启多个线程开始同时消费这个队列,远程拉取消息。从消费逻辑中可以看到,如果要保证消息有序消费,就要解决这两个问题。在broker端,如果使用MessageListenerOrderly(顺序消费)的模式,broker通过锁定MessageQueue的方式,来保证同一时刻,只能有一个消费者进行消费。在消费端,在DefaultMQPushConsumer 中的pullMessage方法中,我们进入this.consumeMessageService.submitConsumeRequest方法,看看顺序消费与并发消费的区别。ConsumeMessageOrderlyService(顺序消费)ConsumeRequest(线程类)中的run方法

ConsumeMessageConcurrentlyService(并发消费)ConsumeRequest(线程类)类中的run方法。

我们能清晰看到,顺序消费的线程 会对 messageQueue进行一个锁操作,只有获取到锁之后才会处理消息体,保证同时,只有一个线程消费一个queue。

问题解决:

相信大家看到这,就能够对前序中提出的问题,就能解答出来了,只要上游系统发送的时候把相同订单的状态消息发送到同一个queue,然后下游系统使用顺序消费模式,就能严格保证订单的操作顺序了。


RocketMQ的顺序消费相关推荐

  1. rocketmq 消费方式_RocketMQ事务消费和顺序消费详解

    一.RocketMq有3中消息类型 1.普通消费 2. 顺序消费 3.事务消费 顺序消费场景 在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一.创建订单 ,第二:订单付款,第三:订单完成. ...

  2. RocketMQ 顺序消费只消费一次 坑

    rocketMq实现顺序消费的原理 produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一 ...

  3. RocketMQ事务消费和顺序消费详解

    一.RocketMq有3中消息类型 1.普通消费 2. 顺序消费 3.事务消费 顺序消费场景 在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一.创建订单 ,第二:订单付款,第三:订单完成. ...

  4. rocketmq 顺序消费_10 分钟看懂消息队列 RocketMQ

    RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于2017年9月25日成为 Apache 的顶级项目.作为经历过多次阿里巴巴双十一这种&qu ...

  5. 一文理清RocketMQ顺序消费、重复消费、消息丢失问题

    前言 在使用消息队列时不可避免的会遇到顺序消费.重复消费.消息丢失三个问题.在一次面试字节的时候,面试官问到如何保证顺序消费,当时回答不太准确,特意此文回顾如何解决顺序消费.重复消费.消息丢失三个问题 ...

  6. RocketMQ如何保证消息顺序消费?又为何不解决消息重复消费问题?

    消息的顺序消费对于业务系统来说非常重要,一笔订单产生了3条消息,分别是订单创建.订单付款.订单完成.消费时,必须按照顺序消费才有意义,与此同时多笔订单之间又是可以并行消费的. 如何保证消息顺序消费? ...

  7. rocketMq 顺序消费

    什么是顺序消费? 消息有序指的是一类消息消费时,能按照发送的顺序来消费.例如:一个订单产生了 3 条消息,分别是订单创建.订单付款.订单完成.消费时,要按照这个顺序消费才有意义.与此同时多笔订单之间又 ...

  8. rocketmq 顺序消费_RocketMQ核心概念扫盲

    在正式进入RocketMQ的学习之前,我觉得有必要梳理一下RocketMQ核心概念,为大家学习RocketMQ打下牢固的基础. 1.RocketMQ部署架构 在RocketMQ主要的组件如下: Nam ...

  9. RocketMQ——顺序消费(代码)

    关于rocketmq顺序消费的理解和图示可以查看该博文:RocketMQ--顺序消费和重复消费 本博客主要是以代码示例来了解顺序消费的相关内容,建议在此之前先了解下顺序消费的原理. 注:RocketM ...

最新文章

  1. 多面体的顶点方向以及分解定理以及多胞形凸组合
  2. 笔记合并_.NET Core开发实战(第23课:静态文件中间件:前后端分离开发合并部署骚操作)学习笔记(上)...
  3. Py之pyglet:Python之pyglet库的简介、安装、使用详细攻略
  4. 如何利用 Myflash 解析 binlog ?
  5. 连续出现的字符(信息学奥赛一本通-T1148)
  6. 解决新版DBUtils使用连接池from DBUtils.PooledDB import PooledDB报错
  7. linux 查看安装的系统版本,linux之查看版本信息命令
  8. linux的functions之killproc函数详解
  9. 几个安卓 app 暴露超1亿用户的数据
  10. php mysql link_php与mysql连接
  11. 协程 c语言,协程-C语言实现
  12. 写给想做好社区网站人员的一本书
  13. Tomcat SSL Configuration
  14. 2019年1月30日
  15. 基于fdw的跨Greenplum集群数据库查询实现
  16. 金山毒霸11,更新内容,问题修复了什么?
  17. 面试官交流中的问题与后感
  18. 吃西瓜—先磨刀之概率论
  19. sizeof(long)
  20. uva 10537 The Toll! Revisited

热门文章

  1. 系列学习 docker 之第 5 篇 —— Docker 常用命令
  2. STM32直流减速电机控制篇(一)PWM调速
  3. 微信公众号发送小程序卡片_微信公众号将能够直接给用户推送小程序卡片,社交电商开始发力...
  4. Astyle格式说明
  5. Codeforces Round #618 (Div. 2)C、Anu Has a Function
  6. Spring中常用的设计模式——工厂模式
  7. Big Data Caching for Networking: Moving from Cloud to Edge 论文分享
  8. jquery表格日历写入html代码,简单的日历记事本jQuery插件e-calendar(带样式美化)
  9. 绘制镇街区域的Echarts地图
  10. 区块链架构--fabric基本介绍