这一篇我们要说的话题是消息的堆积处理,其实这个话题还是挺大的,因为消息堆积还是真的很令人头疼的,当堆积的量很大的时候,这真的是个很暴躁的问题,不过这时候真考验大家冷静的处理问题的能力了

我们一起来分析分析有关问题吧

大量的消息堆积在MQ中几个小时还没解决怎么办呢

一般这种比较着急的问题,最好的办法就是临时扩容,用更快的速度来消费数据

1、临时建立一个新的Topic,然后调整queue的数量为原来的10倍或者20倍,根据堆积情况来决定

2、然后写一个临时分发消息的consumer程序,这个程序部署上去消费积压的消息,消费的就是刚刚新建的Topic,消费之后不做耗时的处理,只需要直接均匀的轮询将这些消息轮询的写入到临时创建的queue里面即可

3、然后增加相应倍数的机器来部署真正的consumer消费,注意这里的Topic,然后让这些consumer去真正的消费这些临时的queue里面的消息

不知道大家明白没有,很简单的道理,我给大家举个形象的例子

一个topic堵住了,新建一个topic去进行分流,临时将queue资源和consumer资源扩大10倍,将消息平均分配到这些新增的queue资源和consumer资源上,以正常10倍的速度来消费消息,等到这些堆积的消息消费完了,便可以恢复到原来的部署架构

这种只是用于临时解决一些异常情况导致的消息堆积的处理,如果消息经常出现堵塞的情况,那该考虑一下彻底增强系统的部署架构了

消息设置了过期时间,过期就丢了怎么办呢

在rabbitmq中,可以设置过期时间TTL,和Redis的过期时间一样,如果消息在queue中积压超过一定时间就会被rabbitmq清理掉,这个数据就没了

这样可能会造成大量的数据丢失

这种情况下上面的解决方案就不太合适了,可以采取批量重导的方案来解决,在系统流量比较低的时候,用程序去查询丢失的这部分数据,然后将消息重新发送到MQ中,把丢失的数据重新补回来

这也算是一种补偿任务吧,补偿任务一般是用于对定时跑批的一种补偿

分析下RocketMQ中的消息堆积原因

消息的堆积归根到底就是生产者生产消息的速度和消费者消费的速度不匹配导致的,输入的和消费的速度不统一

或许是突然搞了一波促销,系统业务量暴增,导致生产者发消息暴增,消费速度跟不上

也有可能是消费方出现失败的情况,疯狂重试,也或者就是消费方的消费能力太低了

RocketMQ是按照队列进行消息负载的,如果consumer中的一台机器由于硬件各方面原因导致该机器上的消息队列不能及时处理,就会造成整个消息队列的堆积

RocketMQ分为发布方和订阅方,双方都有负载均衡策略,默认都是采用平均分配,producer消息以轮询方式发送到消息队列queue中,broker将这些的queue再平均分配到属于同一个group id的订阅方集群

  • .如果消费者consumer机器数量和消息队列相等,则消息队列平均分配到每一个consumer上

  • 如果consumer数量大于消息队列数量,则超出消息队列数量的机器没有可以处理的消息队列

  • 若消息队列数量不是consumer的整数倍,则部分consumer会承担跟多的消息队列的消费任务

如果其中一台机器处理变慢,可能是机器硬件、系统、远程 RPC 调用或 Java GC 等原因导致分配至此机器上的 Queue 的消息不能及时处理

消息队列 RocketMQ 版的消息负载是按 Queue 为粒度维护,所以,整个 Queue 上的消息都会堆积

那说一下解决思路吧

我们知道了最根本原因是生产和消费速度不匹配导致的,这种问题要是经常出现,就是系统架构导致,这种需要考虑增加消费方的数量了

如果是搞促销的这种临时情况导致的,这种情况下系统应该会比较快的消化掉,堆积时间不会很快,如果搞促销时间很长,持续高流量时间很长,那没得办法,还是得加机器

