kubelet源码分析(二)之 NewMainKubelet
本文个人博客地址:https://www.huweihuang.com/kubernetes-notes/code-analysis/kubelet/NewMainKubelet.html
kubelet源码分析(二)之 NewMainKubelet
以下代码分析基于
kubernetes v1.12.0
版本。本文主要分析 https://github.com/kubernetes/kubernetes/tree/v1.12.0/pkg/kubelet 部分的代码。
本文主要分析kubelet中的NewMainKubelet
部分。
1. NewMainKubelet
// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,kubeDeps *Dependencies,crOptions *config.ContainerRuntimeOptions,containerRuntime string,runtimeCgroups string,hostnameOverride string,nodeIP string,providerID string,cloudProvider string,certDirectory string,rootDirectory string,registerNode bool,registerWithTaints []api.Taint,allowedUnsafeSysctls []string,remoteRuntimeEndpoint string,remoteImageEndpoint string,experimentalMounterPath string,experimentalKernelMemcgNotification bool,experimentalCheckNodeCapabilitiesBeforeMount bool,experimentalNodeAllocatableIgnoreEvictionThreshold bool,minimumGCAge metav1.Duration,maxPerPodContainerCount int32,maxContainerCount int32,masterServiceNamespace string,registerSchedulable bool,nonMasqueradeCIDR string,keepTerminatedPodVolumes bool,nodeLabels map[string]string,seccompProfileRoot string,bootstrapCheckpointPath string,nodeStatusMaxImages int32) (*Kubelet, error) {...
}
1.1. PodConfig
通过makePodSourceConfig
生成Pod config。
if kubeDeps.PodConfig == nil {var err errorkubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)if err != nil {return nil, err}
}
1.1.1. makePodSourceConfig
// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {...// source of all configurationcfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)// define file config sourceif kubeCfg.StaticPodPath != "" {glog.Infof("Adding pod path: %v", kubeCfg.StaticPodPath)config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))}// define url config sourceif kubeCfg.StaticPodURL != "" {glog.Infof("Adding pod url %q with HTTP header %v", kubeCfg.StaticPodURL, manifestURLHeader)config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))}// Restore from the checkpoint path// NOTE: This MUST happen before creating the apiserver source// below, or the checkpoint would override the source of truth....if kubeDeps.KubeClient != nil {glog.Infof("Watching apiserver")if updatechannel == nil {updatechannel = cfg.Channel(kubetypes.ApiserverSource)}config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)}return cfg, nil
}
1.1.2. NewPodConfig
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {updates := make(chan kubetypes.PodUpdate, 50)storage := newPodStorage(updates, mode, recorder)podConfig := &PodConfig{pods: storage,mux: config.NewMux(storage),updates: updates,sources: sets.String{},}return podConfig
}
1.1.3. NewSourceApiserver
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))newSourceApiserverFromLW(lw, updates)
}
1.2. Lister
serviceLister
和nodeLister
分别通过List-Watch
机制监听service
和node
的列表变化。
1.2.1. serviceLister
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
if kubeDeps.KubeClient != nil {serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0)go r.Run(wait.NeverStop)
}
serviceLister := corelisters.NewServiceLister(serviceIndexer)
1.2.2. nodeLister
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
if kubeDeps.KubeClient != nil {fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)go r.Run(wait.NeverStop)
}
nodeInfo := &predicates.CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}
1.3. 各种Manager
1.3.1. containerRefManager
containerRefManager := kubecontainer.NewRefManager()
1.3.2. oomWatcher
oomWatcher := NewOOMWatcher(kubeDeps.CAdvisorInterface, kubeDeps.Recorder)
1.3.3. dnsConfigurer
clusterDNS := make([]net.IP, 0, len(kubeCfg.ClusterDNS))
for _, ipEntry := range kubeCfg.ClusterDNS {ip := net.ParseIP(ipEntry)if ip == nil {glog.Warningf("Invalid clusterDNS ip '%q'", ipEntry)} else {clusterDNS = append(clusterDNS, ip)}
}
...dns.NewConfigurer(kubeDeps.Recorder, nodeRef, parsedNodeIP, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig),
1.3.4. secretManager & configMapManager
var secretManager secret.Manager
var configMapManager configmap.Manager
switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {case kubeletconfiginternal.WatchChangeDetectionStrategy:secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient)configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient)
case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:secretManager = secret.NewCachingSecretManager(kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))configMapManager = configmap.NewCachingConfigMapManager(kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
case kubeletconfiginternal.GetChangeDetectionStrategy:secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient)configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient)
default:return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
}klet.secretManager = secretManager
klet.configMapManager = configMapManager
1.3.5. livenessManager
klet.livenessManager = proberesults.NewManager()
1.3.6. podManager
// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager)
1.3.7. resourceAnalyzer
klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration)
1.3.8. containerGC
// setup containerGC
containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
if err != nil {return nil, err
}
klet.containerGC = containerGC
klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))
1.3.9. imageManager
// setup imageManager
imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
if err != nil {return nil, fmt.Errorf("failed to initialize image manager: %v", err)
}
klet.imageManager = imageManager
1.3.10. statusManager
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
1.3.11. probeManager
klet.probeManager = prober.NewManager(klet.statusManager,klet.livenessManager,klet.runner,containerRefManager,kubeDeps.Recorder)
1.3.12. tokenManager
tokenManager := token.NewManager(kubeDeps.KubeClient)
1.3.13. volumePluginMgr
klet.volumePluginMgr, err =NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
if err != nil {return nil, err
}
if klet.enablePluginsWatcher {klet.pluginWatcher = pluginwatcher.NewWatcher(klet.getPluginsDir())
}
1.3.14. volumeManager
// setup volumeManager
klet.volumeManager = volumemanager.NewVolumeManager(kubeCfg.EnableControllerAttachDetach,nodeName,klet.podManager,klet.statusManager,klet.kubeClient,klet.volumePluginMgr,klet.containerRuntime,kubeDeps.Mounter,klet.getPodsDir(),kubeDeps.Recorder,experimentalCheckNodeCapabilitiesBeforeMount,keepTerminatedPodVolumes)
1.3.15. evictionManager
// setup eviction manager
evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)klet.evictionManager = evictionManager
klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
1.4. containerRuntime
目前pod所使用的runtime
只有docker
和remote
两种,rkt
已经废弃。
if containerRuntime == "rkt" {glog.Fatalln("rktnetes has been deprecated in favor of rktlet. Please see https://github.com/kubernetes-incubator/rktlet for more information.")
}
当runtime
是docker
的时候,会执行docker
相关操作。
switch containerRuntime {case kubetypes.DockerContainerRuntime:// Create and start the CRI shim running as a grpc server....// The unix socket for kubelet <-> dockershim communication....// Create dockerLegacyService when the logging driver is not supported....case kubetypes.RemoteContainerRuntime:// No-op.breakdefault:return nil, fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)}
1.4.1. NewDockerService
// Create and start the CRI shim running as a grpc server.
streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)
ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,&pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, !crOptions.RedirectContainerStreaming)
if err != nil {return nil, err
}
if crOptions.RedirectContainerStreaming {klet.criHandler = ds
}
1.4.2. NewDockerServer
// The unix socket for kubelet <-> dockershim communication.
glog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",remoteRuntimeEndpoint,remoteImageEndpoint)
glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
if err := server.Start(); err != nil {return nil, err
}
1.4.3. DockerServer.Start
// Start starts the dockershim grpc server.
func (s *DockerServer) Start() error {// Start the internal service.if err := s.service.Start(); err != nil {glog.Errorf("Unable to start docker service")return err}glog.V(2).Infof("Start dockershim grpc server")l, err := util.CreateListener(s.endpoint)if err != nil {return fmt.Errorf("failed to listen on %q: %v", s.endpoint, err)}// Create the grpc server and register runtime and image services.s.server = grpc.NewServer(grpc.MaxRecvMsgSize(maxMsgSize),grpc.MaxSendMsgSize(maxMsgSize),)runtimeapi.RegisterRuntimeServiceServer(s.server, s.service)runtimeapi.RegisterImageServiceServer(s.server, s.service)go func() {if err := s.server.Serve(l); err != nil {glog.Fatalf("Failed to serve connections: %v", err)}}()return nil
}
1.5. podWorker
构造podWorkers
和workQueue
。
klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
1.5.1. PodWorkers接口
// PodWorkers is an abstract interface for testability.
type PodWorkers interface {UpdatePod(options *UpdatePodOptions)ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)ForgetWorker(uid types.UID)
}
podWorker
主要用来对pod相应事件进行处理和同步,包含以下三个方法:UpdatePod
、ForgetNonExistingPodWorkers
、ForgetWorker
。
2. 总结
NewMainKubelet
主要用来构造kubelet
结构体,其中kubelet除了包含必要的配置和client(例如:kubeClient、csiClient等)外,最主要的包含各种manager来管理不同的任务。核心的manager有以下几种:
oomWatcher
:监控pod内存是否发生OOM。podManager
:管理pod的生命周期,包括对pod的增删改查操作等。containerGC
:对死亡容器进行垃圾回收。imageManager
:对容器镜像进行垃圾回收。statusManager
:与apiserver同步pod状态,同时也作状态缓存。volumeManager
:对pod的volume进行attached/detached/mounted/unmounted
操作。evictionManager
:保证节点稳定,必要时对pod进行驱逐(例如资源不足的情况下)。
NewMainKubelet
还包含了serviceLister
和nodeLister
来监听service
和node
的列表变化。kubelet使用到的
containerRuntime
目前主要是docker
,其中rkt
已废弃。NewMainKubelet
启动了dockershim grpc server
来执行docker相关操作。构建了
podWorker
来对pod相关的更新逻辑进行处理。
参考文章:
- https://github.com/kubernetes/kubernetes/tree/v1.12.0
- https://github.com/kubernetes/kubernetes/tree/v1.12.0/pkg/kubelet
kubelet源码分析(二)之 NewMainKubelet相关推荐
- kubelet源码分析(一)之 NewKubeletCommand
本文个人博客地址:https://www.huweihuang.com/kubernetes-notes/code-analysis/kubelet/NewKubeletCommand.html ku ...
- kubelet源码分析(三)之 startKubelet
本文个人博客地址:https://www.huweihuang.com/kubernetes-notes/code-analysis/kubelet/startKubelet.html kubelet ...
- 【Android 事件分发】ItemTouchHelper 源码分析 ( OnItemTouchListener 事件监听器源码分析 二 )
Android 事件分发 系列文章目录 [Android 事件分发]事件分发源码分析 ( 驱动层通过中断传递事件 | WindowManagerService 向 View 层传递事件 ) [Andr ...
- SpringBoot源码分析(二)之自动装配demo
SpringBoot源码分析(二)之自动装配demo 文章目录 SpringBoot源码分析(二)之自动装配demo 前言 一.创建RedissonTemplate的Maven服务 二.创建测试服务 ...
- gSOAP 源码分析(二)
gSOAP 源码分析(二) 2012-5-24 flyfish 一 gSOAP XML介绍 Xml的全称是EXtensible Markup Language.可扩展标记语言.仅仅是一个纯文本.适合用 ...
- Android Q 10.1 KeyMaster源码分析(二) - 各家方案的实现
写在之前 这两篇文章是我2021年3月初看KeyMaster的笔记,本来打算等分析完KeyMaster和KeyStore以后再一起做成一系列贴出来,后来KeyStore的分析中断了,这一系列的文章就变 ...
- 【投屏】Scrcpy源码分析二(Client篇-连接阶段)
Scrcpy源码分析系列 [投屏]Scrcpy源码分析一(编译篇) [投屏]Scrcpy源码分析二(Client篇-连接阶段) [投屏]Scrcpy源码分析三(Client篇-投屏阶段) [投屏]Sc ...
- Nouveau源码分析(二):Nouveau结构体的基本框架
Nouveau源码分析(二) 在讨论Nouveau对Nvidia设备的初始化前,我准备先说一下Nouveau结构体的基本框架 Nouveau的很多结构体都可以看作是C++中的类,之间有很多相似的东西, ...
- ENS最新合约源码分析二
ENS(以太坊域名服务)智能合约源码分析二 0.简介 本次分享直接使用线上实际注册流程来分析最新注册以太坊域名的相关代码.本次主要分析最新的关于普通域名注册合约和普通域名迁移合约,短域名竞拍合约不 ...
最新文章
- Servlet的基本架构
- Android开发工具之Android Studio---版本控制SVN使用三(常规操作)
- idea样式报错_来自强迫症患者的IDEA设置
- JAVA程序员面试必知32个知识点
- php-7.2.13的安装,Centos7.2编译安装php-7.0.13
- 【leetcode】Min Stack -- python版
- svn 同步备份的所有问题,亲测可用
- 力扣225. 用队列实现栈(JavaScript)
- ORACLE完整数据库实例迁移
- VS2010与.NET4系列 6.ASP.NET,HTML,JavaScript片断支持
- 《Word 排版艺术》一书的人到此交流
- 服务器机械硬盘坏了怎么修复,硬盘修复软件:如何修复硬盘错误?
- 微pe添加网络组件_(已解决)干掉peset后如何启动PE的网络组件?
- cobalt strik启动
- linux服务器console口,Linux重定向console口控制台
- mac写python用什么软件_Mac安装软件,一条指令就搞定
- 自动关闭MessageBox
- GHT(广义霍夫曼变换)
- crh寄存器_STM32的CRH、CRL、ODR和IDR寄存器的使用总结
- ERP中的“蝴蝶效应”:重视过程的控制