一、Kafka Indexing Service 运行原理

1、简介

Kafka Indexing Service 是 Druid 推出的利用 Druid 的索引服务实时消费 Kafka 数据的插件。该插件会在 Overlord 中启动一个 supervisor,supervisor 启动之后会负责创建task、调度task到Middlemanager中运行,并管理监控整个task的生命周期,而这些 task会连接到Kafka集群消费topic数据,并完成索引创,用户需要做的就是准备一个数据消费格式文件,之后通过 REST API 手动启动 supervisor,一个数据源对应一个supervisor。

2、KafkaSupervisor启动过程

3.  KafkaSupervisor 重要数据结构

KafkaSupervisor用于消费kafka的task的数量是由用户提交数据消费格式文件中的taskCount进行配置的,一个task可能消费一个或多个kafka partition,partition的编号被哪个task消费存在这样的一个映射关系:Id = partition % taskCount,用户可以通过配置文件中的replicas 为一个task设置多个副本,这样几个副本会消费相同的partition, 由于副本机制,KafkaSupervisor有了一个TaskGroup的概念,TaskGroup中的task消费的partition相同。

Kafka索引任务存在两种状态, reading 状态和publish状态,当task读取数据到达duration配置的时间,则进行publish状态,publish也会持续completionTimeout 时间,当task进入publish状态的时候立马又创建下一轮的任务开始从上一轮的task消费到的位置开始reading,这么一直不停地交错进行。Supervisor 也维护这两个队列用于存放两种状态的task,并且还维护一个全局的kafka 分区与offset的映射关系表:

由上可见Kafka Task 的数量是固定的,一旦某一个KafkaSupervisor需要修改task的数量,都必须手动重新提交KafkaSupervisor进程,而此刻KafkaSupervisor就先停止再重新启动,停止过程会kill所有的正在运行的task。

二、  实践中出现的问题

1、动态伸缩的需求造成高昂的人力成本

Supervisor在运行期间无法更改taskCount,如果到了流量高峰期,task消费不过来,导致Lag突增,目前的手动停止supervisor,并更改配置文件,由于现在的datasource数量比较多,不断重新修改配置文件重新启动造成比较大的人力成本消耗。

2、资源的浪费

为了避免流量高峰期的延迟,如果把taskCount设置成流量高峰期的值,到了流量低峰期的时候会造成资源的浪费。下图表明,该数据源的kafka流量在一天之内有很长时间处于低峰期,也有处于高峰期:

随着业务需求日益旺盛,datasource不断递增,手动运维成本越来越高,实现一个可自动化调整的taskCount的新特效变得非常有意义。

根据观察发现, Lag和CPU的关系紧密:

三、动态伸缩模块设计与实现

1、动态伸缩原理

KafkaSupervisor启动之后会创建一个单线程不停地消费一个notices队列中的任务,notice封装了KafkaSupervisor的各种职责任务。

notice作为一个接口,其实现类包括以下几个,例如:当用户发起Reset操作的时候会向该队列中添加一个ResetNotice,当用户想要关闭supervisor的时候会向该队列中添加一个ShowdowNotice,还有一个专门的线程会定时向该队列中添加一个RunNotice, 它是supervisor最重要的执行索引服务的逻辑。

我们需要一个模块能够快速发现task消费不过来的这种异常状态并自动扩大task的数量,这个模块被封装进DynamicTaskCountNotice中,有一个线程会周期性向队列添加DynamicTaskCountNotice,我们的检测模块能够根据CPU和Lag的趋势判断是否存在空闲和负载过高的情况,然后触发动态调整。

详细流程:整个过程包括Metric的获取,到metric的分析,再到伸缩判断,最后通过伸缩控制来决定是否进行动态调整,最后执行动态伸缩

 2、额外添加的配置项

为了实现伸缩策略的动态调整,我们开放出很多可配置的参数供不同的数据源的个性化动态伸缩调整。

"taskCount": 1      task count的初始值

总开关:

"dynamicTaskCountEnable":false     是否开启动态伸缩策略

伸缩策略 :

"taskCountMax": 100     task count的最大值

"taskCountMin": 1      task count的最小值

taskCountGrowthMultiplier : 1.5    伸的倍数

taskCountReduceMultiplier : 0.75    缩的倍数

minTriggerDynamicFrequency: "PT15M"     两次触发动态伸缩最小时间间隔

周期检测:

"dynamicCheckPeriod":"PT60S"    检测周期

"dynamicCheckStartDelay":"PT300S"    启动时检测的推迟时间

checkRecentMetricPoints": 5     每次做检测时参考最近几分钟的历史指标为依据(Metric是一分钟一次,所以分钟为单位)

cpu 指标相关:

"cpuUsageMax"  :  null     cpu的使用率最小的阀值, 没有默认值,一般为0.85

