RocketMQ Consumer 消费拉取的消息的方式有两种
1.      Push方式:rocketmq 已经提供了很全面的实现,consumer 通过长轮询拉取消息后回调 MessageListener 接口实现完成消费,应用系统只要重写 MessageListener 的方法完成业务逻辑即可
2.      Pull方式:完全由业务系统去控制,定时拉取消息,指定队列消费等等,当然这里需要业务系统去根据自己的业务需求去实现
下面介绍 push 方式(long-polling 长轮询方式实现):
1、DefaultMQPushConsumer 构造器初始化
DefaultMQPushConsumer 初始化 groupName,默认使用 AllocateMessageQueueAveragely 算法平均分配 queue 给 consumer。
3~6 订阅给定的 Topic
4、创建订阅数据对象
subExpression 为 null,则订阅 Topic 下全部内容
5、订阅数据对象放入缓存
ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner ,放入 「4」步骤构造好的订阅数据对象
6、发送心跳
如果 MQClientInstance 已经创建(步骤「12」创建),则将 consumer 心跳发送给所有 Broker
7、8、注册监听
给当前的 consumer 注册一个回调方法,当队列有消息的时候回调这个方法
9 ~ end 启动 consumer
11、复制一份订阅数据对象(其实就是 4、5 步骤,多了一个 retry+groupName 映射。重试队列里面 queue 看到只有1个,原来的队列queue有4个)
复制一份步骤「4」创建的订阅数据对象,放入缓存 ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner,topic 为重试 topic = %RETRY%{groupname}
12、创建 MQClientInstance
13、RebalanceImpl 属性初始化
14、PullAPIWrapper 初始化
15、offerset 存储
BROADCASTING 存在 consumer 本地文件(局部进度。多个 consumer 消费相同 queue,消费进度是跟自己本身 consumer 有关,所以存本地就行,其他 consumer 不关心也不会用到其他的 consumer 的消费进度)
CLUSTERING 存在远程 broker (全局进度。不同 consumer 消费不同 queue,是一个全局的进度,上报到 broker 在 consumer 宕机后才能由其他 consumer 继续消费)
16、load() 加载消费进度
17、消息消费服务初始化
  • 顺序消息消费 
  • 常规消息并发消费 ConsumeMessageConcurrentlyService
18、消息消费服务启动
  • 常规消息,则启动一个定时任务清理过期消息 cleanExpireMsg()
  • 顺序消息并且是 CLUSTERING 模式,则启动一个定时锁队列的任务 RebalanceImpl.lockAll()
19、注册 Consumer
将 consumer 信息存入 ConcurrentMap<String/* group */, MQConsumerInner> consumerTable
20 ~  和之前的图类似,这里就不认真画出时序关系了
20、MQClient 开始
这里的 start,和 producer 的一样,使用了同一个类同一个方法
21、Netty 服务启动
22、启动定时任务
(1)如果没有设置 nameserver 地址,则定时获取 nameserver 地址
(2)定时从 NameServer 更新 topic 路由信息(包含消费者 Set<MessageQueue> subscribeInfo 、生产者 TopicPublishInfo)
(3)定时清理下线的 Broker;给所有 Broker 发送心跳(包含订阅关系)
(4)定时持久化所有消费者 Offset(即消费进度)。 存储方式参考步骤「15」
(5)定时调整线程池
23、拉消息线程启动
push 方式,却使用  this.pullMessageService.start(); 拉消息服务,因为这是一个推拉结合的实现。
24~28 负载均衡服务
遍历步骤「5、11」赋值的 subscriptionInner
(1)Push 模式的均衡:DefaultMQPushConsumerImpl  
  • 无序 
<1> 集群模式
// topicSubscribeInfoTable 是 client 从 nameserver 上拉的路由信息
a. ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<String, Set<MessageQueue>>();  
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic)
topicSubscribeInfoTable
        "%RETRY%please_rename_unique_group_name_4" -> 
                MessageQueue [topic=%RETRY%please_rename_unique_group_name_4, brokerName=LAPTOP-P9TNK0JN, queueId=0]
        
        "TopicTest" ->
                MessageQueue [topic=TopicTest, brokerName=LAPTOP-P9TNK0JN, queueId=2]
                MessageQueue [topic=TopicTest, brokerName=LAPTOP-P9TNK0JN, queueId=3]
                MessageQueue [topic=TopicTest, brokerName=LAPTOP-P9TNK0JN, queueId=0]
                MessageQueue [topic=TopicTest, brokerName=LAPTOP-P9TNK0JN, queueId=1]
