目录

1、Job Stage划分

2、Task最佳位置

3、总结

3.1 Stage划分总结:

3.2 Task最佳位置总结:


1、Job Stage划分

Spark Application中因为不同的Action触发众多的Job,也就是说一个Application中可以有很多的Job,每个Job是由是由一个或者多个Stage构成的,后面的Stage依赖于前面的Stage,也就是说只有前面依赖的Stage计算完毕后,后面的Stage才会运行。而Stage划分的依据就是宽依赖。下面以RDD的collect方法为例:

(1)他是一个action会触发一个具体的作业runJob

(2)runJob有很多重载方法,不断地往里调用,最后交给dagScheduler的runJob,在dagScheduler的runJob交给了submitJob,后面还有一个等待作业结果看成功还是失败,会有相应的动作。

(3)在submitJob中首先看一下分区长度,是因为要进行计算,这个肯定是RDD导致的action他要校验一下是不是在运行的时候相应的Partition存在。

eventProcessLoop调用post的时候有个Jobsubmitted的参数,他是一个case class,因为一个application中可能有很多的Job,不同的job的Jobsubmitted实例不一样所以不能用case object。他里面封装了job的id,最后一个RDD,具体对RDD操作的函数,有哪些Partition要被计算,监听作业状态等。

他的核心就是将Jobsubmitted交给eventProcessLoop。他是通过post方法post给eventProcessLoop,这个post其实就是发往EventLoop里面的eventQueue

(4)发现在EventLoop里面开辟了一个线程,他是setDaemon方式作为后台线程,因为要在后台做不断的循环(如果是前台线程的话对垃圾回收是有影响的),在run方法里面会不断的循环我们的消息队列,从eventQueue(是一个LinkedBlockingDeque,我们可以往他里面信息)中获得消息,调用了onReceive,发现在里面没有具体的实现所以在DAGSchedulerEventProcessLoop中对onReceive进行了实现,这里就收到了DAGSchedulerEvent,这里面再调用doOnReceive。doOnReceive收到信息就开始处理

(5)接下来就是HandleJobSubmited。这个时候Stage就开始了。我们知道最后一个Stage一定是ResultStage,前面所有的Stage都是ShuffleMapStage。

(6)发现有个getOrCreateParentStages的方法,开始创建ResultStage的父stage,里面有多个嵌套获取shuffle依赖和循环创建shuffleMapStage,若没有shuffle,操作则返回空list

进入到创建父Stage的方法getOrCreateParentStages,这里仅仅是抽取当前RDD的shuffle依赖,shuffleMapStage,如果不是shuffleDependency就继续抽取父RDD,迭代遍历一直到抽取出为止或者没有

进入getOrCreateShuffleMapStage方法中,进行匹配能不能取到ParentStage的值,当没有parentStage的时候会返回空,能取到就返回stage,ShuffleMapStage是根据遍历出的ShuffleDependencies一次次创建出来的

进入createShuffleMapStage方法 此方法是递归循环创建shuffleMapStage的过程

这个时候ShuffleMapStage已经创建完成了,并不是一次就创建完成,而是遇见shuffle的时候会由下往上递归创建ShuffleMapStage

(7)构建完所有的ShuffleMapStage后,将其作为参数创建ResultStage

(8)最后将Stage和id关联,更新job所有的Stage,并将Stage返回出去。

(9)回到handleJobsubmited方法中,finalStage构建完之后,新建一个ActiveJob保存了当前job的一些信息,打印一堆日志之类。getMissingParentStages(finalStage)根据finalStage,刚才找父Stage的时候如果有的话直接返回,如果没有的话就会创建,所以如果曾经有就不需要再去做。listenerBus.post监听事件,最后submitStage(finalStage)。

首先获得id,如果jobId是defined的话再次getMissingParentStages(stage)获得missing的stage之后判断一下是否为空,如果为空的话就submitMissingTasks(stage, jobId.get)个就是没有前置性的Tasks,也就是没有父Stage。在这个底层其实是DAGScheduler把这个处理的过程交给具体的TaskScheduler去处理

2、Task最佳位置

(1)在handleJobsubmited方法中最后是最后调用submitStage,在他里面会调用submitMissingTasks

(2)这里面有很多代码,我们要关心Stage本身的算法以及Task任务本地性把当前的Stage加进去,然后对Stage进行判断,一种是ShuffleMapStage,一种是ResultStage。继续往下走会看到taskIdToLocations这是关键的代码,taskIdToLocations是一个Map

