快手基于 RocketMQ 的在线消息系统建设实践
作者:黄理,10 多年软件开发和架构经验,热衷于代码和性能优化,开发和参与过多个开源项目。曾在淘宝任业务架构师多年,当前在快手负责在线消息系统建设工作。
为什么建设在线消息系统
在引入 RocketMQ 之前,快手已经在大量的使用 Kafka 了,但并非所有情况下 Kafka 都是最合适的,比如以下场景:
业务希望个别消费失败以后可以重试,并且不堵塞后续其它消息的消费。
业务希望消息可以延迟一段时间再投递。
业务需要发送的时候保证数据库操作和消息发送是一致的(也就是事务发送)。
为了排查问题,有的时候业务需要一定的单个消息查询能力。
为了应对以上这类场景,我们需要建设一个主要面向在线业务的消息系统,作为 Kafka 的补充。在考察的一些消息中间件中,RocketMQ 和业务需求匹配度比较高,同时部署结构简单,使用的公司也比较多,于是最后我们就采用了 RocketMQ。
部署模式和落地策略
在一个已有的体系内落地一个开源软件,通常大概有两种方式:
方式一:在开源软件的基础上做深度修改,很容易实现公司内需要的定制功能。但和社区开源版本分道扬镳,以后如何升级?
方式二:尽量不修改社区版本(或减少不兼容的修改),而是在它的外围或者上层进一步包装来实现公司内部需要的定制功能。
注:上图方式一的图画的比较极端,实际上很多公司是方式一、方式二结合的。
我们选择了方式二。最早的时候,我们使用的是 4.5.2 版本,后来社区 4.7 版本大幅减小了同步复制的延迟,正好我们的部署模式就是同步复制,于是就很轻松的升级了 4.7 系列,享受了新版本的红利。
在部署集群的时候,还会面临很多部署策略的选择:
大集群 vs 小集群
选择副本数
同步刷盘 vs 异步刷盘
同步复制 vs 异步复制
SSD vs 机械硬盘
大集群会有更好的性能弹性,而小集群具有更好的隔离型,此外小集群可以不需要跨可用区 /IDC 部署,所以会有更好的健壮性。我们非常看重稳定性,因此选择了小集群。集群同步复制异步刷盘,首选 SSD。
客户端封装策略
如上所述,我们没有在 RocketMQ 里面做深度修改,所以需要提供一个 SDK 来实现公司内需要的定制功能,这个 SDK 大概是这样的:
对外只提供最基本的 API,所有访问必须经过我们提供的接口。简洁的 API 就像冰山的一个角,除了对外的简单接口,下面所有的东西都可以升级更换,而不会破坏兼容性。
业务开发起来也很简单,只要需要提供 Topic(全局唯一)和 Group 就可以生产和消费,不用提供环境、NameServer 地址等。SDK 内部会根据 Topic 解析出集群 NameServer 的地址,然后连接相应的集群。生产环境和测试环境环境会解析出不同的地址,从而实现了隔离。
上图分为 3 层,第二层是通用的,第三层才对应具体的 MQ 实现,因此,理论上可以更换为其它消息中间件,而客户端程序不需要修改。
SDK 内部集成了热变更机制,可以在不重启 Client 的情况下做动态配置,比如下发路由策略(更换集群 NameServer 的地址,或者连接到别的集群去),Client 的线程数、超时时间等。通过 Maven 强制更新机制,可以保证业务使用的 SDK 基本上是最新的。
集群负载均衡 & 机房灾备
所有的 Topic 默认都分配到两个可用区,生产者和消费者会同时连接至少两个独立集群(分布在不同的可用区),如下图:
生产者同时连接两个集群,如果可用区 A 出现故障,流量就会自动切换到可用区 B 的集群 2 去。我们开发了一个小组件来实现自适应的集群负载均衡,它包含以下能力:
千万级 OPS
灵活的权重调整策略
健康检查支持/事件通知
并发度控制(自动降低响应慢的服务器的请求数)
资源优先级(类似 Envoy,实现本地机房优先,或是被调服务器很多的时候选取一个子集来调用)
自动优先级管理
增量热变更
实际上它并不仅仅用于消息生产者,而是一个通用的主调方负载均衡类库,可以在 Github 上找到:https://github.com/PhantomThief/simple-failover-java。
核心的 SimpleFailover 接口和 PriorityFailover 类没有传递第三方依赖,非常容易整合。
多样的消息功能
延迟消息
延迟消息是非常重要的业务功能,不过 RocketMQ 内置的延迟消息只能支持几个固定的延迟级别,所以我们又开发了单独的 Delay Server 来调度延迟消息:
上图这个结构没有直接将延迟消息发到 Delay Server,而是更换 Topic 以后存入 RocketMQ。这样的好处是可以复用现有的消息发送接口(以及上面的所有扩展能力)。对业务来说,只需要在构造消息的时候额外指定一个延迟时间字段即可,其它用法都不变。
事务消息
RocketMQ 4.3 版本以后支持了事务消息,可以保证本地事务和消费发送同时成功或者失败,对于一些业务场景很有帮助。事务消息的用法和原理有很多资料,这里就不细述了。但关于事务消息的实践网上资料较少,我们可以给出一些建议。
首先,事务消息功能一直在不断完善,应该使用最新的版本,至少是 4.6.1 以后的版本,可以避免很多问题。
其次,事务消息性能是不如普通消息的,它在内部实际上会生成 3 个消息(一阶段 1 个,二阶段 2 个),所以性能大约只有普通消息的 1/3,如果事务消息量大的话,要做好容量规划。回查调度线程也只有 1 个,不要用极限压力去考验它。
最后有一些参数注意事项。在 Broker 的配置中:
transientStorePoolEnable 这个参数必须保持默认值 false,否则会有严重的问题。
endTransactionThreadPoolNums是事务消息二阶段处理线程大小,sendMessageThreadPoolNums 则指定一阶段处理线程池大小。如果二阶段的处理速度跟不上一阶段,就会造成二阶段消息丢失导致大量回查,所以建议 endTransactionThreadPoolNums 应该大于 sendMessageThreadPoolNums,建议至少 4 倍。
useReentrantLockWhenPutMessage 设置为 true(默认值是 false),以免线程抢锁出现严重的不公平,导致二阶段处理线程长时间抢不到锁。
transactionTimeOut 默认值 6 秒太短了,如果事务执行时间超过 6 秒,就可能导致消息丢失。建议改到 1 分钟左右。
生产者 Client 也有一个注意事项,如果有多组 Broker,并且是 2 副本(有 1 个 Slave),应该打开 retryAnotherBrokerWhenNotStoreOK,以免某个 Slave 出现故障以后,大量消息发送失败。
分布式对账监控
除了比较一些常规的监控手段以外,我们开发了一个监控程序做分布式对账。可以发现我们的集群以及我们提供的 SDK 是否有异常。
具体做法是在每个 Broker 上都建立一个监控专用的 Topic,监控程序使用我们自己提供的 SDK 框架来连接集群(就像我们的业务用户那样),监控生产者会给每个集群发送少量消息。然后检查发送是否成功:
发送成功 |
成功 |
刷盘超时 |
|
Slave 超时 |
|
Slave 不可用 |
|
发送失败 |
具体错误码 |
生产者只对这些结果进行打点,不判断是否正常,具体到监控(或者演练)场景可以配置不同的报警规则。
消费者收到了消息会通过 TCP 旁路 ACK 生产者,生产者这边会做分布式对账,将对账结果打点:
收到消息
消息丢失(或超时未收到消息)
重复收到消息
消息生成到最终消费的时间差
ACK 生产者失败(由消费者打点)
同样监控程序只负责打点,报警规则可另外配置。
这套机制也可以用于分布式性能压测和故障演练。在做压测的时候,每个消息都 ACK 的话,对生产者的内存压力很大,因为它发出去的消息,需要在内存中保留一段时间(直到到达这个消息的对账时间),这段时间消费者 ACK 或者重复 ACK 都需要记录。所以我们实现了按比例抽样对账的功能,开启以后只有需要对账的消息才会在内存中保留一段时间。
顺便说一下,我们做压测时,合格的标准是异步生产不失败、消费不延迟、每一个消息都不丢失。这样做是为了保证压测时能给出更加准确的,可供线上系统参考的性能数字,而不是制造理想条件,追求一个大的数字。比如异步生产比同步生产更脆弱(压测 Client 如果同步生产,Broker 抖动的时候,同步 Client 会被堵塞导致发送速度降低,于是降低了 Broker 压力,消息发送不容易失败,但是会看到发送速率在波动),更贴近生产环境的实际情况,我们就选择异步生产来评估。
性能优化
Broker 默认的参数在我们的场景下(SSD、同步复制、异步刷盘)不是最优的,有的参数也许在大多数场景下都不是最优的。我们列出一些重要的参数,供大家参考:
参数 |
默认值 |
说明 |
flushCommitLogTimed |
False |
默认值不合理,异步刷盘这个参数应该设置成 true,导致频繁刷盘,对性能影响极大。 |
deleteWhen |
04 |
几点删除过期文件的时间,删除文件时有很多磁盘读,这个默认值是合理的,有条件的话还是建议低峰删除。 |
sendMessageThreadPoolNums |
1 |
处理生产消息的线程数,这个线程干的事情很多,建议设置为 2~4,但太多也没有什么用。因为最终写 commit log 的时候只有一个线程能拿到锁。 |
useReentrantLockWhenPutMessage |
False |
如果前一个参数设置比较大,这个最好设置为 true,避免高负载下自旋锁空转消耗 CPU。 |
sendThreadPoolQueueCapacity |
10000 |
处理生产消息的队列大小,默认值可能有点小,比如 5 万 TPS(异步发送)的情况下,卡 200ms 就会爆。设置比较小的数字可能是担心有大量大消息撑爆内存(比如 100K 的话, 1 万个的消息大概占用 1G 内存,也还好),具体可以自己算,如果都是小消息,可以把这个数字改大。可以修改 Broker 参数限制 Client 发送大消息。 |
brokerFastFailureEnable |
True |
Broker 端快速失败(限流),和下面两个参数配合。这个机制可能有争议,client 设置了超时时间,如果 client 还愿意等,并且 sendThreadPoolQueue 还没有满,不应该失败,sendThreadPoolQueue 满了自然会拒绝新的请求。但如果 Client 设置的超时时间很短,没有这个机制可能导致消息重复。可以自行决定是否开启。理想情况下,能根据 Client 设置的超时时间来清理队列是最好的。 |
waitTimeMillsInSendQueue |
200 |
200ms 很容易导致发送失败,建议改大,比如 1000ms。 |
osPageCacheBusyTimeOutMills |
1000 |
Page cache 超时时间,如果内存比较多,比如 32G 以上,建议改大点。 |
总结
得益于简单、几乎 0 依赖的部署模式,使得我们部署小集群的成本非常低;不对社区版本进行魔改,保证我们可以及时升级;统一 SDK 入口方便集群维护和功能升级;通过复合小集群+自动负载均衡实现多机房多活;充分利用 RocketMQ 的功能,比如事务消息、延迟消息(增强)来满足业务的多样性需求;通过自动的分布式对账,对每一个 Broker 以及我们的 SDK 进行正确性监控。
本文也进行了一些性能参数的分享,但写的比较简单,基本只说了怎么调,但没能细说为什么,以后我们会另写文章详述。目前 RocketMQ 已经应用在公司在大多数业务线,期待将来会有更好的发展!
特别推荐一个分享架构+算法的优质内容,还没关注的小伙伴,可以长按关注一下:
长按订阅更多精彩▼如有收获,点个在看,诚挚感谢
快手基于 RocketMQ 的在线消息系统建设实践相关推荐
- 快手基于RocketMQ的在线消息系统建设实践
简介:快手需要建设一个主要面向在线业务的消息系统作为 Kafka 的补充,低延迟.高并发.高可用.高可靠的分布式消息中间件 RocketMQ 正是我们所需的. 作者:黄理 黄理,10多年软件开发和架构 ...
- 消息规模超千亿,同程艺龙的消息系统建设实践
导读: 同程艺龙的机票.火车票.汽车票.酒店相关业务已经接入了RocketMQ,用于流量高峰时候的削峰,以减少后端的压力.同时,对常规的系统进行解耦,将一些同步处理改成异步处理,每天处理的数据达150 ...
- 基于web的在线考试系统的设计与开发
欢迎添加微信互相交流学习哦! 项目源码:https://gitee.com/oklongmm/biye2 在线考试系统的设计与开发 目录 TOC 1-3 第一章 绪论 1 1.1在线考试系统的研究背景 ...
- 基于html5的在线教育,基于HTML5的在线学习系统的设计与实现
基于HTML5的在线学习系统的设计与实现 发布时间:2019-11-18所属分类:科技论文浏览:1次 摘 要: 摘 要: 在线课程学习网站的发展迅速,吸引了广大用户.基于 HTML5 的在线学习系统经 ...
- JOJ——基于爬虫的在线测评系统(Online Judge)
这是一个基于爬虫的在线测评系统(OJ). 相信喜欢刷题的各位大佬应该对OJ并不陌生.本系统旨在使用一个账号,就可以刷遍各个OJ的题目. 系统基于后端SpringBoot.Mybatis-Plus.Th ...
- 基于php体育场馆在线预约系统
基于php体育场馆在线预约系统 随着国民生活水平的逐渐提高,体育运动已渐渐成为人们生活的一部分.互联网的普及给人们带来的便利不需多说.因此如果把体育运动与互联网结合起来,利用PHP技术建设体育场馆在线 ...
- 基于SSM的在线教育系统的设计与实现【附源码】
基于SSM的在线教育系统的设计与实现 需求规格说明书 Version: 1.0.0 目 录 项 目 承 担 部 门: HELLOWORLD! 1 撰 写 人(签名): 陈徐锋 1 引言 1 1.1 目 ...
- 基于Java毕业设计在线阅读系统源码+系统+mysql+lw文档+部署软件
基于Java毕业设计在线阅读系统源码+系统+mysql+lw文档+部署软件 基于Java毕业设计在线阅读系统源码+系统+mysql+lw文档+部署软件 本源码技术栈: 项目架构:B/S架构 开发语言: ...
- 基于JavaWeb的在线音乐系统
019基于JavaWeb的在线音乐系统(含论文) 开发环境: Jdk7(8)+Tomcat7(8)+Mysql+IntelliJ IDEA(Eclipse) 数据库: MySQL 技术: Java+S ...
最新文章
- Python 数据类型:列表
- Android Fragment 深度解析
- Fiori 出试(WEBIDE平台)day1
- linux 设置git ip,centos7安装gitlab并更改端口IP地址
- CorelDraw技巧|设计师要了解数位板怎么用
- 获取网络接口信息——ioctl()函数与结构体struct ifreq、 struct ifconf
- Linux中find用法
- CS 期刊哪家强?CCF 发布最新期刊分级目录!
- 雷军卸任小米电子董事长!常程等人也退出该公司 网友:为造车放弃家业?
- EXCEL教程下载地址
- 干货!NB-IoTLoRa物联网项目实操来了!
- Browsers简介
- 微信小程序:十二星座运势查询
- 再也不要相信你的眼睛:步步逼近的AI换脸术
- Boundary Sensitive Network (BSN) 源码运行
- centos 编译安装cmake和常见过程错误解决办法(linux系统均适用,以爬坑。。)
- 全数字化城市道路视频监控存储解决方案
- 软考初级程序员上午单选题(19)
- 北塔网管软件BTSO2.5安装过程记录
- 关于Vite 客户端代码不支持node内置模块path的处理
热门文章
- 了解下RDF 都柏林核心元数据
- 对我而言Linux究竟有什么魅力
- 如何在 Windows 中检查计算机正常运行时间
- C语言模拟实现库函数 atoi
- [SCOI2007]蜥蜴 (网格图经典四方向建边)
- 大掌门2显示服务器繁忙,《大掌门2》二周年庆典开启 真传弟子潇洒归来
- c1 c2在c语言中,c1和c2科目一样吗
- Android官方模拟器root,在Android模拟器上如何获得root权限?
- oracle11g insufficient,ORACLE11GORA-01031:insufficientprivileges
- 第十一届山东省大学生程序设计竞赛 L. Construction of 5G Base Stations(概率期望,递推前缀和优化)