第四课 k8s源码学习和二次开发-DeltaFIFO和Indexer原理学习

tags:

  • k8s
  • 源码学习

categories:

  • 源码学习
  • 二次开发

文章目录

  • 第四课 k8s源码学习和二次开发-DeltaFIFO和Indexer原理学习
    • 第一节 DeltaFIFO学习
      • 1.1 Delta介绍
      • 1.2 FIFO介绍
      • 1.3 FIFO简单方法实现
      • 1.4 DeltaFIFO的实现
      • 1.5 DeltaFIFO和Reflector过程
    • 第二节 Indexer学习
      • 2.1 Indexer概念说明
      • 2.2 Indexer示例解释概念
      • 2.3 Indexer的原理-ThreadSafeMap
      • 2.4 回看cache的实现原理
      • 2.5 Index的原理总结

第一节 DeltaFIFO学习

1.1 Delta介绍

  1. Reflector 中通过 ListAndWatch 获取到数据后传入到了本地的存储中,也就是 DeltaFIFO 中
  2. 从 DeltaFIFO 的名字可以看出它是一个 FIFO,也就是一个先进先出的队列,而Delta 表示的是变化的资源对象存储,包含操作资源对象的类型和数据,Reflector 就是这个队列的生产者
  3. 了解Delta在client-go 中是如何定义的,Delta 的数据结构定义位于staging/src/k8s.io/client-go/tools/cache/delta_fifo.go文件中。
// k8s.io/client-go/tools/cache/delta_fifo.go// DeltaType 是变化的类型(添加、删除等)
type DeltaType string// 变化的类型定义
const (Added   DeltaType = "Added"     // 增加Updated DeltaType = "Updated"   // 更新Deleted DeltaType = "Deleted"   // 删除// 当遇到 watch 错误,不得不进行重新list时,就会触发 Replaced。// 我们不知道被替换的对象是否发生了变化。//// 注意:以前版本的 DeltaFIFO 也会对 Replace 事件使用 Sync。// 所以只有当选项 EmitDeltaTypeReplaced 为真时才会触发 Replaced。Replaced DeltaType = "Replaced"// Sync 是针对周期性重新同步期间的合成事件Sync DeltaType = "Sync"          // 同步
)// Delta 是 DeltaFIFO 存储的类型。
// 它告诉你发生了什么变化,以及变化后对象的状态。
// [*] 除非变化是删除操作,否则你将得到对象被删除前的最终状态。
type Delta struct {Type   DeltaTypeObject interface{}
}
  1. Delta 其实就是 Kubernetes 系统中带有变化类型的资源对象,如下图所示:
  2. 比如我们现在添加了一个 Pod,那么这个 Delta 就是带有 Added 这个类型的 Pod,如果是删除了一个 Deployment,那么这个 Delta 就是带有 Deleted 类型的 Deployment,为什么要带上类型object, 因为我们需要根据不同的类型去执行不同的操作,增加、更新、删除的动作显然是不一样的。

1.2 FIFO介绍

  1. FIFO 很好理解,就是一个先进先出的队列,Reflector 是其生产者,其数据结构定义位于 staging/src/k8s.io/client-go/tools/cache/fifo.go 文件中。
  2. FIFO 数据结构中定义了** items 和 queue 两个属性**来保存队列中的数据。
    • 其中 queue 中存的是资源对象的 key 列表,
    • 而 items 是一个 map 类型,其 key 就是 queue 中保存的 key,value 值是真正的资源对象数据。
// k8s.io/client-go/tools/cache/fifo.go
type FIFO struct {lock sync.RWMutexcond sync.Cond// items 中的每一个 key 也在 queue 中items map[string]interface{}queue []string// 如果第一批 items 被 Replace() 插入或者先调用了 Deleta/Add/Update// 则 populated 为 true。populated bool// 第一次调用 Replace() 时插入的 items 数initialPopulationCount int// keyFunc 用于生成排队的 item 插入和检索的 key。keyFunc KeyFunc// 标识队列已关闭,以便在队列清空时控制循环可以退出。closed     boolclosedLock sync.Mutex
}var (_ = Queue(&FIFO{}) // FIFO 是一个 Queue
)
  1. 既然是先进先出的队列,那么就要具有队列的基本功能,结构体下面其实就有一个类型断言,表示当前的 FIFO 实现了 Queue 这个接口,所以 FIFO 要实现的功能都是在 Queue 中定义的,Queue 接口和 FIFO 位于同一文件中:
// k8s.io/client-go/tools/cache/fifo.go// Queue 扩展了 Store  // with a collection of Store keys to "process".
// 每一次添加、更新或删除都可以将对象的key放入到该集合中。
// Queue 具有使用给定的 accumulator 来推导出相应的 key 的方法
// Queue 可以从多个 goroutine 中并发访问
// Queue 可以被关闭,之后 Pop 操作会返回一个错误
type Queue interface {Store// Pop 一直阻塞,直到至少有一个key要处理或队列被关闭,队列被关闭会返回一个错误。// 在前面的情况下 Pop 原子性地选择一个 key 进行处理,从 Store 中删除关联(key、accumulator)的数据,// 并处理 accumulator。Pop 会返回被处理的 accumulator 和处理的结果。// PopProcessFunc 函数可以返回一个 ErrRequeue{inner},在这种情况下,Pop 将//(a)把那个(key,accumulator)关联作为原子处理的一部分返回到 Queue 中// (b) 从 Pop 返回内部错误。Pop(PopProcessFunc) (interface{}, error)// 仅当该 key 尚未与一个非空的 accumulator 相关联的时候,AddIfNotPresent 将给定的 accumulator 放入 Queue(与 accumulator 的 key 相关联的)AddIfNotPresent(interface{}) error// 如果第一批 keys 都已经 Popped,则 HasSynced 返回 true。// 如果在添加、更新、删除之前发生了第一次 Replace 操作,则第一批 keys 为 true// 否则为空。HasSynced() bool// 关闭该队列Close()
}
  1. 从上面的定义中可以看出 Queue 这个接口扩展了 Store 这个接口,这个就是前面我们说的本地存储,队列实际上也是一种存储,然后在 Store 的基础上增加 Pop、AddIfNotPresent、HasSynced、Close 4个函数就变成了 Queue 队列了,所以我们优先来看下 Store 这个接口的定义,该数据结构定义位于文件 k8s.io/client-go/tools/cache/store.go 中:
