https://www.infoq.cn/article/Q0o*QzLQiay31MWiOBJH/

一、背景

2019年快手Kafka集群日消息处理量为数万亿级别,峰值超过1亿/s。

在快手,Kafka集群被分成3类:

  • 在线集群:作为消息中间件,为不同在线业务之间提供异步的消息通知服务
  • Log集群:业务集群直接将log打给Kafka,并通过Kafka进行传输和收集,由于数据在业务应用层不落地,所以这个过程不能出现由于Kafka问题导致业务受到影响,这对Kafka可用性要求很高;Log集群还为重要的实时计算或模型训练提供数据源
  • 离线集群:Log数据的最终汇聚点,数据被dump到HDFS中,做离线处理。离线集群为次要的实时计算、实时训练提供数据源。

此外,也提供了Mirror服务,用于将数据从在线集群、Log集群传输到离线集群。

之所以将Kafka集群做物理划分,是为了保证服务质量,控制Kafka集群问题的影响面

业务规模:

二、技术演进


第一阶段:为了支持业务快速发展,做了多集群建设,并增加了Kafka平滑扩容功能。
第二阶段:为了保证业务稳定,对Kafka可用性优化:将单点宕机发现与恢复的时间从91s优化到6s,提升了15倍
第三阶段:为了增加可维护性及提升系统运维效率,对数据Mirror服务做了集群化,并开发了资源管理平台
第四阶段:为了进一步提升Kafka稳定性与性能,做了资源隔离,对cache进行改造,并对消费者进行了智能限速。

2.1、平滑扩容

先看原生Kafka的扩容流程:
1、假如集群有3个broker,4个TP(topic partition),每个3副本,均匀分布。
2、现在要扩容一台机器,新broker加入集群后,需要通过工具进行topic partition迁移。
3、一共迁移3个topic partition的副本到新broker上。
4、等迁移结束后,会重新进行leader balance。

最终topic partition分布如下图:

从微观角度看,TP从1台broker迁移到另1台broker的过程是怎样的呢?
以TP3的第三个副本,从broker1迁移到broker4来看:
broker4作为TP3的follower,从broker1上最早的offset进行获取数据,
直到追平offset为止,新副本被放入ISR中,并移除broker1上的副本,迁移完毕。

但原生Kafka扩容流程存在如下问题:
数据迁移从TP3的最初Offset开始copy数据,回导致大量的读磁盘,消耗大量的IO资源,导致磁盘繁忙,从而造成produce操作延迟变长。所以说迁移过程不够“平滑”。

优化思考:Kafka理论上是个消息缓存系统,不需要永久存储数据,很有可能费了很多工作迁移过来的数据,根本就不会被使用,甚至马上就被删除了。从这个角度来说,迁移数据时,为什么一定要从partition的最初offset开始迁移呢?仔细想想,实际不需要这样。


所以,平滑扩容的思路:在迁移TP时,直接从partition最新的offset开始迁移,但要同步保持一段时间,主要是确保所有consumer都已经跟上了。如上图所示,再来看这个TP3的第三个副本从broker1迁移到broker4的过程:
这次broker4从broker1最新的offset开始迁移,即transfer start这条竖线。此时,因为consumer1还没能跟上(可能有的consumer有消息积压,没有消费到consumer start),所以整个迁移过程需要保持一段时间,直到transfer end这个点。这时候,可以将TP3的新副本放到ISR中,同时去掉broker1上的副本,迁移过程完毕。

从这次迁移看,因为都是读最新的数据,不会出现源broker读大量磁盘数据的问题,仅仅多了一个副本的流量,基本对系统无影响。

基于这样的过程,我们就可以在晚高峰期间做扩容,从Kafka整体服务质量上看,对业务没有任何影响。
这个策略是Kafka官方的patch: https://issues.apache.org/jira/browse/KAFKA-8328

2.2、Mirror集群化

接下来看如何改进Mirror服务,使其具备较好的管理性,提升运维效率。
如上图所示,目前Kafka多集群之间的数据同步,采用的事MirrorMaker,这个架构存在2个问题:
1)被Mirror的topic是静态管理的,运维成本很高,且容易出错;
2)一旦有topic增加或减少,以及机器的加入或退出,都会导致原有正在Mirror的数据断流,这主要是因为经历了“停止服务,再启动服务”的过程。

为了解决这个问题,快手基于UReplicator,开发了KReplcator服务,并替换掉了现有的MirrorMaker服务。UReplicator是Uber开源的Kafka数据Mirror同步服务。

