[Spark调优]--TaskSetManager的有效Locality Levels
原文:https://www.jianshu.com/p/05034a9c8cae
基于spark-1.5.0测试
PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY
几个值。这篇文章将从这几个值入手,从源码角度分析 TaskSetManager 的 Locality Levels
这几个值在图中代表 task 的计算节点和 task 的输入数据的节点位置关系
PROCESS_LOCAL
: 数据在同一个 JVM 中,即同一个 executor 上。这是最佳数据 locality。NODE_LOCAL
: 数据在同一个节点上。比如数据在同一个节点的另一个 executor上;或在 HDFS 上,恰好有 block 在同一个节点上。速度比 PROCESS_LOCAL 稍慢,因为数据需要在不同进程之间传递或从文件中读取NO_PREF
: 数据从哪里访问都一样快,不需要位置优先RACK_LOCAL
: 数据在同一机架的不同节点上。需要通过网络传输数据及文件 IO,比 NODE_LOCAL 慢ANY
: 数据在非同一机架的网络上,速度最慢
我们在上图中看到的其实是结果,即某个 task 计算节点与其输入数据的位置关系,下面将要挖掘Spark 的调度系统如何产生这个结果,这一过程涉及 RDD、DAGScheduler、TaskScheduler,搞懂了这一过程也就基本搞懂了 Spark 的 PreferredLocations(位置优先策略)
RDD 的 PreferredLocations
我们知道,根据输入数据源的不同,RDD 可能具有不同的优先位置,通过 RDD 的以下方法可以返回指定 partition 的最优先位置:
protected def getPreferredLocations(split: Partition): Seq[String]
返回类型为 Seq[String]
,其实对应的是 Seq[TaskLocation]
,在返回前都会执行 TaskLocation#toString
方法。TaskLocation 是一个 trait,共有以三种实现,分别代表数据存储在不同的位置:
- ExecutorCacheTaskLocation: 代表 partition 数据已经被 cache 到内存,比如 KafkaRDD 会将 partitions 都 cache 到内存,其 toString 方法返回的格式如
executor_$host_$executorId
- HostTaskLocation:代表 partition 数据存储在某个节点的磁盘上(且不在 hdfs 上),其 toString 方法直接返回 host
- HDFSCacheTaskLocation:代表 partition 数据存储在 hdfs 上,比如从 hdfs 上加载而来的 HadoopRDD 的 partition,其 toString 方法返回的格式如
hdfs_cache_$host
这样,我们就知道不同的 RDD 会有不同的优先位置,并且存储在不同位置的优先位置的字符串的格式是不同的,这在之后 TaskSetManager 计算 tasks 的最优本地性起了关键作用。
DAGScheduler 生成 taskSet
DAGScheduler 通过调用 submitStage 来提交一个 stage 对应的 tasks,submitStage 会调用submitMissingTasks,submitMissingTasks 会以下代码来确定每个需要计算的 task 的preferredLocations,这里调用到了 RDD#getPreferredLocs,getPreferredLocs返回的 partition 的优先位置,就是这个 partition 对应的 task 的优先位置
这段调用返回的 taskIdToLocations: Seq[ taskId -> Seq[hosts] ]
会在submitMissingTasks生成要提交给 TaskScheduler 调度的 taskSet: Seq[Task[_]]时用到,如下,注意看注释:
简而言之,在 DAGScheduler 为 stage 创建要提交给 TaskScheduler 调度执行的 taskSet 时,对于 taskSet 中的每一个 task,其优先位置与其对应的 partition 对应的优先位置一致
构造 TaskSetManager,确定 locality levels
在 DAGScheduler 向 TaskScheduler 提交了 taskSet 之后,TaskSchedulerImpl 会为每个 taskSet 创建一个 TaskSetManager 对象,该对象包含taskSet 所有 tasks,并管理这些 tasks 的执行,其中就包括计算 taskSetManager 中的 tasks 都有哪些locality levels,以便在调度和延迟调度 tasks 时发挥作用。
在构造 TaskSetManager 对象时,会调用var myLocalityLevels = computeValidLocalityLevels()
来确定locality levels
这个函数是在解决4个问题:
- taskSetManager 的 locality levels是否包含
PROCESS_LOCAL
- taskSetManager 的 locality levels是否包含
NODE_LOCAL
- taskSetManager 的 locality levels是否包含
NO_PREF
- taskSetManager 的 locality levels是否包含
RACK_LOCAL
让我们来各个击破
taskSetManager 的 locality levels是否包含 PROCESS_LOCAL
关键代码:
真正关键的其实是这段代码,其他两个判断都很简单
pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))
要搞懂这段代码,首先要搞明白下面两个问题
- pendingTasksForExecutor是怎么来的,什么含义?
- sched.isExecutorAlive(_)干了什么?
pendingTasksForExecutor是怎么来的,什么含义?
pendingTasksForExecutor 在 TaskSetManager 构造函数中被创建,如下 private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
其中,key 为executoroId,value 为task index 数组。在 TaskSetManager 的构造函数中如下调用
for (i <- (0 until numTasks).reverse) {addPendingTask(i)}
这段调用为 taskSetManager 中的优先位置类型为 ExecutorCacheTaskLocation
(这里通过 toString 返回的格式进行匹配) 的 tasks 调用 addPendingTask,addPendingTask 获取 task 的优先位置,即一个 Seq[String]
;再获得这组优先位置对应的 executors,从来反过来获得了 executor 对应 partition 缓存在其上内存的 tasks,即pendingTasksForExecutor
简单的说,pendingTasksForExecutor保存着当前可用的 executor 对应的 partition 缓存在在其上内存中的 tasks 的映射关系
sched.isExecutorAlive(_)干了什么?
sched.isExecutorAlive的实现为:
def TaskSchedulerImpl#isExecutorAlive(execId: String): Boolean = synchronized {activeExecutorIds.contains(execId)}
activeExecutorIds: HashSet[String]
保存集群当前所有可用的 executor id(这里对 executor 的 free cores 个数并没有要求,可为0),每当 DAGScheduler 提交 taskSet 会触发 TaskScheduler 调用 resourceOffers 方法,该方法会更新当前可用的 executors 至 activeExecutorIds;当有 executor lost 的时候,TaskSchedulerImpl 也会调用 removeExecutor 来将 lost 的executor 从 activeExecutorIds 中去除
所有isExecutorAlive就是判断参数中的 executor id 当前是否 active
结合以上两段代码的分析,可以知道这行代码pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))
的含义: taskSetManager 的所有对应 partition 数据缓存在 executor 内存中的 tasks 对应的所有 executor,是否有任一 active,若有则返回 true;否则返回 false
这样,也就知道了如何去判断一个 taskSetManager 对象的 locality levels 是否包含 PROCESS_LOCAL
taskSetManager 的 locality levels是否包含 NODE_LOCAL
有了上面对 PROCESS_LOCAL 的详细分析,这里对是否包含 NODE_LOCAL 只做简要分析。最关键代码
pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))
,其中
- pendingTasksForHost:
HashMap[String, ArrayBuffer[Int]]
类型,key 为 host,value 为 preferredLocations 包含该 host 的 tasks indexs 数组 - sched.hasExecutorsAliveOnHost(_): 源码如下,其中executorsByHost为
HashMap[String, HashSet[String]]
类型,key 为 host,value 为该 host 上的 active executors
def hasExecutorsAliveOnHost(host: String): Boolean = synchronized {executorsByHost.contains(host)}
这样,也就知道如何判断 taskSetManager 的 locality levels:taskSetManager 的所有 tasks 对应的所有 hosts,是否有任一是 tasks 的优先位置 hosts,若有返回 true;否则返回 false
taskSetManager 的 locality levels是否包含 RACK_LOCAL
关键代码:pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))
,其中
- pendingTasksForRack:
HashMap[String, ArrayBuffer[Int]]
类型,key为 rack,value 为优先位置所在的 host 属于该机架的 tasks - sched.hasHostAliveOnRack(_):源码如下,其中
hostsByRack: HashMap[String, HashSet[String]]
的 key 为 rack,value 为该 rack 上所有作为 taskSetManager 优先位置的 hosts
def hasHostAliveOnRack(rack: String): Boolean = synchronized {hostsByRack.contains(rack)}
RACK_LOCAL
的规则为:taskSetManager 的所有 tasks 的优先位置 host 所在的所有 racks 与当前 active executors 所在的机架是否有交集,若有则返回 true,否则返回 false
taskSetManager 的 locality levels是否包含 NO_PREF
关键代码如下:
if (!pendingTasksWithNoPrefs.isEmpty) {levels += NO_PREF}
如果一个 RDD 的某些 partitions 没有优先位置(如是以内存集合作为数据源且 executors 和 driver不在同一个节点),那么这个 RDD action 产生的 taskSetManagers 的 locality levels 就包含 NO_PREF
对于所有的 taskSetManager 均包含 ANY
作者:牛肉圆粉不加葱 链接:https://www.jianshu.com/p/05034a9c8cae 來源:简书 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
参考:
https://spark.apache.org/docs/1.6.1/tuning.html#garbage-collection-tuning
http://www.jianshu.com/p/05034a9c8cae
http://blog.csdn.net/pelick/article/details/41866845
http://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.rdd.RDD
[Spark调优]--TaskSetManager的有效Locality Levels相关推荐
- rdd数据存内存 数据量_大数据开发-Spark调优常用手段
Spark调优 spark调优常见手段,在生产中常常会遇到各种各样的问题,有事前原因,有事中原因,也有不规范原因,spark调优总结下来可以从下面几个点来调优. 1. 分配更多的资源 分配更多的资源: ...
- Spark学习之Spark调优与调试(7)
Spark学习之Spark调优与调试(7) 1. 对Spark进行调优与调试通常需要修改Spark应用运行时配置的选项. 当创建一个SparkContext时就会创建一个SparkConf实例. 2. ...
- 跟我一起学【Spark】之——Spark调优与调试
第8章 Spark调优与调试 1.总结Spark的配置机制 2.理解Spark应用性能表现的基础知识.设置相关配置项.编写高性能应用设计模式 3.探讨Spark的用户界面.执行的组成部分.日志机制 8 ...
- 【Spark调优】大表join大表,少数key导致数据倾斜解决方案
[Spark调优]大表join大表,少数key导致数据倾斜解决方案 参考文章: (1)[Spark调优]大表join大表,少数key导致数据倾斜解决方案 (2)https://www.cnblogs. ...
- 【Spark调优】小表join大表数据倾斜解决方案
[Spark调优]小表join大表数据倾斜解决方案 参考文章: (1)[Spark调优]小表join大表数据倾斜解决方案 (2)https://www.cnblogs.com/wwcom123/p/1 ...
- spark调优(一)-开发调优,数据倾斜,shuffle调优
主要分为开发调优.资源调优.数据倾斜调优.shuffle调优几个部分. 开发调优和资源调优是所有Spark作业都需要注意和遵循的一些基本原则,是高性能Spark作业的基础:数据倾斜调优,主要讲解了一套 ...
- 【Spark篇】---Spark调优之代码调优,数据本地化调优,内存调优,SparkShuffle调优,Executor的堆外内存调优...
一.前述 Spark中调优大致分为以下几种 ,代码调优,数据本地化,内存调优,SparkShuffle调优,调节Executor的堆外内存. 二.具体 1.代码调优 1.避免创建重复的RDD,尽 ...
- Spark 调优技巧总结
Spark 是大数据处理必备技术之一,在开发工作中必然会面对性能调优和各种问题故障的处理,那么面试官也最爱在这些方面进行机关枪式的提问,本 Chat 就针对当前实际开发工作中常遇到的热门和冷门问题进行 ...
- spark调优面试专题
1.1.介绍一下join操作优化经验? 需要尚硅谷 八斗学院 奈学教育完整大数据资料和多家机构面试题的加威: Y17744650906 资料来之不易,不能接受小额有偿的勿扰,谢谢 答:join其实常见 ...
- spark 写本地文件_(纯干货建议收藏)一次GC引发的Spark调优大全
上一篇Tungsten On Spark-内存模型设计总结了Spark内存设计相关的知识点,本篇会快速为读者复习一下JVM相关的知识点,然后基于线上的GC调优对spark整体的调优做一个汇总,希望能让 ...
最新文章
- HTML, CSS, Javascript, jQuery之间的关系
- 【问链财经-区块链基础知识系列】 第二十六课 隐私保护方法:多方安全计算和区块链
- 【Linux系统编程】进程间通信概述
- 为DEV C++/CodeBlock配置gdb调试遇到的问题
- 【iOS】利用CocoaPods创建私有库进行组件化开发
- Greenplum元数据信息
- python requests库 response_Pytest 如何模拟 requests库中的Response对象?
- 移动**21*设置无法接通_手机通话质量不好?你可能只差一步设置!
- 为什么使用php工厂模式,PHP 工厂模式使用方法
- 安装rvm的时候提示curl证书的问题..
- 浏览器页面渲染的过程
- 如何用getevent查看C-TouchPanel上报数据?
- 如何转换图片格式?建议收藏这两个方法
- (瑞萨,norti系统,partner-jet2) 实时系统下DMAC使用的注意事项
- 巨头集体跨界,老玩家悄然出圈,谁在争夺6亿电竞用户?
- 力扣 167. 两数之和 II - 输入有序数组
- 免费PDF转JPG的开源软件
- 云呼HTML自适应官网源码
- python正则检验_Python 检测生僻字
- Pygame详解(八):locals 模块