原创不易,转载请注明出处

文章目录

  • 前言
  • 1.消息流程介绍
  • 2.源码解析
    • 2.1 并发消费
    • 2.2 顺序消费

前言

我们在《RocketMQ源码解析之消息消费者(pullMessage)》一文中介绍了消息消费者push模式拉取消息的流程,消息从broker拉取回来之后,将消息放入对应的ProcessQueue的treeMap中,接着就是提交消费请求了,本文主要是介绍下RocketMQ提供的2种消费模式(并发消费,顺序消费)的执行流程与源码剖析。

1.消息流程介绍

RocketMQ支持并发消费与顺序消费,这个采用顺序消费还是并发消费,是与你采用实现的MessageListener(这个监听器就是具体处理业务要实现的那个接口)类型有关,如果你实现的MessageListenerConcurrently这个接口的话,就是采用并发消费,如果是实现MessageListenerOrderly 这个接口的话,就是采用顺序消费。
这里先介绍下并发消费,并发消费是由ConsumeMessageConcurrentlyService 这个服务来处理的,然后当消费请求提交到这个服务的时候,它会根据你每次消费多少(默认是1),按照这个数量将消息列表分成一个个的小集合,封装消费请求,然后提交到线程池中。每个消费请求在执行的时候,会去调用你实现的MessageListener 来处理业务逻辑,然后处理消费结果,如果有失败的,会将失败消息发送给broker 的重试topic中,接着就是更新本地存储消费offset中的值(这个消费offset 会在一定条件下 同步给broker,ProcessQueue移除的时候或者是定时任务,默认是5s触发一次)。
我们在来介绍下这个顺序消费,顺序消费由ConsumeMessageOrderlyService 这个服务组件来处理,这个组件收到消费请求的时候,会封装成一个ConsumeRequest,然后扔到线程池中,就一个ConsumeRequest,然后就是当线程循环获取到的消息集合,也是根据你每次消费多少,然后一批一批的去调用你实现的MessageListener 来处理业务逻辑,执行完这一批后处理执行结果,如果你处理业务逻辑的时候发生了异常或者执行结果返回null,这个时候就不会继续消费了,会等待一会再消费,默认情况下是暂停1s,1s后再提交消费请求,如果正常消费,没有出现异常啥的,就会更新消费offset,可以说这个顺序消费能够保证你这个消费是有序的,单线程,异常暂停,其实还有就是再rebalance的时候要向broker 申请锁,也就是说只有一个消费者实例能够消费对应的一个queue,消费之前要判断这个锁有没有,然后有没有失效。接下来我们分别看下并发消费与顺序消费的实现源码是什么样子的

2.源码解析

这个要从DefaultMQPushConsumerImpl启动的时候判断监听器MessageListener类型说起。

可以看到,不同类型实现的监听器,对应的ConsumeMessageService也就不一样,接下来在看下提交消费请求的代码

第一个参数就是 从broker拉取的消息集合,然后第二个参数是ProcessQueue,第三个是对应的MessageQueue,第四个是要不要去消费

2.1 并发消费

先来看下并发消费的实现

上边这一块就是拉取回来的消息个数正好在你批量消费范围之内的话,就直接封装ConsumeRequst,提交给线程池处理,如果是拉取回来的消息个数大于你批量消费范围的话,就分批封装成ConsumeRequst,扔给线程池处理,默认批量消费是1个,如果你拉回来32个消息,他就会给你封装成ConsumeRequst对象,扔到线程池中并发消费, 注意这个线程池core是20 ,maxCore是60 ,然后队列没有限制大小。
接下来看下这个ConsumeRequst 任务的run方法实现

先是获取你的messageListener ,然后执行前置hook,接着就是设置重试topic,然后循环给这些消息添加消费时间,最后就是执行你messageListener中实现的consumeMessage 方法进行消费,这里还没有完,注意它的返回状态status。

上面这一大块其实就是处理这个status,也就是消费结果,如果你异常了,它这个status就是null,后面如果你是null的话,就设置RECONSUME_LATER,也就是稍后重新消费,接着就是执行消费后的钩子,接着下面就是一堆记录指标的东西了,如果processQueue没有销毁的话,就处理这个消费结果。我们来看下是怎样处理消费结果的

这里这一段就是计算成功几个,然后失败几个的,这个ackIndex,你可以在消费的时候根据实际成功情况来确认ack,当然你也可以不用设置,只要你消费的时候异常不往上抛,它就认为你消费成功了,一旦让它捕获到异常,你这一批消费都要被放到重试队列中。

这一块就是根据这个ackIndex,往后的消息它认为是失败的,然后发给broker 的重试队列中,进行重试消费,如果发送重试消息失败就加到集合中,等一会在本地再消费一下。
最后就是在processQueue的treeMap中删除消费完的消息,获取一个最小的offset,更新本地对应的消费offset。

这个就是将失败的消息发送到broker的重试队列中,可以看到有个延迟等级,这个其实就是根据你的重试次数进行延迟消费的,这里默认的延迟等级是0,但是到broker端,它发现你传过来的是0,它会给你设置成3+重试次数

2.2 顺序消费

顺序消费是在ConsumeMessageOrderlyService 这个服务组件中完成的,我们来看下

这里就封装一个消费请求,然后扔到线程池中,也就是从broker 拉回来的数据是顺序执行的。

