前言

MQ使用场景

  • 异步、解耦、削峰填谷

MQ选型

  • 吞吐量:

    • Kafka具有更高的吞吐量。Kafka在Producer端将多个小消息合并,批量发送给Broker,从而提高系统的吞吐量。同时,Kafka默认采用异步发送的机制,这样的设置也可以提高其吞吐量。由于Kafka的高吞吐量,因此通常被用于日志采集、大数据等领域。
    • RocketMQ不采用异步的方式发送消息。因为当采用异步的方式发送消息时,Producer发送的消息到达Broker就会返回成功。此时如果Producer宕机,而消息在Broker刷盘失败时,就会导致消息丢失,从而降低系统的可靠性。
  • 单机支持的topic数量
    • RocketMQ/Metaq单机可以支持更多的topic数量。因为Kafka在Broker端是将一个分区存储在一个文件中的,当topic增加时,分区的数量也会增加,就会产生过多的文件。当消息刷盘时,就会出现性能下降的情况。而RocketMQ/Metaq是将所有消息顺序写入文件的,因此不会出现这种情况。
    • 当Kafka单机的topic数量从几十到几百个时,就会出现吞吐量大幅度下降、load增高、响应时间变长等现象。而RocketMQ/Metaq的topic数量达到几千,甚至上万时,也只是会出现小幅度的性能下降。
  • 主流的mq有kafka、rabbitmq、rocketmq、activemq,选型主要基于以下几点考虑:
    • 由于我们系统的qps压力比较大,所以性能是首要考虑的要素。
    • 开发语言,由于我们开发语言是java,方便二次开发。
    • 对于高并发的业务场景是必须的,所以需要支持分布式架构的设计。
    • 功能全面,由于不同的业务场景,可能会用到顺序消息、事务消息等。
      基于以上几个考虑,我们最终选择了RocketMQ。

1. RocketMQ概述

1.1 基本介绍

  • 阿里巴巴自研的一款分布式消息中间件,使用Java语言开发。
  • MetaQ 和 RocketMQ 的区别:阿里巴巴内部版本为MataQ,外部开源的版本为RocketMQ。

1.2 发展历程

  • 2007年,阿里巴巴启动五彩石项目(淘宝网和淘宝商城数据打通),Notify作为项目中交易的核心消息流转系统,Notify就是RocketMQ的雏形。
  • 2011年初,Kafka开源。淘宝中间件团队在对Kafka进行深入研究后,开发了一款新的MQ:MetaQ。
  • 2012年,MetaQ发展到3.0版本,在其基础上进行了进一步的抽象,形成RocketMQ,然后进行了开源。
  • 2016年双十一,RocketMQ承载了万亿级消息的流转。11月28日,阿里巴巴向 Apache 软件基金会捐赠 RocketMQ,RocketMQ成为Apache孵化项目。
  • 2017年9月25日,Apache宣布RocketMQ孵化成为Apache顶级项目,RocketMQ成为国内首个互联网中间件在Apache的顶级项目。

2. RocketMQ基本原理

2.1 RocketMQ物理架构图

  • NameServer:NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。它是RocketMQ的注册中心,管理两部分数据:集群的Topic-Queue的路由配置和Broker的实时配置信息。其它模块通过Nameserver提供的接口获取最新的Topic配置和路由信息。

    • Producer/Consumer :通过查询接口获取Topic对应的Broker的地址信息
    • Broker : 注册配置信息到NameServer, 实时更新Topic信息到NameServer
  • Broker:分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。
  • Producer:Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
  • Consumer:Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