"cpuUsageMin"  :  null      cpu的使用率最大的阀值, 没有默认值,一般为0.25

"tasksPercentOfHighCpuUsage" :0.3     task比例 (cpu超过最大阀值所占的task比例视为可伸条件之一)

"tasksPercentOfLowCpuUsage" :    0.9     task比例 (cpu超过最小阀值所占的task比例视为可缩条件之一)

lag指标相关:

"kafkaAvgLagMax": null     kafka的平均的Lag的最大的阀值, 没有默认值,一般为20000

"kafkaAvgLagMin": null       kafka的平均的Lag的最小的阀值, 没有默认值,一般为1000

"maxRestoreToleranceTime" :  "PT15M"    kafka Lag 恢复到常值的最大容忍时间, 如果最近的Lag已经很大,虽然是回落的趋势,但是恢复速度很慢也是无法容忍的,希望扩容来加速。 这个容忍期是希望至少在几分钟之内恢复到正常值,该值由最近五分钟之内的消费速率做的预测,所以可能偏离实际情况

3、模块启动

为了启动动态伸缩的功能,在配置Supervisor Configuration 的时候,在ioconfig配置信息中添加动态伸缩开关:       dynamicTaskCountEnable: true

对于没有配置的supervisor 将不会开启动态伸缩的功能

4、周期检测

检测的周期时间由dynamicCheckPeriod 参数控制

启动supervisor之后,推迟多久进行周期检测由dynamicCheckStartDelay参数控制

刚启动的时候Lag可能会比较大,所以不建议立马进行检测,一般推迟5分钟, 为了实现更高的灵敏性,检测周期可以设置得更小一点,这样发现异常的速度更快,触发动态调整的可能性更高。

检测出异常情况不一定会导致触发动态调整,因为触发还有一些其他的参数控制(minTriggerDynamicFrequency),避免频繁触发。

5、异常检测原理

CPU和Lag目前作为重要的参考指标

缩的策略: Lag很小 &&  cpu空闲 (两个条件同时满足)

伸的策略: Lag过大 && cpu负载过高 (两个条件同时满足)

但是通过不设,或者巧妙设置特殊值 能够单方面关闭某个维度的指标

5.1  CPU检测

一个supervisor同时在会启动多个task, task的状态分为多种 :reading、 publising、 pause、complete、fail

kafkaIndexTask 在启动之后会向overlord上报自己最近cpu使用率,这里专门取正在reading状态的task,task集合的cpu metric 是否需要动态扩缩: 参数latestCpuUsages

cpu使用率的取值范围为:[0, 1]

1)cpuUsageMin :  cpu <= cpuUsageMin, 表示CPU使用空闲,是缩的条件之一:(cpu.isLower && lag.isLower)

如果为null: 表示动态缩的时候不考虑cpu.isLower这个条件,作为掩码,isLower===true,表示关闭cpu作为(缩)的条件

特殊值 : cpuUsageMin<0,  是假命题, 所以,isLower===fasle, 表示坚决不缩

特殊值 : cpuUsageMin>=1, 是真命题,所以,isLower===true, 表示关闭cpu作为(缩)的条件

2)cpuUsageMax  cpu >= cpuUsageMax, 表示CPU负载高,是扩的条件之一, ex:(cpu.isHigher && lag.isHigher)

如果为null: 表示动态缩的时候不考虑cpu.isHigher这个条件 ,作为掩码,isHigher===true,表示关闭cpu作为(扩)的条件

特殊值: cpuUsageMax<=0, 是真命题,所以,isHigher===true, 表示关闭cpu作为(扩)的条件

特殊值: cpuUsageMax>1,   是假命题, 所以,isHigher===fasle, 表示坚决不扩

3)如果cpuUsageMin=null && cpuUsageMax=null

表示彻底关闭cpu作为(扩/缩)的条件,将返回一个掩码作用的 state(isHigher= true, isLower=true, isNormal=true)

4)tasksPercentOfHighCpuUsage : task.isHigher 的比例高于多少,可扩, 选填, 有默认值 = 0.3

tasksPercentOfLowCpuUsage :  task.isLower 的比例高于多少,可缩 ,选填, 有默认值 = 0.9

5)对于单个task的最近几分钟的cpu: List<Double>

判断单个task: cpu.isHigher: (平均值 >= cpuUsageMax)&&(最后一个值 >= cpuUscpuMax)

判断单个task: cpu.isLower: (最大值<=cpuUsageMin)&&(非严格单调递增)&&(最后一个值<=cpuUsageMin)

 5.2  Lag检测

Lag为task消费kafka 消息的滞后程度的一个指标。kafkaSupervisor每分钟收集一次所有的partiton Lag的均值