// k8s.io/client-go/tools/cache/store.go// Store 是一个通用的对象存储和处理的接口。
// Store 包含一个从字符串 keys 到 accumulators 的映射,并具有 to/from 当前
// 给定 key 关联的 accumulators 添加、更新和删除给定对象的操作。
// 一个 Store 还知道如何从给定的对象中获取 key,所以很多操作只提供对象。
//
// 在最简单的 Store 实现中,每个 accumulator 只是最后指定的对象,或者删除后为空,
// 所以 Store 只是简单的存储。
//
// Reflector 反射器知道如何 watch 一个服务并更新一个 Store 存储,这个包提供了 Store 的各种实现。
type Store interface {// Add 将指定对象添加到与指定对象的 key 相关的 accumulator(累加器)中。Add(obj interface{}) error// Update 与指定对象的 key 相关的 accumulator 中更新指定的对象Update(obj interface{}) error// Delete 根据指定的对象 key 删除指定的对象Delete(obj interface{}) error// List 返回当前所有非空的 accumulators 的列表List() []interface{}// ListKeys 返回当前与非空 accumulators 关联的所有 key 的列表ListKeys() []string// Get 根据指定的对象获取关联的 accumulatorGet(obj interface{}) (item interface{}, exists bool, err error)// GetByKey 根据指定的对象 key 获取关联的 accumulatorGetByKey(key string) (item interface{}, exists bool, err error)// Replace 会删除原来Store中的内容,并将新增的list的内容存入Store中,即完全替换数据// Store 拥有 list 列表的所有权,在调用此函数后,不应该引用它了。Replace([]interface{}, string) error// Resync 在 Store 中没有意义,但是在 DeltaFIFO 中有意义。Resync() error
}// KeyFunc 就是从一个对象中生成一个唯一的 Key 的函数,上面的 FIFO 中就有用到
type KeyFunc func(obj interface{}) (string, error)// MetaNamespaceKeyFunc 是默认的 KeyFunc,生成的 key 格式为:
// <namespace>/<name>
// 如果是全局的,则namespace为空,那么生成的 key 就是 <name>
// 当然要从 key 拆分出 namespace 和 name 也非常简单
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {if key, ok := obj.(ExplicitKey); ok {return string(key), nil}meta, err := meta.Accessor(obj)if err != nil {return "", fmt.Errorf("object has no meta: %v", err)}if len(meta.GetNamespace()) > 0 {return meta.GetNamespace() + "/" + meta.GetName(), nil}return meta.GetName(), nil
}
  1. **Store 就是一个通用的对象存储和处理的接口,可以用来写入对象和获取对象。**其中 cache 数据结构就实现了上面的 Store 接口,但是这个属于后面的 Indexer 部分的知识点,这里我们就不展开说明了。
  2. 我们说 Queue 扩展了 Store 接口,所以 Queue 本身也是一个存储,只是在存储的基础上增加了 Pop 这样的函数来实现弹出对象,是不是就变成了一个队列了。
  3. FIFO 就是一个具体的 Queue 实现,按照顺序弹出对象是不是就是一个先进先出的队列了?如下图所示:

1.3 FIFO简单方法实现

  1. FIFO 是如何实现存储和 Pop 的功能的。首先是实现 Store 存储中最基本的方法,第一个就是添加对象:
// k8s.io/client-go/tools/cache/fifo.go
// Add 插入一个对象,将其放入队列中,只有当元素不在集合中时才会插入队列。
func (f *FIFO) Add(obj interface{}) error {// 获取对象的 keyid, err := f.keyFunc(obj)if err != nil {return KeyError{obj, err}}f.lock.Lock()defer f.lock.Unlock()f.populated = true// 元素不在队列中的时候才插入队列if _, exists := f.items[id]; !exists {f.queue = append(f.queue, id)}// items 是一个 map,所以直接赋值给这个 key,这样对更新元素也同样适用f.items[id] = objf.cond.Broadcast()return nil
}
  1. 更新对象,实现非常简单,因为上面的 Add 方法就包含了 Update 的实现,因为 items 属性是一个 Map,对象有更新直接将对应 key 的 value 值替换成新的对象即可:
// k8s.io/client-go/tools/cache/fifo.go// Update 和 Add 相同的实现
func (f *FIFO) Update(obj interface{}) error {return f.Add(obj)
}
  1. 接着就是删除 Delete 方法的实现,这里可能大家会有一个疑问,下面的删除实现只删除了 items 中的元素,那这样岂不是 queue 和 items 中的 key 会不一致。的确会这样,但是这是一个队列,下面的 Pop() 函数会根据 queue 里面的元素一个一个的弹出 key,没有对象就不处理了,相当于下面的 Pop() 函数中实现了 queue 的 key 的删除 :
// k8s.io/client-go/tools/cache/fifo.go
// Delete 从队列中移除一个对象。
// 不会添加到 queue 中去,这个实现是假设消费者只关心对象
// 不关心它们被创建或添加的顺序。
func (f *FIFO) Delete(obj interface{}) error {// 获取对象的 keyid, err := f.keyFunc(obj)if err != nil {return KeyError{obj, err}}f.lock.Lock()defer f.lock.Unlock()f.populated = true// 删除 items 中 key 为 id 的元素,就是删除队列中的对象delete(f.items, id)//?为什么不直接处理 queue 这个 slice 呢?return err
}
  1. 然后是获取队列中所有对象的 List 方法的实现:
// k8s.io/client-go/tools/cache/fifo.go// List 获取队列中的所有对象
func (f *FIFO) List() []interface{} {f.lock.RLock()defer f.lock.RUnlock()list := make([]interface{}, 0, len(f.items))// 获取所有的items的values值(items是一个Map)for _, item := range f.items {list = append(list, item)}return list
}
  1. 然后是一个 Replace 替换函数的实现:
// k8s.io/client-go/tools/cache/fifo.go
// Replace 将删除队列中的内容,'f' 拥有 map 的所有权,调用该函数过后,不应该再引用 map。
// 'f' 的队列也会被重置,返回时,队列将包含 map 中的元素,没有特定的顺序。
func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {// 从 list 中提取出 key 然后和里面的元素重新进行映射items := make(map[string]interface{}, len(list))for _, item := range list {key, err := f.keyFunc(item)if err != nil {return KeyError{item, err}}items[key] = item}f.lock.Lock()defer f.lock.Unlock()if !f.populated {f.populated = truef.initialPopulationCount = len(items)}// 重新设置 items 和 queue 的值f.items = itemsf.queue = f.queue[:0]for id := range items {f.queue = append(f.queue, id)}if len(f.queue) > 0 {f.cond.Broadcast()}return nil
}

1.4 DeltaFIFO的实现

  1. 上面了解 了FIFO,下面看下 DeltaFIFO 是如何实现的,DeltaFIFO 和 FIFO 一样也是一个队列,但是也有不同的地方,里面的元素是一个 Delta的数组,Delta 上面我们已经提到表示的是带有变化类型的资源对象。
  2. DeltaFIFO 的数据结构定义位于 staging/src/k8s.io/client-go/tools/cache/delta_fifo.go 文件中:
// k8s.io/client-go/tools/cache/delta_fifo.gotype DeltaFIFO struct {// lock/cond 保护访问的 items 和 queuelock sync.RWMutexcond sync.Cond// 用来存储 Delta 数据 -> 对象key: Delta数组items map[string]Deltas// 用来存储资源对象的keyqueue []string// 通过 Replace() 接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为truepopulated bool// 通过 Replace() 接口(全量)将第一批对象放入队列的对象数量initialPopulationCount int// 对象键的计算函数keyFunc KeyFunc// knownObjects 列出 "known" 的键 -- 影响到 Delete(),Replace() 和 Resync()// knownObjects 其实就是 Indexer,里面存有已知全部的对象knownObjects KeyListerGetter// 标记 queue 被关闭了closed     boolclosedLock sync.Mutex// emitDeltaTypeReplaced 当 Replace() 被调用的时候,是否要 emit Replaced 或者 Sync// DeltaType(保留向后兼容)。emitDeltaTypeReplaced bool
}// KeyListerGetter 任何知道如何列出键和按键获取对象的东西
type KeyListerGetter interface {KeyListerKeyGetter
}// 获取所有的键
type KeyLister interface {ListKeys() []string
}// 根据键获取对象
type KeyGetter interface {GetByKey(key string) (interface{}, bool, error)
}
  1. DeltaFIFO 与 FIFO 一样都是一个 Queue,所以他们都实现了 Queue,所以我们这里来看下 DeltaFIFO 是如何实现 Queue 功能的,当然和 FIFO 一样都是实现 Queue 接口里面的所有方法。
  2. 虽然实现流程和 FIFO 是一样的,但是具体的实现是不一样的,比如 DeltaFIFO 的对象键计算函数就不同:
// k8s.io/client-go/tools/cache/delta_fifo.go// DeltaFIFO 的对象键计算函数
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {// 用 Deltas 做一次转换,判断是否是 Delta 切片if d, ok := obj.(Deltas); ok {if len(d) == 0 {return "", KeyError{obj, ErrZeroLengthDeltasObject}}// 使用最新版本的对象进行计算obj = d.Newest().Object}if d, ok := obj.(DeletedFinalStateUnknown); ok {return d.Key, nil}// 具体计算还是要看初始化 DeltaFIFO 传入的 KeyFunc 函数return f.keyFunc(obj)
}// Newest 返回最新的 Delta,如果没有则返回 nil。
func (d Deltas) Newest() *Delta {if n := len(d); n > 0 {return &d[n-1]}return nil
}
  1. DeltaFIFO 的计算对象键的函数为什么要先做一次 Deltas 的类型转换呢?那是因为 Pop() 出去的对象很可能还要再添加进来(比如处理失败需要再放进来),此时添加的对象就是已经封装好的 Deltas 对象了。
  2. 然后同样按照上面的方式来分析 DeltaFIFO 的实现,首先查看 Store 存储部分的实现,也就是增、删、改、查功能。
  3. 同样的 Add、Update 和 Delete 的实现方法基本上是一致的:
// k8s.io/client-go/tools/cache/delta_fifo.go// Add 插入一个元素放入到队列中
func (f *DeltaFIFO) Add(obj interface{}) error {f.lock.Lock()defer f.lock.Unlock()f.populated = true  // 队列第一次写入操作都要设置标记return f.queueActionLocked(Added, obj)
}// Update 和 Add 一样,只是是 Updated 一个 Delta
func (f *DeltaFIFO) Update(obj interface{}) error {f.lock.Lock()defer f.lock.Unlock()f.populated = true  // 队列第一次写入操作都要设置标记return f.queueActionLocked(Updated, obj)
}// 删除和添加一样,但会产生一个删除的 Delta。如果给定的对象还不存在,它将被忽略。
// 例如,它可能已经被替换(重新list)删除了。
// 在这个方法中,`f.knownObjects` 如果不为nil,则提供(通过GetByKey)被认为已经存在的 _additional_ 对象。
func (f *DeltaFIFO) Delete(obj interface{}) error {id, err := f.KeyOf(obj)if err != nil {return KeyError{obj, err}}f.lock.Lock()defer f.lock.Unlock()// 队列第一次写入操作都要设置这个标记f.populated = true// 相当于没有 Indexer 的时候,就通过自己的存储对象检查下if f.knownObjects == nil {if _, exists := f.items[id]; !exists {// 自己的存储里面都没有,那也就不用处理了return nil}} else // 相当于 Indexer 里面和自己的存储里面都没有这个对象,那么也就相当于不存在了,就不处理了。_, exists, err := f.knownObjects.GetByKey(id)_, itemsExist := f.items[id]if err == nil && !exists && !itemsExist {return nil}}// 同样调用 queueActionLocked 将数据放入队列return f.queueActionLocked(Deleted, obj)
}
  1. 可以看出 Add 、Update、Delete 方法最终都是调用的 queueActionLocked 函数来实现:
// k8s.io/client-go/tools/cache/delta_fifo.go
// queueActionLocked 追加到对象的 delta 列表中。
// 调用者必须先 lock。
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {id, err := f.KeyOf(obj)  // 获取对象键if err != nil {return KeyError{obj, err}}// 将 actionType 和资源对象 obj 构造成 Delta,添加到 items 中newDeltas := append(f.items[id], Delta{actionType, obj})// 去重newDeltas = dedupDeltas(newDeltas)if len(newDeltas) > 0 {// 新对象的 key 不在队列中则插入 queue 队列if _, exists := f.items[id]; !exists {f.queue = append(f.queue, id)}// 重新更新 itemsf.items[id] = newDeltas// 通知所有的消费者解除阻塞f.cond.Broadcast()} else {// 这种情况不会发生,因为给定一个非空列表时,dedupDeltas 永远不会返回一个空列表。// 但如果真的返回了一个空列表,那么我们就需要从 map 中删除这个元素。delete(f.items, id)}return nil
}// ==============排重==============
// 重新list和watch可以以任何顺序多次提供相同的更新。
// 如果最近的两个 Delta 相同,则将它们合并。
func dedupDeltas(deltas Deltas) Deltas {n := len(deltas)  if n < 2 {  // 小于两个 delta 没必要合并了return deltas}// Deltas是[]Delta,新的对象是追加到Slice后面// 所以取最后两个元素来判断是否相同a := &deltas[n-1]b := &deltas[n-2]// 执行去重操作if out := isDup(a, b); out != nil {// 将去重保留下来的delta追加到前面n-2个delta中去d := append(Deltas{}, deltas[:n-2]...)return append(d, *out)}return deltas
}// 判断两个 Delta 是否是重复的
func isDup(a, b *Delta) *Delta {// 这个函数应该应该可以判断多种类型的重复,目前只有删除这一种能够合并if out := isDeletionDup(a, b); out != nil {return out}return nil
}// 判断是否为删除类型的重复
func isDeletionDup(a, b *Delta) *Delta {// 二者类型都是删除那肯定有一个是重复的,则返回一个即可if b.Type != Deleted || a.Type != Deleted {return nil}// 更复杂的检查还是这样就够了?if _, ok := b.Object.(DeletedFinalStateUnknown); ok {return a}return b
}
  1. 因为系统对于删除的对象有 DeletedFinalStateUnknown 这个状态,所以会存在两次删除的情况,但是两次添加同一个对象由于 APIServer 可以保证对象的唯一性,所以这里没有考虑合并两次添加操作的情况。然后看看其他几个主要方法的实现:
// k8s.io/client-go/tools/cache/delta_fifo.go// 列举接口实现
func (f *DeltaFIFO) List() []interface{} {f.lock.RLock()defer f.lock.RUnlock()return f.listLocked()
}
// 真正的列举实现
func (f *DeltaFIFO) listLocked() []interface{} {list := make([]interface{}, 0, len(f.items))for _, item := range f.items {list = append(list, item.Newest().Object)}return list
}// 返回现在 FIFO 中所有的对象键。
func (f *DeltaFIFO) ListKeys() []string {f.lock.RLock()defer f.lock.RUnlock()list := make([]string, 0, len(f.items))for key := range f.items {list = append(list, key)}return list
}// 根据对象获取FIFO中对应的元素
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {key, err := f.KeyOf(obj)if err != nil {return nil, false, KeyError{obj, err}}return f.GetByKey(key)
}// 通过对象键获取FIFO中的元素(获取到的是 Delta 数组)
func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {f.lock.RLock()defer f.lock.RUnlock()d, exists := f.items[key]if exists {// 复制元素的slice,这样对这个切片的操作就不会影响返回的对象了。d = copyDeltas(d)}return d, exists, nil
}// copyDeltas 返回 d 的浅拷贝,也就是说它拷贝的是切片,而不是切片中的对象。
// Get/List 可以返回一个不会被后续修改影响的对象。
func copyDeltas(d Deltas) Deltas {d2 := make(Deltas, len(d))copy(d2, d)return d2
}// 判断队列是否关闭了
func (f *DeltaFIFO) IsClosed() bool {f.closedLock.Lock()defer f.closedLock.Unlock()return f.closed
}
  1. 接下来我们来看看 Replace 函数的时候,这个也是 Store 里面的定义的接口:
// k8s.io/client-go/tools/cache/delta_fifo.gofunc (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {f.lock.Lock()defer f.lock.Unlock()keys := make(sets.String, len(list))// keep 对老客户端的向后兼容action := Syncif f.emitDeltaTypeReplaced {action = Replaced}// 遍历 listfor _, item := range list {// 计算对象键key, err := f.KeyOf(item)if err != nil {return KeyError{item, err}}// 记录处理过的对象键,使用 set 集合存储keys.Insert(key)// 重新同步一次对象if err := f.queueActionLocked(action, item); err != nil {return fmt.Errorf("couldn't enqueue object: %v", err)}}// 如果没有 Indexer 存储的话,自己存储的就是所有的老对象// 目的要看看那些老对象不在全量集合中,那么就是删除的对象了if f.knownObjects == nil {// 针对自己的列表进行删除检测。queuedDeletions := 0// 遍历所有元素for k, oldItem := range f.items {// 如果元素在输入的对象中存在就忽略了。if keys.Has(k) {continue}// 到这里证明当前的 oldItem 元素不在输入的列表中,证明对象已经被删除了var deletedObj interface{}if n := oldItem.Newest(); n != nil {deletedObj = n.Object}queuedDeletions++// 因为可能队列中已经存在 Deleted 类型的元素了,避免重复,所以采用 DeletedFinalStateUnknown 来包装下对象if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {return err}}// 如果 populated 没有设置,说明是第一次并且还没有任何修改操作执行过if !f.populated {// 这个时候需要标记下f.populated = true// 记录第一次设置的对象数量f.initialPopulationCount = len(list) + queuedDeletions}return nil}// 检测已经删除但是没有在队列中的元素。// 从 Indexer 中获取所有的对象键knownKeys := f.knownObjects.ListKeys()queuedDeletions := 0for _, k := range knownKeys {// 对象存在就忽略if keys.Has(k) {continue}// 到这里同样证明当前的对象键对应的对象被删除了// 获取被删除的对象键对应的对象deletedObj, exists, err := f.knownObjects.GetByKey(k)if err != nil {deletedObj = nilklog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)} else if !exists {deletedObj = nilklog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)}// 累加删除的对象数量queuedDeletions++// 把对象删除的 Delta 放入队列,和上面一样避免重复,使用 DeletedFinalStateUnknown 包装下对象if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {return err}}// 和上面一致if !f.populated {f.populated = truef.initialPopulationCount = len(list) + queuedDeletions}return nil
}
  1. Replace() 主要用于实现对象的全量更新,由于 DeltaFIFO 对外输出的就是所有目标的增量变化,所以每次全量更新都要判断对象是否已经删除,因为在全量更新前可能没有收到目标删除的请求。这一点与 cache 不同,cache 的Replace() 相当于重建,因为 cache 就是对象全量的一种内存映射,所以Replace() 就等于重建。接下来就是实现 DeltaFIFO 特性的 Pop 函数的实现了:
// k8s.io/client-go/tools/cache/delta_fifo.gofunc (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {f.lock.Lock()defer f.lock.Unlock()for {// 队列中是否有数据for len(f.queue) == 0 {// 如果队列关闭了这直接返回错误if f.IsClosed() {return nil, ErrFIFOClosed}// 没有数据就一直等待f.cond.Wait()}// 取出第一个对象键id := f.queue[0]// 更新下queue,相当于把第一个元素弹出去了f.queue = f.queue[1:]// 对象计数减一,当减到0就说明外部已经全部同步完毕了if f.initialPopulationCount > 0 {f.initialPopulationCount--}// 取出真正的对象,queue里面是对象键item, ok := f.items[id]if !ok {// Item 可能后来被删除了。continue}// 删除对象delete(f.items, id)// 调用处理对象的函数err := process(item)// 如果处理出错,那就重新入队列if e, ok := err.(ErrRequeue); ok {f.addIfNotPresent(id, item)err = e.Err}// 这里不需要 copyDeltas,因为我们要把所有权转移给调用者。return item, err}
}
  1. 然后再简单看下其他几个函数的实现:
