client-go: Informer机制之reflector源码分析

目的

为了能充分了解Inform机制的原理,我们需要了解Inform机制的起点——reflector,那么,reflector是如何将数据从api-server拉下来?又是如何将数据存入本地的呢?解决这两个疑问就是本篇文章的重点。希望大家也能在此过程中能顺便了解k8s中list-watch机制缓存对象数据的原理。

源码分析的大致流程

首先,需要了解reflector结构体中的各个属性,然后是reflector是如何初始化,最后针对reflector中几个核心方法进行源码解析。

reflector

reflector直接英译结果是反光板,我觉得开发人员想表达的意思是一种映射/对称的含义,就是将api-server中对象数据获取到并实时的更新进本地,使得本地数据和etcd数据的完全一样,我觉得本地对象数据可以称为k8s资源数据的一份快照。reflector实际的作用是监控指定资源的Kubernetes资源,当监控的资源发生变化时触发相应的变更事件,例如Added(资源添加)事件、Updated(资源更新)事件、Deleted(资源删除)事件,并将其资源对象存放到本地缓存DeltaFIFO中。

reflector属性

看下reflector结构体的属性以及里面的注释

type Reflector struct {name string // 名称expectedTypeName string // 期待的事件类型名称,用于判断和监控到的事件是否一致expectedType reflect.Type // 期待事件类型,用于判断和监控到的事件是否一致expectedGVK *schema.GroupVersionKind // 期待的GVK,用于判断和监控到的对象的GVK是否一致store Store // deltalFIFO队列还是IndexerlisterWatcher ListerWatcher // 封装list 和 watch接口的实例backoffManager wait.BackoffManager // 退避管理器resyncPeriod time.Duration // 重新同步的间隔ShouldResync func() bool // 标记是否开启重新同步clock clock.Clock // 时钟paginatedResult bool // list的时候是否需要分页lastSyncResourceVersion string // 上一次同步的资源版本isLastSyncResourceVersionUnavailable bool // 上一次同步资源不可用状态lastSyncResourceVersionMutex sync.RWMutex // 上一次同步资源版本的读写锁WatchListPageSize int64 //页大小watchErrorHandler WatchErrorHandler // watch接口的错误处理
}

reflector的初始化

这边可以看到有三种方式:NewReflector,NewNamedReflector和NewNamespaceKeyedIndexerAndReflector