29、查询 ConsumerId 列表
// cidAll = 10.240.131.92@15804
List<String> cidAll = findConsumerIdList(final String topic, final String group) 。先根据 group 去 broker address (优先使用 master,没有master 随机一个 slaver 地址    )查找所有 consumer list(还记得之前说过的 client 会定时向 broker 注册自己的信息么 「为什么 client 要向 broker 发心跳(this.mQClientFactory.sendHeartbeatToAllBrokerWithLock())?  发 group 信息给 broker」)。
30、使用负载均衡算法分配队列
allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) 。然后根据步骤「1」中的分配算法 AllocateMessageQueueAveragely 给 consumr group 分配 queue。
AllocateMessageQueueAveragely  平均分配如下图:比如默认一个 topic 有 4 个 queue,2个人消费,就是一人分两个,平均分。根据 clientId,分配到的 queueid 固定。
                            
AllocateMessageQueueAveragelyByCircle 循环平均分配:    
31、步骤「30」负载均衡完成之后,更新本地处理队列缓存
ProcessQueue (主要是 TreeMap<Offset, Msg> 和读写锁)作用:保存了所有获取到,但是还未被处理的消息
有了 ProcessQueue 的帮助 PushConsumer 会判断获取但还未处理的消息个数、 消息总大小、 Offset 的跨度,任何一个值超过定的大小就隔一段时间再拉取消息, 从而达到流量控制的目的。 此外 ProcessQueue 还可以辅助 实现顺序消费的逻辑。
updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,final boolean isOrder)
a. 如果队列已经删除或者队列两分钟没有拉取消息,则 removeUnnecessaryMessageQueue(mq, pq) 删除无用消息队列,并且 processQueue 置为 drop = true 状态
b. 
long nextOffset = this.computePullFromWhere(mq) 。计算从哪里开始拉取,根据 topic、group、queueId 查询消费进度 offset
ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable 增加正在被消费的队列
创建数据拉取请求 PullRequest                               
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset); // 消费进度
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
this.dispatchPullRequest(pullRequestList)  :是在 PullMessageService 类中,将上一步对象赋值进 LinkedBlockingQueue<PullRequest> pullRequestQueue 。这是一个阻塞队列,当有有数据赋值进来,则下面方法就可以开始执行 pullMessage(pullRequest) 的方法
向 broker 发送请求
// PullMessageService.java
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {// Push Consumer 的示例,为什么会有 PullRequest ,这是通过 long polling 长轮询方式达到 Push 效果。既有 pull 的优点不会压垮 client,又有 Push 的实时性
PullRequest pullRequest = this.pullRequestQueue.take();
if (pullRequest != null) {
this.pullMessage(pullRequest);
}
} catch (InterruptedException e) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
2、获取消费者 ()
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup())
ConcurrentMap<String/* group */, MQConsumerInner> consumerTable
3、用真正实现类去拉取消息
pullMessage(pullRequest)
判断 processQueue.isDropped() 是否已经不可用,不可用用直接 return
设置最后一次拉取时间戳 pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis())
确保服务状态正常 this.makeSureStateOK()
限流,拉取还未消费超过默认值1000,则一定延时之后再拉。 if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue())
限流,拉取未消费数据量大于100M,则一定延时之后再拉。if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue())
消费模式:
正常消费(限流,拉取消息跨度太大,一定时间后再拉。 if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()))
顺序消费 (消费队列必须已经锁定,否则果断时间再尝试拉取)      
4、获取订阅数据
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
5、创建回调对象
PullCallback pullCallback = new PullCallback()。从 broker 异步拉取成功后回调方法
6、从内存中读取 commitOffsetValue
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY) // 这里获取到的是 803 与 pullRequest.getNextOffset() 获取内容一致
7、生成 sysFlag
这里我获取到的是 3
8、拉取消息
pullKernelImpl(mq,subExpression,expressionType,subVersion,offset,maxNums,sysFlag,commitOffset,brokerSuspendMaxTimeMillis,timeoutMillis,CommunicationMode communicationMode,PullCallback pullCallback)
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);  // 长轮询使用参数。broker 最长阻塞时间,broker 没有消息时才阻塞,有消息立刻返回。 
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);
9、获取 broker 地址 
findBrokerAddressInSubscribe(brokerName,brokerId,onlyThisBroker)  先从本地取,没有再去nameserve取。
10、11、使用 Netty 发起异步请求
pullMessage(addr,PullMessageRequestHeader,timeoutMillis,CommunicationMode,pullCallback)
将请求响应对应信息维护在 ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable,这样异步请求成功的时候就能在回调的时候根据 opaque,就能找到请求对应的响应
broker 接受请求:
1、处理consumer发来的请求
2、创建响应对象
3、响应对象赋值 opaque
4、获取订阅组配置
SubscriptionGroupConfig [groupName=please_rename_unique_group_name_4, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
5、获取 topic 配置
ConcurrentMap<String, TopicConfig> topicConfigTable
this.topicConfigTable.get(topic)
TopicConfig [topicName=TopicTest, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
6、获取消费组的信息
ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable
this.consumerTable.get(group)
7 ~ 15、获取消息 getMessage(group,topic,queueId,offset,maxMsgNums,messageFilter)
8、9 获取最大偏移量
10 获取消费队列
GetMessageResult [status=OFFSET_OVERFLOW_ONE, nextBeginOffset=805, minOffset=0, maxOffset=805, bufferTotalSize=0, suggestPullingFromSlave=false]
11、获取队列中最小偏移量,表示队列中数据当前最小位置
12、获取队列中最大偏移量,表示队列中数据最大的位置。 当传进来的下一个要读取 offset 和 maxoffset 相同则说明没有新的数据可读
13、getIndexBuffer(offset) 获取
14、commitLog.getMessage(offsetPy, sizePy)
15、selectMappedBuffer(int pos, int size) 从 mappedByteBuffer 读取信息
上面是有消息的情况,如果没有消息,response 不立刻返回,而是靠 PullRequestHoldService.java 的 run 方法轮询处理
//PullMessageProcessor.java 
....
case ResponseCode.PULL_NOT_FOUND:
                    if (brokerAllowSuspend && hasSuspendFlag) {
                        long pollingTimeMills = suspendTimeoutMillisLong;
                        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                        }
                        String topic = requestHeader.getTopic();
                        long offset = requestHeader.getQueueOffset();
                        int queueId = requestHeader.getQueueId();
                        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);  // broker hold 住请求
                        response = null;  // 没消息,没有马上返回 response
                        break;
                    }