// k8s.io/client-go/tools/cache/delta_fifo.go// AddIfNotPresent 插入不存在的对象到队列中
func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {// 放入的必须是 Delta 数组,就是通过 Pop 弹出的对象deltas, ok := obj.(Deltas)if !ok {return fmt.Errorf("object must be of type deltas, but got: %#v", obj)}// 多个 Delta 都是同一个对象,所以用最新的来获取对象键即可id, err := f.KeyOf(deltas.Newest().Object)if err != nil {return KeyError{obj, err}}f.lock.Lock()defer f.lock.Unlock()// 调用真正的插入实现f.addIfNotPresent(id, deltas)return nil
}// 插入对象的真正实现
func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {f.populated = true// 如果对象已经存在了,则忽略if _, exists := f.items[id]; exists {return}// 不在队列中,则插入队列f.queue = append(f.queue, id)f.items[id] = deltas// 通知消费者解除阻塞f.cond.Broadcast()
}// Resync 重新同步,带有 Sync 类型的 Delta 对象。
func (f *DeltaFIFO) Resync() error {f.lock.Lock()defer f.lock.Unlock()// Indexer 为空,重新同步无意义if f.knownObjects == nil {return nil}// 获取 Indexer 中所有的对象键keys := f.knownObjects.ListKeys()// 循环对象键,为每个对象产生一个同步的 Deltafor _, k := range keys {if err := f.syncKeyLocked(k); err != nil {return err}}return nil
}
// 对象同步接口的真正实现
func (f *DeltaFIFO) syncKeyLocked(key string) error {// 获取 Indexer 中的对象obj, exists, err := f.knownObjects.GetByKey(key)if err != nil {klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)return nil} else if !exists {klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)return nil}// 计算对象的键值,对象键不是已经传入了么?// 其实传入的是存在 Indexer 里面的对象键,可能与这里的计算方式不同id, err := f.KeyOf(obj)if err != nil {return KeyError{obj, err}}// 对象已经在存在,说明后续会通知对象的新变化,所以再加更新也没意义if len(f.items[id]) > 0 {return nil}// 添加对象同步的这个 Deltaif err := f.queueActionLocked(Sync, obj); err != nil {return fmt.Errorf("couldn't queue object: %v", err)}return nil
}// HasSynced 如果 Add/Update/Delete/AddIfNotPresent 第一次被调用则会返回 true。
// 或者通过 Replace 插入的元素都已经 Pop 完成了,则也会返回 true。
func (f *DeltaFIFO) HasSynced() bool {f.lock.Lock()defer f.lock.Unlock()// 同步就是全量内容已经进入 Indexer,Indexer 已经是系统中对象的全量快照了// 相当于就是全量对象从队列中全部弹出进入 Indexer,证明已经同步完成了return f.populated && f.initialPopulationCount == 0
}// 关闭队列
func (f *DeltaFIFO) Close() {f.closedLock.Lock()defer f.closedLock.Unlock()f.closed = truef.cond.Broadcast()
}
  1. 这里是否已同步是根据 populatedinitialPopulationCount 这两个变量来判断的,是否同步指的是第一次从 APIServer 中获取全量的对象是否全部 Pop 完成,全局同步到了缓存中,也就是 Indexer 中去了,因为 Pop 一次 initialPopulationCount 就会减1,当为0的时候就表示 Pop 完成了。