// 传入listwatcher对象,期待类型,deltafifo,重新同步周期
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {// 调用的下面的新建方法return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}// 与上一个初始化的区别在于可以摄入Name
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {realClock := &clock.RealClock{}r := &Reflector{name:          name, // 设置名字listerWatcher: lw, // listWatcherstore:         store, // 本地存储backoffManager:    wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock), // 退避管理器resyncPeriod:      resyncPeriod, // 重新同步周期clock:             realClock, // 时钟watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), // 错误处理器}r.setExpectedType(expectedType) // 设置期待类型return r
}// 新建Indexer和reflector。
func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {// index指定KeyFuncindexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})reflector = NewReflector(lw, expectedType, indexer, resyncPeriod) // 调用第一个函数return indexer, reflector
}

reflector中的核心方法

1、Run方法:

func (r *Reflector) Run(stopCh <-chan struct{}) {klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)wait.BackoffUntil(func() {// 将根据backoffManager周期性运行这个函数if err := r.ListAndWatch(stopCh); err != nil { // 调用listAndWatch方法r.watchErrorHandler(r, err)}}, r.backoffManager, true, stopCh)klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

2、ListAndWatch方法:

这个方法有点长,主要分为list、定时同步和watch三个部分;

  • List部分逻辑:设置分页参数;执行list方法;将list结果同步进DeltaFIFO队列中;

  • 定时同步:定时同步以协程的方式运行,使用定时器实现定期同步;

  • Watch部分逻辑:在for循环里;执行watch函数获取resultchan;监听resultchan中数据并处理;

以下是ListAndWatch方法的逻辑图:

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)var resourceVersion stringoptions := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}// list接口的参数,设置lastSyncResourceVersion上一同步版本if err := func() error {// 匿名函数initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})defer initTrace.LogIfLong(10 * time.Second)var list runtime.Objectvar paginatedResult boolvar err errorlistCh := make(chan struct{}, 1) // list通到panicCh := make(chan interface{}, 1) // panic错误通道go func() { // 以协程方式运行defer func() {if r := recover(); r != nil {panicCh <- r}}()// 新建pager,放入list方法作为处理函数pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {return r.listerWatcher.List(opts) // 该方法返回list结果}))switch {case r.WatchListPageSize != 0:// 设置分页大小pager.PageSize = r.WatchListPageSizecase r.paginatedResult:case options.ResourceVersion != "" && options.ResourceVersion != "0": // 资源版本已经有了pager.PageSize = 0}list, paginatedResult, err = pager.List(context.Background(), options) // list的结果放在list变量中if isExpiredError(err) || isTooLargeResourceVersionError(err) {r.setIsLastSyncResourceVersionUnavailable(true)list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) // 尝试重新获取List结果}close(listCh) // 关闭通道}()// 检查三项类似检查开关的配置,都没问题才继续select {case <-stopCh: // 是否被停止return nilcase r := <-panicCh: // 是否发生无法弥补的错误panic(r)case <-listCh:// 收到list通道的关闭信息,说明list的记过已经有了,就在list变量中}if err != nil {return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)}if options.ResourceVersion == "0" && paginatedResult {r.paginatedResult = true}r.setIsLastSyncResourceVersionUnavailable(false) // list was successfulinitTrace.Step("Objects listed")listMetaInterface, err := meta.ListAccessor(list) // 解析Listif err != nil {return fmt.Errorf("unable to understand list result %#v: %v", list, err)}resourceVersion = listMetaInterface.GetResourceVersion()// 获取资源版本initTrace.Step("Resource version extracted")items, err := meta.ExtractList(list)// 从list对象中获取对象数组if err != nil {return fmt.Errorf("unable to understand list result %#v (%v)", list, err)}initTrace.Step("Objects extracted")// 将数据塞入deltaFIFO中if err := r.syncWith(items, resourceVersion); err != nil { // 这边将list的结果items的数据放入detalFIFO中return fmt.Errorf("unable to sync list result: %v", err)}initTrace.Step("SyncWith done")r.setLastSyncResourceVersion(resourceVersion)initTrace.Step("Resource version updated")return nil}(); err != nil {return err}resyncerrc := make(chan error, 1) // 重新同步错误通道cancelCh := make(chan struct{}) // 取消通道defer close(cancelCh)go func() { // 协程,一直再跑resyncCh, cleanup := r.resyncChan() // 返回重新同步的定时通道,里面有计时器, cleanup是定时器关闭函数defer func() {cleanup() // Call the last one written into cleanup}()for {select {case <-resyncCh: // 定时器 阻塞式, 定时时间到,就跳出阻塞case <-stopCh: // 是否被停止returncase <-cancelCh: // 是否被取消return}// 下面是定时重新同步流程if r.ShouldResync == nil || r.ShouldResync() { // 判断是否应该同步klog.V(4).Infof("%s: forcing resync", r.name)// 开始同步,将indexer的数据和deltafifo进行同步if err := r.store.Resync(); err != nil { // 同步出错resyncerrc <- errreturn // 退出}}cleanup() // 当前定时器停止resyncCh, cleanup = r.resyncChan() // 重新启用定时器定时 触发 设置}}()// 开始watch 循环for {// give the stopCh a chance to stop the loop, even in case of continue statements further down on errorsselect {case <-stopCh:return nildefault:}timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) // 超时时间设定,避免夯住options = metav1.ListOptions{ // watch接口的参数ResourceVersion: resourceVersion,TimeoutSeconds: &timeoutSeconds,AllowWatchBookmarks: true,}start := r.clock.Now()w, err := r.listerWatcher.Watch(options) // 执行watch,返回结果中有resultChan,就是wif err != nil {if utilnet.IsConnectionRefused(err) { // 拒绝连接的话,需要重试time.Sleep(time.Second)continue}return err}// 调用watch长连接,从通道中获取值,要是通道关闭就退出, watch的处理函数if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {if err != errorStopRequested {switch {case isExpiredError(err): // 超时错误klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)default:klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)}}return nil}}
}

3、LastSyncResourceVersion

该函数主要获取上一次同步的资源版本

func (r *Reflector) LastSyncResourceVersion() string {r.lastSyncResourceVersionMutex.RLock()defer r.lastSyncResourceVersionMutex.RUnlock()return r.lastSyncResourceVersion
}

4、resyncChan

返回一个定时通道和清理函数,清理函数就是停止计时器。这边的定时重新同步是使用定时器实现的。

func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {if r.resyncPeriod == 0 { // 未设置重新同步周期return neverExitWatch, func() bool { return false }}t := r.clock.NewTimer(r.resyncPeriod) // 定时器return t.C(), t.Stop
}

5、syncWith

将从apiserver list的资源对象结果同步进DeltaFIFO队列中,调用队列的Replace方法实现。

func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {found := make([]interface{}, 0, len(items))for _, item := range items {found = append(found, item)}return r.store.Replace(found, resourceVersion)
}

6、watchHandler

watch的处理:接收watch的接口作为参数,watch接口对外方法是Stop和Resultchan,前者关闭结果通道,后者获取通道。

func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {eventCount := 0defer w.Stop() // 关闭watch通道loop:for {select {case <-stopCh:return errorStopRequested // 收到停止通道的case err := <-errc: // 错误通道return errcase event, ok := <-w.ResultChan(): // 从resultChan通道中获取事件if !ok { // 通道被关闭break loop // 跳出循环}if event.Type == watch.Error { // 事件类型是ERRORreturn apierrors.FromObject(event.Object)}if r.expectedType != nil { // 查看reflector是设置了期望获取的资源类型// 这是在判断期待的类型和监听到的事件类型是否一致if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))continue}}if r.expectedGVK != nil {// GVK是否一致if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))continue}}meta, err := meta.Accessor(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))continue}newResourceVersion := meta.GetResourceVersion()switch event.Type { // 根据事件类型,对delta队列进行增删改操作case watch.Added: // 创建事件err := r.store.Add(event.Object) // 将该事件放入deltalFIFOif err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))}case watch.Modified:err := r.store.Update(event.Object) // 将该事件放入deltalFIFOif err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))}case watch.Deleted:err := r.store.Delete(event.Object) // 将该事件放入deltalFIFOif err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))}case watch.Bookmark: // 意思是”表示监听已在此处同步,只需更新default:utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))}*resourceVersion = newResourceVersionr.setLastSyncResourceVersion(newResourceVersion)eventCount++}}watchDuration := r.clock.Since(start)if watchDuration < 1*time.Second && eventCount == 0 {return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)}klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)return nil
}

7、relistResourceVersion

返回上一次同步资源版本

func (r *Reflector) relistResourceVersion() string {r.lastSyncResourceVersionMutex.RLock() // 上锁defer r.lastSyncResourceVersionMutex.RUnlock()if r.isLastSyncResourceVersionUnavailable { // 上次失败,返回空return ""}if r.lastSyncResourceVersion == "" { // 上次同步为空,返回0return "0"}return r.lastSyncResourceVersion // 返回该有数值
}

总结

本篇主要讲解了reflector的源码实现,它在informer机制中的主要的功能就是使用ListAndWatch从api server中实时获取资源对象数据,然后将资源对象放入DeltaFIFO队列,相关逻辑在Run方法、ListAndWatch方法和watchHandler方法中。

client-go: Informer机制之reflector源码分析相关推荐

  1. k8s client-go源码分析 informer源码分析(3)-Reflector源码分析

    k8s client-go源码分析 informer源码分析(3)-Reflector源码分析 1.Reflector概述 Reflector从kube-apiserver中list&watc ...

  2. quartz集群调度机制调研及源码分析---转载

    quartz2.2.1集群调度机制调研及源码分析 引言 quartz集群架构 调度器实例化 调度过程 触发器的获取 触发trigger: Job执行过程: 总结: 附: 引言 quratz是目前最为成 ...

  3. android的消息处理机制(图+源码分析)——Looper,Handler,Message

    android源码中包含了大量的设计模式,除此以外,android sdk还精心为我们设计了各种helper类,对于和我一样渴望水平得到进阶的人来说,都太值得一读了.这不,前几天为了了解android ...

  4. 【转】android的消息处理机制(图+源码分析)——Looper,Handler,Message

    原文地址:http://www.cnblogs.com/codingmyworld/archive/2011/09/12/2174255.html#!comments 作为一个大三的预备程序员,我学习 ...

  5. hadoop之MapReduce框架TaskTracker端心跳机制分析(源码分析第六篇)

    1.概述 MapReduce框架中的master/slave心跳机制是整个集群运作的基础,是沟通TaskTracker和JobTracker的桥梁.TaskTracker周期性地调用心跳RPC函数,汇 ...

  6. View事件分发机制(源码分析篇)

    01.Android中事件分发顺序 1.1 事件分发的对象是谁 事件分发的对象是事件.注意,事件分发是向下传递的,也就是父到子的顺序. 当用户触摸屏幕时(View或ViewGroup派生的控件),将产 ...

  7. Handler机制源码分析

    一.Handler使用上需要注意的几点 1.1 handler使用不当造成的内存泄漏 public class MainActivity extends AppCompatActivity {priv ...

  8. MyBatis 源码分析 - 配置文件解析过程

    文章目录 * 本文速览 1.简介 2.配置文件解析过程分析 2.1 配置文件解析入口 2.2 解析 properties 配置 2.3 解析 settings 配置 2.3.1 settings 节点 ...

  9. JUC AQS ReentrantLock源码分析

    Java的内置锁一直都是备受争议的,在JDK 1.6之前,synchronized这个重量级锁其性能一直都是较为低下,虽然在1.6后,进行大量的锁优化策略,但是与Lock相比synchronized还 ...

最新文章

  1. 3项目里面全局用less变量 cli vue_VUE CLI3 less 全局变量引用
  2. C/C++-style输入输出函数
  3. matlab 常用命令
  4. 如何自定义SAP Spartacus店铺的界面颜色风格
  5. 使用Jenkins来实现内部的持续集成流程(下)
  6. 原生js实现tab栏切换效果
  7. 人工智能能够构建一个自主驱动云吗?
  8. 计算机二级只有前十套简单,计算机二级Access上机十套试题详细解析(经典版).
  9. Python实现人工神经网络逼近股票价格
  10. 存储服务器之间的传输速度与服务器内部读写速度_3000MB/s读写带来的PC体验升级,东芝RD500固态硬盘评测...
  11. 导入超大mysql数据库文件工具_用 BigDump 工具导入超大 MySQL 数据库备份文件
  12. linux oracle 强制覆盖_赤兔Oracle数据库恢复软件下载-赤兔Oracle数据库恢复软件v11.6免费版...
  13. 搜索引擎优化、常用SEO优化方法总结
  14. ARVR | AR技术发展简史(下)
  15. Ubuntu18.04系统硬盘分区方法
  16. 【QGIS插件安装】buildseg: QGIS plugin for building extraction
  17. linux常用免杀,【kali linux】详细分析两个免杀远控 了解远控和免杀原理
  18. 把一个合数分成质数的乘积
  19. 装系统中----专业名词小结
  20. 从网上搜集的成都火锅资料

热门文章

  1. 使用CNN实现手机头像解锁
  2. ABAQUS软件实训(三):Mesh模块之圆形平面网格划分技巧
  3. LoadRunner录制图片验证码
  4. 手把手教你搭建Linux时间同步服务器
  5. windows subst命令实现原理模拟3 - subst挂载目录为盘符
  6. 阿里巴巴的相关-----ODPS技术架构、Java Web架构、PAI机器学习平台
  7. 谨以此文祭奠我逝去的文件们~~~
  8. arr访问绝对地址_西门子1200PLC与汇川伺服电机的MODBUS-RTU通讯
  9. [ESXi 6.5] 设置ESXi宿主机开机自动启动虚拟机
  10. 韩国科学家研发透明RRAM存储颗粒