partitionsToCompute这里面获得是具体的要计算的PartitionID,我们我们这边看到的map里面的id是Partition的id。这里面匿名函数,产生的是tuple根据Partition的id。后面toMap就是Partition的id和TaskLocation的位置。

(3)进入到getPreferredLocs(stage.rdd, id),进来的是RDD,PartitionID返回的是一个集合。

再进入getPreferredLocsInternal

visited: HashSet[(RDD[_], Int)]这个HashSet开始是空,所以直接传进来一个new HashSet,然后判断visited如果已经有的话,那么添加就不成功,那么就是已经计算了数据本地性了,就返回Nil。

下面的cached就是已经在DAGScheduler的内存数据结构中了。进入getCacheLocs,这边返回的是序列,cacheLocs是一个HashMap,这包含了每个RDD的Partition的id以及id对应的taskLocation,这个包含了Stage本身也包含了Stage内部任务的本地性

(4)回到getPreferredLocsInternal中,上面是看一下DAGScheduler中有没有缓存根据Partition而保存的数据本地性的内容,如果不为空的话就把内容返回。然后调用下面的getpreferdLocations(如果自定义一个RDD的话是一定要写这个方法的)

(5)最后判断一下如果是窄依赖的话就自己调用自己

3、总结

3.1 Stage划分总结:

(1)Action触发Job,开始逆向分析job执行过程Action中利用SparkContext runJob路由到dagScheduler.runJob(rdd,func,分区数,其他),提交Job作业;

(2)DAGScheduler的runJob中调用submitJob并返回监听waiter,生命周期内监听Job状态;

(3)在submitJob内部,将该获取到的Job(已有JobId),插入到名为eventProcessLoop的LinkedBlockingDeque结构的事件处理队列中;

(4)eventProcessLoop放入新事件后,调起底层的DAGSchedulerEventProcessLoop的onReceive方法;

(5)执行doOnReceive,根据DAGSchedulerEvent的具体类型如JobSubmitted事件或者MapStageSubmitted事件,调取具体的Submitted handle函数提交具体的Job;

(6)以JobSubmitted为例,在handleJobSubmitted内部,返回从ResultStage 建立stage 建立finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite),finalStage激活Job val job = new ActiveJob(jobId, finalStage, callSite, listener, properties),同时开始逆向构建缺失的stage;

(7)DAG构建完毕,提交stage,submitStage(finalStage),submitStage中stage提交为tasks,submitMissingTasks(),submitMissingTasks,根据ShuffleMapStage还是ResultStage创建 ShuffleMapTask 或 ResultTask。

(7)taskScheduler.submitTasks()开始调起具体的task

3.2 Task最佳位置总结:

(1)在划分Stage的时候submitMissingTasks方法中会有一个taskIdToLocations的属性,他的结构为 Map[Int, Seq[TaskLocation]],他保存的就是PartitionID及其对应的最佳位置

(2)在对taskIdToLocations赋值的时候会调用getPreferredLocs方法,再路由到getPreferredLocsInternal返回最佳位置Seq[TaskLocation]

(3)在getPreferredLocsInternal方法中

①判断rdd的partition是否被访问过,如果被访问过,则什么都不做

②然后判断DAGScheduler的内存中是否cache了在当前Paritition的信息,如果有的话直接返回

③如果没有cache,则调用rdd.getPreferredLocations方法,获取RDD partition的最佳位置

④遍历RDD的依赖,如果有窄依赖,遍历父依赖的partition,对遍历到的每个partition,递归调用getPreferredLocsInternal方法

即从第一个窄依赖的第一个partition开始,然后将每个partition的最佳位置,添加到序列中,最后返回所有partition的最佳位置序列

注意:DAGScheduler计算数据本地性的时候借助了RDD自身的getPreferredLocations中的数据,因为getPreferredLocations中表明了每个Partition的数据本地性,虽然当前Partition可能被persist或者checkpoint,但是persist或者checkpoint默认情况下肯定是和getPreferredLocations中的Partition的数据本地性是一致的,所以这就极大的简化Task数据本地性算法的实现和效率的优化。

