After the kubelet successfully creates a pod, it writes the status back to the api-server, notifying it that the pod scheduled onto this node is up. This section analyzes how the two interact.

syncLoop runs a for loop whose body is mainly syncLoopIteration, and it interacts with the PLEG component.

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		if !open {
			glog.Errorf("Update channel is closed. Exiting the sync loop.")
			return false
		}
		switch u.Op {
		case kubetypes.ADD:
			...
		case kubetypes.UPDATE:
			...
		case kubetypes.REMOVE:
			...
		case kubetypes.RECONCILE:
			...
		case kubetypes.DELETE:
			...
		case kubetypes.RESTORE:
			...
		case kubetypes.SET:
			...
		}
		...
	case e := <-plegCh:
		...
	case <-syncCh:
		...
	case update := <-kl.livenessManager.Updates():
		...
	case <-housekeepingCh:
		...
	}
	return true
}

HandlePodAdditions

The logic of HandlePodAdditions, which syncLoopIteration calls, is:

First sort the pods to be added by creation time, then iterate over them:

1. Add the pod to podManager (if a pod is absent from podManager, it has already been deleted in the apiserver, and nothing beyond cleanup will be done for it)
2. Handle mirror pods (to monitor the status of a static pod, kubelet creates a mirror pod for it through the api server)
3. Check whether the pod can be admitted
4. Dispatch the pod to a worker for the sync operation
5. Hand the pod to probeManager for health checking

func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	start := kl.clock.Now()
	sort.Sort(sliceutils.PodsByCreationTime(pods))
	// Responsible for checking limits in resolv.conf
	// The limits do not have anything to do with individual pods
	if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
		kl.dnsConfigurer.CheckLimitsForResolvConf()
	}
	for _, pod := range pods {
		existingPods := kl.podManager.GetPods()
		// Always add the pod to the pod manager. Kubelet relies on the pod
		// manager as the source of truth for the desired state. If a pod does
		// not exist in the pod manager, it means that it has been deleted in
		// the apiserver and no action (other than cleanup) is required.
		kl.podManager.AddPod(pod)
		if kubepod.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}
		if !kl.podIsTerminated(pod) {
			// Only go through the admission process if the pod is not
			// terminated.
			// We failed pods that we rejected, so activePods include all admitted
			// pods that are alive.
			activePods := kl.filterOutTerminatedPods(existingPods)
			// Check if we can admit the pod; if not, reject it.
			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
				kl.rejectPod(pod, reason, message)
				continue
			}
		}
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
		kl.probeManager.AddPod(pod)
	}
}

Step 4 is the one to focus on: what does dispatchWork() do? Tracing the call chain dispatchWork --> podWorkers.UpdatePod --> podWorkers.managePodLoop, we find it ultimately invokes the podWorkers' syncPodFn, which is assigned when podWorkers is initialized in NewMainKubelet: it is set to klet.syncPod. So the real synchronization work is done by syncPod.

...

klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)

...
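To make the chain concrete, here is a heavily abridged sketch of the two hops between dispatchWork and syncPodFn (paraphrased from pkg/kubelet/kubelet.go and pkg/kubelet/pod_workers.go of this era; fields and error handling are trimmed, and signatures vary slightly across releases, so treat it as a map rather than the exact source):

func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	...
	// Hand the pod to its per-pod worker goroutine.
	kl.podWorkers.UpdatePod(&UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
	})
	...
}

// managePodLoop is the per-pod worker; every update it receives ends in
// syncPodFn, i.e. klet.syncPod.
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
	var lastSyncTime time.Time
	for update := range podUpdates {
		// Wait for a runtime status newer than the last sync.
		status, _ := p.podCache.GetNewerThan(update.Pod.UID, lastSyncTime)
		p.syncPodFn(syncPodOptions{
			mirrorPod:  update.MirrorPod,
			pod:        update.Pod,
			podStatus:  status,
			updateType: update.UpdateType,
		})
		lastSyncTime = time.Now()
		...
	}
}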

syncPod is the transaction script that syncs a single pod. Its main workflow (an abridged skeleton follows this list) is:

  • If the pod is being created, record pod worker start latency
  • Call generateAPIPodStatus to prepare a v1.PodStatus object for the pod; it is written into statusManager and from there back to the API server
  • If the pod is seen as running for the first time, record pod start latency
  • Update the pod's status in status manager
  • Kill the pod if it should not be running
  • If the pod is a static pod and has no mirror pod, create one
  • Create the pod's data directories if they do not exist
  • Wait for volumes to be attached/mounted
  • Fetch the pod's pull secrets
  • Call the container runtime's SyncPod callback
  • Update reasonCache (it caches each container's most recent creation-failure reason, used when generating container status)
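A heavily abridged skeleton of syncPod keeping only the steps highlighted below (paraphrased from pkg/kubelet/kubelet.go; exact signatures differ between releases):

func (kl *Kubelet) syncPod(o syncPodOptions) error {
	pod, podStatus := o.pod, o.podStatus
	...
	// Prepare a v1.PodStatus and push it to statusManager, which writes it
	// back to the API server asynchronously.
	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
	kl.statusManager.SetPodStatus(pod, apiPodStatus)
	...
	// Block until the volume manager has attached/mounted the pod's volumes.
	if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
		return err
	}
	pullSecrets := kl.getPullSecretsForPod(pod)
	// Delegate the actual container work to the container runtime.
	result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
	kl.reasonCache.Update(pod.UID, result)
	return result.Error()
}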

The most important of these steps are:

  1. syncPod writes the pod's status back to the apiserver through statusManager
  2. It waits for volumes to be attached/mounted before continuing
  3. It calls the container runtime's SyncPod

To summarize:

syncLoop's job is to drive pods toward their desired state. It communicates with dockershim over gRPC; dockershim sends the create/delete container requests to docker and configures the pod network through CNI. The resulting pod is the pause container plus the user's own containers (init containers, business containers). With that, syncLoop has completed one iteration; what each iteration actually does depends on the data it receives. Once the pod is created successfully, kubectl get pods shows its status as Running.

Writing back to the api-server

kubelet.syncPod updates the pod status in statusManager, but that step happens before the containers are created; after container creation completes, it is the PLEG component inside kubelet that drives the follow-up sync.

func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	klog.Info("Starting kubelet main sync loop.")
	// The syncTicker wakes up kubelet to checks if there are any pod workers
	// that need to be sync'd. A one-second period is sufficient because the
	// sync interval is defaulted to 10s.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	// here syncLoop obtains the PLEG event channel
	plegCh := kl.pleg.Watch()
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base
	for {
		if err := kl.runtimeState.runtimeErrors(); err != nil {
			klog.Infof("skipping pod synchronization - %v", err)
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base
		kl.syncLoopMonitor.Store(kl.clock.Now())
		// the PLEG channel is passed into syncLoopIteration here
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}

The data PLEG watches is fed into syncLoopIteration. PLEG (Pod Lifecycle Event Generator) generates events for the pod lifecycle by periodically checking container states.

syncLoopIteration watches PLEG's event channel; PLEG periodically discovers changes to pods (or containers), turns them into events, and writes them into that channel. When syncLoopIteration selects an event, it handles it accordingly.
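The mechanism is an ordinary Go producer/consumer channel. A minimal self-contained sketch of the pattern (this is not the upstream code; the types and the relist body are simplified stand-ins):

package main

import (
	"fmt"
	"time"
)

// PodLifecycleEventType mirrors pleg's event kinds.
type PodLifecycleEventType string

const (
	ContainerStarted PodLifecycleEventType = "ContainerStarted"
	ContainerDied    PodLifecycleEventType = "ContainerDied"
)

type PodLifecycleEvent struct {
	ID   string                // pod UID
	Type PodLifecycleEventType // what changed
	Data interface{}           // e.g. the container ID
}

type toyPLEG struct {
	eventChannel chan *PodLifecycleEvent
}

// Watch returns the channel that syncLoop selects on.
func (g *toyPLEG) Watch() <-chan *PodLifecycleEvent { return g.eventChannel }

// Start periodically "relists" container states and emits diffs as events.
func (g *toyPLEG) Start() {
	go func() {
		for range time.Tick(time.Second) {
			// The real PLEG asks the runtime for all pods, diffs against a
			// cache, and produces one event per change; we fake one event.
			g.eventChannel <- &PodLifecycleEvent{ID: "uid-1", Type: ContainerStarted, Data: "c1"}
		}
	}()
}

func main() {
	g := &toyPLEG{eventChannel: make(chan *PodLifecycleEvent, 100)}
	g.Start()
	e := <-g.Watch()
	fmt.Printf("got event: %+v\n", e)
}

The real consumer side is the plegCh case of syncLoopIteration, shown next.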

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	...
	case e := <-plegCh:
		if isSyncPodWorthy(e) {
			// PLEG event for a pod; sync it.
			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
				klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
				handler.HandlePodSyncs([]*v1.Pod{pod})
			} else {
				// If the pod no longer exists, ignore the event.
				klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
			}
		}
		if e.Type == pleg.ContainerDied {
			if containerID, ok := e.Data.(string); ok {
				kl.cleanUpContainersInPod(e.ID, containerID)
			}
		}
	...
	}
	...
}

The pod status (ContainerStarted, if this is the pod's first transition to running) is then updated into statusManager. Tracing the code, the data is written into statusManager's podStatusChannel; statusManager selects on this channel from the moment it starts, and in its own syncPod it calls kubeClient to update the pod's status in the apiserver:

func (m *manager) Start() {
	// Don't start the status manager if we don't have a client. This will happen
	// on the master, where the kubelet is responsible for bootstrapping the pods
	// of the master components.
	if m.kubeClient == nil {
		klog.Infof("Kubernetes client is nil, not starting status manager.")
		return
	}
	klog.Info("Starting to sync pod status with apiserver")
	//lint:ignore SA1015 Ticker can link since this is only called once and doesn't handle termination.
	syncTicker := time.Tick(syncPeriod)
	// syncPod and syncBatch share the same go routine to avoid sync races.
	go wait.Forever(func() {
		select {
		case syncRequest := <-m.podStatusChannel:
			klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
				syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
			m.syncPod(syncRequest.podUID, syncRequest.status)
		case <-syncTicker:
			m.syncBatch()
		}
	}, 0)
}
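On the producer side, kubelet.syncPod calls SetPodStatus, which (via updateStatusInternal) versions the status and does a non-blocking send into podStatusChannel. A trimmed sketch, paraphrased from pkg/kubelet/status/status_manager.go (locking and the no-change short-circuit omitted):

func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate bool) bool {
	...
	newStatus := versionedPodStatus{
		status:       status,
		version:      cachedStatus.version + 1,
		podName:      pod.Name,
		podNamespace: pod.Namespace,
	}
	m.podStatuses[pod.UID] = newStatus
	select {
	case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
		return true
	default:
		// The channel is full; the periodic syncBatch will pick the update up.
		return false
	}
}

If the channel is full the update is not lost: syncBatch, driven by the ticker in Start() above, periodically reconciles every cached status with the apiserver.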

That syncPod calls statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, *oldStatus, mergePodStatus(*oldStatus, status.status)) to push the status to the api-server:

// syncPod syncs the given status with the API server. The caller must not hold the lock.
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
	if !m.needsUpdate(uid, status) {
		klog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid)
		return
	}
	// TODO: make me easier to express from client code
	pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(status.podName, metav1.GetOptions{})
	if errors.IsNotFound(err) {
		klog.V(3).Infof("Pod %q (%s) does not exist on the server", status.podName, uid)
		// If the Pod is deleted the status will be cleared in
		// RemoveOrphanedStatuses, so we just ignore the update here.
		return
	}
	if err != nil {
		klog.Warningf("Failed to get status for pod %q: %v", format.PodDesc(status.podName, status.podNamespace, uid), err)
		return
	}
	translatedUID := m.podManager.TranslatePodUID(pod.UID)
	// Type convert original uid just for the purpose of comparison.
	if len(translatedUID) > 0 && translatedUID != kubetypes.ResolvedPodUID(uid) {
		klog.V(2).Infof("Pod %q was deleted and then recreated, skipping status update; old UID %q, new UID %q", format.Pod(pod), uid, translatedUID)
		m.deletePodStatus(uid)
		return
	}
	oldStatus := pod.Status.DeepCopy()
	newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, *oldStatus, mergePodStatus(*oldStatus, status.status))
	klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes)
	if err != nil {
		klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
		return
	}
	pod = newPod
	klog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)
	m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version
	// We don't handle graceful deletion of mirror pods.
	if m.canBeDeleted(pod, status.status) {
		deleteOptions := metav1.NewDeleteOptions(0)
		// Use the pod UID as the precondition for deletion to prevent deleting a newly created pod with the same name and namespace.
		deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(pod.UID))
		err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
		if err != nil {
			klog.Warningf("Failed to delete status for pod %q: %v", format.Pod(pod), err)
			return
		}
		klog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod))
		m.deletePodStatus(uid)
	}
}

PatchPodStatus, defined in pkg/util/pod/pod.go, is where the request to the api-server is actually issued:

// PatchPodStatus patches pod status.
func PatchPodStatus(c clientset.Interface, namespace, name string, oldPodStatus, newPodStatus v1.PodStatus) (*v1.Pod, []byte, error) {
	patchBytes, err := preparePatchBytesforPodStatus(namespace, name, oldPodStatus, newPodStatus)
	if err != nil {
		return nil, nil, err
	}
	updatedPod, err := c.CoreV1().Pods(namespace).Patch(name, types.StrategicMergePatchType, patchBytes, "status")
	if err != nil {
		return nil, nil, fmt.Errorf("failed to patch status %q for pod %q/%q: %v", patchBytes, namespace, name, err)
	}
	return updatedPod, patchBytes, nil
}
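preparePatchBytesforPodStatus is not shown in the snippet above. A sketch close to the actual helper (error wrapping trimmed): it marshals one pod carrying the old status and one carrying the new, then computes a two-way strategic merge patch, so only the status diff is carried by the Patch call above.

func preparePatchBytesforPodStatus(namespace, name string, oldPodStatus, newPodStatus v1.PodStatus) ([]byte, error) {
	oldData, err := json.Marshal(v1.Pod{Status: oldPodStatus})
	if err != nil {
		return nil, err
	}
	newData, err := json.Marshal(v1.Pod{Status: newPodStatus})
	if err != nil {
		return nil, err
	}
	// CreateTwoWayMergePatch produces the strategic-merge-patch body.
	return strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{})
}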

Next, the client-go client is used to make the RESTful call.

client-go

client-go/kubernetes/typed/core/v1/pod.go

type PodInterface interface {
	Create(*v1.Pod) (*v1.Pod, error)
	Update(*v1.Pod) (*v1.Pod, error)
	UpdateStatus(*v1.Pod) (*v1.Pod, error)
	Delete(name string, options *metav1.DeleteOptions) error
	DeleteCollection(options *metav1.DeleteOptions, listOptions metav1.ListOptions) error
	Get(name string, options metav1.GetOptions) (*v1.Pod, error)
	List(opts metav1.ListOptions) (*v1.PodList, error)
	Watch(opts metav1.ListOptions) (watch.Interface, error)
	Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1.Pod, err error)
	GetEphemeralContainers(podName string, options metav1.GetOptions) (*v1.EphemeralContainers, error)
	UpdateEphemeralContainers(podName string, ephemeralContainers *v1.EphemeralContainers) (*v1.EphemeralContainers, error)
	PodExpansion
}

// Patch applies the patch and returns the patched pod.
func (c *pods) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1.Pod, err error) {
	result = &v1.Pod{}
	err = c.client.Patch(pt).
		Namespace(c.ns).
		Resource("pods").
		SubResource(subresources...).
		Name(name).
		Body(data).
		Do(). // note this Do method
		Into(result)
	return
}

The Do method is what gets invoked next.

kubernetes/staging/src/k8s.io/client-go/rest/client.go

type Interface interface {
	GetRateLimiter() flowcontrol.RateLimiter
	Verb(verb string) *Request
	Post() *Request
	Put() *Request
	Patch(pt types.PatchType) *Request
	Get() *Request
	Delete() *Request
	APIVersion() schema.GroupVersion
}

// Patch begins a PATCH request. Short for c.Verb("Patch").
func (c *RESTClient) Patch(pt types.PatchType) *Request {
	return c.Verb("PATCH").SetHeader("Content-Type", string(pt))
}

func (c *RESTClient) Verb(verb string) *Request {
	backoff := c.createBackoffMgr()
	if c.Client == nil {
		return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle, 0)
	}
	return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle, c.Client.Timeout)
}

kubernetes/staging/src/k8s.io/client-go/rest/request.go

func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, serializers Serializers, backoff BackoffManager, throttle flowcontrol.RateLimiter, timeout time.Duration) *Request {
	if backoff == nil {
		klog.V(2).Infof("Not implementing request backoff strategy.")
		backoff = &NoBackoff{}
	}
	pathPrefix := "/"
	if baseURL != nil {
		pathPrefix = path.Join(pathPrefix, baseURL.Path)
	}
	r := &Request{
		client:      client,
		verb:        verb,
		baseURL:     baseURL,
		pathPrefix:  path.Join(pathPrefix, versionedAPIPath),
		content:     content,
		serializers: serializers,
		backoffMgr:  backoff,
		throttle:    throttle,
		timeout:     timeout,
	}
	switch {
	case len(content.AcceptContentTypes) > 0:
		r.SetHeader("Accept", content.AcceptContentTypes)
	case len(content.ContentType) > 0:
		r.SetHeader("Accept", content.ContentType+", */*")
	}
	return r
}

func (r *Request) Do() Result {
	if err := r.tryThrottle(); err != nil {
		return Result{err: err}
	}
	var result Result
	err := r.request(func(req *http.Request, resp *http.Response) {
		result = r.transformResponse(resp, req)
	})
	if err != nil {
		return Result{err: err}
	}
	return result
}

For our example, the Request that reaches request() carries:

	r.resource = "pods"
	r.namespace = "default"
	r.subresource = "status"
	r.resourceName = "tengine2-test"
	r.body = bytes.NewReader(data)

func (r *Request) request(fn func(*http.Request, *http.Response)) error {
	// Metrics for total request latency
	start := time.Now()
	defer func() {
		metrics.RequestLatency.Observe(r.verb, r.finalURLTemplate(), time.Since(start))
	}()
	if r.err != nil {
		klog.V(4).Infof("Error in request: %v", r.err)
		return r.err
	}
	// TODO: added to catch programmer errors (invoking operations with an object with an empty namespace)
	if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 {
		return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
	}
	if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 {
		return fmt.Errorf("an empty namespace may not be set during creation")
	}
	client := r.client
	if client == nil {
		client = http.DefaultClient
	}
	// Right now we make about ten retry attempts if we get a Retry-After response.
	maxRetries := 10
	retries := 0
	for {
		// URL() builds the final request URL
		url := r.URL().String()
		req, err := http.NewRequest(r.verb, url, r.body)
		...
	}
}

func (r *Request) URL() *url.URL {
	p := r.pathPrefix
	if r.namespaceSet && len(r.namespace) > 0 {
		p = path.Join(p, "namespaces", r.namespace)
	}
	if len(r.resource) != 0 {
		p = path.Join(p, strings.ToLower(r.resource))
	}
	// Join trims trailing slashes, so preserve r.pathPrefix's trailing slash for backwards compatibility if nothing was changed
	if len(r.resourceName) != 0 || len(r.subpath) != 0 || len(r.subresource) != 0 {
		p = path.Join(p, r.resourceName, r.subresource, r.subpath)
	}
	finalURL := &url.URL{}
	...
}

Given the Request fields above, the final URL is prefix/namespaces/default/pods/tengine2-test/status, and the body is the strategic merge patch computed from the old and new status.
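As a quick sanity check on the path assembly, here is a tiny standalone program mirroring what URL() does (the /api/v1 prefix is an assumption for the core v1 group):

package main

import (
	"fmt"
	"path"
)

func main() {
	p := "/api/v1"                              // assumed versionedAPIPath for the core group
	p = path.Join(p, "namespaces", "default")   // r.namespace
	p = path.Join(p, "pods")                    // r.resource
	p = path.Join(p, "tengine2-test", "status") // r.resourceName, r.subresource
	fmt.Println(p) // prints /api/v1/namespaces/default/pods/tengine2-test/status
}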

This completes the flow of sending the status to the api-server.
