Creating the Scheduler

Creating the scheduler mainly means constructing the scheduling queue, the scheduler cache, the scheduling framework, and the scheduling algorithm, together with two functions: one that fetches the next pod to schedule and one that puts pods that failed scheduling into the unschedulable queue.

//pkg/scheduler/factory.go
func (c *Configurator) create() (*Scheduler, error) {
	...
	return &Scheduler{
		// scheduler cache
		SchedulerCache:  c.schedulerCache,
		// scheduling algorithm
		Algorithm:       algo,
		Extenders:       extenders,
		// scheduling framework profiles
		Profiles:        profiles,
		// pops the next pod to schedule from the scheduling queue; blocks if the queue is empty
		NextPod:         internalqueue.MakeNextPodFunc(podQueue),
		// on scheduling failure, puts the pod into the unschedulable queue
		Error:           MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
		StopEverything:  c.StopEverything,
		// scheduling queue
		SchedulingQueue: podQueue,
	}, nil
}

The scheduler cache, scheduling queue, and scheduling framework were analyzed in earlier articles. The remaining core piece is the scheduling algorithm, whose implementation is quite simple: an interface with a single method, shown below.

type ScheduleAlgorithm interface {
	Schedule(context.Context, []framework.Extender, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
}
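Any type that satisfies this single method can serve as the scheduling algorithm. As a purely illustrative sketch (not part of the kube-scheduler source; it assumes the same package and imports as genericScheduler), a trivial implementation could always suggest one fixed node:

// fixedNodeScheduler is a hypothetical ScheduleAlgorithm used only to illustrate
// the shape of the interface and of ScheduleResult.
type fixedNodeScheduler struct {
	nodeName string
}

func (f *fixedNodeScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) {
	if f.nodeName == "" {
		return ScheduleResult{}, ErrNoNodesAvailable
	}
	return ScheduleResult{
		SuggestedHost:  f.nodeName, // the node the pod should be bound to
		EvaluatedNodes: 1,
		FeasibleNodes:  1,
	}, nil
}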

genericScheduler implements the ScheduleAlgorithm interface; its structure is as follows.

type genericScheduler struct {
	// used to call the cache's UpdateSnapshot to take a snapshot
	cache                    internalcache.Cache
	// the snapshot is stored in nodeInfoSnapshot
	nodeInfoSnapshot         *internalcache.Snapshot
	// not every node participates in every scheduling attempt; this percentage controls how many do
	percentageOfNodesToScore int32
	// records which node index the next scheduling attempt should start from
	nextStartNodeIndex       int
}

MakeNextPodFunc returns a function that pops the next pod to schedule from the scheduling queue, blocking if there is none.

// MakeNextPodFunc returns a function to retrieve the next pod from a given
// scheduling queue
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
	return func() *framework.QueuedPodInfo {
		podInfo, err := queue.Pop()
		if err == nil {
			klog.V(4).InfoS("About to try and schedule pod", "pod", klog.KObj(podInfo.Pod))
			return podInfo
		}
		klog.ErrorS(err, "Error while retrieving next pod from scheduling queue")
		return nil
	}
}

MakeDefaultErrorFunc returns a function that puts a pod that failed scheduling into the unschedulable queue, where it waits to be rescheduled.

// MakeDefaultErrorFunc construct a function to handle pod scheduler error
func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodLister, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) {
	return func(podInfo *framework.QueuedPodInfo, err error) {
		pod := podInfo.Pod
		if err == ErrNoNodesAvailable {
			klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
		} else if fitError, ok := err.(*framework.FitError); ok {
			// Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.
			podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins
			klog.V(2).InfoS("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", err)
		} else if apierrors.IsNotFound(err) {
			klog.V(2).InfoS("Unable to schedule pod, possibly due to node not found; waiting", "pod", klog.KObj(pod), "err", err)
			if errStatus, ok := err.(apierrors.APIStatus); ok && errStatus.Status().Details.Kind == "node" {
				nodeName := errStatus.Status().Details.Name
				// when node is not found, We do not remove the node right away. Trying again to get
				// the node and if the node is still not found, then remove it from the scheduler cache.
				_, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
				if err != nil && apierrors.IsNotFound(err) {
					node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
					if err := schedulerCache.RemoveNode(&node); err != nil {
						klog.V(4).InfoS("Node is not found; failed to remove it from the cache", "node", node.Name)
					}
				}
			}
		} else {
			klog.ErrorS(err, "Error scheduling pod; retrying", "pod", klog.KObj(pod))
		}

		// If the pod has already been deleted, there is no need to put it back into the unschedulable queue.
		// Check if the Pod exists in informer cache.
		cachedPod, err := podLister.Pods(pod.Namespace).Get(pod.Name)
		if err != nil {
			klog.InfoS("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", err)
			return
		}

		// In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler.
		// It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version.
		if len(cachedPod.Spec.NodeName) != 0 {
			klog.InfoS("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName)
			return
		}

		// As <cachedPod> is from SharedInformer, we need to do a DeepCopy() here.
		podInfo.PodInfo = framework.NewPodInfo(cachedPod.DeepCopy())
		// put the pod into the unschedulable queue
		if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil {
			klog.ErrorS(err, "Error occurred")
		}
	}
}

Running the Scheduler

After the scheduler has been created, its Run function is executed to start scheduling.

// Run begins watching and scheduling. It starts scheduling and blocked until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
	sched.SchedulingQueue.Run()
	// call scheduleOne in a loop to schedule pods
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}
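With a period of 0, wait.UntilWithContext effectively runs scheduleOne back to back until the context is cancelled. Conceptually (this is a simplification, not the actual wait implementation), it behaves like:

// conceptual equivalent of wait.UntilWithContext(ctx, sched.scheduleOne, 0)
for {
	select {
	case <-ctx.Done():
		return
	default:
		sched.scheduleOne(ctx)
	}
}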

scheduleOne pops a pod from the scheduling queue, runs the Schedule algorithm on it, and binds it to the selected node. If scheduling fails, it also runs the preemption flow and puts the pod back into the unschedulable queue to be rescheduled.

// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	// NextPod() is the function returned by MakeNextPodFunc; it pops a pod to schedule from the queue, blocking if there is none
	podInfo := sched.NextPod()
	// pod could be nil when schedulerQueue is closed
	if podInfo == nil || podInfo.Pod == nil {
		return
	}
	pod := podInfo.Pod
	fwk, err := sched.frameworkForPod(pod)
	if err != nil {
		// This shouldn't happen, because we only accept for scheduling the pods
		// which specify a scheduler name that matches one of the profiles.
		klog.ErrorS(err, "Error occurred")
		return
	}
	// Two special cases can skip scheduling:
	// a. the pod's DeletionTimestamp is not nil, i.e. the pod has been deleted
	// b. the pod is in the assumed state
	if sched.skipPodSchedule(fwk, pod) {
		return
	}

	klog.V(3).InfoS("Attempting to schedule pod", "pod", klog.KObj(pod))

	// Synchronously attempt to find a fit for the pod.
	start := time.Now()
	// create the CycleState
	state := framework.NewCycleState()
	state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
	// Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
	podsToActivate := framework.NewPodsToActivate()
	state.Write(framework.PodsToActivateKey, podsToActivate)

	schedulingCycleCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	// run the scheduling algorithm to pick a suitable node for the pod; this function is analyzed in detail later
	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, sched.Extenders, fwk, state, pod)
	if err != nil {
		// Schedule() may have failed because the pod would not fit on any host, so we try to
		// preempt, with the expectation that the next time the pod is tried for scheduling it
		// will fit due to the preemption. It is also possible that a different pod will schedule
		// into the resources that were preempted, but this is harmless.
		nominatedNode := ""
		// scheduling failed with a FitError; for this kind of error preemption may succeed
		if fitError, ok := err.(*framework.FitError); ok {
			// no PostFilter plugins registered
			if !fwk.HasPostFilterPlugins() {
				klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
			} else {
				// Run the plugins on the PostFilter extension point. Currently there is only one, the preemption plugin
				// DefaultPreemption: pkg/scheduler/framework/plugins/defaultpreemption/defaultpreemption.go
				// Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
				result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
				if status.Code() == framework.Error {
					klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
				} else {
					klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
				}
				// Preemption succeeded. NominatedNodeName is the nominated node, i.e. the pod will most likely run there.
				// Why only "likely"? Even after a successful preemption, the pod may still be preempted by another higher-priority pod.
				if status.IsSuccess() && result != nil {
					nominatedNode = result.NominatedNodeName
				}
			}
			// Pod did not fit anywhere, so it is counted as a failure. If preemption
			// succeeds, the pod should get counted as a success the next time we try to
			// schedule it. (hopefully)
			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
		} else if err == ErrNoNodesAvailable {
			// no nodes are available, so preemption will not help either
			// No nodes available is counted as unschedulable rather than an error.
			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
		} else {
			klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
		}
		// handle the pod that failed scheduling; this function is examined in detail later
		sched.recordSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
		// return and start scheduling the next pod
		return
	}
	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
	// Scheduling succeeded. Tell the cache to assume the pod is running on the chosen node; it has not been bound yet,
	// but this lets us continue scheduling the next pod without waiting for the bind to finish.
	assumedPodInfo := podInfo.DeepCopy()
	assumedPod := assumedPodInfo.Pod
	// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
	// assume the pod
	err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
	if err != nil {
		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
		// This is most probably result of a BUG in retrying logic.
		// We report an error here so that pod scheduling can be retried.
		// This relies on the fact that Error will check if the pod has been bound
		// to a node and if so will not add it back to the unscheduled pods queue
		// (otherwise this would cause an infinite loop).
		sched.recordSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, "")
		return
	}

	// Run the plugins on the Reserve extension point, e.g. to reserve a PV.
	if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
		// trigger un-reserve to clean up state associated with the reserved Pod
		fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
			klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
		}
		sched.recordSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, "")
		return
	}

	// Run the plugins on the Permit extension point (currently no in-tree plugins).
	runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
	if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
		var reason string
		if runPermitStatus.IsUnschedulable() {
			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
			reason = v1.PodReasonUnschedulable
		} else {
			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
			reason = SchedulerError
		}
		// One of the plugins returned status different than success or wait.
		fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
			klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
		}
		sched.recordSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, "")
		return
	}

	// At the end of a successful scheduling cycle, pop and move up Pods if needed.
	if len(podsToActivate.Map) != 0 {
		sched.SchedulingQueue.Activate(podsToActivate.Map)
		// Clear the entries after activation.
		podsToActivate.Map = make(map[string]*v1.Pod)
	}

	// run the binding flow asynchronously
	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
	go func() {
		bindingCycleCtx, cancel := context.WithCancel(ctx)
		defer cancel()
		metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
		defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()

		waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
		if !waitOnPermitStatus.IsSuccess() {...}

		// Run "prebind" plugins.
		preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		if !preBindStatus.IsSuccess() {
			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
			if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
				klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
			} else {
				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
				// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
				// TODO(#103853): de-duplicate the logic.
				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
			}
			sched.recordSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
			return
		}

		// run the bind plugins
		err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
		if err != nil {
			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
			if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil {
				klog.ErrorS(err, "scheduler cache ForgetPod failed")
			} else {
				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
				// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
				// TODO(#103853): de-duplicate the logic.
				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil)
			}
			sched.recordSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, "")
		} else {
			// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
			if klog.V(2).Enabled() {
				klog.InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
			}
			metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
			metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
			metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))

			// Run "postbind" plugins.
			fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)

			// At the end of a successful binding cycle, move up Pods if needed.
			if len(podsToActivate.Map) != 0 {
				sched.SchedulingQueue.Activate(podsToActivate.Map)
				// Unlike the logic in scheduling cycle, we don't bother deleting the entries
				// as `podsToActivate.Map` is no longer consumed.
			}
		}
	}()
}

CycleState
CycleState is used within a scheduling cycle and lets plugins store and retrieve arbitrary data; for example, a PreFilter plugin can save some data into CycleState and a Filter plugin can read that data back later.

type CycleState struct {
	mx      sync.RWMutex
	storage map[StateKey]StateData
	// if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle.
	recordPluginMetrics bool
}

// StateData is an interface type that can hold arbitrary data.
type StateData interface {
	// Clone is an interface to make a copy of StateData. For performance reasons,
	// clone should make shallow copies for members (e.g., slices or maps) that are not
	// impacted by PreFilter's optional AddPod/RemovePod methods.
	Clone() StateData
}
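To make the store/retrieve pattern concrete, here is a minimal plugin sketch. The plugin name ExamplePlugin, the key, and the stored type are invented for illustration; only CycleState.Write/Read, the StateData interface, and the PreFilter/Filter signatures come from the scheduler framework (k8s.io/kubernetes/pkg/scheduler/framework), and imports are omitted as elsewhere in this article.

type ExamplePlugin struct{}

// hypothetical data computed once in PreFilter and reused in Filter
type preFilterState struct {
	requestedCPU int64 // total requested CPU of the pod, in millicores
}

// Clone is required by the StateData interface.
func (s *preFilterState) Clone() framework.StateData {
	return &preFilterState{requestedCPU: s.requestedCPU}
}

const preFilterStateKey = "ExamplePlugin-PreFilterState" // hypothetical key

// PreFilter computes the data once per scheduling cycle and writes it into CycleState.
func (pl *ExamplePlugin) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
	var cpu int64
	for _, c := range pod.Spec.Containers {
		cpu += c.Resources.Requests.Cpu().MilliValue()
	}
	state.Write(preFilterStateKey, &preFilterState{requestedCPU: cpu})
	return nil
}

// Filter reads the data back instead of recomputing it for every node.
func (pl *ExamplePlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	data, err := state.Read(preFilterStateKey)
	if err != nil {
		return framework.AsStatus(err)
	}
	s := data.(*preFilterState)
	_ = s // compare s.requestedCPU against nodeInfo here
	return nil
}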

recordSchedulingFailure
recordSchedulingFailure puts the failed pod into the unschedulable queue to wait for the next scheduling attempt. If the pod preempted successfully, it also updates the scheduling queue and sends a request to the apiserver to set pod.Status.NominatedNodeName to nominatedNode.

// recordSchedulingFailure records an event for the pod that indicates the
// pod has failed to schedule. Also, update the pod condition and nominated node name if set.
func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) {
	// Error is the function returned by MakeDefaultErrorFunc; it puts the pod into the unschedulable queue to wait for rescheduling
	sched.Error(podInfo, err)

	// Update the scheduling queue with the nominated pod information. Without
	// this, there would be a race condition between the next scheduling cycle
	// and the time the scheduler receives a Pod Update for the nominated pod.
	// Here we check for nil only for tests.
	if sched.SchedulingQueue != nil {
		// if nominatedNode is not empty, preemption succeeded, so add this pod to the nominated pods of node nominatedNode
		sched.SchedulingQueue.AddNominatedPod(podInfo.PodInfo, nominatedNode)
	}

	pod := podInfo.Pod
	msg := truncateMessage(err.Error())
	fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
	// if nominatedNode is not empty, i.e. preemption succeeded, set pod.Status.NominatedNodeName to nominatedNode
	// and send a request to the apiserver to update this field
	if err := updatePod(sched.client, pod, &v1.PodCondition{
		Type:    v1.PodScheduled,
		Status:  v1.ConditionFalse,
		Reason:  reason,
		Message: err.Error(),
	}, nominatedNode); err != nil {
		klog.ErrorS(err, "Error updating pod", "pod", klog.KObj(pod))
	}
}

assume
assume adds the assumed pod to the cache and aggregates the pod's information onto its node.
If the pod is a nominated (preemptor) pod, its nomination is also removed from the scheduling queue's internal cache.

// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
	// Optimistically assume that the binding will succeed and send it to apiserver
	// in the background.
	// If the binding fails, scheduler will release resources allocated to assumed pod
	// immediately.
	assumed.Spec.NodeName = host

	if err := sched.SchedulerCache.AssumePod(assumed); err != nil {
		klog.ErrorS(err, "Scheduler cache AssumePod failed")
		return err
	}
	// if "assumed" is a nominated pod, we should remove it from internal cache
	if sched.SchedulingQueue != nil {
		sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
	}

	return nil
}

Schedule
Schedule takes a snapshot of the latest node information from the cache and schedules the pod.

// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
	defer trace.LogIfLong(100 * time.Millisecond)

	// g.cache.UpdateSnapshot(g.nodeInfoSnapshot) snapshots the node information in the cache
	if err := g.snapshot(); err != nil {
		return result, err
	}
	trace.Step("Snapshotting scheduler cache and node infos done")

	// if there are no nodes, scheduling fails with ErrNoNodesAvailable
	if g.nodeInfoSnapshot.NumNodes() == 0 {
		return result, ErrNoNodesAvailable
	}

	// get the list of nodes that pass the plugins on the Filter extension point
	feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, extenders, fwk, state, pod)
	if err != nil {
		return result, err
	}
	trace.Step("Computing predicates done")

	// every node was filtered out; return a FitError, which may trigger preemption
	if len(feasibleNodes) == 0 {
		return result, &framework.FitError{
			Pod:         pod,
			NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
			Diagnosis:   diagnosis,
		}
	}

	// only one node passed the filters, so just return it
	if len(feasibleNodes) == 1 {
		return ScheduleResult{
			SuggestedHost:  feasibleNodes[0].Name,
			EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
			FeasibleNodes:  1,
		}, nil
	}

	// multiple nodes passed the filters, so run the scoring phase: prioritizeNodes runs the Score plugins on them
	priorityList, err := prioritizeNodes(ctx, extenders, fwk, state, pod, feasibleNodes)
	if err != nil {
		return result, err
	}

	// the node with the highest score is the chosen node
	host, err := g.selectHost(priorityList)
	trace.Step("Prioritizing done")

	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
		FeasibleNodes:  len(feasibleNodes),
	}, err
}

findNodesThatFitPod returns the list of nodes that fit the pod.

func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
	diagnosis := framework.Diagnosis{
		NodeToStatusMap:      make(framework.NodeToStatusMap),
		UnschedulablePlugins: sets.NewString(),
	}

	// run the plugins on the PreFilter extension point
	s := fwk.RunPreFilterPlugins(ctx, state, pod)
	// get all nodes from the snapshot
	allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
	...
	// If pod.Status.NominatedNodeName is not empty, this pod failed scheduling earlier and preempted successfully.
	// Run the Filter plugins against NominatedNodeName first; if it passes, there is no need to run the rest of the
	// flow and NominatedNodeName can be returned directly.
	if len(pod.Status.NominatedNodeName) > 0 && feature.DefaultFeatureGate.Enabled(features.PreferNominatedNode) {
		// evaluateNominatedNode calls findNodesThatPassFilters to check whether the nominated node passes the filters
		feasibleNodes, err := g.evaluateNominatedNode(ctx, extenders, pod, fwk, state, diagnosis)
		if err != nil {
			klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
		}
		// Nominated node passes all the filters, scheduler is good to assign this node to the pod.
		if len(feasibleNodes) != 0 {
			return feasibleNodes, diagnosis, nil
		}
	}

	// return the nodes that pass the filters
	feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes)
	if err != nil {
		return nil, diagnosis, err
	}
	...
	return feasibleNodes, diagnosis, nil
}

findNodesThatPassFilters runs the Filter plugins on a chosen number of nodes and returns the list of nodes that pass.

// findNodesThatPassFilters finds the nodes that fit the filter plugins.
func (g *genericScheduler) findNodesThatPassFilters(
	ctx context.Context,
	fwk framework.Framework,
	state *framework.CycleState,
	pod *v1.Pod,
	diagnosis framework.Diagnosis,
	nodes []*framework.NodeInfo) ([]*v1.Node, error) {
	// Returns the number of nodes that participate in this scheduling attempt:
	// if the total number of nodes is below 100 or percentageOfNodesToScore is 100, all nodes participate;
	// otherwise the count is derived from the percentage given by percentageOfNodesToScore.
	numNodesToFind := g.numFeasibleNodesToFind(int32(len(nodes)))

	// Create feasible list with enough space to avoid growing it
	// and allow assigning.
	feasibleNodes := make([]*v1.Node, numNodesToFind)

	// if no Filter plugins are configured, return all available nodes in feasibleNodes
	if !fwk.HasFilterPlugins() {
		length := len(nodes)
		for i := range feasibleNodes {
			feasibleNodes[i] = nodes[(g.nextStartNodeIndex+i)%length].Node()
		}
		g.nextStartNodeIndex = (g.nextStartNodeIndex + len(feasibleNodes)) % length
		return feasibleNodes, nil
	}

	errCh := parallelize.NewErrorChannel()
	var statusesLock sync.Mutex
	var feasibleNodesLen int32
	ctx, cancel := context.WithCancel(ctx)
	// The parameter i is the index of the item to process.
	// For example, with 160 nodes and 16 goroutines, each goroutine handles 10 nodes:
	// the first handles indexes 0-9, the second 10-19, and so on.
	checkNode := func(i int) {
		// We check the nodes starting from where we left off in the previous scheduling cycle,
		// this is to make sure all nodes have the same chance of being examined across pods.
		nodeInfo := nodes[(g.nextStartNodeIndex+i)%len(nodes)]
		// run the Filter plugins
		status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
		if status.Code() == framework.Error {
			errCh.SendErrorWithCancel(status.AsError(), cancel)
			return
		}
		if status.IsSuccess() {
			length := atomic.AddInt32(&feasibleNodesLen, 1)
			if length > numNodesToFind {
				cancel()
				atomic.AddInt32(&feasibleNodesLen, -1)
			} else {
				feasibleNodes[length-1] = nodeInfo.Node()
			}
		} else {
			statusesLock.Lock()
			diagnosis.NodeToStatusMap[nodeInfo.Node().Name] = status
			diagnosis.UnschedulablePlugins.Insert(status.FailedPlugin())
			statusesLock.Unlock()
		}
	}

	beginCheckNode := time.Now()
	statusCode := framework.Success
	defer func() {
		// We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins
		// function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle.
		// Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod.
		metrics.FrameworkExtensionPointDuration.WithLabelValues(runtime.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode))
	}()

	// Stops searching for more nodes once the configured number of feasible nodes
	// are found.
	// start 16 goroutines to run checkNode in parallel; len(nodes) is the total number of items to process
	fwk.Parallelizer().Until(ctx, len(nodes), checkNode)
	// the number of processed nodes is the sum of the nodes that passed the filters and those that failed
	processedNodes := int(feasibleNodesLen) + len(diagnosis.NodeToStatusMap)
	// nextStartNodeIndex is the node index to start from in the next scheduling attempt;
	// taking the result modulo len(nodes) wraps the index back to 0 once it reaches the end
	g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(nodes)

	feasibleNodes = feasibleNodes[:feasibleNodesLen]
	if err := errCh.ReceiveError(); err != nil {
		statusCode = framework.Error
		return nil, err
	}
	return feasibleNodes, nil
}
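For reference, the adaptive sampling described above lives in numFeasibleNodesToFind. The sketch below approximately mirrors the upstream logic (in the upstream source minFeasibleNodesToFind is 100 and minFeasibleNodesPercentageToFind is 5); treat it as illustrative rather than authoritative.

// approximate sketch of numFeasibleNodesToFind
func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
	// with fewer than 100 nodes, or an explicit 100%, every node is checked
	if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 {
		return numAllNodes
	}

	adaptivePercentage := g.percentageOfNodesToScore
	if adaptivePercentage <= 0 {
		// not configured: start at 50% and drop by one point per 125 nodes, bottoming out at 5%
		basePercentageOfNodesToScore := int32(50)
		adaptivePercentage = basePercentageOfNodesToScore - numAllNodes/125
		if adaptivePercentage < minFeasibleNodesPercentageToFind {
			adaptivePercentage = minFeasibleNodesPercentageToFind
		}
	}

	numNodes = numAllNodes * adaptivePercentage / 100
	if numNodes < minFeasibleNodesToFind {
		return minFeasibleNodesToFind
	}
	return numNodes
}

For example, with 5000 nodes and percentageOfNodesToScore left at its default, the adaptive percentage is 50 - 5000/125 = 10, so only about 500 nodes are filtered per scheduling attempt.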

prioritizeNodes runs the Score plugins on the nodes to score them.

func prioritizeNodes(
	ctx context.Context,
	extenders []framework.Extender,
	fwk framework.Framework,
	state *framework.CycleState,
	pod *v1.Pod,
	nodes []*v1.Node,
) (framework.NodeScoreList, error) {
	// If no priority configs are provided, then all nodes will have a score of one.
	// This is required to generate the priority list in the required format
	// if no Score plugins are configured, give every node a score of 1
	if len(extenders) == 0 && !fwk.HasScorePlugins() {
		result := make(framework.NodeScoreList, 0, len(nodes))
		for i := range nodes {
			result = append(result, framework.NodeScore{
				Name:  nodes[i].Name,
				Score: 1,
			})
		}
		return result, nil
	}

	// run the PreScore plugins
	preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
	if !preScoreStatus.IsSuccess() {
		return nil, preScoreStatus.AsError()
	}

	// run the Score plugins
	scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
	if !scoreStatus.IsSuccess() {
		return nil, scoreStatus.AsError()
	}

	// sum up each node's total score across all plugins
	result := make(framework.NodeScoreList, 0, len(nodes))
	for i := range nodes {
		result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
		for j := range scoresMap {
			result[i].Score += scoresMap[j][i].Score
		}
	}
	...
	return result, nil
}

selectHost picks the node with the highest score; if several nodes tie for the highest score, one of them is chosen at random with equal probability. For example, with scores node-a:5, node-b:7, node-c:7, node-b and node-c each have a 1/2 chance of being selected.

func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
	if len(nodeScoreList) == 0 {
		return "", fmt.Errorf("empty priorityList")
	}
	maxScore := nodeScoreList[0].Score
	selected := nodeScoreList[0].Name
	cntOfMaxScore := 1
	for _, ns := range nodeScoreList[1:] {
		if ns.Score > maxScore {
			maxScore = ns.Score
			selected = ns.Name
			cntOfMaxScore = 1
		} else if ns.Score == maxScore {
			cntOfMaxScore++
			if rand.Intn(cntOfMaxScore) == 0 {
				// Replace the candidate with probability of 1/cntOfMaxScore
				selected = ns.Name
			}
		}
	}
	return selected, nil
}
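A quick way to convince yourself that replacing the candidate with probability 1/cntOfMaxScore gives every tied node an equal chance is to run the same logic standalone over many trials. The sketch below is self-contained with made-up node names and scores, independent of the scheduler packages.

package main

import (
	"fmt"
	"math/rand"
)

type nodeScore struct {
	name  string
	score int64
}

// selectHost mirrors the tie-breaking logic above: keep the current best and
// replace it with probability 1/cntOfMaxScore whenever another node ties.
func selectHost(list []nodeScore) string {
	maxScore := list[0].score
	selected := list[0].name
	cntOfMaxScore := 1
	for _, ns := range list[1:] {
		if ns.score > maxScore {
			maxScore, selected, cntOfMaxScore = ns.score, ns.name, 1
		} else if ns.score == maxScore {
			cntOfMaxScore++
			if rand.Intn(cntOfMaxScore) == 0 {
				selected = ns.name
			}
		}
	}
	return selected
}

func main() {
	// node-b and node-c tie with the highest score; node-a scores lower.
	list := []nodeScore{{"node-a", 50}, {"node-b", 80}, {"node-c", 80}}
	counts := map[string]int{}
	for i := 0; i < 100000; i++ {
		counts[selectHost(list)]++
	}
	// Expect node-b and node-c to each win roughly half of the trials.
	fmt.Println(counts)
}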
