版本:v1.13.0

启动分析

kubernetes基础组件的入口均在cmd目录下,kube-schduler入口在scheduler.go下。
kubernetes所有的组件启动采用的均是command的形式,引用的是spf13类库。

func main() {rand.Seed(time.Now().UnixNano())//创建Cobra格式的Scheduler commandcommand := app.NewSchedulerCommand()// TODO: once we switch everything over to Cobra commands, we can go back to calling// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the// normalize func and add the go flag set by hand.//将配置中的‘_’字符转化为‘-’字符pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)// utilflag.InitFlags()logs.InitLogs()defer logs.FlushLogs()//执行Scheduler commandif err := command.Execute(); err != nil {fmt.Fprintf(os.Stderr, "%v\n", err)os.Exit(1)}
}

通过将配置文件转化成command的形式,调用Execute方法执行定义的Run方法

          Run: func(cmd *cobra.Command, args []string) {if err := runCommand(cmd, args, opts); err != nil {fmt.Fprintf(os.Stderr, "%v\n", err)os.Exit(1)}},

进入runCommand方法,通过完成配置的初始化,调用Run方法,进一步启动。

// runCommand runs the scheduler.func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {。。。// Get the completed configcc := c.Complete()// To help debugging, immediately log versionklog.Infof("Version: %+v", version.Get())// Apply algorithms based on feature gates.// TODO: make configurable?algorithmprovider.ApplyFeatureGates()// Configz registration.if cz, err := configz.New("componentconfig"); err == nil {cz.Set(cc.ComponentConfig)} else {return fmt.Errorf("unable to register configz: %s", err)}return Run(cc, stopCh)
}

Run方法分析

Run方法主要做了以下工作:
1、判断是否需要添加VolumeScheduling新特性;
2、初始化调度参数的相关结构体;
3、配置准备事件广播;
4、健康检查相关配置;
5、Metrics相关配置;
6、启动所有的Informer(kubernetes主要就是通过Informer和Workqueue机制监听事件的变化);
7、判断是否需要LeaderElection,决定最终的启动

调度入口

Run(cc, stopCh)->sched.Run()->sched.scheduleOne

scheduleOne方法分析

cheduleOne,顾名思义,每次调度一个Pod,整体文件如

func (sched *Scheduler) scheduleOne() {pod := sched.config.NextPod()// pod could be nil when schedulerQueue is closedif pod == nil {return}if pod.DeletionTimestamp != nil {sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)return}klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)// Synchronously attempt to find a fit for the pod.start := time.Now()// ljs:调度算法:最终的调度在generic_scheduler.go的Schedule方法// ljs:schedule()可能已经失败,因为pod不适合任何主机,// 因此我们尝试抢占,期望下次尝试pod进行调度时,由于抢占,它将适合。也可以安排不同的pod进入被抢占的资源,但这是无害的。suggestedHost, err := sched.schedule(pod) if err != nil {// ljs:当通过正常的调度流程如果没有找到合适的节点(主要是预选没有合适的节点),// 会判断需不需要进行抢占调度,具体的代码在pkg/scheduler/scheduler.go文件下,用到的方法preempt    if fitError, ok := err.(*core.FitError); ok {preemptionStartTime := time.Now()sched.preempt(pod, fitError)metrics.PreemptionAttempts.Inc()metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))metrics.PodScheduleFailures.Inc()} else {klog.Errorf("error selecting node for pod: %v", err)metrics.PodScheduleErrors.Inc()}return}metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))// 3.Pod与Node缓存,保证调度一直进行,不用等待每次绑定完成(绑定是一个耗时的过程)assumedPod := pod.DeepCopy()// ljs: 一个Pod被计划调度到机器A的事实被称为assume调度,即假定调度,// 这些调度安排被保存在特定的队列里,此时调度过程是能看到这个预安排的,因而影响到其他Pod的调度。allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)if err != nil {klog.Errorf("error assuming volumes: %v", err)metrics.PodScheduleErrors.Inc()return}// assume modifies `assumedPod` by setting NodeName=suggestedHost    //ljs: 5. Pod对应的NodeName写上主机名,存入缓存err = sched.assume(assumedPod, suggestedHost)if err != nil {klog.Errorf("error assuming pod: %v", err)metrics.PodScheduleErrors.Inc()return}// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).//ljs: 6. 请求apiserver,异步处理最终的绑定,写入到etcdgo func() { // Bind volumes first before Podif !allBound {err := sched.bindVolumes(assumedPod)if err != nil {klog.Errorf("error binding volumes: %v", err)metrics.PodScheduleErrors.Inc()return}}err := sched.bind(assumedPod, &v1.Binding{ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},Target: v1.ObjectReference{Kind: "Node",Name: suggestedHost,},})metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))if err != nil {klog.Errorf("error binding pod: %v", err)metrics.PodScheduleErrors.Inc()} else {metrics.PodScheduleSuccesses.Inc()}}()
}