1.5 DeltaFIFO和Reflector过程

  1. 上面说了DeltaFIFO的实现,然后加上前面的 Reflector 反射器,就可以结合起来了:
  2. Reflector 通过ListAndWatch首先获取全量的资源对象数据,然后调用DeltaFIFO 的 Replace() 方法全量插入队列,然后后续通过 Watch 操作根据资源对象的操作类型调用 DeltaFIFO 的 Add、Update、Delete 方法,将数据更新到队列中。我们可以用下图来总结这两个组件之间的关系:
  3. 至于 Pop 出来的元素如何处理,就要看 Pop 的回调函数 PopProcessFunc 了。我们可以回到最初的 SharedInformer 中,在 sharedIndexInformer 的 Run 函数中就初始化了 DeltaFIFO,也配置了用于 Pop 回调处理的函数:
// k8s.io/client-go/tools/cache/shared_informer.gofunc (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()// 初始化 DeltaFIFO,这里就可以看出来 KnownObjects 就是一个 Indexerfifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KnownObjects:          s.indexer,EmitDeltaTypeReplaced: true,})cfg := &Config{Queue:            fifo,ListerWatcher:    s.listerWatcher,ObjectType:       s.objectType,FullResyncPeriod: s.resyncCheckPeriod,RetryOnError:     false,ShouldResync:     s.processor.shouldResync,Process: s.HandleDeltas,  // 指定 Pop 函数的回调处理函数}......
}// 真正的 Pop 回调处理函数
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {s.blockDeltas.Lock()defer s.blockDeltas.Unlock()// from oldest to newestfor _, d := range obj.(Deltas) {switch d.Type {case Sync, Replaced, Added, Updated:s.cacheMutationDetector.AddObject(d.Object)if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {......} else {// 将对象添加到 Indexer 中if err := s.indexer.Add(d.Object); err != nil {return err}......}case Deleted:// 删除 Indexer 中的对象if err := s.indexer.Delete(d.Object); err != nil {return err}......}}return nil
}
  1. 从上面可以看出DeltaFIFO中的元素被弹出来后被同步到了 Indexer 存储中,而在 DeltaFIFO 中的 KnownObjects 也就是这个指定的 Indexer,所以接下来我们就需要重点分析 Indexer 组件的实现了。

第二节 Indexer学习

2.1 Indexer概念说明

  1. 我们知道 DeltaFIFO 中的元素通过 Pop 函数弹出后,在指定的回调函数中将元素添加到了 Indexer 中。
  2. Indexer 是什么?字面意思是索引器,它就是 Informer 中的 LocalStore 部分,我们可以和数据库进行类比,数据库是建立在存储之上的,索引也是构建在存储之上,只是和数据做了一个映射,使得按照某些条件查询速度会非常快,所以说Indexer 本身也是一个存储,只是它在存储的基础上扩展了索引功能。从 Indexer 接口的定义可以证明这一点:
// k8s.io/client-go/tools/cache/indexer.go
// Indexer 使用多个索引扩展了 Store,并限制了每个累加器只能容纳当前对象
// 这里有3种字符串需要说明:
// 1. 一个存储键,在 Store 接口中定义(其实就是对象键)
// 2. 一个索引的名称(相当于索引分类名称)
// 3. 索引键,由 IndexFunc 生成,可以是一个字段值或从对象中计算出来的任何字符串
type Indexer interface {Store  // 继承了 Store 存储接口,所以说 Indexer 也是存储// indexName 是索引类名称,obj 是对象,计算 obj 在 indexName 索引类中的索引键,然后通过索引键把所有的对象取出来// 获取 obj 对象在索引类中的索引键相匹配的对象Index(indexName string, obj interface{}) ([]interface{}, error)// indexKey 是 indexName 索引分类中的一个索引键// 函数返回 indexKey 指定的所有对象键 IndexKeys returns the storage keys of the stored objects whose// set of indexed values for the named index includes the given// indexed valueIndexKeys(indexName, indexedValue string) ([]string, error)// ListIndexFuncValues returns all the indexed values of the given indexListIndexFuncValues(indexName string) []string// ByIndex returns the stored objects whose set of indexed values// for the named index includes the given indexed valueByIndex(indexName, indexedValue string) ([]interface{}, error)// GetIndexer return the indexersGetIndexers() Indexers// 添加更多的索引在存储中AddIndexers(newIndexers Indexers) error
}
  1. 在去查看 Indexer 的接口具体实现之前,我们需要了解 Indexer 中几个非常重要的概念:Indices、Index、Indexers 及 IndexFunc
// k8s.io/client-go/tools/cache/indexer.go
// 用于计算一个对象的索引键集合
type IndexFunc func(obj interface{}) ([]string, error)// 索引键与对象键集合的映射
type Index map[string]sets.String// 索引器名称与 IndexFunc 的映射,相当于存储索引的各种分类
type Indexers map[string]IndexFunc// 索引器名称与 Index 索引的映射
type Indices map[string]Index


4. 下面示例的索引数据如下所示:

