spark 调度模块详解及源码分析
spark 调度模块详解及源码分析
@(SPARK)[spark]
- spark 调度模块详解及源码分析
- 一概述
- 一三个主要的类
- 1class DAGScheduler
- 2trait TaskScheduler
- 3trait SchedulerBackend
- 二基本流程
- 三TaskScheduler SchedulerBackend
- 一三个主要的类
- 二DAGScheduler
- 一用户代码中创建SparkContext对象SparkContext中创建DAGScheduler与TaskSchedulerTaskSchedulerBackend对象
- 1用户代码中创建SparkContext对象
- 2SparkContext源码简单分析
- 3SparkContext创建DAGScheduler与TaskSchedulerTaskSchedulerBackend对象
- 4createTaskScheduler创建TaskSchedulerTaskSchedulerBackend对象的具体过程
- 二用户代码创建各种transformation与至少一个action这个action会通过SparkContextrunJob调用DAGSchedulerrunJob
- 1textFile
- 2filter
- 3count
- 三步骤三DAGScheduler提交作业划分stage并生成最终的TaskSet
- 1创建DAGScheduler对象的详细实现
- 2作业的提交
- 3stage的划分
- 4任务的生成
- 一用户代码中创建SparkContext对象SparkContext中创建DAGScheduler与TaskSchedulerTaskSchedulerBackend对象
- 三TaskScheduler TaskSchedulerBackend
一、概述
通过spark-submit向集群提交应用后,spark就开始了调度的过程,其调度模块主要包括2部分:
* DAGScheduler:负责将用户提交的计算任务分割成不同的stage。
* TaskScheduler & SchedulerBackend:负责将stage中的task提交到集群中。
先看一下2个核心的图:
(一)三个主要的类
与调度模块相关的三个主要的类均位于org.apache.spark.scheduler,包括:
1、class DAGScheduler
负责分析用户提交的应用,并根据计算任务的依赖关系建立DAG,然后将DAG划分成不同的Stage,其中每个stage由可以并发执行的一组task构成,这些task的逻辑完全相同,只是作用于不同的数据。
DAGScheduler在不同的资源管理框架下的实现是完全相同的。
2、trait TaskScheduler
TaskScheduler从DAGScheduler中接收不同的stage的任务,并且向集群调度这些任务。
yarn-cluster的具体实现为:YarnClusterScheduler
yarn-client的具体实现为:YarnScheduler
3、trait SchedulerBackend
SchedulerBackend向当前等待分配资源的task分配计算资源,并且在分配的executor中启动task。
yarn-cluster的具体实现为:YarnClusterSchedulerBackend。
yarn-client的具体实现为:YarnClientSchedulerBackend
(二)基本流程
图片来源于spark内幕技术P45.
(三)TaskScheduler & SchedulerBackend
图片来源于spark内幕技术P44.
TaskScheduler侧重于调度,而SchedulerBackend是实际运行。
每个SchedulerBackend都会对应一个唯一的TaskScheduler。注意图中的TaskScheduler & SchedulerBackend都会有针对于yarn的特定实现。
二、DAGScheduler
DAGScheduler在不同的资源管理框架下的实现是完全相同的。因为DAGScheduler实现的功能是将DAG划分为不同的stage,这是根据宽依赖进行划分的,每个宽依赖均会调用shuffle,以此作为一个新的stage。这与具体的资源管理框架无关。
每个stage由可以并发执行的一组task构成,这些task的执行逻辑完全相同,只是作用于不同的数据。
DAGScheduler与TaskScheduler都是在SparkContext创建的时候创建的。其中TaskScheduler是通过SparkContext#createTaskScheduler创建的,而DAGScheduler是直接调用它的构造函数创建的。只不过,DAGScheduler保存了TaskScheduler的引用,因此需要先创建TaskScheduler。
=================================
步骤一:用户代码中创建SparkContext对象,SparkContext中创建DAGScheduler与TaskScheduler/TaskSchedulerBackend对象
步骤二:用户代码构建各种tranformation及至少一个action,这个action会通过SparkContext#runJob调用DAGScheduler#runJob
步骤三:DAGScheduler提交作业到一队列,handleJobSubmitted从这个队列取出作业,划分stage,并开始生成最终的TaskSet,调用submitTasks()向TaskScheduler提交任务
(一)用户代码中创建SparkContext对象,SparkContext中创建DAGScheduler与TaskScheduler/TaskSchedulerBackend对象
1、用户代码中创建SparkContext对象
下面我们从一个简单的程进行出发,分析它的进行过程。程序如下:
package com.lujinhong.sparkdemoimport org.apache.spark.SparkContext
object GrepWord {def grepCountLog(sc:SparkContext,path: String, keyWord: String) {println("grep " + keyWord + " in " + path + ", the lineCount is: ")val all = sc.textFile(path)val ret = all.filter(line => line.contains(keyWord))println(ret.count)}def main(args: Array[String]) {val sc = new SparkContext();grepCountLog(sc,"/src/20151201", "\"keyword\": \"20302\"");}
}
上面的代码很简单,指定一个目录,搜索这个目录中的文件有多少个keyword。分为三步:读入文件,过滤关键字,count。
我们这里主要分析yarn-cluster/client的模式,根据前面的分析,我们向YARN提交应用后,YARN会返回分配资源,然后启动AM。在AM中的driver会开始执行用户的代码,开始进行调度。详细分析请见:
http://blog.csdn.net/lujinhong2/article/details/50344095
那我们这里就从用户的代码开始继续往下分析。
用户代码中开始的时候必须首先创建一个SparkContext,我们看一下SparkContext的代码,以及它被创建时执行了哪些操作。
先看一下官方的一个图。DriverProgram就是用户提交的程序,在用户代码中创建一个SparkContext的对象。SparkContext是所有Spark应用的入口,它负责和整个集群的交互,包括创建RDD,累积器、广播变量等。
每个JVM中只能有一个SparkContext,在创建一个新的Context前,你必须先stop()旧的。这个限制可能会在以后去掉,见SPARK-2243。
2、SparkContext源码简单分析
SparkContext完成了以下几个主要的功能:
(1)创建RDD,通过类似textFile等的方法。
(2)与资源管理器交互,通过runJob等方法启动应用。
(3)创建DAGScheduler、TaskScheduler等。
3、SparkContext创建DAGScheduler与TaskScheduler/TaskSchedulerBackend对象
创建一个SparkContext,只需要一个SparkConf参数,表示一些配置项。如果未指定参数,则会创建一个默认的SparkConf,如我们代码中的:
val sc = new SparkContext();
创建的代码为:
def this() = this(new SparkConf())
在SparkContext的一个try模块中,会进行一些初始化的工作,其中一部分是创建了DAGScheduler与TaskScheduler/TaskSchedulerBackend对象。
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's constructor
_taskScheduler.start()
其中TaskScheduler/TaskSchedulerBackend对象通过createTaskScheduler()方法进行创建,而DAGScheduler对象直接使用构建函数创建。
4、createTaskScheduler:创建TaskScheduler/TaskSchedulerBackend对象的具体过程
SparkContext通过createTaskScheduler来同时创建TaskScheduler/TaskSchedulerBackend对象。它接收的参数mater是一个url,或者一个yarn-cluster等的字符串,指明了使用哪种运行模式。
createTaskScheduler函数中主要就是match各种master,然后创建相应的TaskScheduler/TaskSchedulerBackend对象。
我们先看一下yarn-cluster的:
case "yarn-standalone" | "yarn-cluster" =>if (master == "yarn-standalone") {logWarning("\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")}val scheduler = try {val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")val cons = clazz.getConstructor(classOf[SparkContext])cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]} catch {// TODO: Enumerate the exact reasons why it can fail// But irrespective of it, it means we cannot proceed !case e: Exception => {throw new SparkException("YARN mode not available ?", e)}}val backend = try {val clazz =Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]} catch {case e: Exception => {throw new SparkException("YARN mode not available ?", e)}}scheduler.initialize(backend)(backend, scheduler)
TaskScheduler的实现类为:
org.apache.spark.scheduler.cluster.YarnClusterScheduler
TaskSchedulerBacked的实现类为:
org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend
其中后者需要前者来创建:
scheduler.initialize(backend)
最后返回一个元组:
(backend, scheduler)
再看看yarn-client的:
case "yarn-client" =>val scheduler = try {val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")val cons = clazz.getConstructor(classOf[SparkContext])cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]} catch {case e: Exception => {throw new SparkException("YARN mode not available ?", e)}}val backend = try {val clazz =Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]} catch {case e: Exception => {throw new SparkException("YARN mode not available ?", e)}}scheduler.initialize(backend)(backend, scheduler)
TaskScheduler的实现类为:
org.apache.spark.scheduler.cluster.YarnScheduler
TaskSchedulerBacked的实现类为:
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend
(二)用户代码创建各种transformation与至少一个action,这个action会通过SparkContext#runJob调用DAGScheduler#runJob
在用户代码中创建了一个SparkContext对象后,就可以开始创建RDD,转换RDD等了。我们的代码只有3行:
val all = sc.textFile(path)
val ret = all.filter(line => line.contains(keyWord))
println(ret.count)
1、textFile
我们先看一下如何创建一个RDD:
def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {assertNotStopped()hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],minPartitions).map(pair => pair._2.toString)}
读取一个hadoop支持的类型的文件,返回一个String类型的RDD。
它接收2个参数,一个是文件路径,一个是最小分区数,默认为:
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)。
注意,这里使用的是min,因此如果没指定分区数量,最大的情况下就是2个分区了,详细分析请见: https://github.com/mesos/spark/pull/718
最后是调用hadoopFile来创建RDD的:
def hadoopFile[K, V](path: String,inputFormatClass: Class[_ <: InputFormat[K, V]],keyClass: Class[K],valueClass: Class[V],minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {assertNotStopped()// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)new HadoopRDD(this,confBroadcast,Some(setInputPathsFunc),inputFormatClass,keyClass,valueClass,minPartitions).setName(path)}
2、filter
仅返回符合条件的元素组成的RDD。
3、count
count的逻辑很简单:
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
sum用于统计一个集合中的元素数量。
但它调用了SparkContext的runJob开始执行任务了,我们分析一下这个过程。
SparkContext定义了多个runJob的形式,但它最后的调用为:
def runJob[T, U: ClassTag](rdd: RDD[T],func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int],resultHandler: (Int, U) => Unit): Unit = {if (stopped.get()) {throw new IllegalStateException("SparkContext has been shutdown")}val callSite = getCallSiteval cleanedFunc = clean(func)logInfo("Starting job: " + callSite.shortForm)if (conf.getBoolean("spark.logLineage", false)) {logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)}dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)progressBar.foreach(_.finishAll())rdd.doCheckpoint()}
关键代码就是一行:
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
开始调用DAGScheduler的runJob了。
(三)步骤三:DAGScheduler提交作业,划分stage,并生成最终的TaskSet
经过上面的分析,我们知道了DAGScheduler与TaskScheduler/TaskSchedulerBackend对象的对象是如何创建的,并分析到了由用户代码出发,如果至调用dagScheduler.runJob。下面我们分析一下dagScheduler.runJob完成了什么功能。
1、创建DAGScheduler对象的详细实现
上面介绍过在SparkContext中会通过DAGScheduler的构建函数创建一个DAGScheduler对象,具体是如何实现的呢?
class DAGScheduler(private[scheduler] val sc: SparkContext,private[scheduler] val taskScheduler: TaskScheduler,listenerBus: LiveListenerBus,mapOutputTracker: MapOutputTrackerMaster,blockManagerMaster: BlockManagerMaster,env: SparkEnv,clock: Clock = new SystemClock())extends Logging {......}
看一下DAGScheduler的主构造函数。
* SparkContext:就是前面创建的对象。
* taskScheduler:DAGScheduler保存一个taskScheduler,当最后处理完生成TaskSet时,需要调用submitMissingTasks,而在这个方法中会调用taskScheduler.submitTasks(),就是将TaskSet交由taskScheduler进行下一步的处理。
* mapOutputTracker:是运行在Driver端管理shuffle的中间输出位置信息的。
* blockManagerMaster:也是运行在Driver端的,它是管理整个Job的Bolck信息。
2、作业的提交
首先注意区分2个概述:
job: 每个action都是执行runJob方法,可以将之视为一个job。
stage:在这个job内部,会根据宽依赖,划分成多个stage。
前面说过,用户代码中存在一个action时,它最终会调用SparkContext#runJob(),而SparkContext#runJob()的最后一步都是调用DAGScheduler#runJob()
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
而DAGScheduler#runJob()的核心代码为:
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
即调用submitJob方法,我们进一步看看submitJob()
def submitJob[T, U](rdd: RDD[T],func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int],callSite: CallSite,resultHandler: (Int, U) => Unit,properties: Properties): JobWaiter[U] = {
.... val jobId = nextJobId.getAndIncrement()
.....val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter,SerializationUtils.clone(properties)))waiter}
submitJob()方法主要完成了以下3个工作:
* 获取一个新的jobId
* 生成一个JobWaiter,它会监听Job的执行状态,而Job是由多个Task组成的,因此只有当Job的所有Task均已完成,Job才会标记成功
* 最后调用eventProcessLoop.post()将Job提交到一个队列中,等待处理。这是一个典型的生产者消费者模式。这些消息都是通过handleJobSubmitted来处理。
简单看一下handleJobSubmitted是如何被调用的。
首先是DAGSchedulerEventProcessLoop#onReceive调用doOnReceive:
/*** The main event loop of the DAG scheduler.*/override def onReceive(event: DAGSchedulerEvent): Unit = {val timerContext = timer.time()try {doOnReceive(event)} finally {timerContext.stop()}}
DAGSchedulerEventProcessLoop是EventLoop的子类,它重写了EventLoop的onReceive方法。以后再分析这个EventLoop。
然后,doOnReceive会调用handleJobSubmitted。
3、stage的划分
刚才说到handleJobSubmitted会从eventProcessLoop中取出Job来进行处理,处理的第一步就是将Job划分成不同的stage。handleJobSubmitted主要2个工作,一是进行stage的划分,这是这部分要介绍的内容;二是创建一个activeJob,并生成一个任务,这在下一小节介绍。
private[scheduler] def handleJobSubmitted(jobId: Int,finalRDD: RDD[_],func: (TaskContext, Iterator[_]) => _,partitions: Array[Int],callSite: CallSite,listener: JobListener,properties: Properties) {...finalStage = newResultStage(finalRDD, partitions.length, jobId, callSite).....activeJobs += job......submitStage(finalStage)}submitWaitingStages()}
newResultStage()经过多层调用后,最终会调用getParentStages()。
因为是从最终的stage往回推算的,这需要计算最终stage所依赖的各个stage。
4、任务的生成
回到handleJobSubmitted中的代码:
submitStage(finalStage)
submitStage会提交finalStage,如果这个stage的某些parentStage未提交,则递归调用submitStage(),直至所有的stage均已计算完成。
submitStage()会调用submitMissingTasks():
submitMissingTasks(stage, jobId.get)
而submitMissingTasks()会完成DAGScheduler最后的工作:它判断出哪些Partition需要计算,为每个Partition生成Task,然后这些Task就会封闭到TaskSet,最后提交给TaskScheduler进行处理。
taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
三、TaskScheduler && TaskSchedulerBackend
上文分析到在DAGScheduler中最终会执行taskScheduler.submitTasks()方法,我们先简单看一下从这里开始往下的执行逻辑:
(1)taskScheduler#submitTasks()
(2) schedulableBuilder#addTaskSetManager()
(3)CoarseGrainedSchedulerBackend#reviveOffers()
(4)CoarseGrainedSchedulerBackend#makeOffers()
(5)TaskSchedulerImpl#resourceOffers
(6)CoarseGrainedSchedulerBackend#launchTasks
(7)executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
步骤一、二中主要将这组任务的TaskSet加入到一个TaskSetManager中。TaskSetManager会根据数据就近原则为task分配计算资源,监控task的执行状态等,比如失败重试,推测执行等。
步骤三、四逻辑较为简单。
步骤五为每个task具体分配资源,它的输入是一个Executor的列表,输出是TaskDescription的二维数组。TaskDescription包含了TaskID, Executor ID和task执行的依赖信息等。
步骤六、七就是将任务真正的发送到executor中执行了,并等待executor的状态返回。
spark 调度模块详解及源码分析相关推荐
- spark RDD详解及源码分析
spark RDD详解及源码分析 @(SPARK)[spark] spark RDD详解及源码分析 一基础 一什么是RDD 二RDD的适用范围 三一些特性 四RDD的创建 1由一个已经存在的scala ...
- hadoop作业初始化过程详解(源码分析第三篇)
(一)概述 我们在上一篇blog已经详细的分析了一个作业从用户输入提交命令到到达JobTracker之前的各个过程.在作业到达JobTracker之后初始化之前,JobTracker会通过submit ...
- SpringMVC异常处理机制详解[附带源码分析]
SpringMVC异常处理机制详解[附带源码分析] 参考文章: (1)SpringMVC异常处理机制详解[附带源码分析] (2)https://www.cnblogs.com/fangjian0423 ...
- FPGA学习之路—接口(2)—I2C协议详解+Verilog源码分析
FPGA学习之路--I2C协议详解+Verilog源码分析 定义 I2C Bus(Inter-Integrated Circuit Bus) 最早是由Philips半导体(现被NXP收购)开发的两线时 ...
- HashMap、ConcurretnHashMap面试题详解,源码分析
文章目录 面试题 HashMap.LinkedHashMap和TreeMap的区别是什么? ①:为什么hashmap每次扩容大小为2的n次方? ③:jdk1.7的hashmap的扩容操作是在元素插入之 ...
- 安卓PopupWindow使用详解与源码分析(附项目实例)
基本用法 首先定义弹窗的Layout文件 res/layout/popup_window.xml <?xml version="1.0" encoding="utf ...
- Epoll详解及源码分析
文章来源:http://blog.csdn.net/chen19870707/article/details/42525887 Author:Echo Chen(陈斌) Email:chenb1987 ...
- JDK动态代理实现原理详解(源码分析)
无论是静态代理,还是Cglib动态代理,都比较容易理解,本文就通过进入源码的方式来看看JDK动态代理的实现原理进行分析 要了解动态代理的可以参考另一篇文章,有详细介绍,这里仅仅对JDK动态代理做源码分 ...
- 解密android日志xlog,XLog 详解及源码分析
一.前言 这里的 XLog 不是微信 Mars 里面的 xLog,而是elvishew的xLog.感兴趣的同学可以看看作者 elvishwe 的官文史上最强的 Android 日志库 XLog.这里先 ...
最新文章
- 福利 | 如何创造可信的AI?人工智能大牛盖瑞·马库斯的11条建议
- excel 用VBA将所有单元格内容全部转换为文本
- Python的pycurl库升级升级失败的解决方法
- python做平面设计有前途吗_现在学平面设计还有发展前景吗?
- Taro+react开发(61) 一条虚线
- java 责任链模式 链表_责任链模式的实现及源码中应用
- POJ 2185 Milking Grid (KMP,GCD)
- python时间格式转换为美式日期_如何将日期时间格式的排列转换为python中的打印?...
- 鸿蒙智慧屏安装应用,谁说华为智慧屏不能装APP,我来打脸了,附零难度安装APP教程...
- 网络流行语“不作不死”英文入选美国词典
- oracle 添加表权限不足,oracle 创建表空间报权限不足,引发的问题如下 | 学步园...
- 已知 XYZ+YZZ=532,其中,X、Y、Z 为数字,编程求出 X、Y 和 Z 的值
- 数据来源渠道及采集工具_几款简单好用的爬虫抓取数据采集工具
- 右手螺旋判断磁感应强度方向_如何判断磁感应强度方向 方法是什么
- ionic4.x仿京东 - 10.2.确认订单-去结算跳到确认订单(返回特定页面),确认订单页面布局
- 最好用的 20 款数据可视化工具
- WiFi温湿度传感器开发
- androidlib.java_实现 Java SDK 库
- DiskMan使用方法
- Shell语法详解专栏目录
热门文章
- poj 2342 树形DP
- 《数据结构》c语言版学习笔记——单链表结构(线性表的链式存储结构Part1)
- 0x0000050蓝屏srvsys_win7电脑蓝屏,显示的应该是srv.sys造成的,是什么情况?应该如何处理?...
- java中生成不重复随机的数字
- mysql中给用户添加密码_MySql中添加用户,新建数据库,用户授权,删除用户,修改密码...
- python常用方法总结-Python3常用函数、方法总结(持续更新…)
- mysql 数据趋势,2019年8月全球数据库流行度排行--oracle、mysql增长趋势明显
- linux下后缀为so的文件怎么打开,linux中.so后缀的文件怎么使用啊
- 深度学习tensorflow变量op
- java广度优先遍历