参考最近几分钟之内的Lag,参数: ArrayBlockingQueue<Double> latestKafkaAvgLags

1)可扩的条件:isHigher

avgLag >= kafkaAvgLagMax  &&  lastLag >= kafkaAvgLagMax  &&  (!isStrictlyDecline || (isStrictlyDecline&&isTolerance))

三个同时满足:

a. avgLag >= kafkaAvgLagMax : 均值大于阀值

b. lastLag >= kafkaAvgLagMax : 最近分钟的Lag大于阀值

c. 如果为非严格单调递减(:保留或上升的趋势)  ||   如果为严格单调递减 且但是减的速度已经超过的容忍的时间

以上三个条件同时满足的情况下判断为可扩展状态

2)可缩的条件: isLower

maxLag<=kafkaAvgLagMin  &&  !isStrictlyAscend

maxLag<=kafkaAvgLagMin: 如果最近几分钟之内的最大值小于阀值

!isStrictlyAscend : 非严格单调递增

以上两个条件同时满足的情况下判断为可回收状态

3)   灵活参数设置:

a. 如果不考虑Lag这个维度,不管是task 扩展 还是收缩,对最后的伸缩决策都无影响力,那么设置为掩码状态:isHigher = true, isLower = true

参数的设置方式:  kafkaAvgLagMax = null  并且  kafkaAvgLagMin = null ,  在提交json文件的时候可以无须配置,默认就null

b. 单方面的关闭:

设置为kafkaAvgLagMax = null, 或者其他的(特殊值<=0 ), 表示让伸的时候不考虑Lag情况, 关闭 kafkaAvgLagMax,这样isHigher === true,  在与其他值&运行的时候起到掩码的作用,不影响别人。

kafkaAvgLagMin <= 0 , 表示任何时候都不考虑缩,isLower ===  false

6、触发控制

一旦检测出异常,需要伸缩的时候,需要通过:taskCountMax 和taskCountMin,taskCountGrowthMultiplier ,taskCountReduceMultiplier,minTriggerDynamicFrequency参数控制

(1)task数量范围控制

出于资源的考虑,task的不能无限制的伸展,也不能缩到为0。

首先  task的数量在 [taskCountMin,  taskCountMax]区间,而taskCountMax的最大数量不能超过partition的数量, taskCountMin不能小于1。

(2)伸缩速度

伸的速度由taskCountGrowthMultiplier参数控制, 新的task数量=taskCount*taskCountGrowthMultiplier

缩的速度由taskCountReduceMultiplier参数控制, 新的task数量=taskCount*taskCountReduceMultiplier

(3)伸缩的频率

动态伸缩的时候会终止处于于reading状态的task线程,改变了原有的生命周期,可能会导致pushlish的顺序乱序而直接被系统终止。

所以需要控制伸缩的频率,minTriggerDynamicFrequency表示两次伸缩的最小的时间间隔

7、调整过程

动态的调整主要是停止正在reading状态的线程,并重新开始创建新的一批的task。

1) 终止没有被分配middlemanager,或者处于pedding状态的线程

2)触使所有的reading状态线程开始进行publish

3)清除所有的关于task,taskGroup与partion的各种映射信息

4)更新当前的taskCount,并持久化到mysql数据库中,并记录当前的伸缩调整的时间

5)创建新的一轮的task

8、效果评估

(1)自动伸缩性能,减少人力成本方面评估。

我们期待的效果应该是随着Kaka的lag的流量的波动,taskCount会敏捷跟随着Lag一样的方向波动。

如上图所示,我们监控了两天之内的taskCount 随着Kafka Lag的增大而增大,随着Lag的减少而减少,大体的趋势是我们的预期。

接着我们检查一下它的敏捷性:

从上图来看,Lag持续超过阀值的5分钟之内,我们的动态扩充已经执行,并且在动态扩充之后的Lag立马得到回落,这也是我们所期待的。

(2)提高资源利用率方面的评估。

当TaskCount 的数量过多,一方面在流量低峰期的时候造成资源的浪费,另一方面,会生成过多的小文件,这样对查询和存储都不太友好。

我们对线上开启动态伸缩的task进行统计,排除业务数据不断扩张 数据量日益增加 taskCount必须逐渐上涨的情况 。

我们统计了上线前后一天之内数据源的taskCount的平均值,对于流量变动大的task,  资源的节省率最大能够达到72%, 平均能够节省20%。

9、 遗留的问题 

目前对于毛刺问题依然无法解决,因为这些毛刺是由于两轮task更换过渡期产生,并且持续时间很短,不足5分钟。