....
PullRequestHoldService.java
@Override
    public void run() {
        log.info("{} service started", this.getServiceName());
        while (!this.isStopped()) {
            try {
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { // 允许长轮询就多等会.. 
                    this.waitForRunning(5 * 1000);
                } else {
                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); // 不允许长轮询,就只延迟默认 1000ms
                }
                long beginLockTimestamp = this.systemClock.now();
                // 遍历 hold 住的 request list,
                // 1、查看 topic 下的 queue 有没有新消息进来,有的话就返回 response。 
                // 2、没有新消息,而且hold时间超过了,之前传进来的 broker 最大挂起时间 timeoutMillis,也立即返回 response。
                // 3、剩余的,继续等待最后进入 1或者2
                this.checkHoldRequest();
                long costTime = this.systemClock.now() - beginLockTimestamp;
                if (costTime > 5 * 1000) {
                    log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
                }
            } catch (Throwable e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
        log.info("{} service end", this.getServiceName());
    }
长轮询方式的局限性,是在HOLD住Consumer请求的时候需要占用资源,它适合用在消息队列这种客户端连接数可控的场景中。
32、消息队列变更
messageQueueChanged(topic, mqSet, allocateResultSet)  将心跳信息发给所有 broker
从 broker 拉取消息后返回 consumer。对应代码就是消息怎么回调到 PullCallback
1、Netty client 的回调方法 
2 ~ ? 处理接收到的消息
4、从缓存中获取 opaque 对应的响应信息
ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable
ResponseFuture responseFuture = responseTable.get(opaque);
在发送请求的时候已经赋值好了 responseTable,所以在拿到 response 的时候,根据传回来的 opaque 就能对应上原来的 request。
5、删除缓存 opaque 信息
拿到响应对象后,就可以先从缓存删除 opaque 相应的响应信息
6、7、8 执行 netty 的回调,响应服务端的请求
因为在 consumer pull message 的时候,netty 是异步调用,所以响应的时候去回调步骤「8」
9、回调 PullCallback 的回调方法
10、处理返回的结果
11、反序列化
12、设置下一个偏移量
13、将消息放入 ProcessQueue
将消息放进 porcessQueue 的 TreeMap 数据结构里面,这个过程加锁执行
TreeMap<Long, MessageExt> msgTreeMap
14、15、16、17 将消息提交给消费执行线程池 consumeExecutor 消费
非顺序消息  ConsumeMessageConcurrentlyService 的 run 方法就会去回调 consumer 最初设置的 listener 回调方法,这样消息就到了 consumer 测试用例重写的方法了。
顺序消息  ConsumeMessageConcurrentlyService 会在 listener 回调前进行一些操作(例如mq锁检查),已经调用后失败的处理与非顺序消息不同。(顺序消息不能像无序消息一样,消费失败再次丢进 broker,这样就乱序了,只能延迟一会再消费。)
18、处理消费后的结果
广播模式消费失败,报错
集群模式消费失败,丢回 broker(sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) 这个消息不是马上就能被消费的,是有一定延迟的延迟消息),丢broker 如果失败,则自己延迟消费(submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()))
19、清除这批消息
20、持久化 offset (这属于消费完毕更新,当然还有之前启动时候的定时任务持久化 offset)
集群模式更新远程 offset(RemoteBrokerOffsetStore)
广播模式更新本地 offset
<2> 广播模式
  • 有序