如上图所示,在部署的时候,快手部署了多个KReplicator cluster,主要是为了保证数据同步的稳定性。
在实现细节上,我们对UReplicator进行了扩展,使其可以动态感知不同的Kafka集群。这样只需要部署一个Mirror集群,就可以进行不同源集群及不同目标集群的数据同步,而不再需要部署多个Mirror集群。

KReplicator集群包括三个模块:

1、Controller:
用于动态管理topic, worker的增减
负责TP的分配策略,支持部分partition的迁移,这样新增节点或节点宕机会触发部分TP的迁移,不会造成Mirror服务的整体断流,仅仅是一小部分有抖动。

2、Worker:
支持动态增减与减少topic,这样增加或减少topic,避免了对已有TP传输的影响。
吃吃同时传输多个源集群到多个目标集群的数据传输能力
支持将数据dump到HDFS中

3、Zookeeper:
负责协调controller与worker

有了KReplicator cluster管理Kafka多集群间数据Mirror,极大地减少了我们的运维成本,以及出错的情况。此外,由于集群化管理的存在,我们可以快速地对Mirror服务进行扩缩容,以便对应业务的突发流量。

总结:KReplicator主要用于解决TP动态变更导致Mirror服务断流的问题。

2.3、资源隔离

问题1、不同业务线之间的topic会相互影响。
如下图,这个broker服务两个业务线的TP,不同业务线的TP会共享一块磁盘。如果此时,consumer出现问题,导致消费产生lag,而lag积累会导致读取磁盘中的数据,进而造成磁盘繁忙。最终,会影响在同一块磁盘的其他业务线TP的写入。

解决思路很简单,就是对不同业务的topic进行物理隔离。把不同业务线的topic放到不同的broker,如下图所示,这样任何业务线产生问题,不会影响其他业务线。这个改动需要对broker打上不同的标签,并在topic创建、TP迁移、宕机恢复流程中,增加按标签的TP分片算法就可以。

问题2、Kafka RPC队列缺少隔离,一旦某个topic处理慢,会导致所有请求hang住。
即没做topic维度的RPC队列隔离。

如上图所示,Kafka RPC框架中,首先由accepter从网络中接收连接,每收到一个连接,都会交给一个网络处理线程(processor)处理,processor读取网络中的数据,并将请求简单解析处理后,放到call队列中,RPC线程会从call队列中获取请求,然后进行RPC处理。此时,如果topic2的写入出现延迟,例如由于磁盘繁忙导致,则会最终将RPC线程池打满,进而阻塞call队列,进而打满网络线程池,这样发到这个broker的所有请求都没法处理了。


解决这个问题的思路也很直接,需要按照控制流、数据流分离,且数据流要能够按照topic做隔离。首先将call队列拆解成多个,并为每个call队列都分配一个线程池。在call队列的配置上,一个队列单独处理controller请求的队列(隔离控制流),其余多个队列按照topic做hash分散开(数据流之间隔离)。如果一个topic出现问题,只会阻止其中一个RPC处理线程池,以及call队列。

2.4、Cache改造

Kafka之所以有如此高的性能,主要依赖于page cache。Producer的写操作,broker会将数据写入到page cache中,随后consumer发起读操作,如短时间内page cache仍有效,则broker直接从内存中返回数据。

但由于page cache是操作系统层面的缓存,难于控制,有些时候,容易受到污染,从而导致整个kafka性能下降。看下面2个例子:

Case1、Consumer的lag读会对page cache产生污染

如上图所示,假如有2个Consumer,1个Producer。其中,蓝色的Producer在生产数据,蓝色consumer正在消费数据,但他们之间有一定的lag,导致分别访问的是不同page cache中的Page。如果一个橙色的consumer从topic partition最初的offset开始消费数据的话,会触发大量的读盘,并填充page cache。其中的5个蓝色的topic的page数据都会被橙色topic的数据填充了。另一方面,刚刚蓝色producer生产的数据,也已经被冲掉了。此时,如果蓝色的consumer读取到了蓝色producer刚刚生产的数据,他不得不将刚刚写入的数据从磁盘读取到page cache中。综上所述,大lag的consumer会造成page cache污染,在极端情况下,会造成整体吞吐量下降。

Case2、follower也会造成page cache污染

在上图中,broker1机器内部,其中page cache中除了包括蓝色producer写入外,还包括橙色follower写入的数据。但是,橙色follower写入的数据,,再正常情况下,之后不会再有访问,这相当于将不需要再被访问的数据放入了cache,这是对cache造成了浪费和污染。所以,很容易想到Kafka是否可以自己维护cache呢?首先,严格按照实际顺序进行cache,可以避免异常consumer的lag读造成的cache污染。其次,控制follower的数据不再进入cache,这样阻止了follower对cache的污染,进一步提升cache的容量。

