
Constants

First, a few constants:

maxRetries: the number of times a service is retried before it is dropped from the queue. Because of the rate limiter, the delay between successive retries grows longer each time.

maxCapacity: the maximum number of addresses that should be stored in an Endpoints resource; in a future release the controller may truncate endpoints exceeding this length.

const (
	// maxRetries is the number of times a service will be retried before it is
	// dropped out of the queue. With the current rate-limiter in use
	// (5ms*2^(maxRetries-1)) the following numbers represent the sequence of
	// delays between successive queuings of a service.
	//
	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
	maxRetries = 15

	// maxCapacity represents the maximum number of addresses that should be
	// stored in an Endpoints resource. In a future release, this controller
	// may truncate endpoints exceeding this length.
	maxCapacity = 1000

	// TolerateUnreadyEndpointsAnnotation is an annotation on the Service denoting if the endpoints
	// controller should go ahead and create endpoints for unready pods. This annotation is
	// currently only used by StatefulSets, where we need the pod to be DNS
	// resolvable during initialization and termination. In this situation we
	// create a headless Service just for the StatefulSet, and clients shouldn't
	// be using this Service for anything so unready endpoints don't matter.
	// Endpoints of these Services retain their DNS records and continue
	// receiving traffic for the Service from the moment the kubelet starts all
	// containers in the pod and marks it "Running", till the kubelet stops all
	// containers and deletes the pod from the apiserver.
	// This field is deprecated. v1.Service.PublishNotReadyAddresses will replace it
	// subsequent releases. It will be removed no sooner than 1.13.
	TolerateUnreadyEndpointsAnnotation = "service.alpha.kubernetes.io/tolerate-unready-endpoints"
)

Execution flow

kube-controller-manager exposes a flag that controls how many endpoint sync operations run concurrently. It can be tuned higher, at the cost of more CPU and network load.

--concurrent-endpoint-syncs int32     Default: 5

The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load

The next flag batches pod updates; its default value of 0 means that every pod change immediately triggers an endpoint update.

--endpoint-updates-batch-period duration

The length of endpoint updates batching period. Processing of pod changes will be delayed by this duration to join them with potential upcoming updates and reduce the overall number of endpoints updates. Larger number = higher endpoint programming latency, but lower number of endpoints revision generated

// EndpointControllerConfiguration contains elements describing EndpointController.
type EndpointControllerConfiguration struct {
// concurrentEndpointSyncs is the number of endpoint syncing operations
// that will be done concurrently. Larger number = faster endpoint updating,
// but more CPU (and network) load.
ConcurrentEndpointSyncs int32

// EndpointUpdatesBatchPeriod can be used to batch endpoint updates.
// All updates of endpoint triggered by pod change will be delayed by up to
// 'EndpointUpdatesBatchPeriod'. If other pods in the same endpoint change
// in that period, they will be batched to a single endpoint update.
// Default 0 value means that each pod update triggers an endpoint update.
EndpointUpdatesBatchPeriod metav1.Duration
}

The endpoint controller is started here, with the corresponding parameters passed in:

func startEndpointController(ctx ControllerContext) (http.Handler, bool, error) {
	go endpointcontroller.NewEndpointController(
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.InformerFactory.Core().V1().Services(),
		ctx.InformerFactory.Core().V1().Endpoints(),
		ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
		ctx.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration,
	).Run(int(ctx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs), ctx.Stop)
	return nil, true, nil
}

Run starts as many worker goroutines as configured and executes the sync loop in each:

func (e *Controller) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer e.queue.ShutDown()

	klog.Infof("Starting endpoint controller")
	defer klog.Infof("Shutting down endpoint controller")

	if !cache.WaitForNamedCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(e.worker, e.workerLoopPeriod, stopCh)
	}

	go func() {
		defer utilruntime.HandleCrash()
		e.checkLeftoverEndpoints()
	}()

	<-stopCh
}

checkLeftoverEndpoints lists all currently existing endpoints and adds their service keys to the queue. It detects endpoints that still exist even though their service is gone; those orphaned endpoints need to be cleaned up.

  1. List all existing endpoints.

  2. Iterate over the listed endpoints.

  3. Skip any endpoint that carries the leader-election annotation; this case arises when multiple kube-controller-manager instances run leader election.

  4. Otherwise, compute the key for the endpoint and add it to the queue.

LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader"

// checkLeftoverEndpoints lists all currently existing endpoints and adds their
// service to the queue. This will detect endpoints that exist with no
// corresponding service; these endpoints need to be deleted. We only need to
// do this once on startup, because in steady-state these are detected (but
// some stragglers could have been left behind if the endpoint controller
// reboots).
func (e *Controller) checkLeftoverEndpoints() {
	list, err := e.endpointsLister.List(labels.Everything())
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err))
		return
	}
	for _, ep := range list {
		if _, ok := ep.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]; ok {
			// when there are multiple controller-manager instances,
			// we observe that it will delete leader-election endpoints after 5min
			// and cause re-election
			// so skip the delete here
			// as leader-election only have endpoints without service
			continue
		}
		key, err := controller.KeyFunc(ep)
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("Unable to get key for endpoint %#v", ep))
			continue
		}
		e.queue.Add(key)
	}
}

worker runs a single worker thread: it just dequeues items, processes them, and marks them done. You may run as many of these in parallel as you wish; the workqueue guarantees that two workers will never end up processing the same service at the same time. (The workqueue really is doing the clever part here!)

// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. You may run as many of these in parallel as you wish; the
// workqueue guarantees that they will not end up processing the same service
// at the same time.
func (e *Controller) worker() {
	for e.processNextWorkItem() {
	}
}

The main processing logic lives in syncService:

func (e *Controller) processNextWorkItem() bool {
	eKey, quit := e.queue.Get()
	if quit {
		return false
	}
	defer e.queue.Done(eKey)

	err := e.syncService(eKey.(string))
	e.handleErr(err, eKey)

	return true
}

Breaking down syncService

If the key refers to a service that the lister can no longer find, inspect the error:

if it is a NotFound error, the service has been deleted, so delete the corresponding Endpoints object (removing the backend addresses),

then drop the service from the trigger-time tracker and return.

	service, err := e.serviceLister.Services(namespace).Get(name)
	if err != nil {
		if !errors.IsNotFound(err) {
			return err
		}

		// Delete the corresponding endpoint, as the service has been deleted.
		// TODO: Please note that this will delete an endpoint when a
		// service is deleted. However, if we're down at the time when
		// the service is deleted, we will miss that deletion, so this
		// doesn't completely solve the problem. See #6877.
		err = e.client.CoreV1().Endpoints(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
		if err != nil && !errors.IsNotFound(err) {
			return err
		}
		e.triggerTimeTracker.DeleteService(namespace, name)
		return nil
	}

If Spec.Selector is nil, the user created the service without a selector and manages its endpoints out-of-band via the REST API, so the controller returns immediately.

	if service.Spec.Selector == nil {
		// services without a selector receive no endpoints from this controller;
		// these services will receive the endpoints that are created out-of-band via the REST API.
		return nil
	}

Next, check whether service.Spec.PublishNotReadyAddresses is true,

or whether the Service annotation "service.alpha.kubernetes.io/tolerate-unready-endpoints" is set to a true value (strconv.ParseBool accepts 1/t/T/true/True/TRUE).

If either is true, unready endpoints are tolerated: pods that are not yet Ready are still added to the Service's Endpoints.

Note that the "service.alpha.kubernetes.io/tolerate-unready-endpoints" annotation is deprecated and will be removed no sooner than Kubernetes 1.13; the .Spec.PublishNotReadyAddresses field replaces it.

	klog.V(5).Infof("About to update endpoints for service %q", key)
	pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
	if err != nil {
		// Since we're getting stuff from a local cache, it is
		// basically impossible to get this error.
		return err
	}

	// If the user specified the older (deprecated) annotation, we have to respect it.
	tolerateUnreadyEndpoints := service.Spec.PublishNotReadyAddresses
	if v, ok := service.Annotations[TolerateUnreadyEndpointsAnnotation]; ok {
		b, err := strconv.ParseBool(v)
		if err == nil {
			tolerateUnreadyEndpoints = b
		} else {
			utilruntime.HandleError(fmt.Errorf("Failed to parse annotation %v: %v", TolerateUnreadyEndpointsAnnotation, err))
		}
	}

	// We call ComputeEndpointLastChangeTriggerTime here to make sure that the
	// state of the trigger time tracker gets updated even if the sync turns out
	// to be no-op and we don't update the endpoints object.
	endpointsLastChangeTriggerTime := e.triggerTimeTracker.ComputeEndpointLastChangeTriggerTime(namespace, service, pods)
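To make the annotation parsing concrete: strconv.ParseBool from the Go standard library accepts exactly 1/t/T/TRUE/true/True (and the false equivalents), and the controller keeps the field default whenever parsing fails. A small sketch of that override logic (parseTolerate is a hypothetical helper, not a function in the controller):

```go
package main

import (
	"fmt"
	"strconv"
)

// parseTolerate mimics how the controller interprets the deprecated
// tolerate-unready-endpoints annotation: any value strconv.ParseBool
// understands overrides the default; anything else is ignored.
func parseTolerate(annotation string, defaultVal bool) bool {
	b, err := strconv.ParseBool(annotation)
	if err != nil {
		return defaultVal // unparseable values leave the default untouched
	}
	return b
}

func main() {
	for _, v := range []string{"1", "t", "True", "TRUE", "yes"} {
		fmt.Printf("%q -> %v\n", v, parseTolerate(v, false))
	}
}
```

Note that "yes" is not a valid ParseBool literal, so it falls back to the default rather than enabling tolerance.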

The controller then iterates over the pods. A pod without a pod IP is skipped; a pod that is being deleted is also skipped when unready endpoints are not tolerated.

		if len(pod.Status.PodIP) == 0 {
			klog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
			continue
		}
		if !tolerateUnreadyEndpoints && pod.DeletionTimestamp != nil {
			klog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name)
			continue
		}

Each pod is then converted into an EndpointAddress for the service, with some extra handling for IPv4 and IPv6 (dual-stack): the function takes the service and a single pod and assembles the endpoint address.

func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointAddress, error) {
	var endpointIP string
	ipFamily := v1.IPv4Protocol

	if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
		// In a legacy cluster, the pod IP is guaranteed to be usable
		endpointIP = pod.Status.PodIP
	} else {
		// feature flag enabled and pods may have multiple IPs
		if len(svc.Spec.IPFamilies) > 0 {
			// controller is connected to an api-server that correctly sets IPFamilies
			ipFamily = svc.Spec.IPFamilies[0] // this works for headful and headless
		} else {
			// controller is connected to an api server that does not correctly
			// set IPFamilies (e.g. old api-server during an upgrade)
			if len(svc.Spec.ClusterIP) > 0 && svc.Spec.ClusterIP != v1.ClusterIPNone {
				// headful service. detect via service clusterIP
				if utilnet.IsIPv6String(svc.Spec.ClusterIP) {
					ipFamily = v1.IPv6Protocol
				}
			} else {
				// Since this is a headless service we use podIP to identify the family.
				// This assumes that status.PodIP is assigned correctly (follows pod cidr and
				// pod cidr list order is same as service cidr list order). The expectation is
				// this is *most probably* the case.
				// if the family was incorrectly identified then this will be corrected once
				// the upgrade is completed (controller connects to api-server that correctly defaults services)
				if utilnet.IsIPv6String(pod.Status.PodIP) {
					ipFamily = v1.IPv6Protocol
				}
			}
		}

		// find an ip that matches the family
		for _, podIP := range pod.Status.PodIPs {
			if (ipFamily == v1.IPv6Protocol) == utilnet.IsIPv6String(podIP.IP) {
				endpointIP = podIP.IP
				break
			}
		}
	}

	if endpointIP == "" {
		return nil, fmt.Errorf("failed to find a matching endpoint for service %v", svc.Name)
	}

	return &v1.EndpointAddress{
		IP:       endpointIP,
		NodeName: &pod.Spec.NodeName,
		TargetRef: &v1.ObjectReference{
			Kind:            "Pod",
			Namespace:       pod.ObjectMeta.Namespace,
			Name:            pod.ObjectMeta.Name,
			UID:             pod.ObjectMeta.UID,
			ResourceVersion: pod.ObjectMeta.ResourceVersion,
		},
	}, nil
}
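The family detection above hinges on utilnet.IsIPv6String, which just checks whether a string parses as an IPv6 address. A minimal stdlib-only equivalent (the function name isIPv6String here is for illustration, mirroring the utility's documented behavior):

```go
package main

import (
	"fmt"
	"net"
)

// isIPv6String reports whether s parses as an IPv6 address: a parsed IP
// whose To4() form is nil is treated as IPv6, which is the same test the
// k8s net utilities apply.
func isIPv6String(s string) bool {
	ip := net.ParseIP(s)
	return ip != nil && ip.To4() == nil
}

func main() {
	fmt.Println(isIPv6String("10.0.0.1"))  // false: IPv4 address
	fmt.Println(isIPv6String("fd00::1"))   // true: IPv6 address
	fmt.Println(isIPv6String("not-an-ip")) // false: unparseable
}
```

This is why the loop over pod.Status.PodIPs can select the address whose family matches the service: comparing the boolean "is IPv6" on both sides picks the first pod IP of the right family.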

If the service defines no ports, check whether ClusterIP is None: headless services are allowed to have no ports. Otherwise, iterate over the service ports, resolve each target port on the pod, and accumulate the endpoint subsets.

		ep, err := podToEndpointAddressForService(service, pod)
		if err != nil {
			// this will happen, if the cluster runs with some nodes configured as dual stack and some as not
			// such as the case of an upgrade..
			klog.V(2).Infof("failed to find endpoint for service:%v with ClusterIP:%v on pod:%v with error:%v", service.Name, service.Spec.ClusterIP, pod.Name, err)
			continue
		}

		epa := *ep
		if endpointutil.ShouldSetHostname(pod, service) {
			epa.Hostname = pod.Spec.Hostname
		}

		// Allow headless service not to have ports.
		if len(service.Spec.Ports) == 0 {
			if service.Spec.ClusterIP == api.ClusterIPNone {
				subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints)
				// No need to repack subsets for headless service without ports.
			}
		} else {
			for i := range service.Spec.Ports {
				servicePort := &service.Spec.Ports[i]
				portNum, err := podutil.FindPort(pod, servicePort)
				if err != nil {
					klog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
					continue
				}
				epp := endpointPortFromServicePort(servicePort, portNum)

				var readyEps, notReadyEps int
				subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints)
				totalReadyEps = totalReadyEps + readyEps
				totalNotReadyEps = totalNotReadyEps + notReadyEps
			}
		}

addEndpointSubset appends the address to the ready or not-ready side and returns the aggregated subsets together with the ready/not-ready counts:

func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress,
	epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) {
	var readyEps int
	var notReadyEps int
	ports := []v1.EndpointPort{}
	if epp != nil {
		ports = append(ports, *epp)
	}
	if tolerateUnreadyEndpoints || podutil.IsPodReady(pod) {
		subsets = append(subsets, v1.EndpointSubset{
			Addresses: []v1.EndpointAddress{epa},
			Ports:     ports,
		})
		readyEps++
	} else if shouldPodBeInEndpoints(pod) {
		klog.V(5).Infof("Pod is out of service: %s/%s", pod.Namespace, pod.Name)
		subsets = append(subsets, v1.EndpointSubset{
			NotReadyAddresses: []v1.EndpointAddress{epa},
			Ports:             ports,
		})
		notReadyEps++
	}
	return subsets, readyEps, notReadyEps
}

For services with ports, FindPort resolves each target port on the pod: if the target port is a name, it searches the pod's containers for a port with that name and protocol; if it is already a number, it is returned directly.

func FindPort(pod *v1.Pod, svcPort *v1.ServicePort) (int, error) {
	portName := svcPort.TargetPort
	switch portName.Type {
	case intstr.String:
		name := portName.StrVal
		for _, container := range pod.Spec.Containers {
			for _, port := range container.Ports {
				if port.Name == name && port.Protocol == svcPort.Protocol {
					return int(port.ContainerPort), nil
				}
			}
		}
	case intstr.Int:
		return portName.IntValue(), nil
	}

	return 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID)
}
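To make the named-versus-numeric lookup concrete, here is a stdlib-only sketch with simplified stand-in types; ContainerPort, findPort, and the sample data are hypothetical illustrations, not the real Kubernetes types:

```go
package main

import (
	"errors"
	"fmt"
)

// ContainerPort is a simplified stand-in for v1.ContainerPort.
type ContainerPort struct {
	Name     string
	Protocol string
	Port     int
}

// findPort mirrors the shape of podutil.FindPort: a numeric target port is
// returned as-is, a named target is looked up among the container ports by
// name and protocol, and anything unresolved is an error.
func findPort(ports []ContainerPort, target interface{}, protocol string) (int, error) {
	switch t := target.(type) {
	case int:
		return t, nil // numeric target port: use it directly
	case string:
		for _, p := range ports {
			if p.Name == t && p.Protocol == protocol {
				return p.Port, nil
			}
		}
	}
	return 0, errors.New("no suitable port")
}

func main() {
	ports := []ContainerPort{{Name: "http", Protocol: "TCP", Port: 8080}}
	fmt.Println(findPort(ports, "http", "TCP")) // resolves the name to 8080
	fmt.Println(findPort(ports, 9090, "TCP"))   // numeric target: 9090
}
```

Using named target ports is what lets a Service keep working when different pod templates expose the same logical port on different numbers.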

The resolved port number and the service port are then combined into an EndpointPort:

func endpointPortFromServicePort(servicePort *v1.ServicePort, portNum int) *v1.EndpointPort {
	epp := &v1.EndpointPort{
		Name:        servicePort.Name,
		Port:        int32(portNum),
		Protocol:    servicePort.Protocol,
		AppProtocol: servicePort.AppProtocol,
	}
	return epp
}

Next, fetch the current Endpoints object; if it does not exist yet, build a fresh one (an empty ResourceVersion then signals that it must be created):

	// See if there's actually an update here.
	currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
	if err != nil {
		if errors.IsNotFound(err) {
			currentEndpoints = &v1.Endpoints{
				ObjectMeta: metav1.ObjectMeta{
					Name:   service.Name,
					Labels: service.Labels,
				},
			}
		} else {
			return err
		}
	}

	createEndpoints := len(currentEndpoints.ResourceVersion) == 0

Decide whether an update is actually needed; if the subsets, labels, and capacity annotation are all unchanged, skip it:

	if !createEndpoints &&
		apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets) &&
		apiequality.Semantic.DeepEqual(compareLabels, service.Labels) &&
		capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) {
		klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
		return nil
	}

Deep-copy the current Endpoints into newEndpoints and set the computed subsets and labels:

	newEndpoints := currentEndpoints.DeepCopy()
	newEndpoints.Subsets = subsets
	newEndpoints.Labels = service.Labels

If the object exceeds the per-Endpoints address capacity, a warning annotation is added; otherwise the annotation is removed:

	if overCapacity(newEndpoints.Subsets) {
		newEndpoints.Annotations[v1.EndpointsOverCapacity] = "warning"
	} else {
		delete(newEndpoints.Annotations, v1.EndpointsOverCapacity)
	}
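overCapacity compares the total address count against maxCapacity (1000 in the constants at the top), counting both ready and not-ready addresses across all subsets. A stdlib-only sketch of that check, with a simplified subset type standing in for v1.EndpointSubset:

```go
package main

import "fmt"

const maxCapacity = 1000

// subset is a simplified stand-in for v1.EndpointSubset.
type subset struct {
	Addresses         []string
	NotReadyAddresses []string
}

// overCapacity reports whether the total number of addresses across all
// subsets exceeds maxCapacity; this is the condition under which the
// controller sets the over-capacity warning annotation.
func overCapacity(subsets []subset) bool {
	total := 0
	for _, s := range subsets {
		total += len(s.Addresses) + len(s.NotReadyAddresses)
	}
	return total > maxCapacity
}

func main() {
	small := []subset{{Addresses: []string{"10.0.0.1"}}}
	big := []subset{{Addresses: make([]string, 1001)}}
	fmt.Println(overCapacity(small)) // false
	fmt.Println(overCapacity(big))   // true
}
```

Counting not-ready addresses as well matters: a service tolerating unready endpoints can blow past the cap even when few pods are Ready.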

Finally, create or update the Endpoints object, depending on whether it already exists:

	if createEndpoints {
		// No previous endpoints, create them
		_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(context.TODO(), newEndpoints, metav1.CreateOptions{})
	} else {
		// Pre-existing
		_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(context.TODO(), newEndpoints, metav1.UpdateOptions{})
	}
