本篇将对 Yarn 调度器中的资源抢占方式进行探究。分析当集群资源不足时,占用量资源少的队列,是如何从其他队列中抢夺资源的。我们将深入源码,一步步分析抢夺资源的具体逻辑。

一、简介

在资源调度器中,以 CapacityScheduler 为例(Fair 类似),每个队列可设置一个最小资源量和最大资源量。其中,最小资源量是资源紧缺情况下每个队列需保证的资源量,而最大资源量则是极端情况下队列也不能超过的资源使用量。
资源抢占发生的原因,是为了提高资源利用率,资源调度器(包括 Capacity Scheduler 和 Fair Scheduler)会将负载较轻的队列的资源暂时分配给负载重的队列。
仅当负载较轻队列突然收到新提交的应用程序时,调度器才进一步将本属于该队列的资源归还给它。
但由于此时资源可能正被其他队列使用,因此调度器必须等待其他队列释放资源后,才能将这些资源“物归原主”,为了防止应用程序等待时间过长,RM 在等待一段时间后强制回收。

开启容器抢占需要配置的参数 yarn-site.xml

yarn.resourcemanager.scheduler.monitor.enable
yarn.resourcemanager.scheduler.monitor.policies

二、抢占具体逻辑

这里我们主要分析如何选出待抢占容器这一过程。
整理流程如下图所示:

接下来我们深入源码,看看具体的逻辑:
首先 ResourceManager 通过 ResourceManager#createPolicyMonitors 方法创建资源抢占服务:

    protected void createPolicyMonitors() {// 只有 capacity scheduler 实现了 PreemptableResourceScheduler 接口,fair 是如何实现资源抢占的?if (scheduler instanceof PreemptableResourceScheduler&& conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) {LOG.info("Loading policy monitors");// 是否配置了 scheduler.monitor.policies// 默认值是 ProportionalCapacityPreemptionPolicy? 代码中没看到默认值,但是 yarn-site.xml doc 中有默认值List<SchedulingEditPolicy> policies = conf.getInstances(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,SchedulingEditPolicy.class);if (policies.size() > 0) {for (SchedulingEditPolicy policy : policies) {LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());// periodically check whether we need to take action to guarantee// constraints// 此处创建了资源抢占服务类。// 当此服务启动时,会启动一个线程每隔 PREEMPTION_MONITORING_INTERVAL(默认 3s)调用一次// ProportionalCapacityPreemptionPolicy 类中的 editSchedule方法,// 【重点】在此方法中实现了具体的资源抢占逻辑。SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy);addService(mon);}

资源抢占服务会启动一个线程每隔 3 秒钟调用配置的抢占规则,这里以 ProportionalCapacityPreemptionPolicy(比例容量抢占规则)为例介绍其中的抢占具体逻辑(editSchedule 方法):

// ProportionalCapacityPreemptionPolicy#editSchedulepublic void editSchedule() {updateConfigIfNeeded();long startTs = clock.getTime();CSQueue root = scheduler.getRootQueue();// 获取集群当前资源快照Resource clusterResources = Resources.clone(scheduler.getClusterResource());// 具体的资源抢占逻辑containerBasedPreemptOrKill(root, clusterResources);if (LOG.isDebugEnabled()) {LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms.");}}

editSchedule 方法很简单,逻辑都被封装到 containerBasedPreemptOrKill() 方法中,我们继续深入。
其中主要分三步:

  1. 生成资源快照
  2. 根据规则找出各队列待抢占的容器(重点)
  3. 执行容器资源抢占 或 kill超时未自动停止的容器
// 仅保留重要逻辑private void containerBasedPreemptOrKill(CSQueue root,Resource clusterResources) {// ------------ 第一步 ------------ (生成资源快照)// extract a summary of the queues from scheduler// 将所有队列信息拷贝到 queueToPartitions - Map<队列名, Map<资源池, 队列详情>>。生成快照,防止队列变化造成计算问题。for (String partitionToLookAt : allPartitions) {cloneQueues(root, Resources.clone(nlm.getResourceByLabel(partitionToLookAt, clusterResources)), partitionToLookAt);}// ------------ 第二步 ------------ (找出待抢占的容器)// compute total preemption allowed// based on ideal allocation select containers to be preemptionCandidates from each queue and each application// candidatesSelectionPolicies 默认会放入 FifoCandidatesSelector,// 如果配置了 INTRAQUEUE_PREEMPTION_ENABLED,会增加 IntraQueueCandidatesSelectorfor (PreemptionCandidatesSelector selector :candidatesSelectionPolicies) {// 【核心方法】 计算待抢占 Container 放到 preemptMaptoPreempt = selector.selectCandidates(toPreempt,clusterResources, totalPreemptionAllowed);}// 这里有个类似 dryrun 的参数 yarn.resourcemanager.monitor.capacity.preemption.observe_onlyif (observeOnly) {return;}// ------------ 第三步 ------------ (执行容器资源抢占 或 kill超时未自动停止的容器)// preempt (or kill) the selected containerspreemptOrkillSelectedContainerAfterWait(toPreempt);// cleanup staled preemption candidatescleanupStaledPreemptionCandidates();}

一)找出待抢占的容器