基于这个想法,快手对kafka cache进行了整体设计,如下图:

快手在Broker中引入了两个对象:一个是block cache, 另一个是flush queue。Producer写入请求在broker端首先会被以原message的形式写入flush queue中,之后在将数据写入到block cache的一个block中,之后整个请求就结束了。在flush queue中的数据,会由其他线程异步地写入磁盘中(会经历page cache过程)。而follower的处理流程仍和原来保持一致,从其他broker读取数据后,直接把数据写入到磁盘(也会经历page cache),这种模式保证了block cache中的数据全都是producer产生的,不会被follower污染。

对于consumer而言,在broker接到消费请求后,首先会从block cache中检索数据,如果命中,则直接返回。否则,则从磁盘读取数据。这样的读模式,保证了consumer的cache miss读并不会填充block cache,从而避免了产生污染,即使有大lag的consumer读磁盘,也仍可保证block cache的稳定性。


接下来,我们看block cache的微观设计,整个block cache由3个部分组成:
第一部分:2类block pool,维护着空闲的block信息,之所以分为2类,主要因为segment数据及segment的索引大小不同,统一划分会导致空间浪费。
第二部分:先进先出的block队列,用于维护block生产的时序搞关系,在触发淘汰时,会优先淘汰时间上最早的block。
第三部分:TP+offset到有效的blocks的索引,用于快速定位一个block。一个block可以看做是segment的一部分,segment数据以及segment索引和block的对应关系如上图所示。

最后,还有两个额外的线程:
1、eliminater线程,用于异步进行block cache淘汰,当然,如果Producer请求处理时,发现block cache满,也会同步进行cache淘汰的。
2、异步写线程,用于将flush queue中的Message异步地写入到磁盘中。

这个就是Kafka cache的整体设计,这样就解决了上述两个对cache的污染问题了。

测试结果:搭建了5个broker集群,其中一个换成了Kafka cache版本,并创建了一个150个parition的topic, 3副本。所以算上副本,一共有450个partition,每台机器上90台TP,之后Mirror了一个现实的流量数据,并启动了150个consumer,总体lag 450w条数据开始读。

从图中可以看出,原始版本再这种情况下,会造成大量的磁盘读,而Kafka cache版本没有任何磁盘读操作。

除此之外,在看下改进后的broker,从上图可以看出,producer整个写入过程,先同步写入内存,然后再异步刷入磁盘。虽然page cache模式也是类似这种,但page cache会存在一定不稳定性(可能触发同步写盘)

2.5、智能限速


在刚才讲资源隔离的时候,看到这个的case, 如果consumer操作大量读磁盘,会影响producer操作的延迟。当时我们通过资源物理隔离,达到了隔离不同业务线topic的目标,避免了互相影响。但对于同一个业务线的topic之间还可能会互相影响,如何解决consumer lag后读磁盘导致producer写入受阻问题呢?解决办法是,当磁盘繁忙时,对lag的consumer进行限速控制。

如上图所示,整个限速逻辑实现在RPC工作线程处理的末端,一旦RPC处理完毕,则通过限速控制模块进行限速检测,如果要限速,则确定等待时间,之后放入到delayed queue中,否则放到response queue中。放到delayed queue中的请求,等待时间达到后,会被delayed线程放入到response queue中。最终在response queue中的请求被返回给consumer。对于限速控制模块的检测逻辑,则是根据当前请求topic所在磁盘是否繁忙,以及这次的lag是否超过阈值(无积压的consumer不能限速。阈值的设置凭经验,但后续会和Kafka cache进行结构,则可以精确哪些请求是block cache miss的,进而进行限速控制)。
Metric采集线程则周期性采集磁盘metric等信息,并给限速决策模块提供数据。

2.6、后续计划

1、由于机房的限制,我们无法集群内扩容。如果搭建新集群,势必会带来大量的业务迁移过程,搞得大家都很痛苦,所以,解决的思路是,是否可以建设跨IDC的统一大集群的方案。

2、随着业务规模越来越大,目前controller存在一系列性能问题。极端情况下,会影响系统稳定,接下来做进一步优化。

3、部分业务线有对事务的需求,后续也会参考高版本的设计,加进来事务的功能。

4、机器的磁盘会出现“半死不活”的情况,这段时间请求会卡死,造成业务的不稳定,需要想办法解决掉。

