为了系统间解耦,我们通常会引入MQ框架,大家各司其职共同完成上下游的业务流程。

大致过程:

  • 生产端,创建一条消息,通过网络发送到MQ Server

  • MQ将 消息存储在topic 的一个分区

  • 消费端,从分区中拉取消息,消费处理

但现实往往不一样!MQ 架构设计要满足高并发、高性能、高可用等指标

单分区,达不到我们的吞吐量要求,我们考虑采用多分区架构设计,正所谓 ”三个臭皮匠赛过一个诸葛亮“,多分区可以有效分摊全局压力,提升整体系统性能。

两台 MQ机器,组成一个集群,原先一个分区存储6条消息,现在分摊到两个分区,每个分区各存储3条消息,性能比上面那个提升一倍。

貌似可以满足我们的需求,但任何事情都有两面性!

我们看看下面业务场景:

一个用户在电商网站上下订单到交易完成,中间会经历一系列动作,订单的状态也会随之变化,一个订单会产生多条MQ消息,下单付款发货买家确认收货,消费端需要严格按照业务状态机的顺序处理,否则,就会出现业务问题。

我们发现,消息带上了状态,不再是一个个独立的个体,有了上下文依赖关系!

对于这个问题,突然想到HTTP协议,其本身也是无状态的,也就是说前后两次请求没有关联,但有些业务功能有登录要求,那怎么解决?

引入Cookie机制,每次请求客户端额外传输一些数据,来达到上下文关联。

回到MQ的消息顺序问题,我们要如何解决?

答案:各退一步,保证局部有序。

比如上面的电商例子,只要保证一个订单的多条状态消息在同一个分区,便可以满足业务需求,这个方案可以覆盖大部分的业务场景。

这里面只需要有一个路由策略组件,由它决定消息该放到哪个分区中!

考虑到市面MQ开源框架很多,常见的如:Kafka、Pulsar、RabbitMQ、RocketMQ 等,API方法略有区别,但设计思路是相通的。

接下来,我们以 RocketMQ 为例:

生产端提供了一个接口 MessageQueueSelector

public interface MessageQueueSelector {MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

接口内定义一个select方法,具体参数含义:

  • mqs:该Topic下所有的队列分片

  • msg:待发送的消息

  • arg:发送消息时传递的参数

关于MessageQueueSelector接口,RocketMQ 框架提供了三个默认实现类:

  • 1、SelectMessageQueueByHash:

arg参数的hashcode的绝对值,然后对mqs.size()取余,得到目标队列在mqs的下标

  • 2、SelectMessageQueueByRandom:

对mqs.size()值取随机数作为目标队列在mqs的下标

  • 3、SelectMessageQueueByMachineRoom

返回null

特别注意:

虽然保证了单个分片的消息有序,但每个分片的消费者只能是单线程处理,因为多线程无法控制消费顺序。这个可能会损失一些性能。

这里又引出另一个问题,如何保证一个队列只能有一个消费端呢?

1、

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance

  • 遍历一个topic下所有的MessageQueue

  • isOrder && !this.lock(mq) 尝试对它加锁,确保一个MessageQueue只能被一个消费者处理

2、将PullRequest对象放入PullMessageServicepullRequestQueue队列中

public void dispatchPullRequest(List<PullRequest> pullRequestList) {for (PullRequest pullRequest : pullRequestList) {this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);}
}

3、org.apache.rocketmq.client.impl.consumer.PullMessageService#run

  • PullMessageService 是一个Runnable线程任务

  • 无限循环,从队列中拉取、处理消息

另一个问题,如何保证一个队列,只有一个线程在处理消息呢?

1、 DefaultMQPushConsumerImpl#pullMessage

  • ConsumeMessageService 中有两个实现类,因为我们有消费顺序要求,会选择ConsumeMessageOrderlyService来处理业务

2、 ConsumeMessageOrderlyService.ConsumeRequest

  • ConcurrentMap中获取messageQueue对应的锁对象

  • 通过 synchronized 关键字,线程来抢占锁,互斥关系,从而保证了一个MessageQueue只能有一个线程并发处理

继续往下看,如果扩容了怎么办?

原来有6个分区,order_id_1的消息在MessageQueue6 中,此时扩容一倍,现在12个分区,order_id_1订单后面产生的消息可能路由到了MessageQueue8 中,同一个订单的消息分布在两个分区中,无法保证顺序。

我们能做的是,先将存量消息处理完,再扩容。如果是在线业务,可以搞个临时topic,先将消息暂时堆积,待扩容后,按新的路由规则重新发送。

顺序消息,如果某条失败了怎么办?会不会一直阻塞?

1、如果失败,不会提交消费位移,系统会自动重试(有重试上限),此时会阻塞后面的消息消费,直到这条消息处理完

2、如果这个消息达到重试上限,依然失败,会进入死信队列,可以继续处理后面的消息