主要做了以下工作:
1、从队列中取出待调度的Pod
2、根据调度算法(预选+优选)获取待调度Pod匹配的主机,如果未获取到合适的主机,判断是否需要preempt,即Pod的抢占策略,为Pod分配节点
3、将当前Pod缓存起来,假定已经绑定成功(主要是为了将scheduling与binding过程分开)
4、判断是否需要VolumeScheduling特性继续添加Pod信息
5、Pod对应的NodeName写上主机名(调度的本质就是将为空的NodeName写上相应的Node的值)
6、启动新的binding协程,请求apiserver,异步处理最终的绑定,将结果写入到etcd中

调度算法

sched.scheduleOne -> sched.schedule(pod) -> generic_scheduler.go

func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))defer trace.LogIfLong(100 * time.Millisecond)// 对pod做一些基础检查,及检查pod对应的pvcif err := podPassesBasicChecks(pod, g.pvcLister); err != nil {return "", err}//取得node list列表nodes, err := nodeLister.List()if err != nil {return "", err}...trace.Step("Computing predicates")startPredicateEvalTime := time.Now()//ljs:调度算法预选filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)if err != nil {return "", err}...metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)//优选算法调用的接口,执行PrioritizeNodes方法对通过预选的node进行优选打分priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders) //ljs:优选算法if err != nil {return "", err}metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))metrics.SchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))trace.Step("Selecting host")//最后找出一个优选分数最高的node,如果有node优选分数一样,则随机返回一个分数最高的nodereturn g.selectHost(priorityList)
}

如果未自定义调度器,则启用默认的调度器genericScheduler,genericScheduler的Schedule方法如下:

  • 对pod做一些基础检查,及检查pod对应的pvc
  • 取得node list列表
  • 执行genericScheduler.findNodesThatFit方法进行预选
  • 执行PrioritizeNodes方法对通过预选的node进行优选打分
  • 最后找出一个优选分数最高的node,如果有node优选分数一样,则随机返回一个分数最高的node

预选

预选算法调用的接口是findNodesThatFit,主要代码如下:

func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {var filtered []*v1.NodefailedPredicateMap := FailedPredicateMap{}// 该if表示,如果没有配置预选的算法,则直接将所有的Node写入匹配数组if len(g.predicates) == 0 {filtered = nodes} else {allNodes := int32(g.cache.NodeTree().NumNodes)// numFeasibleNodesToFind保证一次性不用返回过多的Node数量,避免数组过大numNodesToFind := g.numFeasibleNodesToFind(allNodes)// Create filtered list with enough space to avoid growing it// and allow assigning.filtered = make([]*v1.Node, numNodesToFind)errs := errors.MessageCountMap{}var (predicateResultLock sync.MutexfilteredLen         int32equivClass          *equivalence.Class)ctx, cancel := context.WithCancel(context.Background())// We can use the same metadata producer for all nodes.meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)if g.equivalenceCache != nil {// getEquivalenceClassInfo will return immediately if no equivalence pod foundequivClass = equivalence.NewClass(pod)}// checkNode处理预选策略checkNode := func(i int) {var nodeCache *equivalence.NodeCache// 每次获取Node信息nodeName := g.cache.NodeTree().Next()if g.equivalenceCache != nil {nodeCache = g.equivalenceCache.LoadNodeCache(nodeName)}// 最终实现调度判断的接口fits, failedPredicates, err := podFitsOnNode(pod,meta,g.cachedNodeInfoMap[nodeName],g.predicates,nodeCache,g.schedulingQueue,g.alwaysCheckAllPredicates,equivClass,)if err != nil {predicateResultLock.Lock()errs[err.Error()]++predicateResultLock.Unlock()return}if fits {// 保证获取的Node数量在numNodesToFind内length := atomic.AddInt32(&filteredLen, 1)if length > numNodesToFind {// 通知ParallelizeUntil任务结束cancel()atomic.AddInt32(&filteredLen, -1)} else {filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()}} else {predicateResultLock.Lock()failedPredicateMap[nodeName] = failedPredicatespredicateResultLock.Unlock()}}// Stops searching for more nodes once the configured number of feasible nodes// are found.// 并行处理多个Node的checkNode工作workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)filtered = filtered[:filteredLen]if len(errs) > 0 {return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)}}//ljs:如果配置了调度的扩展算法,需要继续对筛选后的Pod与Node进行再一次的筛选,获取最终匹配的Node列表。if len(filtered) > 0 && len(g.extenders) != 0 {for _, extender := range g.extenders {if !extender.IsInterested(pod) {continue}filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)if err != nil {if extender.IsIgnorable() {klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",extender, err)continue} else {return []*v1.Node{}, FailedPredicateMap{}, err}}for failedNodeName, failedMsg := range failedMap {if _, found := failedPredicateMap[failedNodeName]; !found {failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}}failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))}filtered = filteredListif len(filtered) == 0 {break}}}return filtered, failedPredicateMap, nil
}

