spark 调度模块详解及源码分析

@(SPARK)[spark]

  • spark 调度模块详解及源码分析
  • 一概述
    • 一三个主要的类

      • 1class DAGScheduler
      • 2trait TaskScheduler
      • 3trait SchedulerBackend
    • 二基本流程
    • 三TaskScheduler SchedulerBackend
  • 二DAGScheduler
    • 一用户代码中创建SparkContext对象SparkContext中创建DAGScheduler与TaskSchedulerTaskSchedulerBackend对象

      • 1用户代码中创建SparkContext对象
      • 2SparkContext源码简单分析
      • 3SparkContext创建DAGScheduler与TaskSchedulerTaskSchedulerBackend对象
      • 4createTaskScheduler创建TaskSchedulerTaskSchedulerBackend对象的具体过程
    • 二用户代码创建各种transformation与至少一个action这个action会通过SparkContextrunJob调用DAGSchedulerrunJob
      • 1textFile
      • 2filter
      • 3count
    • 三步骤三DAGScheduler提交作业划分stage并生成最终的TaskSet
      • 1创建DAGScheduler对象的详细实现
      • 2作业的提交
      • 3stage的划分
      • 4任务的生成
  • 三TaskScheduler TaskSchedulerBackend

一、概述

通过spark-submit向集群提交应用后,spark就开始了调度的过程,其调度模块主要包括2部分:
* DAGScheduler:负责将用户提交的计算任务分割成不同的stage。
* TaskScheduler & SchedulerBackend:负责将stage中的task提交到集群中。
先看一下2个核心的图:

(一)三个主要的类

与调度模块相关的三个主要的类均位于org.apache.spark.scheduler,包括:

1、class DAGScheduler

负责分析用户提交的应用,并根据计算任务的依赖关系建立DAG,然后将DAG划分成不同的Stage,其中每个stage由可以并发执行的一组task构成,这些task的逻辑完全相同,只是作用于不同的数据。
DAGScheduler在不同的资源管理框架下的实现是完全相同的。

2、trait TaskScheduler

TaskScheduler从DAGScheduler中接收不同的stage的任务,并且向集群调度这些任务。
yarn-cluster的具体实现为:YarnClusterScheduler
yarn-client的具体实现为:YarnScheduler

3、trait SchedulerBackend

SchedulerBackend向当前等待分配资源的task分配计算资源,并且在分配的executor中启动task。
yarn-cluster的具体实现为:YarnClusterSchedulerBackend。
yarn-client的具体实现为:YarnClientSchedulerBackend

(二)基本流程


图片来源于spark内幕技术P45.

(三)TaskScheduler & SchedulerBackend


图片来源于spark内幕技术P44.
TaskScheduler侧重于调度,而SchedulerBackend是实际运行。

每个SchedulerBackend都会对应一个唯一的TaskScheduler。注意图中的TaskScheduler & SchedulerBackend都会有针对于yarn的特定实现。

二、DAGScheduler

DAGScheduler在不同的资源管理框架下的实现是完全相同的。因为DAGScheduler实现的功能是将DAG划分为不同的stage,这是根据宽依赖进行划分的,每个宽依赖均会调用shuffle,以此作为一个新的stage。这与具体的资源管理框架无关。

每个stage由可以并发执行的一组task构成,这些task的执行逻辑完全相同,只是作用于不同的数据。

DAGScheduler与TaskScheduler都是在SparkContext创建的时候创建的。其中TaskScheduler是通过SparkContext#createTaskScheduler创建的,而DAGScheduler是直接调用它的构造函数创建的。只不过,DAGScheduler保存了TaskScheduler的引用,因此需要先创建TaskScheduler。

=================================
步骤一:用户代码中创建SparkContext对象,SparkContext中创建DAGScheduler与TaskScheduler/TaskSchedulerBackend对象
步骤二:用户代码构建各种tranformation及至少一个action,这个action会通过SparkContext#runJob调用DAGScheduler#runJob
步骤三:DAGScheduler提交作业到一队列,handleJobSubmitted从这个队列取出作业,划分stage,并开始生成最终的TaskSet,调用submitTasks()向TaskScheduler提交任务

(一)用户代码中创建SparkContext对象,SparkContext中创建DAGScheduler与TaskScheduler/TaskSchedulerBackend对象

1、用户代码中创建SparkContext对象