2.2 RocketMQ逻辑架构图

  • Producer Group:用来表示一个发送消息应用,一个 Producer Group 下包含多个 Producer 实例,可以是多台机器,也可以 是一台机器的多个进程,或者一个进程的多个 Producer 对象。一个 Producer Group 可以发送多个 Topic 消息,Producer Group 作用如下:

    • 标识一类 Producer
    • 可以通过运维工具查询这个发送消息应用下有多个 Producer 实例
    • 发送分布式事务消息时,如果 Producer 中途意外宕机,Broker 会主动回调 Producer Group 内的任意 一台机器来确认事务状态。
  • Consumer Group:用来表示一个消费消息应用,一个 Consumer Group 下包含多个 Consumer 实例,可以是多台机器,也可 以是多个进程,或者是一个进程的多个 Consumer 对象。一个 Consumer Group 下的多个 Consumer 以均摊 方式消费消息,如果设置为广播方式,那么这个 Consumer Group 下的每个实例都消费全量数据。

  • 使用集群模式模拟广播:如果业务需要使用广播模式,也可以创建多个 Consumer ID,用于订阅同一个 Topic。

2.3 Producer

2.3.1 Producer负载均衡

  • Producer端的负载均衡看似是针对队列的选择,实际上是对broker的选择,为了保证高峰期的低延时及规避个别broker对整体性能的影响,负载均衡策略的选择是至关重要的,下面介绍Producer两种负载均衡策略

    • ①Round Robin + 容错:按照队列列表轮循,当上一次请求的broker出现异常时,本次请求会通过简单容错机制进行跳过。
    • ②broker RT排序 + 延时容错:维护一个Map,统计发送消息的每个broker的RT,优先从RT较低的几个broker中选择,当某个broker出现异常时,通过延时容错机制,使其在一定时间内更难被选中。
  • Producer默认采用第一种负载均衡策略,但第二种策略在面对个别broker抖动、故障时的应对能力及降低毛刺能力更高,对于一些RT比较敏感的应用,是一个比较好的选择。
  • 第一种负载均衡策略,当Producer发送消息时,默认会轮询目标Topic下的所有MessageQueue,并采用递增取模的方式往不同的MessageQueue上发送消息,以达到让消息平均落在不同的queue上的目的。而由于MessageQueue是分布在不同的Broker上的,所以消息也会发送到不同的broker上。

2.3.2 Producer发送失败的场景

  • 以最常用的同步发送为例:

    • 如果保证消息不丢,那么Broker.Master就必须同步刷盘成功;
    • 如果消息不丢的同时,如果Master故障,消费者也能立马消费到消息,那么Broker.Slave也必须同步刷盘成功;
    • 如果能容忍掉电级别导致的消息丢失,那么Broker.Master只需要写入PageCache即可。
  • 对高可靠的要求不同,Broker的刷盘策略及HA策略也各不相同,Producer的处理逻辑自然也就不同,站在Producer的角度,可以将发送失败分为:系统失败、业务失败及高可用失败
失败类型 具体形式 客户端表现
系统失败 ①客户端异常:Producer无法获取broker的地址;②通讯层面的异常:连接不可用、请求超时等;③Broker异常:磁盘满了、创建文件失败、写入PageCache超时等 MQClientException、RemotingException、MQBrokerException
业务失败 ①消息Topic长度超过上限;②消息体大小超过上限;③消息的properties长度超过上限等 MQClientException、MQBrokerException
高可用失败 ①Broker.Master刷盘失败;②Broker.Slave不可用或刷盘超时 无异常,根据发送返回值SendResult.sendStatus来判断
  • 针对系统失败和业务失败,可通过DefaultMQProducer.retryTimesWhenSendFailed来配置重试次数,对于高可用失败,可以通过DefaultMQProducer.retryAnotherBrokerWhenNotStoreOK来配置切换broker的重试,但建议在业务代码中,手动捕获相关异常并检查返回值的发送状态,以达到更灵活的控制。

2.3.3 合适的消息体大小

  • 客户端和Broker都有一个消息体大小的阈值配置,我们可以根据需求调整相关配置,但是消息体大小对整体的性能起到至关重要的作用,于是这个阈值必须得得到严格的控制。
  • 阿里云上的RocketMq默认的消息体大小阈值是256K,部分区域可以达到4M,我们在平常使用时,应在满足业务需求的同时尽量控制消息体的大小。

