Stage划分和Task最佳位置
目录
1、Job Stage划分
2、Task最佳位置
3、总结
3.1 Stage划分总结:
3.2 Task最佳位置总结:
1、Job Stage划分
Spark Application中因为不同的Action触发众多的Job,也就是说一个Application中可以有很多的Job,每个Job是由是由一个或者多个Stage构成的,后面的Stage依赖于前面的Stage,也就是说只有前面依赖的Stage计算完毕后,后面的Stage才会运行。而Stage划分的依据就是宽依赖。下面以RDD的collect方法为例:
(1)他是一个action会触发一个具体的作业runJob
(2)runJob有很多重载方法,不断地往里调用,最后交给dagScheduler的runJob,在dagScheduler的runJob交给了submitJob,后面还有一个等待作业结果看成功还是失败,会有相应的动作。
(3)在submitJob中首先看一下分区长度,是因为要进行计算,这个肯定是RDD导致的action他要校验一下是不是在运行的时候相应的Partition存在。
eventProcessLoop调用post的时候有个Jobsubmitted的参数,他是一个case class,因为一个application中可能有很多的Job,不同的job的Jobsubmitted实例不一样所以不能用case object。他里面封装了job的id,最后一个RDD,具体对RDD操作的函数,有哪些Partition要被计算,监听作业状态等。
他的核心就是将Jobsubmitted交给eventProcessLoop。他是通过post方法post给eventProcessLoop,这个post其实就是发往EventLoop里面的eventQueue
(4)发现在EventLoop里面开辟了一个线程,他是setDaemon方式作为后台线程,因为要在后台做不断的循环(如果是前台线程的话对垃圾回收是有影响的),在run方法里面会不断的循环我们的消息队列,从eventQueue(是一个LinkedBlockingDeque,我们可以往他里面信息)中获得消息,调用了onReceive,发现在里面没有具体的实现所以在DAGSchedulerEventProcessLoop中对onReceive进行了实现,这里就收到了DAGSchedulerEvent,这里面再调用doOnReceive。doOnReceive收到信息就开始处理
(5)接下来就是HandleJobSubmited。这个时候Stage就开始了。我们知道最后一个Stage一定是ResultStage,前面所有的Stage都是ShuffleMapStage。
(6)发现有个getOrCreateParentStages的方法,开始创建ResultStage的父stage,里面有多个嵌套获取shuffle依赖和循环创建shuffleMapStage,若没有shuffle,操作则返回空list
进入到创建父Stage的方法getOrCreateParentStages,这里仅仅是抽取当前RDD的shuffle依赖,shuffleMapStage,如果不是shuffleDependency就继续抽取父RDD,迭代遍历一直到抽取出为止或者没有
进入getOrCreateShuffleMapStage方法中,进行匹配能不能取到ParentStage的值,当没有parentStage的时候会返回空,能取到就返回stage,ShuffleMapStage是根据遍历出的ShuffleDependencies一次次创建出来的
进入createShuffleMapStage方法 此方法是递归循环创建shuffleMapStage的过程
这个时候ShuffleMapStage已经创建完成了,并不是一次就创建完成,而是遇见shuffle的时候会由下往上递归创建ShuffleMapStage
(7)构建完所有的ShuffleMapStage后,将其作为参数创建ResultStage
(8)最后将Stage和id关联,更新job所有的Stage,并将Stage返回出去。
(9)回到handleJobsubmited方法中,finalStage构建完之后,新建一个ActiveJob保存了当前job的一些信息,打印一堆日志之类。getMissingParentStages(finalStage)根据finalStage,刚才找父Stage的时候如果有的话直接返回,如果没有的话就会创建,所以如果曾经有就不需要再去做。listenerBus.post监听事件,最后submitStage(finalStage)。
首先获得id,如果jobId是defined的话再次getMissingParentStages(stage)获得missing的stage之后判断一下是否为空,如果为空的话就submitMissingTasks(stage, jobId.get)个就是没有前置性的Tasks,也就是没有父Stage。在这个底层其实是DAGScheduler把这个处理的过程交给具体的TaskScheduler去处理
2、Task最佳位置
(1)在handleJobsubmited方法中最后是最后调用submitStage,在他里面会调用submitMissingTasks
(2)这里面有很多代码,我们要关心Stage本身的算法以及Task任务本地性把当前的Stage加进去,然后对Stage进行判断,一种是ShuffleMapStage,一种是ResultStage。继续往下走会看到taskIdToLocations这是关键的代码,taskIdToLocations是一个Map
partitionsToCompute这里面获得是具体的要计算的PartitionID,我们我们这边看到的map里面的id是Partition的id。这里面匿名函数,产生的是tuple根据Partition的id。后面toMap就是Partition的id和TaskLocation的位置。
(3)进入到getPreferredLocs(stage.rdd, id),进来的是RDD,PartitionID返回的是一个集合。
再进入getPreferredLocsInternal
visited: HashSet[(RDD[_], Int)]这个HashSet开始是空,所以直接传进来一个new HashSet,然后判断visited如果已经有的话,那么添加就不成功,那么就是已经计算了数据本地性了,就返回Nil。
下面的cached就是已经在DAGScheduler的内存数据结构中了。进入getCacheLocs,这边返回的是序列,cacheLocs是一个HashMap,这包含了每个RDD的Partition的id以及id对应的taskLocation,这个包含了Stage本身也包含了Stage内部任务的本地性
(4)回到getPreferredLocsInternal中,上面是看一下DAGScheduler中有没有缓存根据Partition而保存的数据本地性的内容,如果不为空的话就把内容返回。然后调用下面的getpreferdLocations(如果自定义一个RDD的话是一定要写这个方法的)
(5)最后判断一下如果是窄依赖的话就自己调用自己
3、总结
3.1 Stage划分总结:
(1)Action触发Job,开始逆向分析job执行过程Action中利用SparkContext runJob路由到dagScheduler.runJob(rdd,func,分区数,其他),提交Job作业;
(2)DAGScheduler的runJob中调用submitJob并返回监听waiter,生命周期内监听Job状态;
(3)在submitJob内部,将该获取到的Job(已有JobId),插入到名为eventProcessLoop的LinkedBlockingDeque结构的事件处理队列中;
(4)eventProcessLoop放入新事件后,调起底层的DAGSchedulerEventProcessLoop的onReceive方法;
(5)执行doOnReceive,根据DAGSchedulerEvent的具体类型如JobSubmitted事件或者MapStageSubmitted事件,调取具体的Submitted handle函数提交具体的Job;
(6)以JobSubmitted为例,在handleJobSubmitted内部,返回从ResultStage 建立stage 建立finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite),finalStage激活Job val job = new ActiveJob(jobId, finalStage, callSite, listener, properties),同时开始逆向构建缺失的stage;
(7)DAG构建完毕,提交stage,submitStage(finalStage),submitStage中stage提交为tasks,submitMissingTasks(),submitMissingTasks,根据ShuffleMapStage还是ResultStage创建 ShuffleMapTask 或 ResultTask。
(7)taskScheduler.submitTasks()开始调起具体的task
3.2 Task最佳位置总结:
(1)在划分Stage的时候submitMissingTasks方法中会有一个taskIdToLocations的属性,他的结构为 Map[Int, Seq[TaskLocation]],他保存的就是PartitionID及其对应的最佳位置
(2)在对taskIdToLocations赋值的时候会调用getPreferredLocs方法,再路由到getPreferredLocsInternal返回最佳位置Seq[TaskLocation]
(3)在getPreferredLocsInternal方法中
①判断rdd的partition是否被访问过,如果被访问过,则什么都不做
②然后判断DAGScheduler的内存中是否cache了在当前Paritition的信息,如果有的话直接返回
③如果没有cache,则调用rdd.getPreferredLocations方法,获取RDD partition的最佳位置
④遍历RDD的依赖,如果有窄依赖,遍历父依赖的partition,对遍历到的每个partition,递归调用getPreferredLocsInternal方法
即从第一个窄依赖的第一个partition开始,然后将每个partition的最佳位置,添加到序列中,最后返回所有partition的最佳位置序列
注意:DAGScheduler计算数据本地性的时候借助了RDD自身的getPreferredLocations中的数据,因为getPreferredLocations中表明了每个Partition的数据本地性,虽然当前Partition可能被persist或者checkpoint,但是persist或者checkpoint默认情况下肯定是和getPreferredLocations中的Partition的数据本地性是一致的,所以这就极大的简化Task数据本地性算法的实现和效率的优化。
Stage划分和Task最佳位置相关推荐
- Spark 的核心 RDD 以及 Stage 划分细节,运行模式总结
精选30+云产品,助力企业轻松上云!>>> 阅读文本大概需要 5 分钟. 以下内容,部分参考网络资料,也有自己的理解, 图片 99% 为自己制作.如有错误,欢迎留言指出,一起交流. ...
- Spark技术内幕:Stage划分及提交源码分析
当触发一个RDD的action后,以count为例,调用关系如下: org.apache.spark.rdd.RDD#count org.apache.spark.SparkContext#runJo ...
- 用实例说明Spark stage划分原理
注意:此文的stage划分有错,stage的划分是以shuffle操作作为边界的,可以参考<spark大数据处理技术>第四章page rank例子! 参考:http://litaotao. ...
- Spark基础学习笔记19:RDD的依赖与Stage划分
文章目录 零.本讲学习目标 一.RDD的依赖 (一)窄依赖 1.map()与filter()算子 2.union()算子 3.join()算子 (二)宽依赖 1.groupBy()算子 2.join( ...
- spark重要参数调优建议:spark.default.parallelism设置每个stage默认的task数量
spark.default.parallelism 参数说明:该参数用于设置每个stage的默认task数量.这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能. 参数调优建议:Spar ...
- android studio电影院选座,8排电影院选座最佳位置
8排电影院选座最佳位置在哪里呢?8排电影院属于小影厅,小影厅银幕宽度在10米以下,座位100以内,座位排数通常拥有8-14排,小影厅整体空间小,选座时要选中间稍靠后一些的位置.由于整体排数少,因此选即 ...
- 基于顺序存储结构的图书信息表的最佳位置图书的查找(C++)
描述 定义一个包含图书信息(书号.书名.价格)的顺序表,读入相应的图书数据来完成图书信息表的创建,然后根据指定的最佳位置的序号,查找该位置上的图书,输出相应图书的信息. 输入 总计n+m+2行.首先输 ...
- 开始位置 环状图_【技术分享】如何找到压铸模具中真空阀的最佳位置?
与砂型和重力铸造相比,传统压铸件的微观结构不尽人意,主要原因是高速金属流在浇口处的喷射,要比金属缓慢喂入砂型或金属模具型腔时更容易接触型腔内的空气.真空压铸工艺的重点是尽量减少这种气液接触,因此,将型 ...
- 6个座位办公室最佳位置_办公室座位最佳位置(讲解)
在办公室选择办公位置也很重要,办公位置的好坏也会直接影响工作效率和进度,从而影响到个人健康以及公司的财运等问题. 所以办公室座位不得不重视,下面大师为大家讲解关于办公室位置最佳位置. 办公室座位最佳位 ...
最新文章
- NEJM | 益生菌LGG治疗肠胃炎无效,Immunity|LGG促进生骨
- Access中复制表
- asp.net mvc连接mysql_ASP.Net MVC连接MySQL和Code First的使用
- 错误: libstdc++.so.6: cannot open shared object file: No such file or directory
- 类型与通用语言运行时
- Python调用ansible API系列(一)获取资产信息
- ES5-拓展 箭头函数的this、this的优先级
- [渝粤教育] 潍坊职业学院 化工安全技术 参考 资料
- mysql sqlyog讲解_详细讲解如何用SQLyog来分析MySQL数据库
- 用友T3 反结账反记账
- 长字符串的算术编码matlab,算术编码及MATLAB实现
- MVVM?瞎搞一波?
- android安卓手机分屏多窗口实现方法
- android热修复技术tinker,Android热修复方案第一弹——Tinker篇
- eBay Inc(EBAY)2020年第三季度收益电话会议记录
- 【英语语法入门】 第29讲 情态动词的否定和疑问
- 论文阅读:《A Wavenet For Speech Denoising》
- 建模方法(十)-灰色预测模型GM(1,1)
- PAT甲级1009 Product of Polynomials (25分)
- matlab主成分分析散点图_基于matlab的主成分分析与因子分析