下面我们从一个简单的程进行出发,分析它的进行过程。程序如下:

package com.lujinhong.sparkdemoimport org.apache.spark.SparkContext
object GrepWord {def grepCountLog(sc:SparkContext,path: String, keyWord: String) {println("grep " + keyWord + " in " + path + ", the lineCount is: ")val all = sc.textFile(path)val ret = all.filter(line => line.contains(keyWord))println(ret.count)}def main(args: Array[String]) {val sc = new SparkContext();grepCountLog(sc,"/src/20151201", "\"keyword\": \"20302\"");}
}

上面的代码很简单,指定一个目录,搜索这个目录中的文件有多少个keyword。分为三步:读入文件,过滤关键字,count。
我们这里主要分析yarn-cluster/client的模式,根据前面的分析,我们向YARN提交应用后,YARN会返回分配资源,然后启动AM。在AM中的driver会开始执行用户的代码,开始进行调度。详细分析请见:
http://blog.csdn.net/lujinhong2/article/details/50344095

那我们这里就从用户的代码开始继续往下分析。

用户代码中开始的时候必须首先创建一个SparkContext,我们看一下SparkContext的代码,以及它被创建时执行了哪些操作。


先看一下官方的一个图。DriverProgram就是用户提交的程序,在用户代码中创建一个SparkContext的对象。SparkContext是所有Spark应用的入口,它负责和整个集群的交互,包括创建RDD,累积器、广播变量等。

每个JVM中只能有一个SparkContext,在创建一个新的Context前,你必须先stop()旧的。这个限制可能会在以后去掉,见SPARK-2243。

2、SparkContext源码简单分析

SparkContext完成了以下几个主要的功能:
(1)创建RDD,通过类似textFile等的方法。
(2)与资源管理器交互,通过runJob等方法启动应用。
(3)创建DAGScheduler、TaskScheduler等。

3、SparkContext创建DAGScheduler与TaskScheduler/TaskSchedulerBackend对象

创建一个SparkContext,只需要一个SparkConf参数,表示一些配置项。如果未指定参数,则会创建一个默认的SparkConf,如我们代码中的:

val sc = new SparkContext();

创建的代码为:

def this() = this(new SparkConf())

在SparkContext的一个try模块中,会进行一些初始化的工作,其中一部分是创建了DAGScheduler与TaskScheduler/TaskSchedulerBackend对象。

// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's constructor
_taskScheduler.start()

其中TaskScheduler/TaskSchedulerBackend对象通过createTaskScheduler()方法进行创建,而DAGScheduler对象直接使用构建函数创建。

4、createTaskScheduler:创建TaskScheduler/TaskSchedulerBackend对象的具体过程

SparkContext通过createTaskScheduler来同时创建TaskScheduler/TaskSchedulerBackend对象。它接收的参数mater是一个url,或者一个yarn-cluster等的字符串,指明了使用哪种运行模式。

createTaskScheduler函数中主要就是match各种master,然后创建相应的TaskScheduler/TaskSchedulerBackend对象。

我们先看一下yarn-cluster的:

case "yarn-standalone" | "yarn-cluster" =>if (master == "yarn-standalone") {logWarning("\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")}val scheduler = try {val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")val cons = clazz.getConstructor(classOf[SparkContext])cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]} catch {// TODO: Enumerate the exact reasons why it can fail// But irrespective of it, it means we cannot proceed !case e: Exception => {throw new SparkException("YARN mode not available ?", e)}}val backend = try {val clazz =Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]} catch {case e: Exception => {throw new SparkException("YARN mode not available ?", e)}}scheduler.initialize(backend)(backend, scheduler)

TaskScheduler的实现类为:

org.apache.spark.scheduler.cluster.YarnClusterScheduler

TaskSchedulerBacked的实现类为:

org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend

其中后者需要前者来创建:

scheduler.initialize(backend)

最后返回一个元组:

(backend, scheduler)

再看看yarn-client的:

  case "yarn-client" =>val scheduler = try {val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")val cons = clazz.getConstructor(classOf[SparkContext])cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]} catch {case e: Exception => {throw new SparkException("YARN mode not available ?", e)}}val backend = try {val clazz =Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]} catch {case e: Exception => {throw new SparkException("YARN mode not available ?", e)}}scheduler.initialize(backend)(backend, scheduler)

TaskScheduler的实现类为:

org.apache.spark.scheduler.cluster.YarnScheduler

TaskSchedulerBacked的实现类为:

org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend

(二)用户代码创建各种transformation与至少一个action,这个action会通过SparkContext#runJob调用DAGScheduler#runJob

在用户代码中创建了一个SparkContext对象后,就可以开始创建RDD,转换RDD等了。我们的代码只有3行:

val all = sc.textFile(path)
val ret = all.filter(line => line.contains(keyWord))
println(ret.count)

1、textFile

我们先看一下如何创建一个RDD:

  def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {assertNotStopped()hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],minPartitions).map(pair => pair._2.toString)}