2.3.4 延时消息

  • 延时消息定义:生产者把消息发送到消息队列中以后,并不期望被立即消费,而是等待指定时间后才可以被消费。
  • RocketMQ中,只支持特定级别的延迟消息,但是不支持任意时间精度的延迟消息(最新引擎基于TimerWheel+Timerlog,实现了秒级的任意时间延时)。如果要支持任意时间精度,不能避免在Broker层面做消息排序,再涉及到持久化的考量,那么消息排序就不可避免产生巨大的性能开销。
  • 在RocketMQ中,消息延迟级别分别为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18个级别。
  • 延时消息在RocketMQ的工作流程如下:
    • 如果消息的延迟级别大于0,则表示该消息为延迟消息,修改该消息的主题为SCHEDULE_TOPIC_XXXX,队列Id为延迟级别减1。
    • 消息进入SCHEDULE_TOPIC_XXXX的队列中。
    • 定时任务(延迟消息服务类ScheduleMessageService)根据上次拉取的偏移量不断从队列中取出所有消息。
    • 根据延时级别的时间,筛选出已经到期的消息
    • 根据消息属性重新创建消息,清除延迟级别,恢复原主题和队列Id。
    • 重新发送消息到原主题的队列中,供消费者进行消费。

2.3.5 顺序消息

  • RocketMQ中的同一个队列,消息的到达顺序决定了消息的消费顺序(FIFO),但是RocketMQ是无法保证全局消息的有序性,原因是如果读写队列有多个,消息就会存储在多个队列中,消费者负载时可能会分配到多个消费队列同时进行消费,多队列并发消费时,无法保证消息消费顺序性。
  • 同一个队列实现顺序消息的流程
  • MessageQueueSelector(队列选择器)可以将顺序消息发送到同一个消息队列上,只需在消息发送的过程实现MessageQueueSelector接口的select方法,在select方法中根据订单ID选择消息队列即可。
  • 保证消息发送到同一个消息队列之后,还需要保证顺序消息,在RocketMQ中MessageListenerOrderly自带此实现,如果使用MessageListenerConcurrently则需要使用单线程模式

2.4 Topic

  • 一个topic的消息会被分成多个MessageQueue,存在多个Broker上,一般会将多个MessageQueue,平均分配给同一个ConsumerGroup的多台机器来消费,一般的原则就是一个MessageQueue只能给ConsumerGroup的一台机器来消费,但是一台机器可以消费多个MessageQueue。
  • 即每个Topic在Broker上会划分成几个逻辑队列,每个逻辑队列保存一部分消息数据,但是保存的消息数据实际上不是真正的消息数据,而是指向commit log的消息索引。
  • MessageQueue是Broker上的队列在consumer上的逻辑表示,并不是真正的队列,没有实际容量,通过topic、brokerName、queueId三个属性建立与Broker上队列一对一关系。MessageQueue不是真正存储Message的地方,真正存储Message的地方是在CommitLog。
  • Producer发送消息通过MessageQueue指定某个broker上的某个topic下的第几个consumerQueue,Consumer在通过MessageQueue拉取消息时(PullMessageRequestHeader),找到该broker上consumer queue 目录下topic的第几个consumerQueue文件,再根据tag hashcode和commintlog找到具体的消息内容,通过netty网络请求返回最终的消息信息Remoting Command。

2.5 消息存储

  • commitlog顺序写入,默认1G,满了新建;consumer消费topic分区下的consume_queue,默认30万条消息一个文件。顺序写入,随机读取。

    • mmap技术在映射的时候,一般有大小限制,在1.5G~2G之间,所以才让CommitLog单个文件1G
  • consumerQueue存储路径consumequeue/{topic}/{queueId}/{fileName},文件采取定长设计,20个字节/条(8字节的commitlog物理偏移量、4字节的消息长度、8字节tagHashcode),单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。顺序写入,顺序读取。
  • RocketMQ的消息存储是由consume queue和commit log配合完成的。consume queue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件commit log上的位置。Commit Log的消息是真实的消息数据
  • indexfile文件,可通过 key 或时间区间来查询消息的方法。