findNodesThatFit主要做了几个操作
1、判断是否配置了预选算法,如果没有,直接返回Node列表信息;
2、如果配置了预选算法,则同时对多个Node(最多一次16个)调用checkNode方法,判断Pod是否可以调度在该Node上;
3、预选筛选之后,如果配置了调度的扩展算法,需要继续对筛选后的Pod与Node进行再一次的筛选,获取最终匹配的Node列表。

这里有一个注意的地方,获取匹配的Node节点数量时,通过numFeasibleNodesToFind函数限制了每次获取的节点数,最大值为100。这样当匹配到相应的Node数时,checkNode方法不再调用。
这里个人觉着有些问题,当Node数量足够多的时候(大于100),由于numFeasibleNodesToFind限制了Node数量,导致并不能扫描到所有的Node,这样可能导致最合适的Node没有被扫描到,匹配到的只是较优先的Node,则最终调度到的Node也不是最合适的Node,只是相较于比较合适。

预选调度实际接口:podFitsOnNode

最终实现调度判断的接口是podFitsOnNode。

这里的逻辑是从一个for循环开始的,关于这个2次循环的含义代码里有很长的一段注释,我们先看一下注释里怎么说的(这里可以多看几遍体会一下):
出于某些原因考虑我们需要运行两次predicate. 如果node上有更高或者相同优先级的“指定pods”(这里的“指定pods”指的是通过schedule计算后指定要跑在一个node上但是还未真正运行到那个node上的pods),我们将这些pods加入到meta和nodeInfo后执行一次计算过程。如果这个过程所有的predicates都成功了,我们再假设这些“指定pods”不会跑到node上再运行一次。第二次计算是必须的,因为有一些predicates比如pod亲和性,也许在“指定pods”没有成功跑到node的情况下会不满足。如果没有“指定pods”或者第一次计算过程失败了,那么第二次计算不会进行。我们在第一次调度的时候只考虑相等或者更高优先级的pods,因为这些pod是当前pod必须“臣服”的,也就是说不能够从这些pod中抢到资源,这些pod不会被当前pod“抢占”;这样当前pod也就能够安心从低优先级的pod手里抢资源了。新pod在上述2种情况下都可调度基于一个保守的假设:资源和pod反亲和性等的predicate在“指定pods”被处理为Running时更容易失败;pod亲和性在“指定pods”被处理为Not Running时更加容易失败。我们不能假设“指定pods”是Running的因为它们当前还没有运行,而且事实上,它们确实有可能最终又被调度到其他node上了。

func podFitsOnNode(pod *v1.Pod,meta algorithm.PredicateMetadata,info *schedulercache.NodeInfo,predicateFuncs map[string]algorithm.FitPredicate,nodeCache *equivalence.NodeCache,queue internalqueue.SchedulingQueue,alwaysCheckAllPredicates bool,equivClass *equivalence.Class,
) (bool, []algorithm.PredicateFailureReason, error) {var (eCacheAvailable  boolfailedPredicates []algorithm.PredicateFailureReason)podsAdded := false// ljs:第一次循环,将所有的优先级比较高或者相等的nominatedPods加入到Node中,// 更新meta和nodeInfo。nominatedPods是指已经分配到Node内但是还没有真正运行起来的Pods。// 这样做可以保证优先级高的Pods不会因为现在的Pod的加入而导致调度失败;// ljs:第一次调度,根据NominatedPods更新meta和nodeInfo信息,pod根据更新后的信息去预选// ljs:第二次调度,meta和nodeInfo信息不变,保证pod不完全依赖于NominatedPods//(主要考虑到pod亲和性之类的,比如某个nominatedPod没有在这个节点上运行,predicate可能会失败)// ljs:不将nominatedPods加入到Node内。这样的原因是因为考虑到像Pod affinity策略的话,如果当前的Pod依赖的是nominatedPods,// 这样就会有问题。因为,nominatedPods不能保证一定可以调度到相应的Node上。for i := 0; i < 2; i++ {metaToUse := metanodeInfoToUse := infoif i == 0 {podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)} else if !podsAdded || len(failedPredicates) != 0 {break}// Bypass eCache if node has any nominated pods.// TODO(bsalamat): consider using eCache and adding proper eCache invalidations// when pods are nominated or their nominations change.eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAddedfor predicateID, predicateKey := range predicates.Ordering() {var (fit     boolreasons []algorithm.PredicateFailureReasonerr     error)//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric// predicate相关函数在sc, err := configurator.CreateFromConfig(*policy)进行注册,// 具体代码在:pkg/scheduler/algorithmprovider/defaults/defaults.go下,defaultPredicates 方法返回的是默认的一系列预选算法。// ljs:如果当前pod在之前有一个等价pod,则直接从缓存中返回相应的上一次结果(一个节点上有多个相同的pod要发布)if predicate, exist := predicateFuncs[predicateKey]; exist {if eCacheAvailable {fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, predicateID, pod, metaToUse, nodeInfoToUse, equivClass)} else { //ljs:直接调用预选算法fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)}if err != nil {return false, []algorithm.PredicateFailureReason{}, err}if !fit {// eCache is available and valid, and predicates result is unfit, record the fail reasonsfailedPredicates = append(failedPredicates, reasons...)// if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.if !alwaysCheckAllPredicates {klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +"evaluation is short circuited and there are chances " +"of other predicates failing as well.")break}}}}}return len(failedPredicates) == 0, failedPredicates, nil
}