读取一个hadoop支持的类型的文件,返回一个String类型的RDD。
它接收2个参数,一个是文件路径,一个是最小分区数,默认为:

def defaultMinPartitions: Int = math.min(defaultParallelism, 2)。

注意,这里使用的是min,因此如果没指定分区数量,最大的情况下就是2个分区了,详细分析请见: https://github.com/mesos/spark/pull/718

最后是调用hadoopFile来创建RDD的:

  def hadoopFile[K, V](path: String,inputFormatClass: Class[_ <: InputFormat[K, V]],keyClass: Class[K],valueClass: Class[V],minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {assertNotStopped()// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)new HadoopRDD(this,confBroadcast,Some(setInputPathsFunc),inputFormatClass,keyClass,valueClass,minPartitions).setName(path)}

2、filter

仅返回符合条件的元素组成的RDD。

3、count

count的逻辑很简单:

  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

sum用于统计一个集合中的元素数量。
但它调用了SparkContext的runJob开始执行任务了,我们分析一下这个过程。
SparkContext定义了多个runJob的形式,但它最后的调用为:

  def runJob[T, U: ClassTag](rdd: RDD[T],func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int],resultHandler: (Int, U) => Unit): Unit = {if (stopped.get()) {throw new IllegalStateException("SparkContext has been shutdown")}val callSite = getCallSiteval cleanedFunc = clean(func)logInfo("Starting job: " + callSite.shortForm)if (conf.getBoolean("spark.logLineage", false)) {logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)}dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)progressBar.foreach(_.finishAll())rdd.doCheckpoint()}

关键代码就是一行:

dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)

开始调用DAGScheduler的runJob了。

(三)步骤三:DAGScheduler提交作业,划分stage,并生成最终的TaskSet

经过上面的分析,我们知道了DAGScheduler与TaskScheduler/TaskSchedulerBackend对象的对象是如何创建的,并分析到了由用户代码出发,如果至调用dagScheduler.runJob。下面我们分析一下dagScheduler.runJob完成了什么功能。

1、创建DAGScheduler对象的详细实现

上面介绍过在SparkContext中会通过DAGScheduler的构建函数创建一个DAGScheduler对象,具体是如何实现的呢?

class DAGScheduler(private[scheduler] val sc: SparkContext,private[scheduler] val taskScheduler: TaskScheduler,listenerBus: LiveListenerBus,mapOutputTracker: MapOutputTrackerMaster,blockManagerMaster: BlockManagerMaster,env: SparkEnv,clock: Clock = new SystemClock())extends Logging {......}

看一下DAGScheduler的主构造函数。
* SparkContext:就是前面创建的对象。
* taskScheduler:DAGScheduler保存一个taskScheduler,当最后处理完生成TaskSet时,需要调用submitMissingTasks,而在这个方法中会调用taskScheduler.submitTasks(),就是将TaskSet交由taskScheduler进行下一步的处理。
* mapOutputTracker:是运行在Driver端管理shuffle的中间输出位置信息的。
* blockManagerMaster:也是运行在Driver端的,它是管理整个Job的Bolck信息。

2、作业的提交

首先注意区分2个概述:
job: 每个action都是执行runJob方法,可以将之视为一个job。
stage:在这个job内部,会根据宽依赖,划分成多个stage。

前面说过,用户代码中存在一个action时,它最终会调用SparkContext#runJob(),而SparkContext#runJob()的最后一步都是调用DAGScheduler#runJob()

dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)

而DAGScheduler#runJob()的核心代码为:

val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)