2.6 Consumer消费

2.6.1 Consumer消费过程

  • 集群模式下,一个Queue只能由一个消费者消费,消费者进度保存在服务器端

2.6.2 Consumer负载均衡算法(集群)

  • 平均分配算法(默认):给每个consumer分配均等的队列(MessageQueue)。一个consumer可以对应多个队列,一个队列只能给一个consumer消费,consumer和队列之间是一对多的关系。如果consumer的数量大于队列的数量,会有部分consumer分配不到队列,这些分配不到队列的Consumer机器不会有消息到达。
  • 循环平均分配算法:遍历消费者把MessageQueue分一个给遍历到的消费者,如果MessageQueue数量比消费者多。需要进行多次遍历,遍历次数=MessageQueue数量/消费者数量。
  • 基于机房分配算法:只消费指定机房的MessageQueue,broker命名必须按照格式:机房名@brokerName,因为分配时先按照机房名称过滤所有的MessageQueue,再按照平均分配策略进行分配。
  • 按照机房就近分配算法:相对于机房分配算法,可以对没有消费者的机房进行分配,如果一个机房没有消费者,就把这个机房的MessageQueue分配给集群中所有的消费者。
  • 一致性hash分配算法:把消费者经过hash计算分布到hash环上,对所有的MessageQueue进行Hash计算,找到顺时针方向最近的消费者节点进行绑定。
  • 配置分配算法(自定义):消费者启动时可以指定消费哪些MessageQueue

2.6.3 两种消费方式

  • RocketMQ两种消费方式:Pull和Push

    • Pull:使用较少,因为需要自己处理消息消费位点的维护,消息拉取频率的设置,消息拉取和消费异常。
    • Push:当消费者发送请求到Broker去拉取消息的时候,如果有新的消息可以消费,那么会立马返回一批消息到消费者机器里去,处理完之后,会接着立刻发送请求到Broker去处理下一批消息,消息的时效性非常好。
    • Push模式下的请求挂起和长轮询的机制:当请求Broker时,如果没有消息需要处理,会让请求线程挂起,默认时间15s,期间有个后台线程每隔一段时间去检查一下是否有新的消息,如果在挂起的过程中如果有新的消息到达会主动唤醒挂起的线程,然后将消息返回。
  • 默认配置下JVM只有一个客户端实例,即只有一个拉取消息线程,它负责当前JVM中的所有消费者实例的拉取消息行为,当拉取到消息后,根据消息的消费者实例找到对应的消费线程池,执行消费行为。
  • 因为拉取行为和消费行为是异步的,就存在消费速度跟不上拉取速度的情形,这时就需要对拉取消息进行限流,相关配置有:
    • pullThresholdForQueue及pullThresholdSizeForQueue:客户端针对单个队列缓存的未消费消息个数及大小(MiB)的限流。
    • pullInterval:拉取消息线程针对单个队列的拉取行为的时间间隔,默认为0,也就是不间断。
    • pullBatchSize:客户端一次拉取请求的最大条数,默认:32,但是需要注意,broker也有阈值控制,默认为800。
    • consumeMessageBatchMaxSize:一次消费的最大条数,也就是单个消费线程一次消费多少消息,默认为1。如果修改改值,注意需要在消费逻辑中手动控制ackIndex。
    • consumeConcurrentlyMaxSpan:客户端单个队列中,首尾消息的跨度阈值,默认为2000,用于极端情况下的限流。例如:个别消息消费过程出现死锁或耗时特别长,导致该条消息的进度迟迟无法更新;或个别消息消费过程出现异常,且在发送回broker时出现失败,使得消费失败消息无法进入重试队列。
    • consumeTimeout:单个消息消费超时时间,默认为15min,超过15min仍然没有消费成功的消息,直接发回broker,放入重试消息队列,防止阻塞消费进度。具体分析见下面第4条。
    • consumeThreadMin及consumeThreadMax:消费线程池采用JDK默认线程池,且没有配置阻塞队列长度(默认Integer.MAX_VALUE),所以不要寄望线程池的动态扩容,而是需要计算好consumeThreadMin的值。
  • 消费者拉取消息会批量拉取,原因有两点:
    • 批量拉取减少了Consumer端的IO次数,等于是小IO合并为了大IO,提升了效率;
    • 利用pageCache的预取算法,利用其批量、提前、预测的读取能力,提升cache命中。