Druid Kafka索引服务的Task动态伸缩相关推荐

  1. Druid 索引服务的资源精细化调度

    本文将从三个方面讨论关于Druid 索引服务的资源精细化调度: 一.Druid索引服务的简介 介绍了Druid 索引服务的架构和调度细节.(介绍原理) 二.实践中出现的问题 经过线上生产环境的实践,旧 ...

  2. 干货 | 跨多业务线挑战下,携程订单索引服务的1.0到2.0

    作者简介 唐巍,携程用户平台部订单服务组资深后端开发,在互联网尤其是移动互联网方面有丰富的经验,目前主要负责OrderIndex的维护和架构升级工作. 经过团队几个月的努力,我们最近终于完成了OI(订 ...

  3. 我是如何将一个老系统的kafka消费者服务的性能提升近百倍的

    大家好,又见面了~ kafka作为一种高吞吐量的分布式发布订阅消息系统,在业务系统中被广泛的使用. 如果问你,如何提高kafka队列中的消息消费速度呢? 答案很简单,topic多分几个分片,然后使用消 ...

  4. Druid.io索引过程分析——时间窗,列存储,LSM树,充分利用内存,concise压缩

    Druid底层不保存原始数据,而是借鉴了Apache Lucene.Apache Solr以及ElasticSearch等检索引擎的基本做法,对数据按列建立索引,最终转化为Segment,用于存储.查 ...

  5. kafka 消息服务

    apache kafka参考 http://kafka.apache.org/documentation.html 消息队列方式: 点对点: 消息生产者生产消息发送到queue中,然后消息消费者从qu ...

  6. kafka消息服务的producer、broker、consumer的配置

    2019独角兽企业重金招聘Python工程师标准>>> server.properties配置: server.properties中所有配置参数说明(解释)如下列表: 参数 说明( ...

  7. Kubernetes-集群结合普罗米修斯、监控nginx、hpa动态伸缩

    目录: Prometheus简介 一.k8s集群部署Prometheus 二. Prometheus监控应用nginx 三. prometheus实现k8s集群的hpa动态伸缩 Prometheus简 ...

  8. hikaripool信息_HikariPool源码(三)资源池动态伸缩

    Java极客  |  作者  /  铿然一叶 这是 1.资源池的动态伸缩 1.为了提升资源池的性能,需要设置最小闲置资源数量,在资源池初始化时完成初始化:而当使用的资源超过最小闲置资源数,消费者释放回 ...

  9. 微服务架构:动态配置中心搭建

    版权声明:本文为博主原创文章,转载请注明出处,欢迎交流学习! 在微服务架构中,服务之间有着错综复杂的依赖关系,每个服务都有自己的依赖配置,在运行期间很多配置会根据访问流量等因素进行调整,传统的配置信息 ...

最新文章

  1. R语言泊松分布函数Poisson Distribution(dpois, ppois, qpois rpois)实战
  2. SAP MM MM17里不能修改物料主数据'Purchasing Value Key'字段值?
  3. python爬虫实战-python爬虫实战一:分析豆瓣中最新电影的影评
  4. 数据库查询性能优化之利器—索引(二)
  5. CS229 1 .线性回归与特征归一化(feature scaling)
  6. JavaScript省市二级联动
  7. 35岁北大博士拟升市长
  8. 信息学奥赛C++语言:求平均分
  9. Javascript——进阶(事件、数组操作、字符串操作、定时器)
  10. beatsx三闪红灯是什么意思_“左转弯待转区”还有坑?走错了等于闯红灯,驾照直接扣6分罚200...
  11. 《机器学习实战》学习总结(六)PCA算法原理
  12. 网络通信之通过get/post方式提交参数给web应用
  13. 计算机考试有python吗_计算机二级考试有python吗
  14. SpringbootJPA分页 PageRequest过时
  15. 基于双月数据集利用最小二乘法进行分类
  16. nginx过滤HttpHeader的 中划线
  17. 模拟线上应用cpu100%解决方法
  18. 消防系统设计市场现状及未来发展趋势
  19. 鬼泣4refrain 《鬼泣4 refrain》图文全攻略(iphone版)
  20. UCK全球路演走进佛山,跨链技术将加速区块链商业应用

热门文章

  1. 实验管理系统springboot+vue+element ui项目开发
  2. 2020年英语四级作文
  3. cucumber java从入门到精通_cucumber java从入门到精通(4)Scenario Outline及数据驱动...
  4. ICCV 2019视频目标跟踪算法Pipeline集合
  5. 华米手表2 是android,手表 | 续航怪兽 华米AMAZFIT智能运动手表2代深度评测(二)...
  6. alternate端口什么意思_alternate是什么意思
  7. error CS1061
  8. 运算放大器中的正负反馈判断和电压电流反馈判断
  9. 【Rust日报】2022-02-22 Slint - 为桌面和嵌入式设备创建一个新的GUI框架
  10. 二目运算符是什么意思?