// Indexers 就是包含的所有索引器(分类)以及对应实现
Indexers: {  "namespace": NamespaceIndexFunc,"nodeName": NodeNameIndexFunc,
}
// Indices 就是包含的所有索引分类中所有的索引数据
Indices: {"namespace": {  //namespace 这个索引分类下的所有索引数据"default": ["pod-1", "pod-2"],  // Index 就是一个索引键下所有的对象键列表"kube-system": ["pod-3"]   // Index},"nodeName": {  //nodeName 这个索引分类下的所有索引数据(对象键列表)"node1": ["pod-1"],  // Index"node2": ["pod-2", "pod-3"]  // Index}
}

2.2 Indexer示例解释概念

  1. 这4个数据结构的命名非常容易让大家混淆,直接查看源码也不是那么容易的。这里我们来仔细解释下。首先什么叫索引,索引就是为了快速查找的,比如我们需要查找某个节点上的所有 Pod,那就让 Pod 按照节点名称排序列举出来,对应的就是 Index 这个类型,具体的就是 map[node]sets.pod,但是如何去查找可以有多种方式,就是上面的 Indexers 这个类型的作用。我们可以用一个比较具体的示例来解释他们的关系和含义,如下所示:
package mainimport ("fmt"v1 "k8s.io/api/core/v1""k8s.io/apimachinery/pkg/api/meta"metav1 "k8s.io/apimachinery/pkg/apis/meta/v1""k8s.io/client-go/tools/cache"
)const (NamespaceIndexName = "namespace"NodeNameIndexName  = "nodeName"
)func NamespaceIndexFunc(obj interface{}) ([]string, error) {m, err := meta.Accessor(obj)if err != nil {return []string{""}, fmt.Errorf("object has no meta: %v", err)}return []string{m.GetNamespace()}, nil
}func NodeNameIndexFunc(obj interface{}) ([]string, error) {pod, ok := obj.(*v1.Pod)if !ok {return []string{}, nil}return []string{pod.Spec.NodeName}, nil
}func main() {index := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{NamespaceIndexName: NamespaceIndexFunc,NodeNameIndexName:  NodeNameIndexFunc,})pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name:      "index-pod-1",Namespace: "default",},Spec: v1.PodSpec{NodeName: "node1"},}pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name:      "index-pod-2",Namespace: "default",},Spec: v1.PodSpec{NodeName: "node2"},}pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name:      "index-pod-3",Namespace: "kube-system",},Spec: v1.PodSpec{NodeName: "node2"},}_ = index.Add(pod1)_ = index.Add(pod2)_ = index.Add(pod3)// ByIndex 两个参数:IndexName(索引器名称)和 indexKey(需要检索的key)pods, err := index.ByIndex(NamespaceIndexName, "default")if err != nil {panic(err)}for _, pod := range pods {fmt.Println(pod.(*v1.Pod).Name)}fmt.Println("==========================")pods, err = index.ByIndex(NodeNameIndexName, "node2")if err != nil {panic(err)}for _, pod := range pods {fmt.Println(pod.(*v1.Pod).Name)}}// 输出结果为:
index-pod-1
index-pod-2
==========================
index-pod-2
index-pod-3
  1. 在上面的示例中首先通过 NewIndexer 函数实例化 Indexer 对象,

  2. 第一个参数就是用于计算资源对象键的函数,这里我们使用的是 MetaNamespaceKeyFunc 这个默认的对象键函数;

  3. 第二个参数是 Indexers,也就是存储索引器,上面我们知道 Indexers 的定义为 map[string]IndexFunc,为什么要定义成一个 map 呢?我们可以类比数据库中,我们要查询某项数据,索引的方式是不是多种多样啊?为了扩展,Kubernetes 中就使用一个 map 来存储各种各样的存储索引器,至于存储索引器如何生成,就使用一个 IndexFunc 暴露出去,给使用者自己实现即可

  4. 我们定义的了两个索引键生成函数: NamespaceIndexFuncNodeNameIndexFunc,一个根据资源对象的命名空间来进行索引,一个根据资源对象所在的节点进行索引。然后定义了3个 Pod,前两个在 default 命名空间下面,另外一个在 kube-system 命名空间下面,然后通过 index.Add 函数添加这3个 Pod 资源对象。然后通过 index.ByIndex 函数查询在名为 namespace 的索引器下面匹配索引键为 default 的 Pod 列表。也就是查询 default 这个命名空间下面的所有 Pod,这里就是前两个定义的 Pod。

  5. 对上面的示例如果我们理解了,那么就很容易理解上面定义的4个数据结构了:

    • IndexFunc:索引器函数,用于计算一个资源对象的索引值列表,上面示例是指定命名空间为索引值结果,当然我们也可以根据需求定义其他的,比如根据 Label 标签、Annotation 等属性来生成索引值列表。
    • Index:存储数据,对于上面的示例,我们要查找某个命名空间下面的 Pod,那就要让 Pod 按照其命名空间进行索引,对应的 Index 类型就是 map[namespace]sets.pod
    • Indexers:存储索引器,key 为索引器名称,value 为索引器的实现函数,上面的示例就是 map["namespace"]MetaNamespaceIndexFunc
    • Indices:存储缓存器,key 为索引器名称,value 为缓存的数据,对于上面的示例就是 map["namespace"]map[namespace]sets.pod
  6. 可能最容易混淆的是 Indexers 和 Indices 这两个概念,因为平时很多时候我们没有怎么区分二者的关系,这里我们可以这样理解:dexers 是存储索引的,Indices 里面是存储的真正的数据(对象键,这样可能更好理解。

2.3 Indexer的原理-ThreadSafeMap

  1. 上面我们理解了 Indexer 中的几个重要的数据类型,下面我们来看下 Indexer 接口的具体实现 cache,位于文件 k8s.io/client-go/tools/cache/store.go 中:
// [k8s.io/client-go/tools/cache/store.go](http://k8s.io/client-go/tools/cache/store.go)// cache 用一个 ThreadSafeStore 和一个关联的 KeyFunc 来实现 Indexer
type cache struct {// cacheStorage 是一个线程安全的存储cacheStorage ThreadSafeStore// keyFunc 用于计算对象键keyFunc KeyFunc
}
  1. 我们可以看到这个 cache 包含一个 ThreadSafeStore 的属性,这是一个并发安全的存储,因为是存储,所以自然就有存储相关的增、删、改、查等操作,Indexer 就是在 ThreadSafeMap 基础上进行封装的,实现了索引相关的功能。接下来我们先来看看 ThreadSafeStore 的定义,位于 k8s.io/client-go/tools/cache/thread_safe_store.go 文件中:
type ThreadSafeStore interface {Add(key string, obj interface{})Update(key string, obj interface{})Delete(key string)Get(key string) (item interface{}, exists bool)List() []interface{}ListKeys() []stringReplace(map[string]interface{}, string)Index(indexName string, obj interface{}) ([]interface{}, error)IndexKeys(indexName, indexKey string) ([]string, error)ListIndexFuncValues(name string) []stringByIndex(indexName, indexKey string) ([]interface{}, error)GetIndexers() IndexersAddIndexers(newIndexers Indexers) errorResync() error
}
  1. 从接口的定义可以看出 ThreadSafeStore 和 Index 基本上差不多,但还是有一些区别的,这个接口是需要通过对象键来进行索引的。接下来我们来看看这个接口的具体实现 threadSafeMap 的定义
// k8s.io/client-go/tools/cache/thread_safe_store.go// threadSafeMap 实现了 ThreadSafeStore
type threadSafeMap struct {lock  sync.RWMutex// 存储资源对象数据,key(对象键) 通过 keyFunc 得到// 这就是真正存储的底层数据(对象键 -> 对象)items map[string]interface{}// indexers 索引分类与索引键函数的映射indexers Indexers// indices 通过索引可以快速找到对象键indices Indices
}
  1. 不要把索引键和对象键搞混了,索引键是用于对象快速查找的;对象键是对象在存储中的唯一命名,对象是通过名字+对象的方式存储的。接下来我们来仔细看下接口的具体实现,首先还是比较简单的 Add、Delete、Update 几个函数的实现:
// k8s.io/client-go/tools/cache/thread_safe_store.go// 添加对象
func (c *threadSafeMap) Add(key string, obj interface{}) {c.lock.Lock()defer c.lock.Unlock()// 获取老的对象oldObject := c.items[key]// 写入新的对象,items 中存的是 objKey -> obj 的映射c.items[key] = obj// 添加了新的对象,所以要更新索引c.updateIndices(oldObject, obj, key)
}// 更新对象,可以看到实现和 Add 是一样的
func (c *threadSafeMap) Update(key string, obj interface{}) {c.lock.Lock()defer c.lock.Unlock()oldObject := c.items[key]c.items[key] = objc.updateIndices(oldObject, obj, key)
}// 删除对象
func (c *threadSafeMap) Delete(key string) {c.lock.Lock()defer c.lock.Unlock()// 判断对象是否存在,存在才执行删除操作if obj, exists := c.items[key]; exists {// 删除对象索引c.deleteFromIndices(obj, key)// 删除对象本身delete(c.items, key)}
}
  1. 可以看到基本的实现比较简单,就是添加、更新、删除对象数据后,然后更新或删除对应的索引,所以我们需要查看下更新或删除索引的具体实现:
// k8s.io/client-go/tools/cache/thread_safe_store.go
// updateIndices 更新索引
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {// 如果有旧的对象,需要先从索引中删除这个对象if oldObj != nil {c.deleteFromIndices(oldObj, key)}// 循环所有的索引器for name, indexFunc := range c.indexers {// 获取对象的索引键indexValues, err := indexFunc(newObj)if err != nil {panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))}// 得到当前索引器的索引index := c.indices[name]if index == nil {// 没有对应的索引,则初始化一个索引index = Index{}c.indices[name] = index}// 循环所有的索引键for _, indexValue := range indexValues {// 得到索引键对应的对象键列表set := index[indexValue]if set == nil {// 没有对象键列表则初始化一个空列表set = sets.String{}index[indexValue] = set}// 将对象键插入到集合中,方便索引set.Insert(key)}}
}// deleteFromIndices 删除对象索引
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {// 循环所有的索引器for name, indexFunc := range c.indexers {// 获取删除对象的索引键列表indexValues, err := indexFunc(obj)if err != nil {panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))}// 获取当前索引器的索引index := c.indices[name]if index == nil {continue}// 循环所有索引键for _, indexValue := range indexValues {// 获取索引键对应的对象键列表set := index[indexValue]if set != nil {// 从对象键列表中删除当前要删除的对象键set.Delete(key)// 如果当集合为空的时候不删除set,那么具有高基数的短生命资源的 indices 会导致未使用的空集合随时间增加内存。// `kubernetes/kubernetes/issues/84959`.if len(set) == 0 {delete(index, indexValue)}}}}
}
  1. 添加索引和删除索引的实现都挺简单的,其实主要还是要对 indices、indexs 这些数据结构非常了解,这样就非常容易了,我们可以将 indexFunc 当成当前对象的命名空间来看待,这样对于上面的索引更新和删除的理解就肯定没问题了。然后接下来就是几个查询相关的接口实现:
// k8s.io/client-go/tools/cache/thread_safe_store.go// 获取对象
func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {c.lock.RLock()  // 只需要读锁defer c.lock.RUnlock()// 直接从 map 中读取值item, exists = c.items[key]return item, exists
}// 对象列举
func (c *threadSafeMap) List() []interface{} {c.lock.RLock()defer c.lock.RUnlock()list := make([]interface{}, 0, len(c.items))for _, item := range c.items {list = append(list, item)}return list
}// 返回 threadSafeMap 中所有的对象键列表
func (c *threadSafeMap) ListKeys() []string {c.lock.RLock()defer c.lock.RUnlock()list := make([]string, 0, len(c.items))for key := range c.items {list = append(list, key)}return list
}// 替换所有对象,相当于重新构建索引
func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {c.lock.Lock()defer c.lock.Unlock()// 直接覆盖之前的对象c.items = items// 重新构建索引c.indices = Indices{}for key, item := range c.items {// 更新元素的索引c.updateIndices(nil, item, key)}
}
  1. 然后接下来就是和索引相关的几个接口实现,第一个就是 Index 函数
// k8s.io/client-go/tools/cache/thread_safe_store.go// 通过指定的索引器和对象获取符合这个对象特征的所有对象
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {c.lock.RLock()defer c.lock.RUnlock()// 获得索引器 indexName 的索引键计算函数indexFunc := c.indexers[indexName]if indexFunc == nil {return nil, fmt.Errorf("Index with name %s does not exist", indexName)}// 获取指定 obj 对象的索引键indexedValues, err := indexFunc(obj)if err != nil {return nil, err}// 获得索引器 indexName 的所有索引index := c.indices[indexName]// 用来存储对象键的集合var storeKeySet sets.Stringif len(indexedValues) == 1 {// 大多数情况下只有一个值匹配(默认获取的索引键就是对象的 namespace)// 直接拿到这个索引键的对象键集合storeKeySet = index[indexedValues[0]]} else {// 由于有多个索引键,则可能有重复的对象键出现,索引需要去重storeKeySet = sets.String{}// 循环索引键for _, indexedValue := range indexedValues {// 循环索引键下面的对象键,因为要去重for key := range index[indexedValue] {storeKeySet.Insert(key)}}}// 拿到了所有的对象键集合过后,循环拿到所有的对象集合list := make([]interface{}, 0, storeKeySet.Len())for storeKey := range storeKeySet {list = append(list, c.items[storeKey])}return list, nil
}
  1. 这个 Index 函数就是获取一个指定对象的索引键,然后把这个索引键下面的所有的对象全部获取到,比如我们要获取一个 Pod 所在命名空间下面的所有 Pod,如果更抽象一点,就是符合对象某些特征的所有对象,而这个特征就是我们指定的索引键函数计算出来的。然后接下来就是一个比较重要的 ByIndex 函数的实现:
// k8s.io/client-go/tools/cache/thread_safe_store.go
// 和上面的 Index 函数类似基本一样,只是是直接指定的索引键
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {c.lock.RLock()defer c.lock.RUnlock()// 获得索引器 indexName 的索引键计算函数indexFunc := c.indexers[indexName]if indexFunc == nil {return nil, fmt.Errorf("Index with name %s does not exist", indexName)}// 获得索引器 indexName 的所有索引index := c.indices[indexName]// 获取指定索引键的所有所有对象键set := index[indexedValue]// 然后根据对象键遍历获取对象list := make([]interface{}, 0, set.Len())for key := range set {list = append(list, c.items[key])}return list, nil
}
  1. 可以很清楚地看到 ByIndex 函数和 Index 函数比较类似,但是更简单了,直接获取一个指定的索引键的全部资源对象。然后是其他几个索引相关的函数:
// k8s.io/client-go/tools/cache/thread_safe_store.go
// IndexKeys 和上面的 ByIndex 几乎是一样的,只是这里是直接返回对象键列表
func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, error) {c.lock.RLock()defer c.lock.RUnlock()// 获取索引器 indexName 的索引键计算函数indexFunc := c.indexers[indexName]if indexFunc == nil {return nil, fmt.Errorf("Index with name %s does not exist", indexName)}// 获取索引器 indexName 的所有索引index := c.indices[indexName]// 直接获取指定索引键的对象键集合set := index[indexedValue]return set.List(), nil
}// 获取索引器下面的所有索引键
func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {c.lock.RLock()defer c.lock.RUnlock()// 获取索引器 indexName 的所有索引index := c.indices[indexName]names := make([]string, 0, len(index))// 遍历索引得到索引键for key := range index {names = append(names, key)}return names
}// 直接返回 indexers
func (c *threadSafeMap) GetIndexers() Indexers {return c.indexers
}// 添加一个新的 Indexers
func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {c.lock.Lock()defer c.lock.Unlock()if len(c.items) > 0 {return fmt.Errorf("cannot add indexers to running index")}// 获取旧的索引器和新的索引器keysoldKeys := sets.StringKeySet(c.indexers)newKeys := sets.StringKeySet(newIndexers)// 如果包含新的索引器,则提示冲突if oldKeys.HasAny(newKeys.List()...) {return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))}// 将新的索引器添加到 Indexers 中for k, v := range newIndexers {c.indexers[k] = v}return nil
}// 没有真正实现 Resync 操作
func (c *threadSafeMap) Resync() error {return nil
}
  1. 这里我们就将 ThreadSafeMap 的实现进行了分析说明。整体来说比较方便,一个就是将对象数据存入到一个 map 中,然后就是维护索引,方便根据索引来查找到对应的对象。