2.7 Broker

  • Broker高可用原理:RocketMQ 4.5引入了DLedger机制,DLedger是利用了Raft算法实现Broker主从节点的故障自动转移以及数据同步。
  • DLedger存在的问题:选举过程不能提供服务;至少要复制到半数以上的节点才返回写入成功,性能不如主从异步复制;资源损耗导致成本高
    • 解决方案一(开源RocketMQ):选举过程不能提供服务:Producer开启sendLatencyFaultEnable(容错机制),如果发现Broker无法访问,就会自动回避访问这个Broker一段时间,就可以规避选举过程。
    • 解决方案二(公有云RocketMQ):通过 跨副本组故障转移 代替副本组选主,规避选主期间的不可用。因为消息系统中可从任意节点拉取数据组成完整的消息流。消息系统在一致性协议上的特点是无需选主和Random Read,
  • Broker主从数据同步:Leader Broker将数据同步给Follower Broker的过程叫“Log Replication”。数据同步分为两个阶段:uncommitted阶段、commited阶段。
    • Leader Broker接受到一条消息后,会标记为uncommitted状态,然后通过DLedgerServer组件将uncommitted的消息数据发送给Follower Broker的DLedgerServer。
    • Follower Broker的DLedgerServer收到uncommitted消息之后,返回一个ack给Leader Broker的DLedgerServer,如果Leader Broker收到了超过半数Follower Broker返回的ack,就会将消息标记为committed状态。
    • Leader Broker上的DLedgerServer就会发送commited消息给Follower Broker机器的DLedgerServer,让他们也把消息标记为comitted状态。
  • Broker优化:内存预映射机制:Broker会针对磁盘上的各种CommitLog、ConsumerQueue文件预先分配好mapperFile(接下来要读写的磁盘文件),提前使用mappedByteBuffer执行map函数完成映射。文件预热:文件映射后并不会加载到内存中,还需要进行madvise系统调用,以提前尽可能多地把磁盘文件加载到内存去。

3. RocketMQ的特点

3.1 RocketMQ速度快

  • 因为使用了顺序存储、Page Cache和异步刷盘。

    • 我们在写入commitlog的时候是顺序写入的,这样比随机写入的性能就会提高很多
    • 写入commitlog的时候并不是直接写入磁盘,而是先写入操作系统的PageCache
    • 最后由操作系统异步将缓存中的数据刷到磁盘
  • MessageQueue分布策略:RockectMQ在创建topic时会制定MessageQueue的数量,MessageQueue类似kafka的分区均匀分布在不同broker上,生产者发送消息时,会根据策略选择MessageQueue,实现负载均衡提高吞吐量

3.2 RocketMQ实现事务过程

  • 事务消息:MQ提供的类似XA的分布式事务能力,通过事务消息可以达到分布式事务的最终一致性。
  • 半事务消息:MQ收到了生产者的消息,但是没有收到二次确认,不能投递的消息。
  • 如上图,具体的实现原理是:
    • 生产者先发送一条半事务消息到MQ
    • MQ收到消息后返回ack确认
    • 生产者开始执行本地事务
    • 如果事务执行成功发送commit到MQ,失败发送rollback
    • 如果MQ长时间未收到生产者的二次确认commit或者rollback,MQ对生产者发起消息回查
    • 生产者查询事务执行最终状态
    • 根据查询事务状态再次提交二次确认
    • 如果MQ收到二次确认commit,就可以把消息投递给消费者,如果是rollback,消息会保存下来并且在3天后被删除。

