Personal blog address of this article: https://www.huweihuang.com/kubernetes-notes/code-analysis/kubelet/syncLoopIteration.html

kubelet Source Code Analysis (Part 4): syncLoopIteration

The following code analysis is based on Kubernetes v1.12.0.

This article analyzes the syncLoopIteration part of the kubelet. syncLoopIteration listens on several channels for different types of events and handles the corresponding add, update, remove, and sync operations.

1. syncLoop

syncLoop is the main loop for processing changes. It watches for changes from three channels (file, apiserver, and http). For any new change seen, it runs a sync against the desired state and the running state. If no configuration changes are seen, it synchronizes the last known desired state every sync-frequency seconds.

This part of the code is located in pkg/kubelet/kubelet.go.

// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    glog.Info("Starting kubelet main sync loop.")
    // The resyncTicker wakes up kubelet to checks if there are any pod workers
    // that need to be sync'd. A one-second period is sufficient because the
    // sync interval is defaulted to 10s.
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
    housekeepingTicker := time.NewTicker(housekeepingPeriod)
    defer housekeepingTicker.Stop()
    plegCh := kl.pleg.Watch()
    const (
        base   = 100 * time.Millisecond
        max    = 5 * time.Second
        factor = 2
    )
    duration := base
    for {
        if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 {
            glog.Infof("skipping pod synchronization - %v", rs)
            // exponential backoff
            time.Sleep(duration)
            duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
            continue
        }
        // reset backoff if we have a success
        duration = base

        kl.syncLoopMonitor.Store(kl.clock.Now())
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
        kl.syncLoopMonitor.Store(kl.clock.Now())
    }
}

syncLoop calls syncLoopIteration to run the more concrete loop that watches for pod changes.

2. syncLoopIteration

syncLoopIteration listens on several channels for different types of events and handles them. The channels are configCh, plegCh, syncCh, housekeepingCh, and livenessManager.Updates().

syncLoopIteration performs the actual pod operations, selecting over these different channels:

The syncLoopIteration code is located in pkg/kubelet/kubelet.go.

2.1. configCh

configCh dispatches pods whose configuration has changed to the appropriate handler callback for the event type; this case uses SyncHandler to perform the add, update, remove, reconcile, and delete operations on pods.

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
            glog.Errorf("Update channel is closed. Exiting the sync loop.")
            return false
        }

        switch u.Op {
        case kubetypes.ADD:
            glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
            // After restarting, kubelet will get all existing pods through
            // ADD as if they are new pods. These pods will then go through the
            // admission process and *may* be rejected. This can be resolved
            // once we have checkpointing.
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            glog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE:
            glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
            handler.HandlePodRemoves(u.Pods)
        case kubetypes.RECONCILE:
            glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
            handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE:
            glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
            // DELETE is treated as a UPDATE because of graceful deletion.
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.RESTORE:
            glog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
            // These are pods restored from the checkpoint. Treat them as new
            // pods.
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.SET:
            // TODO: Do we want to support this?
            glog.Errorf("Kubelet does not support snapshot update")
        }
    ...
}

As the code shows, syncLoopIteration performs a different pod operation depending on the op of the PodUpdate:

  • ADD: HandlePodAdditions
  • UPDATE: HandlePodUpdates
  • REMOVE: HandlePodRemoves
  • RECONCILE: HandlePodReconcile
  • DELETE: HandlePodUpdates (graceful deletion is treated as an update)
  • RESTORE: HandlePodAdditions
  • podsToSync (via syncCh): HandlePodSyncs

The pod handler operations are performed through SyncHandler, an interface whose implementation is the kubelet itself; see the analysis below.

2.2. plegCh

plegCh: updates the runtime cache and syncs the pod. This case calls HandlePodSyncs.

case e := <-plegCh:
    if isSyncPodWorthy(e) {
        // PLEG event for a pod; sync it.
        if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
            glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
            handler.HandlePodSyncs([]*v1.Pod{pod})
        } else {
            // If the pod no longer exists, ignore the event.
            glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
        }
    }

    if e.Type == pleg.ContainerDied {
        if containerID, ok := e.Data.(string); ok {
            kl.cleanUpContainersInPod(e.ID, containerID)
        }
    }
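The isSyncPodWorthy helper referenced above filters PLEG events down to those worth triggering a sync. For reference, in v1.12 it simply ignores ContainerRemoved events, which do not affect pod state:

// isSyncPodWorthy filters out events that are not worthy of pod syncing
func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
    // ContainerRemoved doesn't affect pod state
    return event.Type != pleg.ContainerRemoved
}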

2.3. syncCh

syncCh: syncs all pods waiting to be synced. This case calls HandlePodSyncs.

case <-syncCh:
    // Sync pods waiting for sync
    podsToSync := kl.getPodsToSync()
    if len(podsToSync) == 0 {
        break
    }
    glog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
    handler.HandlePodSyncs(podsToSync)
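getPodsToSync decides which pods are due for a sync: pods whose UID has come up in the kubelet work queue, plus any pod that a registered PodSyncLoopHandler says should be synced. A sketch of how the v1.12 implementation works, for reference:

// getPodsToSync returns pods that require syncing: pods whose work is
// ready in the work queue, plus pods required by the pod sync loop handlers.
func (kl *Kubelet) getPodsToSync() []*v1.Pod {
    allPods := kl.podManager.GetPods()
    podUIDs := kl.workQueue.GetWork()
    podUIDSet := sets.NewString()
    for _, podUID := range podUIDs {
        podUIDSet.Insert(string(podUID))
    }
    var podsToSync []*v1.Pod
    for _, pod := range allPods {
        if podUIDSet.Has(string(pod.UID)) {
            // The work of the pod is ready
            podsToSync = append(podsToSync, pod)
            continue
        }
        for _, podSyncLoopHandler := range kl.PodSyncLoopHandlers {
            if podSyncLoopHandler.ShouldSync(pod) {
                podsToSync = append(podsToSync, pod)
                break
            }
        }
    }
    return podsToSync
}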

2.4. livenessManager.Updates()

livenessManager.Updates(): syncs pods whose liveness probes have failed. This case calls HandlePodSyncs.

case update := <-kl.livenessManager.Updates():
    if update.Result == proberesults.Failure {
        // The liveness manager detected a failure; sync the pod.

        // We should not use the pod from livenessManager, because it is never updated after
        // initialization.
        pod, ok := kl.podManager.GetPodByUID(update.PodUID)
        if !ok {
            // If the pod no longer exists, ignore the update.
            glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
            break
        }
        glog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
        handler.HandlePodSyncs([]*v1.Pod{pod})
    }

2.5. housekeepingCh

housekeepingCh: triggers pod cleanup. This case calls HandlePodCleanups.

case <-housekeepingCh:
    if !kl.sourcesReady.AllReady() {
        // If the sources aren't ready or volume manager has not yet synced the states,
        // skip housekeeping, as we may accidentally delete pods from unready sources.
        glog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
    } else {
        glog.V(4).Infof("SyncLoop (housekeeping)")
        if err := handler.HandlePodCleanups(); err != nil {
            glog.Errorf("Failed cleaning pods: %v", err)
        }
    }

3. SyncHandler

SyncHandler is an interface that defines the various pod handlers; its concrete implementation is the kubelet. The interface methods are called mainly from syncLoopIteration. The interface is defined as follows:

// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
    HandlePodAdditions(pods []*v1.Pod)
    HandlePodUpdates(pods []*v1.Pod)
    HandlePodRemoves(pods []*v1.Pod)
    HandlePodReconcile(pods []*v1.Pod)
    HandlePodSyncs(pods []*v1.Pod)
    HandlePodCleanups() error
}

The SyncHandler code is located in pkg/kubelet/kubelet.go.

3.1. HandlePodAdditions

HandlePodAdditions first sorts the pods by creation time, then iterates over the pod list to perform the relevant operations.

// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    start := kl.clock.Now()
    sort.Sort(sliceutils.PodsByCreationTime(pods))
    for _, pod := range pods {
        ...
    }
}

Add the pod to the pod manager.

for _, pod := range pods {
    // Responsible for checking limits in resolv.conf
    if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
        kl.dnsConfigurer.CheckLimitsForResolvConf()
    }
    existingPods := kl.podManager.GetPods()
    // Always add the pod to the pod manager. Kubelet relies on the pod
    // manager as the source of truth for the desired state. If a pod does
    // not exist in the pod manager, it means that it has been deleted in
    // the apiserver and no action (other than cleanup) is required.
    kl.podManager.AddPod(pod)
    ...
}

If it is a mirror pod, handle it via handleMirrorPod.

if kubepod.IsMirrorPod(pod) {
    kl.handleMirrorPod(pod, start)
    continue
}

If the pod is not in a Terminated state, determine whether the pod can be admitted; if it cannot, reject it, which sets the pod's status to Failed.

if !kl.podIsTerminated(pod) {
    // Only go through the admission process if the pod is not
    // terminated.

    // We failed pods that we rejected, so activePods include all admitted
    // pods that are alive.
    activePods := kl.filterOutTerminatedPods(existingPods)

    // Check if we can admit the pod; if not, reject it.
    if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
        kl.rejectPod(pod, reason, message)
        continue
    }
}
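canAdmitPod runs the pod through the kubelet's registered admit handlers (eviction, sysctl, predicate checks, and so on) and returns the first rejection it encounters; rejectPod then records the Failed status with the given reason. For reference, canAdmitPod in v1.12 looks roughly like this:

// canAdmitPod determines if a pod can be admitted, and gives a reason if it
// cannot. "pod" is the new pod, while "pods" are all admitted pods.
func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
    // The kubelet invokes each pod admit handler in sequence;
    // if any handler rejects, the pod is rejected.
    attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
    for _, podAdmitHandler := range kl.admitHandlers {
        if result := podAdmitHandler.Admit(attrs); !result.Admit {
            return false, result.Reason, result.Message
        }
    }
    return true, "", ""
}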

Call dispatchWork, the core function invoked by the sync handlers. It starts an asynchronous loop in a pod worker to dispatch the pod operations; its details are analyzed below.

mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)

Finally, add the pod to the probe manager.

kl.probeManager.AddPod(pod)

3.2. HandlePodUpdates

HandlePodUpdates likewise iterates over the pod list and performs the corresponding operations.

// HandlePodUpdates is the callback in the SyncHandler interface for pods
// being updated from a config source.
func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
    start := kl.clock.Now()
    for _, pod := range pods {
        ...
    }
}

Update the pod in the pod manager.

for _, pod := range pods {
    // Responsible for checking limits in resolv.conf
    if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
        kl.dnsConfigurer.CheckLimitsForResolvConf()
    }
    kl.podManager.UpdatePod(pod)
    ...
}

If it is a mirror pod, handle it via handleMirrorPod.

if kubepod.IsMirrorPod(pod) {
    kl.handleMirrorPod(pod, start)
    continue
}

Call dispatchWork.

// TODO: Evaluate if we need to validate and reject updates.
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)

3.3. HandlePodRemoves

HandlePodRemoves iterates over the pod list.

// HandlePodRemoves is the callback in the SyncHandler interface for pods
// being removed from a config source.
func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
    start := kl.clock.Now()
    for _, pod := range pods {
        ...
    }
}

Delete the pod from the pod manager.

for _, pod := range pods {
    kl.podManager.DeletePod(pod)
    ...
}

If it is a mirror pod, handle it via handleMirrorPod.

if kubepod.IsMirrorPod(pod) {
    kl.handleMirrorPod(pod, start)
    continue
}

Call the kubelet's deletePod function to delete the pod.

// Deletion is allowed to fail because the periodic cleanup routine
// will trigger deletion again.
if err := kl.deletePod(pod); err != nil {
    glog.V(2).Infof("Failed to delete pod %q, err: %v", format.Pod(pod), err)
}

deletePod puts the pod to be deleted onto the podKillingCh channel; the podKiller goroutine watches this channel and carries out the deletion. The implementation is as follows:

// deletePod deletes the pod from the internal state of the kubelet by:
// 1.  stopping the associated pod worker asynchronously
// 2.  signaling to kill the pod by sending on the podKillingCh channel
//
// deletePod returns an error if not all sources are ready or the pod is not
// found in the runtime cache.
func (kl *Kubelet) deletePod(pod *v1.Pod) error {
    if pod == nil {
        return fmt.Errorf("deletePod does not allow nil pod")
    }
    if !kl.sourcesReady.AllReady() {
        // If the sources aren't ready, skip deletion, as we may accidentally delete pods
        // for sources that haven't reported yet.
        return fmt.Errorf("skipping delete because sources aren't ready yet")
    }
    kl.podWorkers.ForgetWorker(pod.UID)

    // Runtime cache may not have been updated to with the pod, but it's okay
    // because the periodic cleanup routine will attempt to delete again later.
    runningPods, err := kl.runtimeCache.GetPods()
    if err != nil {
        return fmt.Errorf("error listing containers: %v", err)
    }
    runningPod := kubecontainer.Pods(runningPods).FindPod("", pod.UID)
    if runningPod.IsEmpty() {
        return fmt.Errorf("pod not found")
    }
    podPair := kubecontainer.PodPair{APIPod: pod, RunningPod: &runningPod}

    kl.podKillingCh <- &podPair
    // TODO: delete the mirror pod here?

    // We leave the volume/directory cleanup to the periodic cleanup routine.
    return nil
}
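The podKiller goroutine mentioned above is started when the kubelet runs. It ranges over podKillingCh, deduplicates pods that are already being killed, and kills each remaining pod in its own goroutine. A sketch of the v1.12 implementation, for reference:

// podKiller launches a goroutine to kill a pod received from the channel if
// another goroutine isn't already in action.
func (kl *Kubelet) podKiller() {
    killing := sets.NewString()
    // guard for the killing set
    lock := sync.Mutex{}
    for podPair := range kl.podKillingCh {
        runningPod := podPair.RunningPod
        apiPod := podPair.APIPod

        lock.Lock()
        exists := killing.Has(string(runningPod.ID))
        if !exists {
            killing.Insert(string(runningPod.ID))
        }
        lock.Unlock()

        if !exists {
            go func(apiPod *v1.Pod, runningPod *kubecontainer.Pod) {
                glog.V(2).Infof("Killing unwanted pod %q", runningPod.Name)
                err := kl.killPod(apiPod, runningPod, nil, nil)
                if err != nil {
                    glog.Errorf("Failed killing the pod %q: %v", runningPod.Name, err)
                }
                lock.Lock()
                killing.Delete(string(runningPod.ID))
                lock.Unlock()
            }(apiPod, runningPod)
        }
    }
}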

Remove the pod from the probe manager.

kl.probeManager.RemovePod(pod)

3.4. HandlePodReconcile

HandlePodReconcile iterates over the pod list.

// HandlePodReconcile is the callback in the SyncHandler interface for pods
// that should be reconciled.
func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
    start := kl.clock.Now()
    for _, pod := range pods {
        ...
    }
}

Update the pod in the pod manager.

for _, pod := range pods {
    // Update the pod in pod manager, status manager will do periodically reconcile according
    // to the pod manager.
    kl.podManager.UpdatePod(pod)
    ...
}

Reconcile the pod's Ready condition if necessary and call dispatchWork.

// Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation.
if status.NeedToReconcilePodReadiness(pod) {
    mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
    kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
}

If the pod has been evicted, remove all dead containers in the pod.

// After an evicted pod is synced, all dead containers in the pod can be removed.
if eviction.PodIsEvicted(pod.Status) {
    if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
        kl.containerDeletor.deleteContainersInPod("", podStatus, true)
    }
}

3.5. HandlePodSyncs

HandlePodSyncs is the callback in the syncHandler interface for pods that should be synced; it calls dispatchWork to have a pod worker perform the work.

// HandlePodSyncs is the callback in the syncHandler interface for pods
// that should be dispatched to pod workers for sync.
func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
    start := kl.clock.Now()
    for _, pod := range pods {
        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
        kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
    }
}

3.6. HandlePodCleanups

HandlePodCleanups performs pod cleanup tasks, covering terminating pods, orphaned pods, and so on.

First, inspect the set of pods present in the cgroup tree.

// HandlePodCleanups performs a series of cleanup work, including terminating
// pod workers, killing unwanted pods, and removing orphaned volumes/pod
// directories.
// NOTE: This function is executed by the main sync loop, so it
// should not contain any blocking calls.
func (kl *Kubelet) HandlePodCleanups() error {
    // The kubelet lacks checkpointing, so we need to introspect the set of pods
    // in the cgroup tree prior to inspecting the set of pods in our pod manager.
    // this ensures our view of the cgroup tree does not mistakenly observe pods
    // that are added after the fact...
    var (
        cgroupPods map[types.UID]cm.CgroupName
        err        error
    )
    if kl.cgroupsPerQOS {
        pcm := kl.containerManager.NewPodContainerManager()
        cgroupPods, err = pcm.GetAllPodsFromCgroups()
        if err != nil {
            return fmt.Errorf("failed to get list of pods that still exist on cgroup mounts: %v", err)
        }
    }
    ...
}

List all pods, including mirror pods.

allPods, mirrorPods := kl.podManager.GetPodsAndMirrorPods()
// Pod phase progresses monotonically. Once a pod has reached a final state,
// it should never leave regardless of the restart policy. The statuses
// of such pods should not be changed, and there is no need to sync them.
// TODO: the logic here does not handle two cases:
//   1. If the containers were removed immediately after they died, kubelet
//      may fail to generate correct statuses, let alone filtering correctly.
//   2. If kubelet restarted before writing the terminated status for a pod
//      to the apiserver, it could still restart the terminated pod (even
//      though the pod was not considered terminated by the apiserver).
// These two conditions could be alleviated by checkpointing kubelet.
activePods := kl.filterOutTerminatedPods(allPods)
desiredPods := make(map[types.UID]empty)
for _, pod := range activePods {
    desiredPods[pod.UID] = empty{}
}

Stop the pod workers for pods that no longer exist, and clean those pods up from the probe manager.

// Stop the workers for no-longer existing pods.
// TODO: is here the best place to forget pod workers?
kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
kl.probeManager.CleanupPods(activePods)

Pods that need to be killed are put onto the podKillingCh channel; the podKiller task watches this channel, picks up the pods to be killed, and kills them.

runningPods, err := kl.runtimeCache.GetPods()
if err != nil {
    glog.Errorf("Error listing containers: %#v", err)
    return err
}
for _, pod := range runningPods {
    if _, found := desiredPods[pod.ID]; !found {
        kl.podKillingCh <- &kubecontainer.PodPair{APIPod: nil, RunningPod: pod}
    }
}

When a pod is no longer bound to this node, remove its podStatus; removeOrphanedPodStatuses ultimately calls the status manager's RemoveOrphanedStatuses method.

kl.removeOrphanedPodStatuses(allPods, mirrorPods)
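For reference, removeOrphanedPodStatuses just collects the UIDs of all known pods and mirror pods and hands them to the status manager, roughly:

// removeOrphanedPodStatuses removes obsolete entries in podStatus where
// the pod is no longer considered bound to this node.
func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Pod) {
    podUIDs := make(map[types.UID]bool)
    for _, pod := range pods {
        podUIDs[pod.UID] = true
    }
    for _, pod := range mirrorPods {
        podUIDs[pod.UID] = true
    }
    kl.statusManager.RemoveOrphanedStatuses(podUIDs)
}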

Remove all orphaned volumes.

// Remove any orphaned volumes.
// Note that we pass all pods (including terminated pods) to the function,
// so that we don't remove volumes associated with terminated but not yet
// deleted pods.
err = kl.cleanupOrphanedPodDirs(allPods, runningPods)
if err != nil {
    // We want all cleanup tasks to be run even if one of them failed. So
    // we just log an error here and continue other cleanup tasks.
    // This also applies to the other clean up tasks.
    glog.Errorf("Failed cleaning up orphaned pod directories: %v", err)
}

Remove orphaned mirror pods.

// Remove any orphaned mirror pods.
kl.podManager.DeleteOrphanedMirrorPods()

Remove the cgroups of pods that are no longer running.

// Remove any cgroups in the hierarchy for pods that are no longer running.
if kl.cgroupsPerQOS {
    kl.cleanupOrphanedPodCgroups(cgroupPods, activePods)
}

Perform garbage collection (GC) on the backoff tracker.

kl.backOff.GC()

4. dispatchWork

dispatchWork starts an asynchronous sync loop through a pod worker.

The full code is as follows:

// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod is terminated, dispatchWork
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    if kl.podIsTerminated(pod) {
        if pod.DeletionTimestamp != nil {
            // If the pod is in a terminated state, there is no pod worker to
            // handle the work item. Check if the DeletionTimestamp has been
            // set, and force a status update to trigger a pod deletion request
            // to the apiserver.
            kl.statusManager.TerminatePod(pod)
        }
        return
    }
    // Run the sync in an async worker.
    kl.podWorkers.UpdatePod(&UpdatePodOptions{
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        OnCompleteFunc: func(err error) {
            if err != nil {
                metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
            }
        },
    })
    // Note the number of containers for new pods.
    if syncType == kubetypes.SyncPodCreate {
        metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
    }
}

Analyzed piece by piece below:

If the pod is in a Terminated state, the status manager's TerminatePod is executed.

// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod is terminated, dispatchWork
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    if kl.podIsTerminated(pod) {
        if pod.DeletionTimestamp != nil {
            // If the pod is in a terminated state, there is no pod worker to
            // handle the work item. Check if the DeletionTimestamp has been
            // set, and force a status update to trigger a pod deletion request
            // to the apiserver.
            kl.statusManager.TerminatePod(pod)
        }
        return
    }
    ...
}

Call the pod worker's UpdatePod function, the core function of the pod worker that performs the pod operations. Its logic is analyzed below.

// Run the sync in an async worker.
kl.podWorkers.UpdatePod(&UpdatePodOptions{
    Pod:        pod,
    MirrorPod:  mirrorPod,
    UpdateType: syncType,
    OnCompleteFunc: func(err error) {
        if err != nil {
            metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
        }
    },
})

When the sync type is SyncPodCreate (i.e., when a pod is created), record the number of containers in the new pod.

// Note the number of containers for new pods.
if syncType == kubetypes.SyncPodCreate {
    metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}

5. PodWorkers.UpdatePod

PodWorkers is an interface type:

// PodWorkers is an abstract interface for testability.
type PodWorkers interface {
    UpdatePod(options *UpdatePodOptions)
    ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
    ForgetWorker(uid types.UID)
}

UpdatePod is its core method. It passes the pod to be processed through a per-pod podUpdates channel; for each newly seen pod, a goroutine is started to run managePodLoop.

This part of the code is located in pkg/kubelet/pod_workers.go.

// Apply the new setting to the specified pod.
// If the options provide an OnCompleteFunc, the function is invoked if the update is accepted.
// Update requests are ignored if a kill pod request is pending.
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
    pod := options.Pod
    uid := pod.UID
    var podUpdates chan UpdatePodOptions
    var exists bool

    p.podLock.Lock()
    defer p.podLock.Unlock()
    if podUpdates, exists = p.podUpdates[uid]; !exists {
        // We need to have a buffer here, because checkForUpdates() method that
        // puts an update into channel is called from the same goroutine where
        // the channel is consumed. However, it is guaranteed that in such case
        // the channel is empty, so buffer of size 1 is enough.
        podUpdates = make(chan UpdatePodOptions, 1)
        p.podUpdates[uid] = podUpdates

        // Creating a new pod worker either means this is a new pod, or that the
        // kubelet just restarted. In either case the kubelet is willing to believe
        // the status of the pod for the first pod worker sync. See corresponding
        // comment in syncPod.
        go func() {
            defer runtime.HandleCrash()
            p.managePodLoop(podUpdates)
        }()
    }
    if !p.isWorking[pod.UID] {
        p.isWorking[pod.UID] = true
        podUpdates <- *options
    } else {
        // if a request to kill a pod is pending, we do not let anything overwrite that request.
        update, found := p.lastUndeliveredWorkUpdate[pod.UID]
        if !found || update.UpdateType != kubetypes.SyncPodKill {
            p.lastUndeliveredWorkUpdate[pod.UID] = *options
        }
    }
}

6. managePodLoop

managePodLoop reads from the podUpdates channel and executes syncPodFn. syncPodFn is assigned when newPodWorkers is called: it is kubelet.syncPod, whose logic will be analyzed separately in a later article.

// newPodWorkers is passed the syncPod function
klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)

For reference, the newPodWorkers function:

func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQueue queue.WorkQueue,
    resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
    return &podWorkers{
        podUpdates:                map[types.UID]chan UpdatePodOptions{},
        isWorking:                 map[types.UID]bool{},
        lastUndeliveredWorkUpdate: map[types.UID]UpdatePodOptions{},
        syncPodFn:                 syncPodFn, // klet.syncPod is passed in at construction
        recorder:                  recorder,
        workQueue:                 workQueue,
        resyncInterval:            resyncInterval,
        backOffPeriod:             backOffPeriod,
        podCache:                  podCache,
    }
}
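syncPodFn has type syncPodFnType, which in v1.12 is defined in pkg/kubelet/pod_workers.go as:

// syncPodFnType is the function to sync a pod; klet.syncPod satisfies it.
type syncPodFnType func(options syncPodOptions) error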

For reference, the managePodLoop function:

This part of the code is located in pkg/kubelet/pod_workers.go.

func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
    var lastSyncTime time.Time
    for update := range podUpdates {
        err := func() error {
            podUID := update.Pod.UID
            // This is a blocking call that would return only if the cache
            // has an entry for the pod that is newer than minRuntimeCache
            // Time. This ensures the worker doesn't start syncing until
            // after the cache is at least newer than the finished time of
            // the previous sync.
            status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
            if err != nil {
                // This is the legacy event thrown by manage pod loop
                // all other events are now dispatched from syncPodFn
                p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
                return err
            }
            err = p.syncPodFn(syncPodOptions{
                mirrorPod:      update.MirrorPod,
                pod:            update.Pod,
                podStatus:      status,
                killPodOptions: update.KillPodOptions,
                updateType:     update.UpdateType,
            })
            lastSyncTime = time.Now()
            return err
        }()
        // notify the call-back function if the operation succeeded or not
        if update.OnCompleteFunc != nil {
            update.OnCompleteFunc(err)
        }
        if err != nil {
            // IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
            glog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
        }
        p.wrapUp(update.Pod.UID, err)
    }
}
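After each sync attempt, managePodLoop calls wrapUp, which re-enqueues the pod into the work queue (with the regular resync interval on success, or the backoff period on error) and then delivers any update that arrived while the worker was busy. For reference, the v1.12 implementation is roughly:

func (p *podWorkers) wrapUp(uid types.UID, syncErr error) {
    // Requeue the last update if the last sync returned error.
    switch {
    case syncErr == nil:
        // No error; requeue at the regular resync interval.
        p.workQueue.Enqueue(uid, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
    default:
        // Error occurred during the sync; back off and then retry.
        p.workQueue.Enqueue(uid, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
    }
    p.checkForUpdates(uid)
}

func (p *podWorkers) checkForUpdates(uid types.UID) {
    p.podLock.Lock()
    defer p.podLock.Unlock()
    if workUpdate, exists := p.lastUndeliveredWorkUpdate[uid]; exists {
        // Deliver the update that arrived while the worker was busy.
        p.podUpdates[uid] <- workUpdate
        delete(p.lastUndeliveredWorkUpdate, uid)
    } else {
        p.isWorking[uid] = false
    }
}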

7. Summary

The basic flow of syncLoopIteration is as follows:

  1. It listens on several channels for different types of events and handles them. The channels are configCh, plegCh, syncCh, housekeepingCh, and livenessManager.Updates().
  2. Different SyncHandler callbacks perform the different add, update, remove, and sync operations.
  3. HandlePodAdditions, HandlePodUpdates, HandlePodReconcile, and HandlePodSyncs all call dispatchWork to perform the pod operations. HandlePodCleanups passes the pods that need cleaning up to the podKiller over a channel.
  4. dispatchWork calls podWorkers.UpdatePod to do the work asynchronously.
  5. podWorkers.UpdatePod runs managePodLoop, the loop that performs the per-pod operations.

Channel types and their roles:

  • configCh: dispatches pods whose configuration has changed to the handler callback for the event type.
  • plegCh: updates the runtime cache and syncs the pod.
  • syncCh: syncs all pods waiting to be synced.
  • housekeepingCh: triggers pod cleanup.
  • livenessManager.Updates(): syncs pods whose liveness probes have failed.

References:

  • https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/kubelet/kubelet.go
  • https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/kubelet/pod_workers.go
