Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法

1、spreadOutApp尽量平均分配到每个executor上;

2、非spreadOutApp尽量在使用单个executor的资源。

源码分析

org.apache.spark.deploy.master.Master

1、首先判断,master状态不是ALIVE的话,直接返回
2、调度driver
3、 Application的调度机制(核心之核心,重中之重)

源码如下:

  1    /*
  2    *schedule()解决了spark资源调度的问题
  3    */
  4   private def schedule() {
  5     //首先判断,master状态不是ALIVE的话,直接返回
  6     //也就是说,stanby master是不会进行application等资源调度的
  7     if (state != RecoveryState.ALIVE) { return }
  8
  9     // First schedule drivers, they take strict precedence over applications
 10     // Randomization helps balance drivers
 11
 12     //Random.shuffle的原理,大家要清楚,就是对传入的集合的元素进行随机的打乱
 13     //取出了workers中的所有之前注册上来的worker,进行过滤,必须是状态为ALIVE的worker
 14     //对状态为ALIVE的worker,调用Random的shuffle方法进行随机的打乱
 15     val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
 16     val numWorkersAlive = shuffledAliveWorkers.size
 17     var curPos = 0
 18
 19     //首先,调度driver
 20     //为什么要调度driver,大家想一下,什么情况下,会注册driver,并且会导致driver被调度
 21     //其实 ,只有用yarn-cluster模式提交的时候,才会注册driver;因为standalone和yarn-client模式,都会在本地直接
 22     //启动driver,而不会来注册driver,就更不可能让master调度driver了
 23
 24     //driver调度机制
 25     //遍历waittingDrivers ArrayBuffer
 26     for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
 27       // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
 28       // start from the last worker that was assigned a driver, and continue onwards until we have
 29       // explored all alive workers.
 30       var launched = false
 31       var numWorkersVisited = 0
 32
 33       //while的条件,numWorkersVisited小于numWorkersAlive
 34       //什么意思?就是说,只要还有活着的worker没有遍历到,那么就继续进行遍历
 35       //而且,当前这个driver还没有被启动,也就是launched为false
 36       while (numWorkersVisited < numWorkersAlive && !launched) {
 37         val worker = shuffledAliveWorkers(curPos)
 38         numWorkersVisited += 1
 39
 40         //如果当前这个worker的空闲内存量大于等于,driver需要的内存
 41         //并且worker的空闲cpu数量,大于等于driver需要的cpu数量
 42         if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
 43           //启动driver
 44           launchDriver(worker, driver)
 45           //并且将driver从waitingDrivers队列中移除
 46           waitingDrivers -= driver
 47           launched = true
 48         }
 49
 50         //将指针指向下一个worker
 51         curPos = (curPos + 1) % numWorkersAlive
 52       }
 53     }
 54
 55     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
 56     // in the queue, then the second app, etc.
 57     // Application的调度机制(核心之核心,重中之重)
 58     // 首先, application的调度算法有两种,一种是spreadOutApps,另一种是非spreadOutApps
 59     if (spreadOutApps) {
 60       // Try to spread out each app among all the nodes, until it has all its cores
 61
 62       //首先,遍历waitingApps中的ApplicationInfo,并且过滤出application还需要高度的cores的application
 63       for (app <- waitingApps if app.coresLeft > 0) {
 64         //从workers中,过滤状态为ALIVE的,再次过滤可以被Application使用的Worker,然后按照剩余cpu数量倒序排序
 65         val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
 66           .filter(canUse(app, _)).sortBy(_.coresFree).reverse
 67         val numUsable = usableWorkers.length
 68         //创建一个空数组,存储了要分配给每个worker的cpu数量
 69         val assigned = new Array[Int](numUsable) // Number of cores to give on each node
 70         //获取到底要分配多少cpu,取app剩余要分配的cpu的数量和worker总共可用cpu数量的最小值
 71         var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
 72
 73         //通过这种算法,其实会将每个application,要启动的executor,都平均分布到各个worker上去
 74         //比如有20个cpu core要分配,那么实际会循环两遍worker,每次循环,给每个worker分配1个core
 75         //最后每个worker分配了2个core
 76
 77         //while条件,只要要分配的cpu,还没有分配完,就继续循环
 78         var pos = 0
 79         while (toAssign > 0) {
 80           //每一个worker,如果空闲的cpu数量大于,已经分配出去的cpu数量
 81           //也就是说,worker还有可分配的cpu
 82           if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
 83             //将总共要分配的cpu数量-1,因为这里已经决定在这个worker上分配一个cpu了
 84             toAssign -= 1
 85             //给这个worker分配的cpu数量,加1
 86             assigned(pos) += 1
 87           }
 88           //指针移动到下一下worker
 89           pos = (pos + 1) % numUsable
 90         }
 91
 92         // Now that we've decided how many cores to give on each node, let's actually give them
 93         // 给每个worker分配完application要求的cpu core之后
 94         // 遍历worker
 95         for (pos <- 0 until numUsable) {
 96           //只要判断之前给这个worker分配到了core
 97           if (assigned(pos) > 0) {
 98             //首先,在application内部缓存结构中,添加executor
 99             //并且创建ExecutorDesc对象,其中封装了,给这个executor分配多少个cpu core
100             //在spark-submit脚本中,可以指定要多少executor,每个execuor多少个cpu,多少内存
101             //那么基于源码机制,实际上,executor的实际数量,以及每个executor的cpu,可能与配置是不一样的
102             //因为,我人帝里基于总的cpu来分配的,就是比如,要求3个executor,每个要3个cpu,那么比如,有9个workers,每个有1个cpu
103             //那么其实总其知道,要分配9个core,其实根据这种算法,会给每个worker分配一个core,然后给每个worker启动一个executor
104             //最后会启动,9个executor,每个executor有1个cpu core
105             val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
106             //那么就在worker上启动executor
107             launchExecutor(usableWorkers(pos), exec)
108             //将application状态设置为running
109             app.state = ApplicationState.RUNNING
110           }
111         }
112       }
113     } else {
114       // Pack each app into as few nodes as possible until we've assigned all its cores
115
116       //非spreadOutApps调度算法
117
118       //这种算法与spreadOutApps算法正好相反,1、spreadOutApp尽量平均分配到每个executor上;2、非spreadOutApp尽量在使用单个executor的资源。
119       //每个application,都尽可能分配到尽量少的worker上去,比如总其有10个worker,每个有10个core
120       //app总共要分配 20个core,那么其实,只会分配到两个worker上,每个worker都占满10个core
121       //那么,其余的app,就只能 分配到下一个worker了
122       //比如,spark-submit里,配置的是要10个executor,每个要2个core,那么总共是20个croe
123       //只会启动2个executor,每个有10个cores
124
125       //将每个Application,尽可能少的分配到worker上去
126       //首先,遍历worker,并且是状态为ALIVE,还有空闲cpu的worker
127       for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
128         //遍历application,并且是还有城朵分配的core的application
129         for (app <- waitingApps if app.coresLeft > 0) {
130           //判断,如果当前这个worker可以被 application使用
131           if (canUse(app, worker)) {
132             //取worker剩余cpu数量,与app要分配的cpu数量的最小值
133             val coresToUse = math.min(worker.coresFree, app.coresLeft)
134             //如果Worker剩余cpu为0了,就不分配了
135             if (coresToUse > 0) {
136               // 给app添加一个executor
137               val exec = app.addExecutor(worker, coresToUse)
138               //在worker上启动executor
139               launchExecutor(worker, exec)
140               //将application状态设置为running
141               app.state = ApplicationState.RUNNING
142             }
143           }
144         }
145       }
146     }
147   }

