maxRetries: 服务在被移出队列之前的最大重试次数。由于使用了限速器(指数退避,5ms*2^(n-1)),重试间隔会越来越长。

maxCapacity: 单个 Endpoints 资源中应当存储的最大地址数。在未来的版本中,控制器可能会截断超过此长度的 endpoints。

const (
	// maxRetries is the number of times a service will be retried before it
	// is dropped out of the queue. With the current rate-limiter in use
	// (5ms*2^(maxRetries-1)) the following numbers represent the sequence of
	// delays between successive queuings of a service.
	//
	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s,
	// 10.2s, 20.4s, 41s, 82s
	maxRetries = 15

	// maxCapacity represents the maximum number of addresses that should be
	// stored in an Endpoints resource. In a future release, this controller
	// may truncate endpoints exceeding this length.
	maxCapacity = 1000

	// TolerateUnreadyEndpointsAnnotation is an annotation on the Service
	// denoting if the endpoints controller should go ahead and create
	// endpoints for unready pods. This annotation is currently only used by
	// StatefulSets, where we need the pod to be DNS resolvable during
	// initialization and termination. In this situation we create a headless
	// Service just for the StatefulSet, and clients shouldn't be using this
	// Service for anything so unready endpoints don't matter. Endpoints of
	// these Services retain their DNS records and continue receiving traffic
	// for the Service from the moment the kubelet starts all containers in
	// the pod and marks it "Running", till the kubelet stops all containers
	// and deletes the pod from the apiserver.
	//
	// Deprecated: v1.Service.PublishNotReadyAddresses will replace it in
	// subsequent releases. It will be removed no sooner than 1.13.
	TolerateUnreadyEndpointsAnnotation = "service.alpha.kubernetes.io/tolerate-unready-endpoints"
)


kube-controller-manager 可以通过参数设置同一时间并发处理的 endpoint 同步数量。该值调得越大,endpoint 更新越快,但相应地会消耗更多的 CPU(和网络)负载。

--concurrent-endpoint-syncs int32 Default: 5

The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load


--endpoint-updates-batch-period duration

The length of endpoint updates batching period. Processing of pod changes will be delayed by this duration to join them with potential upcoming updates and reduce the overall number of endpoints updates. Larger number = higher endpoint programming latency, but lower number of endpoints revision generated

