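I. Overview

The YARN FairScheduler does two main things:
① Handles NM heartbeats (NodeUpdate) and allocates containers.
② Maintains queues and applications in a tree, periodically recomputes fair share and related values, and sorts them.

This article focuses on ②.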

II. Code

1. Initializing the FairScheduler

The FairScheduler is initialized when the RM starts:

  private void initScheduler(Configuration conf) throws IOException {
    synchronized (this) {
      this.conf = new FairSchedulerConfiguration(conf);
      validateConf(this.conf);
      minimumAllocation = this.conf.getMinimumAllocation();
      initMaximumResourceCapability(this.conf.getMaximumAllocation());
      incrAllocation = this.conf.getIncrementAllocation();
      // Continuous scheduling; false by default, typically enabled for
      // latency-sensitive real-time jobs
      continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
      continuousSchedulingSleepMs =
          this.conf.getContinuousSchedulingSleepMs();
      balanceSchedulingEnabled = this.conf.isBalanceSchedulingEnabled();
      ...
      preemptionUtilizationThreshold =
          this.conf.getPreemptionUtilizationThreshold();
      // Assign several containers in one pass to increase throughput
      assignMultiple = this.conf.getAssignMultiple();
      maxAssignDynamic = this.conf.isMaxAssignDynamic();
      maxAssign = this.conf.getMaxAssign();
      sizeBasedWeight = this.conf.getSizeBasedWeight();
      preemptionInterval = this.conf.getPreemptionInterval();
      waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
      usePortForNodeName = this.conf.getUsePortForNodeName();
      reservableNodesRatio = this.conf.getReservableNodes();
      if (this.conf.isCpuSchedulingEnabled()) {
        RESOURCE_CALCULATOR = new CpuResourceCalculator();
      } else {
        RESOURCE_CALCULATOR = new DefaultResourceCalculator();
      }
      // How often fair share is recomputed
      updateInterval = this.conf.getUpdateInterval();
      if (updateInterval < 0) {
        updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS;
        LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS
            + " is invalid, so using default value "
            + FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
            + " ms instead");
      }
      // Performance metrics collection
      rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
      fsOpDurations = FSOpDurations.getInstance(true);
      // This stores per-application scheduling information
      this.applications = new ConcurrentHashMap<
          ApplicationId, SchedulerApplication<FSAppAttempt>>();
      this.eventLog = new FairSchedulerEventLog();
      eventLog.init(this.conf);
      allocConf = new AllocationConfiguration(conf);
      rmNodeLabelsManager = rmContext.getNodeLabelManager();
      try {
        // QueueManager manages the queues and the applications attached to them
        queueMgr.initialize(conf);
      } catch (Exception e) {
        throw new IOException("Failed to start FairScheduler", e);
      }
      // Thread that periodically recomputes demand and fair share
      updateThread = new UpdateThread();
      updateThread.setName("FairSchedulerUpdateThread");
      updateThread.setDaemon(true);
      // Continuous scheduling thread
      if (continuousSchedulingEnabled) {
        // start continuous scheduling thread
        schedulingThread = new ContinuousSchedulingThread();
        schedulingThread.setName("FairSchedulerContinuousScheduling");
        schedulingThread.setDaemon(true);
      }
    }
    // Initialize the AllocationFileLoaderService
    allocsLoader.init(conf);
    // If we fail to load allocations file on initialize, we want to fail
    // immediately.  After a successful load, exceptions on future reloads
    // will just result in leaving things as they are.
    try {
      allocsLoader.reloadAllocations();
      // Load the NM labels
      rmNodeLabelsManager.reinitializeQueueLabels(getQueueToLabels());
    } catch (Exception e) {
      throw new IOException("Failed to initialize FairScheduler", e);
    }
  }

2. updateDemand

Updates the resource demand of every queue and application:

  public void updateDemand() {
    // Compute demand by iterating through apps in the queue
    // Limit demand to maxResources
    Resource maxRes = scheduler.getAllocationConfiguration()
        .getMaxResources(getName());
    demand = Resources.createResource(0);
    readLock.lock();
    try {
      for (FSAppAttempt sched : runnableApps) {
        // Stop once demand has reached the cap
        if (Resources.equals(demand, maxRes)) {
          break;
        }
        // Internally this adds the resources the app already holds to the
        // sum of all its outstanding requests. If the running total exceeds
        // the maximum resource limit, demand is set to maxRes.
        updateDemandForApp(sched, maxRes);
      }
      for (FSAppAttempt sched : nonRunnableApps) {
        if (Resources.equals(demand, maxRes)) {
          break;
        }
        updateDemandForApp(sched, maxRes);
      }
    } finally {
      readLock.unlock();
    }
    // sort it in advance.
    Comparator<Schedulable> comparator = policy.getComparator();
    writeLock.lock();
    try {
      // Sort the applications in this queue
      Collections.sort(runnableApps, comparator);
    } finally {
      writeLock.unlock();
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("The updated demand for " + getName() + " is " + demand
          + "; the max is " + maxRes);
    }
  }
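The capping behaviour is easier to see with plain numbers. Below is a minimal sketch, assuming a single memory dimension; `DemandSketch` and `queueDemand` are hypothetical names for illustration and are not part of Hadoop. Each element of `appDemands` stands for one app's current usage plus its outstanding requests.

```java
// Hypothetical sketch: memory-only demand aggregation for one leaf queue.
final class DemandSketch {

    // Sums per-app demand, never letting the running total exceed maxRes.
    static long queueDemand(long[] appDemands, long maxRes) {
        long demand = 0;
        for (long appDemand : appDemands) {
            if (demand == maxRes) {
                break; // cap reached, the remaining apps cannot raise demand
            }
            demand = Math.min(demand + appDemand, maxRes);
        }
        return demand;
    }

    public static void main(String[] args) {
        System.out.println(queueDemand(new long[] {30, 50, 40}, 100)); // 100
        System.out.println(queueDemand(new long[] {30, 50}, 100));     // 80
    }
}
```

Three apps asking for 30, 50 and 40 against a cap of 100 yield a queue demand of 100, not 120, which matches the "limit demand to maxResources" comment in the method above.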

The ordering rule, FairShareComparator:

  public int compare(Schedulable s1, Schedulable s2) {
    Priority priority1 = s1.getPriority();
    Priority priority2 = s2.getPriority();
    // Compare priority first; the lower priority sorts later
    if (!priority1.equals(priority2)) {
      return priority1.compareTo(priority2);
    }
    Resource demand1 = s1.getDemand();
    Resource demand2 = s2.getDemand();
    // A schedulable that needs no resources sorts later
    if (demand1.equals(Resources.none()) &&
        !demand2.equals(Resources.none())) {
      return 1;
    } else if (demand2.equals(Resources.none()) &&
        !demand1.equals(Resources.none())) {
      return -1;
    }
    double minShareRatio1, minShareRatio2;
    double useToWeightRatio1, useToWeightRatio2;
    double weight1, weight2;
    // Do not repeat the getResourceUsage calculation
    Resource resourceUsage1 = s1.getResourceUsageFaster();
    Resource resourceUsage2 = s2.getResourceUsageFaster();
    // Take the smaller of min share and demand
    Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
        s1.getMinShare(), demand1);
    Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
        s2.getMinShare(), demand2);
    // A schedulable is "needy" when its current usage is below its minShare
    boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
        resourceUsage1, minShare1);
    boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
        resourceUsage2, minShare2);
    // Ratio of memory in use to minShare
    minShareRatio1 = (double) resourceUsage1.getMemory()
        / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory();
    minShareRatio2 = (double) resourceUsage2.getMemory()
        / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory();
    // Queue weights
    weight1 = s1.getWeights().getWeight(ResourceType.MEMORY);
    weight2 = s2.getWeights().getWeight(ResourceType.MEMORY);
    if (weight1 > 0.0 && weight2 > 0.0) {
      // Usage-to-weight ratio
      useToWeightRatio1 = resourceUsage1.getMemory() / weight1;
      useToWeightRatio2 = resourceUsage2.getMemory() / weight2;
    } else { // Either weight1 or weight2 equals to 0
      if (weight1 == weight2) {
        // Equal weights: compare memory usage directly
        useToWeightRatio1 = resourceUsage1.getMemory();
        useToWeightRatio2 = resourceUsage2.getMemory();
      } else {
        // The schedulable whose weight is closer to 0 sorts later
        useToWeightRatio1 = -weight1;
        useToWeightRatio2 = -weight2;
      }
    }
    int res = 0;
    if (s1Needy && !s2Needy)
      res = -1;
    else if (s2Needy && !s1Needy)
      res = 1;
    else if (s1Needy && s2Needy)
      // Both needy: the one using more of its min share sorts later
      res = (int) Math.signum(minShareRatio1 - minShareRatio2);
    else
      // Neither needy: the larger usage-to-weight ratio sorts later
      res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
    if (res == 0) {
      // Apps are tied in fairness ratio. Break the tie by submit time and job
      // name to get a deterministic ordering, which is useful for unit tests.
      // Still tied: the one that started later sorts later
      res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
      if (res == 0)
        // Still tied: compare names; the lexicographically smaller one runs first
        res = s1.getName().compareTo(s2.getName());
    }
    return res;
  }
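To make the ordering concrete, here is a simplified sketch of the same decision chain. It is an illustration only: `FairOrderSketch` and `Entry` are hypothetical names, resources are reduced to a single memory value, weights are assumed to be positive, and the priority and zero-demand checks from the comparator above are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical, memory-only version of the needy / ratio / tie-break chain.
final class FairOrderSketch implements Comparator<FairOrderSketch.Entry> {

    static final class Entry {
        final String name;
        final long usage;
        final long minShare;
        final long demand;
        final double weight; // assumed > 0 in this sketch
        final long startTime;

        Entry(String name, long usage, long minShare, long demand,
              double weight, long startTime) {
            this.name = name;
            this.usage = usage;
            this.minShare = minShare;
            this.demand = demand;
            this.weight = weight;
            this.startTime = startTime;
        }
    }

    @Override
    public int compare(Entry s1, Entry s2) {
        // Effective min share is capped by what the entry actually asks for.
        long min1 = Math.min(s1.minShare, s1.demand);
        long min2 = Math.min(s2.minShare, s2.demand);
        boolean needy1 = s1.usage < min1;
        boolean needy2 = s2.usage < min2;
        double minShareRatio1 = (double) s1.usage / Math.max(min1, 1);
        double minShareRatio2 = (double) s2.usage / Math.max(min2, 1);
        double useToWeight1 = s1.usage / s1.weight;
        double useToWeight2 = s2.usage / s2.weight;

        int res;
        if (needy1 && !needy2) {
            res = -1;
        } else if (needy2 && !needy1) {
            res = 1;
        } else if (needy1 && needy2) {
            res = (int) Math.signum(minShareRatio1 - minShareRatio2);
        } else {
            res = (int) Math.signum(useToWeight1 - useToWeight2);
        }
        if (res == 0) {
            // Tie-break by start time, then by name.
            res = Long.compare(s1.startTime, s2.startTime);
            if (res == 0) {
                res = s1.name.compareTo(s2.name);
            }
        }
        return res;
    }

    // Returns the names in scheduling order without mutating the input.
    static List<String> sortedNames(List<Entry> entries) {
        List<Entry> copy = new ArrayList<>(entries);
        copy.sort(new FairOrderSketch());
        List<String> names = new ArrayList<>();
        for (Entry e : copy) {
            names.add(e.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<Entry> entries = Arrays.asList(
            new Entry("a", 50, 100, 200, 1.0, 1),   // needy, ratio 0.5
            new Entry("b", 10, 100, 200, 1.0, 2),   // needy, ratio 0.1
            new Entry("c", 300, 100, 400, 1.0, 0),  // not needy, 300 per weight
            new Entry("d", 300, 100, 400, 2.0, 0)); // not needy, 150 per weight
        System.out.println(sortedNames(entries)); // [b, a, d, c]
    }
}
```

Needy entries come first, ordered by how far they are below their min share; the rest are ordered by usage per unit of weight, so a heavier weight tolerates more usage before the entry is pushed back.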

3. recomputeShares

This is the logic that computes the Instantaneous Fair Share and the Steady Fair Share.

ComputeFairShares.computeSharesInternal:

  private static void computeSharesInternal(
      Collection<? extends Schedulable> allSchedulables,
      Resource totalResources, ResourceType type, boolean isSteadyShare) {
    // Assumption: type is always memory here and vcores are not considered.
    // DominantResourceFairnessPolicy takes both memory and vcores into
    // account when sorting.
    // The sum of minShare over all queues configured in fair-scheduler.xml
    // is assumed to be less than the total NM resources of the cluster.
    // Filter out the queues that take part in the fair share computation:
    // isSteadyShare=true: drop queues whose weight or maxShare is not valid
    // isSteadyShare=false: drop queues whose weight or maxShare is not valid,
    //   and queues that have no running application
    Collection<Schedulable> schedulables = new ArrayList<Schedulable>();
    int takenResources = handleFixedFairShares(
        allSchedulables, schedulables, isSteadyShare, type);
    if (schedulables.isEmpty()) {
      return;
    }
    // Find an upper bound on R that we can use in our binary search. We start
    // at R = 1 and double it until we have either used all the resources or we
    // have met all Schedulables' max shares.
    // Sum the maxShare of all queues
    int totalMaxShare = 0;
    for (Schedulable sched : schedulables) {
      int maxShare = getResourceValue(sched.getMaxShare(), type);
      totalMaxShare = (int) Math.min((long)maxShare + (long)totalMaxShare,
          Integer.MAX_VALUE);
      if (totalMaxShare == Integer.MAX_VALUE) {
        break;
      }
    }
    // Resources left over for fair share distribution
    int totalResource = Math.max((getResourceValue(totalResources, type) -
        takenResources), 0);
    totalResource = Math.min(totalMaxShare, totalResource);
    // rMax is the first ratio, starting at 1 and doubling each time, at which
    // the planned usage (every queue clamped between its min and max share)
    // reaches or exceeds totalResource
    double rMax = 1.0;
    while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
        < totalResource) {
      rMax *= 2.0;
    }
    // Binary search between 0 and rMax, for 25 iterations or until the
    // planned usage equals totalResource exactly. The goal is a ratio whose
    // planned usage is as close as possible to the real total.
    double left = 0;
    double right = rMax;
    for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
      double mid = (left + right) / 2.0;
      int plannedResourceUsed = resourceUsedWithWeightToResourceRatio(
          mid, schedulables, type);
      if (plannedResourceUsed == totalResource) {
        right = mid;
        break;
      } else if (plannedResourceUsed < totalResource) {
        left = mid;
      } else {
        right = mid;
      }
    }
    // Set the fair share or the steady fair share of every queue
    for (Schedulable sched : schedulables) {
      if (isSteadyShare) {
        setResourceValue(computeShare(sched, right, type),
            ((FSQueue) sched).getSteadyFairShare(), type);
      } else {
        setResourceValue(
            computeShare(sched, right, type), sched.getFairShare(), type);
      }
    }
  }

handleFixedFairShares:

  private static int handleFixedFairShares(
      Collection<? extends Schedulable> schedulables,
      Collection<Schedulable> nonFixedSchedulables,
      boolean isSteadyShare, ResourceType type) {
    // Running total of the resources taken by fixed queues
    int totalResource = 0;
    for (Schedulable sched : schedulables) {
      // If maxShare or weight is configured as 0, the queue can never run an
      // app. It is a fixed queue, and both its fair share and its
      // instantaneous fair share are 0.
      // When computing the instantaneous fair share, a queue with no running
      // app gets an instantaneous fair share of 0 and is treated as a fixed
      // schedulable, so it takes no further part in the computation.
      // When computing the steady fair share, the value depends only on the
      // queue's min/max configuration, not on whether apps are running.
      int fixedShare = getFairShareIfFixed(sched, isSteadyShare, type);
      if (fixedShare < 0) {
        nonFixedSchedulables.add(sched);
      } else {
        // isSteadyShare=true: set the steady fair share to fixedShare
        // isSteadyShare=false: set the instantaneous fair share to fixedShare
        setResourceValue(fixedShare,
            isSteadyShare
                ? ((FSQueue)sched).getSteadyFairShare()
                : sched.getFairShare(),
            type);
        totalResource = (int) Math.min((long)totalResource + (long)fixedShare,
            Integer.MAX_VALUE);
      }
    }
    return totalResource;
  }
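The classification described in those comments can be written as a small decision function. This is a sketch of the rule as the comments state it, not the real `getFairShareIfFixed`, which is not shown in this article and may handle more cases (for example a non-zero minShare on a zero-weight queue). `FixedShareSketch` and `fixedShareOrNegative` are hypothetical names.

```java
// Hypothetical sketch of the "fixed queue" rule described in the comments.
final class FixedShareSketch {

    // Returns the fixed share (0 here) for a fixed queue, or -1 when the
    // queue should take part in the normal fair share computation.
    static int fixedShareOrNegative(double weight, int maxShare,
                                    boolean active, boolean isSteadyShare) {
        if (maxShare <= 0 || weight <= 0) {
            return 0; // the queue can never run an app
        }
        if (!isSteadyShare && !active) {
            return 0; // idle queues get no instantaneous fair share
        }
        return -1; // not fixed
    }

    public static void main(String[] args) {
        // An idle queue is fixed for the instantaneous share only.
        System.out.println(fixedShareOrNegative(1.0, 100, false, false)); // 0
        System.out.println(fixedShareOrNegative(1.0, 100, false, true));  // -1
    }
}
```

The negative return value works as a "not fixed" marker, which is why the caller tests `fixedShare < 0` before adding the queue to `nonFixedSchedulables`.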

resourceUsedWithWeightToResourceRatio:

  private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
      Collection<? extends Schedulable> schedulables, ResourceType type) {
    long resourcesTaken = 0;
    // Compute the share of every queue, then sum them
    for (Schedulable sched : schedulables) {
      int share = computeShare(sched, w2rRatio, type);
      resourcesTaken += share;
    }
    return (int)Math.min(resourcesTaken, Integer.MAX_VALUE);
  }

computeShare:

  private static int computeShare(Schedulable sched, double w2rRatio,
      ResourceType type) {
    // share = weight * w2rRatio, clamped between minShare and maxShare
    double share = sched.getWeights().getWeight(type) * w2rRatio;
    share = Math.max(share, getResourceValue(sched.getMinShare(), type));
    share = Math.min(share, getResourceValue(sched.getMaxShare(), type));
    return (int) share;
  }
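Putting the three methods together, the whole search fits in a few lines once the Hadoop types are replaced with plain numbers. The sketch below is an illustration under assumptions: `FairShareSketch` and `Queue` are hypothetical names, there is a single resource dimension, every queue has a positive weight (fixed queues are assumed to have been filtered out already), and 25 iterations are used as in the comment on the binary search.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical single-resource version of the weight-to-resource ratio search.
final class FairShareSketch {
    static final int ITERATIONS = 25;

    static final class Queue {
        final String name;
        final double weight; // assumed > 0
        final int minShare;
        final int maxShare;
        int fairShare;

        Queue(String name, double weight, int minShare, int maxShare) {
            this.name = name;
            this.weight = weight;
            this.minShare = minShare;
            this.maxShare = maxShare;
        }
    }

    // weight * ratio, clamped between minShare and maxShare.
    static int computeShare(Queue q, double w2rRatio) {
        double share = q.weight * w2rRatio;
        share = Math.max(share, q.minShare);
        share = Math.min(share, q.maxShare);
        return (int) share;
    }

    // Total resources handed out at a given ratio.
    static int resourceUsed(double w2rRatio, List<Queue> queues) {
        long taken = 0;
        for (Queue q : queues) {
            taken += computeShare(q, w2rRatio);
        }
        return (int) Math.min(taken, Integer.MAX_VALUE);
    }

    static void computeShares(List<Queue> queues, int totalResource) {
        if (queues.isEmpty()) {
            return;
        }
        long totalMax = 0;
        for (Queue q : queues) {
            totalMax += q.maxShare;
        }
        // Never try to hand out more than the queues can accept.
        int target = (int) Math.min((long) totalResource, totalMax);

        // Double the ratio until it is large enough to use up the target.
        double rMax = 1.0;
        while (resourceUsed(rMax, queues) < target) {
            rMax *= 2.0;
        }
        // Binary search for the ratio that uses the target exactly.
        double left = 0;
        double right = rMax;
        for (int i = 0; i < ITERATIONS; i++) {
            double mid = (left + right) / 2.0;
            int used = resourceUsed(mid, queues);
            if (used == target) {
                right = mid;
                break;
            } else if (used < target) {
                left = mid;
            } else {
                right = mid;
            }
        }
        for (Queue q : queues) {
            q.fairShare = computeShare(q, right);
        }
    }

    public static void main(String[] args) {
        List<Queue> queues = Arrays.asList(
            new Queue("a", 1.0, 0, 1000),
            new Queue("b", 3.0, 0, 1000));
        computeShares(queues, 100);
        for (Queue q : queues) {
            System.out.println(q.name + "=" + q.fairShare);
        }
    }
}
```

With weights 1 and 3 and 100 units to distribute, the search settles on a ratio of 25, giving shares of 25 and 75. If queue "a" instead had a minShare of 60 with equal weights, the clamp in `computeShare` would hold it at 60 and the ratio would adjust so that "b" receives the remaining 40.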

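III. SLS performance testing

Combining the metrics collected over JMX with SLS makes it possible to evaluate the performance of the RM.
The details are left for a later post.
Official documentation: YARN Scheduler Load Simulator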

IV. Metrics collection

FSOpDurations
{
"beans" : [ {
"name" : "Hadoop:service=ResourceManager,name=FSOpDurations",
"modelerType" : "FSOpDurations",
"tag.FSOpDurations" : "FSOpDurations",
"tag.Context" : "fairscheduler-op-durations",
"tag.Hostname" : "host",
"ContinuousSchedulingRunNumOps" : 0,
"ContinuousSchedulingRunAvgTime" : 0.0,
"ContinuousSchedulingRunStdevTime" : 0.0,
"ContinuousSchedulingRunIMinTime" : 3.4028234663852886E38,
"ContinuousSchedulingRunIMaxTime" : 1.401298464324817E-45,
"ContinuousSchedulingRunMinTime" : 3.4028234663852886E38,
"ContinuousSchedulingRunMaxTime" : 1.401298464324817E-45,
// Total number of NodeUpdate calls handled
"NodeUpdateCallNumOps" : 19230,
// Average NodeUpdate time within one sampling period, in ms
"NodeUpdateCallAvgTime" : 14.101851851851853,
"NodeUpdateCallStdevTime" : 7.589392615725218,
"NodeUpdateCallIMinTime" : 10.0,
"NodeUpdateCallIMaxTime" : 113.0,
"NodeUpdateCallMinTime" : 3.0,
"NodeUpdateCallMaxTime" : 1403.0,
// Total number of fair share computations
"UpdateThreadRunNumOps" : 115,
// Average fair share computation time within one sampling period, in ms
"UpdateThreadRunAvgTime" : 2350.0,
"UpdateThreadRunStdevTime" : 110.30865786510141,
"UpdateThreadRunIMinTime" : 2272.0,
"UpdateThreadRunIMaxTime" : 2428.0,
"UpdateThreadRunMinTime" : 4.0,
"UpdateThreadRunMaxTime" : 6177.0,
// Similar to the group above; these metrics cover a more inner method and
// do not include ContinuousScheduling
"UpdateCallNumOps" : 115,
"UpdateCallAvgTime" : 2350.0,
"UpdateCallStdevTime" : 110.30865786510141,
"UpdateCallIMinTime" : 2272.0,
"UpdateCallIMaxTime" : 2428.0,
"UpdateCallMinTime" : 4.0,
"UpdateCallMaxTime" : 6169.0,
"PreemptCallNumOps" : 0,
"PreemptCallAvgTime" : 0.0,
"PreemptCallStdevTime" : 0.0,
"PreemptCallIMinTime" : 3.4028234663852886E38,
"PreemptCallIMaxTime" : 1.401298464324817E-45,
"PreemptCallMinTime" : 3.4028234663852886E38,
"PreemptCallMaxTime" : 1.401298464324817E-45,
// Handling of AssignContainer requests
"AssignContainerCallNumOps" : 2860902,
"AssignContainerCallAvgTime" : 31.927672432911855,
"AssignContainerCallStdevTime" : 575.0777254854487,
"AssignContainerCallIMinTime" : 16.0,
"AssignContainerCallIMaxTime" : 100921.0,
"AssignContainerCallMinTime" : 15.0,
"AssignContainerCallMaxTime" : 1393653.0,
// Handling of CompletedContainer requests
"CompletedContainerCallNumOps" : 1293897,
"CompletedContainerCallAvgTime" : 26.40577603228669,
"CompletedContainerCallStdevTime" : 22.917249699220484,
"CompletedContainerCallIMinTime" : 14.0,
"CompletedContainerCallIMaxTime" : 2026.0,
"CompletedContainerCallMinTime" : 13.0,
"CompletedContainerCallMaxTime" : 991449.0
} ]
}
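One thing worth noting when reading this output: the groups whose NumOps is 0 (ContinuousSchedulingRun and PreemptCall) report 3.4028234663852886E38 as their minimum and 1.401298464324817E-45 as their maximum. Those are exactly `Float.MAX_VALUE` and `Float.MIN_VALUE` widened to double, which most likely means the min/max trackers have not seen a sample yet, so they should not be read as real timings. A small sketch of a guard for monitoring code follows; `MetricSentinelSketch` and `hasSamples` are hypothetical names, and treating these values as "no data" is an assumption based on the output above.

```java
// Hypothetical helper: detect min/max pairs that still hold their
// initial sentinel values instead of real measurements.
final class MetricSentinelSketch {

    static boolean hasSamples(double minTime, double maxTime) {
        boolean untouchedMin = minTime == (double) Float.MAX_VALUE;
        boolean untouchedMax = maxTime == (double) Float.MIN_VALUE;
        return !(untouchedMin && untouchedMax);
    }

    public static void main(String[] args) {
        // PreemptCallMinTime / PreemptCallMaxTime from the dump above
        System.out.println(hasSamples(3.4028234663852886E38, 1.401298464324817E-45));
        // NodeUpdateCallMinTime / NodeUpdateCallMaxTime from the dump above
        System.out.println(hasSamples(3.0, 1403.0));
    }
}
```

Checking the matching NumOps field for 0 is an equivalent and simpler test when that field is available alongside the min/max values.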