即调用submitJob方法,我们进一步看看submitJob()

  def submitJob[T, U](rdd: RDD[T],func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int],callSite: CallSite,resultHandler: (Int, U) => Unit,properties: Properties): JobWaiter[U] = {
....    val jobId = nextJobId.getAndIncrement()
.....val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter,SerializationUtils.clone(properties)))waiter}

submitJob()方法主要完成了以下3个工作:
* 获取一个新的jobId
* 生成一个JobWaiter,它会监听Job的执行状态,而Job是由多个Task组成的,因此只有当Job的所有Task均已完成,Job才会标记成功
* 最后调用eventProcessLoop.post()将Job提交到一个队列中,等待处理。这是一个典型的生产者消费者模式。这些消息都是通过handleJobSubmitted来处理。

简单看一下handleJobSubmitted是如何被调用的。
首先是DAGSchedulerEventProcessLoop#onReceive调用doOnReceive:

  /*** The main event loop of the DAG scheduler.*/override def onReceive(event: DAGSchedulerEvent): Unit = {val timerContext = timer.time()try {doOnReceive(event)} finally {timerContext.stop()}}

DAGSchedulerEventProcessLoop是EventLoop的子类,它重写了EventLoop的onReceive方法。以后再分析这个EventLoop。

然后,doOnReceive会调用handleJobSubmitted。

3、stage的划分

刚才说到handleJobSubmitted会从eventProcessLoop中取出Job来进行处理,处理的第一步就是将Job划分成不同的stage。handleJobSubmitted主要2个工作,一是进行stage的划分,这是这部分要介绍的内容;二是创建一个activeJob,并生成一个任务,这在下一小节介绍。

  private[scheduler] def handleJobSubmitted(jobId: Int,finalRDD: RDD[_],func: (TaskContext, Iterator[_]) => _,partitions: Array[Int],callSite: CallSite,listener: JobListener,properties: Properties) {...finalStage = newResultStage(finalRDD, partitions.length, jobId, callSite).....activeJobs += job......submitStage(finalStage)}submitWaitingStages()}

newResultStage()经过多层调用后,最终会调用getParentStages()。
因为是从最终的stage往回推算的,这需要计算最终stage所依赖的各个stage。

4、任务的生成

回到handleJobSubmitted中的代码:

 submitStage(finalStage)

submitStage会提交finalStage,如果这个stage的某些parentStage未提交,则递归调用submitStage(),直至所有的stage均已计算完成。

submitStage()会调用submitMissingTasks():

submitMissingTasks(stage, jobId.get)

而submitMissingTasks()会完成DAGScheduler最后的工作:它判断出哪些Partition需要计算,为每个Partition生成Task,然后这些Task就会封闭到TaskSet,最后提交给TaskScheduler进行处理。

taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))stage.latestInfo.submissionTime = Some(clock.getTimeMillis())

三、TaskScheduler && TaskSchedulerBackend

上文分析到在DAGScheduler中最终会执行taskScheduler.submitTasks()方法,我们先简单看一下从这里开始往下的执行逻辑:

(1)taskScheduler#submitTasks()
(2) schedulableBuilder#addTaskSetManager()
(3)CoarseGrainedSchedulerBackend#reviveOffers()
(4)CoarseGrainedSchedulerBackend#makeOffers()
(5)TaskSchedulerImpl#resourceOffers
(6)CoarseGrainedSchedulerBackend#launchTasks
(7)executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

步骤一、二中主要将这组任务的TaskSet加入到一个TaskSetManager中。TaskSetManager会根据数据就近原则为task分配计算资源,监控task的执行状态等,比如失败重试,推测执行等。
步骤三、四逻辑较为简单。
步骤五为每个task具体分配资源,它的输入是一个Executor的列表,输出是TaskDescription的二维数组。TaskDescription包含了TaskID, Executor ID和task执行的依赖信息等。
步骤六、七就是将任务真正的发送到executor中执行了,并等待executor的状态返回。

