RocketMQ(十二)消息堆积与消费延迟

产生背景

消息处理流程中,如果Consumer的消费速度跟不上Producer的发送速度,MQ中未处理的消息会越来越多,这部分消息就被称为 堆积消息。消息出现堆积会导致消费延迟,以下场景需要重点关注消息堆积和消息延迟的问题。

  • 业务系统上下游能力不匹配造成的持续堆积,且无法自行恢复
  • 业务系统对消息的消费实时性要求较高,即使是短暂的堆积造成的消费延迟也无法接受。

Consumer使用长轮询Pull模式消费消息时,分为以下两个阶段:

消息拉取

Consumer通过长轮询Pull模式批量拉取的方式从服务端获取消息,将拉取到的消息缓存到本地缓冲队列中。对于拉取式消费,在内网环境下会有很高的吞吐量,所以这一阶段一般不会成为消息堆积的瓶颈。

一个单线程单分区的低规格主机(Consumer,4C8G),其可达到几万的TPS。如果是多个分区多个线程,则可以轻松达到几十万的TPS

消息消费

Consumer将本地缓存的消息提交到消费线程中,使用业务消费逻辑对消息进行处理,处理完毕后获取到一个结果。这是真正的消息消费过程。此时Consumer的消费能力就完全依赖于消息的消费耗时和消费并发度了。如果由于业务处理逻辑复杂等原因,导致处理单条消息的耗时较长,则整体的消息吞吐量肯定不会高,此时就会导致Consumer本地缓冲队列达到上限,停止从服务端拉取消息。

总结

消息堆积的主要瓶颈在于客户端的消费能力,而消费能力由消费耗时和消费并发度决定。注意,消费耗时的优先级要高于消费并发度。即在保证了消费耗时的合理性前提下,再考虑消费并发度问题。

消费耗时

影响消息处理时长的主要因素是代码逻辑。而代码逻辑中可能会影响处理时长代码主要有两种类型:CPU内部计算型代码和I/O操作型代码。

通常情况下代码中如果没有复杂的递归和循环的话,内部计算耗时相对外部I/O操作来说几乎可以忽略。所以外部IO型代码是消息处理时长变长的主要原因,如:

  • 读写远程数据库
  • 读写远程Redis
  • 对下游系统的http接口调用

关于下游系统调用逻辑需要进行提前梳理,掌握每个调用操作预期的耗时,这样做是为了能够判断消费逻辑中IO操作的耗时是否合理。通常消息堆积是由于下游系统出现了服务异常达到了DBMS容量限制,导致消费耗时增加。

  • 服务异常:也有可能是网络带宽等问题

消息并发度

一般情况下,消费者端的消费并发度由单节点线程数和节点数量共同决定,其值为单节点线程数*节点数量。不过,通常需要优先调整单节点的线程数,若单机硬件资源达到了上限,则需要通过横向扩展来提高消费并发度。

  • 单节点线程数,即单个Consumer所包含的线程数量

  • 节点数量,即Consumer Group所包含的Consumer数量

  • 全局顺序消息:该类型消息的Topic只有一个Queue分区。其可以保证该Topic的所有消息被 顺序消费。为了保证这个全局顺序性,Consumer Group中在同一时刻只能有一个Consumer的一 个线程进行消费。所以其并发度为1

  • 分区顺序消息:该类型消息的Topic有多个Queue分区。其仅可以保证该Topic的每个Queue 分区中的消息被顺序消费,不能保证整个Topic中消息的顺序消费。为了保证这个分区顺序性, 每个Queue分区中的消息在Consumer Group中的同一时刻只能有一个Consumer的一个线程进行 消费。即,在同一时刻最多会出现多个Queue分蘖有多个Consumer的多个线程并行消费。所以 其并发度为Topic的分区数量。

单机线程数计算

对于一台主机中线程池中线程数的设置需要谨慎,不能盲目直接调大线程数,设置过大的线程数反而会带来大量的线程切换的开销。理想环境下单节点的最优线程数计算模型为:
C∗(T1+T2)T1\frac{C*(T1+T2)}{T1} T1C∗(T1+T2)​

  • C:CPU内核数
  • T1:CPU内部逻辑计算耗时
  • T2:外部IO操作耗时