2.4 回看cache的实现原理

  1. 接下来再回过头去看cache 的实现就非常简单了,因为 cache 就是对 ThreadSafeStore 的一个再次封装,很多操作都是直接调用的 ThreadSafeStore 的操作实现的,如下所示:
// k8s.io/client-go/tools/cache/store.go
// Add 插入一个元素到 cache 中
func (c *cache) Add(obj interface{}) error {key, err := c.keyFunc(obj)  // 生成对象键if err != nil {return KeyError{obj, err}}// 将对象添加到底层的 ThreadSafeStore 中c.cacheStorage.Add(key, obj)return nil
}// 更新cache中的对象
func (c *cache) Update(obj interface{}) error {key, err := c.keyFunc(obj)if err != nil {return KeyError{obj, err}}c.cacheStorage.Update(key, obj)return nil
}// 删除cache中的对象
func (c *cache) Delete(obj interface{}) error {key, err := c.keyFunc(obj)if err != nil {return KeyError{obj, err}}c.cacheStorage.Delete(key)return nil
}// 得到cache中所有的对象
func (c *cache) List() []interface{} {return c.cacheStorage.List()
}// 得到cache中所有的对象键
func (c *cache) ListKeys() []string {return c.cacheStorage.ListKeys()
}// 得到cache中的Indexers
func (c *cache) GetIndexers() Indexers {return c.cacheStorage.GetIndexers()
}// 得到对象obj与indexName索引器关联的所有对象
func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) {return c.cacheStorage.Index(indexName, obj)
}func (c *cache) IndexKeys(indexName, indexKey string) ([]string, error) {return c.cacheStorage.IndexKeys(indexName, indexKey)
}func (c *cache) ListIndexFuncValues(indexName string) []string {return c.cacheStorage.ListIndexFuncValues(indexName)
}func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {return c.cacheStorage.ByIndex(indexName, indexKey)
}func (c *cache) AddIndexers(newIndexers Indexers) error {return c.cacheStorage.AddIndexers(newIndexers)
}func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {key, err := c.keyFunc(obj)if err != nil {return nil, false, KeyError{obj, err}}return c.GetByKey(key)
}func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {item, exists = c.cacheStorage.Get(key)return item, exists, nil
}// 替换cache中所有的对象
func (c *cache) Replace(list []interface{}, resourceVersion string) error {items := make(map[string]interface{}, len(list))for _, item := range list {key, err := c.keyFunc(item)if err != nil {return KeyError{item, err}}items[key] = item}c.cacheStorage.Replace(items, resourceVersion)return nil
}func (c *cache) Resync() error {return nil
}
  1. 可以看到 cache 没有自己独特的实现方式,都是调用的包含的 ThreadSafeStore 操作接口。

