In 《Spark源码分析之Job提交运行总流程概述》 we mentioned that the first phase of job submission and execution, Stage division and submission, can itself be broken down into three steps:

1. The job's scheduling model and execution feedback;

2. Stage division;

3. Stage submission, i.e. generation of the corresponding TaskSets.

Today, let's walk through the source code of the first of these steps: the job's scheduling model and execution feedback.

First, the DAGScheduler is responsible for submitting the job to the event queue eventProcessLoop, where it waits to be scheduled and executed. The entry point is DAGScheduler's runJob() method. The code is as follows:

/**
 * Run an action job on the given RDD and pass all the results to the resultHandler function as
 * they arrive.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 *   partitions of the target RDD, e.g. for operations like first()
 * @param callSite where in the user program this job was called
 * @param resultHandler callback to pass each result to
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 *
 * @throws Exception when the job fails
 */
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  // Record the start time so the total job duration can be computed
  val start = System.nanoTime
  // Call submitJob() to submit the job; it returns a JobWaiter
  // rdd is the final RDD, i.e. the target RDD to run tasks on
  // func is the function to run on each partition of that RDD
  // partitions is the set of partitions of that RDD to run on
  // callSite is where in the user program this job was called
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  // The JobWaiter's awaitResult() blocks until the job's result is available
  waiter.awaitResult() match {
    case JobSucceeded =>
      // The job ran successfully
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case JobFailed(exception: Exception) =>
      // The job failed
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}
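Before dissecting it, a quick aside on how this path is reached in the first place. The following is a minimal, hypothetical driver program (not an excerpt from Spark; the app name, master and data are made up for the sketch): every RDD action boils down to a SparkContext.runJob call, which in turn delegates to DAGScheduler.runJob shown above.

import org.apache.spark.{SparkConf, SparkContext}

object RunJobSketch {
  def main(args: Array[String]): Unit = {
    // A throwaway local context; configuration values here are illustrative only
    val sc = new SparkContext(new SparkConf().setAppName("runJob-sketch").setMaster("local[*]"))
    val rdd = sc.parallelize(1 to 100, numSlices = 4)

    // Explicitly call the public runJob overload: one result per partition.
    // Internally this ends up in DAGScheduler.runJob -> submitJob -> JobSubmitted event.
    val perPartitionSums = sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum)
    println(perPartitionSums.mkString(", "))

    sc.stop()
  }
}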

runJob() does three things:

First, it records the start time so that the total job duration can be computed at the end;

Second, it calls submitJob() to submit the job, which returns a JobWaiter object named waiter;

Finally, waiter calls JobWaiter's awaitResult() to wait for the job's outcome, of which there are only two kinds: JobSucceeded for success and JobFailed for failure.

awaitResult() keeps checking the _jobFinished flag: while it is false it calls this.wait() and keeps waiting; once it becomes true the job has finished and the JobResult is returned. Its code is as follows:

def awaitResult(): JobResult = synchronized {
  // Loop while the _jobFinished flag is false; once it is true, return the JobResult
  while (!_jobFinished) {
    this.wait()
  }
  return jobResult
}

The _jobFinished flag is set either when a task finishes and the number of completed tasks equals the total number of tasks, or when the whole job fails; jobResult, the job's outcome, is set together with the flag. The code is as follows:

// Called when a task finishes successfully
override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
  if (_jobFinished) {
    throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
  }
  resultHandler(index, result.asInstanceOf[T])
  finishedTasks += 1
  // Has the number of finished tasks reached the total number of tasks?
  if (finishedTasks == totalTasks) {
    // Set the _jobFinished flag to true
    _jobFinished = true
    // The job result is a success
    jobResult = JobSucceeded
    this.notifyAll()
  }
}

// Called when the job fails
override def jobFailed(exception: Exception): Unit = synchronized {
  // Set the _jobFinished flag to true
  _jobFinished = true
  // The job result is a failure
  jobResult = JobFailed(exception)
  this.notifyAll()
}
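To make the synchronization explicit, here is a stripped-down, self-contained sketch of the same synchronized/wait/notifyAll monitor pattern JobWaiter relies on. The class and method names (SimpleWaiter, markTaskDone, awaitDone) are invented for illustration and do not exist in Spark.

// A toy waiter (not Spark code) using the same monitor pattern as JobWaiter
class SimpleWaiter(totalTasks: Int) {
  private var finishedTasks = 0
  private var done = false

  // Called by worker threads as tasks finish
  def markTaskDone(): Unit = synchronized {
    finishedTasks += 1
    if (finishedTasks == totalTasks) {
      done = true
      notifyAll()   // wake up every thread blocked in awaitDone()
    }
  }

  // Called by the submitting thread; blocks until all tasks are done
  def awaitDone(): Unit = synchronized {
    while (!done) { // the loop guards against spurious wakeups
      wait()
    }
  }
}

Note that the waiting thread re-checks the flag inside a while loop rather than an if, which is exactly why awaitResult() above is written the same way.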

Next, let's look at submitJob(), which is defined as follows:

/**
 * Submit an action job to the scheduler.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 *   partitions of the target RDD, e.g. for operations like first()
 * @param callSite where in the user program this job was called
 * @param resultHandler callback to pass each result to
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 *
 * @return a JobWaiter object that can be used to block until the job finishes executing
 *         or can be used to cancel the job.
 *
 * @throws IllegalArgumentException when partitions ids are illegal
 */
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  // Generate a jobId for this job; nextJobId is an AtomicInteger, so getAndIncrement()
  // is atomic and the counter is incremented on every call
  val jobId = nextJobId.getAndIncrement()
  // If partitions is empty, i.e. there is no partition to run a task on, return immediately
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  // Cast func so that JobSubmitted can accept it: T becomes _
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  // Create a JobWaiter
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  // Post a JobSubmitted event to the eventProcessLoop event queue
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  // Return the JobWaiter
  waiter
}

submitJob() does five things:

First, input validation: it checks the requested partitions to make sure we never launch a task on a partition that does not exist, and if partitions is empty, i.e. there is nothing to run, it returns immediately;

Second, it generates a jobId for the job; nextJobId is an AtomicInteger, so getAndIncrement() guarantees atomicity and increments the counter on every call (a small standalone sketch of these first two steps follows this list);

Third, it casts func, since JobSubmitted cannot accept the original func parameter: T is turned into _;

Fourth, it creates a JobWaiter object, waiter, which is returned to the caller at the end of the method and is used to monitor the job's outcome;

Fifth, it posts a JobSubmitted event to the event queue eventProcessLoop, where a worker thread polling the queue will pick it up and dispatch it (this happens very quickly).
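As promised above, here is a small standalone sketch (not Spark code; validatePartitions and newJobId are invented names) of the first two steps: bounds-checking the requested partition ids and handing out monotonically increasing job ids atomically.

import java.util.concurrent.atomic.AtomicInteger

object SubmitChecksSketch {
  private val nextJobId = new AtomicInteger(0)

  // Reject any partition id outside [0, maxPartitions), mirroring submitJob()'s check
  def validatePartitions(maxPartitions: Int, partitions: Seq[Int]): Unit = {
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        s"Attempting to access a non-existent partition: $p. Total number of partitions: $maxPartitions")
    }
  }

  // getAndIncrement() is atomic, so concurrent submissions still get distinct, increasing ids
  def newJobId(): Int = nextJobId.getAndIncrement()

  def main(args: Array[String]): Unit = {
    validatePartitions(maxPartitions = 4, partitions = Seq(0, 1, 3)) // fine
    println(Seq.fill(3)(newJobId()).mkString(", "))                  // prints 0, 1, 2
  }
}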

Here it is worth taking a closer look at the event queue eventProcessLoop. It is of type DAGSchedulerEventProcessLoop and is defined and assigned when the DAGScheduler is initialized. The code is as follows:

// The member variable eventProcessLoop, of type DAGSchedulerEventProcessLoop
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
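One detail not excerpted in this post (quoted from memory, so double-check against your Spark version): the loop's event thread still has to be started before any event can be processed, and DAGScheduler does this at the end of its class body:

// start the event-processing thread once the DAGScheduler is fully constructed
eventProcessLoop.start()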

DAGSchedulerEventProcessLoop extends EventLoop, so let's first look at how EventLoop is defined.

/**
 * An event loop to receive events from the caller and process all events in the event thread. It
 * will start an exclusive event thread to process all events.
 *
 * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can
 * handle events in time to avoid the potential OOM.
 */
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  // The event queue, a LinkedBlockingDeque whose elements are of type E
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  // Flag marking whether the loop has been stopped
  private val stopped = new AtomicBoolean(false)

  // The event-processing thread
  private val eventThread = new Thread(name) {
    // Run as a daemon thread
    setDaemon(true)

    override def run(): Unit = {
      try {
        // Keep looping as long as the stopped flag has not been set to true
        while (!stopped.get) {
          // Take one event from the event queue
          val event = eventQueue.take()
          try {
            // Handle it with onReceive()
            onReceive(event)
          } catch {
            case NonFatal(e) => {
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
            }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      var onStopCalled = false
      try {
        eventThread.join()
        // Call onStop after the event thread exits to make sure onReceive happens before onStop
        onStopCalled = true
        onStop()
      } catch {
        case ie: InterruptedException =>
          Thread.currentThread().interrupt()
          if (!onStopCalled) {
            // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
            // it's already called.
            onStop()
          }
      }
    } else {
      // Keep quiet to allow calling `stop` multiple times.
    }
  }

  /**
   * Put the event into the event queue. The event thread will process it later.
   */
  def post(event: E): Unit = {
    // Add the event to the pending queue
    eventQueue.put(event)
  }

  /**
   * Return if the event thread has already been started but not yet stopped.
   */
  def isActive: Boolean = eventThread.isAlive

  /**
   * Invoked when `start()` is called but before the event thread starts.
   */
  protected def onStart(): Unit = {}

  /**
   * Invoked when `stop()` is called and the event thread exits.
   */
  protected def onStop(): Unit = {}

  /**
   * Invoked in the event thread when polling events from the event queue.
   *
   * Note: Should avoid calling blocking actions in `onReceive`, or the event thread will be blocked
   * and cannot process events in time. If you want to call some blocking actions, run them in
   * another thread.
   */
  protected def onReceive(event: E): Unit

  /**
   * Invoked if `onReceive` throws any non fatal error. Any non fatal error thrown from `onError`
   * will be ignored.
   */
  protected def onError(e: Throwable): Unit

}

As we can see, EventLoop is essentially an event queue together with a wrapper around a set of operations on that queue. Internally, it first defines an event queue of type LinkedBlockingDeque whose elements are of type E; in the case of DAGSchedulerEventProcessLoop the stored events are of type DAGSchedulerEvent. The code is as follows:

// The event queue, a LinkedBlockingDeque whose elements are of type E
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

It also provides a background thread dedicated to watching the event queue and handing each event to onReceive() for processing. The code is as follows:

// The event-processing thread
private val eventThread = new Thread(name) {
  // Run as a daemon thread
  setDaemon(true)

  override def run(): Unit = {
    try {
      // Keep looping as long as the stopped flag has not been set to true
      while (!stopped.get) {
        // Take one event from the event queue
        val event = eventQueue.take()
        try {
          // Handle it with onReceive()
          onReceive(event)
        } catch {
          case NonFatal(e) => {
            try {
              onError(e)
            } catch {
              case NonFatal(e) => logError("Unexpected error in " + name, e)
            }
          }
        }
      }
    } catch {
      case ie: InterruptedException => // exit even if eventQueue is not empty
      case NonFatal(e) => logError("Unexpected error in " + name, e)
    }
  }
}

So how is an event added to the queue? Simply by calling its post() method with the event, as follows:

/**
 * Put the event into the event queue. The event thread will process it later.
 */
def post(event: E): Unit = {
  // Add the event to the pending queue
  eventQueue.put(event)
}
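To see how post() and onReceive() fit together end to end, here is a minimal, hypothetical EventLoop subclass. The class name and messages are invented for this sketch; only the EventLoop API itself comes from the code above.

// A toy subclass of the EventLoop shown above; it would have to live inside the
// org.apache.spark package, since EventLoop is private[spark]
class PrintingEventLoop extends EventLoop[String]("printing-event-loop") {

  // Runs on the dedicated event thread, one event at a time, in FIFO order
  override protected def onReceive(event: String): Unit = {
    println(s"handling event: $event")
  }

  // Invoked if onReceive throws a non-fatal error
  override protected def onError(e: Throwable): Unit = {
    e.printStackTrace()
  }
}

// Usage: the caller thread only ever posts; the event thread does the processing
// val loop = new PrintingEventLoop
// loop.start()                 // calls onStart(), then starts the daemon event thread
// loop.post("JobSubmitted")    // enqueued; processed asynchronously by onReceive()
// loop.stop()                  // interrupts and joins the event thread, then calls onStop()

This is exactly the shape of DAGSchedulerEventProcessLoop: the caller (here, submitJob() on the user thread) only ever calls post(), and all real work happens on the single event thread inside onReceive().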

Back to the main topic. As mentioned above, submitJob() uses eventProcessLoop's post() method to add a JobSubmitted event to the event queue. So how does DAGSchedulerEventProcessLoop handle a JobSubmitted event? Let's look at its onReceive() method. The source is as follows:

/**
 * The main event loop of the DAG scheduler.
 */
override def onReceive(event: DAGSchedulerEvent): Unit = {
  val timerContext = timer.time()
  try {
    // Delegate to doOnReceive(), passing the DAGSchedulerEvent along
    doOnReceive(event)
  } finally {
    timerContext.stop()
  }
}

Continuing into doOnReceive(), the code is as follows:

// Event dispatch function
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  // For a JobSubmitted event, delegate to dagScheduler.handleJobSubmitted()
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  // For a MapStageSubmitted event, delegate to dagScheduler.handleMapStageSubmitted()
  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)

  case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)

  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)

  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()

  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)

  case ExecutorLost(execId) =>
    dagScheduler.handleExecutorLost(execId, fetchFailed = false)

  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)

  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)

  case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
    dagScheduler.handleTaskCompletion(completion)

  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}

For a JobSubmitted event, DAGScheduler's handleJobSubmitted() method is called to do the actual handling.

That largely wraps up the first step, the job's scheduling model and execution feedback. The second and third steps will be analyzed in follow-up posts.
