


  • minSharePreemptionTimeout: number of seconds the queue is under its minimum share before it will try to preempt containers to take resources from other queues. If not set, the queue will inherit the value from its parent queue.
  • fairSharePreemptionTimeout: number of seconds the queue is under its fair share threshold before it will try to preempt containers to take resources from other queues. If not set, the queue will inherit the value from its parent queue.
  • fairSharePreemptionThreshold: the fair share preemption threshold for the queue. If the queue waits fairSharePreemptionTimeout without receiving fairSharePreemptionThreshold*fairShare resources, it is allowed to preempt containers to take resources from other queues. If not set, the queue will inherit the value from its parent queue.


    private class UpdateThread extends Thread {@Overridepublic void run() {while (!Thread.currentThread().isInterrupted()) {try {Thread.sleep(updateInterval);long start = getClock().getTime();update();  //preemptTasksIfNecessary(); long duration = getClock().getTime() - start;fsOpDurations.addUpdateThreadRunDuration(duration);} catch (InterruptedException ie) {LOG.warn("Update thread interrupted. Exiting.");return;} catch (Exception e) {LOG.error("Exception in fair scheduler UpdateThread", e);}}}}protected synchronized void update() {long start = getClock().getTime();updateStarvationStats(); // Determine if any queues merit preemptionFSQueue rootQueue = queueMgr.getRootQueue();// Recursively update demands for all queuesrootQueue.updateDemand();rootQueue.setFairShare(clusterResource); ///root赋值集群所有值// Recursively compute fair shares for all queues// and update metricsrootQueue.recomputeShares(); /递归计算子队列fair share//FSLeafQueuepublic void recomputeShares() {readLock.lock();try {policy.computeShares(runnableApps, getFairShare());} finally {readLock.unlock();}}ComputeFairShares类public void computeShares(Collection<? extends Schedulable> schedulables,Resource totalResources) {ComputeFairShares.computeShares(schedulables, totalResources, ResourceType.MEMORY);}//此处有套算法
//weight *一个系数(系数目的是权重到资源值作个转换),加起来正好等于集群总量,这样就是每个队列的fair shareprivate static void computeSharesInternal(Collection<? extends Schedulable> allSchedulables,Resource totalResources, ResourceType type, boolean isSteadyShare) {Collection<Schedulable> schedulables = new ArrayList<Schedulable>();int takenResources = handleFixedFairShares(allSchedulables, schedulables, isSteadyShare, type);if (schedulables.isEmpty()) {return;}// Find an upper bound on R that we can use in our binary search. We start// at R = 1 and double it until we have either used all the resources or we// have met all Schedulables' max shares.int totalMaxShare = 0;for (Schedulable sched : schedulables) {int maxShare = getResourceValue(sched.getMaxShare(), type);   ///将每个队列的最大资源使用上限加起来得到个总数totalMaxShare = (int) Math.min((long)maxShare + (long)totalMaxShare,Integer.MAX_VALUE);if (totalMaxShare == Integer.MAX_VALUE) {break;}}int totalResource = Math.max((getResourceValue(totalResources, type) -takenResources), 0);   totalResource = Math.min(totalMaxShare, totalResource);  //资源最大使用上限总和与集群总资源比较,取较小值,一般获得的是集群总的资源double rMax = 1.0;while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)  通过该方法找系数weigt*系数,type赋值内存类型,找到正好不超过总的集群资源< totalResource) {rMax *= 2.0;}//再通过二分查找,在0到这个系数之间,找到正好可以使用总的资源量最接近集群总容量的。// Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS stepsdouble left = 0;double right = rMax;for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {double mid = (left + right) / 2.0;int plannedResourceUsed = resourceUsedWithWeightToResourceRatio(mid, schedulables, type);if (plannedResourceUsed == totalResource) {right = mid;break;} else if (plannedResourceUsed < totalResource) {left = mid;} else {right = mid;}}// Set the fair shares based on the value of R we've converged tofor (Schedulable sched : schedulables) {if (isSteadyShare) {setResourceValue(computeShare(sched, right, type),((FSQueue) sched).getSteadyFairShare(), type);} else {setResourceValue(computeShare(sched, right, type), sched.getFairShare(), type);}}}/*** Compute the resources that would be used given a weight-to-resource ratio* w2rRatio, for use in the computeFairShares algorithm as described in #*/private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,Collection<? extends Schedulable> schedulables, ResourceType type) {int resourcesTaken = 0;for (Schedulable sched : schedulables) {int share = computeShare(sched, w2rRatio, type);/resourcesTaken += share;}return resourcesTaken;}/*** Compute the resources assigned to a Schedulable given a particular* weight-to-resource ratio w2rRatio.*/private static int computeShare(Schedulable sched, double w2rRatio,ResourceType type) { /权重值*2,保证在最低资源保障和最高使用上限之间double share = sched.getWeights().getWeight(type) * w2rRatio;share = Math.max(share, getResourceValue(sched.getMinShare(), type));share = Math.min(share, getResourceValue(sched.getMaxShare(), type));return (int) share;}