3.3 RocketMQ维护注册信息流程

3.4 RocketMQ网络通信框架

  • 多线程网络模型,其实有点像redis,多线程分发模型,原理类似,在处理并发的网络请求时,可以通过多线程配合
    达到提高吞吐量的目的。各个线程/线程池互不影响,责任明确

4. 关于RocketMQ的疑问

4.1 消息可靠性怎么保证?

4.1.1 生产者丢失

  • 要保证发送的消息不丢,只要消息在Master同步落盘即可,需要做到以下三点:

    • Broker的刷盘策略需要配置为同步刷盘,即FlushDiskType==SYNC_FLUSH。
    • Producer在发送消息时,properties中的“WAIT”属性设置为“true”,表示客户端同步等待刷盘完成。
    • 客户端需要手动检查发送状态,保证SendResult.sendStatus=SEND_OK。默认重试两次
  • 大部分场景下为了保证性能,都是采用同步写PageCache+异步刷盘的策略,甚至是同步写预分配内存+异步写PageCache+异步刷盘。

4.1.2 Broker丢失

  • Broker的刷盘策略配置为同步刷盘,确保Producer写入Broker成功。
  • 主从配置保证高可用。利用DLedger(Raft算法)实现Broker主从节点的故障自动转移以及数据同步。

4.1.3 消费者丢失

  • 消费者会先把消息拉取到本地,然后进行业务逻辑,业务逻辑完成后手动进行ack确认,这时候才会真正的代表消费完成。而不是说pull到本地后消息就算消费完了
  • Consumer自身维护一个持久化的offset(对应MessageQueue里面的min offset),标记已成功消费或发回Broker消息的下标
    • 如果Consumer消息消费失败,会将消息发回到Broker,然后更新自己的offset
    • 如果在消息发回到Broker过程中Broker挂了,Consumer会定时重试这个操作
    • 如果Broker和Consumer同时挂了,消息也不会丢失(CommitLog和持久化offset),在重启恢复后继续从offset继续消费消息

4.2 消息积压如何处理?

  • 短期增加消费者来消费消息,同时排查积压原因,是机器问题、网络问题、消费者代码bug等
  • 如果消费者数量 < MessageQueue的数量,增加消费者可以加快消息消费速度,
    • 如果本地消息消费慢,就会延迟一段时间后再去拉取,消费者拉取的消息存在ProcessQueue中,是有流量控制的,当保存的消息数量超过阈值(默认1000)或保存的消息大小超过阈值(默认100M)或非顺序消费中最后一条和第一条消息offset超过阈值(默认2000),就不会主动拉取。
    • 如果是顺序消息,ProcessQueue加锁失败也会延迟拉取(默认延迟时间3s)
    • 消费者延迟拉取消息,一般是因为消息消费慢,消费慢的原因是消费者处理的业务逻辑复杂,RT高;消费者有慢查询或数据库负载高导致响应慢;缓存等中间件响应慢;调用外部服务接口响应慢。
    • 增加了消费者后,外部系统调用量突增,如果达到吞吐量上限,外部系统响应变慢,甚至可能被打挂。同时也要考虑本地数据库、缓存的压力,如果数据库响应变慢,处理消息的速度就会变慢。
    • Consumer在拉取消息之前,需要对MessageQueue进行负载操作,RocketMQ使用一个RebalanceService定时器来完成负载工作,默认每间隔20s重新负载一次,默认选择平均负载策略。
  • 如果消费者数量 >= MessageQueue的数量,增加消费者是没有用的。