第一步资源快照没什么好说的,直接进入到重点:第二步找出待抢占的容器
selector.selectCandidates(),以默认的 FifoCandidatesSelector 实现为例讲解,其他的同理。
主要分两步:

  1. 根据使用量和需求量重新分配资源,得到各队列要被抢占的资源量
  2. 根据资源差额,计算要 kill 的 container
// yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.javapublic Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,Resource clusterResource, Resource totalPreemptionAllowed) {// ------------ 第一步 ------------ (根据使用量和需求量重新分配资源)// Calculate how much resources we need to preempt// 计算出每个资源池每个队列当前资源分配量,和实际要 preempt 的量preemptableAmountCalculator.computeIdealAllocation(clusterResource,totalPreemptionAllowed);// ------------ 第二步 ------------ (根据资源差额,计算要 kill 的 container)// 选 container 是有优先级的: 使用共享池的资源 -> 队列中后提交的任务 -> amContainerfor (String queueName : preemptionContext.getLeafQueueNames()) {synchronized (leafQueue) {// 省略了大部分逻辑,在后面介绍// 从 application 中选出要被抢占的容器preemptFrom(fc, clusterResource, resToObtainByPartition,skippedAMContainerlist, skippedAMSize, selectedCandidates,totalPreemptionAllowed);}}

重新计算各队列分配的资源量

我们先来看「根据使用量和需求量重新分配资源」,即 PreemptableResourceCalculator#computeIdealAllocation()

  // 计算每个队列实际要被 preempt 的量public void computeIdealAllocation(Resource clusterResource,Resource totalPreemptionAllowed) {for (String partition : context.getAllPartitions()) {TempQueuePerPartition tRoot = context.getQueueByPartition(CapacitySchedulerConfiguration.ROOT, partition);// 这里计算好每个队列超出资源配置的部分,存在 TempQueuePerPartition// preemptableExtra 表示可以被抢占的// untouchableExtra 表示不可被抢占的(队列配置了不可抢占)// yarn.scheduler.capacity.<queue>.disable_preemptionupdatePreemptableExtras(tRoot);tRoot.idealAssigned = tRoot.getGuaranteed();// 【重点】遍历队列树,重新计算资源分配,并计算出每个队列计划要 Preempt 的量recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);}// 计算实际每个队列要被 Preempt 的量 actuallyToBePreempted(有个阻尼因子,不会一下把所有超量的都干掉)calculateResToObtainByPartitionForLeafQueues(context.getLeafQueueNames(),clusterResource);}
}

