Almost every controller manager and CRD controller uses client-go's informer functions: through Watch (or Get/List) they obtain the objects they care about. In this post we'll look at how the client-go informer machinery works, from a source-code point of view.

kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
    klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}

kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)

controller := NewController(kubeClient, exampleClient,
    kubeInformerFactory.Apps().V1().Deployments(),
    exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh))
// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
kubeInformerFactory.Start(stopCh)

This example is excerpted from https://github.com/kubernetes/sample-controller/blob/master/main.go and uses the built-in Deployment informer as the main example. As you can see, using a client-go informer directly is quite simple. Setting aside what the NewController function does for now, let's follow the code and see what kubeInformerFactory.Start actually does.
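Before tracing Start, here is a minimal, self-contained sketch of the same usage pattern. It is a sketch only: it uses the fake clientset so it runs without a cluster (with a real cluster you would build the client via kubernetes.NewForConfig(cfg) as in the excerpt above), but the factory → informer → handlers → Start flow is the same.

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes/fake"
    "k8s.io/client-go/tools/cache"
)

func main() {
    // Fake clientset keeps the sketch runnable without a cluster.
    client := fake.NewSimpleClientset()

    stopCh := make(chan struct{})
    defer close(stopCh)

    // One shared factory, resyncing every 30s, just like sample-controller.
    factory := informers.NewSharedInformerFactory(client, 30*time.Second)
    deploymentInformer := factory.Apps().V1().Deployments().Informer()

    // Register handlers; these become a processorListener inside the shared
    // informer (more on that later in this article).
    deploymentInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { fmt.Println("add:", obj) },
        UpdateFunc: func(old, new interface{}) { fmt.Println("update:", new) },
        DeleteFunc: func(obj interface{}) { fmt.Println("delete:", obj) },
    })

    // Non-blocking: each registered informer is started in its own goroutine.
    factory.Start(stopCh)

    // Block until the initial List has been delivered into the local cache.
    if !cache.WaitForCacheSync(stopCh, deploymentInformer.HasSynced) {
        panic("cache failed to sync")
    }
    fmt.Println("deployment cache synced")
}

With that usage picture in mind, here is what kubeInformerFactory.Start does: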

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}

You can see that it iterates over f.informers. Let's take a look at the data structure behind informers:

type sharedInformerFactory struct {
    client           kubernetes.Interface
    namespace        string
    tweakListOptions internalinterfaces.TweakListOptionsFunc
    lock             sync.Mutex
    defaultResync    time.Duration
    customResync     map[reflect.Type]time.Duration

    informers map[reflect.Type]cache.SharedIndexInformer
    // startedInformers is used for tracking which informers have been started.
    // This allows Start() to be called multiple times safely.
    startedInformers map[reflect.Type]bool
}

In our example, at runtime f.informers holds the following:

type *v1.Deployment informer &{0xc000379fa0 <nil> 0xc00038ccb0 {} 0xc000379f80 0xc00033bb00 30000000000 30000000000 0x28e5ec8 false false {0 0} {0 0}}

In other words, every Kubernetes type gets its own informer, keyed by its Go type. Next, let's find out where that informer constructor is registered, again taking the Deployment informer as the example.
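The registration is keyed by the object's Go type. A tiny sketch of that keying scheme, written from scratch for illustration (this is not client-go code):

package main

import (
    "fmt"
    "reflect"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
)

func main() {
    // The factory keys its informer map by the reflect.Type of the API object,
    // so exactly one shared informer exists per resource type.
    byType := map[reflect.Type]string{}

    byType[reflect.TypeOf(&appsv1.Deployment{})] = "deployment informer"
    byType[reflect.TypeOf(&corev1.Pod{})] = "pod informer"

    // Looking up the same type again hits the same entry -- this is what makes
    // the informers "shared".
    fmt.Println(byType[reflect.TypeOf(&appsv1.Deployment{})]) // deployment informer
}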

First, let's go back to the code where kubeClient was initialized:

controller := NewController(kubeClient, exampleClient,
    kubeInformerFactory.Apps().V1().Deployments(),
    exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: controller.handleObject,
    UpdateFunc: func(old, new interface{}) {
        newDepl := new.(*appsv1.Deployment)
        oldDepl := old.(*appsv1.Deployment)
        if newDepl.ResourceVersion == oldDepl.ResourceVersion {
            // Periodic resync will send update events for all known Deployments.
            // Two different versions of the same Deployment will always have different RVs.
            return
        }
        controller.handleObject(new)
    },
    DeleteFunc: controller.handleObject,
})

Note the argument kubeInformerFactory.Apps().V1().Deployments(): this expression creates an informer that only cares about Deployments.
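If you need to narrow things further than one resource type, the factory also accepts options at construction time. A hedged sketch of that (the namespace and label selector are made up for illustration and are not part of sample-controller):

package main

import (
    "fmt"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes/fake"
)

func main() {
    // A fake clientset keeps the sketch runnable without a cluster.
    client := fake.NewSimpleClientset()

    // Restrict every informer produced by this factory to one namespace and
    // one label selector; both values are illustrative.
    factory := informers.NewSharedInformerFactoryWithOptions(
        client,
        30*time.Second,
        informers.WithNamespace("demo"),
        informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
            opts.LabelSelector = "app=demo"
        }),
    )

    // Deployments() returns the per-type wrapper; its Informer()/Lister()
    // are what sample-controller passes into NewController.
    deployments := factory.Apps().V1().Deployments()
    fmt.Println("deployment informer registered:", deployments.Informer() != nil)
}

Back in sample-controller, NewController stores the informer's lister and HasSynced function on the controller struct: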

controller := &Controller{
    kubeclientset:     kubeclientset,
    sampleclientset:   sampleclientset,
    deploymentsLister: deploymentInformer.Lister(),
    deploymentsSynced: deploymentInformer.Informer().HasSynced,
    foosLister:        fooInformer.Lister(),
    foosSynced:        fooInformer.Informer().HasSynced,
    workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
    recorder:          recorder,
}

deploymentInformer.Lister() initializes a Deployment lister. Let's see what the Lister function actually does.
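First, a quick usage-level view: once the cache has synced, the lister serves reads entirely from the local indexer rather than the API server. A self-contained sketch using the fake clientset (the object names are made up):

package main

import (
    "fmt"
    "time"

    appsv1 "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes/fake"
    "k8s.io/client-go/tools/cache"
)

func main() {
    // Seed the fake clientset with one Deployment so the cache has something in it.
    client := fake.NewSimpleClientset(&appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{Name: "web", Namespace: "demo"},
    })

    factory := informers.NewSharedInformerFactory(client, 30*time.Second)
    deployments := factory.Apps().V1().Deployments()
    lister := deployments.Lister()

    stopCh := make(chan struct{})
    defer close(stopCh)
    factory.Start(stopCh)

    // Until HasSynced is true the lister may return empty or stale results.
    if !cache.WaitForCacheSync(stopCh, deployments.Informer().HasSynced) {
        panic("cache failed to sync")
    }

    // Both calls below hit the in-memory indexer, not the API server.
    all, _ := lister.Deployments("demo").List(labels.Everything())
    fmt.Println("deployments in cache:", len(all))

    d, err := lister.Deployments("demo").Get("web")
    if err != nil {
        panic(err)
    }
    fmt.Println("found:", d.Namespace+"/"+d.Name)
}

With the usage out of the way, here is the generated code behind deploymentInformer.Informer() and Lister():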

// NewFilteredDeploymentInformer constructs a new informer for Deployment type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.AppsV1().Deployments(namespace).List(options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.AppsV1().Deployments(namespace).Watch(options)
            },
        },
        &appsv1.Deployment{},
        resyncPeriod,
        indexers,
    )
}

func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
    return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}

func (f *deploymentInformer) Lister() v1.DeploymentLister {
    return v1.NewDeploymentLister(f.Informer().GetIndexer())
}

Note the Lister function here: it calls Informer(), which triggers f.factory.InformerFor, and that ultimately lands in sharedInformerFactory's InformerFor function:

// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()

    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    if exists {
        return informer
    }

    resyncPeriod, exists := f.customResync[informerType]
    if !exists {
        resyncPeriod = f.defaultResync
    }

    informer = newFunc(f.client, resyncPeriod)
    f.informers[informerType] = informer

    return informer
}

Here you can see that the line informer = newFunc(f.client, resyncPeriod) is what finally creates the informer and registers it into the factory's informers map, which answers the question we raised earlier about where each type's informer comes from.
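Because the lookup is keyed by type, asking the same factory for the same informer twice returns the single shared instance. A small sketch to confirm this (fake clientset, illustrative only):

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes/fake"
)

func main() {
    factory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 30*time.Second)

    // Both calls go through InformerFor with reflect.TypeOf(&appsv1.Deployment{})
    // as the key, so the second call returns the cached instance.
    a := factory.Apps().V1().Deployments().Informer()
    b := factory.Apps().V1().Deployments().Informer()

    fmt.Println("same shared informer:", a == b) // true
}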

Now let's return to the informer Start:

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}

As you can see, it walks over all the informers and starts each one asynchronously by calling the informer's Run method in a separate goroutine. Let's look at the Run method as a whole:

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,

        Process: s.HandleDeltas,
    }

    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()

    // Separate stop channel because Processor should be stopped strictly after controller
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    wg.StartWithChannel(processorStopCh, s.processor.run)

    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    s.controller.Run(stopCh)
}

First, it creates a DeltaFIFO queue from the key function (MetaNamespaceKeyFunc) and the store's indexer. This is a first-in, first-out queue whose job is to hold the stream of events (deltas) for each object.

func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
    f := &DeltaFIFO{
        items:        map[string]Deltas{},
        queue:        []string{},
        keyFunc:      keyFunc,
        knownObjects: knownObjects,
    }
    f.cond.L = &f.lock
    return f
}

You can see that the queue itself is fairly simple: a map stores the data and a string slice holds the queue's keys in FIFO order.
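To make the shape of that data structure concrete, here is a toy (non-thread-safe) version of the same idea, written from scratch for illustration; it is not the real DeltaFIFO:

package main

import "fmt"

// delta pairs an event type with the object it concerns.
type delta struct {
    kind string // "Added", "Updated", "Deleted", "Sync"
    obj  interface{}
}

// toyFIFO mirrors DeltaFIFO's layout: a map of per-key delta lists plus a
// slice that remembers the order in which keys became dirty.
type toyFIFO struct {
    items map[string][]delta
    queue []string
}

func newToyFIFO() *toyFIFO {
    return &toyFIFO{items: map[string][]delta{}}
}

func (f *toyFIFO) add(key, kind string, obj interface{}) {
    if _, queued := f.items[key]; !queued {
        f.queue = append(f.queue, key) // first delta for this key: remember its position
    }
    f.items[key] = append(f.items[key], delta{kind: kind, obj: obj})
}

// pop hands back all accumulated deltas for the oldest key, much like
// Queue.Pop handing a Deltas slice to the Process function.
func (f *toyFIFO) pop() (string, []delta, bool) {
    if len(f.queue) == 0 {
        return "", nil, false
    }
    key := f.queue[0]
    f.queue = f.queue[1:]
    ds := f.items[key]
    delete(f.items, key)
    return key, ds, true
}

func main() {
    f := newToyFIFO()
    f.add("demo/web", "Added", "deployment web")
    f.add("demo/web", "Updated", "deployment web v2") // coalesced under the same key
    f.add("demo/db", "Added", "deployment db")

    for {
        key, ds, ok := f.pop()
        if !ok {
            break
        }
        fmt.Println(key, ds)
    }
}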

Back in Run, a Config is then built from the List and Watch functions created from the client, plus the queue; the controller is initialized from this config. This controller is client-go's cache controller, responsible for maintaining the local cache of objects fetched from the API server and keeping it up to date.

The call to focus on next is:

wg.StartWithChannel(processorStopCh, s.processor.run)

This is where the real listener processing kicks in.

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    func() {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
        p.listenersStarted = true
    }()
    <-stopCh
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    for _, listener := range p.listeners {
        close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    }
    p.wg.Wait() // Wait for all .pop() and .run() to stop
}

Focus on the run method. Remember how we registered custom handler functions for Add, Update, and Delete earlier? This is where those handlers get triggered:

func (p *processorListener) run() {
    // this call blocks until the channel is closed.  When a panic happens during the notification
    // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
    // the next notification will be attempted.  This is usually better than the alternative of never
    // delivering again.
    stopCh := make(chan struct{})
    wait.Until(func() {
        // this gives us a few quick retries before a long pause and then a few more quick retries
        err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
            for next := range p.nextCh {
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                    utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
                }
            }
            // the only way to get here is if the p.nextCh is empty and closed
            return true, nil
        })

        // the only way to get here is if the p.nextCh is empty and closed
        if err == nil {
            close(stopCh)
        }
    }, 1*time.Minute, stopCh)
}

You can see that whenever an object arrives on the p.nextCh channel, the listener dispatches to the corresponding user-registered function based on the notification type. So who feeds this channel?

func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop

    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification:
            // Notification dispatched
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
            }
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil { // No notification to pop (and pendingNotifications is empty)
                // Optimize the case - skip adding to pendingNotifications
                notification = notificationToAdd
                nextCh = p.nextCh
            } else { // There is already a notification waiting to be dispatched
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}

The answer is this pop function: it reads newly added notifications from p.addCh and forwards them to p.nextCh, parking them in pendingNotifications when the consumer is busy, and it guarantees each notification is delivered exactly once.
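The pattern here, a goroutine sitting between an input channel and an output channel with an unbounded buffer in the middle so that producers never block on a slow consumer, can be shown in isolation. A simplified, self-contained sketch of the same idea (a plain slice stands in for the ring buffer client-go uses; like the real pop, anything still buffered is dropped when the input closes):

package main

import (
    "fmt"
    "time"
)

// pump forwards values from in to out, buffering internally so that sends on
// in do not have to wait for the consumer of out.
func pump(in <-chan string, out chan<- string) {
    defer close(out)

    var pending []string    // stand-in for buffer.RingGrowing
    var next string         // value currently offered to the consumer
    var outCh chan<- string // nil disables the send case in the select

    for {
        select {
        case outCh <- next:
            if len(pending) > 0 {
                next, pending = pending[0], pending[1:]
            } else {
                outCh = nil // nothing left to offer
            }
        case v, ok := <-in:
            if !ok {
                return // input closed: stop, which also closes out
            }
            if outCh == nil { // nothing in flight: offer it directly
                next, outCh = v, out
            } else { // something already in flight: park it
                pending = append(pending, v)
            }
        }
    }
}

func main() {
    in := make(chan string)
    out := make(chan string)
    go pump(in, out)

    // Fast producer: its sends complete without waiting for the consumer.
    go func() {
        for i := 0; i < 5; i++ {
            in <- fmt.Sprintf("notification-%d", i)
        }
    }()

    // Slow consumer: drain exactly the five notifications, then shut down.
    for i := 0; i < 5; i++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println("handled", <-out)
    }
    close(in) // tells pump to stop, which in turn closes out
}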

Finally we arrive at the controller's Run function. Let's see what it really does:

// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    r.ShouldResync = c.config.ShouldResync
    r.clock = c.clock

    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()

    var wg wait.Group
    defer wg.Wait()

    wg.StartWithChannel(stopCh, r.Run)

    wait.Until(c.processLoop, time.Second, stopCh)
}

The key call here is wg.StartWithChannel(stopCh, r.Run):

// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
    klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
    wait.Until(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            utilruntime.HandleError(err)
        }
    }, r.period, stopCh)
}

This in turn calls the r.ListAndWatch method, which is fairly involved, so let's take it slowly. Its core event-handling piece is watchHandler:

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    start := r.clock.Now()
    eventCount := 0

    // Stopping the watcher should be idempotent and if we return from this function there's no way
    // we're coming back in with the same watch interface.
    defer w.Stop()
    // update metrics
    defer func() {
        r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
        r.metrics.watchDuration.Observe(time.Since(start).Seconds())
    }()

loop:
    for {
        select {
        case <-stopCh:
            return errorStopRequested
        case err := <-errc:
            return err
        case event, ok := <-w.ResultChan():
            if !ok {
                break loop
            }
            if event.Type == watch.Error {
                return apierrs.FromObject(event.Object)
            }
            if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
                utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                continue
            }
            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                continue
            }
            newResourceVersion := meta.GetResourceVersion()
            switch event.Type {
            case watch.Added:
                err := r.store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Modified:
                err := r.store.Update(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Deleted:
                // TODO: Will any consumers need access to the "last known
                // state", which is passed in event.Object? If so, may need
                // to change this.
                err := r.store.Delete(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                }
            default:
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            eventCount++
        }
    }

    watchDuration := r.clock.Now().Sub(start)
    if watchDuration < 1*time.Second && eventCount == 0 {
        r.metrics.numberOfShortWatches.Inc()
        return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
    }
    klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
    return nil
}

This is where the watch results are actually consumed: each watch event that comes back is pushed into the store, which for a shared informer is the DeltaFIFO created earlier.
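Underneath, this is nothing more than the clientset's List and Watch calls wired together. A hedged sketch of doing the same thing by hand, written against the older, context-free client-go API that this article's excerpts come from (newer releases add a context.Context parameter), and assuming a reachable cluster via $HOME/.kube/config:

package main

import (
    "fmt"
    "os"
    "path/filepath"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Build a client from the local kubeconfig; this assumes a reachable cluster.
    cfg, err := clientcmd.BuildConfigFromFlags("", filepath.Join(os.Getenv("HOME"), ".kube", "config"))
    if err != nil {
        panic(err)
    }
    client, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    // 1. List: grab the current state plus a resourceVersion to watch from.
    list, err := client.AppsV1().Deployments("default").List(metav1.ListOptions{})
    if err != nil {
        panic(err)
    }
    fmt.Println("initial deployments:", len(list.Items))

    // 2. Watch: stream subsequent changes starting at that resourceVersion.
    w, err := client.AppsV1().Deployments("default").Watch(metav1.ListOptions{
        ResourceVersion: list.ResourceVersion,
    })
    if err != nil {
        panic(err)
    }
    defer w.Stop()

    // This is essentially what watchHandler's loop does, minus the store updates.
    for event := range w.ResultChan() {
        fmt.Println("event:", event.Type)
    }
}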

Finally, the queue's Pop method is driven by the controller's processLoop:

// processLoop drains the work queue.
// TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times
// concurrently.
//
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == FIFOClosedError {
                return
            }
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}

The earlier part put watched objects into the queue; this goroutine is the consumer. The actual consumption logic is the Process function configured earlier, namely s.HandleDeltas:

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    // from oldest to newest
    for _, d := range obj.(Deltas) {
        switch d.Type {
        case Sync, Added, Updated:
            isSync := d.Type == Sync
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(addNotification{newObj: d.Object}, isSync)
            }
        case Deleted:
            if err := s.indexer.Delete(d.Object); err != nil {
                return err
            }
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}

For each delta in the incoming obj, this function first looks the object up in its own cache (the indexer). If it already exists, this counts as an update: the indexer is refreshed and the user-registered Update handler is notified; if it doesn't exist, the object is added to the indexer and the user's Add handler is called. Deletes remove the object from the indexer and trigger the Delete handler.
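In practice, the handlers that HandleDeltas ends up invoking usually do nothing more than compute a cache key and push it onto a rate-limited workqueue, which is exactly what sample-controller does. A hedged sketch of that convention (the queue name and enqueue helper are illustrative, not client-go internals):

package main

import (
    "fmt"

    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

func main() {
    queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Demo")

    // enqueue converts an object delivered by the informer into a
    // namespace/name key and hands it to the workqueue; the heavy lifting
    // happens later in a worker that pops the key and re-reads the object
    // from the lister.
    enqueue := func(obj interface{}) {
        // DeletionHandlingMetaNamespaceKeyFunc also copes with the tombstone
        // objects that deletes can arrive wrapped in.
        key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
        if err != nil {
            return
        }
        queue.Add(key)
    }

    handler := cache.ResourceEventHandlerFuncs{
        AddFunc:    enqueue,
        UpdateFunc: func(old, new interface{}) { enqueue(new) },
        DeleteFunc: enqueue,
    }
    _ = handler // pass this to informer.AddEventHandler(handler)

    // A worker would loop on queue.Get(), process the key, then call queue.Done(key).
    fmt.Println("workqueue depth:", queue.Len())
}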

That concludes this source-level walkthrough of the client-go informer flow.