4.3 RocketMQ为什么不使用Zookeeper?

  • 根据CAP理论,同时最多只能满足两个点,而zookeeper满足的是CP,也就是说zookeeper并不能保证服务的可用性,zookeeper在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于一个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计。
  • 基于性能的考虑,NameServer本身的实现非常轻量,而且可以通过增加机器的方式水平扩展,增加集群的抗压能力,而zookeeper的写是不可扩展的,而zookeeper要解决这个问题只能通过划分领域,划分多个zookeeper集群来解决,首先操作起来太复杂,其次这样还是又违反了CAP中的A的设计,导致服务之间是不连通的。
  • 持久化的机制来带的问题,ZooKeeper 的 ZAB 协议对每一个写请求,会在每个 ZooKeeper 节点上保持写一个事务日志,同时再加上定期的将内存数据镜像(Snapshot)到磁盘来保证数据的一致性和持久性,而对于一个简单的服务发现的场景来说,这其实没有太大的必要,这个实现方案太重了。而且本身存储的数据应该是高度定制化的。
  • 消息发送应该弱依赖注册中心,而RocketMQ的设计理念也正是基于此,生产者在第一次发送消息的时候从NameServer获取到Broker地址后缓存到本地,如果NameServer整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响。

4.4 RocketMQ如何做到消费控速?

  • 需要消费控速的原因:单机能力有限,如果消息量增大时单机压力瞬间增长,影响服务正常访问
  • 不用sentinel控速的原因:消息请求增加时触发限流后会产生大量重试,重试消息会进入重试队列,重试的量逐渐增大,broker上重试队列中消息量也越来越多,这种无意义的重复动作会增加broker的压力。对于Consumer,同一条消息需要多次从Broker拉取,如果发生重试又要投递到重试队列,消耗Consumer机器资源。
  • 主动控速,有以下三种方式,但根据压测情况来看调大pullInterval一个参数就可达到很好的控速效果,其余两个参数可以不用调整。
    • 降低消费任务提交的速率:即降低消息拉取速率。可通过调大参数pullInterval实现,默认值0,可动态调整
    • 降低每次提交的数量:即降低每次拉取的消息数量。可通过调小参数pullBatchSize实现,默认值32,可动态调整
    • 降低消费线程数:即减小核心线程数,可通过调小参数consumeThreadMin实现,默认值20,该参数需要在Consumer.start()前设置,start之后再设置无效。消费线程池中阻塞队列采用默认大小,可以认为无界,因此最大线程数参数无效。
  • 被动控速:在消费代码中埋点精确控制消费请求qps,可通过RateLimiter实现。

4.5 consumer负载均衡失效的场景?

  • 集群消息下,设置instanceName可能会导致负载均衡失效
  • 场景:一台机器部署多个JVM,每个JVM都有相同的consumer实例,手动配置每个consumer的instanceName为固定的值。
  • 现象:该机器上的consumer实例负载均衡失效,消费同样的消息,等于变成了广播消息消费模式。
  • 原因:
    • ①客户端实例标识ClientId=ip@instanceName,在不配置instanceName的情况下,instanceName默认等于当前JVM进程id。
    • ②ClientId是负载均衡策略对一个consumer实例的唯一标识。
    • 因此当instanceName配置为固定值时,该机器上不同jvm中consumer实例的ClientId就变成一样了,负载均衡策略无法区分。

参考

  1. RocketMQ Architecture - Apache