面试官问: 如何保证 MQ消息是有序的?相关推荐

  1. 第十二期:面试官问你什么是消息队列?把这篇甩给他!

    消息队列不知道大家看到这个词的时候,会不会觉得它是一个比较高端的技术,反正我是觉得它好像是挺牛逼的. 一.什么是消息队列? 消息队列不知道大家看到这个词的时候,会不会觉得它是一个比较高端的技术,反正我 ...

  2. 面试官问你什么是消息队列?把这篇甩给他!

    来源:Java3y(ID:java3y) 一.什么是消息队列? 消息队列不知道大家看到这个词的时候,会不会觉得它是一个比较高端的技术,反正我是觉得它好像是挺牛逼的. 消息队列,一般我们会简称它为MQ( ...

  3. 关于使用消息队列今天被面试官问倒了

    为什么使用消息队列 其实就是问问你消息队列都有哪些使用场景,然后你项目里具体是什么场景,说说你在这个场景里用消息队列是什么? 面试官问你这个问题,期望的一个回答是说,你们公司有个什么业务场景,这个业务 ...

  4. 面试官问:Kafka 会不会丢消息?怎么处理的?

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! Kafka存在丢消息的问题,消息丢失会发生在Broker, ...

  5. 【283期】面试官问:高并发场景下,如何保证全局唯一分布式 ID 生成?

    点击上方"Java精选",选择"设为星标" 别问别人为什么,多问自己凭什么! 下方有惊喜,留言必回,有问必答! 每一天进步一点点,是成功的开始... 前言 系统 ...

  6. 【059期】面试官问:序列化是什么,为什么要序列化,如何实现?

    >>号外:关注"Java精选"公众号,回复"面试资料",免费领取资料!"Java精选面试题"小程序,3000+ 道面试题在线刷, ...

  7. 【240期】面试官问:说说基于 Redis 实现延时队列服务?

    点击上方"Java精选",选择"设为星标" 别问别人为什么,多问自己凭什么! 下方有惊喜,留言必回,有问必答! 每天 08:15 更新文章,每天进步一点点... ...

  8. 你以为面试官问的是分布式缓存,其实他想问……

    最近一个哥们去面试某当红大厂了,其中几个他印象深刻的面试题你们品品: 1.介绍下如何对MySQL SQL语句进行分析和优化? 2.Redis 怎样实现的分布式锁? 3.如何实现本地缓存和分布式缓存? ...

  9. 【146期】面试官问:说一说 RabbitMQ 的几种工作模式和优化建议?

    点击上方"Java精选",选择"设为星标" 别问别人为什么,多问自己凭什么! 下方留言必回,有问必答! 每天 08:00 更新文章,每天进步一点点... 1.组 ...

最新文章

  1. C#学习视频分享与开发技术QQ交流群
  2. 双亲委派机制_面试官:双亲委派机制的原理和作用是什么?
  3. php CI 实战教程:如何去掉index.php目录
  4. 分组数据方差公式_连续变量假设检验 之 单因素方差检验
  5. python tk combobox设置值为空_Python编程从入门到实践日记Day24
  6. 午后随笔 -- 定位的思考
  7. 解决问题 WebDriverException: Message: unknown error: cannot find Chrome binary
  8. JavaScript深入之执行上下文栈 1
  9. 错误排查:Cloudera Manager Agent 的 Parcel 目录位于可用空间小于 10.0 吉字节 的文件系统上。 /opt/cloudera/parcels...
  10. 抽象的数码艺术-分形艺术
  11. 外卖行业现状分析_外卖行业生存现状分析:你还会点外卖吗?
  12. 浪潮配置ipim_浪潮服务器管理口IP设置_IPMI设置
  13. ubuntu 1204 server xp 硬盘安装
  14. Python 最强 IDE 详细使用指南!
  15. 虚拟机和双系统的优缺点
  16. IIS与MySee插件冲突问题
  17. 《信号与系统学习笔记》—拉普拉斯变换(二)
  18. AI测试:让软件测试变得聪明伶俐
  19. 韩电商遭“暴击”,软银减持10亿美元-成都扬帆志远跨境电商
  20. flickr搜索_从提示框:DIY笔,将旧光盘回收到游戏中以及在Flickr中搜索Kindle屏幕保护程序...

热门文章

  1. matplotlib中文文档_python绘图库——Matplotlib及Seaborn使用(入门篇1)
  2. linux变量接收命令返回值,Linux Shell教程(一)
  3. 多个mapper的事务回滚_揭秘蚂蚁金服分布式事务 Seata 的AT、Saga和TCC模式
  4. 如何快速清空 Linux 中的大文件
  5. 无线红外探测器01-产品简介和功能需求
  6. php网站首页点击更多时获取数据,jQuery+PHP实现点击按钮加载更多,不刷新页面加载更多数据!附:可用源码+demo...
  7. CSS中的颜色和字体
  8. java学习笔记_Java学习笔记——第1篇
  9. BZOJ 2140 稳定婚姻(强联通分量判环)【BZOJ修复工程】
  10. 牛客练习赛61 C 四个选项(并查集、DP、排列组合)难度⭐⭐⭐