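In the previous article we explored the general logic of how an Informer works, and noted that adding callbacks involves three parts: creating the Informer, the logic by which the callbacks are invoked, and the callback functions themselves. The first two parts were covered there; below we look at the third, namely the processing logic inside the callbacks: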

I. Callback Functions

We again use the Deployment as our example. As before, we start with the NewDeploymentController method:

pkg/controller/deployment/deployment_controller.go

func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
	...
	dc := &DeploymentController{
		client:        client,
		eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
	}
	dc.rsControl = controller.RealRSControl{
		KubeClient: client,
		Recorder:   dc.eventRecorder,
	}

	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addDeployment,
		UpdateFunc: dc.updateDeployment,
		DeleteFunc: dc.deleteDeployment,
	})
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addReplicaSet,
		UpdateFunc: dc.updateReplicaSet,
		DeleteFunc: dc.deleteReplicaSet,
	})
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: dc.deletePod,
	})

	dc.syncHandler = dc.syncDeployment
	dc.enqueueDeployment = dc.enqueue

	dc.dLister = dInformer.Lister()
	dc.rsLister = rsInformer.Lister()
	dc.podLister = podInformer.Lister()
	dc.dListerSynced = dInformer.Informer().HasSynced
	dc.rsListerSynced = rsInformer.Informer().HasSynced
	dc.podListerSynced = podInformer.Informer().HasSynced
	return dc, nil
}

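As we can see, the method registers add, update and delete callbacks on both the Deployment and ReplicaSet informers, but only a delete callback on the Pod informer. The Pod delete callback exists mainly for Deployments using the Recreate strategy (all old Pods are deleted before any new Pod is created): such a Deployment may move on to the next step, described below, only once all of its old Pods are gone. For Deployments using the RollingUpdate strategy, the Pod count is maintained at the ReplicaSet level: the deployment controller operates on ReplicaSets (its rsControl field is what it uses to act on them) and the ReplicaSet controller takes care of the Pods, so no further Pod-level handling is needed here.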
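Why registering only DeleteFunc is enough for the Pod informer can be illustrated with a small stdlib-only sketch. handlerFuncs below is a hypothetical type modeled on the shape of client-go's cache.ResourceEventHandlerFuncs, whose On* methods (in the client-go versions discussed here) simply skip any callback left nil; it is an illustration, not the client-go code itself.

```go
package main

import "fmt"

// handlerFuncs is a hypothetical stand-in modeled on the shape of
// client-go's cache.ResourceEventHandlerFuncs: any field may be left nil.
type handlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

// OnAdd calls AddFunc only if it was registered.
func (h handlerFuncs) OnAdd(obj interface{}) {
	if h.AddFunc != nil {
		h.AddFunc(obj)
	}
}

// OnUpdate calls UpdateFunc only if it was registered.
func (h handlerFuncs) OnUpdate(oldObj, newObj interface{}) {
	if h.UpdateFunc != nil {
		h.UpdateFunc(oldObj, newObj)
	}
}

// OnDelete calls DeleteFunc only if it was registered.
func (h handlerFuncs) OnDelete(obj interface{}) {
	if h.DeleteFunc != nil {
		h.DeleteFunc(obj)
	}
}

func main() {
	var seen []string
	// Like the Pod informer above: only a delete callback is registered.
	podHandlers := handlerFuncs{
		DeleteFunc: func(obj interface{}) { seen = append(seen, "delete "+obj.(string)) },
	}
	podHandlers.OnAdd("default/pod-a")                     // ignored
	podHandlers.OnUpdate("default/pod-a", "default/pod-a") // ignored
	podHandlers.OnDelete("default/pod-a")                  // handled
	fmt.Println(seen) // [delete default/pod-a]
}
```

Add and update notifications for Pods still reach the handler object; they just have nowhere to go.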

The "next step" mentioned here means calling dc's enqueueDeployment method to add the Deployment to the queue. Take the addDeployment method, for example:

pkg/controller/deployment/deployment_controller.go
func (dc *DeploymentController) addDeployment(obj interface{}) {
	d := obj.(*apps.Deployment)
	klog.V(4).Infof("Adding deployment %s", d.Name)
	dc.enqueueDeployment(d)
}

Or take the deleteDeployment method:

pkg/controller/deployment/deployment_controller.go
func (dc *DeploymentController) deleteDeployment(obj interface{}) {
	d, ok := obj.(*apps.Deployment)
	if !ok {
		...
	}
	klog.V(4).Infof("Deleting deployment %s", d.Name)
	dc.enqueueDeployment(d)
}

All seven callbacks follow this pattern: they end by calling enqueueDeployment to put the relevant Deployment into the queue.

Since enqueueDeployment has been set to the enqueue method, let's look at enqueue:

pkg/controller/deployment/deployment_controller.go
func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
	key, err := controller.KeyFunc(deployment)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
		return
	}

	dc.queue.Add(key)
}

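Here, KeyFunc extracts the Deployment's namespace and name fields and returns them as a single string in namespace/name form. It is this string, not the Deployment object itself, that is added to the queue to await further processing.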
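A stdlib-only sketch of the namespace/name convention follows. keyFor and splitKey are hypothetical helpers that, to our understanding, follow the behaviour of cache.MetaNamespaceKeyFunc (which controller.KeyFunc builds on) and cache.SplitMetaNamespaceKey (which syncDeployment uses to turn the key back into namespace and name). They work on plain strings rather than API objects.

```go
package main

import (
	"fmt"
	"strings"
)

// keyFor builds a namespace/name key; this is a simplified, hypothetical
// re-implementation of the convention, not the client-go code.
func keyFor(namespace, name string) string {
	if namespace == "" {
		return name // cluster-scoped objects have no namespace prefix
	}
	return namespace + "/" + name
}

// splitKey reverses keyFor, in the spirit of cache.SplitMetaNamespaceKey.
func splitKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil // name only
	case 2:
		return parts[0], parts[1], nil // namespace and name
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

func main() {
	key := keyFor("default", "nginx")
	ns, name, _ := splitKey(key)
	fmt.Println(key, ns, name) // default/nginx default nginx
}
```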

So the seven callbacks all lead to the same place: each ends up adding the relevant Deployment to the queue as a namespace/name key.
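Because the queue holds namespace/name keys rather than objects, several events that concern the same Deployment (say, an update to the Deployment followed by updates to its ReplicaSets) can collapse into a single pending entry before a worker picks it up; client-go's workqueue deduplicates keys in this way. The sketch below shows only that idea. dedupQueue is a hypothetical type, and it leaves out the in-flight bookkeeping, blocking and rate limiting that the real workqueue also provides.

```go
package main

import "fmt"

// dedupQueue is a hypothetical, simplified FIFO of string keys that
// ignores a key that is already waiting in the queue.
type dedupQueue struct {
	items   []string
	pending map[string]bool
}

func newDedupQueue() *dedupQueue {
	return &dedupQueue{pending: map[string]bool{}}
}

// Add enqueues key unless it is already waiting.
func (q *dedupQueue) Add(key string) {
	if q.pending[key] {
		return
	}
	q.pending[key] = true
	q.items = append(q.items, key)
}

// Get pops the oldest key; ok is false when the queue is empty.
func (q *dedupQueue) Get() (key string, ok bool) {
	if len(q.items) == 0 {
		return "", false
	}
	key, q.items = q.items[0], q.items[1:]
	delete(q.pending, key) // the key may be queued again from now on
	return key, true
}

// Len reports how many distinct keys are waiting.
func (q *dedupQueue) Len() int { return len(q.items) }

func main() {
	q := newDedupQueue()
	// Several callbacks fire for default/nginx before any worker runs.
	q.Add("default/nginx")
	q.Add("default/nginx")
	q.Add("default/web")
	q.Add("default/nginx")
	fmt.Println(q.Len()) // 2
}
```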

II. Run

How are the Deployments stored in the queue processed? Through the Run method, which startDeploymentController calls in a goroutine. Let's look at Run:

pkg/controller/deployment/deployment_controller.go
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer dc.queue.ShutDown()

	klog.Infof("Starting deployment controller")
	defer klog.Infof("Shutting down deployment controller")

	if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(dc.worker, time.Second, stopCh)
	}

	<-stopCh
}

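Here, WaitForCacheSync waits until the controller's Deployment, ReplicaSet and Pod caches have finished syncing; if that fails (for example because stopCh is closed first), Run returns immediately.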
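The idea behind WaitForCacheSync can be sketched as polling: check every HasSynced-style function until all of them return true, and give up if stopCh closes first. waitForSync and its period parameter are hypothetical names for illustration; the real helpers (controller.WaitForCacheSync, built on client-go's cache.WaitForCacheSync) differ in their details.

```go
package main

import (
	"fmt"
	"time"
)

// waitForSync polls the given HasSynced-style functions every period until
// all report true (returns true) or stopCh is closed (returns false).
func waitForSync(stopCh <-chan struct{}, period time.Duration, synced ...func() bool) bool {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		all := true
		for _, s := range synced {
			if !s() {
				all = false
				break
			}
		}
		if all {
			return true
		}
		select {
		case <-stopCh:
			return false // shutting down before the caches were ready
		case <-ticker.C:
			// poll again
		}
	}
}

func main() {
	calls := 0
	slow := func() bool { calls++; return calls >= 3 } // reports synced on the third check
	ok := waitForSync(make(chan struct{}), 10*time.Millisecond, func() bool { return true }, slow)
	fmt.Println(ok, calls) // true 3
}
```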

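Run then starts `workers` goroutines, each running the worker method via wait.Until. worker repeatedly calls processNextWorkItem, which takes the next element off the queue (blocking while the queue is empty) and calls syncHandler on it. Since NewDeploymentController contains the line dc.syncHandler = dc.syncDeployment, calling syncHandler means calling syncDeployment.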
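The worker / processNextWorkItem loop can be sketched with the standard library alone. In this hypothetical sketch a buffered channel stands in for the work queue and closing it plays the role of queue.ShutDown(); the real controller uses client-go's rate-limiting workqueue and, on a sync error, requeues the key with rate limiting in handleErr instead of just printing it.

```go
package main

import (
	"fmt"
	"sync"
)

// controller is a hypothetical, stripped-down stand-in for DeploymentController.
type controller struct {
	queue       chan string // stands in for the work queue
	syncHandler func(key string) error
}

// processNextWorkItem takes one key and hands it to syncHandler. It returns
// false once the queue has been shut down, which ends the worker loop.
func (c *controller) processNextWorkItem() bool {
	key, ok := <-c.queue // blocks until a key arrives or the queue is closed
	if !ok {
		return false
	}
	if err := c.syncHandler(key); err != nil {
		// The real controller would requeue the key with rate limiting here.
		fmt.Println("sync failed:", key, err)
	}
	return true
}

// worker keeps processing keys until the queue shuts down.
func (c *controller) worker() {
	for c.processNextWorkItem() {
	}
}

func main() {
	var mu sync.Mutex
	synced := map[string]int{}
	c := &controller{
		queue: make(chan string, 10),
		syncHandler: func(key string) error {
			mu.Lock()
			defer mu.Unlock()
			synced[key]++
			return nil
		},
	}
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ { // like the `workers` goroutines started by Run
		wg.Add(1)
		go func() { defer wg.Done(); c.worker() }()
	}
	c.queue <- "default/nginx"
	c.queue <- "default/web"
	close(c.queue) // plays the role of queue.ShutDown()
	wg.Wait()
	fmt.Println(len(synced)) // 2
}
```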

III. syncDeployment

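This method processes the Deployment elements taken from the queue. It performs a number of operations on the Deployment, such as listing its ReplicaSets, updating it and rolling it back.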

pkg/controller/deployment/deployment_controller.go
func (dc *DeploymentController) syncDeployment(key string) error {
	startTime := time.Now()
	klog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
	defer func() {
		klog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(2).Infof("Deployment %v has been deleted", key)
		return nil
	}
	if err != nil {
		return err
	}

	// Deep-copy otherwise we are mutating our cache.
	// TODO: Deep-copy only when needed.
	d := deployment.DeepCopy()

	everything := metav1.LabelSelector{}
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		if d.Status.ObservedGeneration < d.Generation {
			d.Status.ObservedGeneration = d.Generation
			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(d)
		}
		return nil
	}

	// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
	// through adoption/orphaning.
	rsList, err := dc.getReplicaSetsForDeployment(d)
	if err != nil {
		return err
	}
	// List all Pods owned by this Deployment, grouped by their ReplicaSet.
	// Current uses of the podMap are:
	//
	// * check if a Pod is labeled correctly with the pod-template-hash label.
	// * check that no old Pods are running in the middle of Recreate Deployments.
	podMap, err := dc.getPodMapForDeployment(d, rsList)
	if err != nil {
		return err
	}

	if d.DeletionTimestamp != nil {
		return dc.syncStatusOnly(d, rsList)
	}

	// Update deployment conditions with an Unknown condition when pausing/resuming
	// a deployment. In this way, we can be sure that we won't timeout when a user
	// resumes a Deployment with a set progressDeadlineSeconds.
	if err = dc.checkPausedConditions(d); err != nil {
		return err
	}

	if d.Spec.Paused {
		return dc.sync(d, rsList)
	}

	// rollback is not re-entrant in case the underlying replica sets are updated with a new
	// revision so we should ensure that we won't proceed to update replica sets until we
	// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
	if getRollbackTo(d) != nil {
		return dc.rollback(d, rsList)
	}

	scalingEvent, err := dc.isScalingEvent(d, rsList)
	if err != nil {
		return err
	}
	if scalingEvent {
		return dc.sync(d, rsList)
	}

	switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(d, rsList, podMap)
	case apps.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(d, rsList)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}

The method consists roughly of the following parts:

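(1) The element taken from the queue is a namespace/name key, so the method first splits it and uses those two fields to fetch the Deployment object from the lister.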

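(2) It checks the Deployment's state: whether it is being deleted (DeletionTimestamp), whether it is paused, whether a rollback is pending, and whether the change is a scaling event. It then calls the matching handler: syncStatusOnly, sync or rollback.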

(3) Otherwise, it checks whether the Deployment's strategy is RollingUpdate or Recreate and calls the matching handler, rolloutRolling or rolloutRecreate.
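The order of the checks above can be condensed into a small decision function. deploymentState and chooseHandler are hypothetical names: the sketch reduces the Deployment to a few booleans and a strategy string, returns the name of the method syncDeployment would call, and leaves out the empty-selector check and checkPausedConditions.

```go
package main

import "fmt"

// deploymentState is a hypothetical summary of the facts syncDeployment inspects.
type deploymentState struct {
	BeingDeleted    bool   // d.DeletionTimestamp != nil
	Paused          bool   // d.Spec.Paused
	RollbackPending bool   // getRollbackTo(d) != nil
	ScalingEvent    bool   // result of dc.isScalingEvent
	Strategy        string // "Recreate" or "RollingUpdate"
}

// chooseHandler mirrors the order of checks in syncDeployment and returns the
// name of the method that would handle the Deployment.
func chooseHandler(s deploymentState) (string, error) {
	switch {
	case s.BeingDeleted:
		return "syncStatusOnly", nil
	case s.Paused:
		return "sync", nil
	case s.RollbackPending:
		return "rollback", nil
	case s.ScalingEvent:
		return "sync", nil
	}
	switch s.Strategy {
	case "Recreate":
		return "rolloutRecreate", nil
	case "RollingUpdate":
		return "rolloutRolling", nil
	}
	return "", fmt.Errorf("unexpected deployment strategy type: %s", s.Strategy)
}

func main() {
	h, _ := chooseHandler(deploymentState{Paused: true, Strategy: "RollingUpdate"})
	fmt.Println(h) // sync
}
```

Because the checks are ordered, a Deployment that is being deleted only has its status synced, even if it is also paused.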

Next, let's look at a few of these handlers.

IV. Handlers

First, the syncStatusOnly method:

pkg/controller/deployment/sync.go

// syncStatusOnly only updates Deployments Status and doesn't take any mutating actions.
func (dc *DeploymentController) syncStatusOnly(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
	if err != nil {
		return err
	}

	allRSs := append(oldRSs, newRS)
	return dc.syncDeploymentStatus(allRSs, newRS, d)
}

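As the comment says, syncStatusOnly does only one real thing: after collecting the new and old ReplicaSets, it calls syncDeploymentStatus to sync the Deployment's status. We will analyze that method shortly.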

Next, the sync method:

pkg/controller/deployment/sync.go

// sync is responsible for reconciling deployments on scaling events or when they
// are paused.
func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
	if err != nil {
		return err
	}
	if err := dc.scale(d, newRS, oldRSs); err != nil {
		// If we get an error while trying to scale, the deployment will be requeued
		// so we can abort this resync
		return err
	}

	// Clean up the deployment when it's paused and no rollback is in flight.
	if d.Spec.Paused && getRollbackTo(d) == nil {
		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
			return err
		}
	}

	allRSs := append(oldRSs, newRS)
	return dc.syncDeploymentStatus(allRSs, newRS, d)
}

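As its comment says, compared with syncStatusOnly this method additionally handles scaling (via scale) and, for a paused Deployment with no rollback in flight, cleans up old ReplicaSets (via cleanupDeployment). In the end it still calls syncDeploymentStatus to sync the Deployment's status.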

Now let's look at the syncDeploymentStatus method:

pkg/controller/deployment/sync.go
func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
	newStatus := calculateStatus(allRSs, newRS, d)

	if reflect.DeepEqual(d.Status, newStatus) {
		return nil
	}

	newDeployment := d
	newDeployment.Status = newStatus
	_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(newDeployment)
	return err
}

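This method first computes the new status and checks whether it equals the Deployment's current status; if it does, the method returns immediately. Otherwise it uses the client to call the API Server and update the Deployment's status. This status write is the last step of each sync, and it reflects the controller's role: its job is to interact with the API Server, writing ReplicaSet changes (in scale and the rollout methods) and the Deployment's status there. Creating the actual Pods is left to the ReplicaSet controller, and running them is the kubelet's job.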
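The "compare, then write only if changed" pattern of syncDeploymentStatus can be sketched as follows. status and statusWriter are hypothetical stand-ins for a trimmed-down DeploymentStatus and for the API client; the writer only counts calls so the effect of the early return is visible.

```go
package main

import (
	"fmt"
	"reflect"
)

// status is a hypothetical, trimmed-down DeploymentStatus.
type status struct {
	ObservedGeneration int64
	Replicas           int32
	UpdatedReplicas    int32
	AvailableReplicas  int32
}

// statusWriter stands in for the API client; it only counts writes.
type statusWriter struct{ writes int }

func (w *statusWriter) UpdateStatus(s status) error {
	w.writes++
	return nil
}

// syncStatus follows syncDeploymentStatus: compare the computed status with
// the current one and only call the API when something changed.
func syncStatus(current, computed status, w *statusWriter) error {
	if reflect.DeepEqual(current, computed) {
		return nil // nothing changed, skip the API call
	}
	return w.UpdateStatus(computed)
}

func main() {
	w := &statusWriter{}
	cur := status{ObservedGeneration: 2, Replicas: 3, UpdatedReplicas: 3, AvailableReplicas: 3}
	_ = syncStatus(cur, cur, w) // unchanged: no write
	next := cur
	next.AvailableReplicas = 2
	_ = syncStatus(cur, next, w) // changed: one write
	fmt.Println(w.writes) // 1
}
```

For this flat struct `==` would also work, but the real DeploymentStatus contains a Conditions slice, which makes the struct non-comparable with `==`; that is one reason reflect.DeepEqual is used.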

The rolloutRolling method handles the Deployment's rolling update; in essence it, too, ends by updating state through its interaction with the API Server.

V. Summary

This completes our overview of how the Controller Manager runs.

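To sum up, the Controller Manager is essentially the component that manages Kubernetes's many controllers. Each controller runs its own informers when it starts. Through the list-watch mechanism, these informers talk to the API Server to receive real-time changes to the resources they watch, and update their caches accordingly.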

In addition, the informer uses a listener mechanism to take those changes out of the cache and invoke the corresponding callbacks, which process them.

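Taking the deployment controller as an example, the Deployment informer's callbacks ultimately put Deployments into the queue. Once taken off the queue, each Deployment goes through a series of steps such as scaling and rolling updates, and the result is written back through the API Server, updating the resource's state there. This is one iteration of a controller's loop. The other controllers run along roughly the same lines.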

Reposted from: https://www.cnblogs.com/00986014w/p/10570488.html