spark 调度模块详解及源码分析相关推荐

  1. spark RDD详解及源码分析

    spark RDD详解及源码分析 @(SPARK)[spark] spark RDD详解及源码分析 一基础 一什么是RDD 二RDD的适用范围 三一些特性 四RDD的创建 1由一个已经存在的scala ...

  2. hadoop作业初始化过程详解(源码分析第三篇)

    (一)概述 我们在上一篇blog已经详细的分析了一个作业从用户输入提交命令到到达JobTracker之前的各个过程.在作业到达JobTracker之后初始化之前,JobTracker会通过submit ...

  3. SpringMVC异常处理机制详解[附带源码分析]

    SpringMVC异常处理机制详解[附带源码分析] 参考文章: (1)SpringMVC异常处理机制详解[附带源码分析] (2)https://www.cnblogs.com/fangjian0423 ...

  4. FPGA学习之路—接口(2)—I2C协议详解+Verilog源码分析

    FPGA学习之路--I2C协议详解+Verilog源码分析 定义 I2C Bus(Inter-Integrated Circuit Bus) 最早是由Philips半导体(现被NXP收购)开发的两线时 ...

  5. HashMap、ConcurretnHashMap面试题详解,源码分析

    文章目录 面试题 HashMap.LinkedHashMap和TreeMap的区别是什么? ①:为什么hashmap每次扩容大小为2的n次方? ③:jdk1.7的hashmap的扩容操作是在元素插入之 ...

  6. 安卓PopupWindow使用详解与源码分析(附项目实例)

    基本用法 首先定义弹窗的Layout文件 res/layout/popup_window.xml <?xml version="1.0" encoding="utf ...

  7. Epoll详解及源码分析

    文章来源:http://blog.csdn.net/chen19870707/article/details/42525887 Author:Echo Chen(陈斌) Email:chenb1987 ...

  8. JDK动态代理实现原理详解(源码分析)

    无论是静态代理,还是Cglib动态代理,都比较容易理解,本文就通过进入源码的方式来看看JDK动态代理的实现原理进行分析 要了解动态代理的可以参考另一篇文章,有详细介绍,这里仅仅对JDK动态代理做源码分 ...

  9. 解密android日志xlog,XLog 详解及源码分析

    一.前言 这里的 XLog 不是微信 Mars 里面的 xLog,而是elvishew的xLog.感兴趣的同学可以看看作者 elvishwe 的官文史上最强的 Android 日志库 XLog.这里先 ...

最新文章

  1. 福利 | 如何创造可信的AI?人工智能大牛盖瑞·马库斯的11条建议
  2. excel 用VBA将所有单元格内容全部转换为文本
  3. Python的pycurl库升级升级失败的解决方法
  4. python做平面设计有前途吗_现在学平面设计还有发展前景吗?
  5. Taro+react开发(61) 一条虚线
  6. java 责任链模式 链表_责任链模式的实现及源码中应用
  7. POJ 2185 Milking Grid (KMP,GCD)
  8. python时间格式转换为美式日期_如何将日期时间格式的排列转换为python中的打印?...
  9. 鸿蒙智慧屏安装应用,谁说华为智慧屏不能装APP,我来打脸了,附零难度安装APP教程...
  10. 网络流行语“不作不死”英文入选美国词典
  11. oracle 添加表权限不足,oracle 创建表空间报权限不足,引发的问题如下 | 学步园...
  12. 已知 XYZ+YZZ=532,其中,X、Y、Z 为数字,编程求出 X、Y 和 Z 的值
  13. 数据来源渠道及采集工具_几款简单好用的爬虫抓取数据采集工具
  14. 右手螺旋判断磁感应强度方向_如何判断磁感应强度方向 方法是什么
  15. ionic4.x仿京东 - 10.2.确认订单-去结算跳到确认订单(返回特定页面),确认订单页面布局
  16. 最好用的 20 款数据可视化工具
  17. WiFi温湿度传感器开发
  18. androidlib.java_实现 Java SDK 库
  19. DiskMan使用方法
  20. Shell语法详解专栏目录

热门文章

  1. poj 2342 树形DP
  2. 《数据结构》c语言版学习笔记——单链表结构(线性表的链式存储结构Part1)
  3. 0x0000050蓝屏srvsys_win7电脑蓝屏,显示的应该是srv.sys造成的,是什么情况?应该如何处理?...
  4. java中生成不重复随机的数字
  5. mysql中给用户添加密码_MySql中添加用户,新建数据库,用户授权,删除用户,修改密码...
  6. python常用方法总结-Python3常用函数、方法总结(持续更新…)
  7. mysql 数据趋势,2019年8月全球数据库流行度排行--oracle、mysql增长趋势明显
  8. linux下后缀为so的文件怎么打开,linux中.so后缀的文件怎么使用啊
  9. 深度学习tensorflow变量op
  10. java广度优先遍历