经常出现这种消息堆积问题,需要先定位一下消费满的原因,也也可能是代码bug,导致多次重试,如果是bug则处理bug,优化下消费的逻辑

再者就要考虑水平扩容,增加Topic的queue数量和消费者的数量,这两者增加的时候需要考虑两边的平衡,队列数量一定要增加,不然新增加的消费数量者会导致无消息消费的尴尬场面,一个topic中的一个队列只会分配给一个消费者

消费者数量超过队列数量的时候,超出的部分消费者就无消息可以消费了

RocketMQ中消费完的消息去了哪里呢

消息的存储是一直存在于CommitLog文件中的,大家都知道CommitLog是以文件为单位存在的,而且RocketMQ的设计是只允许顺序写,也就意味着所有消息都是顺序的写入到这个文件中的

而每个消息的大小又不是定长的,所以这就决定了消息几乎不可能按照消息为单位进行删除,逻辑极其复杂

消息一旦被消费了之后是不会被立即清除的,还是会存在于CommitLog文件中的,那问题来了,消息未删除,RocketMQ是如何知道哪些消息已经被消费过,哪些还未消费呢

答案就是客户端会维护一个消息的offset,客户端拉取完消息之后,broker会随着响应体返回一个下一次拉取的位置,消费者会更新自己的下一次的pull的位置

CommitLog文件什么时候进行清除

消息存储到该文件之后,也是会被清理的,但是这个清理只会在下面这些条件中,任一条件成立的时候才会批量的删除CommitLog消息文件

  • 消息文件过期(默认72小时),且到达清理时点(默认是凌晨4点),删除过期文件。

  • 消息文件过期(默认72小时),且磁盘空间达到了水位线(默认75%),删除过期文件。

  • 磁盘已经达到必须释放的上限(85%水位线)的时候,则开始批量清理文件(无论是否过期),直到空间充足。

注:若磁盘空间达到危险水位线(默认90%),出于保护自身的目的,broker会拒绝写入服务。

为什么这么设计呢

CommitLog文件默认大小是1GB,在清理的时候属于大文件操作了,IO压力也是有的,这样设计该文件的优点我大概说几个,当然肯定还有些别的

只需要保存一份消息文件:一个消息如果需要被多个消费者组消费,消息只需要保存一份即可,消费进度单独保存,这样比较容易支撑强大的消息存储能力

支持回溯:把消息的消费位置的决定权放在客户端,只要消息还在,就可以消费,所以也就有了RocketMQ支持的回溯消费

像看视频一样,可以把镜头调到前面去,重新看一遍刚刚的视频

支持消息索引服务:RocketMQ中有一个索引文件,消息只要还存在于CommitLog中,就可以被搜索出来,方便排查问题

有道无术,术可成;有术无道,止于术

欢迎大家关注Java之道公众号

好文章,我在看❤️