我们直接深入到 recursivelyComputeIdealAssignment() 方法中的核心逻辑:重新计算各队列资源分配值 AbstractPreemptableResourceCalculator#computeFixpointAllocation()
主要逻辑如下:

  1. 首先保障每个队列有自己配置的资源。若使用量小于配置量,多余的资源会被分配到其他队列
  2. 若队列有超出配置资源需求,则放到一个优先级队列中,按 (使用量 / 配置量) 从小到大排序
  3. 对于有资源需求的队列,在剩余的资源中,按配置比例计算每个队列可分配的资源量
  4. 每次从优先级队列中选需求优先级最高的,进行分配
  5. 计算 min(可分配量, 队列最大剩余用量, 需求量)。作为本次分配的资源。若仍有资源需求则放回优先级队列,等待下次分配
  6. 当满足所有队列资源需求,或者没有剩余资源时结束
  7. 仍有资源需求的队列会记录在 underServedQueues
  // 按一定规则将资源分给各个队列protected void computeFixpointAllocation(Resource totGuarant,Collection<TempQueuePerPartition> qAlloc, Resource unassigned,boolean ignoreGuarantee) {// 传进来 unassigned = totGuarant// 有序队列,(使用量 / 配置量) 从小到大排序PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,tqComparator);// idealAssigned = min(使用量,配置量)。  对于不可抢占队列,则再加上超出的部分,防止资源被再分配。if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) {q.idealAssigned = Resources.add(q.getGuaranteed(), q.untouchableExtra);} else {q.idealAssigned = Resources.clone(used);}// 如果该队列有超出配置资源需求,就把这个队列放到 orderedByNeed 有序队列中(即这个队列有资源缺口)if (Resources.lessThan(rc, totGuarant, q.idealAssigned, curPlusPend)) {orderedByNeed.add(q);}}// 此时 unassigned 是 整体可用资源 排除掉 所有已使用的资源(used)// 把未分配的资源(unassigned)分配出去// 方式就是从 orderedByNeed 中每次取出 most under-guaranteed 队列,按规则分配一块资源给他,如果仍不满足就按顺序再放回 orderedByNeed// 直到满足所有队列资源,或者没有资源可分配while (!orderedByNeed.isEmpty() && Resources.greaterThan(rc, totGuarant,unassigned, Resources.none())) {Resource wQassigned = Resource.newInstance(0, 0);// 对于有资源缺口的队列,重新计算他们的资源保证比例:normalizedGuarantee。// 即 (该队列保证量 / 所有资源缺口队列保证量)resetCapacity(unassigned, orderedByNeed, ignoreGuarantee);// 这里返回是个列表,是因为可能有需求度(优先级)相等的情况Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues(orderedByNeed, tqComparator);for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i.hasNext();) {TempQueuePerPartition sub = i.next();// 按照 normalizedGuarantee 比例能从剩余资源中分走多少。Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned,sub.normalizedGuarantee, Resource.newInstance(1, 1));// 【重点】按一定规则将资源分配给队列,并返回剩下的资源。Resource wQidle = sub.offer(wQavail, rc, totGuarant,isReservedPreemptionCandidatesSelector);// 分配给队列的资源Resource wQdone = Resources.subtract(wQavail, wQidle);// 这里 wQdone > 0 证明本次迭代分配出去了资源,那么还会放回到待分配资源的集合中(哪怕本次已满足资源请求),直到未再分配资源了才退出。if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) {orderedByNeed.add(sub);}Resources.addTo(wQassigned, wQdone);}Resources.subtractFrom(unassigned, wQassigned);}// 这里有可能整个资源都分配完了,还有队列资源不满足while (!orderedByNeed.isEmpty()) {TempQueuePerPartition q1 = orderedByNeed.remove();context.addPartitionToUnderServedQueues(q1.queueName, q1.partition);}}