Stage划分和Task最佳位置相关推荐

  1. Spark 的核心 RDD 以及 Stage 划分细节,运行模式总结

    精选30+云产品,助力企业轻松上云!>>> 阅读文本大概需要 5 分钟. 以下内容,部分参考网络资料,也有自己的理解, 图片 99% 为自己制作.如有错误,欢迎留言指出,一起交流. ...

  2. Spark技术内幕:Stage划分及提交源码分析

    当触发一个RDD的action后,以count为例,调用关系如下: org.apache.spark.rdd.RDD#count org.apache.spark.SparkContext#runJo ...

  3. 用实例说明Spark stage划分原理

    注意:此文的stage划分有错,stage的划分是以shuffle操作作为边界的,可以参考<spark大数据处理技术>第四章page rank例子! 参考:http://litaotao. ...

  4. Spark基础学习笔记19:RDD的依赖与Stage划分

    文章目录 零.本讲学习目标 一.RDD的依赖 (一)窄依赖 1.map()与filter()算子 2.union()算子 3.join()算子 (二)宽依赖 1.groupBy()算子 2.join( ...

  5. spark重要参数调优建议:spark.default.parallelism设置每个stage默认的task数量

    spark.default.parallelism 参数说明:该参数用于设置每个stage的默认task数量.这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能. 参数调优建议:Spar ...

  6. android studio电影院选座,8排电影院选座最佳位置

    8排电影院选座最佳位置在哪里呢?8排电影院属于小影厅,小影厅银幕宽度在10米以下,座位100以内,座位排数通常拥有8-14排,小影厅整体空间小,选座时要选中间稍靠后一些的位置.由于整体排数少,因此选即 ...

  7. 基于顺序存储结构的图书信息表的最佳位置图书的查找(C++)

    描述 定义一个包含图书信息(书号.书名.价格)的顺序表,读入相应的图书数据来完成图书信息表的创建,然后根据指定的最佳位置的序号,查找该位置上的图书,输出相应图书的信息. 输入 总计n+m+2行.首先输 ...

  8. 开始位置 环状图_【技术分享】如何找到压铸模具中真空阀的最佳位置?

    与砂型和重力铸造相比,传统压铸件的微观结构不尽人意,主要原因是高速金属流在浇口处的喷射,要比金属缓慢喂入砂型或金属模具型腔时更容易接触型腔内的空气.真空压铸工艺的重点是尽量减少这种气液接触,因此,将型 ...

  9. 6个座位办公室最佳位置_办公室座位最佳位置(讲解)

    在办公室选择办公位置也很重要,办公位置的好坏也会直接影响工作效率和进度,从而影响到个人健康以及公司的财运等问题. 所以办公室座位不得不重视,下面大师为大家讲解关于办公室位置最佳位置. 办公室座位最佳位 ...

最新文章

  1. NEJM | 益生菌LGG治疗肠胃炎无效,Immunity|LGG促进生骨
  2. Access中复制表
  3. asp.net mvc连接mysql_ASP.Net MVC连接MySQL和Code First的使用
  4. 错误: libstdc++.so.6: cannot open shared object file: No such file or directory
  5. 类型与通用语言运行时
  6. Python调用ansible API系列(一)获取资产信息
  7. ES5-拓展 箭头函数的this、this的优先级
  8. [渝粤教育] 潍坊职业学院 化工安全技术 参考 资料
  9. mysql sqlyog讲解_详细讲解如何用SQLyog来分析MySQL数据库
  10. 用友T3 反结账反记账
  11. 长字符串的算术编码matlab,算术编码及MATLAB实现
  12. MVVM?瞎搞一波?
  13. android安卓手机分屏多窗口实现方法
  14. android热修复技术tinker,Android热修复方案第一弹——Tinker篇
  15. eBay Inc(EBAY)2020年第三季度收益电话会议记录
  16. 【英语语法入门】 第29讲 情态动词的否定和疑问
  17. 论文阅读:《A Wavenet For Speech Denoising》
  18. 建模方法(十)-灰色预测模型GM(1,1)
  19. PAT甲级1009 Product of Polynomials (25分)
  20. matlab主成分分析散点图_基于matlab的主成分分析与因子分析

热门文章

  1. spring boot 实战
  2. jmeter持续集成测试中mongodb版本问题
  3. 获取要素集中字段的唯一值
  4. Angular之简单的登录注册
  5. Hibernate写hql语句与不写hql语句的区别?
  6. 常见的getchar 与EOF的问题
  7. worth,worthy,worthwhile的区别(一)
  8. Windows Store App 获取文件及文件夹列表
  9. Everyday is an Opportunity
  10. Javascript:前端利器 之 JSDuck