遇到了消息堆积,但是问题不大相关推荐

  1. 消息队列面试连环问:如何保证消息不丢失?处理重复消息?消息有序性?消息堆积处理?...

    大家好,我是 yes. 最近我一直扎在消息队列实现细节之中无法自拔,已经写了 3 篇Kafka源码分析,还剩很多没肝完.之前还存着RocketMQ源码分析还没整理.今儿暂时先跳出来盘一盘大方向上的消息 ...

  2. 【RocketMQ工作原理】消息堆积与消费延迟

    概念 消息处理流程中,如果Consumer的消费速度跟不上Producer的发送速度,MQ中未处理的消息会越来 越多(进的多出的少),这部分消息就被称为堆积消息.消息出现堆积进而会造成消息的消费延迟. ...

  3. RocketMQ(十二)消息堆积与消费延迟

    RocketMQ(十二)消息堆积与消费延迟 产生背景 消息处理流程中,如果Consumer的消费速度跟不上Producer的发送速度,MQ中未处理的消息会越来越多,这部分消息就被称为 堆积消息.消息出 ...

  4. kafka消息堆积原因解析

    kafka消息堆积,可以调节如下两个参数 max.poll.records 一次调用poll()返回的最大记录数. 默认值500 就是一次最多拉取500条记录 max.poll.interval.ms ...

  5. kafka消息堆积且CPU过高代码优化

    kafka消息堆积且CPU过高代码优化 直接部署已有的代码程序到线上服务器,发现CPU立马升高500%左右,立马停掉服务并看源代码排查问题,翻看代码,发现通过多线程消费 kafka消息,根据对多线程的 ...

  6. 解决kafka 消息堆积问题的排查及调优

    一.背景说明 深夜接到客户紧急电话,反馈腾讯云 kafka 中有大量消息堆积未及时消费.每分钟堆积近 100w 条数据.但是查看 ES 监控,各项指标都远还没到性能瓶颈.后天公司就要搞电商促销活动,到 ...

  7. 消息队列产生严重消息堆积怎么处理?

    1. 为什么产生消息堆积? 大多是因为 Consumer 出问题了,没有及时发现,或者故障恢复需要较长的时间,导致大量消息积压在 MQ 中. 2. 消息堆积会有什么后果呢? 2.1 消息被丢弃 例如 ...

  8. 【RocketMQ 二十八】RocketMQ 消息堆积

    消息堆积本质 ⽣产者的⽣产速度 >> 消费者的处理速度 ⽣产者的⽣产速度骤增,⽐如⽣产者的流量突然骤增 消费速度变慢,⽐如消费者实例 IO 阻塞严重或者宕机 如何处理消息堆积 如何处理消息 ...

  9. RocketMQ(十二)消息堆积与消息延迟

    1.概念 消息处理流程中,如果Consumer的消费速度跟不上Producer的发送速度,MQ中未处理的消息会越来越多(进的多出的少),这部分消息就被称为堆积消息.消息出现堆积进而会造成消息的消费延迟 ...

最新文章

  1. 最小系统必须安装的组件(仅做参考)
  2. 调css支持firefox、IE6、IE7的方法
  3. python+Eclipse+pydev环境搭建与入门
  4. Java 单例模式探讨
  5. 网站安全测试报告模板
  6. java输入输出流实例代码
  7. 五金冲压模具设计分享pressCAD外挂使用小窍门
  8. 2019年下半年教师资格幼儿园《综合素质》真题与参考答案
  9. 2022软件测试常见抓包工具
  10. spyglass 学习笔记之cdc check
  11. Docker_尚硅谷视频学习笔记
  12. 【学习周报】深度学习笔记第二周
  13. GreatSQL特性介绍及未来展望--叶金荣|万里数据库
  14. BC26 OPEN开发之--LWM2M连接分析
  15. web项目修改favicon.ico图标
  16. 怎么用计算机实现复数开平方,用你手中的计算器进行复数运算
  17. 透过社群网站加强公益的力量 让爱心无国界传遍各地!
  18. echart饼状图、柱状图上显示百分比、数据值
  19. 如何用python写一个计算日期间隔的程序?
  20. 3D MAX 人物骨骼建设

热门文章

  1. python解析.pyd文件
  2. pycham窗口显示多个编辑页面
  3. go 多行字符串_Go语言基本功,了解Go语言基础语法
  4. win7系统修复工具_205个电脑系统修复小工具, 联想工程师专用!
  5. basic认证 接口 php,PHP 模拟 HTTP 基本认证(Basic Authentication) - 黄棣-dee - 博客园...
  6. linux 物理内存用完了_调整linux内核尽量用内存,而不用swap
  7. 移动互联网APP测试流程及测试点(转载) (二)
  8. (软件工程复习核心重点)第三章需求分析习题
  9. c++程序设计中的多态与虚函数知识点
  10. Ubuntu开机启动Python脚本