最优线程数计算公式:
最优线程数=C∗(T1+T2)T1=C∗T1T2+C最优线程数=\frac{C*(T1+T2)}{T1}=C*\frac{T1}{T2}+C 最优线程数=T1C∗(T1+T2)​=C∗T2T1​+C

注意,该计算出的数值是理想状态下的理论数据,在生产环境中,不建议直接使用。而是先设置一个比该值小的数值,观察其压测效果,然后再根据效果逐步调大线程数,直至找到在该环境中性能最佳时的值。

消息堆积的避免

为了避免在业务使用时出现非预期的消息堆积和消费延迟问题,需要在前期设计阶段对整个业务逻辑进行完善的排查和梳理。其中最重要的就是梳理消息的消费耗时设置消息消费的并发度

梳理消息的消费耗时

通过压测获取消息的消费耗时,并对耗时较高的操作的代码逻辑进行分析。梳理消息的消费耗时需要关注以下信息:

  • 消息消费逻辑的计算复杂度是否过高,代码是否存在无限循环和递归等缺陷。
  • 消息消费逻辑中的I/O操作是否是必须的,能否用本地缓存等方案规避。
  • 消费逻辑中的复杂耗时的操作是否可以做异步化处理。如果可以,是否会造成逻辑错乱。

设置消费并发度

对于消息消费并发度的计算,可以通过以下两步实施:

  • 逐步调大单个Consumer节点的线程数,并观测节点的系统指标,得到单个节点最优的消费线程数和消息吞吐量。

  • 根据上下游链路的流量峰值计算出需要设置的节点数
    节点数=流量峰值单个节点消息吞吐量节点数= \frac{流量峰值}{单个节点消息吞吐量} 节点数=单个节点消息吞吐量流量峰值​

消息的清理

消息被消费过后不会被清理,是被顺序存储在commitlog文件的,且消息大小不定长,所以是不可能以消息为单位进行清理的,而是以commitlog文件为单位进行清理的。

commitlog文件存在一个过期时间,默认为72小时,即三天。除了用户手动清理外,在以下情况下也会被自动清理,无论文件中的消息是否被消费过:

  • 文件过期,且到达清理时间点(默认为凌晨4点)后,自动清理过期文件
  • 文件过期,且磁盘空间占用率已达过期清理警戒线(默认75%)后,无论是否达到清理时间点,
  • 都会自动清理过期文件
  • 磁盘占用率达到清理警戒线(默认85%)后,开始按照设定好的规则清理文件,无论是否过期。
  • 默认会从最老的文件开始清理
  • 磁盘占用率达到系统危险警戒线(默认90%)后,Broker将拒绝消息写入

需要注意以下几点:

  • 对于RocketMQ系统来说,删除一个1G大小的文件,是一个压力巨大的IO操作。在删除过程 中,系统性能会骤然下降。所以,其默认清理时间点为凌晨4点,访问量最小的时间。也正因如果,我们要保障磁盘空间的空闲率,不要使系统出现在其它时间点删除commitlog文件的情况。

  • 官方建议RocketMQ服务的Linux文件系统采用ext4。因为对于文件删除操作,ext4要比ext3性能更好