我们这个run方法比较长,但是大体思想就是获取锁,然后再做一些判断,比如说顺序消费的话有没有从broker申请到锁,申请的锁有没有过期,接着就是循环消费了,每次循环之前都要做一堆的判断,再接着往下看

这里上来先判断了一下ConsumeRequest消耗的时间有没有超过1分钟,如果超过了就等会再消费,接着获取每次消费的个数,从ProcessQueue中取出来对应个数的消息,再往后就是执行消费前的hook

你会发现它每次消费前都要判断一下有没有drop,drop了就不要再消费了,drop出现在rebalance的时候,这次你这个消费者实例没有分配到这个MessageQueue,或者这个MessageQueue好久没有消费进行拉取消息消费了,然后它就会给你drop,同时将本地存储的offset 同步给broker ,接下来就是调用你实现的MessageListener 来处理你的业务逻辑进行消费,返回status,下面就需要关注status了

如果你处理业务异常了,或者返回了null,不管怎么说只要status是null,就是暂停一小会,从字面意思上也能看出来,接着就是执行消费后的hook了,最后是处理消费结果,我们看下处理消费结果

先是判断是否是自动提交,默认是的当然你可以改成非自动提交,如果成功的话,就获取一下要提交的offset,如果是等一会的话,就等会提交消费请求,然后continueConsume=false,这个时候,前面的那个for循环就停了,就会等1s后再将消费请求提交,继续消费,反正就是暂停1s。

前面那一大段代码就是手动提交的,其实都差不多,主要是看最后面这一行,如果没有drop的话,就更新一下本地存储消费offset,这个一般是存在内存里,然后5s向broker 同步一下。
到这我们消息消费者的并发消费与顺序消费就介绍完了,并解析了对应的源码实现。

RocketMQ源码解析之消息消费者(consume Message)相关推荐

  1. RocketMQ源码解析-事务消息的二阶段提交

    在生产者producer当中,通过sendMessageInTransaction()方法来发送事务消息,但是在一开始向Broker发送的事务消息的时候,具体的事务操作还并没有进行处理,而是相当于向B ...

  2. RocketMQ源码解析-Producer消息发送

    首先以默认的异步消息发送模式作为例子.DefaultMQProducer中的send()方法会直接调用DefaultMQProducerImpl的send()方法,在DefaultMQProducer ...

  3. RocketMQ源码(十)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析

    深入的介绍了broker的消息刷盘服务源码解析,以及高性能的刷盘机制. 学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者是异步: 1. 同步刷盘:如上图所示,只有消息真正 ...

  4. 6、RocketMQ 源码解析之 Broker 启动(上)

    上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...

  5. RocketMQ源码解析之broker文件清理

    原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...

  6. Dubbo源码解析-Dubbo服务消费者_Dubbo协议(一)

    前言: 在介绍完Dubbo 本地模式(Injvm协议)下的服务提供与消费后,上文我们又介绍了Dubbo远程模式(dubbo协议)下的服务暴露过程,本质上就是通过Netty将dubbo协议端口暴露出去, ...

  7. RocketMQ源码(12)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析【一万字】

    基于RocketMQ release-4.9.3,深入的介绍了Broker 的消息刷盘源码解析,以及高性能的刷盘机制. 学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者异 ...

  8. RocketMQ源码解析:Filtersrv

    ???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...

  9. rocketmq源码解析之name启动(一)

    2019独角兽企业重金招聘Python工程师标准>>> 说在前面 主要解析namrsrv启动部分,namesrv配置加载.netty server创建.注册出处理器. 正文 源码解析 ...

最新文章

  1. 品质检测破局:工业视觉检测云平台为智能制造“点睛”
  2. jquery ajax返回Internal server error 500错误解决方案
  3. 浅谈网络爬虫中广度优先算法和代码实现
  4. Windows Server 2008安装Memcached笔记
  5. Linux下安装配置virtualenv与virtualenvwrapper
  6. 巡检水中机器人_海洋与地球学院学子在2020年国际水中机器人大赛中获得佳绩...
  7. 《系统集成项目管理工程师》必背100个知识点-63供应商选择
  8. python怎么样另存为_python要怎么保存python生成式
  9. 68-Flutter中极光推送的使用
  10. vs 正则表达式转大写_liunx之通配符amp;正则表达式
  11. 详谈Hibernate框架关系映射!
  12. Chrome 35个开发者工具的小技巧
  13. VisualSVN https 钩子失效 关闭服务器信任
  14. Prefer rather than 喜欢 Prefer to
  15. Mendeley Destop引用格式自定义调整
  16. 时间序列分析实验报告总结_时间序列分析实验报告
  17. 指南针经纬度分秒格式转换10进制经纬度
  18. 微信消息模板换行符转义问题处理
  19. 云服务器无法连接怎么办
  20. 修改Git提交历史中的author,email和name等信息

热门文章

  1. WEB端唤起 百度|腾讯|高德 地图一键导航功能
  2. 敏捷CSM认证:什么是产品负责人?
  3. mysql 十万条 输出_mysql - php导出十多万条数据有没有办法更快?
  4. 单线程一定是线程安全的吗
  5. c if sortable html,Rails 5 - html5sortable - sortable不是HTMLDocument.ready中的函数
  6. 5G时代:和TCP/IP说拜拜
  7. 对于JAVAEE的理解
  8. SpringBoot+Nginx前后端分离项目部分配置总结
  9. 基于SpringBoot+SSM的大学生兼职平台
  10. RADOS分布式对象存储原理简介