(2)Pull 模式的均衡:DefaultMQPullConsumerImpl

RocketMQ 源码阅读 ---- 消息消费(普通消息)相关推荐

  1. RocketMQ源码(八)Broker asyncSendMessage处理消息以及自动创建Topic

    此前已经梳理了RocketMQ的broker接收Producer消息的入口源码RocketMQ(七)broker接收消息入口源码_代码---小白的博客-CSDN博客 在文章的最后我们到了SendMes ...

  2. RocketMQ源码阅读-NameServer篇

    说明:此代码是跟着<RocketMQ技术内幕>这本书阅读的,借鉴了很多东西,在此感谢丁威大佬和RocketMQ的贡献者们,文章如有问题,欢迎批评指正 RocketMQ版本:4.8.0 2. ...

  3. RocketMQ源码解析:Producer发送消息+Broker消息存储

    文章目录 1. Producer 发送消息 2. Broker 接收消息 1. Producer 发送消息 先上一段简单的生产者代码 public static void main(String[] ...

  4. RocketMQ源码(十)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析

    深入的介绍了broker的消息刷盘服务源码解析,以及高性能的刷盘机制. 学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者是异步: 1. 同步刷盘:如上图所示,只有消息真正 ...

  5. RocketMQ源码(12)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析【一万字】

    基于RocketMQ release-4.9.3,深入的介绍了Broker 的消息刷盘源码解析,以及高性能的刷盘机制. 学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者异 ...

  6. 探秘RocketMQ源码——Series1:Producer视角看事务消息

    简介:探秘RocketMQ源码--Series1:Producer视角看事务消息 1. 前言 Apache RocketMQ作为广为人知的开源消息中间件,诞生于阿里巴巴,于2016年捐赠给了Apach ...

  7. RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的?

    RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 文章目录 RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 前言 项目 ...

  8. RocketMQ源码分析之延迟消息

    文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...

  9. RocketMQ源码(十七)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码

    转载来源: RocketMQ源码(19)-Broker处理DefaultMQPushConsumer发起的拉取消息请求源码[一万字]_刘Java的博客-CSDN博客 此前我们学习了RocketMQ源码 ...

最新文章

  1. CEO 赠书 | 当我们谈企业文化时,我们在谈什么?
  2. Linux_unix系统编程手册以同步方式等待信号
  3. IDC、刘润:企业如何通过数字化转型,驱动业务发展?附98页PPT
  4. 《Linux多线程服务端编程:使用muduo C++网络库》书摘6.6.2节
  5. idea git 整合使用
  6. azure机器学习_如何在Azure机器学习中使用JSON数据
  7. Java内存区域-运行时数据区域
  8. 挨踢部落故事汇(10):技术晋升管理的心声
  9. containers文件夹可以删除吗_C盘空间严重不足,原来这些文件夹是可以删除的,不要弄错了...
  10. 使用Arduino IDE环境学习ESP32--CAM
  11. 类加载器详解(自己实现类加载器)
  12. matlab二进制香农,香农编码及MATLAB实现.ppt
  13. 微信公众号编辑模式下推送消息
  14. 腾讯云服务器IP地址绑定域名步骤
  15. linux控制风扇转速的命令,Linux下笔记本的风扇控制问题
  16. python mian 方法传递参数个数判定 | 简记
  17. 牧云Webshell检测神器
  18. cuda8.0 出错:/usr/bin/ld: 找不到 -lGL【转】
  19. ProtoPie 学习
  20. 5G技术—移动通信制式演进发展历程测试题目

热门文章

  1. SSE/SSE2版ceilf
  2. 俞敏洪 同济大学演讲
  3. TVS,在汽车电源线的应用详解
  4. Flajolet-Martin算法
  5. “COK-LIKE”的传承与颠覆 《万国觉醒》70天体验总结
  6. 串口低频刷卡密码键盘ID卡发卡器YD791开关设置选择
  7. Java面试必问!javasocket服务端持久化
  8. springBoot下的ftp下载
  9. 易基因:DNA甲基化研究的3大前期探索性实验思路|干货系列
  10. Java商城首页优化_Java 实战:记一次线上商城系统高并发的优化