Rocket MQ 详解
开这个专栏主要之前实习的时候经常用到Rocket MQ,大厂使用的也比较多,主要讲解一些关键原理和概念。
介绍
定义:分布式消息中间件。MQ是消息队列的意思。
特点:低延迟、高并发、高可用、高可靠。
解释一下“高可用”:通常是指,通过设计减少系统不能提供服务的时间。 假设系统一直能够提供服务,我们说系统的可用性是100%。 如果系统每运行100个时间单位,会有1个时间单位无法提供服务,我们说系统的可用性是99%。高可用,是消息队列 RocketMQ 跟其他的消息服务器的主要区别
核心概念
- Topic:消息主题,我理解为与特定主题任务相关的一类消息。是消息的第一级(最大)类型,比如交易消息,物流消息。
- 生产者:生产消息,发送消息
- 消费者:接受消息,并消费
- 消息:我理解为一个任务。有消息属性,即消息附带的业务相关的属性,比如key,tag/
- Group: 我理解为同一类具体生产者/消费者,使用同一个dag完成相同的工作。
核心特性
这里参考了此博客:原作者很多地方只点了一下,我在这里做补充说明。
添加链接描述
业务场景说明:
电商行业的下单业务场景,以最简单的下单流程来说,下单流程如下:
锁库存
创建订单
用户支付
扣减库存
修改订单状态
给用户发送购买短信通知
给用户增加积分
通知商家发货
异步解耦
下单之后,用户完成支付,此时有个逻辑叫做支付回调,即我们通常在dag中会使用feedback节点。
在这个回调里,包括上述后面几步:
1.扣减库存
2.修改订单状态
给用户发送购买短信通知
给用户增加积分
通知商家发货
那么问题来了,如果线性的跑这几个步骤需要5个阶段,但是其实对于用户来说,完成了1-2之后,后面三步作为用户本身并不需要关心顺序,他们只需要自由的完成即可,所以我们把这三步发送到rocket mq,让其广播任务,完成任务即可。
打个很简单的比方,你去餐厅吃饭,你只需要告诉服务员来一碗大碗宽面你就可以玩儿手机了,至于这个面怎么拉的,怎么煮的,汤怎么调的,怎么装盘的,这些你都不关心。
所以总体时间并没有变化,只是对于你来说,你直接参与的时间变短了很多
削峰
主要指在并发大流量的冲击下,利用Rocket MQ可以抗住瞬时流量,保护系统稳定
原理是我们把所有的任务都丢到队列中去,消费者尽自己可能的去消费消息就完事儿了,慢可能是慢了一点,起码我系统没有崩溃。(前提该业务得支持异步处理 ,我们知道一个dag中会有多个节点,每个节点的input &output是不一样的,当节点x完成之后产生了节点x+n的input,那这个时候x+n这个节点直接跑就是了,他不需要等他们两个中间的节点跑完才运行,因为逻辑上他们没有依赖关系)
消息幂等——重试导致重复
RocketMQ 的 Exactly-Once 投递语义,就是用于解决幂等问题。Exactly-Once 是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。
最佳方法是通过不同的消息的ID来区分,但这样所有消费者都需要记录所有已经被消费的id。不合理。
幂等判度通过为每条消息设置一个MessageKey,唯一标识业务。
只有第一个MessageKey能够被消费。
消息过滤
我们说了Topic是消息的一级类型,比如订单topic,可能分为三个关于订单的模块。
创建订单,处理订单,取消订单。
通过给消息指定tag可以来区分消息类型,从而在Rocket MQ服务端过滤
Rocket MQ架构
几个主要组成部分的功能:
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
Name server集群:负责命名服务,更新和发现 Broker 服务
名称服务器。
先看官方解释“是一个几乎无状态节点,可集群部署,节点之间无任何信息同步”。听不懂很正常。来看下他的功能:提供命名服务,更新和发现 Broker 服务。
- 注册功能:接收broker的请求,注册broker的路由信息。
- 发现功能:接收client(producer/consumer)的请求,根据某个topic获取其到broker的路由信息
解释一下,注册即每个broker启动时会先到Name server注册。
发现即,Producer发消息前会根据topic到Name server获取路由(这个路由是到broker的)信息。
而相应的,consumer会定时获取topic路由信息。
Broker:是RocketMQ的核心,负责存储/转发消息。
可以理解为消息队列服务器,提供了消息的接收、存储、拉取和转发服务。broker是RocketMQ的核心,它不不能挂的,所以需要保证broker的高可用。
分为 Master Broker 和 Slave Broker,一个 Master Broker 可以对应多个 Slave Broker,但是一个 Slave Broker 只能对应一个 Master Broker。两者的BrokerName是相同的,但是Broker_ID不同,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。
Broker 启动后需要完成一次将自己注册至 Name Server 的操作;每个Broker与Name Server集群中的所有节点建立长连接,每隔 30s 定期上报Topic信息到所有Name Server。
生产者&消费者:
生产者:
与 Name Server 集群中的一个随机节点建立长链接(Keep-alive),定期从 Name Server 读取 Topic 路由信息
并向提供 Topic 服务的 Master Broker 建立长链接,且定时向 Master Broker 发送心跳。
消费者类似:但是broker连接不同和心跳的发送包含了slave节点
与 Name Server 集群中的其中一个节点(随机)建立长连接,定期从 Name Server 拉取 Topic 路由信息,并向提供 Topic 服务的 Master Broker、Slave Broker 建立长连接,且定时向 Master Broker、Slave Broker 发送心跳。Consumer 既可以从 Master Broker 订阅消息,也可以从 Slave Broker 订阅消息,订阅规则由 Broker 配置决定。
关键——通信流程
- broker启动,注册自己的主题至nameserver。即name server在这里发挥注册功能,注册broker的主题和路由信息。而后每隔30s,其都要向Name server 同步心跳,以证明自己还活着(如果name server发现120s 该broker还没消息,就认为其挂了,去除其注册信息)。
- producer先查name server,以知道自己topic下有哪几个broker及其路由信息.而后name server返还对应信息。
- 通过负载均衡策略(如轮询)跟每一个broker建立长连接,(比如有a b两个broker,producer会均分自己的消息给这二者)
- consumer启动也会查name server,同2.
- consumer通过拉取操作pull,负载均衡主动轮询拉取消息,而后进行处理。
关键2——高可用方案
生产高可用:建立2-3个name server,且多主多从broker方案。
多个name server可以避免单点故障。。
选master broker做主,slave broker做从,主做消息写入、读取,从属服务器只做消息提取以进行消息同步,consumer 每次pull从属服务器
情况一: 如果某个master broker挂掉了
如果某个master a主服务器挂掉,自然会停止主从同步,且producer向master 主服务器发送消息也会失败。(注意老版的slave不会主动升级成为master,但是如果用raft就可)
又因为consumer连接的是从属服务器,原有积压的数据可以继续消费。
数据是否会丢失? 取决于复制方式,如果异步复制会丢,同步复制不会丢。
异步复制:不关心broker是否主从同步,就返回给Producer,适合重视效率速度,不在乎完整性
可能导致主从同步不一致,就丢包
同步复制:使用在银行等高完整性保护场景,要阻塞,(阻塞是指主从同步过程中,让producer一直等着,直到响应)
又因为会有重试机制。
消息重试
消息在消费方消费失败后,RocketMQ 服务端会重新进行消息的投递,直到消费者成功消费消息,当然重试有次数限制,默认 16 次。
消息重试在一定程度上保证了消息不丢失,通过重试来达到最终被消费的目的。需要注意的是消费者在消费的时候一定要等本地业务成功后才能进行 ACK(消费确认),不然就会出现消费失败,但是已经 ACK,消息将不会重复投递。
如果采取异步消费的方式,需要进行异步转同步,等异步操作完才进行 ACK
最后需要做好对应的监控,如果重试了 4,5 次还是失败的,基本上后面重试也是失败的。这个时候需要让开发人员知道,该人工处理的就人工介入。或者直接监控死信队列。
知道了重试机制,
a 挂掉之后,因为我们是多主broker,producer经过对a的重试,发现不行,这时可以把消息发到b。注意a挂120s后name server就会将其注册信息移除
情况2:如果master & slave都挂掉了
代表整个a集群都挂掉了,此时a积压的数据consumer无法pull,但也不会丢失,因为数据会持久化到硬盘上,等a恢复可以继续消费
producer重试发送消息给b,可以正常消费
情况3:如果多主同时挂掉
消费者高可用:只要主从服务器有一个没有挂,消费者就不会停止。
情况4:name server挂了 因为不是单点name server 生产者和消费者,在默认server挂掉之后,都可选择另外的Name server,以获取最新的broker信息。
情况5:某个集群因为网络原因,和某一个name server无法通信,导致多个Name server之间状态不一致 如果consumer默认选择的是缺失消息的那个name server,那就只能从部分broker中获取消息,导致另外的broker积压。 如果能明确知道哪个name server缺失了broker信息,及时从consumer默认选择中移除。
消息类型-事务消息一致性
常用的四种消息类型:普通,顺序,定时,事务。
普通消息:无特性。注意消息可以同步发送或者异步发送,同步要得到服务端返回结果继续,异步不用等,随时可以处理服务端的响应。
顺序消息:顺序消息是指生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息一定会先被消费者接收到。比如数据分发的场景,如果我们订阅了 Mysql 的 binlog 来进行数据异构。消息要是没有顺序,就会出现数据错乱问题。比如新增一条 id=1 的数据,然后马上删除。这样就产生了两条消息。正常的消费顺序是先新增,然后删除,此时数据是没有的。如果消息没有顺序,删除的先被消费了,然后消费新增的,此时数据还在,没被删除掉,就会导致不一致。
定时消息:定时消息可以在订单超时未支付自动取消等场景使用。
事务消息:,通过 RocketMQ 事务消息能达到分布式事务的最终一致性。
RocketMQ 的事务消息采用了二阶段提交的方式。并且结合了消息反查的机制来确保最终一致性。
发送方首先发送半事务消息到 RocketMQ 服务端。(半消息:相当于临时消息,在此状态下的消息,无法被投递给消费者)
RocketMQ 服务端接收到消息,然后将消息持久化成功之后,向发送方返回 Ack 确认消息已经发送成功,此时消息为半事务消息,不会投递给消费方。
收到半事务消息的 Ack 后,发送方开始执行本地事务逻辑。发送方根据本地事务执行结果向服务端提交二次确认,如果本地事务执行成则进行消息的 Commit,如果执行失败则进行消息的 Rollback;
服务端收到 Commit 状态则将半事务消息标记为可投递,消费方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,消费方将不会收到该消息。
如果出现意外情况,步骤 4 没有进行消息的二次确认,等待固定时间后服务端将对该消息发起消息回查。
发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行操作。
本地事务消息封装
上述模式导致每一个业务场景都需要反查。所以ebay提出了一个新的方案——本地消息表
- 本地事务消息在服务端对应数据库中建一个消息表,发消息并不直接发给MQ,而是往消息表中插入一条消息数据,这个插入的动作和本地事务逻辑相同,如果本地事务执行成功,消息才会落表成功,才会发送给 MQ, 本地事务失败,消息数据回滚。对应上图事务插入到消息表的左边部分。
- 然后需要有一个专门的程序去拉取消息表中未发送的消息投递给 MQ,如果投递失败,可以一直重试,直到成功或者人工介入。对应上图中间右边一个步骤。
前面我们提到,RocketMQ 的事务消息会有回查的机制,消息表的方式,也需要有一个机制来保证消息被消费了,否则就需要不断的重试去发送消息,直到消息被消费。
在消息表中需要有一个字段来标识当前这条消息的状态,比如 未发送,已发送,已消费。当消息还是未发送的时候就会被发送到 MQ, 如果发送成功了,状态就是已发送。但是过了几分钟,状态还是已发送,这个时候就要去做一些动作了。
一般主要是消费能力不够导致堆积。
消费模式
RocketMQ 消费模式有两种,集群消费和广播消费。
集群消费
消费者部署了多个实例我们称之为一个集群,集群消费只会被其中的某一个实例进行消费。
上图可能有一些小问题,我理解如果是不同消息,不应该由同一个group来消费,但可能这个图的意思是同一种消息的不同个。
广播消费让每个示例都消费所有消息
使用情况:比如使用本地缓存,数据刷新时,需要刷新每个节点的本地缓存,那么每个节点都需要接收到同样的信息。
常见问题
主要参考https://blog.csdn.net/lupengfei1009/article/details/114525762
博主提的问题很好,但是讲的不是很清楚。
顺序问题
- 生产者要求按照id等唯一标识,来分配消息队列
- 消费端采用专用监听器,保证对队列单线程应用。
假如生产者产生了 2 条消息:M1、M2,假定 M1 发送到 S1,M2 发送到 S2,如果要保证 M1 先于 M2 被消费,怎么做?
- 答:让两者发到同一个server上,由于server按收到的顺序分配,M1先被消费者收到。
- 问题:会导致吞吐量不够(并行度降低),会导致一个点阻塞整个流程。
- 优化:首先明确不在意顺序的应用大量存在的,其次在某个队列无序,并不一定意味着消息无序,我们应该在业务层面保证消息顺序,而不仅仅依赖于消息系统。
重复问题
主要导致原因是网络不可达,消费完broker没有收到ack,消息重投又发给了consumer,导致消费端收到了两条一样的消息
我们上面说了幂等性
RocketMQ 的 Exactly-Once 投递语义,就是用于解决幂等问题。Exactly-Once 是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。
最佳方法是通过不同的消息的ID来区分,但这样所有消费者都需要记录所有已经被消费的id。不合理。
幂等判度通过为每条消息设置一个MessageKey,唯一标识业务。
只有第一个MessageKey能够被消费。
解决方案:
redis:发送消息前向redis中插入消息表主键的信息,如果没插成功,则要查一下是否已经发送过了。
消费者用id或者MessageKey判断。
RocketMQ Broker中的消息被消费后会立即删除吗?
不会,只会更新offset,每条消息都会持久化到CommitLog中,48小时过后清楚不再使用的log,每个Consumer连接到Broker后会维持消费进度信息,当有消息消费后只是当前Consumer的消费进度(CommitLog的offset)更新了。
长轮询机制——RocketMQ消息是push还是pull?
RocketMQ ** 没有push,都是pull,即拉取方式 。
虽然有push类,但是底层实际采用的是长轮询机制:客户端发起pull请求,服务端接受请求后,如果发现队列没有消息,并不返回,持久化一段时间,在这段时间内不断轮询队列中是否有新消息,如果有则用现有连接将消息返回,反之返回空** ,是pull。
为什么要主动拉取消息而不使用事件监听方式?
时间监听得建立好长连接,然后由broker主动推送。消费能力如果跟不上则会出现在某一个consumer端大量堆积,却不能被其他consumer消费的情况,pull会根据consumer的能力主动拉,在consumer端不会堆积。
Broker怎么处理pull请求?
consumer请求broker,broker检查是否由符合条件的消息:
- 有:响应并等待下次请求。
- 没有:几种处理方式:1.每隔 1ms 检查 commitLog 中是否有新消息,有的话写入到 pullRequestTable。2.保持连接,每5秒检查一次 pullRequestTable 有没有消息,有的话立即推送3.挂起
负载均衡
通过topic在多个broker中分布式存储实现
producer端:生产者负载均衡的过程的实质就是选择broker集群和queue_Id的过程。
每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如下图,消息分布在多个broker中,为负载消费做准备,来达到写入负载均衡。
维护一个随机的index,每次递增然后模broker的个数。从而得到broker的id。
consumer端:平均分配算法来进行负载均衡。
在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。
而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。
我使用过的是环状轮流分queue的形式:
这个时候比如5个consumer,3个group,第一次分配是221,第二次是212,第三次122.
少的时候consumer和一个queue是一对一消费,但是当consumer多余queue的时候,就会浪费consumer。
消息存储内容
broker存储目录结构:
队列中存放的消息体结构:(某didi shabby面试官居然问了我这个)
Rocket MQ 详解相关推荐
- Rocket MQ详解
Rocket MQ 一,是啥,从哪来 RocketMQ是一个开源的分布式消息中间件,最初由阿里巴巴集团开发.它的设计目标是为了在高并发.高吞吐量的场景下,实现可靠的消息传输,并且具有良好的可伸缩性和可 ...
- mq系列传感器的程序_消息中间件(一)MQ详解及四大MQ比较
一.消息中间件相关知识 1.概述 消息队列已经逐渐成为企业IT系统内部通信的核心手段.它具有低耦合.可靠投递.广播.流量控制.最终一致性等一系列功能,成为异步RPC的主要手段之一.当今市面上有很多主流 ...
- (十四)消息中间件MQ详解及四大MQ比较
一.消息中间件相关知识 1.概述 消息队列已经逐渐成为企业IT系统内部通信的核心手段.它具有低耦合.可靠投递.广播.流量控制.最终一致性等一系列功能,成为异步RPC的主要手段之一.当今市面上有很多主流 ...
- MQ详解及四大MQ比较
一.消息中间件相关知识 1.概述 消息队列已经逐渐成为企业IT系统内部通信的核心手段.它具有低耦合.可靠投递.广播.流量控制.最终一致性等一系列功能,成为异步RPC的主要手段之一.当今市面上有很多主流 ...
- 消息中间件(一)MQ详解及四大MQ比较
一.消息中间件相关知识 1.概述 消息队列已经逐渐成为企业IT系统内部通信的核心手段.它具有低耦合.可靠投递.广播.流量控制.最终一致性等一系列功能,成为异步RPC的主要手段之一.当今市面上有很多主流 ...
- 详解RPC远程调用和消息队列MQ的区别
谈到分布式架构,就不得不谈到分布式架构的基石RPC. 什么是RPC RPC(Remote Procedure Call)远程过程调用,主要解决远程通信间的问题,不需要了解底层网络的通信机制. RPC服 ...
- 消息中间件系列(四):消息队列MQ的特点、选型、及应用场景详解
前面集中谈了分布式缓存Redis系列: 高并发架构系列:分布式锁的由来.特点.及Redis分布式锁的实现详解 高并发架构系列:Redis并发竞争key的解决方案详解 高并发架构系列:Redis缓存和M ...
- Linux安装消息队列IBM MQ 7.5开发版安装配置详解
消息队列IBM MQ 7.5开发版安装配置详解 文章目录 消息队列IBM MQ 7.5开发版安装配置详解 前言 一.什么是IBM MQ? 二.安装前准备 1.安装前准备 2.安装MQ Server 3 ...
- MQ消息队列详解、四大MQ的优缺点分析
MQ消息队列详解.四大MQ的优缺点分析 前言 面试题切入 面试官心理分析 面试题剖析 ①为什么要使用MQ 系统解耦 异步调用 流量削峰 消息队列的优缺点 四大主流MQ(kafka.ActiveMQ.R ...
最新文章
- golang 切片 slice 去掉重复元素
- mongo java client_mongodb java客户端的使用,即MongoClient
- 现代谱估计:Blackman-Tukey 相关图
- Java1.7之后Arrays.sort对数组排序DualPivotQuicksort.sort
- 【Python】Matplotlib绘制七彩锥面
- 花书+吴恩达深度学习(二四)蒙特卡罗方法(重要采样,MCMC)
- Linux定时函数介绍
- ThickBox在ASP.NET中的应用
- mac IDEA java 如何进入到方法跳转 快捷键
- 韩顺平 2021零基础学Java 学习笔记(3)(自用)
- 【笔记分享】LED点阵屏幕显示原理
- c语言司机牌照前两位数一样,c语言
- Bzoj 4173 数学
- eclipse、ddms、android studio连接不上手机问题解决
- (未解决)SpringMVC学习——为什么网址不是locahost而是desktop-nottqjs(如图)
- 【Python办公自动化】使用reportlab制作pdf报告
- win10 labelme 使用记录
- Websocket服务端和客户端通信(WSS、WS)
- 看果粉如何让拾主自动归还丢失的iPad
- C语言习题11.3,用指针数组编程输出月份的英文表示