RocketMQ原理剖析相关推荐

  1. socket之send和recv原理剖析

    socket之send和recv原理剖析 1. 认识TCP socket的发送和接收缓冲区 当创建一个TCP socket对象的时候会有一个发送缓冲区和一个接收缓冲区,这个发送和接收缓冲区指的就是内存 ...

  2. fastText的原理剖析

    fastText的原理剖析 1. fastText的模型架构 fastText的架构非常简单,有三层:输入层.隐含层.输出层(Hierarchical Softmax) 输入层:是对文档embeddi ...

  3. lua游戏脚本实例源码_Lua与其他宿主语言交互原理剖析

    Lua与其他宿主语言交互原理剖析 题外话:今天周末,刚好在家有时间就把我这次项目组内部分享的文章贴出来,分享给大家,同时也方便以后自己翻阅. 一. Lua简介 目标:Lua语言本身是用C语言来编写开发 ...

  4. Go语言底层原理剖析

    作者:郑建勋 出版社:电子工业出版社 品牌:博文视点 出版时间:2021-08-01 Go语言底层原理剖析

  5. 彻底搞透视觉三维重建:原理剖析、代码讲解、及优化改进

    视觉三维重建 = 定位定姿 + 稠密重建 + surface reconstruction +纹理贴图.三维重建技术是计算机视觉的重要技术之一,基于视觉的三维重建技术通过深度数据获取.预处理.点云配准 ...

  6. Elasticsearch分布式一致性原理剖析(一)-节点篇

    2019独角兽企业重金招聘Python工程师标准>>> 摘要: ES目前是最流行的开源分布式搜索引擎系统,其使用Lucene作为单机存储引擎并提供强大的搜索查询能力.学习其搜索原理, ...

  7. java 反序列化 ysoserial exploit/JRMPListener 原理剖析

    目录 0 前言 1 payloads/JRMPClient 1.1 Externalizable 1.2 生成payload 1.3 gadget链分析 2 exploit/JRMPListener ...

  8. 统计学习方法|支持向量机(SVM)原理剖析及实现

    欢迎直接到我的博客查看最近文章:www.pkudodo.com.更新会比较快,评论回复我也能比较快看见,排版也会更好一点. 原始blog链接: http://www.pkudodo.com/2018/ ...

  9. 统计学习方法|逻辑斯蒂原理剖析及实现

    欢迎直接到我的博客查看最近文章:www.pkudodo.com.更新会比较快,评论回复我也能比较快看见,排版也会更好一点. 原始blog链接: http://www.pkudodo.com/2018/ ...

最新文章

  1. MongoDB学习笔记~Update方法更新集合属性后的怪问题
  2. pwm控制的基本原理_最详细的电机控制说明
  3. 开平推进智慧城市等领域信息化建设及公共数据资源共享
  4. 张凯江:架构能力-“构建”世界的能力
  5. spark sql 优化心得
  6. 使用 Productivity Power Tools 高级扩展 来帮助你提高 VS2012 的工作效率
  7. mediainfo php,media.php
  8. 《构建之法》阅读笔记03
  9. sap php 接口,SAP调用RestfulApi接口接收数据
  10. 模糊综合评价在matlab上的实现
  11. 回炉重造--数据库操作速成记
  12. C. Banh-mi
  13. 考研复试数据库原理课后习题(一)——绪论
  14. 【Unity3D开发小游戏】Unity3D零基础一步一步教你制作跑酷类游戏
  15. vue中watch的详细用法,带deep,immediate
  16. 加快深度学习模型训练速度@tf.function
  17. 2022年终总结:少年不惧岁月长,彼方尚有荣光在。
  18. Python爬虫实战(1):抓取毒舌电影最新推送
  19. 史上最超级KB的10个故事~你撑到第几个才发抖?
  20. 洛谷P5594-【XR-4】模拟赛

热门文章

  1. MySQL数据库实际应用中,需求分析阶段需要做什么?
  2. Fairplay之streamingContentKeyRequestDataForApp makeStreamingContentKeyRequestDataForApp
  3. 顶会竞赛最后5天被Facebook超越,又大比分反超夺冠是什么体验?
  4. 如何理解对数似然损失函数
  5. linux c 网络编程与信号量,详解Linux多线程使用信号量同步
  6. 部署AlphaSSL
  7. 命令行把java项目打成jar包
  8. 一个由SEO优化展开的meta标签大讲解
  9. [Spring Boot 6]企业级开发
  10. 你必需知道的5个开源游戏引擎