// EndpointControllerConfiguration contains elements describing EndpointController.
type EndpointControllerConfiguration struct {
// concurrentEndpointSyncs is the number of endpoint syncing operations
// that will be done concurrently. Larger number = faster endpoint updating,
// but more CPU (and network) load.
ConcurrentEndpointSyncs int32// EndpointUpdatesBatchPeriod can be used to batch endpoint updates.
// All updates of endpoint triggered by pod change will be delayed by up to
// 'EndpointUpdatesBatchPeriod'. If other pods in the same endpoint change
// in that period, they will be batched to a single endpoint update.
// Default 0 value means that each pod update triggers an endpoint update.
EndpointUpdatesBatchPeriod metav1.Duration


func startEndpointController(ctx ControllerContext) (http.Handler, bool, error) {go endpointcontroller.NewEndpointController(ctx.InformerFactory.Core().V1().Pods(),ctx.InformerFactory.Core().V1().Services(),ctx.InformerFactory.Core().V1().Endpoints(),ctx.ClientBuilder.ClientOrDie("endpoint-controller"),ctx.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration,).Run(int(ctx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs), ctx.Stop)return nil, true, nil


func (e *Controller) Run(workers int, stopCh <-chan struct{}) {defer utilruntime.HandleCrash()defer e.queue.ShutDown()klog.Infof("Starting endpoint controller")defer klog.Infof("Shutting down endpoint controller")if !cache.WaitForNamedCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) {return}for i := 0; i < workers; i++ {go wait.Until(e.worker, e.workerLoopPeriod, stopCh)}go func() {defer utilruntime.HandleCrash()e.checkLeftoverEndpoints()}()<-stopCh

checkLeftoverEndpoints 列出当前存在的所有 endpoint,并把它们对应的 service 加入队列。这样可以发现那些 endpoint 还存在、但对应的 service 已经不存在的情况;这类 endpoint 需要被删除。

  1. 先列出所有的endpoint。

  2. 遍历列出来的 endpoint(带有 leader 选举注解的 endpoint 会被跳过)。


  3. 如果能获取到这个 endpoint 的 key,就加入队列中。

LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader"

// checkLeftoverEndpoints lists all currently existing endpoints and adds their
// service to the queue. This will detect endpoints that exist with no
// corresponding service; these endpoints need to be deleted. We only need to
// do this once on startup, because in steady-state these are detected (but
// some stragglers could have been left behind if the endpoint controller
// reboots).
func (e *Controller) checkLeftoverEndpoints() {list, err := e.endpointsLister.List(labels.Everything())if err != nil {utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err))return}for _, ep := range list {if _, ok := ep.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]; ok {// when there are multiple controller-manager instances,// we observe that it will delete leader-election endpoints after 5min// and cause re-election// so skip the delete here// as leader-election only have endpoints without servicecontinue}key, err := controller.KeyFunc(ep)if err != nil {utilruntime.HandleError(fmt.Errorf("Unable to get key for endpoint %#v", ep))continue}e.queue.Add(key)}

worker运行一个worker线程,您可以根据需要并行运行任意数量,workqueue 保证它们最终不会同时处理相同的服务。(queue厉害,直呼内行!!!!)

// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. You may run as many of these in parallel as you wish; the
// workqueue guarantees that they will not end up processing the same service
// at the same time.
func (e *Controller) worker() {for e.processNextWorkItem() {}


func (e *Controller) processNextWorkItem() bool {eKey, quit := e.queue.Get()if quit {return false}defer e.queue.Done(eKey)err := e.syncService(eKey.(string))e.handleErr(err, eKey)return true

sync service 分解流程




  // Look the service up in the lister; a NotFound error means the service
  // was deleted, any other error aborts the sync.
  service, err := e.serviceLister.Services(namespace).Get(name)
  if err != nil {
  	if !errors.IsNotFound(err) {
  		return err
  	}

  	// Delete the corresponding endpoint, as the service has been deleted.
  	// TODO: Please note that this will delete an endpoint when a
  	// service is deleted. However, if we're down at the time when
  	// the service is deleted, we will miss that deletion, so this
  	// doesn't completely solve the problem. See #6877.
  	err = e.client.CoreV1().Endpoints(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
  	if err != nil && !errors.IsNotFound(err) {
  		return err
  	}
  	e.triggerTimeTracker.DeleteService(namespace, name)
  	return nil
  }


  if service.Spec.Selector == nil {
  	// Services without a selector receive no endpoints from this
  	// controller; these services will receive the endpoints that are
  	// created out-of-band via the REST API.
  	return nil
  }


判断 Service 的 .Spec.PublishNotReadyAddresses,或者 Service Annotations "service.alpha.kubernetes.io/tolerate-unready-endpoints" 是否为 true(1/t/T/true/True/TRUE;注解存在且可以解析时以注解的值为准),

如果为true,则表示tolerate Unready Endpoints,即Unready的Pods信息也会被加入该Service对应的Endpoints中。

注意,Annotations "service.alpha.kubernetes.io/tolerate-unready-endpoints" 已被弃用,最早要到 Kubernetes 1.13 之后才会被移除,后续应只使用 .Spec.PublishNotReadyAddresses 字段。

klog.V(5).Infof("About to update endpoints for service %q", key)
pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
if err != nil {
	// Since we're getting stuff from a local cache, it is
	// basically impossible to get this error.
	return err
}

// If the user specified the older (deprecated) annotation, we have to
// respect it: a parseable annotation value overrides
// PublishNotReadyAddresses, an unparseable one is reported and ignored.
tolerateUnreadyEndpoints := service.Spec.PublishNotReadyAddresses
if v, ok := service.Annotations[TolerateUnreadyEndpointsAnnotation]; ok {
	b, err := strconv.ParseBool(v)
	if err == nil {
		tolerateUnreadyEndpoints = b
	} else {
		utilruntime.HandleError(fmt.Errorf("Failed to parse annotation %v: %v", TolerateUnreadyEndpointsAnnotation, err))
	}
}

// We call ComputeEndpointLastChangeTriggerTime here to make sure that the
// state of the trigger time tracker gets updated even if the sync turns out
// to be no-op and we don't update the endpoints object.
endpointsLastChangeTriggerTime := e.triggerTimeTracker.ComputeEndpointLastChangeTriggerTime(namespace, service, pods)

接下来开始遍历 pod:如果某个 pod 还没有 pod IP,就跳过它继续处理下一个;如果不容忍 unready 的 endpoint,并且该 pod 正在被删除(DeletionTimestamp 不为空),也同样跳过。

    // A pod without an IP cannot be turned into an endpoint address.
    if len(pod.Status.PodIP) == 0 {
    	klog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
    	continue
    }
    // Pods that are being deleted are skipped unless unready endpoints are
    // tolerated for this service.
    if !tolerateUnreadyEndpoints && pod.DeletionTimestamp != nil {
    	klog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name)
    	continue
    }


func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointAddress, error) {var endpointIP stringipFamily := v1.IPv4Protocolif !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {// In a legacy cluster, the pod IP is guaranteed to be usableendpointIP = pod.Status.PodIP} else {//feature flag enabled and pods may have multiple IPsif len(svc.Spec.IPFamilies) > 0 {// controller is connected to an api-server that correctly sets IPFamiliesipFamily = svc.Spec.IPFamilies[0] // this works for headful and headless} else {// controller is connected to an api server that does not correctly// set IPFamilies (e.g. old api-server during an upgrade)if len(svc.Spec.ClusterIP) > 0 && svc.Spec.ClusterIP != v1.ClusterIPNone {// headful service. detect via service clusterIPif utilnet.IsIPv6String(svc.Spec.ClusterIP) {ipFamily = v1.IPv6Protocol}} else {// Since this is a headless service we use podIP to identify the family.// This assumes that status.PodIP is assigned correctly (follows pod cidr and// pod cidr list order is same as service cidr list order). The expectation is// this is *most probably* the case.// if the family was incorrectly indentified then this will be corrected once the// the upgrade is completed (controller connects to api-server that correctly defaults services)if utilnet.IsIPv6String(pod.Status.PodIP) {ipFamily = v1.IPv6Protocol}}}// find an ip that matches the familyfor _, podIP := range pod.Status.PodIPs {if (ipFamily == v1.IPv6Protocol) == utilnet.IsIPv6String(podIP.IP) {endpointIP = podIP.IPbreak}}}if endpointIP == "" {return nil, fmt.Errorf("failed to find a matching endpoint for service %v", svc.Name)}return &v1.EndpointAddress{IP:       endpointIP,NodeName: &pod.Spec.NodeName,TargetRef: &v1.ObjectReference{Kind:            "Pod",Namespace:       pod.ObjectMeta.Namespace,Name:            pod.ObjectMeta.Name,UID:             pod.ObjectMeta.UID,ResourceVersion: pod.ObjectMeta.ResourceVersion,},}, nil


    ep, err := podToEndpointAddressForService(service, pod)if err != nil {// this will happen, if the cluster runs with some nodes configured as dual stack and some as not// such as the case of an upgrade..klog.V(2).Infof("failed to find endpoint for service:%v with ClusterIP:%v on pod:%v with error:%v", service.Name, service.Spec.ClusterIP, pod.Name, err)continue}epa := *epif endpointutil.ShouldSetHostname(pod, service) {epa.Hostname = pod.Spec.Hostname}// Allow headless service not to have ports.if len(service.Spec.Ports) == 0 {if service.Spec.ClusterIP == api.ClusterIPNone {subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints)// No need to repack subsets for headless service without ports.}} else {for i := range service.Spec.Ports {servicePort := &service.Spec.Ports[i]portNum, err := podutil.FindPort(pod, servicePort)if err != nil {klog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)continue}epp := endpointPortFromServicePort(servicePort, portNum)var readyEps, notReadyEps intsubsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints)totalReadyEps = totalReadyEps + readyEpstotalNotReadyEps = totalNotReadyEps + notReadyEps}}


func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress,epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) {var readyEps intvar notReadyEps intports := []v1.EndpointPort{}if epp != nil {ports = append(ports, *epp)}   if tolerateUnreadyEndpoints || podutil.IsPodReady(pod) {subsets = append(subsets, v1.EndpointSubset{Addresses: []v1.EndpointAddress{epa},Ports:     ports,})readyEps++} else if shouldPodBeInEndpoints(pod) {klog.V(5).Infof("Pod is out of service: %s/%s", pod.Namespace, pod.Name)subsets = append(subsets, v1.EndpointSubset{NotReadyAddresses: []v1.EndpointAddress{epa},Ports:             ports,})notReadyEps++}return subsets, readyEps, notReadyEps

如果 service 定义了端口,还需要为每个 service 端口找到对应的 pod 端口:如果 targetPort 是名字,就在 pod 各容器的端口中查找名字和协议都匹配的端口,并返回其端口号;如果 targetPort 直接是数字,就直接返回该数字。

func FindPort(pod *v1.Pod, svcPort *v1.ServicePort) (int, error) {portName := svcPort.TargetPortswitch portName.Type {case intstr.String:name := portName.StrValfor _, container := range pod.Spec.Containers {for _, port := range container.Ports {if port.Name == name && port.Protocol == svcPort.Protocol {return int(port.ContainerPort), nil}}}case intstr.Int:return portName.IntValue(), nil}return 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID)


func endpointPortFromServicePort(servicePort *v1.ServicePort, portNum int) *v1.EndpointPort {epp := &v1.EndpointPort{Name:        servicePort.Name,Port:        int32(portNum),Protocol:    servicePort.Protocol,AppProtocol: servicePort.AppProtocol,}return epp


  // See if there's actually an update here.currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)if err != nil {if errors.IsNotFound(err) {currentEndpoints = &v1.Endpoints{ObjectMeta: metav1.ObjectMeta{Name:   service.Name,Labels: service.Labels,},}} else {return err}}
createEndpoints := len(currentEndpoints.ResourceVersion) == 0


  // Skip the write when the object already exists and its subsets, labels
  // and capacity annotation all match what was just computed.
  // NOTE(review): compareLabels is not defined in this excerpt; it is
  // presumably derived from currentEndpoints.Labels earlier in syncService.
  if !createEndpoints &&
  	apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets) &&
  	apiequality.Semantic.DeepEqual(compareLabels, service.Labels) &&
  	capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) {
  	klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
  	return nil
  }


  // Work on a copy so the object obtained from the lister is never mutated.
  newEndpoints := currentEndpoints.DeepCopy()
  newEndpoints.Subsets = subsets
  newEndpoints.Labels = service.Labels


  // Flag the object when it holds more addresses than the controller's
  // capacity limit, and clear the flag otherwise.
  if overCapacity(newEndpoints.Subsets) {
  	// Writing to a nil map panics, so make sure the map exists first.
  	if newEndpoints.Annotations == nil {
  		newEndpoints.Annotations = make(map[string]string)
  	}
  	newEndpoints.Annotations[v1.EndpointsOverCapacity] = "warning"
  } else {
  	delete(newEndpoints.Annotations, v1.EndpointsOverCapacity)
  }


 if createEndpoints {
 	// No previous endpoints, create them.
 	_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(context.TODO(), newEndpoints, metav1.CreateOptions{})
 } else {
 	// Pre-existing, update in place.
 	_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(context.TODO(), newEndpoints, metav1.UpdateOptions{})
 }

