Introduction to k8s concepts: kubelet (a reading of version 1.1)
kubelet
The kubelet is the primary "node agent" that runs on each node. It can register the node with the apiserver using one of: the hostname; a flag to override the hostname; or cloud-provider-specific logic.
The kubelet works in terms of a PodSpec. A PodSpec is a YAML or JSON object that describes a pod. The kubelet takes a set of PodSpecs that are provided through various mechanisms (primarily through the apiserver) and ensures that the containers described in those PodSpecs are running and healthy. The kubelet does not manage containers that were not created by Kubernetes.
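To make this concrete, here is a minimal, hypothetical example of the kind of object a PodSpec describes, expressed as the Go api.Pod structure that the 1.1 code base manipulates (the pod name and image are placeholders, not taken from the original article):

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
)

func main() {
	// A minimal pod: one container, default namespace. The kubelet's job is
	// to make the node's actual state match specs like this one.
	pod := &api.Pod{
		ObjectMeta: api.ObjectMeta{Name: "nginx", Namespace: api.NamespaceDefault},
		Spec: api.PodSpec{
			Containers: []api.Container{
				{Name: "nginx", Image: "nginx:1.7.9"},
			},
		},
	}
	fmt.Printf("pod %s declares %d container(s)\n", pod.Name, len(pod.Spec.Containers))
}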
The kubelet flow
The kubelet's core job is to keep every pod in its desired state and running properly, syncing the current pod state against the state held by the apiserver. Let's start with an overview of the whole flow.
Starting the main flow
func main() {
	runtime.GOMAXPROCS(runtime.NumCPU()) // run with one OS thread per CPU
	s := app.NewKubeletServer()          // initialize the server object
	s.AddFlags(pflag.CommandLine)
	util.InitFlags()
	util.InitLogs()
	defer util.FlushLogs()
	verflag.PrintAndExitIfRequested()
	if err := s.Run(nil); err != nil { // start running
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}
The startup code is quite straightforward: initialize first, then run.
// NewKubeletServer will create a new KubeletServer with default values.
func NewKubeletServer() *KubeletServer {
	return &KubeletServer{
		Address:                     net.ParseIP("0.0.0.0"), // address the kubelet binds to
		AuthPath:                    util.NewStringFlag("/var/lib/kubelet/kubernetes_auth"), // deprecated
		CAdvisorPort:                4194,
		CertDirectory:               "/var/run/kubernetes", // directory for certificates
		CgroupRoot:                  "",
		ConfigureCBR0:               false,
		ContainerRuntime:            "docker", // the container runtime to use
		CPUCFSQuota:                 false,
		DockerDaemonContainer:       "/docker-daemon",
		DockerExecHandlerName:       "native",
		EnableDebuggingHandlers:     true,
		EnableServer:                true,
		FileCheckFrequency:          20 * time.Second,
		HealthzBindAddress:          net.ParseIP("127.0.0.1"), // bind address for the health check endpoint
		HealthzPort:                 10248,
		HostNetworkSources:          kubelet.AllSource,
		HostPIDSources:              kubelet.AllSource,
		HostIPCSources:              kubelet.AllSource,
		HTTPCheckFrequency:          20 * time.Second,
		ImageGCHighThresholdPercent: 90,
		ImageGCLowThresholdPercent:  80,
		KubeConfig:                  util.NewStringFlag("/var/lib/kubelet/kubeconfig"),
		LowDiskSpaceThresholdMB:     256,
		MasterServiceNamespace:      api.NamespaceDefault,
		MaxContainerCount:           100,
		MaxPerPodContainerCount:     2,
		MaxOpenFiles:                1000000,
		MinimumGCAge:                1 * time.Minute,
		NetworkPluginDir:            "/usr/libexec/kubernetes/kubelet-plugins/net/exec/", // network plugin directory
		NetworkPluginName:           "",
		NodeStatusUpdateFrequency:   10 * time.Second,
		OOMScoreAdj:                 qos.KubeletOomScoreAdj,
		PodInfraContainerImage:      dockertools.PodInfraContainerImage,
		Port:                        ports.KubeletPort,
		ReadOnlyPort:                ports.KubeletReadOnlyPort,
		RegisterNode:                true, // will be ignored if no apiserver is configured
		RegistryBurst:               10,
		ResourceContainer:           "/kubelet",
		RktPath:                     "",
		RktStage1Image:              "",
		RootDirectory:               defaultRootDir,
		SerializeImagePulls:         true,
		SyncFrequency:               10 * time.Second, // how often to sync pod state
		SystemContainer:             "",
	}
}

...

// Run runs the specified KubeletServer for the given KubeletConfig. This should never exit.
// The kcfg argument may be nil - if so, it is initialized from the settings on KubeletServer.
// Otherwise, the caller is assumed to have set up the KubeletConfig object and all defaults
// will be ignored.
func (s *KubeletServer) Run(kcfg *KubeletConfig) error {
	if kcfg == nil {
		cfg, err := s.KubeletConfig() // build the configuration
		if err != nil {
			return err
		}
		kcfg = cfg
		clientConfig, err := s.CreateAPIServerClientConfig() // build the config for connecting to the APIServer
		if err == nil {
			kcfg.KubeClient, err = client.New(clientConfig) // create the client instance
		}
		if err != nil && len(s.APIServerList) > 0 {
			glog.Warningf("No API client: %v", err)
		}
		cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) // initialize the cloud provider
		if err != nil {
			return err
		}
		glog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
		kcfg.Cloud = cloud
	}
	if kcfg.CAdvisorInterface == nil {
		ca, err := cadvisor.New(s.CAdvisorPort) // create the cAdvisor monitoring interface
		if err != nil {
			return err
		}
		kcfg.CAdvisorInterface = ca
	}
	util.ReallyCrash = s.ReallyCrashForTesting
	rand.Seed(time.Now().UTC().UnixNano())
	credentialprovider.SetPreferredDockercfgPath(s.RootDirectory)
	glog.V(2).Infof("Using root directory: %v", s.RootDirectory)
	// TODO(vmarmol): Do this through container config.
	oomAdjuster := oom.NewOomAdjuster()
	if err := oomAdjuster.ApplyOomScoreAdj(0, s.OOMScoreAdj); err != nil {
		glog.Warning(err)
	}
	if err := RunKubelet(kcfg, nil); err != nil { // run the kubelet
		return err
	}
	if s.HealthzPort > 0 { // start the health check endpoint
		healthz.DefaultHealthz()
		go util.Until(func() {
			err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress.String(), strconv.Itoa(s.HealthzPort)), nil)
			if err != nil {
				glog.Errorf("Starting health server failed: %v", err)
			}
		}, 5*time.Second, util.NeverStop)
	}
	if s.RunOnce { // if configured to run only once, exit after one pass
		return nil
	}
	// run forever
	select {}
}
In terms of flow, Run parses the configuration, creates the client used to talk to the APIServer, starts the health check endpoint, and so on.
// RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications:
// 1 Integration tests
// 2 Kubelet binary
// 3 Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) error {
	...
	if builder == nil {
		builder = createAndInitKubelet // the default builder function
	}
	if kcfg.OSInterface == nil {
		kcfg.OSInterface = kubecontainer.RealOS{}
	}
	k, podCfg, err := builder(kcfg) // build the kubelet and the pod config
	if err != nil {
		return fmt.Errorf("failed to create kubelet: %v", err)
	}
	util.ApplyRLimitForSelf(kcfg.MaxOpenFiles)
	// process pods and exit.
	if kcfg.Runonce {
		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
			return fmt.Errorf("runonce failed: %v", err)
		}
		glog.Infof("Started kubelet as runonce")
	} else {
		startKubelet(k, podCfg, kcfg) // otherwise start the long-running loops
		glog.Infof("Started kubelet")
	}
	return nil
}

func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig) {
	// start the kubelet
	go util.Until(func() { k.Run(podCfg.Updates()) }, 0, util.NeverStop) // watch for pod updates
	// start the kubelet server
	if kc.EnableServer { // whether to start the server endpoint
		go util.Until(func() {
			k.ListenAndServe(kc.Address, kc.Port, kc.TLSOptions, kc.EnableDebuggingHandlers)
		}, 0, util.NeverStop)
	}
	if kc.ReadOnlyPort > 0 {
		go util.Until(func() {
			k.ListenAndServeReadOnly(kc.Address, kc.ReadOnlyPort)
		}, 0, util.NeverStop)
	}
}
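startKubelet leans heavily on util.Until, which simply re-runs a function at a fixed interval until a stop channel closes. Here is a minimal sketch of that pattern (my own simplified stand-in, not the actual util implementation):

package main

import (
	"fmt"
	"time"
)

// until re-runs f every period until stopCh is closed, a simplified
// stand-in for the util.Until helper used throughout the kubelet.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			return
		default:
		}
		f() // if f ever returns, it is invoked again after period
		select {
		case <-stopCh:
			return
		case <-time.After(period):
		}
	}
}

func main() {
	stop := make(chan struct{})
	go until(func() { fmt.Println("tick") }, 100*time.Millisecond, stop)
	time.Sleep(350 * time.Millisecond)
	close(stop)
}

Passing a period of 0, as startKubelet does, effectively means "restart immediately if the function ever returns", which is how k.Run is kept alive forever.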
Initialization is done mainly through createAndInitKubelet.
func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) {
	// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
	// up into "per source" synchronizations
	// TODO: KubeletConfig.KubeClient should be a client interface, but client interface misses certain methods
	// used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing
	// a nil pointer to it when what we really want is a nil interface.
	var kubeClient client.Interface
	if kc.KubeClient != nil {
		kubeClient = kc.KubeClient
	}
	gcPolicy := kubelet.ContainerGCPolicy{
		MinAge:             kc.MinimumGCAge,
		MaxPerPodContainer: kc.MaxPerPodContainerCount,
		MaxContainers:      kc.MaxContainerCount,
	} // the container GC policy
	daemonEndpoints := &api.NodeDaemonEndpoints{
		KubeletEndpoint: api.DaemonEndpoint{Port: int(kc.Port)},
	}
	pc = makePodSourceConfig(kc) // build the pod source config; everything that follows watches it
	k, err = kubelet.NewMainKubelet(
		kc.Hostname,
		kc.NodeName,
		kc.DockerClient,
		kubeClient,
		kc.RootDirectory,
		kc.PodInfraContainerImage,
		kc.SyncFrequency,
		float32(kc.RegistryPullQPS),
		kc.RegistryBurst,
		kc.EventRecordQPS,
		kc.EventBurst,
		gcPolicy,
		pc.SeenAllSources,
		kc.RegisterNode,
		kc.StandaloneMode,
		kc.ClusterDomain,
		kc.ClusterDNS,
		kc.MasterServiceNamespace,
		kc.VolumePlugins,
		kc.NetworkPlugins,
		kc.NetworkPluginName,
		kc.StreamingConnectionIdleTimeout,
		kc.Recorder,
		kc.CAdvisorInterface,
		kc.ImageGCPolicy,
		kc.DiskSpacePolicy,
		kc.Cloud,
		kc.NodeStatusUpdateFrequency,
		kc.ResourceContainer,
		kc.OSInterface,
		kc.CgroupRoot,
		kc.ContainerRuntime,
		kc.RktPath,
		kc.RktStage1Image,
		kc.Mounter,
		kc.Writer,
		kc.DockerDaemonContainer,
		kc.SystemContainer,
		kc.ConfigureCBR0,
		kc.PodCIDR,
		kc.MaxPods,
		kc.DockerExecHandler,
		kc.ResolverConfig,
		kc.CPUCFSQuota,
		daemonEndpoints,
		kc.SerializeImagePulls,
	)
	if err != nil {
		return nil, nil, err
	}
	k.BirthCry()
	k.StartGarbageCollection() // start garbage collection
	return k, pc, nil
}
The pod-related configuration sources are assembled as follows.
func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
	// source of all configuration
	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kc.Recorder)
	// define file config source
	if kc.ConfigFile != "" {
		glog.Infof("Adding manifest file: %v", kc.ConfigFile)
		config.NewSourceFile(kc.ConfigFile, kc.NodeName, kc.FileCheckFrequency, cfg.Channel(kubelet.FileSource))
	}
	// define url config source
	if kc.ManifestURL != "" {
		glog.Infof("Adding manifest url %q with HTTP header %v", kc.ManifestURL, kc.ManifestURLHeader)
		config.NewSourceURL(kc.ManifestURL, kc.ManifestURLHeader, kc.NodeName, kc.HTTPCheckFrequency, cfg.Channel(kubelet.HTTPSource))
	}
	if kc.KubeClient != nil {
		glog.Infof("Watching apiserver")
		config.NewSourceApiserver(kc.KubeClient, kc.NodeName, cfg.Channel(kubelet.ApiserverSource)) // connect to the APIServer and watch
	}
	return cfg
}
A client is used to watch the APIServer for changes to the pods assigned to this node.
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c *client.Client, nodeName string, updates chan<- interface{}) {
	lw := cache.NewListWatchFromClient(c, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, nodeName))
	newSourceApiserverFromLW(lw, updates) // watch for remote pod changes
}

// newSourceApiserverFromLW creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
	send := func(objs []interface{}) {
		var pods []*api.Pod
		for _, o := range objs {
			pods = append(pods, o.(*api.Pod))
		}
		updates <- kubelet.PodUpdate{Pods: pods, Op: kubelet.SET, Source: kubelet.ApiserverSource} // push the update into the updates channel
	}
	cache.NewReflector(lw, &api.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0).Run()
}
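The essential effect of the reflector plus UndeltaStore combination is that every change on the apiserver is turned into a full snapshot of this node's pods and pushed into the updates channel. A minimal sketch of that producer/consumer shape (hypothetical types, not the real cache package) looks like this:

package main

import "fmt"

// PodUpdate mirrors the shape of kubelet.PodUpdate: a snapshot of pods
// plus the operation and the source it came from.
type PodUpdate struct {
	Pods   []string // stand-in for []*api.Pod
	Op     string   // e.g. "SET"
	Source string   // e.g. "api"
}

// watchPods is a stand-in for the reflector: whenever the watched state
// changes, it sends a complete snapshot into the updates channel.
func watchPods(snapshots [][]string, updates chan<- PodUpdate) {
	for _, snap := range snapshots {
		updates <- PodUpdate{Pods: snap, Op: "SET", Source: "api"}
	}
	close(updates)
}

func main() {
	updates := make(chan PodUpdate)
	go watchPods([][]string{{"nginx"}, {"nginx", "redis"}}, updates)
	for u := range updates { // the kubelet's sync loop consumes from here
		fmt.Printf("%s from %s: %v\n", u.Op, u.Source, u.Pods)
	}
}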
At this point the pod-watching pipeline is complete; data flows through the updates channel. Next, let's look at NewMainKubelet, which constructs the kubelet instance.
// New creates a new Kubelet for use in main
func NewMainKubelet(
	hostname string,
	nodeName string,
	dockerClient dockertools.DockerInterface,
	kubeClient client.Interface,
	rootDirectory string,
	podInfraContainerImage string,
	resyncInterval time.Duration,
	pullQPS float32,
	pullBurst int,
	eventQPS float32,
	eventBurst int,
	containerGCPolicy ContainerGCPolicy,
	sourcesReady SourcesReadyFn,
	registerNode bool,
	standaloneMode bool,
	clusterDomain string,
	clusterDNS net.IP,
	masterServiceNamespace string,
	volumePlugins []volume.VolumePlugin,
	networkPlugins []network.NetworkPlugin,
	networkPluginName string,
	streamingConnectionIdleTimeout time.Duration,
	recorder record.EventRecorder,
	cadvisorInterface cadvisor.Interface,
	imageGCPolicy ImageGCPolicy,
	diskSpacePolicy DiskSpacePolicy,
	cloud cloudprovider.Interface,
	nodeStatusUpdateFrequency time.Duration,
	resourceContainer string,
	osInterface kubecontainer.OSInterface,
	cgroupRoot string,
	containerRuntime string,
	rktPath string,
	rktStage1Image string,
	mounter mount.Interface,
	writer kubeio.Writer,
	dockerDaemonContainer string,
	systemContainer string,
	configureCBR0 bool,
	podCIDR string,
	pods int,
	dockerExecHandler dockertools.ExecHandler,
	resolverConfig string,
	cpuCFSQuota bool,
	daemonEndpoints *api.NodeDaemonEndpoints,
	serializeImagePulls bool,
) (*Kubelet, error) {
	if rootDirectory == "" {
		return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
	}
	if resyncInterval <= 0 {
		return nil, fmt.Errorf("invalid sync frequency %d", resyncInterval)
	}
	if systemContainer != "" && cgroupRoot == "" {
		return nil, fmt.Errorf("invalid configuration: system container was specified and cgroup root was not specified")
	}
	dockerClient = dockertools.NewInstrumentedDockerInterface(dockerClient) // wrap the docker client with instrumentation

	serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
	if kubeClient != nil {
		// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
		// than an interface. There is no way to construct a list+watcher using resource name.
		listWatch := &cache.ListWatch{
			ListFunc: func() (runtime.Object, error) {
				return kubeClient.Services(api.NamespaceAll).List(labels.Everything())
			},
			WatchFunc: func(resourceVersion string) (watch.Interface, error) {
				return kubeClient.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion)
			},
		}
		cache.NewReflector(listWatch, &api.Service{}, serviceStore, 0).Run() // start a reflector that watches service changes
	}
	serviceLister := &cache.StoreToServiceLister{Store: serviceStore} // lister over the watched services

	nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
	if kubeClient != nil {
		// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
		// than an interface. There is no way to construct a list+watcher using resource name.
		fieldSelector := fields.Set{client.ObjectNameField: nodeName}.AsSelector()
		listWatch := &cache.ListWatch{
			ListFunc: func() (runtime.Object, error) {
				return kubeClient.Nodes().List(labels.Everything(), fieldSelector) // list the node objects
			},
			WatchFunc: func(resourceVersion string) (watch.Interface, error) {
				return kubeClient.Nodes().Watch(labels.Everything(), fieldSelector, resourceVersion)
			},
		}
		cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
	}
	nodeLister := &cache.StoreToNodeLister{Store: nodeStore} // lister over the watched nodes

	// TODO: get the real node object of ourself,
	// and use the real node name and UID.
	// TODO: what is namespace for node?
	nodeRef := &api.ObjectReference{
		Kind:      "Node",
		Name:      nodeName,
		UID:       types.UID(nodeName),
		Namespace: "",
	}

	containerGC, err := newContainerGC(dockerClient, containerGCPolicy) // the container garbage collector
	if err != nil {
		return nil, err
	}
	imageManager, err := newImageManager(dockerClient, cadvisorInterface, recorder, nodeRef, imageGCPolicy) // the image manager
	if err != nil {
		return nil, fmt.Errorf("failed to initialize image manager: %v", err)
	}
	diskSpaceManager, err := newDiskSpaceManager(cadvisorInterface, diskSpacePolicy) // the disk space manager
	if err != nil {
		return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
	}
	statusManager := status.NewManager(kubeClient) // the status manager
	readinessManager := kubecontainer.NewReadinessManager()
	containerRefManager := kubecontainer.NewRefManager()
	volumeManager := newVolumeManager() // the volume manager
	oomWatcher := NewOOMWatcher(cadvisorInterface, recorder)

	klet := &Kubelet{
		hostname:                       hostname,
		nodeName:                       nodeName,
		dockerClient:                   dockerClient,
		kubeClient:                     kubeClient,
		rootDirectory:                  rootDirectory,
		resyncInterval:                 resyncInterval,
		containerRefManager:            containerRefManager,
		readinessManager:               readinessManager,
		httpClient:                     &http.Client{},
		sourcesReady:                   sourcesReady,
		registerNode:                   registerNode,
		standaloneMode:                 standaloneMode,
		clusterDomain:                  clusterDomain,
		clusterDNS:                     clusterDNS,
		serviceLister:                  serviceLister,
		nodeLister:                     nodeLister,
		runtimeMutex:                   sync.Mutex{},
		runtimeUpThreshold:             maxWaitForContainerRuntime,
		lastTimestampRuntimeUp:         time.Time{},
		masterServiceNamespace:         masterServiceNamespace,
		streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
		recorder:                       recorder,
		cadvisor:                       cadvisorInterface,
		containerGC:                    containerGC,
		imageManager:                   imageManager,
		diskSpaceManager:               diskSpaceManager,
		statusManager:                  statusManager,
		volumeManager:                  volumeManager,
		cloud:                          cloud,
		nodeRef:                        nodeRef,
		nodeStatusUpdateFrequency:      nodeStatusUpdateFrequency,
		resourceContainer:              resourceContainer,
		os:                             osInterface,
		oomWatcher:                     oomWatcher,
		cgroupRoot:                     cgroupRoot,
		mounter:                        mounter,
		writer:                         writer,
		configureCBR0:                  configureCBR0,
		podCIDR:                        podCIDR,
		pods:                           pods,
		syncLoopMonitor:                util.AtomicValue{},
		resolverConfig:                 resolverConfig,
		cpuCFSQuota:                    cpuCFSQuota,
		daemonEndpoints:                daemonEndpoints,
	}

	if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil { // initialize the network plugin
		return nil, err
	} else {
		klet.networkPlugin = plug
	}

	machineInfo, err := klet.GetCachedMachineInfo()
	if err != nil {
		return nil, err
	}

	oomAdjuster := oom.NewOomAdjuster()
	procFs := procfs.NewProcFs()

	// Initialize the runtime.
	switch containerRuntime { // pick which runtime implementation to use
	case "docker":
		// Only supported one for now, continue.
		klet.containerRuntime = dockertools.NewDockerManager(
			dockerClient,
			recorder,
			readinessManager,
			containerRefManager,
			machineInfo,
			podInfraContainerImage,
			pullQPS,
			pullBurst,
			containerLogsDir,
			osInterface,
			klet.networkPlugin,
			klet,
			klet.httpClient,
			dockerExecHandler,
			oomAdjuster,
			procFs,
			klet.cpuCFSQuota,
			serializeImagePulls,
		)
	case "rkt":
		conf := &rkt.Config{
			Path:               rktPath,
			Stage1Image:        rktStage1Image,
			InsecureSkipVerify: true,
		}
		rktRuntime, err := rkt.New(
			conf,
			klet,
			recorder,
			containerRefManager,
			readinessManager,
			klet.volumeManager,
			serializeImagePulls,
		)
		if err != nil {
			return nil, err
		}
		klet.containerRuntime = rktRuntime
		// No Docker daemon to put in a container.
		dockerDaemonContainer = ""
	default:
		return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
	}

	// Setup container manager, can fail if the devices hierarchy is not mounted
	// (it is required by Docker however).
	containerManager, err := newContainerManager(mounter, cadvisorInterface, dockerDaemonContainer, systemContainer, resourceContainer) // the container manager
	if err != nil {
		return nil, fmt.Errorf("failed to create the Container Manager: %v", err)
	}
	klet.containerManager = containerManager

	go util.Until(klet.syncNetworkStatus, 30*time.Second, util.NeverStop) // sync network status periodically
	if klet.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
		go util.Until(klet.syncNodeStatus, klet.nodeStatusUpdateFrequency, util.NeverStop) // sync node status periodically
	}

	// Wait for the runtime to be up with a timeout.
	if err := waitUntilRuntimeIsUp(klet.containerRuntime, maxWaitForContainerRuntime); err != nil {
		return nil, fmt.Errorf("timed out waiting for %q to come up: %v", containerRuntime, err)
	}
	klet.lastTimestampRuntimeUp = time.Now()

	klet.runner = klet.containerRuntime
	klet.podManager = newBasicPodManager(klet.kubeClient) // the basic pod manager

	runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
	if err != nil {
		return nil, err
	}
	klet.runtimeCache = runtimeCache
	klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder) // create the pod workers

	metrics.Register(runtimeCache)

	if err = klet.setupDataDirs(); err != nil {
		return nil, err
	}
	if err = klet.volumePluginMgr.InitPlugins(volumePlugins, &volumeHost{klet}); err != nil { // initialize the volume plugins
		return nil, err
	}

	// If the container logs directory does not exist, create it.
	if _, err := os.Stat(containerLogsDir); err != nil {
		if err := osInterface.Mkdir(containerLogsDir, 0755); err != nil {
			glog.Errorf("Failed to create directory %q: %v", containerLogsDir, err)
		}
	}

	klet.backOff = util.NewBackOff(resyncInterval, maxContainerBackOff)
	klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity)
	return klet, nil
}
Once the kubelet instance is initialized, its Run method is called to watch the incoming pod changes.
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan PodUpdate) {
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	if kl.kubeClient == nil {
		glog.Warning("No api server defined - no node status update will be sent.")
	}
	// Move Kubelet to a container.
	if kl.resourceContainer != "" {
		// Fixme: I need to reside inside ContainerManager interface.
		err := util.RunInResourceContainer(kl.resourceContainer)
		if err != nil {
			glog.Warningf("Failed to move Kubelet to container %q: %v", kl.resourceContainer, err)
		}
		glog.Infof("Running in container %q", kl.resourceContainer)
	}
	if err := kl.imageManager.Start(); err != nil { // start the image manager
		kl.recorder.Eventf(kl.nodeRef, "KubeletSetupFailed", "Failed to start ImageManager %v", err)
		glog.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)
	}
	if err := kl.cadvisor.Start(); err != nil { // start cAdvisor monitoring
		kl.recorder.Eventf(kl.nodeRef, "KubeletSetupFailed", "Failed to start CAdvisor %v", err)
		glog.Errorf("Failed to start CAdvisor, system may not be properly monitored: %v", err)
	}
	if err := kl.containerManager.Start(); err != nil { // start the container manager
		kl.recorder.Eventf(kl.nodeRef, "KubeletSetupFailed", "Failed to start ContainerManager %v", err)
		glog.Errorf("Failed to start ContainerManager, system may not be properly isolated: %v", err)
	}
	if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
		kl.recorder.Eventf(kl.nodeRef, "KubeletSetupFailed", "Failed to start OOM watcher %v", err)
		glog.Errorf("Failed to start OOM watching: %v", err)
	}
	go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop)
	// Start a goroutine responsible for killing pods (that are not properly
	// handled by pod workers).
	go util.Until(kl.podKiller, 1*time.Second, util.NeverStop) // periodically check for pods that need to be killed
	// Run the system oom watcher forever.
	kl.statusManager.Start()
	kl.syncLoop(updates, kl) // process all updates
}
Let's continue with the syncLoop processing flow.
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
	glog.Info("Starting kubelet main sync loop.")
	kl.resyncTicker = time.NewTicker(kl.resyncInterval)
	var housekeepingTimestamp time.Time
	for {
		if !kl.containerRuntimeUp() { // is the container runtime up?
			time.Sleep(5 * time.Second)
			glog.Infof("Skipping pod synchronization, container runtime is not up.")
			continue
		}
		if !kl.doneNetworkConfigure() { // is the network configured?
			time.Sleep(5 * time.Second)
			glog.Infof("Skipping pod synchronization, network is not configured")
			continue
		}
		// Make sure we sync first to receive the pods from the sources before
		// performing housekeeping.
		if !kl.syncLoopIteration(updates, handler) { // process one batch of updates
			break
		}
		// We don't want to perform housekeeping too often, so we set a minimum
		// period for it. Housekeeping would be performed at least once every
		// kl.resyncInterval, and *no* more than once every
		// housekeepingMinimumPeriod.
		// TODO (#13418): Investigate whether we can/should spawn a dedicated
		// goroutine for housekeeping
		if !kl.sourcesReady() {
			// If the sources aren't ready, skip housekeeping, as we may
			// accidentally delete pods from unready sources.
			glog.V(4).Infof("Skipping cleanup, sources aren't ready yet.")
		} else if housekeepingTimestamp.IsZero() {
			housekeepingTimestamp = time.Now()
		} else if time.Since(housekeepingTimestamp) > housekeepingMinimumPeriod {
			glog.V(4).Infof("SyncLoop (housekeeping)")
			if err := handler.HandlePodCleanups(); err != nil {
				glog.Errorf("Failed cleaning pods: %v", err)
			}
			housekeepingTimestamp = time.Now()
		}
	}
}

func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandler) bool {
	kl.syncLoopMonitor.Store(time.Now())
	select {
	case u, open := <-updates: // receive an update
		if !open {
			glog.Errorf("Update channel is closed. Exiting the sync loop.")
			return false
		}
		switch u.Op {
		case ADD:
			glog.V(2).Infof("SyncLoop (ADD): %q", kubeletUtil.FormatPodNames(u.Pods))
			handler.HandlePodAdditions(u.Pods) // add pods
		case UPDATE:
			glog.V(2).Infof("SyncLoop (UPDATE): %q", kubeletUtil.FormatPodNames(u.Pods))
			handler.HandlePodUpdates(u.Pods) // update pods
		case REMOVE:
			glog.V(2).Infof("SyncLoop (REMOVE): %q", kubeletUtil.FormatPodNames(u.Pods))
			handler.HandlePodDeletions(u.Pods) // delete pods
		case SET:
			// TODO: Do we want to support this?
			glog.Errorf("Kubelet does not support snapshot update")
		}
	case <-kl.resyncTicker.C:
		// Periodically syncs all the pods and performs cleanup tasks.
		glog.V(4).Infof("SyncLoop (periodic sync)")
		handler.HandlePodSyncs(kl.podManager.GetPods())
	}
	kl.syncLoopMonitor.Store(time.Now())
	return true
}
The handler passed in is the Kubelet instance itself, so these calls end up invoking the Kubelet's own methods.
func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) {
	start := time.Now()
	sort.Sort(podsByCreationTime(pods))
	for _, pod := range pods {
		kl.podManager.AddPod(pod) // register the pod with the podManager
		if isMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}
		// Note that allPods includes the new pod since we added at the
		// beginning of the loop.
		allPods := kl.podManager.GetPods()
		// We failed pods that we rejected, so activePods include all admitted
		// pods that are alive and the new pod.
		activePods := kl.filterOutTerminatedPods(allPods) // all currently active pods
		// Check if we can admit the pod; if not, reject it.
		if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok { // reject the pod if it cannot be admitted
			kl.rejectPod(pod, reason, message)
			continue
		}
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, SyncPodCreate, mirrorPod, start)
	}
}

func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) {
	start := time.Now()
	for _, pod := range pods {
		kl.podManager.UpdatePod(pod) // update the pod in the podManager
		if isMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}
		// TODO: Evaluate if we need to validate and reject updates.
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, SyncPodUpdate, mirrorPod, start)
	}
}

func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) {
	start := time.Now()
	for _, pod := range pods {
		kl.podManager.DeletePod(pod) // remove the pod from the podManager
		if isMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}
		// Deletion is allowed to fail because the periodic cleanup routine
		// will trigger deletion again.
		if err := kl.deletePod(pod.UID); err != nil { // delete the pod
			glog.V(2).Infof("Failed to delete pod %q, err: %v", kubeletUtil.FormatPodName(pod), err)
		}
	}
}
Pods that are added or updated are all pushed through the pod workers.
func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType SyncPodType, mirrorPod *api.Pod, start time.Time) {
	if kl.podIsTerminated(pod) { // nothing to do for terminated pods
		return
	}
	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(pod, mirrorPod, func() {
		metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
	}) // create or update the pod
	// Note the number of containers for new pods.
	if syncType == SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}
At that point the following UpdatePod function is called to do the processing.
func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
	var minRuntimeCacheTime time.Time
	for newWork := range podUpdates { // receive work updates
		func() {
			defer p.checkForUpdates(newWork.pod.UID, newWork.updateCompleteFn)
			// We would like to have the state of the containers from at least
			// the moment when we finished the previous processing of that pod.
			if err := p.runtimeCache.ForceUpdateIfOlder(minRuntimeCacheTime); err != nil {
				glog.Errorf("Error updating the container runtime cache: %v", err)
				return
			}
			pods, err := p.runtimeCache.GetPods() // fetch the running pods
			if err != nil {
				glog.Errorf("Error getting pods while syncing pod: %v", err)
				return
			}
			err = p.syncPodFn(newWork.pod, newWork.mirrorPod,
				kubecontainer.Pods(pods).FindPodByID(newWork.pod.UID), newWork.updateType) // sync the pod
			if err != nil {
				glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
				p.recorder.Eventf(newWork.pod, "FailedSync", "Error syncing pod, skipping: %v", err)
				return
			}
			minRuntimeCacheTime = time.Now()
			newWork.updateCompleteFn() // signal that the update completed
		}()
	}
}

// Apply the new setting to the specified pod. updateComplete is called when the update is completed.
func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func()) {
	uid := pod.UID
	var podUpdates chan workUpdate
	var exists bool
	// TODO: Pipe this through from the kubelet. Currently kubelets operating with
	// snapshot updates (PodConfigNotificationSnapshot) will send updates, creates
	// and deletes as SET operations, which makes updates indistinguishable from
	// creates. The intent here is to communicate to the pod worker that it can take
	// certain liberties, like skipping status generation, when it receives a create
	// event for a pod.
	updateType := SyncPodUpdate
	p.podLock.Lock()
	defer p.podLock.Unlock()
	if podUpdates, exists = p.podUpdates[uid]; !exists {
		// We need to have a buffer here, because checkForUpdates() method that
		// puts an update into channel is called from the same goroutine where
		// the channel is consumed. However, it is guaranteed that in such case
		// the channel is empty, so buffer of size 1 is enough.
		podUpdates = make(chan workUpdate, 1)
		p.podUpdates[uid] = podUpdates
		// Creating a new pod worker either means this is a new pod, or that the
		// kubelet just restarted. In either case the kubelet is willing to believe
		// the status of the pod for the first pod worker sync. See corresponding
		// comment in syncPod.
		updateType = SyncPodCreate // this is a newly created pod
		go func() {
			defer util.HandleCrash()
			p.managePodLoop(podUpdates) // start the per-pod worker loop
		}()
	}
	if !p.isWorking[pod.UID] {
		p.isWorking[pod.UID] = true // mark the worker as busy
		podUpdates <- workUpdate{
			pod:              pod,
			mirrorPod:        mirrorPod,
			updateCompleteFn: updateComplete,
			updateType:       updateType,
		}
	} else {
		p.lastUndeliveredWorkUpdate[pod.UID] = workUpdate{
			pod:              pod,
			mirrorPod:        mirrorPod,
			updateCompleteFn: updateComplete,
			updateType:       updateType,
		}
	}
}
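The interesting part of UpdatePod is the coalescing: each pod gets one goroutine fed by a channel with buffer size 1, and while a worker is busy only the latest undelivered update for that pod is kept. A reduced sketch of that pattern (hypothetical names, not the real podWorkers type) helps see it in isolation:

package main

import (
	"fmt"
	"sync"
	"time"
)

// workers keeps one goroutine per key; while a worker is busy, only the
// most recent pending update for that key is retained.
type workers struct {
	mu      sync.Mutex
	ch      map[string]chan string
	busy    map[string]bool
	pending map[string]string
}

func newWorkers() *workers {
	return &workers{ch: map[string]chan string{}, busy: map[string]bool{}, pending: map[string]string{}}
}

func (w *workers) update(key, payload string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	c, ok := w.ch[key]
	if !ok {
		c = make(chan string, 1) // buffer of 1, as in podWorkers
		w.ch[key] = c
		go w.loop(key, c)
	}
	if !w.busy[key] {
		w.busy[key] = true
		c <- payload
	} else {
		w.pending[key] = payload // overwrite: older pending updates are dropped
	}
}

func (w *workers) loop(key string, c chan string) {
	for payload := range c {
		fmt.Println("syncing", key, payload)
		time.Sleep(50 * time.Millisecond) // simulate syncPod work
		w.mu.Lock()
		if p, ok := w.pending[key]; ok { // deliver the coalesced update, if any
			delete(w.pending, key)
			c <- p
		} else {
			w.busy[key] = false
		}
		w.mu.Unlock()
	}
}

func main() {
	w := newWorkers()
	w.update("nginx", "v1")
	w.update("nginx", "v2") // arrives while the worker is busy
	w.update("nginx", "v3") // replaces v2; only v1 and v3 are synced
	time.Sleep(300 * time.Millisecond)
}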
The syncPodFn called for new work is the kubelet's syncPod function that was passed in at initialization.
func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error {
	podFullName := kubecontainer.GetPodFullName(pod) // the pod's full name
	uid := pod.UID
	start := time.Now()
	var firstSeenTime time.Time
	if firstSeenTimeStr, ok := pod.Annotations[ConfigFirstSeenAnnotationKey]; !ok {
		glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
	} else {
		firstSeenTime = kubeletTypes.ConvertToTimestamp(firstSeenTimeStr).Get()
	}

	// Before returning, regenerate status and store it in the cache.
	defer func() {
		if isStaticPod(pod) && mirrorPod == nil { // static pod without a mirror pod yet
			// No need to cache the status because the mirror pod does not
			// exist yet.
			return
		}
		status, err := kl.generatePodStatus(pod) // compute the pod status
		if err != nil {
			glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
		} else {
			podToUpdate := pod
			if mirrorPod != nil {
				podToUpdate = mirrorPod
			}
			existingStatus, ok := kl.statusManager.GetPodStatus(podToUpdate.UID) // the previous status
			if !ok || existingStatus.Phase == api.PodPending && status.Phase == api.PodRunning &&
				!firstSeenTime.IsZero() {
				metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
			}
			kl.statusManager.SetPodStatus(podToUpdate, status) // store the new status
		}
	}()

	// Kill pods we can't run.
	if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil { // kill the pod if it cannot run
		if err := kl.killPod(pod, runningPod); err != nil {
			util.HandleError(err)
		}
		return err
	}

	// Create Mirror Pod for Static Pod if it doesn't already exist
	if isStaticPod(pod) {
		if mirrorPod != nil && !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
			// The mirror pod is semantically different from the static pod. Remove
			// it. The mirror pod will get recreated later.
			glog.Errorf("Deleting mirror pod %q because it is outdated", podFullName)
			if err := kl.podManager.DeleteMirrorPod(podFullName); err != nil {
				glog.Errorf("Failed deleting mirror pod %q: %v", podFullName, err)
			}
		}
		if mirrorPod == nil {
			glog.V(3).Infof("Creating a mirror pod for static pod %q", podFullName)
			if err := kl.podManager.CreateMirrorPod(pod); err != nil {
				glog.Errorf("Failed creating a mirror pod %q: %v", podFullName, err)
			}
			_, ok := kl.podManager.GetMirrorPodByPod(pod)
			if !ok {
				glog.Errorf("Mirror pod not available")
			}
		}
	}

	if err := kl.makePodDataDirs(pod); err != nil { // create the pod's data directories
		glog.Errorf("Unable to make pod data directories for pod %q (uid %q): %v", podFullName, uid, err)
		return err
	}

	// Starting phase:
	ref, err := api.GetReference(pod)
	if err != nil {
		glog.Errorf("Couldn't make a ref to pod %q: '%v'", podFullName, err)
	}

	// Mount volumes.
	podVolumes, err := kl.mountExternalVolumes(pod) // mount the pod's volumes
	if err != nil {
		if ref != nil {
			kl.recorder.Eventf(ref, "FailedMount", "Unable to mount volumes for pod %q: %v", podFullName, err)
		}
		glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", podFullName, err)
		return err
	}
	kl.volumeManager.SetVolumes(pod.UID, podVolumes) // record the mounted volumes

	// The kubelet is the source of truth for pod status. It ignores the status sent from
	// the apiserver and regenerates status for every pod update, incrementally updating
	// the status it received at pod creation time.
	//
	// The container runtime needs 2 pieces of information from the status to sync a pod:
	// The terminated state of containers (to restart them) and the podIp (for liveness probes).
	// New pods don't have either, so we skip the expensive status generation step.
	//
	// If we end up here with a create event for an already running pod, it could result in a
	// restart of its containers. This cannot happen unless the kubelet restarts, because the
	// delete before the second create would cancel this pod worker.
	//
	// If the kubelet restarts, we have a bunch of running containers for which we get create
	// events. This is ok, because the pod status for these will include the podIp and terminated
	// status. Any race conditions here effectively boils down to -- the pod worker didn't sync
	// state of a newly started container with the apiserver before the kubelet restarted, so
	// it's OK to pretend like the kubelet started them after it restarted.
	//
	// Also note that deletes currently have an updateType of `create` set in UpdatePods.
	// This, again, does not matter because deletes are not processed by this method.
	var podStatus api.PodStatus
	if updateType == SyncPodCreate {
		// This is the first time we are syncing the pod. Record the latency
		// since kubelet first saw the pod if firstSeenTime is set.
		if !firstSeenTime.IsZero() {
			metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
		}
		podStatus = pod.Status
		podStatus.StartTime = &unversioned.Time{Time: start}
		kl.statusManager.SetPodStatus(pod, podStatus) // record the initial status for a new pod
		glog.V(3).Infof("Not generating pod status for new pod %q", podFullName)
	} else {
		var err error
		podStatus, err = kl.generatePodStatus(pod) // compute the current status
		if err != nil {
			glog.Errorf("Unable to get status for pod %q (uid %q): %v", podFullName, uid, err)
			return err
		}
	}

	pullSecrets, err := kl.getPullSecretsForPod(pod)
	if err != nil {
		glog.Errorf("Unable to get pull secrets for pod %q (uid %q): %v", podFullName, uid, err)
		return err
	}

	err = kl.containerRuntime.SyncPod(pod, runningPod, podStatus, pullSecrets, kl.backOff) // let the runtime reconcile the pod
	if err != nil {
		return err
	}

	ingress, egress, err := extractBandwidthResources(pod) // read the bandwidth-shaping settings
	if err != nil {
		return err
	}
	if egress != nil || ingress != nil {
		if pod.Spec.HostNetwork {
			kl.recorder.Event(pod, "HostNetworkNotSupported", "Bandwidth shaping is not currently supported on the host network")
		} else if kl.shaper != nil {
			status, found := kl.statusManager.GetPodStatus(pod.UID)
			if !found {
				statusPtr, err := kl.containerRuntime.GetPodStatus(pod)
				if err != nil {
					glog.Errorf("Error getting pod for bandwidth shaping")
					return err
				}
				status = *statusPtr
			}
			if len(status.PodIP) > 0 {
				err = kl.shaper.ReconcileCIDR(fmt.Sprintf("%s/32", status.PodIP), egress, ingress)
			}
		} else {
			kl.recorder.Event(pod, "NilShaper", "Pod requests bandwidth shaping, but the shaper is undefined")
		}
	}
	return nil
}
syncPod verifies all of the pod's preconditions and only then starts it. When the docker runtime is in use, the following method is what really starts the containers.
// Sync the running pod to match the specified desired pod.
func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error {
	start := time.Now()
	defer func() {
		metrics.ContainerManagerLatency.WithLabelValues("SyncPod").Observe(metrics.SinceInMicroseconds(start))
	}()

	podFullName := kubecontainer.GetPodFullName(pod)
	containerChanges, err := dm.computePodContainerChanges(pod, runningPod, podStatus) // compute the required container changes
	if err != nil {
		return err
	}
	glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)

	if containerChanges.StartInfraContainer || (len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0) {
		if len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0 {
			glog.V(4).Infof("Killing Infra Container for %q because all other containers are dead.", podFullName)
		} else {
			glog.V(4).Infof("Killing Infra Container for %q, will start new one", podFullName)
		}
		// Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
		err = dm.KillPod(pod, runningPod) // kill the whole pod
		if err != nil {
			return err
		}
	} else {
		// Otherwise kill any containers in this pod which are not specified as ones to keep.
		for _, container := range runningPod.Containers { // keep only the containers marked to keep
			_, keep := containerChanges.ContainersToKeep[kubeletTypes.DockerID(container.ID)]
			if !keep {
				glog.V(3).Infof("Killing unwanted container %+v", container)
				// attempt to find the appropriate container policy
				var podContainer *api.Container
				for i, c := range pod.Spec.Containers {
					if c.Name == container.Name {
						podContainer = &pod.Spec.Containers[i]
						break
					}
				}
				err = dm.KillContainerInPod(container.ID, podContainer, pod) // kill containers not marked to keep
				if err != nil {
					glog.Errorf("Error killing container: %v", err)
				}
			}
		}
	}

	// If we should create infra container then we do it first.
	podInfraContainerID := containerChanges.InfraContainerId // the pod's infra container
	if containerChanges.StartInfraContainer && (len(containerChanges.ContainersToStart) > 0) {
		glog.V(4).Infof("Creating pod infra container for %q", podFullName)
		podInfraContainerID, err = dm.createPodInfraContainer(pod) // create the infra container

		// Call the networking plugin
		if err == nil {
			err = dm.networkPlugin.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID) // set up the pod's network
		}
		if err != nil {
			glog.Errorf("Failed to create pod infra container: %v; Skipping pod %q", err, podFullName)
			return err
		}

		// Setup the host interface (FIXME: move to networkPlugin when ready)
		podInfraContainer, err := dm.client.InspectContainer(string(podInfraContainerID)) // inspect the infra container
		if err != nil {
			glog.Errorf("Failed to inspect pod infra container: %v; Skipping pod %q", err, podFullName)
			return err
		}
		if err = hairpin.SetUpContainer(podInfraContainer.State.Pid, "eth0"); err != nil {
			glog.Warningf("Hairpin setup failed for pod %q: %v", podFullName, err) // hairpin NAT setup
		}

		// Find the pod IP after starting the infra container in order to expose
		// it safely via the downward API without a race and be able to use podIP in kubelet-managed /etc/hosts file.
		pod.Status.PodIP = dm.determineContainerIP(pod.Name, pod.Namespace, podInfraContainer) // determine the pod IP
	}

	// Start everything
	for idx := range containerChanges.ContainersToStart {
		container := &pod.Spec.Containers[idx]
		// containerChanges.StartInfraContainer causes the containers to be restarted for config reasons
		// ignore backoff
		if !containerChanges.StartInfraContainer && dm.doBackOff(pod, container, podStatus, backOff) {
			glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, podFullName)
			continue
		}
		glog.V(4).Infof("Creating container %+v in pod %v", container, podFullName)
		err := dm.imagePuller.PullImage(pod, container, pullSecrets) // pull the image
		dm.updateReasonCache(pod, container, "PullImageError", err)  // update the failure-reason cache
		if err != nil {
			glog.Warningf("Failed to pull image %q from pod %q and container %q: %v", container.Image, kubecontainer.GetPodFullName(pod), container.Name, err)
			continue
		}
		if container.SecurityContext != nil && container.SecurityContext.RunAsNonRoot {
			err := dm.verifyNonRoot(container)
			dm.updateReasonCache(pod, container, "VerifyNonRootError", err)
			if err != nil {
				glog.Errorf("Error running pod %q container %q: %v", kubecontainer.GetPodFullName(pod), container.Name, err)
				continue
			}
		}
		// TODO(dawnchen): Check RestartPolicy.DelaySeconds before restart a container
		// Note: when configuring the pod's containers anything that can be configured by pointing
		// to the namespace of the infra container should use namespaceMode. This includes things like the net namespace
		// and IPC namespace. PID mode cannot point to another container right now.
		// See createPodInfraContainer for infra container setup.
		namespaceMode := fmt.Sprintf("container:%v", podInfraContainerID)
		_, err = dm.runContainerInPod(pod, container, namespaceMode, namespaceMode, getPidMode(pod)) // run the container
		dm.updateReasonCache(pod, container, "RunContainerError", err)
		if err != nil {
			// TODO(bburns) : Perhaps blacklist a container after N failures?
			glog.Errorf("Error running pod %q container %q: %v", kubecontainer.GetPodFullName(pod), container.Name, err)
			continue
		}
		// Successfully started the container; clear the entry in the failure
		// reason cache.
		dm.clearReasonCache(pod, container)
	}
	return nil
}
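The core of SyncPod is a classic reconcile step: diff the desired containers against the running ones, kill what should not be there, and start what is missing. Stripped of all Docker details (the real computePodContainerChanges also compares container hashes and consults the restart policy), the shape of that diff is roughly this simplified sketch:

package main

import "fmt"

// reconcile compares desired container names against running ones and
// returns which containers to start and which to kill, the same shape
// as computePodContainerChanges with the Docker details removed.
func reconcile(desired, running []string) (toStart, toKill []string) {
	runningSet := map[string]bool{}
	for _, c := range running {
		runningSet[c] = true
	}
	desiredSet := map[string]bool{}
	for _, c := range desired {
		desiredSet[c] = true
		if !runningSet[c] {
			toStart = append(toStart, c)
		}
	}
	for _, c := range running {
		if !desiredSet[c] {
			toKill = append(toKill, c)
		}
	}
	return toStart, toKill
}

func main() {
	toStart, toKill := reconcile([]string{"app", "sidecar"}, []string{"app", "old-logger"})
	fmt.Println("start:", toStart) // [sidecar]
	fmt.Println("kill:", toKill)   // [old-logger]
}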
At this point the creation of a pod's containers is essentially complete. After several layers of initialization, the kubelet ends up watching the updates delivered from the APIServer and managing pod state through the podWorkers, which is how each node maintains its pods.
Summary
This article has only briefly walked through the kubelet's initialization flow and taken a first look at the preparatory work around pods, in order to deepen our understanding of how the kubelet works. Many details were left unexplored, such as pod deletion and updates and the details of container networking; these deserve further study. Given my limited knowledge, corrections are welcome.