Spark性能调优 之 合理规划资源配置
文章目录
- 1. 合理规划资源配置
- 1.1 简介
- 1.1.1 资源配置调优原理
- 1.2 资源配置说明
- 1.2.1 资源配置种类
- 1.2.2 资源配置方式
- 1.2.3 调优原则
- 2. 调节并行度
- 2.1 简介
- 2.1.1 并行度简介
- 2.1.2 并行度调优原理
- 2.2 并行度调节说明
- 2.2.1 并行度调节方式
- 2.2.2 并行度调节原则
- 3. 重构RDD架构与RDD持久化
- 3.1 说明
- 3.2 重构与RDD持久化说明
- 3.2.1 重构说明
- 3.2.2 持久化说明
- 3.2.3 持久化级别
- 3.2.3.1 级别
- 3.2.3.2 持久化级别选择策略
- 3.2.4 持久化设置
- 4. 广播大变量
- 4.1 说明
- 4.2 广播大变量说明
- 4.2.1 大变量说明
- 4.2.2 广播变量说明
- 4.2.3 广播变量设置方式
1. 合理规划资源配置
1.1 简介
性能调优最最基础的一步,就是增加和分配更多的资源:
基本上,在一定范围之内,增加资源与性能的提升是成正比的。因此,增加和分配更多的资源,在性能和速度上的提升,是显而易见的。因此,在我们编写完成Spark作业之后,首先第一步,便是要调节最优资源配置。在给予程序所能获取到的最大资源之后,才考虑对程序进行其他方面的调优。
1.1.1 资源配置调优原理
在这里,我们有一个问题,为什么资源配置对于性能的提升这么明显呢?
要回答这个问题,便涉及到了应用程序对资源的申请和分配了。首先我们知道,Task从生成到执行并返回结果的流程1如下(如下图及说明):
- DAGScheduler将一个Job划分为多个Stage,又将Stage划分生成TaskSet。
- DAGScheduler将TaskSet交给TaskScheduler去提交。
- TaskScheduler将TaskSet中的Task逐个提交到它对应的Executor上去执行。一个Task提交给一个Executor。
- Executor从线程池取出一个线程执行,并返回执行结果。(可见,只要线程数(即CPU数)充足,多个Task可以提交给同一个Executor的)
由此,我们可以知道,Task是Spark任务的核心,而Task是在Executor上执行的,因此,对资源的配置,主要是为Executor配置资源
1.2 资源配置说明
1.2.1 资源配置种类
在资源配置时,我我们主要配置以下种类的资源:
Task执行并行度
= Executor数量 * 每个Executor的CPU数量
(当然了,每个Executor的CPU数量可能不同)
RDD 与 Task 的关系说明
:
RDD在计算的时候,每个分区都会起一个Task,所以RDD的分区数目决定了总的的Task数目。每个Task执行的结果就是生成了目标RDD的一个Partiton。
类别 | 说明 |
---|---|
Executor数量
|
从上面的公式可知,如果Executor数量比较少,那么,能够并行执行的Task数量就比较少,就意味着,我们的Application的并行执行的能力就很弱。
比如有10个Executor,每个Executor有10个CPU core,那么同时能够并行执行的Task就是100个。100个执行完以后,再换下一批100个task。增加了Executor数量意味着增加了能够并行执行的Task数量。比如原先是100个,现在可能可以并行执行200个,2000个,甚至20000个。那么并行能力就比之前提升了数倍,数十倍。相应的,性能(执行的速度),也能提升数倍~数十倍。 |
Executor CPU
|
同理,增加每个Executor的CPU core,也是增加了执行的并行能力。 |
Executor Memory
|
增加每个Executor的内存量。增加了内存量以后,对性能的提升,有三点:
1、如果需要对RDD进行cache,那么更多的内存,就可以缓存更多的数据,将更少的数据写入磁盘,甚至不写入磁盘。减少了磁盘IO。 |
Driver Memory
|
增加Driver的内存量。增加Driver内存量对性能的提升主要体现在:
1、DAGscheduler在Stage划分过程中,产生大量的Task对象,会占用大量的内存,如果内存不足也会导致频繁GC。SparkContext本身维护了大量的对象,也占用很多内存。 2、Driver在待计算数据分发和计算数据接收过程中,一方面需要保存这些数据,另外一方面在发送和接收时需要进行序列化和反序列化,同样是非常耗费内存。 |
1.2.2 资源配置方式
在我们在生产环境中,提交spark作业时,用的spark-submit shell脚本,里面调整对应的参数:
/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \ //配置executor的数量
--driver-memory 100m \ //配置driver的内存(影响不大)
--executor-memory 100m \ //配置每个executor的内存大小
--executor-cores 3 \ //配置每个executor的cpu core数量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
1.2.3 调优原则
调优原则
:为任务分配,可以获取到并使用的最大资源
Spark Standalone模式
:了解集群中可使用的资源,依据实际情况,去计算这些参数应该设置的数值。Spark Yarn模式
:Yarn的是资源队列进行资源的分配和调度,因此,在写编写submit脚本的时候,就根据Spark作业要提交到的资源队列,进行资源分配。(比如资源队列有400G内存,100个CPU core,那么指定50个Executor,每个Executor分配8G内存,2个CPU core。)
2. 调节并行度
2.1 简介
在配置资源之后,便是针对这些资源来调节并行度,以充分的利用资源,从而达到最佳性能。
2.1.1 并行度简介
并行度
:指Spark作业中,各个Stage的Task的数量,也就代表了Spark作业在各个阶段的并行度。
如上图,Stage0的并行度为12,Stage1的并行度为4。而这影响到的便是:
- Executor数量的设定
- Executor中工作CPU数量的设定
- 中间文件(Local / Cache)的存储
2.1.2 并行度调优原理
问题:如果不调节并行度,导致并行度过低,会怎么样?
假设,现在已经在spark-submit脚本里面,给我们的spark作业分配了足够多的资源,比如50个executor,每个executor有10G内存,每个executor有3个cpu core。(基本已经达到了集群或者yarn队列的资源上限。)
如果并行度没有设置,或者设置的很小(比如只设置并行度为100,即同时执行100个task),就意味着,只有100个task
能同时执行,每个executor
平均分配到2个task
(即每个executor只会并行运行2个task)。而对于现有资源来说(50个executor,每个executor有3个cpu core),当前的Application任何一个stage运行的时候,都有总数在150个cpu core,可以并行运行。即原本每个Executor可以同时执行3个Task,但实际上,每个executor
只平均分配到2个task
,每个executor剩下的一个cpu core,就浪费掉了。
你的资源虽然分配足够了,但是问题是,并行度没有与资源相匹配,导致你分配下去的资源都浪费掉了。
合理的并行度的设置,应该是要设置的足够大,大到可以完全合理的利用你的集群资源;比如上面的例子,总共集群有150个cpu core可用,那么就应该将你的Application的并行度,至少设置成150,才能完全有效的利用你的集群资源,让150个task,并行执行;
而且提高并行度以后,既可以同时并行运行,还可以让每个task要处理的数据量变少;比如总共150G的数据要处理,如果是100个task,每个task计算1.5G的数据;现在增加到150个task,可以并行运行,而且每个task主要处理1G的数据就可以。
2.2 并行度调节说明
2.2.1 并行度调节方式
调节参数:spark.default.parallelism
SparkConf conf = new SparkConf().set("spark.default.parallelism", "500")
2.2.2 并行度调节原则
task数量,至少设置成
与可用cpu core数量相同
(最理想情况,比如总共150个cpu core,分配了150个task,一起运行,差不多同一时间运行完毕)官方推荐,task数量,设置成
可用cpu core数量的2 ~ 3倍
(比如150个cpu core,基本要设置task数量为300~500)。
关于第 2 点说明:实际情况,与理想情况不同的,有些task会运行的快一点,有些task可能会慢一点,所以如果你的task数量,刚好设置的跟cpu core数量相同,可能还是会导致资源的浪费。
比如150个task,10个先运行完了,剩余140个还在运行,但是这个时候,有10个cpu core就空闲出来了,就导致了浪费。
那如果task数量设置成cpu core总数的2~3倍,那么一个task运行完了以后,另一个task马上可以补上来,就尽量让cpu core不要空闲,同时也是尽量提升spark作业运行的效率和速度,提升性能。
3. 重构RDD架构与RDD持久化
3.1 说明
在编写程序过程中:
- 一方面由于种种原因,导致了原本可以复用的RDD,被重复计算并赋予了不同的名字,导致重复计算;
- 另外一方面,一些需要多次重复使用的RDD,没有被持久化,导致后面再次用到此RDD时,又要重新从头开始计算,影响性能。如下图:
因此,重构RDD架构与RDD持久化可以减少不必要的重复计算,从而提高性能。
3.2 重构与RDD持久化说明
3.2.1 重构说明
一 、RDD架构重构与优化
:尽量去复用RDD,差不多的RDD,可以抽取称为一个共同的RDD,供后面的RDD计算时,反复使用。
3.2.2 持久化说明
关于RDD持久化原理,请参考一下文章:
- Spark内存管理之存储内存管理
- Spark存储管理之Storage模块解析
二、持久化公共RDD(务必)
:对于要多次计算和使用的公共RDD,一定要进行持久化。将RDD的数据缓存到内存中/磁盘中(使用BlockManager进行数据存取操作),以后无论对这个RDD做多少次计算,那么都是直接取这个RDD的持久化的数据,比如从内存中或者磁盘中,直接提取一份数据。
三、以序列化方式进行持久化
:如果正常将数据持久化在内存中,那么可能会导致内存的占用过大,这样的话,也许,会导致OOM(Out Of Memory,内存溢出)。当纯内存无法支撑公共RDD数据完全存放的时候,就优先考虑,使用序列化的方式在纯内存中存储。将RDD的每个partition的数据,序列化成一个大的字节数组,就一个对象;序列化后,大大减少内存的空间占用。序列化的方式,唯一的缺点就是,在获取数据的时候,需要反序列化。</font
如果序列化纯内存方式,还是导致OOM,内存溢出;就只能考虑磁盘的方式,内存+磁盘的普通方式(无序列化)。
内存+磁盘,序列化
四、采取双副本机制持久化
:为了数据的高可靠性,而且内存充足,可以使用双副本机制,进行持久化
持久化的双副本机制,持久化后的一个副本,因为机器宕机了,副本丢了,就还是得重新计算一次;持久化的每个数据单元,存储一份副本,放在其他节点上面;从而进行容错;一个副本丢了,不用重新计算,还可以使用另外一份副本。
这种方式,仅仅针对你的内存资源极度充足
3.2.3 持久化级别
RDD持久化用于RDD重用和节省重新计算,方便构建迭代算法,缓存粒度为整个RDD,下面看看实际提供的持久化级别有哪些,并分析级别的选择策略。
3.2.3.1 级别
StorageLevel | 说明 |
---|---|
MEMORY_ONLY
|
使用未序列化 的Java对象格式,将数据保存在内存 中。如果内存不够存放所有的数据,则数据可能就不会进行持久化,默认的持久化策略
|
MEMORY_ONLY_SER
|
RDD的每个partition会被序列化 成一个字节数组 ,节省空间,但读取时更占CPU
|
MEMORY_AND_DISK
|
使用未序列化 的Java对象格式,优先 尝试将数据保存在内存 中。如果内存不够存放所有的数据,会将数据写入 磁盘文件 中。不会立刻输出到磁盘
|
MEMORY_AND_DISK_SER
|
序列化存储 ,超出部分 写入磁盘文件 中
|
DISK_ONLY
|
使用未序列化 的Java对象格式,将数据全部 写入磁盘文件 中
|
MEMORY_ONLY_2
|
对于上述任意一种持久化策略,如果加上后缀_2 ,代表的是将每个持久化的数据,都复制一份副本 ,并将副本保存到其他节点 上
|
OFF_HEAP
|
RDD序列化 存储在分布式存储系统 Tachyon中
|
3.2.3.2 持久化级别选择策略
选择级别 | StorageLevel | 说明 |
---|---|---|
一 |
StorageLevel.MEMORY_ONLY
|
纯内存。 如果是persist(StorageLevel.MEMORY_ONLY()),纯内存,无序列化,那么就可以用 cache() 方法来替代
|
二 |
StorageLevel.MEMORY_ONLY_SER()
|
纯内存+序列化 如果不能与MEMORY_ONLY很好的契合,建议使用MEMORY_ONLY_SER |
三 |
StorageLevel.MEMORY_AND_DISK()
|
内存+磁盘 尽可能不要存储数据到磁盘上,除非数据集函数计算量特别大,或者它过滤了大量数据,否则从新计算一个分区的速度和从磁盘中读取差不多 |
四 |
StorageLevel.MEMORY_AND_DISK_SER()
|
内存+磁盘+序列化 首先,从磁盘读取效率不高,其次,对于序列化和反序列化,其消耗的CPU资源是不容忽视的。 |
五 |
StorageLevel.DISK_ONLY()
|
纯磁盘 很少使用,适用于那些计算特别消耗CPU,并且本身非常庞大的,长期驻留内存会导致可用内存不足。 |
如果想拥有快速故障恢复能力,可以使用复制存储级别(_2)
可以自定义存储级别(如复制因子为3),使用
StorageLevel
单例对象apply()
方法第一选择:
StorageLevel.MEMORY_ONLY
,优先使用默认持久化级别MEMORY_ONLY
。如果是persist(StorageLevel.MEMORY_ONLY()),纯内存,无序列化,那么就可以用cache()
方法来替代第二选择:
StorageLevel.MEMORY_ONLY_SER()
,如果不能与MEMORY_ONLY
很好的契合,建议使用MEMORY_ONLY_SER
第三选择:
StorageLevel.MEMORY_AND_DISK()
,尽可能不要存储数据到磁盘上,除非数据集函数计算量特别大,或者它过滤了大量数据,否则从新计算一个分区的速度和从磁盘中读取差不多第四选择:
StorageLevel.MEMORY_AND_DISK_SER()
如果想拥有快速故障恢复能力,可以使用复制存储级别(_2)
可以自定义存储级别(如复制因子为3),使用
StorageLevel
单例对象apply()
方法
如果是persist(StorageLevel.MEMORY_ONLY()),纯内存,无序列化,那么就可以用cache()方法来替代
* StorageLevel.MEMORY_ONLY_SER(),第二选择 StorageLevel.MEMORY_AND_DISK(),第三选择
* StorageLevel.MEMORY_AND_DISK_SER(),第四选择 StorageLevel.DISK_ONLY(),第五选择
*
* 如果内存充足,要使用双副本高可靠机制 选择后缀带_2的策略 StorageLevel.MEMORY_ONLY_2()
3.2.4 持久化设置
持久化,很简单,就是对RDD调用persist()方法,并传入一个持久化级别
JavaPairRDD<String, Row> RDD = getRDD(actionRDD);
RDD = RDD.persist(StorageLevel.MEMORY_ONLY());
4. 广播大变量
4.1 说明
在Task提交到Executor执行的过程中,可能出现一些大变量。首先,这些大变量如果被Task当中包含的语句使用到,那么,在DAGScheduler划分stage生成Task时,默认情况下会将这些大变量一同封装在Task中,这就会导致以下问题:
默认情况下,变量在Driver与Executor之间的传递过程如下:
虽然一个Task中的的大变量可能只有1~100M,但是如果放眼整个集群,如果Task数量有1000个,那么即使Task只有1M,那么也就是说网络传输开销和内存消耗至少就是近1G;而如果是100M,那么就是100G了,这在有限的资源状况下,是难以接收的。
因此,Spark提供了一种广播变量的方式,即Broadcast,将大变量广播出去,而不是直接使用。
4.2 广播大变量说明
4.2.1 大变量说明
- 在Java变量中,对于基本的数据类型这类的,往往占用的内存空间很小,因此没有必要广播。
- 然而,对于一些大的对象/变量,诸如容器类(Map、List、Set)这种,底层基于复杂数据结构的对象,会保存数据量的增加而变大,以及一些自定义的对象,非常有可能占用非常多的内存,在创建Task时,如果也一同写入Task中,毫无疑问是使得Task变得非常大。
4.2.2 广播变量说明
- 广播变量,在driver上会有一份初始的副本
- Task在运行的时候,想要使用广播变量中的数据:
2.1 首先会在自己本地的Executor对应的BlockManager中,尝试获取变量副本;
2.2 如果本地没有,那么就从Driver远程/其他节点的BlockManager
拉取变量副本,并保存在本地的BlockManager中;
2.3 Executor的BlockManager除了从driver上拉取,也可能从其他节点的BlockManager上拉取变量副本,距离越近越好。
(此后这个Executor上的Task,都会直接使用本地的BlockManager中的副本。)
广播变量的好处,不是每个Task一份变量副本,而是变成每个节点的qExecutor才一份副本。这样的话,就可以让变量产生的副本大大减少。
设置广播变量之后,大变量从Driver到Executor的传递及使用过程如下:
举例来说,假设有50个executor,1000个task。一个map,10M。
- 默认情况下,1000个task,1000份副本。10G的数据,网络传输,在集群中,耗费10G的内存资源。
- 如果使用了广播变量。50个execurtor,50个副本。500M的数据,网络传输,而且不一定都是从Driver传输到每个节点,还可能是就近从最近的节点的executor的bockmanager上拉取变量副本,网络传输速度大大增加;500M的内存消耗。
可见,对性能的提升和影响,还是很客观的。(10000M,500M,20倍。20倍~以上的网络传输性能消耗的降低;20倍的内存消耗的减少)虽然说,不一定会对性能产生决定性的作用。比如运行30分钟的spark作业,可能做了广播变量以后,速度快了2分钟,或者5分钟。但是一点一滴的调优,积少成多。最后还是会有效果的。没有经过任何调优手段的spark作业,16个小时;三板斧下来,就可以到5个小时;然后非常重要的一个调优,影响特别大,shuffle调优,2~3个小时;应用了10个以上的性能调优的技术点,JVM+广播,30分钟。16小时~30分钟。
4.2.3 广播变量设置方式
广播变量
:很简单,其实就是调用SparkContext的broadcast()方法,传入你要广播的变量,即可。 这个broadcast()会返回一个Broadcast<T> 类型的变量,这个泛型<T>就是你要广播的变量本身的类型,指定此广播变量实际存储的类型。
使用广播变量
:直接调用广播变量的value() / getValue()可以获取到之前封装的变量
import org.apache.spark.broadcast.Broadcast;// 1. 定义将要广播的变量
Map<String, IntList> hourMap = new HashMap<String, IntList>();
// 2. 使用SparkContext的broadcast()将此变量广播出去,并使用Brocast<T>类型的变量保存返回结果。
// 这个泛型<T>就是你要广播的变量本身的类型,指定此广播变量实际存储的类型。
// 由于后续需要在匿名内部类访问此外部对象,因此这个hourMapBroadcast需要使用final修饰符。
final Broadcast<Map<String, IntList>> hourMapBroadcast = sc.broadcast(hourMap);
// 3. 使用广播变量的value() / getValue()来获取之前封装的广播变量
Map<String, IntList> HourMap = hourMapBroadcast.value();
再看一个例子:List<Tuple2<Long, Row>> userInfos = userid2InfoRDD.collect();
final Broadcast<List<Tuple2<Long, Row>>> userInfosBroadcast = sc.broadcast(userInfos);
// 得到用户信息map
List<Tuple2<Long, Row>> userInfos = userInfosBroadcast.value();
(源码、原理请参考系列文章):
Spark 任务调度之Driver send Task
Spark DAG之SubmitJob
Spark DAG之SubmitTask
Spark DAG之SubmitStage
Spark 任务调度之Executor执行task并返回结果 ↩︎
Spark性能调优 之 合理规划资源配置相关推荐
- Spark性能调优之资源分配
**性能优化王道就是给更多资源!**机器更多了,CPU更多了,内存更多了,性能和速度上的提升,是显而易见的.基本上,在一定范围之内,增加资源与性能的提升,是成正比的:写完了一个复杂的spark作业之后 ...
- Spark性能调优-RDD算子调优篇
Spark性能调优-RDD算子调优篇 RDD算子调优 1. RDD复用 在对RDD进行算子时,要避免相同的算子和计算逻辑之下对RDD进行重复的计算,如下图所示: 对上图中的RDD计算架构进行修改,得到 ...
- spark学习-Spark性能调优(1)
本文要解决的问题: Spark在使用过程中不可避免的需要进行一系列的性能优化,本文就Spark性能调优的基础部分进行总结和归纳(开发调优和资源调优),参考了不少前辈的文章,在此非常感谢. 目的 在大数 ...
- 大数据培训:Spark性能调优与参数配置
Spark性能调优-基础篇 众所周知,正确的参数配置对提升Spark的使用效率具有极大助力,帮助相关数据开发.分析人员更高效地使用Spark进行离线批处理和SQL报表分析等作业. 推荐参数配置模板如下 ...
- Spark商业案例与性能调优实战100课》第20课:大数据性能调优的本质和Spark性能调优要点分析
Spark商业案例与性能调优实战100课>第20课:大数据性能调优的本质和Spark性能调优要点分析 基于本元想办法,大智若愚,大巧若拙!深入彻底的学习spark技术内核!
- Spark性能调优总结
文章授权自 : http://www.6aiq.com/article/1547041120082 使用正确的 transformations操作 虽然开发者达到某一目标,可以通过不同的transfo ...
- Sparkamp;Spark性能调优实战
Spark特别适用于多次操作特定的数据,分mem-only和mem & disk.当中mem-only:效率高,但占用大量的内存,成本非常高;mem & disk:内存用完后,会自己主 ...
- 大数据培训:Spark 性能调优详解
调优概述 有的时候,我们可能会遇到大数据计算中一个最棘手的问题--数据倾斜,此时Spark作业的性能会比期望差很多.数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的 ...
- Kafka 性能调优实战:同等资源配置性能提升 20 几倍的秘诀
作者 | 丁威 责编 | 欧阳姝黎 抛出问题 笔者最近在折腾数据异构体系,在实现MySQL增量数据同步到MQ(Kafka.RocketMQ),本文的故事就从这里开始. 众所周知,为了提高写 ...
最新文章
- leetcode-142 环形链表II
- Gradle Android客户端程序打包(基于gradle 2.10版本验证通过)
- 搬了十次家,总算搬进了自己的家
- 图灵机(Turing Machine)
- [转载] Python杂谈 | (6) numpy中array()和asarray()的区别
- webrtc之SVC实现(十)
- python3使用pickle读取文件提示TypeError或者UnicodeDecodeError的解决办法
- 【源代码】Image Deformation Using Moving Least Squares算法的实现
- iOS 信号量解决-网络异步请求的数据同步返回问题
- html颜色奶白色,象牙白rgb值是多少 和乳白哪个更白
- 各类识别、深度学习-开源代码文献梳理
- c语言是世界上最好的语言搞笑图片,C++是世界上最好的语言!不服来辩! | 爆笑囧图...
- 【dubbo系列001】dubbo是什么?dubbo解决什么问题?
- 简单计算机基础知识,计算机基础知识讲义
- 【水滴石穿】github_popular
- wordpress优化之实现百度站内收索
- 三维空间中的平面方程
- div控制显示与隐藏状态的两种方式
- e-office10.0用户连接SqlServer数据库配置说明
- 防抖云台-鸡头稳定 简介篇