转载于:https://www.cnblogs.com/haoyy/p/6173703.html

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法相关推荐

  1. android asynctask源码分析,Android通过Handler与AsyncTask两种方式动态更新ListView(附源码)...

    本文实例讲述了Android通过Handler与AsyncTask两种方式动态更新ListView的方法.分享给大家供大家参考,具体如下: 有时候我们需要修改已经生成的列表,添加或者修改数据,noti ...

  2. 从源码分析DEARGUI之动态绘图的两种方法

    from dearpygui.dearpygui import * import numpy as npdef addButtons(*args):add_button("添加的按钮1&qu ...

  3. Spark RPC框架源码分析(二)RPC运行时序

    前情提要: Spark RPC框架源码分析(一)简述 一. Spark RPC概述 上一篇我们已经说明了Spark RPC框架的一个简单例子,Spark RPC相关的两个编程模型,Actor模型和Re ...

  4. Apache Storm 实时流处理系统通信机制源码分析

    我们今天就来仔细研究一下Apache Storm 2.0.0-SNAPSHOT的通信机制.下面我将从大致思想以及源码分析,然后我们细致分析实时流处理系统中源码通信机制研究. 1. 简介 Worker间 ...

  5. AsyncHttpClient源码分析-基于Netty的连接池实现

    原文地址:asynchttpclient源码分析-基于Netty的连接池实现 最近项目重构,有了个机会更多接触一个有别于HttpAsyncClient的异步网络框架AsyncHttpClient,是个 ...

  6. uboot源码分析(基于S5PV210)之启动第一阶段

    目录 一.start.S引入 1.u-boot.lds中找到start.S入口 2.SourceInsight中如何找到文件 3.SI中找文件技巧 二.start.S解析 1.不简单的头文件包含 2. ...

  7. iostat IO统计原理linux内核源码分析----基于单通道SATA盘

    iostat IO统计原理linux内核源码分析----基于单通道SATA盘 先上一个IO发送submit_bio流程图,本文基本就是围绕该流程讲解. 内核版本 3.10.96 详细的源码注释:htt ...

  8. spark读取文件源码分析-2

    文章目录 1. job1产生时机源码分析 1. DataSoure.getOrInferFileFormatSchema() 2. ParquetFileFormat.inferSchema 1. 简 ...

  9. ART虚拟机 | Cleaner机制源码分析

    目录 思考问题 1.Android为什么要将Finalize机制替换成Cleaner机制? 2.Cleaner机制回收Native堆内存的原理是什么? 3.Cleaner机制源码是如何实现的? 一.版 ...

最新文章

  1. nodejs没有net模块_Node.js实战16:用http模块创建web服务器
  2. Matlab实用程序--图形应用-填充图
  3. 排队两小时买墨茉、虎头局,但别指望年轻人“味蕾忠诚”
  4. Comparable与Comparator浅析
  5. css-样式重构-代码分享
  6. qr分解求线性方程组_梯度下降求解线性方程组算例设计
  7. Variant 与 内存泄露
  8. Python len函数 - Python零基础入门教程
  9. Loj #6060. 「2017 山东一轮集训 Day1 / SDWC2018 Day1」Set
  10. 能改变原生web前端元素样式的water.css
  11. HDU-6341 Problem J. Let Sudoku Rotate(dfs 剪枝)
  12. Linux服务器authorized_keys添加公钥后登录仍需要密码
  13. [Scikit-learn教程] 03.02 文本处理:分类与优化
  14. 网络拓扑图自动生成_SAP ABAP关键字语法图和ABAP代码自动生成工具Code Composer
  15. 可以在linux下运行的u盘制作工具,U盘Linux制作工具LiveUSB Creator下载
  16. 使用Cisco Packet Tracer之图解无线网络全网互联
  17. 《Spring Cloud Netflix》--服务注册和服务发现-Eureka的深入了解
  18. 4G终端附着时如何选择SGW和PGW
  19. 基于STM32-消防栓监测系统毕业设计---论文(附加最全面的从硬件电路设计->驱动程序设计->阿里云物联网搭建->安卓APP设计)
  20. 商品订购系统设计c语言有链表和指针,2014年计算机二级考试VF冲刺试卷及答案 2...

热门文章

  1. 高等数学笔记-乐经良老师-第五章-积分(Ⅰ)-定积分与不定积分-第四节-不定积分
  2. Cocos 2.x 扩展开发教程
  3. 51博客,51,博客,文章,新闻,主页,51主页,帅举,shuaiju,boke,51boke,51zhuye,zhuye
  4. 第三个Sprint ------第十一天
  5. 建筑业和VECP项目(上)
  6. Linux - systemd 依赖
  7. 服务器系统wlanapi,wlanapi.dll
  8. xmd:AP transaction timeout: ACK = 0x01, expected=0x02)
  9. 大数据技术原理与应用第8讲:数据仓库Hive
  10. 总结软连接与硬连接的区别