RocketMQ(十二)消息堆积与消费延迟相关推荐

  1. 【RocketMQ工作原理】消息堆积与消费延迟

    概念 消息处理流程中,如果Consumer的消费速度跟不上Producer的发送速度,MQ中未处理的消息会越来 越多(进的多出的少),这部分消息就被称为堆积消息.消息出现堆积进而会造成消息的消费延迟. ...

  2. RabbitMq(十二) 借用死信交换机实现延迟队列

    概述:延迟队列即在消息发送后延迟固定时间后再去接受处理,做相应的一些相应. 应用场景举例:在电商购物后,订单支付前发送消息信息,在三分钟之后检查订单是否支付成功,如果支付,则取消订单并库存数量恢复:或 ...

  3. Flask(十二)——消息闪现

    一个基于GUI好的应用程序需要向用户提供交互的反馈信息. 例如,桌面应用程序使用对话框或消息框,JavaScript使用 alert() 函数用于类似的目的. 在Flask Web应用程序中生成这样的 ...

  4. RocketMQ(十二)消息堆积与消息延迟

    1.概念 消息处理流程中,如果Consumer的消费速度跟不上Producer的发送速度,MQ中未处理的消息会越来越多(进的多出的少),这部分消息就被称为堆积消息.消息出现堆积进而会造成消息的消费延迟 ...

  5. 遇到了消息堆积,但是问题不大

    这一篇我们要说的话题是消息的堆积处理,其实这个话题还是挺大的,因为消息堆积还是真的很令人头疼的,当堆积的量很大的时候,这真的是个很暴躁的问题,不过这时候真考验大家冷静的处理问题的能力了 我们一起来分析 ...

  6. 【RocketMQ 二十八】RocketMQ 消息堆积

    消息堆积本质 ⽣产者的⽣产速度 >> 消费者的处理速度 ⽣产者的⽣产速度骤增,⽐如⽣产者的流量突然骤增 消费速度变慢,⽐如消费者实例 IO 阻塞严重或者宕机 如何处理消息堆积 如何处理消息 ...

  7. RocketMQ消息消费源码分析(二消息的消费)

    首先回到DefaultMQPushConsumerImpl  start方法 public synchronized void start() throws MQClientException {sw ...

  8. rocketmq 重复消费_RocketMQ的十二个特性,你都知道吗「下」

    接上文,今天继续. 9 消息重试 Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次.Consumer消费消息失败通常可以认为有以下几种情况: 由于消息本身的原因,例如反序列化失败, ...

  9. RocketMQ中的消息类型种类(二)

    消息种类 按照发送的特点分 同步消息 异步消息 单向消息 按照使用功能特点分 顺序消息 广播模式 延迟消息 批量消息 过滤消息 事务消息 按照发送的特点分 同步消息 同步发送是指消息发送方发出数据后, ...

最新文章

  1. Ubuntu 18.04缺少libgconf-2.so.4
  2. java批量下载文件为zip包
  3. 弱电工程网络传输基础知识讲解
  4. python报错_python 常见报错
  5. [python] 使用scikit-learn工具计算文本TF-IDF值
  6. 《剑指offer》按之字行顺序打印二叉树
  7. android icon命名规则,安卓手机的APP图标尺寸规范和图标命名规范
  8. Android之如何解决Android Studio左边的的project不见了
  9. VS2017的C++开发心得(一)VS的项目创建
  10. 车载android播放器,KX万能播放器
  11. 网页游戏未来发展的一些趋势
  12. ps中颜色和图片本身颜色不一样,白色呈米色
  13. Unity3d 中 PlayerPrefs 保存数据的总结
  14. NG Toolset开发笔记--5GNR Resource Grid(10)
  15. 进不去系统rpc服务器不可用,WinXP系统RPC服务器不可用怎么办?
  16. 二极管(二):肖特基二极管
  17. java 手机号归属地查询
  18. 计算机组成原理——乘法运算(一位乘)
  19. ABAP:增强篇-CJ20N屏幕增强
  20. 卡尔曼滤波 | Matlab实现非线性卡尔曼滤波(Nonlinear KF)

热门文章

  1. 【BZOJ3879】SvT,后缀数组+单调栈维护sum
  2. 【BZOJ1500】【codevs1758】维修数列,简析Splay的综合操作
  3. java 获取本机的ip和mac_java获取本机ip和mac地址
  4. 2017.10.28 压缩 思考记录
  5. 【英语学习】【Level 07】U06 First Time L1 My very first trip
  6. 【英语学习】【English L06】U07 Jobs L3 Dr. Allen is a Dentist
  7. java事务代码_关于java中实现JDBC事务控制代码示例
  8. springboot 日志级别_SpringBoot实战(十三):Admin动态修改日志级别
  9. 零元学Expression Blend 4 - Chapter 3 熟悉操作第一步(制作一个猴子脸)
  10. Java 使用GDAL 读写 shapefile