Spark对于Yarn priority的支持源码详解

Yarn的调度器

在Yarn中,提供了Capacity scheduler和Fair scheduler,它们都支持priority的。这里我们简单介绍下概念,不做过多的描述。

Capacity Scheduler

Capacity scheuler设计的目的是为了让Hadoop上的applications可以以一个多租户的形式下分享资源运行,这种调度器一般应用在有一个较大的公有集群,按照队列来分配资源给特定的用户组。我们可以简单的通过配置就可以设定队列在cluster中资源或者用户在队列中的的使用限制(最低保障和最高上限等),当一个队列的资源空余的时候,Yarn可以暂时利用剩余的资源分享给其他需要的队列。

Fair Scheduler

Fair scheduler就如同它的名字一样,他在分配资源的时候,是秉承着公平原则,applications在一段时间内分配到的平均资源会趋于相等。如果一个只有一个application在集群上运行的时候,资源都可供这一个application使用。如果有另外的application被提交到集群上时,空闲的资源就会被分配给新提交的application上,这样最后每个运行的application都会分配到相等的资源。

Priority在Yarn中的使用

Capacity Scheduler

Capacity scheduler支持对应用的priority的设置。Yarn的priority是整数型,更大的数就代表更高的优先级,这个功能只支持在FIFO(默认)的策略下进行。priority可以针对cluster或者queue级别进行设置。

cluster level: 如果你的application设置的priority超过了cluster最大值,那按照最大的cluster priority对待。
queue level: 队列有一个默认的priority值,queue下的applications如果没有设置具体的priority会被设置成该默认值。如果application变更了queue,它的priority值不会更改。

Fair Scheduler

Fair scheduler支持把一个正在运行的application迁移到另一个priority不同的queue里,这样这个application获取资源的权重就会跟着queue变化。被迁移得application的资源就会算在新的queue上,如果所需资源超过了新的queue的最大限制,迁移就会失败。

SparkOnYarn支持priority

如何为Spark app设置priority

只需要再SparkConf里进行设置即可,遵循Yarn对于priority的定义,数值越大,priority越高,在同一时间提交的job会有更高的优先级获取资源:

val sparkConf = new SparkConf().set(APPLICATION_TAGS.key, ",tag1, dup,tag2 , ,multi word , dup").set(MAX_APP_ATTEMPTS, 42).set("spark.app.name", "foo-test-app").set(QUEUE_NAME, "staging-queue").set(APPLICATION_PRIORITY, 1)

Spark源码

Spark目前已经有了对于Yarn的priority官方支持,这里给出一个在Jira上closed的SPARK-10879。这个Jira是很早以前的一个版本,diff仅供参考,用于让大家理解Spark on Yarn如何设置priority的基本流程。
其实需要支持priority很简单,一是需要在submit的时候提供priority参数的设置,官方是放在了SparkConf里去设置;另一个是需要在createApplicationSubmissionContext的时候,调用setPriority将priority传入到Yarn。这里给出关键的地方的代码:

/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala

  private[spark] val APPLICATION_PRIORITY = ConfigBuilder("spark.yarn.priority").doc("Application priority for YARN to define pending applications ordering policy, those" +" with higher value have a better opportunity to be activated. Currently, YARN only" +" supports application priority when using FIFO ordering policy.").intConf.createOptional

/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala中对createApplicationSubmissionContext函数的修改:

/*** Set up the context for submitting our ApplicationMaster.* This uses the YarnClientApplication not available in the Yarn alpha API.*/def createApplicationSubmissionContext(newApp: YarnClientApplication,containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {val componentName = if (isClusterMode) {config.YARN_DRIVER_RESOURCE_TYPES_PREFIX} else {config.YARN_AM_RESOURCE_TYPES_PREFIX}val yarnAMResources = getYarnResourcesAndAmounts(sparkConf, componentName)val amResources = yarnAMResources ++getYarnResourcesFromSparkResources(SPARK_DRIVER_PREFIX, sparkConf)logDebug(s"AM resources: $amResources")val appContext = newApp.getApplicationSubmissionContextappContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))appContext.setQueue(sparkConf.get(QUEUE_NAME))appContext.setAMContainerSpec(containerContext)appContext.setApplicationType("SPARK")sparkConf.get(APPLICATION_TAGS).foreach { tags =>appContext.setApplicationTags(new java.util.HashSet[String](tags.asJava))}sparkConf.get(MAX_APP_ATTEMPTS) match {case Some(v) => appContext.setMaxAppAttempts(v)case None => logDebug(s"${MAX_APP_ATTEMPTS.key} is not set. " +"Cluster's default value will be used.")}sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>appContext.setAttemptFailuresValidityInterval(interval)}val capability = Records.newRecord(classOf[Resource])capability.setMemory(amMemory + amMemoryOverhead)capability.setVirtualCores(amCores)if (amResources.nonEmpty) {ResourceRequestHelper.setResourceRequests(amResources, capability)}logDebug(s"Created resource capability for AM request: $capability")sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {case Some(expr) =>val amRequest = Records.newRecord(classOf[ResourceRequest])amRequest.setResourceName(ResourceRequest.ANY)amRequest.setPriority(Priority.newInstance(0))amRequest.setCapability(capability)amRequest.setNumContainers(1)amRequest.setNodeLabelExpression(expr)appContext.setAMContainerResourceRequest(amRequest)case None =>appContext.setResource(capability)}sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern =>try {val logAggregationContext = Records.newRecord(classOf[LogAggregationContext])logAggregationContext.setRolledLogsIncludePattern(includePattern)sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>logAggregationContext.setRolledLogsExcludePattern(excludePattern)}appContext.setLogAggregationContext(logAggregationContext)} catch {case NonFatal(e) =>logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +"does not support it", e)}}appContext.setUnmanagedAM(isClientUnmanagedAMEnabled)sparkConf.get(APPLICATION_PRIORITY).foreach { appPriority =>appContext.setPriority(Priority.newInstance(appPriority))}appContext}