快手Kafka集群演进之路学习笔记相关推荐

  1. 分布式集群架构场景解决方案学习笔记

    课程学习 一致性哈希算法 集群时钟同步问题 分布式ID解决方案 分布式任务调度问题 session共享(一致性)问题 一致性哈希算法 一致性哈希算法在1997年由麻省理工学院的Karger等人在解决分 ...

  2. hbase1.1.1 连接集群_HBase-1.0.1学习笔记(一)集群搭建

    鲁春利的工作笔记,好记性不如烂笔头 如下配置参照了http://hbase.apache.org/book.html,详见:hbase-1.0.1.1/docs/book.html 环境配置 1.安装 ...

  3. Kubernetes全栈架构师(二进制高可用安装k8s集群扩展篇)--学习笔记

    目录 二进制Metrics&Dashboard安装 二进制高可用集群可用性验证 生产环境k8s集群关键性配置 Bootstrapping: Kubelet启动过程 Bootstrapping: ...

  4. Kafka学习 之 理解Kafka集群(二)

    在学习之前,已经假设已经成功搭建了Kafka集群,开始下面的学习: 1. 理论学习 broker:生产环境中,一台服务器上只会安装一个 Kafka 软件,这台服务器就是一个 Kafka Server, ...

  5. ELK集群+Kafka集群+FileBeat——命运多舛的安装采坑之路

    欢迎大家关注我的公众号,添加我为好友! 开始的时候感觉日志监控是比较NB的技术,感觉很神奇,那么多日志,为什么一下子就能够找到自己想要的?后来初步了解到了ELK(ElasticSearch + Log ...

  6. Kafka学习:CentOS7下Kafka集群搭建

    文章目录 准备 集群安装 1.创建目录 2.解压缩安装包 3.修改配置文件 4.启动 5.查看集群是否安装成功 测试Kafka 1.创建测试mytopic 2.查看mytopic副本信息 3.查看已创 ...

  7. Kafka学习之(五)搭建kafka集群之Zookeeper集群搭建

    Zookeeper是一种在分布式系统中被广泛用来作为:分布式状态管理.分布式协调管理.分布式配置管理.和分布式锁服务的集群.kafka增加和减少服务器都会在Zookeeper节点上触发相应的事件kaf ...

  8. 滴滴Logi-KafkaManager开源之路:一站式Kafka集群指标监控与运维管控平台

    导读 从2019年4月份计划开源到2021月1月14号完成开源,历时22个月终于修成正果,一路走来实属不易,没有前端.设计.产品,我们找实习生.合作方.外部资源支持,滴滴Kafka服务团队人员也几经调 ...

  9. Kafka集群在马蜂窝大数据平台的优化与应用扩展

    导读 Kafka 是当下热门的消息队列中间件,它可以实时地处理海量数据,具备高吞吐.低延时等特性及可靠的消息异步传递机制,可以很好地解决不同系统间数据的交流和传递问题. Kafka 在马蜂窝也有非常广 ...

最新文章

  1. git查看某个文件的提交历史
  2. 第五周项目一-三角形类雏形(5)
  3. 《C++ Primer Plus 6th》读书笔记 - 第8章 函数探幽
  4. matlab7.0 win7 64,安装matlab7.0出现问题,我是win7+64位系统,求解
  5. spring boot实战(第七篇)内嵌容器tomcat配置
  6. 各种光源(灯)的光谱
  7. php程序里的configini_程序员手册 修改php.ini的几种方法
  8. 自学python单片机编程-作为一个硬件工程师,你该学学Python了
  9. java 前后台传参数为json格式,如何取出
  10. Vue3.x 深入浅出系列(连载三)
  11. Compact Multi-Signatures for Smaller Blockchains代码解析
  12. 线性回归:自相关检测及其处理方法
  13. C#时间/日期格式大全
  14. matplotlib隐藏坐标轴
  15. 授权(authorization)的设计思路
  16. 汇编语言mov al,0c5h,汇编语言读书笔记 Day 04
  17. C语言 | 常见问题汇总
  18. 人和计算机比赛下棋结果,人机大战趣谈:会下棋的电脑 像人一样聪明
  19. 永远不要去依赖别人_别太依赖一个人的说说 不依赖别人的经典语录_经典语录...
  20. (绝对防御勒索病毒)装机员 ghost win7 Sp1 64位纯净6月版

热门文章

  1. 韩剧《幽灵》-渗透入侵反病毒神器介绍总结
  2. Ubuntu终端代理工具——proxychains
  3. java小白的学习自述(大三)
  4. Mobile APP(Apple IOS app store)特性分析
  5. windows10电脑安装vs2013全过程
  6. Python快速计算24点游戏并获取表达式
  7. Java偏序关系_java中的偏序关系
  8. spring boot项目接入支付宝支付
  9. 【Appium系列】AppiumDriver简介
  10. 本科团队例会分享1 多米诺与托米诺平铺问题 c语言