2.5 Index的原理总结

  1. 前面我们已经知道了 Reflector 通过 ListAndWatch 把数据传入 DeltaFIFO 后,经过 DeltaFIFO 的 Pop 函数将资源对象存入到了本地的一个存储 Indexer 中,而这个底层真正的存储其实就是上面的 ThreadSafeStore。
  2. 要理解 Indexer 组件,最主要就是要把索引、索引器(索引分类)、索引键、对象键这几个概念弄清楚,有时候确实容易混乱,我们将上面的示例理解了应该就很好理解了,我们可以简单的理解为 Indexer 就是简单的把相同命名空间的对象放在一个集合中,然后基于命名空间来查找对象

第四课 k8s源码学习和二次开发-DeltaFIFO和Indexer原理学习相关推荐

  1. 第十四课 k8s源码学习和二次开发原理篇-调度器原理

    第十四课 k8s源码学习和二次开发原理篇-调度器原理 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第十四课 k8s源码学习和二次开发原理篇-调度器原理 第一节 ...

  2. 第八课 k8s源码学习和二次开发原理篇-KubeBuilder使用和Controller-runtime原理

    第八课 k8s源码学习和二次开发原理篇-KubeBuilder使用和Controller-runtime原理 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第八课 ...

  3. 第三课 k8s源码学习和二次开发-缓存机制Informers和Reflector组件学习

    第三课 k8s源码学习和二次开发-缓存机制Informers和Reflector组件学习 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第三课 k8s源码学习和二 ...

  4. 类似爱美刻 右糖 轻剪辑 捷映 秀展网 秀多多 来画 创视网 传影 影大师 闪剪源码 技术源头 二次开发 提供源码 逗拍 趣推 飞推 美册 搞定视频 简影 剪影 爱字幕 幸福相册 八角星

    需要源码的下面评论 介绍 类似爱美刻 右糖 轻剪辑 捷映 秀展网 秀多多 来画 创视网 传影 影大师 闪剪源码 技术源头 二次开发 提供源码. 类似 逗拍 趣推 飞推 美册 搞定视频 简影 剪影 爱字 ...

  5. 【流媒体开发】VLC Media Player - Android 平台源码编译 与 二次开发详解 (提供详细800M下载好的编译源码及eclipse可调试播放器源码下载)

    作者 : 韩曙亮  博客地址 : http://blog.csdn.net/shulianghan/article/details/42707293 转载请注明出处 : http://blog.csd ...

  6. 网狐荣耀源码(含内核源码)可二次开发

    网狐荣耀版本,可以二次开发. 含内核源码(可自定义消息包加解密),含约战服务(房卡模式),十几个子游戏源码,网站源码(网站前台,网站后台,房卡后台). PS:这几天熬夜弄这个,可算能好好睡一下了! 转 ...

  7. 超强在线考试系统源码(私有部署二次开发)

    随着信息化技术的发展,考试系统也在进行着深入的变革.从传统的纸质考试人工评分到现在的在线考试自动评分. 在线考试系统的应用场景也在逐渐扩宽,例如:学校的学生考试.员工培训考试.招聘考试.职称考试等等. ...

  8. Openfire开发配置,Openfire源码配置,OpenFire二次开发配置

    1.下载源码:http://www.igniterealtime.org/downloads/source.jsp 2.把源码解压出的openfire_src目录放至eclipse workplace ...

  9. (源码)CAD二次开发—实现AutoCAD中缺失字体自动替换

    使用C# 实现AutoCAD中缺失字体的自动替换 设计思路: 获取到当前字体样式的设计集合 遍历所有的字体样式,对对应的字体样式进行查找 如果找不到则进行替换 namespace AutoCADTo ...

最新文章

  1. php支付密码控件,vue支付密码的图文实例
  2. serialVersionUID的作用(转)
  3. day12装饰器进阶
  4. C# list与数组互相转换
  5. 常用CSS代码片段常见css bug
  6. Bytom国密网说明和指南
  7. mysql查询所有姓王的信息_MySQL的查询练习
  8. python识别验证码登陆学校网站
  9. php服务器启动错误,服务器意外重启之后PHP-FPM不能启动
  10. 【正点原子MP157连载】第六章STM32Cube固件包-摘自【正点原子】STM32MP1 M4裸机CubeIDE开发指南
  11. 如何清理和删除 Docker 镜像
  12. a5松下驱动器参数设置表_「精品干货」松下A5伺服驱动器参数设置与常见故障解决分析...
  13. 485的信号测试软件,RS485通信测试项目中的压力测试方法、原理及基本测试模型...
  14. realtek没有禁用前面板_为什么HD声卡必须禁用前面板插孔检测前置耳机和麦克才可以有声...
  15. 《数解道法》(一)前言
  16. 这8个坏习惯加重体内湿气,一定要改掉!否则……
  17. 开源软件的安全性风险_认真对待开源安全性
  18. 爬虫实战之爬取电影天堂全部电影信息
  19. 微信小程序 - BILIBILI-demo
  20. 2021华数杯数学建模选题建议

热门文章

  1. 无线网络hack学习1
  2. oracle12c没有单库口令,ORA-01017:无效的用户名/密码;登录被Oracle 12c“数据库配置助手”工具引发被拒绝...
  3. unity优化杂谈1
  4. jupyter安装及配置
  5. JavaScript小案例——登入页面(密码小眼睛)
  6. Python求水仙花数(包含简单运算符使用方法)初级学习
  7. 有关字符常量和字符变量
  8. 单片机AT指令操作GA6-B短信模块连接阿里云MQTT服务器(双向通信)
  9. ubuntu 16.04 安装后需要做的事情
  10. 自定义标签---word