Initialization

File: cmd/kube-controller-manager/app/controllermanager.go

func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    ...
    controllers["horizontalpodautoscaling"] = startHPAController
    ...
}

Like every other controller, the HPA controller is registered in the NewControllerInitializers method and then started via startHPAController.

startHPAController
    |-> startHPAControllerWithRESTClient / startHPAControllerWithLegacyClient
        |-> startHPAControllerWithMetricsClient
            |-> NewHorizontalController

File: pkg/controller/podautoscaler/horizontal.go

// NewHorizontalController creates a new HorizontalController.
func NewHorizontalController(
    evtNamespacer v1core.EventsGetter,
    scaleNamespacer scaleclient.ScalesGetter,
    hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter,
    mapper apimeta.RESTMapper,
    metricsClient metricsclient.MetricsClient,
    hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer,
    podInformer coreinformers.PodInformer,
    resyncPeriod time.Duration,
    downscaleStabilisationWindow time.Duration,
    tolerance float64,
    cpuInitializationPeriod,
    delayOfInitialReadinessStatus time.Duration,
) *HorizontalController {
    broadcaster := record.NewBroadcaster()
    broadcaster.StartStructuredLogging(0)
    broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: evtNamespacer.Events("")})
    recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "horizontal-pod-autoscaler"})
    ...
    hpaInformer.Informer().AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    hpaController.enqueueHPA,
            UpdateFunc: hpaController.updateHPA,
            DeleteFunc: hpaController.deleteHPA,
        },
        resyncPeriod,
    )
    ...
    return hpaController
}

The core logic is to watch for events on HPA objects, handled by hpaController.enqueueHPA, hpaController.updateHPA, and hpaController.deleteHPA respectively. enqueueHPA essentially puts the HPA object's key onto the HorizontalController's queue, updateHPA handles updates to an HPA object, and deleteHPA handles deletions. HPA objects are tracked through the hpaController's workqueue.

The code follows.

File: pkg/controller/podautoscaler/horizontal.go

// obj could be an *v1.HorizontalPodAutoscaler, or a DeletionFinalStateUnknown marker item.
func (a *HorizontalController) updateHPA(old, cur interface{}) {
    a.enqueueHPA(cur)
}

// obj could be an *v1.HorizontalPodAutoscaler, or a DeletionFinalStateUnknown marker item.
func (a *HorizontalController) enqueueHPA(obj interface{}) {
    key, err := controller.KeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
        return
    }
    // Requests are always added to queue with resyncPeriod delay.  If there's already
    // request for the HPA in the queue then a new request is always dropped. Requests spend resync
    // interval in queue so HPAs are processed every resync interval.
    a.queue.AddRateLimited(key)
}

func (a *HorizontalController) deleteHPA(obj interface{}) {
    key, err := controller.KeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
        return
    }
    // TODO: could we leak if we fail to get the key?
    a.queue.Forget(key)
}

startHPAController

File: cmd/kube-controller-manager/app/autoscaling.go

Control eventually reaches startHPAControllerWithMetricsClient, which starts a goroutine that builds an HPA controller via NewHorizontalController and then calls its Run method.

func startHPAController(ctx ControllerContext) (http.Handler, bool, error) {
    ...
    return startHPAControllerWithLegacyClient(ctx)
}

func startHPAControllerWithLegacyClient(ctx ControllerContext) (http.Handler, bool, error) {
    hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
    metricsClient := metrics.NewHeapsterMetricsClient(
        hpaClient,
        metrics.DefaultHeapsterNamespace,
        metrics.DefaultHeapsterScheme,
        metrics.DefaultHeapsterService,
        metrics.DefaultHeapsterPort,
    )
    return startHPAControllerWithMetricsClient(ctx, metricsClient)
}

func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient metrics.MetricsClient) (http.Handler, bool, error) {
    hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
    hpaClientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")

    scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery())
    scaleClient, err := scale.NewForConfig(hpaClientConfig, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
    if err != nil {
        return nil, false, err
    }

    // initialize the controller and run it in its own goroutine
    go podautoscaler.NewHorizontalController(
        hpaClient.CoreV1(),
        scaleClient,
        hpaClient.AutoscalingV1(),
        ctx.RESTMapper,
        metricsClient,
        ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
        ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration,
        ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance,
        ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,
        ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration,
    ).Run(ctx.Stop)
    return nil, true, nil
}

Run

File: pkg/controller/podautoscaler/horizontal.go

func (a *HorizontalController) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer a.queue.ShutDown()

    klog.Infof("Starting HPA controller")
    defer klog.Infof("Shutting down HPA controller")

    if !cache.WaitForNamedCacheSync("HPA", stopCh, a.hpaListerSynced, a.podListerSynced) {
        return
    }

    // start a goroutine that invokes the worker once per second
    go wait.Until(a.worker, time.Second, stopCh)

    <-stopCh
}

Run hands off to the worker, which executes the actual scaling logic.

The HPA logic entry point: run the worker periodically

go wait.Until(a.worker, time.Second, stopCh)
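
wait.Until comes from k8s.io/apimachinery/pkg/util/wait and simply re-invokes the function every period until the stop channel is closed. A minimal standalone example of that behavior:

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stopCh := make(chan struct{})
    // runs the closure once per second until stopCh is closed,
    // mirroring how Run drives the HPA worker
    go wait.Until(func() { fmt.Println("worker tick") }, time.Second, stopCh)

    time.Sleep(3500 * time.Millisecond) // observe a few ticks
    close(stopCh)
}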

Core code analysis

processNextWorkItem: iterate over all HPA objects

func (a *HorizontalController) worker() {
    for a.processNextWorkItem() {
    }
    klog.Infof("horizontal pod autoscaler controller worker shutting down")
}

func (a *HorizontalController) processNextWorkItem() bool {
    key, quit := a.queue.Get()
    if quit {
        return false
    }
    defer a.queue.Done(key)

    deleted, err := a.reconcileKey(key.(string))
    if err != nil {
        utilruntime.HandleError(err)
    }
    // re-enqueue unless the HPA was deleted, so it is reconciled again next interval
    if !deleted {
        a.queue.AddRateLimited(key)
    }
    return true
}

HPA objects are stored in the HorizontalController's queue; the worker takes each one off the queue and processes it.

processNextWorkItem
    |-> reconcileKey
        |-> reconcileAutoscaler

Following the worker all the way down the call chain lands in reconcileAutoscaler, the heart of the HPA. Let's focus on that part.

reconcileAutoscaler: compute the replica count

func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.HorizontalPodAutoscaler, key string) error {
    ...
    // replica count is zero: autoscaling is not enabled for this target
    if scale.Spec.Replicas == 0 && minReplicas != 0 {
        // Autoscaling is disabled for this resource
        desiredReplicas = 0
        rescale = false
        setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero")
    // if the current count exceeds the configured maximum, clamp the desired count to MaxReplicas
    } else if currentReplicas > hpa.Spec.MaxReplicas {
        rescaleReason = "Current number of replicas above Spec.MaxReplicas"
        desiredReplicas = hpa.Spec.MaxReplicas
    // likewise, clamp up to MinReplicas
    } else if currentReplicas < minReplicas {
        rescaleReason = "Current number of replicas below Spec.MinReplicas"
        desiredReplicas = minReplicas
    } else {
        var metricTimestamp time.Time
        // compute how many replicas the metrics call for
        metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(hpa, scale, hpa.Spec.Metrics)
        if err != nil {
            ...
        }

        klog.V(4).Infof("proposing %v desired replicas (based on %s from %s) for %s", metricDesiredReplicas, metricName, metricTimestamp, reference)

        rescaleMetric := ""
        if metricDesiredReplicas > desiredReplicas {
            desiredReplicas = metricDesiredReplicas
            rescaleMetric = metricName
        }
        if desiredReplicas > currentReplicas {
            rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
        }
        if desiredReplicas < currentReplicas {
            rescaleReason = "All metrics below target"
        }

        // a stabilization window can be configured to keep the replica count
        // of the scaling target from flapping
        // doc: https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#support-for-configurable-scaling-behavior
        if hpa.Spec.Behavior == nil {
            desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)
        } else {
            desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas)
        }
        rescale = desiredReplicas != currentReplicas
    }
    ...
}
This is the core of reconcileAutoscaler. It first establishes a valid range: based on the current scale object and the parameters configured on the HPA, it decides the desired replica count. When the current count exceeds maxReplicas or falls below minReplicas, it is simply clamped to the corresponding bound and the scale object is updated directly; and when the scale object's replica count is zero, the HPA performs no further action on it.
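
That branch structure condenses into a small standalone sketch (the function and parameter names here are mine, not the controller's; metricDesired stands in for what computeReplicasForMetrics would return):

package main

import "fmt"

// decideReplicas is a hypothetical condensation of the boundary logic in
// reconcileAutoscaler, with the metrics-driven branch reduced to a value.
func decideReplicas(specReplicas, currentReplicas, minReplicas, maxReplicas, metricDesired int32) (desired int32, rescale bool) {
    switch {
    case specReplicas == 0 && minReplicas != 0:
        return 0, false // autoscaling disabled for this target
    case currentReplicas > maxReplicas:
        return maxReplicas, true // clamp down to the upper bound
    case currentReplicas < minReplicas:
        return minReplicas, true // clamp up to the lower bound
    default:
        return metricDesired, metricDesired != currentReplicas
    }
}

func main() {
    // 12 current replicas against a max of 10 -> clamp to 10 and rescale
    fmt.Println(decideReplicas(12, 12, 2, 10, 0))
}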

When the current replica count sits between minReplicas and maxReplicas, the controller has to compute whether scaling is needed; that computation is done by calling computeReplicasForMetrics.

computeReplicasForMetrics: iterate over the metric targets

func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
    metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {
    ...
    // the metric target can be a list, so iterate and keep the largest proposal
    for i, metricSpec := range metricSpecs {
        // compute the replica proposal for this metric, dispatching on its type
        replicaCountProposal, metricNameProposal, timestampProposal, condition, err := a.computeReplicasForMetric(hpa, metricSpec, specReplicas, statusReplicas, selector, &statuses[i])
        if err != nil {
            if invalidMetricsCount <= 0 {
                invalidMetricCondition = condition
                invalidMetricError = err
            }
            invalidMetricsCount++
        }
        // remember the largest proposal seen so far
        if err == nil && (replicas == 0 || replicaCountProposal > replicas) {
            timestamp = timestampProposal
            replicas = replicaCountProposal
            metric = metricNameProposal
        }
    }
    ...
    return replicas, metric, statuses, timestamp, nil
}

The code above iterates over all the metrics and returns whichever one proposes the largest replica count. The main computation happens in computeReplicasForMetric, which we look at next.
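
Before moving on, note the design choice in that selection loop: the largest proposal wins, so the most demanding metric decides, which biases the controller toward availability. A trivial sketch of the rule:

package main

import "fmt"

// largestProposal mirrors the selection loop above: a proposal replaces the
// running answer only when it is the first seen or strictly larger.
func largestProposal(proposals []int32) int32 {
    var replicas int32
    for _, p := range proposals {
        if replicas == 0 || p > replicas {
            replicas = p
        }
    }
    return replicas
}

func main() {
    // CPU suggests 4 replicas, a custom pods metric suggests 7 -> choose 7
    fmt.Println(largestProposal([]int32{4, 7}))
}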

computeReplicasForMetric: compute replicas by metric source type

func (a *HorizontalController) computeReplicasForMetric(hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec,
    specReplicas, statusReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, metricNameProposal string,
    timestampProposal time.Time, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
    // dispatch on the metric source type
    switch spec.Type {
    // a Kubernetes object, e.g. an Ingress
    case autoscalingv2.ObjectMetricSourceType:
        ...
    // a per-pod metric
    case autoscalingv2.PodsMetricSourceType:
        metricSelector, err := metav1.LabelSelectorAsSelector(spec.Pods.Metric.Selector)
        if err != nil {
            condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
            return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
        }
        // only the AverageValue target type is supported; compute the replica proposal
        replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForPodsMetric(specReplicas, spec, hpa, selector, status, metricSelector)
        if err != nil {
            return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
        }
    // a resource metric (CPU/memory)
    case autoscalingv2.ResourceMetricSourceType:
        ...
    case autoscalingv2.ExternalMetricSourceType:
        ...
    default:
        errMsg := fmt.Sprintf("unknown metric source type %q", string(spec.Type))
        err = fmt.Errorf(errMsg)
        condition := a.getUnableComputeReplicaCountCondition(hpa, "InvalidMetricSourceType", err)
        return 0, "", time.Time{}, condition, err
    }
    return replicaCountProposal, metricNameProposal, timestampProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}

The computation dispatches on the metric source type; there are currently four: Pods, Object, Resource, and External.
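
As an illustration, this is roughly how a Pods-type metric spec is built against the autoscaling/v2beta2 API that this code targets (the metric name is a made-up example):

package main

import (
    "fmt"

    autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
    "k8s.io/apimachinery/pkg/api/resource"
)

func main() {
    // target an average of 1k packets-per-second per pod; as noted above,
    // Pods metrics only support the AverageValue target type
    target := resource.MustParse("1k")
    spec := autoscalingv2.MetricSpec{
        Type: autoscalingv2.PodsMetricSourceType,
        Pods: &autoscalingv2.PodsMetricSource{
            Metric: autoscalingv2.MetricIdentifier{Name: "packets-per-second"},
            Target: autoscalingv2.MetricTarget{
                Type:         autoscalingv2.AverageValueMetricType,
                AverageValue: &target,
            },
        },
    }
    fmt.Printf("%s metric %q, target %s\n", spec.Type, spec.Pods.Metric.Name, spec.Pods.Target.AverageValue)
}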

computeStatusForPodsMetric & GetMetricReplicas: compute how much to scale

Files: pkg/controller/podautoscaler/horizontal.go (computeStatusForPodsMetric) and pkg/controller/podautoscaler/replica_calculator.go (GetMetricReplicas)

func (a *HorizontalController) computeStatusForPodsMetric(currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler, selector labels.Selector, status *autoscalingv2.MetricStatus, metricSelector labels.Selector) (replicaCountProposal int32, timestampProposal time.Time, metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
    // compute the replica proposal
    replicaCountProposal, utilizationProposal, timestampProposal, err := a.replicaCalc.GetMetricReplicas(currentReplicas, metricSpec.Pods.Target.AverageValue.MilliValue(), metricSpec.Pods.Metric.Name, hpa.Namespace, selector, metricSelector)
    if err != nil {
        condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
        return 0, timestampProposal, "", condition, err
    }
    ...
    return replicaCountProposal, timestampProposal, fmt.Sprintf("pods metric %s", metricSpec.Pods.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}

func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUtilization int64, metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector) (replicaCount int32, utilization int64, timestamp time.Time, err error) {
    // fetch the raw per-pod metric samples
    metrics, timestamp, err := c.metricsClient.GetRawMetric(metricName, namespace, selector, metricSelector)
    if err != nil {
        return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v", metricName, err)
    }
    // combine the samples with the target to compute the desired replica count
    replicaCount, utilization, err = c.calcPlainMetricReplicas(metrics, currentReplicas, targetUtilization, namespace, selector, v1.ResourceName(""))
    return replicaCount, utilization, timestamp, err
}

GetRawMetric fetches the pods' raw metric samples, and calcPlainMetricReplicas then combines those samples with the target to work out the desired replica count.
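
Part of that combination is GetMetricUtilizationRatio, which just averages the per-pod samples and divides by the target. A standalone sketch of the arithmetic (simplified; the real PodMetricsInfo is a map of richer sample structs):

package main

import "fmt"

// utilizationRatio sketches metricsclient.GetMetricUtilizationRatio:
// current utilization is the mean of the samples, and the ratio compares
// it against the target average value.
func utilizationRatio(samples []int64, target int64) (ratio float64, current int64) {
    var total int64
    for _, s := range samples {
        total += s
    }
    current = total / int64(len(samples))
    return float64(current) / float64(target), current
}

func main() {
    // three pods at 400m, 600m, 800m against a 500m target -> ratio 1.2
    ratio, current := utilizationRatio([]int64{400, 600, 800}, 500)
    fmt.Println(ratio, current) // 1.2 600
}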

calcPlainMetricReplicas: the replica computation itself

func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, utilization int64, err error) {
    podList, err := c.podLister.Pods(namespace).List(selector)
    ...
    // split the pods into three groups: ready pods, ignored pods, and missing pods
    readyPodCount, ignoredPods, missingPods := groupPods(podList, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus)
    // drop the ignored pods' samples from the metric data
    removeMetricsForPods(metrics, ignoredPods)
    // sum the container resource requests configured on the pods
    requests, err := calculatePodRequests(podList, resource)
    ...
    // compute the usage ratio against the target
    usageRatio, utilization := metricsclient.GetMetricUtilizationRatio(metrics, targetUtilization)
    ...
}

groupPods classifies the pod list: ignoredPods contains the pods whose status is PodPending, while missingPods contains the pods whose names have no entry in the metric data.
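
A simplified sketch of that classification (assumption: the real groupPods also applies the cpuInitializationPeriod and readiness-delay rules for CPU metrics, which are omitted here, and the names below are mine):

package main

import "fmt"

type podInfo struct {
    name    string
    pending bool
}

// groupPodsSketch mimics the classification described above: pending pods
// are ignored, pods without a metric sample are missing, the rest are ready.
func groupPodsSketch(pods []podInfo, metrics map[string]int64) (readyCount int, ignored, missing map[string]bool) {
    ignored = map[string]bool{}
    missing = map[string]bool{}
    for _, p := range pods {
        if p.pending {
            ignored[p.name] = true
            continue
        }
        if _, ok := metrics[p.name]; !ok {
            missing[p.name] = true
            continue
        }
        readyCount++
    }
    return readyCount, ignored, missing
}

func main() {
    pods := []podInfo{{"a", false}, {"b", true}, {"c", false}}
    metrics := map[string]int64{"a": 500} // "c" has no sample
    fmt.Println(groupPodsSketch(pods, metrics)) // 1 map[b:true] map[c:true]
}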

Since the missing pods have no samples in metrics to begin with, only the ignored pods' samples need to be removed. calculatePodRequests is then called to sum the resource requests configured on the pods' containers.
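
Putting the pieces together, on the simplest path (no missing or ignored pods to correct for) the proposal is desired = ceil(usageRatio × readyPodCount), with a tolerance band inside which no scaling happens. A hedged sketch, assuming the default 10% tolerance:

package main

import (
    "fmt"
    "math"
)

// desiredReplicas sketches the simplest path through calcPlainMetricReplicas:
// the proposal is the usage ratio scaled by the number of ready pods, unless
// the ratio is within tolerance of 1.0, in which case nothing changes.
func desiredReplicas(currentReplicas, readyPodCount int32, usageRatio, tolerance float64) int32 {
    if math.Abs(1.0-usageRatio) <= tolerance {
        return currentReplicas // within tolerance: keep the current count
    }
    return int32(math.Ceil(usageRatio * float64(readyPodCount)))
}

func main() {
    // 5 ready pods running at 1.6x the target -> scale to ceil(1.6*5) = 8
    fmt.Println(desiredReplicas(5, 5, 1.6, 0.1))
    // at 1.05x the target the 10% tolerance suppresses scaling -> stay at 5
    fmt.Println(desiredReplicas(5, 5, 1.05, 0.1))
}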

Summary

The overall HPA logic, end to end:

startHPAController -> NewHorizontalController -> Run
    -> worker -> processNextWorkItem -> reconcileKey -> reconcileAutoscaler
        -> computeReplicasForMetrics -> computeReplicasForMetric
            -> computeStatusForPodsMetric -> GetMetricReplicas -> calcPlainMetricReplicas