spark任务优先级设置:spark.yarn.priority相关推荐

  1. 平台搭建---Spark提交应用程序---Spark Submit提交应用程序及yarn

    本部分来源,也可以到spark官网查看英文版. spark-submit 是在spark安装目录中bin目录下的一个shell脚本文件,用于在集群中启动应用程序(如***.py脚本):对于spark支 ...

  2. Spark(十二) -- Spark On Yarn Spark as a Service Spark On Tachyon

    Spark On Yarn: 从0.6.0版本其,就可以在在Yarn上运行Spark 通过Yarn进行统一的资源管理和调度 进而可以实现不止Spark,多种处理框架并存工作的场景 部署Spark On ...

  3. Spark运行模式Local+Standalone+Yarn+mesos

    Spark运行模式Local+Standalone+Yarn+mesos bin/spark-submit --help 注意: --master MASTER_URL spark://host:po ...

  4. 【原创】大数据基础之Spark(9)spark部署方式yarn/mesos

    1 下载解压 https://spark.apache.org/downloads.html $ wget http://mirrors.shu.edu.cn/apache/spark/spark-2 ...

  5. spark代码中添加logger_JAVA代码如何设置SPARK的日志打印级别

    问题场景:在使用spark sql 增加where条件过滤时,会出现打印很多的被过滤掉的记录(几十万条),导致跑spark sql 特别慢! var df2 = sqc.sql("SELEC ...

  6. Java搭建Spark程序,提交到Yarn

    文章目录 Java搭建Spark程序,提交到Yarn测试 Demo Java搭建Spark程序,提交到Yarn测试 Demo pow文件依赖 <?xml version="1.0&qu ...

  7. Spark Standalone -- 独立集群模式、Spark 提交任务的两种模式、spark在yarn上运行的环境搭建、自己写的spark代码如何提交到yarn上并运行...

    目录 Spark Standalone -- 独立集群模式 Standalone 架构图 Standalone 的搭建 1.上传.解压.重命名 2.配置环境变量 3.修改配置文件 conf 4.同步到 ...

  8. Carbondata 1.4.0+Spark 2.2.1 On Yarn集成安装

    微信公众号(SZBigdata-Club):后续博客的文档都会转到微信公众号中.  1.公众号会持续给大家推送技术文档.学习视频.技术书籍.数据集等.  2.接受大家投稿支持.  3.对于各公司hr招 ...

  9. Spark环境搭建(Hadoop YARN模式)

    前言 按照前面环境部署中所学习的,如果我们想要一个稳定的生产Spark环境,那么最优的选择就是构建:HA StandAlone集 群. 不过在企业中, 服务器的资源总是紧张的,许多企业不管做什么业务, ...

最新文章

  1. 深度学习笔记:windows+tensorflow 指定GPU占用内存(解决gpu爆炸问题)
  2. Android 图形系统之图形缓冲区分配
  3. win下python2,3和pip2,3双版本共存
  4. )标识符不能是c语言的关键字,标识符不能是C的关键字
  5. 将你一张表的值覆盖_山西联通携手华为完成长风商务区宏微协同,立体覆盖,打造5G精品网络...
  6. 基于visual Studio2013解决面试题之1404希尔排序
  7. java socket.close_java – Socket.close()在Socket.connect()期间无效
  8. aspen怎么做灵敏度分析_【技巧篇】Aspen系列篇之——灵敏度分析
  9. MarkDown学习手册
  10. 如何给网页设置自己的字体
  11. 如何批量删除 Word 中的页眉页脚、图片、超链接等内容?
  12. 给在读研究生+未来要读研同学们的一封受益匪浅的信
  13. 突发事件检测: kleinberg 状态机模型
  14. 研究生数学建模竞赛-无人机在抢险救灾中的优化应用
  15. 移动互联网技术课程简介
  16. java使用字符流进行写入和读取
  17. 怎么修改图片尺寸大小?这几种修改尺寸方法很简单
  18. Python自动化办公:将文本文档内容批量分类导入Excel表格
  19. visio中公式太小_串联管道/并联管道中调节阀可调比R的计算
  20. 三相逆变器 PID 工作原理

热门文章

  1. 范式青春er,寻找同行的你!
  2. debug Tensorflow: ‘_UserObject‘ object has no attribute ‘add_slot‘
  3. AI理论知识整理(16)-线性方程组有解
  4. 有粉丝想转行推荐算法,我觉得......
  5. 【机器学习基础】(四):通俗理解支持向量机SVM及代码实践
  6. 【数据竞赛】从0梳理1场时间序列赛事!
  7. 【NLP】相当全面:各种深度学习模型在文本分类任务上的应用
  8. 深度学习的一些经验总结和建议| To do v.s Not To Do
  9. 干货 | BBR及其在实时音视频领域的应用
  10. 视频直播关键技术:流畅、拥塞和延时追赶