podFitsOnNode最难理解的就是for循环了两次,根据注释,大致意思如下:
1、第一次循环,将所有的优先级比较高或者相等的nominatedPods加入到Node中,更新meta和nodeInfo。nominatedPods是指已经分配到Node内但是还没有真正运行起来的Pods。这样做可以保证优先级高的Pods不会因为现在的Pod的加入而导致调度失败;
2、第二次调度,不将nominatedPods加入到Node内。这样的原因是因为考虑到像Pod affinity策略的话,如果当前的Pod依赖的是nominatedPods,这样就会有问题。因为,nominatedPods不能保证一定可以调度到相应的Node上。

之后就是根据预选的调度算法,一个个判断是否都满足。这里有个小优化,如果当前的Pod在之前有一个等价的Pod,则直接从缓存返回相应上一次的结果。如果成功则不用继续调用预选算法。但是,对于缓存部分,我个人有些疑问,可能对于上一个Pod缓存的结果是成功的,但是本次调度,Node信息发生变化了,缓存结果是成功的,但是实际上可能并不一定会成功。

默认预选调度算法

本节主要说的是默认的调度算法。默认的代码在pkg/scheduler/algorithmprovider/defaults/defaults.go下,defaultPredicates方法返回的是默认的一系列预选算法。与预选相关的代码都在pkg/scheduler/algorithm/predicates/predicates.go下

var (predicatesOrdering = []string{CheckNodeConditionPred, CheckNodeUnschedulablePred,GeneralPred, HostNamePred, PodFitsHostPortsPred,MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred,MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,CheckNodeMemoryPressurePred, CheckNodePIDPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred}
)

对于每一个调度算法,有一个优先级Order,官网有详细的描述。
调度方法基本一致,参数为(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo),返回值为(bool, []algorithm.PredicateFailureReason, error)。
官网地址:
https://github.com/kubernetes/community/blob/master/contributors/design-proposals/scheduling/predicates-ordering.md

当然这个顺序是可以被配置文件覆盖的,用户可以使用类似这样的配置:

{"kind" : "Policy","apiVersion" : "v1","predicates" : [{"name" : "PodFitsHostPorts", "order": 2},{"name" : "PodFitsResources", "order": 3},{"name" : "NoDiskConflict", "order": 5},{"name" : "PodToleratesNodeTaints", "order": 4},{"name" : "MatchNodeSelector", "order": 6},{"name" : "PodFitsHost", "order": 1}],"priorities" : [{"name" : "LeastRequestedPriority", "weight" : 1},{"name" : "BalancedResourceAllocation", "weight" : 1},{"name" : "ServiceSpreadingPriority", "weight" : 1},{"name" : "EqualPriority", "weight" : 1}],"hardPodAffinitySymmetricWeight" : 10}

具体的predicate函数

一直在讲predicate,那么predicate函数到底长什么样子呢,我们从具体的实现函数找一个看一下。开始讲design的时候提到过predicate的实现在pkg/scheduler/algorithm/predicates/predicates.go文件中,先看一眼Structure吧:

func NoDiskConflict(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {for _, v := range pod.Spec.Volumes {for _, ev := range nodeInfo.Pods() {if isVolumeConflict(v, ev) {return false, []algorithm.PredicateFailureReason{ErrDiskConflict}, nil}}}return true, nil, nil}

我们知道predicate函数的特点,这样就很好在这个一千六百多行go文件中寻找predicate函数了。像上面这个NoDiskConflict()函数,参数是pod、meta和nodeinfo,很明显是FitPredicate类型的,标准的predicate函数。这个函数的实现也特别简单,遍历pod的Volumes,然后对于pod的每一个Volume,遍历node上的每个pod,看是否和当前podVolume冲突。如果不fit就返回false加原因;如果fit就返回true,很清晰。

优选

func (g *genericScheduler) Schedule()函数在
预选完成之后会得到一个Node的数组。如果预选合适的节点数大于1,则需要调用优选算法根据评分获取最优的节点。
优选算法调用的接口是PrioritizeNodes。

优选调度算法

优选算法调用的接口是PrioritizeNodes,使用与预选类似的多任务同步调用方式,采用MapReduce的思想,Map根据不同的优选算法获取对某一Node的值,根据Reduce统计最终的结果。

  • PrioritizeNodes要做的事情是给已经通过predicate的nodes赋上一个分值,从而抉出一个最优node用于运行当前pod.
  • PrioritizeNodes通过并发调用一个个priority函数来给node排优先级。每一个priority函数会给一个1-10之间的分值,0最低10最高。每一个priority函数可以有自己的权重,单个函数返回的分值*权重后得到一个加权分值,最终所有的加权分值加在一起就是这个node的最终分值。
func PrioritizeNodes(pod *v1.Pod,nodeNameToInfo map[string]*schedulercache.NodeInfo,meta interface{},priorityConfigs []algorithm.PriorityConfig,nodes []*v1.Node,extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {// If no priority configs are provided, then the EqualPriority function is applied// This is required to generate the priority list in the required format// ljs:没有优选配置,默认每个节点等权重if len(priorityConfigs) == 0 && len(extenders) == 0 {result := make(schedulerapi.HostPriorityList, 0, len(nodes))for i := range nodes {hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])if err != nil {return nil, err}result = append(result, hostPriority)}return result, nil}var (mu   = sync.Mutex{}wg   = sync.WaitGroup{}errs []error)appendError := func(err error) {mu.Lock()defer mu.Unlock()errs = append(errs, err)}// 最后一个变量results也不难理解,类型是[]schedulerapi.HostPriorityList,这里需要注意这个类型// 的作用,它保存的是所有算法作用所有node之后得到的结果集,相当于一个二维数组,每个格子是1个算法// 作用于1个节点的结果,一行也就是1个算法作用于所有节点的结果;一行展成一个二维就是所有算法作用于所有节点;// 假设有3中优先级配置:result:=[[0]:[{1,1},{2,1},{3,1},...], [1]:[{1,1},{2,1},{3,1},...], [3]:[{1,1},{2,1},{3,1},...]]results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))// DEPRECATED: we can remove this when all priorityConfigs implement the// Map-Reduce pattern.for i := range priorityConfigs {if priorityConfigs[i].Function != nil {wg.Add(1)go func(index int) {defer wg.Done()var err error// ljs:求出每个节点在配置index上的得分results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)if err != nil {appendError(err)}}(i)} else {results[i] = make(schedulerapi.HostPriorityList, len(nodes))}}// 这里的index是node的序号,和上面的index不同,上面的index是指priorityConfigs的序号workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {nodeInfo := nodeNameToInfo[nodes[index].Name]for i := range priorityConfigs {// 这个for循环遍历的是所有的优选配置,如果有老Fun就跳过,新逻辑就继续;if priorityConfigs[i].Function != nil {// 因为前面old已经运行过了,也就是priorityConfigs[i].Function// 这里是两种计算result的方法,选择其中一种就行,Function是old,map是新方法continue}var err errorresults[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)if err != nil {appendError(err)results[i][index].Host = nodes[index].Name}}})for i := range priorityConfigs {if priorityConfigs[i].Reduce == nil {continue}wg.Add(1)go func(index int) {defer wg.Done()if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {appendError(err)}if klog.V(10) {for _, hostPriority := range results[index] {klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)}}}(i)}// Wait for all computations to be finished.wg.Wait()if len(errs) != 0 {return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)}// Summarize all scores.//result用于存储每个node的Score,注意区别result和resultsresult := make(schedulerapi.HostPriorityList, 0, len(nodes))for i := range nodes {// 初始化节点i的得分score为0result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})for j := range priorityConfigs {// 遍历所有优先级算法,每个算法有一个加权得分,累加就可以得到节点i的最终得分result[i].Score += results[j][i].Score * priorityConfigs[j].Weight}}if len(extenders) != 0 && nodes != nil {combinedScores := make(map[string]int, len(nodeNameToInfo))for i := range extenders {if !extenders[i].IsInterested(pod) {continue}wg.Add(1)go func(extIndex int) {defer wg.Done()prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)if err != nil {// Prioritization errors from extender can be ignored, let k8s/other extenders determine the prioritiesreturn}mu.Lock()for i := range *prioritizedList {host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Scoreif klog.V(10) {klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, extenders[extIndex].Name(), score)}combinedScores[host] += score * weight}mu.Unlock()}(i)}// wait for all go routines to finishwg.Wait()for i := range result {result[i].Score += combinedScores[result[i].Host]}}if klog.V(10) {for i := range result {klog.Infof("Host %s => Score %d", result[i].Host, result[i].Score)}}return result, nil
}

这段代码有两段代码感觉是重复了,代码如下:

  代码1// DEPRECATED: we can remove this when all priorityConfigs implement the// Map-Reduce pattern.for i := range priorityConfigs {if priorityConfigs[i].Function != nil {wg.Add(1)go func(index int) {。。。// ljs:求出每个节点在配置index上的得分results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)。。。}(i)} else {results[i] = make(schedulerapi.HostPriorityList, len(nodes))}}// 代码2workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {nodeInfo := nodeNameToInfo[nodes[index].Name]for i := range priorityConfigs {// 这个for循环遍历的是所有的优选配置,如果有老Fun就跳过,新逻辑就继续;if priorityConfigs[i].Function != nil {// 因为前面old已经运行过了,也就是priorityConfigs[i].Function// 这里是两种计算result的方法,选择其中一种就行,Function是old,map是新方法continue}var err errorresults[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)if err != nil {appendError(err)results[i][index].Host = nodes[index].Name}}})// 分割——————————    type PriorityConfig struct {Name   stringMap    PriorityMapFunctionReduce PriorityReduceFunction// TODO: Remove it after migrating all functions to// Map-Reduce pattern.Function PriorityFunctionWeight   int
}

上述两段代码,其实做的是同一件事,就是遍历每个优先级算法函数func,计算每个节点在这个func上的得分。只是使用的方式不一样,我们也可以从PriorityConfig的定义可以看出来,PriorityConfig.Map会取代PriorityFunction。

优先调度算法实例

优选调度算法默认代码在pkg/scheduler/algorithmprovider/defaults/defaults.go下,defaultPriorities方法返回的是默认的一系列优选算法,通过工厂模式处理相应的优选算法,代码如下:

func defaultPriorities() sets.String {return sets.NewString(// spreads pods by minimizing the number of pods (belonging to the same service or replication controller) on the same node.factory.RegisterPriorityConfigFactory("SelectorSpreadPriority",factory.PriorityConfigFactory{MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)},Weight: 1,},),// pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.)// as some other pods, or, conversely, should not be placed in the same topological domain as some other pods.factory.RegisterPriorityConfigFactory("InterPodAffinityPriority",factory.PriorityConfigFactory{Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {return priorities.NewInterPodAffinityPriority(args.NodeInfo, args.NodeLister, args.PodLister, args.HardPodAffinitySymmetricWeight)},Weight: 1,},),// Prioritize nodes by least requested utilization.factory.RegisterPriorityFunction2("LeastRequestedPriority", priorities.LeastRequestedPriorityMap, nil, 1),// Prioritizes nodes to help achieve balanced resource usagefactory.RegisterPriorityFunction2("BalancedResourceAllocation", priorities.BalancedResourceAllocationMap, nil, 1),// Set this weight large enough to override all other priority functions.// TODO: Figure out a better way to do this, maybe at same time as fixing #24720.factory.RegisterPriorityFunction2("NodePreferAvoidPodsPriority", priorities.CalculateNodePreferAvoidPodsPriorityMap, nil, 10000),// Prioritizes nodes that have labels matching NodeAffinityfactory.RegisterPriorityFunction2("NodeAffinityPriority", priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1),// Prioritizes nodes that marked with taint which pod can tolerate.factory.RegisterPriorityFunction2("TaintTolerationPriority", priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1),// ImageLocalityPriority prioritizes nodes that have images requested by the pod present.factory.RegisterPriorityFunction2("ImageLocalityPriority", priorities.ImageLocalityPriorityMap, nil, 1),)
}

Function和Map-Reduce实例分析

InterPodAffinityPriority(Function)

这个算法做的是Pod间亲和性优选,也就是亲和pod越多的节点分值越高,反亲和pod越多分值越低。我们撇开具体的亲和性计算规则,从优选函数的形式上看一下这段代码的逻辑:

// 代码位置: pkg/scheduler/algorithm/priorities/interpod_affinity.go:119func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {affinity := pod.Spec.Affinity// 是否有亲和性约束;hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil// 是否有反亲和性约束;hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil// 这里有一段根据亲和性和反亲和性来计算一个node上匹配的pod数量的逻辑,我们先跳过这些逻辑,从优选算法实现的角度看这个算法的架子;// 当遍历完所有的node之后,可以得到1个最高分和1个最低分,分别记为maxCount和minCount;for _, node := range nodes {if pm.counts[node.Name] > maxCount {maxCount = pm.counts[node.Name]}if pm.counts[node.Name] < minCount {minCount = pm.counts[node.Name]}}// 这个result类型和前面看到的一样,都是存储单个算法的计算结果的;result := make(schedulerapi.HostPriorityList, 0, len(nodes))for _, node := range nodes {fScore := float64(0)// 如果分差大于0,也就是说不是所有的node都一样的情况,需要对分值做一个处理;if (maxCount - minCount) > 0 {// MaxPriority定义的是优选最高分10,第二个因数是当前node的count-最小count,// 然后除以(maxCount - minCount);举个例子,当前node的计算结果是5,最大count是20,最小// count是-3,那么这里就是10*[5-(-3)/20-(-3)]// 这个计算的结果显然会在[0-10]之间;fScore = float64(schedulerapi.MaxPriority) * ((pm.counts[node.Name] - minCount) / (maxCount - minCount))}// 如果分差不大于0,这时候int(fScore)也就是0,对于各个node的结果都是0;result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})}return result, nil}

如上,我们可以发现最终这个函数计算出了每个node的分值,这个分值在[0-10]之间。所以说到底Function做的事情就是根据一定的规则给每个node赋一个分值,这个分值要求在[0-10]之间,然后把这个HostPriorityList返回就行。

CalculateNodeAffinityPriorityMap(Map)

这个算法和上一个类似,上一个是Pod的Affinity,这个是Node的Affinity;我们来看代码:

// 代码位置:pkg/scheduler/algorithm/priorities/node_affinity.go:34func CalculateNodeAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {node := nodeInfo.Node()if node == nil {return schedulerapi.HostPriority{}, fmt.Errorf("node not found")}// default is the podspec.affinity := pod.Spec.Affinityif priorityMeta, ok := meta.(*priorityMetadata); ok {// We were able to parse metadata, use affinity from there.affinity = priorityMeta.affinity}var count int32if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {// Match PreferredDuringSchedulingIgnoredDuringExecution term by term.for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]if preferredSchedulingTerm.Weight == 0 {continue}nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)if err != nil {return schedulerapi.HostPriority{}, err}if nodeSelector.Matches(labels.Set(node.Labels)) {count += preferredSchedulingTerm.Weight}}}return schedulerapi.HostPriority{Host:  node.Name,Score: int(count),}, nil}

撇开具体的亲和性计算细节,我们可以发现这个的count没有特定的规则,可能会加到10以上;另外这里的返回值是HostPriority类型,前面的Function返回了HostPriorityList类型。

map函数

pkg/scheduler/algorithm/priorities/selector_spreading.go:221func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {var firstServiceSelector labels.Selectornode := nodeInfo.Node()if node == nil {return schedulerapi.HostPriority{}, fmt.Errorf("node not found")}priorityMeta, ok := meta.(*priorityMetadata)if ok {firstServiceSelector = priorityMeta.podFirstServiceSelector} else {firstServiceSelector = getFirstServiceSelector(pod, s.serviceLister)}// 查找给定node在给定namespace下符合selector的pod,返回值是[]*v1.PodmatchedPodsOfNode := filteredPod(pod.Namespace, firstServiceSelector, nodeInfo)return schedulerapi.HostPriority{Host:  node.Name,// 返回值中Score设置成上面找到的pod的数量Score: int(len(matchedPodsOfNode)),}, nil}

这个函数比较短,可以看到在指定node上查询到匹配selector的pod越多,分值就越高。假设找到了20个,那么这里的分值就是20;假设找到的是2,那这里的分值就是2.

CalculateNodeAffinityPriorityReduce(Reduce)

和上面这个Map对应的Reduce函数其实没有单独实现,通过NormalizeReduce函数做了一个通用的Reduce处理:


pkg/scheduler/algorithm/priorities/node_affinity.go:77var CalculateNodeAffinityPriorityReduce = NormalizeReduce(schedulerapi.MaxPriority, false)pkg/scheduler/algorithm/priorities/reduce.go:29func NormalizeReduce(maxPriority int, reverse bool) algorithm.PriorityReduceFunction {return func(_ *v1.Pod,_ interface{},_ map[string]*schedulercache.NodeInfo,// 注意到这个result是HostPriorityList,对应1个算法N个node的结果集result schedulerapi.HostPriorityList) error {var maxCount int// 遍历result将最高的Score赋值给maxCount;for i := range result {if result[i].Score > maxCount {maxCount = result[i].Score}}if maxCount == 0 {if reverse {for i := range result {result[i].Score = maxPriority}}return nil}for i := range result {score := result[i].Score// 举个例子:10*(5/20)score = maxPriority * score / maxCountif reverse {// 如果score是3,得到7;如果score是4,得到6,结果反转;score = maxPriority - score}result[i].Score = score}return nil}}

map-reduce小节

  • Function:一个算法一次性计算出所有node的Score,这个Score的范围是规定的[0-10];
  • Map-Reduce:一个Map算法计算1个node的Score,这个Score可以灵活处理,可能是20,可能是-3;Map过程并发进行;最终得到的结果result通过Reduce归约,将这个算法对应的所有node的分值归约为[0-10];

抢占调度

sched.scheduleOne -> sched.preempt -> func (g *genericScheduler) Preempt
详见k8s源码分析--kube-scheduler源码(二)

参考

https://juejin.im/post/5c889c2e5188257df700a732

https://www.kubernetes.org.cn/5122.html

https://www.cnblogs.com/cloudgeek/p/10561221.html

https://www.kubernetes.org.cn/5221.html

http://tang.love/2018/07/24/learning-kubernetes-source-code/

https://www.huweihuang.com/article/source-analysis/kube-scheduler/registerAlgorithmProvider/#1-applyfeaturegates

https://my.oschina.net/u/3797264/blog/2615842

https://my.oschina.net/jxcdwangtao/blog/1594348

k8s源码分析--kube-scheduler源码(一)相关推荐

  1. k8s client-go源码分析 informer源码分析(3)-Reflector源码分析

    k8s client-go源码分析 informer源码分析(3)-Reflector源码分析 1.Reflector概述 Reflector从kube-apiserver中list&watc ...

  2. spring源码分析第一天------源码分析知识储备

    spring源码分析第一天------源码分析知识储备 Spring源码分析怎么学? 1.环境准备: 2.思路    看:是什么? 能干啥    想:为什么?     实践:怎么做?         ...

  3. Java集合类框架源码分析 之 LinkedList源码解析 【4】

    上一篇介绍了ArrayList的源码分析[点击看文章],既然ArrayList都已经做了介绍,那么作为他同胞兄弟的LinkedList,当然必须也配拥有姓名! Talk is cheap,show m ...

  4. 【转】Spark源码分析之-scheduler模块

    原文地址:http://jerryshao.me/architecture/2013/04/21/Spark%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E4%B9%8B- ...

  5. android view 源码分析,Android ViewPager源码详细分析

    1.问题 由于Android Framework源码很庞大,所以读源码必须带着问题来读!没有问题,创造问题再来读!否则很容易迷失在无数的方法与属性之中,最后无功而返. 那么,关于ViewPager有什 ...

  6. 【Android 插件化】VirtualApp 源码分析 ( 启动应用源码分析 | HomePresenterImpl 启动应用方法 | VirtualCore 启动插件应用最终方法 )

    文章目录 一.启动应用源码分析 1.HomeActivity 启动应用点击方法 2.HomePresenterImpl 启动应用方法 3.VirtualCore 启动插件应用最终方法 一.启动应用源码 ...

  7. 【Android 插件化】VirtualApp 源码分析 ( 添加应用源码分析 | LaunchpadAdapter 适配器 | 适配器添加元素 | PackageAppData 元素 )

    文章目录 一.添加应用源码分析 1.LaunchpadAdapter 适配器 2.适配器添加元素 3.PackageAppData 元素 一.添加应用源码分析 1.LaunchpadAdapter 适 ...

  8. 【Android 插件化】VirtualApp 源码分析 ( 安装应用源码分析 | HomePresenterImpl 添加应用 | AppRepository.addVirtualApp )

    文章目录 一.安装应用源码分析 1.HomePresenterImpl 添加应用 2.AppRepository.addVirtualApp 安装 SD 卡 APK 应用 一.安装应用源码分析 1.H ...

  9. 【Android 电量优化】JobScheduler 相关源码分析 ( ConnectivityController 底层源码分析 | 构造函数 | 追踪任务更新 | 注册接收者监听连接变化 )

    文章目录 一.ConnectivityController 连接控制器引入 二.ConnectivityController 构造方法解析 ( 注册接收者 ) 三.mConnectivityRecei ...

  10. 资源调度源码分析和任务调度源码分析

    1.资源调度源码分析 资源请求简单图 资源调度Master路径: 路径:spark-1.6.0/core/src/main/scala/org.apache.spark/deploy/Master/M ...

最新文章

  1. IDEA设置单击左侧项目文件,自动在右侧编辑器打开
  2. 支付宝sdk 支付订单查询失败
  3. 详解Objective-C的meta-class
  4. cout输出字符串_leetcode C++题解系列-042 字符串相乘
  5. linux command read the content,Linux while 和 read 的用法
  6. Create a restful application with AngularJS and CakePHP (I)
  7. [paper reading] GoogLeNet
  8. Chrome DevTools
  9. 【图像处理】C++将读取图像并将图像转换为矩阵的形式
  10. laravel validate
  11. 理解 MeasureSpec
  12. 怎么将多个文本文件合并为一个文本文件
  13. 黑马程序员-----视频看完了,谈谈自己的感受
  14. android系统版本卸掉,使用内置软件卸载最新版本的Android
  15. 基于SpringBoot+Thymeleaf兼职招聘网站
  16. python能做些什么事_一起来看看Python能干什么?使用Python能做哪些事
  17. 批量转换中文名称为英文名称(注:一般为转换格式拼音)
  18. 详细的SQL注入相关的命令
  19. c++ dynamic_cast,static_cast,const_cast,reinterpret_cast四种cast用法整理
  20. 企业视频制作的多媒体软件

热门文章

  1. 机顶盒ttl无法输入_中兴盒子连接TTL线后无法输入代码、不跑码乱码的解决方法分享...
  2. telnet远程管理及AAA认证方式
  3. LKM完全指南 (收集得比较全了)
  4. linux限制网口带宽指令,Linux使用wondershaper限制网络带宽
  5. 洛谷P1424 小鱼的航程(改进版)-c++题解
  6. HTML5与视频传输_拔剑-浆糊的传说_新浪博客
  7. 算法:求10万以内的质数
  8. 6.存储结构与磁盘划分
  9. C语言中 %md 的输入输出使用(还有printf函数的 %0格式控制符的使用)
  10. of undifine报错