

kubelet 根据 PodSpec 工作。PodSpec 是一个描述 Pod 的 YAML 或 JSON 对象。kubelet 接收通过各种机制（主要是通过 apiserver）提供的一组 PodSpec，并确保这些 PodSpec 中描述的容器正在运行且运行状况良好。kubelet 不管理非 Kubernetes 创建的容器。



func main() {runtime.GOMAXPROCS(runtime.NumCPU())   // 开启cpu对应个数的线程运行s := app.NewKubeletServer()                 // 初始化对象s.AddFlags(pflag.CommandLine)util.InitFlags()util.InitLogs()defer util.FlushLogs()verflag.PrintAndExitIfRequested()if err := s.Run(nil); err != nil {                // 运行起来fmt.Fprintf(os.Stderr, "%v\n", err)os.Exit(1)}


// NewKubeletServer will create a new KubeletServer with default values.
func NewKubeletServer() *KubeletServer {return &KubeletServer{Address:                     net.ParseIP(""),   // 本机监听的地址AuthPath:                    util.NewStringFlag("/var/lib/kubelet/kubernetes_auth"), // deprecatedCAdvisorPort:                4194,CertDirectory:               "/var/run/kubernetes",    // 证书路径CgroupRoot:                  "",ConfigureCBR0:               false,ContainerRuntime:            "docker",                                // 运行时的工具CPUCFSQuota:                 false,DockerDaemonContainer:       "/docker-daemon",DockerExecHandlerName:       "native",EnableDebuggingHandlers:     true,EnableServer:                true,FileCheckFrequency:          20 * time.Second,HealthzBindAddress:          net.ParseIP(""),    // 监控检测的地址HealthzPort:                 10248,HostNetworkSources:          kubelet.AllSource,HostPIDSources:              kubelet.AllSource,HostIPCSources:              kubelet.AllSource,HTTPCheckFrequency:          20 * time.Second,ImageGCHighThresholdPercent: 90,ImageGCLowThresholdPercent:  80,KubeConfig:                  util.NewStringFlag("/var/lib/kubelet/kubeconfig"),LowDiskSpaceThresholdMB:     256,MasterServiceNamespace:      api.NamespaceDefault,MaxContainerCount:           100,MaxPerPodContainerCount:     2,MaxOpenFiles:                1000000,MinimumGCAge:                1 * time.Minute,NetworkPluginDir:            "/usr/libexec/kubernetes/kubelet-plugins/net/exec/",  // 网络插件NetworkPluginName:           "",NodeStatusUpdateFrequency:   10 * time.Second,OOMScoreAdj:                 qos.KubeletOomScoreAdj,PodInfraContainerImage:      dockertools.PodInfraContainerImage,Port:                ports.KubeletPort,ReadOnlyPort:        ports.KubeletReadOnlyPort,RegisterNode:        true, // will be ignored if no apiserver is configuredRegistryBurst:       10,ResourceContainer:   "/kubelet",RktPath:             "",RktStage1Image:      "",RootDirectory:       defaultRootDir,SerializeImagePulls: true,SyncFrequency:       10 * time.Second,   
// 同步的频率信息SystemContainer:     "",}
}....// Run runs the specified KubeletServer for the given KubeletConfig.  This should never exit.
// The kcfg argument may be nil - if so, it is initialized from the settings on KubeletServer.
// Otherwise, the caller is assumed to have set up the KubeletConfig object and all defaults
// will be ignored.
func (s *KubeletServer) Run(kcfg *KubeletConfig) error {if kcfg == nil {cfg, err := s.KubeletConfig()   // 生成配置信息if err != nil {return err}kcfg = cfgclientConfig, err := s.CreateAPIServerClientConfig()  // 获取连接到AIPServer的配置信息if err == nil {kcfg.KubeClient, err = client.New(clientConfig)     // 生成访问的实例}if err != nil && len(s.APIServerList) > 0 {glog.Warningf("No API client: %v", err)}cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)  // 初始化 云服务提供者if err != nil {return err}glog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)kcfg.Cloud = cloud}if kcfg.CAdvisorInterface == nil {ca, err := cadvisor.New(s.CAdvisorPort)   // 生成监控相关信息if err != nil {return err}kcfg.CAdvisorInterface = ca}util.ReallyCrash = s.ReallyCrashForTestingrand.Seed(time.Now().UTC().UnixNano())credentialprovider.SetPreferredDockercfgPath(s.RootDirectory)glog.V(2).Infof("Using root directory: %v", s.RootDirectory)// TODO(vmarmol): Do this through container config.oomAdjuster := oom.NewOomAdjuster()if err := oomAdjuster.ApplyOomScoreAdj(0, s.OOMScoreAdj); err != nil {glog.Warning(err)}if err := RunKubelet(kcfg, nil); err != nil {  // 运行kubeletreturn err}if s.HealthzPort > 0 {    // 开启监控检测信息healthz.DefaultHealthz()go util.Until(func() {err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress.String(), strconv.Itoa(s.HealthzPort)), nil)if err != nil {glog.Errorf("Starting health server failed: %v", err)}}, 5*time.Second, util.NeverStop)}if s.RunOnce {         // 是否运行一次 如果运行一次则退出return nil}// run foreverselect {}


// RunKubelet is responsible for setting up and running a kubelet.  It is used in three different applications:
//   1 Integration tests
//   2 Kubelet binary
//   3 Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) error {...if builder == nil {builder = createAndInitKubelet     // 创建函数}if kcfg.OSInterface == nil {kcfg.OSInterface = kubecontainer.RealOS{}}k, podCfg, err := builder(kcfg)               // 生成配置信息if err != nil {return fmt.Errorf("failed to create kubelet: %v", err)}util.ApplyRLimitForSelf(kcfg.MaxOpenFiles)// process pods and exit.if kcfg.Runonce {if _, err := k.RunOnce(podCfg.Updates()); err != nil {return fmt.Errorf("runonce failed: %v", err)}glog.Infof("Started kubelet as runonce")} else {startKubelet(k, podCfg, kcfg)                // 如果不是一次运行 则开始循环运行glog.Infof("Started kubelet")}return nil
}func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig) {// start the kubeletgo util.Until(func() { k.Run(podCfg.Updates()) }, 0, util.NeverStop)   // 监听pod的更新数据// start the kubelet serverif kc.EnableServer {            // 是否开始server端go util.Until(func() {k.ListenAndServe(kc.Address, kc.Port, kc.TLSOptions, kc.EnableDebuggingHandlers)}, 0, util.NeverStop)}if kc.ReadOnlyPort > 0 {go util.Until(func() {k.ListenAndServeReadOnly(kc.Address, kc.ReadOnlyPort)}, 0, util.NeverStop)}


func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) {// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop// up into "per source" synchronizations// TODO: KubeletConfig.KubeClient should be a client interface, but client interface misses certain methods// used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing// a nil pointer to it when what we really want is a nil interface.var kubeClient client.Interfaceif kc.KubeClient != nil {kubeClient = kc.KubeClient}gcPolicy := kubelet.ContainerGCPolicy{MinAge:             kc.MinimumGCAge,MaxPerPodContainer: kc.MaxPerPodContainerCount,MaxContainers:      kc.MaxContainerCount,}                  // 配置gc的频率daemonEndpoints := &api.NodeDaemonEndpoints{KubeletEndpoint: api.DaemonEndpoint{Port: int(kc.Port)},}pc = makePodSourceConfig(kc)               // 生成Pod相关的源文件配置 后续都监听该配置k, err = kubelet.NewMainKubelet(kc.Hostname,kc.NodeName,kc.DockerClient,kubeClient,kc.RootDirectory,kc.PodInfraContainerImage,kc.SyncFrequency,float32(kc.RegistryPullQPS),kc.RegistryBurst,kc.EventRecordQPS,kc.EventBurst,gcPolicy,pc.SeenAllSources,kc.RegisterNode,kc.StandaloneMode,kc.ClusterDomain,kc.ClusterDNS,kc.MasterServiceNamespace,kc.VolumePlugins,kc.NetworkPlugins,kc.NetworkPluginName,kc.StreamingConnectionIdleTimeout,kc.Recorder,kc.CAdvisorInterface,kc.ImageGCPolicy,kc.DiskSpacePolicy,kc.Cloud,kc.NodeStatusUpdateFrequency,kc.ResourceContainer,kc.OSInterface,kc.CgroupRoot,kc.ContainerRuntime,kc.RktPath,kc.RktStage1Image,kc.Mounter,kc.Writer,kc.DockerDaemonContainer,kc.SystemContainer,kc.ConfigureCBR0,kc.PodCIDR,kc.MaxPods,kc.DockerExecHandler,kc.ResolverConfig,kc.CPUCFSQuota,daemonEndpoints,kc.SerializeImagePulls,)if err != nil {return nil, nil, err}k.BirthCry()k.StartGarbageCollection()               // 开启垃圾回收return k, pc, nil


func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {// source of all configurationcfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kc.Recorder)// define file config sourceif kc.ConfigFile != "" {glog.Infof("Adding manifest file: %v", kc.ConfigFile)config.NewSourceFile(kc.ConfigFile, kc.NodeName, kc.FileCheckFrequency, cfg.Channel(kubelet.FileSource))}// define url config sourceif kc.ManifestURL != "" {glog.Infof("Adding manifest url %q with HTTP header %v", kc.ManifestURL, kc.ManifestURLHeader)config.NewSourceURL(kc.ManifestURL, kc.ManifestURLHeader, kc.NodeName, kc.HTTPCheckFrequency, cfg.Channel(kubelet.HTTPSource))}if kc.KubeClient != nil {glog.Infof("Watching apiserver")config.NewSourceApiserver(kc.KubeClient, kc.NodeName, cfg.Channel(kubelet.ApiserverSource))   // 连接APIServer的客户端并监听}return cfg


// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c *client.Client, nodeName string, updates chan<- interface{}) {lw := cache.NewListWatchFromClient(c, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, nodeName))newSourceApiserverFromLW(lw, updates)   // 监听远端Pod变化
}// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {send := func(objs []interface{}) {var pods []*api.Podfor _, o := range objs {pods = append(pods, o.(*api.Pod))}updates <- kubelet.PodUpdate{Pods: pods, Op: kubelet.SET, Source: kubelet.ApiserverSource}       // 将更新发送到updates中}cache.NewReflector(lw, &api.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0).Run()


// New creates a new Kubelet for use in main
func NewMainKubelet(hostname string,nodeName string,dockerClient dockertools.DockerInterface,kubeClient client.Interface,rootDirectory string,podInfraContainerImage string,resyncInterval time.Duration,pullQPS float32,pullBurst int,eventQPS float32,eventBurst int,containerGCPolicy ContainerGCPolicy,sourcesReady SourcesReadyFn,registerNode bool,standaloneMode bool,clusterDomain string,clusterDNS net.IP,masterServiceNamespace string,volumePlugins []volume.VolumePlugin,networkPlugins []network.NetworkPlugin,networkPluginName string,streamingConnectionIdleTimeout time.Duration,recorder record.EventRecorder,cadvisorInterface cadvisor.Interface,imageGCPolicy ImageGCPolicy,diskSpacePolicy DiskSpacePolicy,cloud cloudprovider.Interface,nodeStatusUpdateFrequency time.Duration,resourceContainer string,osInterface kubecontainer.OSInterface,cgroupRoot string,containerRuntime string,rktPath string,rktStage1Image string,mounter mount.Interface,writer kubeio.Writer,dockerDaemonContainer string,systemContainer string,configureCBR0 bool,podCIDR string,pods int,dockerExecHandler dockertools.ExecHandler,resolverConfig string,cpuCFSQuota bool,daemonEndpoints *api.NodeDaemonEndpoints,serializeImagePulls bool,
) (*Kubelet, error) {if rootDirectory == "" {return nil, fmt.Errorf("invalid root directory %q", rootDirectory)}if resyncInterval <= 0 {return nil, fmt.Errorf("invalid sync frequency %d", resyncInterval)}if systemContainer != "" && cgroupRoot == "" {return nil, fmt.Errorf("invalid configuration: system container was specified and cgroup root was not specified")}dockerClient = dockertools.NewInstrumentedDockerInterface(dockerClient)  // 获取操作接口serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)if kubeClient != nil {// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather// than an interface. There is no way to construct a list+watcher using resource name.listWatch := &cache.ListWatch{ListFunc: func() (runtime.Object, error) {return kubeClient.Services(api.NamespaceAll).List(labels.Everything())},WatchFunc: func(resourceVersion string) (watch.Interface, error) {return kubeClient.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion)},}cache.NewReflector(listWatch, &api.Service{}, serviceStore, 0).Run() // 新生成一个监控services的信息变化}serviceLister := &cache.StoreToServiceLister{Store: serviceStore}  // 监控service的变化nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)if kubeClient != nil {// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather// than an interface. 
There is no way to construct a list+watcher using resource name.fieldSelector := fields.Set{client.ObjectNameField: nodeName}.AsSelector()listWatch := &cache.ListWatch{ListFunc: func() (runtime.Object, error) {return kubeClient.Nodes().List(labels.Everything(), fieldSelector) // 查看所有的节点的信息},WatchFunc: func(resourceVersion string) (watch.Interface, error) {return kubeClient.Nodes().Watch(labels.Everything(), fieldSelector, resourceVersion)},}cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()}nodeLister := &cache.StoreToNodeLister{Store: nodeStore}  // 监听node的变化情况// TODO: get the real node object of ourself,// and use the real node name and UID.// TODO: what is namespace for node?nodeRef := &api.ObjectReference{Kind:      "Node",Name:      nodeName,UID:       types.UID(nodeName),Namespace: "",}containerGC, err := newContainerGC(dockerClient, containerGCPolicy)  // 生成GC容器if err != nil {return nil, err}imageManager, err := newImageManager(dockerClient, cadvisorInterface, recorder, nodeRef, imageGCPolicy)          // 生成镜像管理if err != nil { return nil, fmt.Errorf("failed to initialize image manager: %v", err)}diskSpaceManager, err := newDiskSpaceManager(cadvisorInterface, diskSpacePolicy) // 生成磁盘管理if err != nil {return nil, fmt.Errorf("failed to initialize disk manager: %v", err)}statusManager := status.NewManager(kubeClient)         // 获取状态管理readinessManager := kubecontainer.NewReadinessManager()   containerRefManager := kubecontainer.NewRefManager() volumeManager := newVolumeManager()                              // 生成容器卷管理oomWatcher := NewOOMWatcher(cadvisorInterface, recorder)klet := &Kubelet{hostname:                       hostname,nodeName:                       nodeName,dockerClient:                   dockerClient,kubeClient:                     kubeClient,rootDirectory:                  rootDirectory,resyncInterval:                 resyncInterval,containerRefManager:            containerRefManager,readinessManager:               
readinessManager,httpClient:                     &http.Client{},sourcesReady:                   sourcesReady,registerNode:                   registerNode,standaloneMode:                 standaloneMode,clusterDomain:                  clusterDomain,clusterDNS:                     clusterDNS,serviceLister:                  serviceLister,nodeLister:                     nodeLister,runtimeMutex:                   sync.Mutex{},runtimeUpThreshold:             maxWaitForContainerRuntime,lastTimestampRuntimeUp:         time.Time{},masterServiceNamespace:         masterServiceNamespace,streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,recorder:                       recorder,cadvisor:                       cadvisorInterface,containerGC:                    containerGC,imageManager:                   imageManager,diskSpaceManager:               diskSpaceManager,statusManager:                  statusManager,volumeManager:                  volumeManager,cloud:                          cloud,nodeRef:                        nodeRef,nodeStatusUpdateFrequency:      nodeStatusUpdateFrequency,resourceContainer:              resourceContainer,os:                             osInterface,oomWatcher:                     oomWatcher,cgroupRoot:                     cgroupRoot,mounter:                        mounter,writer:                         writer,configureCBR0:                  configureCBR0,podCIDR:                        podCIDR,pods:                           pods,syncLoopMonitor:                util.AtomicValue{},resolverConfig:                 resolverConfig,cpuCFSQuota:                    cpuCFSQuota,daemonEndpoints:                daemonEndpoints,}if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {             // 初始化网络插件类return nil, err} else {klet.networkPlugin = plug}machineInfo, err := klet.GetCachedMachineInfo()if err != nil {return nil, err}oomAdjuster := oom.NewOomAdjuster()procFs := 
procfs.NewProcFs()// Initialize the runtime.switch containerRuntime {                   // 检查使用哪一种运行接口case "docker":// Only supported one for now, continue.klet.containerRuntime = dockertools.NewDockerManager(dockerClient,recorder,readinessManager,containerRefManager,machineInfo,podInfraContainerImage,pullQPS,pullBurst,containerLogsDir,osInterface,klet.networkPlugin,klet,klet.httpClient,dockerExecHandler,oomAdjuster,procFs,klet.cpuCFSQuota,serializeImagePulls,)case "rkt":conf := &rkt.Config{Path:               rktPath,Stage1Image:        rktStage1Image,InsecureSkipVerify: true,}rktRuntime, err := rkt.New(conf,klet,recorder,containerRefManager,readinessManager,klet.volumeManager,serializeImagePulls,)if err != nil {return nil, err}klet.containerRuntime = rktRuntime// No Docker daemon to put in a container.dockerDaemonContainer = ""default:return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)}// Setup container manager, can fail if the devices hierarchy is not mounted// (it is required by Docker however).containerManager, err := newContainerManager(mounter, cadvisorInterface, dockerDaemonContainer, systemContainer, resourceContainer)  // 生成镜像管理if err != nil {return nil, fmt.Errorf("failed to create the Container Manager: %v", err)}klet.containerManager = containerManagergo util.Until(klet.syncNetworkStatus, 30*time.Second, util.NeverStop) // 同步网络状态if klet.kubeClient != nil {// Start syncing node status immediately, this may set up things the runtime needs to run.go util.Until(klet.syncNodeStatus, klet.nodeStatusUpdateFrequency, util.NeverStop) // 同步Pod状态}// Wait for the runtime to be up with a timeout.if err := waitUntilRuntimeIsUp(klet.containerRuntime, maxWaitForContainerRuntime); err != nil {return nil, fmt.Errorf("timed out waiting for %q to come up: %v", containerRuntime, err)}klet.lastTimestampRuntimeUp = time.Now()klet.runner = klet.containerRuntimeklet.podManager = newBasicPodManager(klet.kubeClient)  // 
获取基础的podManagerruntimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)if err != nil {return nil, err}klet.runtimeCache = runtimeCacheklet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder)  // 新生成一个podWrokersmetrics.Register(runtimeCache)if err = klet.setupDataDirs(); err != nil {return nil, err}if err = klet.volumePluginMgr.InitPlugins(volumePlugins, &volumeHost{klet}); err != nil {           // 初始化容器卷插件return nil, err}// If the container logs directory does not exist, create it.if _, err := os.Stat(containerLogsDir); err != nil {if err := osInterface.Mkdir(containerLogsDir, 0755); err != nil {glog.Errorf("Failed to create directory %q: %v", containerLogsDir, err)}}klet.backOff = util.NewBackOff(resyncInterval, maxContainerBackOff)klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity)return klet, nil


// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan PodUpdate) {if kl.logServer == nil {kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))}if kl.kubeClient == nil {glog.Warning("No api server defined - no node status update will be sent.")}// Move Kubelet to a container.if kl.resourceContainer != "" {// Fixme: I need to reside inside ContainerManager interface.err := util.RunInResourceContainer(kl.resourceContainer)if err != nil {glog.Warningf("Failed to move Kubelet to container %q: %v", kl.resourceContainer, err)}glog.Infof("Running in container %q", kl.resourceContainer)}if err := kl.imageManager.Start(); err != nil {      // 开始imageManagerkl.recorder.Eventf(kl.nodeRef, "KubeletSetupFailed", "Failed to start ImageManager %v", err)glog.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)}if err := kl.cadvisor.Start(); err != nil {          // 开启监控kl.recorder.Eventf(kl.nodeRef, "KubeletSetupFailed", "Failed to start CAdvisor %v", err)glog.Errorf("Failed to start CAdvisor, system may not be properly monitored: %v", err)}if err := kl.containerManager.Start(); err != nil {       // 开启容器管理kl.recorder.Eventf(kl.nodeRef, "KubeletSetupFailed", "Failed to start ContainerManager %v", err)glog.Errorf("Failed to start ContainerManager, system may not be properly isolated: %v", err)}if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {kl.recorder.Eventf(kl.nodeRef, "KubeletSetupFailed", "Failed to start OOM watcher %v", err)glog.Errorf("Failed to start OOM watching: %v", err)}go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop)// Start a goroutine responsible for killing pods (that are not properly// handled by pod workers).go util.Until(kl.podKiller, 1*time.Second, util.NeverStop)  // 循环开启检查是否需要杀掉pod// Run the system oom watcher forever.kl.statusManager.Start()kl.syncLoop(updates, kl)         // 同步所有的更新


// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {glog.Info("Starting kubelet main sync loop.")kl.resyncTicker = time.NewTicker(kl.resyncInterval)var housekeepingTimestamp time.Timefor {if !kl.containerRuntimeUp() {   // 检查是否启动time.Sleep(5 * time.Second)glog.Infof("Skipping pod synchronization, container runtime is not up.")continue}if !kl.doneNetworkConfigure() {        // 检查网络配置time.Sleep(5 * time.Second)glog.Infof("Skipping pod synchronization, network is not configured")continue}// Make sure we sync first to receive the pods from the sources before// performing housekeeping.if !kl.syncLoopIteration(updates, handler) {   // 同步数据break}// We don't want to perform housekeeping too often, so we set a minimum// period for it. Housekeeping would be performed at least once every// kl.resyncInterval, and *no* more than once every// housekeepingMinimumPeriod.// TODO (#13418): Investigate whether we can/should spawn a dedicated// goroutine for housekeepingif !kl.sourcesReady() {// If the sources aren't ready, skip housekeeping, as we may// accidentally delete pods from unready sources.glog.V(4).Infof("Skipping cleanup, sources aren't ready yet.")} else if housekeepingTimestamp.IsZero() {housekeepingTimestamp = time.Now()} else if time.Since(housekeepingTimestamp) > housekeepingMinimumPeriod {glog.V(4).Infof("SyncLoop (housekeeping)")if err := handler.HandlePodCleanups(); err != nil {glog.Errorf("Failed cleaning pods: %v", err)}housekeepingTimestamp = time.Now()}}
}func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandler) bool {kl.syncLoopMonitor.Store(time.Now())select {case u, open := <-updates:        // 获取更新信息if !open {glog.Errorf("Update channel is closed. Exiting the sync loop.")return false}switch u.Op {case ADD:glog.V(2).Infof("SyncLoop (ADD): %q", kubeletUtil.FormatPodNames(u.Pods))handler.HandlePodAdditions(u.Pods)    // 添加Podcase UPDATE:glog.V(2).Infof("SyncLoop (UPDATE): %q", kubeletUtil.FormatPodNames(u.Pods))handler.HandlePodUpdates(u.Pods)          // 更新Podcase REMOVE:glog.V(2).Infof("SyncLoop (REMOVE): %q", kubeletUtil.FormatPodNames(u.Pods))handler.HandlePodDeletions(u.Pods)        // 删除Podcase SET:// TODO: Do we want to support this?glog.Errorf("Kubelet does not support snapshot update")}case <-kl.resyncTicker.C:// Periodically syncs all the pods and performs cleanup tasks.glog.V(4).Infof("SyncLoop (periodic sync)")handler.HandlePodSyncs(kl.podManager.GetPods())}kl.syncLoopMonitor.Store(time.Now())return true


func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) {start := time.Now()sort.Sort(podsByCreationTime(pods))for _, pod := range pods {kl.podManager.AddPod(pod)          // 通过podManger添加podif isMirrorPod(pod) { kl.handleMirrorPod(pod, start)continue}// Note that allPods includes the new pod since we added at the// beginning of the loop.allPods := kl.podManager.GetPods()// We failed pods that we rejected, so activePods include all admitted// pods that are alive and the new pod.activePods := kl.filterOutTerminatedPods(allPods)      // 获取所有的pods// Check if we can admit the pod; if not, reject it.if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok { // 检查是否正常 否则就拒绝kl.rejectPod(pod, reason, message)continue}mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)kl.dispatchWork(pod, SyncPodCreate, mirrorPod, start) }
}func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) {start := time.Now()for _, pod := range pods {kl.podManager.UpdatePod(pod)              // 通过podManager更新podif isMirrorPod(pod) {kl.handleMirrorPod(pod, start)continue}// TODO: Evaluate if we need to validate and reject updates.mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)kl.dispatchWork(pod, SyncPodUpdate, mirrorPod, start)}
}func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) {start := time.Now()for _, pod := range pods {kl.podManager.DeletePod(pod)            // 通过podManger删除podif isMirrorPod(pod) {kl.handleMirrorPod(pod, start)continue}// Deletion is allowed to fail because the periodic cleanup routine// will trigger deletion again.if err := kl.deletePod(pod.UID); err != nil {   // 删除podglog.V(2).Infof("Failed to delete pod %q, err: %v", kubeletUtil.FormatPodName(pod), err)}}


func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType SyncPodType, mirrorPod *api.Pod, start time.Time) {if kl.podIsTerminated(pod) {   // 如果pod已经停止则返回return}// Run the sync in an async worker.kl.podWorkers.UpdatePod(pod, mirrorPod, func() {metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))})   // 更新pod或者新增// Note the number of containers for new pods.if syncType == SyncPodCreate {metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))}


func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {var minRuntimeCacheTime time.Timefor newWork := range podUpdates {  // 获取更新的信息func() {defer p.checkForUpdates(newWork.pod.UID, newWork.updateCompleteFn)// We would like to have the state of the containers from at least// the moment when we finished the previous processing of that pod.if err := p.runtimeCache.ForceUpdateIfOlder(minRuntimeCacheTime); err != nil {glog.Errorf("Error updating the container runtime cache: %v", err)return}pods, err := p.runtimeCache.GetPods() // 获取podsif err != nil {glog.Errorf("Error getting pods while syncing pod: %v", err)return}err = p.syncPodFn(newWork.pod, newWork.mirrorPod,kubecontainer.Pods(pods).FindPodByID(newWork.pod.UID), newWork.updateType)  // 同步调用podif err != nil {glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)p.recorder.Eventf(newWork.pod, "FailedSync", "Error syncing pod, skipping: %v", err)return}minRuntimeCacheTime = time.Now()newWork.updateCompleteFn()  // 更新网络}()}
}// Apply the new setting to the specified pod. updateComplete is called when the update is completed.
func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func()) {uid := pod.UIDvar podUpdates chan workUpdatevar exists bool// TODO: Pipe this through from the kubelet. Currently kubelets operating with// snapshot updates (PodConfigNotificationSnapshot) will send updates, creates// and deletes as SET operations, which makes updates indistinguishable from// creates. The intent here is to communicate to the pod worker that it can take// certain liberties, like skipping status generation, when it receives a create// event for a pod.updateType := SyncPodUpdatep.podLock.Lock()defer p.podLock.Unlock()if podUpdates, exists = p.podUpdates[uid]; !exists {// We need to have a buffer here, because checkForUpdates() method that// puts an update into channel is called from the same goroutine where// the channel is consumed. However, it is guaranteed that in such case// the channel is empty, so buffer of size 1 is enough.podUpdates = make(chan workUpdate, 1)p.podUpdates[uid] = podUpdates// Creating a new pod worker either means this is a new pod, or that the// kubelet just restarted. In either case the kubelet is willing to believe// the status of the pod for the first pod worker sync. See corresponding// comment in syncPod.updateType = SyncPodCreate   // 如果是新创建go func() {defer util.HandleCrash()p.managePodLoop(podUpdates)  // 调用创建流程}()}if !p.isWorking[pod.UID] {p.isWorking[pod.UID] = true  // 设置标志位等信息podUpdates <- workUpdate{pod:              pod,mirrorPod:        mirrorPod,updateCompleteFn: updateComplete,updateType:       updateType,}  // } else {p.lastUndeliveredWorkUpdate[pod.UID] = workUpdate{pod:              pod,mirrorPod:        mirrorPod,updateCompleteFn: updateComplete,updateType:       updateType,}}


func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error {podFullName := kubecontainer.GetPodFullName(pod)  // 获取完整信息uid := pod.UIDstart := time.Now()var firstSeenTime time.Timeif firstSeenTimeStr, ok := pod.Annotations[ConfigFirstSeenAnnotationKey]; !ok {glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)} else {firstSeenTime = kubeletTypes.ConvertToTimestamp(firstSeenTimeStr).Get()}// Before returning, regenerate status and store it in the cache.defer func() {if isStaticPod(pod) && mirrorPod == nil {   // 判断是否是静态的pod// No need to cache the status because the mirror pod does not// exist yet.return}status, err := kl.generatePodStatus(pod)  // 获取pod状态if err != nil {glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)} else {podToUpdate := podif mirrorPod != nil {podToUpdate = mirrorPod}existingStatus, ok := kl.statusManager.GetPodStatus(podToUpdate.UID)  // 获取状态if !ok || existingStatus.Phase == api.PodPending && status.Phase == api.PodRunning &&!firstSeenTime.IsZero() {metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))}kl.statusManager.SetPodStatus(podToUpdate, status)  // 设置pod状态}}()// Kill pods we can't run.if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil {  // 如果pod不能运行则杀掉if err := kl.killPod(pod, runningPod); err != nil {util.HandleError(err)}return err}// Create Mirror Pod for Static Pod if it doesn't already existif isStaticPod(pod) {if mirrorPod != nil && !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {// The mirror pod is semantically different from the static pod. Remove// it. 
The mirror pod will get recreated later.glog.Errorf("Deleting mirror pod %q because it is outdated", podFullName)if err := kl.podManager.DeleteMirrorPod(podFullName); err != nil {glog.Errorf("Failed deleting mirror pod %q: %v", podFullName, err)}}if mirrorPod == nil {glog.V(3).Infof("Creating a mirror pod for static pod %q", podFullName)if err := kl.podManager.CreateMirrorPod(pod); err != nil {glog.Errorf("Failed creating a mirror pod %q: %v", podFullName, err)}_, ok := kl.podManager.GetMirrorPodByPod(pod)if !ok {glog.Errorf("Mirror pod not available")}}}if err := kl.makePodDataDirs(pod); err != nil { // 设置pod的运行路径glog.Errorf("Unable to make pod data directories for pod %q (uid %q): %v", podFullName, uid, err)return err}// Starting phase:ref, err := api.GetReference(pod)if err != nil {glog.Errorf("Couldn't make a ref to pod %q: '%v'", podFullName, err)}// Mount volumes.podVolumes, err := kl.mountExternalVolumes(pod)  //挂载数据卷if err != nil {if ref != nil {kl.recorder.Eventf(ref, "FailedMount", "Unable to mount volumes for pod %q: %v", podFullName, err)}glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", podFullName, err)return err}kl.volumeManager.SetVolumes(pod.UID, podVolumes)  //设置卷// The kubelet is the source of truth for pod status. It ignores the status sent from// the apiserver and regenerates status for every pod update, incrementally updating// the status it received at pod creation time.//// The container runtime needs 2 pieces of information from the status to sync a pod:// The terminated state of containers (to restart them) and the podIp (for liveness probes).// New pods don't have either, so we skip the expensive status generation step.//// If we end up here with a create event for an already running pod, it could result in a// restart of its containers. 
This cannot happen unless the kubelet restarts, because the// delete before the second create would cancel this pod worker.//// If the kubelet restarts, we have a bunch of running containers for which we get create// events. This is ok, because the pod status for these will include the podIp and terminated// status. Any race conditions here effectively boils down to -- the pod worker didn't sync// state of a newly started container with the apiserver before the kubelet restarted, so// it's OK to pretend like the kubelet started them after it restarted.//// Also note that deletes currently have an updateType of `create` set in UpdatePods.// This, again, does not matter because deletes are not processed by this method.var podStatus api.PodStatusif updateType == SyncPodCreate {// This is the first time we are syncing the pod. Record the latency// since kubelet first saw the pod if firstSeenTime is set.if !firstSeenTime.IsZero() {metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))}podStatus = pod.StatuspodStatus.StartTime = &unversioned.Time{Time: start}kl.statusManager.SetPodStatus(pod, podStatus)  // 如果更新则设置状态glog.V(3).Infof("Not generating pod status for new pod %q", podFullName)} else {var err errorpodStatus, err = kl.generatePodStatus(pod)  // 获取当前状态if err != nil {glog.Errorf("Unable to get status for pod %q (uid %q): %v", podFullName, uid, err)return err}}pullSecrets, err := kl.getPullSecretsForPod(pod)if err != nil {glog.Errorf("Unable to get pull secrets for pod %q (uid %q): %v", podFullName, uid, err)return err}err = kl.containerRuntime.SyncPod(pod, runningPod, podStatus, pullSecrets, kl.backOff) //同步pod的状态信息if err != nil {return err}ingress, egress, err := extractBandwidthResources(pod) // 获取绑定的网络信息if err != nil {return err}if egress != nil || ingress != nil {if pod.Spec.HostNetwork {kl.recorder.Event(pod, "HostNetworkNotSupported", "Bandwidth shaping is not currently supported on the host network")} else if kl.shaper != nil 
{status, found := kl.statusManager.GetPodStatus(pod.UID)if !found {statusPtr, err := kl.containerRuntime.GetPodStatus(pod)if err != nil {glog.Errorf("Error getting pod for bandwidth shaping")return err}status = *statusPtr}if len(status.PodIP) > 0 {err = kl.shaper.ReconcileCIDR(fmt.Sprintf("%s/32", status.PodIP), egress, ingress)}} else {kl.recorder.Event(pod, "NilShaper", "Pod requests bandwidth shaping, but the shaper is undefined")}}return nil


// Sync the running pod to match the specified desired pod.
func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error {start := time.Now()defer func() {metrics.ContainerManagerLatency.WithLabelValues("SyncPod").Observe(metrics.SinceInMicroseconds(start))}()podFullName := kubecontainer.GetPodFullName(pod)containerChanges, err := dm.computePodContainerChanges(pod, runningPod, podStatus) // 更改pod状态if err != nil {return err}glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)if containerChanges.StartInfraContainer || (len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0) {if len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0 {glog.V(4).Infof("Killing Infra Container for %q because all other containers are dead.", podFullName)} else {glog.V(4).Infof("Killing Infra Container for %q, will start new one", podFullName)}// Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)err = dm.KillPod(pod, runningPod)  //如果正在运行则杀掉if err != nil {return err}} else {// Otherwise kill any containers in this pod which are not specified as ones to keep.for _, container := range runningPod.Containers { // 获取正在运行的要保留_, keep := containerChanges.ContainersToKeep[kubeletTypes.DockerID(container.ID)]if !keep {glog.V(3).Infof("Killing unwanted container %+v", container)// attempt to find the appropriate container policyvar podContainer *api.Containerfor i, c := range pod.Spec.Containers {if c.Name == container.Name {podContainer = &pod.Spec.Containers[i]break}}err = dm.KillContainerInPod(container.ID, podContainer, pod) // 如果不保留的则杀掉if err != nil {glog.Errorf("Error killing container: %v", err)}}}}// If we should create infra container then we do it first.podInfraContainerID := containerChanges.InfraContainerId  // 获取运行的基础容器if containerChanges.StartInfraContainer && 
(len(containerChanges.ContainersToStart) > 0) {glog.V(4).Infof("Creating pod infra container for %q", podFullName)podInfraContainerID, err = dm.createPodInfraContainer(pod) // 创建Pod// Call the networking pluginif err == nil {err = dm.networkPlugin.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID)  // 设置网络}if err != nil {glog.Errorf("Failed to create pod infra container: %v; Skipping pod %q", err, podFullName)return err}// Setup the host interface (FIXME: move to networkPlugin when ready)podInfraContainer, err := dm.client.InspectContainer(string(podInfraContainerID)) // 进入容器if err != nil {glog.Errorf("Failed to inspect pod infra container: %v; Skipping pod %q", err, podFullName)return err}if err = hairpin.SetUpContainer(podInfraContainer.State.Pid, "eth0"); err != nil {glog.Warningf("Hairpin setup failed for pod %q: %v", podFullName, err) // 设置网络}// Find the pod IP after starting the infra container in order to expose// it safely via the downward API without a race and be able to use podIP in kubelet-managed /etc/hosts file.pod.Status.PodIP = dm.determineContainerIP(pod.Name, pod.Namespace, podInfraContainer)  // 获取网络IP}// Start everythingfor idx := range containerChanges.ContainersToStart {container := &pod.Spec.Containers[idx]// containerChanges.StartInfraContainer causes the containers to be restarted for config reasons// ignore backoffif !containerChanges.StartInfraContainer && dm.doBackOff(pod, container, podStatus, backOff) {glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, podFullName)continue}glog.V(4).Infof("Creating container %+v in pod %v", container, podFullName)err := dm.imagePuller.PullImage(pod, container, pullSecrets) // 拉取镜像dm.updateReasonCache(pod, container, "PullImageError", err) // 更新if err != nil {glog.Warningf("Failed to pull image %q from pod %q and container %q: %v", container.Image, kubecontainer.GetPodFullName(pod), container.Name, err)continue}if container.SecurityContext != nil && 
container.SecurityContext.RunAsNonRoot {err := dm.verifyNonRoot(container)dm.updateReasonCache(pod, container, "VerifyNonRootError", err)if err != nil {glog.Errorf("Error running pod %q container %q: %v", kubecontainer.GetPodFullName(pod), container.Name, err)continue}}// TODO(dawnchen): Check RestartPolicy.DelaySeconds before restart a container// Note: when configuring the pod's containers anything that can be configured by pointing// to the namespace of the infra container should use namespaceMode.  This includes things like the net namespace// and IPC namespace.  PID mode cannot point to another container right now.// See createPodInfraContainer for infra container setup.namespaceMode := fmt.Sprintf("container:%v", podInfraContainerID)_, err = dm.runContainerInPod(pod, container, namespaceMode, namespaceMode, getPidMode(pod))  // 运行容器dm.updateReasonCache(pod, container, "RunContainerError", err)if err != nil {// TODO(bburns) : Perhaps blacklist a container after N failures?glog.Errorf("Error running pod %q container %q: %v", kubecontainer.GetPodFullName(pod), container.Name, err)continue}// Successfully started the container; clear the entry in the failure// reason cache.dm.clearReasonCache(pod, container)}return nil





  1. k8s概念入门之apiserver-针对1.1.版本阅读

    apiserver k8s中最重要的一个通信节点就是apiserver,是一个中心节点连接着每一环,是kubelet,kube-proxy和control-manager的交互的中心点,提供基于API ...

  2. k8s概念入门之control-manager-针对1.1.版本阅读

    control-manager 资源控制器主要是为了控制各种资源的变更信息,例如pod的创建新增,副本控制器和账户控制器等信息,资源控制器的主要职责就是通过list-watch机制,从APIServe ...

  3. k8s概念入门之kube-proxy-针对1.1版本阅读

    背景 在后续阅读k8s0.4版本的过程中,发现文档上描述的确实是一个不完整的版本,故切换版本到1.1,因为在1.1文档中已经标明了可以在生产环境中使用,故重新再学习一下有关kube-proxy的内容, ...

  4. k8s概念入门之kube-proxy-针对早期(0.4)版本阅读

    k8s的kube-proxy分析 Kube-proxy主要是伴随着kubelet进程一起部署在每个node节点中,proxy的功能主要就是为了完成在k8s集群中实现集群内部的通信,也可完成集群外的数据 ...

  5. Kubernetes ~ k8s 从入门到入坑。

    Kubernetes ~ k8s 从入门到入坑. 文章目录 Kubernetes ~ k8s 从入门到入坑. 1. Kubernetes 介绍. 1.1 应用部署方式演变. 1.2 kubernete ...

  6. Kubernetes(k8s)入门及集群部署文档

    文章目录 一.k8s 快速入门 简介 部署方式的进化 k8s能做什么 架构 整体主从方式 Master 节点架构 Node 节点架构 概念 快速体验 流程叙述 二.k8s 集群安装 前置要求 了解ku ...

  7. 【机器学习】机器学习和深度学习概念入门

    机器学习和深度学习概念入门(上) 作者:谭东  来源:机器学习算法与自然语言处理 目  录 1   人工智能.机器学习.深度学习三者关系 2   什么是人工智能 3  什么是机器学习 4  机器学习之 ...

  8. 19年8月 字母哥 第一章 spring boot 2.x基础及概念入门 这里全部看完了 热部署没出来 第二章在前面2页 用热点公司网不行

    http://springboot.zimug.com/1233100   文档 http://www.zimug.com/page/5     字母哥个人博客 11111 第一章 spring bo ...

  9. k8s dashboard_【大强哥-k8s从入门到放弃02】Kubernetes1.17部署Dashboard2.0

    号外号外,后面所有提升视频都会更新到知乎和B站上去,不会直接发群里了,哈哈,能看懂这句话的我都认识,大家可以先关注一下,我知乎上的所有文档也会录成视频 更多视频详见 杨哥天云:https://spac ...


  1. JQuery播放器代理--IE下支持wma格式
  2. 杭电2112HDU Today(map 最短路径)
  3. Linux automake命令
  4. paip.python错误解决 0x64024e96 指令引用的 0x00000135 内存。该内存不能为 read。
  5. 一文读懂全球CTRM市场的前世今生
  6. matlab vl_feat,matlab 安装 vl_feat
  7. Android开发之科大讯飞语音合成与播报
  8. 读《京东咚咚架构演进》有感
  9. 小熊的果篮 2021 CSP J2
  10. 剑英陪你玩转图形学(五)focus
  11. esc pos命令 java使用_18、ESC/POS指令集在android设备上使用实例(通过socket)
  12. 磁盘,分区,文件系统
  13. 秀技能:倒立及其他没用的
  14. C++ typename详解
  15. Flink【优质】面试
  16. 如何理解GPU中的SIMT(单指令流多线程模型)
  17. web前端网页制作课作业——用DIV+CSS技术设计的家乡旅游主题网站
  18. openflow交换机 ryu_在RYU中实现交换机的功能
  19. 用C++实现复数的四则运算
  20. 金龙鱼:营收增长,利润难求


  1. 使用卷积神经网络预防疲劳驾驶事故
  2. 原来Python用得好,工作这么好找
  3. 第十三届光华工程科技奖名单揭晓!这40位专家和1个团体获奖
  4. Python实现五子棋人机对战 | CSDN博文精选
  5. 关于知识蒸馏,这三篇论文详解不可错过
  6. ImageNet时代将终结?何恺明新作:Rethinking ImageNet Pre-training
  7. 资源 | 这是你要的Keras官方中文版(附文档链接)
  8. 同一份数据,Redis为什么要存两次?
  9. 干掉visio,这个画图神器真的绝了!!!
  10. 扔掉okhttp、httpClient,来试试这款轻量级HTTP客户端神器?