上面第 5 步是重点,也就是 sub.offer(),是计算给该队列在保证值之外,还能提供多少资源:

  /*** 计算队列 idealAssigned,在原有基础上增加新分配的资源。同时返回 avail 中未使用的资源。* 参数说明:* avail 按比例该队列能从剩余资源中分配到的* clusterResource 整体资源量* considersReservedResource ?* idealAssigned = min(使用量,配置量)*/Resource offer(Resource avail, ResourceCalculator rc,Resource clusterResource, boolean considersReservedResource) {// 计算的是还有多少可分配资源的空间( maxCapacity - assigned )Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(Resources.subtract(getMax(), idealAssigned),Resource.newInstance(0, 0));// remain = avail - min(avail, (max - assigned), (current + pending - assigned))// 队列接受资源的计算方法:可提供的资源,队列最大资源-已分配资源,当前已使用资源+未满足的资源-min(使用量,配置量) 三者中的最小值。Resource accepted = Resources.min(rc, clusterResource,absMaxCapIdealAssignedDelta,Resources.min(rc, clusterResource, avail, Resources.subtract(Resources.add((considersReservedResource? getUsed(): getUsedDeductReservd()), pending),idealAssigned)));Resource remain = Resources.subtract(avail, accepted);Resources.addTo(idealAssigned, accepted);return remain;}

核心的资源重新分配算法逻辑已经计算完毕,剩下的就是:
根据重新计算的资源分配,得到各队列超用的资源,这部分就是要被抢占的资源。
这里不会一下把队列超用的资源都干掉,有个阻尼因子,用于平滑抢占处理。

根据资源差额,计算要抢占的容器

回到 selector.selectCandidates(),上面已经介绍了各队列抢占量的计算逻辑,接下来介绍「如何选出各队列中的 container」

  1. 抢占该队列在共享池使用资源的 container
  2. 抢占后提交任务中,后生成的 container(也就是越晚生成的 container,会被先处理)
  3. 抢占 amContainer
  public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,Resource clusterResource, Resource totalPreemptionAllowed) {// ......// ------------ 第二步 ------------ (根据资源差额,计算要 kill 的 container)// 根据计算得到的要抢占的量,计算各资源池各队列要 kill 的 containerList<RMContainer> skippedAMContainerlist = new ArrayList<>();// Loop all leaf queues// 这里是有优先级的: 使用共享池的资源 -> 队列中后提交的任务 -> amContainerfor (String queueName : preemptionContext.getLeafQueueNames()) {// 获取该队列在每个资源池要被抢占的量Map<String, Resource> resToObtainByPartition =CapacitySchedulerPreemptionUtils.getResToObtainByPartitionForLeafQueue(preemptionContext,queueName, clusterResource);synchronized (leafQueue) {// 使用共享池资源的,先处理Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers =leafQueue.getIgnoreExclusivityRMContainers();for (String partition : resToObtainByPartition.keySet()) {if (ignorePartitionExclusivityContainers.containsKey(partition)) {TreeSet<RMContainer> rmContainers =ignorePartitionExclusivityContainers.get(partition);// 最后提交的任务,会被最先抢占for (RMContainer c : rmContainers.descendingSet()) {if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,selectedCandidates)) {// Skip already selected containerscontinue;}// 将 Container 放到待抢占集合 preemptMap 中boolean preempted = CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(rc,preemptionContext, resToObtainByPartition, c,clusterResource, selectedCandidates,totalPreemptionAllowed);}}}// preempt other containersResource skippedAMSize = Resource.newInstance(0, 0);// 默认是 FifoOrderingPolicy,desc 也就是最后提交的在最前面Iterator<FiCaSchedulerApp> desc =leafQueue.getOrderingPolicy().getPreemptionIterator();while (desc.hasNext()) {FiCaSchedulerApp fc = desc.next();if (resToObtainByPartition.isEmpty()) {break;}// 从 application 中选出要被抢占的容器(后面介绍)preemptFrom(fc, clusterResource, resToObtainByPartition,skippedAMContainerlist, skippedAMSize, selectedCandidates,totalPreemptionAllowed);}// Can try preempting AMContainersResource maxAMCapacityForThisQueue = Resources.multiply(Resources.multiply(clusterResource,leafQueue.getAbsoluteCapacity()),leafQueue.getMaxAMResourcePerQueuePercent());preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist,resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue,totalPreemptionAllowed);}}return selectedCandidates;}

二)执行容器资源抢占

把要被抢占的 container 都选出来之后,就剩最后一步, kill 这些 container。
回到 containerBasedPreemptOrKill()

  private void containerBasedPreemptOrKill(CSQueue root,Resource clusterResources) {// ......// ------------ 第三步 ------------ (执行容器资源抢占 或 kill超时未自动停止的容器)// preempt (or kill) the selected containerspreemptOrkillSelectedContainerAfterWait(toPreempt);// cleanup staled preemption candidatescleanupStaledPreemptionCandidates();}

三、总结

至此,分析完毕整个资源抢占的过程。
总结一下主要逻辑:

  1. 重新计算各资源池中各队列应分配的资源;
  2. 与现在已使用的资源进行对比,如果超过新计算的分配量,(超用的部分*阻尼系数)就是要被抢占的资源量;
  3. 各队列根据要被抢占的资源量,选出要被 kill 的 container。优先度低的 container 就会被先处理(使用了共享资源的、后生成的 container);
  4. 通过心跳通知 AM 要被 kill 的 container,或者处理掉通知过已超时的 container。

【深入浅出 Yarn 架构与实现】5-3 Yarn 调度器资源抢占模型相关推荐

  1. Hadoop学习(二)——MapReduce\Yarn架构

    其他更多java基础文章: java基础学习(目录) 学习资料 理解Hadoop YARN架构 本文先讲MapReduce 1.x的框架.再讲MapReduce 1.x升级改进后MapReduce 2 ...

  2. Hadoop生态圈(九)YARN架构深入学习

    1. YARN框架概述 1.1 YARN产生和发展简史 1.1.1 Hadoop演进阶段 数据.程序.运算资源(内存.cpu)三者组在一起,完成了数据的计算处理过程.在单机环境下,这些都不是太大问题. ...

  3. Hadoop生态圈(三十三)- YARN架构深入学习

    目录 前言 1. YARN框架概述 1.1 YARN产生和发展简史 1.1.1 Hadoop演进阶段 1.1.1.1 阶段0:Ad Hoc集群 1.1.1.2 阶段1:HOD集群 1.1.1.3 阶段 ...

  4. YARN中的任务队列调度器-公平调度器(Fair Scheduler)

    一.概述 公平调度器可以为所有的应用"平均公平"分配资源,当然,这种"公平"是可以配置的,称为权重,可以在分配文件中为每一个队列设置分配资源的权重,如果没有设置 ...

  5. Yarn的调度器--Scheduler探究

    引言 在Yarn体系中,Scheduler负责为Application分配资源,按照调度策略可分为以下3种: FIFO Scheduler Capacity Scheduler Fair Schedu ...

  6. 7种主流案例,告诉你调度器架构设计通用法则(干货!)

    女主宣言 今天小编为大家转载一篇来自DBAplus社群的干货文章,希望能够帮助大家对关于调度器的理解.作者张晨,Strikingly数据平台工程师,算法.分布式系统和函数式编程爱好者.Shanghai ...

  7. 【深入浅出 Yarn 架构与实现】1-2 搭建 Hadoop 源码阅读环境

    本文将介绍如何使用 idea 搭建 Hadoop 源码阅读环境.(默认已安装好 Java.Maven 环境) 一.搭建源码阅读环境 一)idea 导入 hadoop 工程 从 github 上拉取代码 ...

  8. yarn架构——本质上是在做解耦 将资源分配和应用程序状态监控两个功能职责分离为RM和AM...

    Hadoop YARN架构解读 原Mapreduce架构 原理 架构图如下: 图 1.Hadoop 原 MapReduce 架构 原 MapReduce 程序的流程: 首先用户程序 (JobClien ...

  9. 启动hadoop没有resourcemanager_5.hadoop-MR YARN架构理论与集群搭建

    MR原语 输入(格式化k,v)数据集map映射成一个中间数据集(k,v)reduce 相同"的key为一组,调用一次reduce方法,方法内迭代这一组数据进行计算 计算框架 Partit ...

最新文章

  1. python教学上机实验报告怎么写_Python基础(下)
  2. 关于Redis缓存,这3个问题一定要知道!
  3. python100个免费实例-Python实例100个(基于最新Python3.7版本)
  4. ECharts概念学习系列之ECharts是什么?
  5. Division and Union
  6. Ubuntu中的密钥环密码与登陆密码不同
  7. 华南理工大计算机博士毕业条件,华南理工大学博士毕业要求-2018年7月版.doc
  8. oracle表与表之间更新,Oracle 两个表之间更新的实现
  9. “当了十年IT程序员,我转型做自动驾驶开发的这五年”
  10. mongodb mac安装_在Mac OS X上安装MongoDB
  11. python版webpower的edm的api接口
  12. TouchSlop与VelocityTracker认识
  13. mysql error1205 博客_mysql主从复制Error1205
  14. 秋冬季健康生活小常识
  15. 【安价】亚拉奈夫想要复兴贫弱男爵家的样子【内政】2
  16. 自学Java一点都不难!
  17. 【手写 Vue2.x 源码】第十九篇 - 根据 vnode 创建真实节点
  18. javascript利用iframe打印pdf文档失败的问题
  19. idea如何启动vue项目
  20. 用计算机解决问题时 为什么要用计算思维,在亲历计算机解决问题的全过程中发展计算思维...

热门文章

  1. 华宝证券围绕智能交易战略实现华宝智投条件单、小T自动交易机器人、打新机器人、定投机器人等智能交易模块
  2. linux的644权限,常用linux系统644和755及777权限详解
  3. 南邮 起名字真难
  4. 未来集市未来怎么样_大流行后的未来
  5. 《基于Android微博整合客户端的设计与实现》毕业设计论文开题报告
  6. cool-admin框架后端使用-node版本,使用事务装饰器来创建和事务回滚
  7. Android 蓝牙基础篇之 —— SPP
  8. 不死马php如何取证_AWD不死马与克制方法
  9. 极光推送REST API 实例(分等级推送,使用别名推送)
  10. 网络质量指标及测试工具iperf