文章目录

  • 1. 合理规划资源配置
    • 1.1 简介
      • 1.1.1 资源配置调优原理
    • 1.2 资源配置说明
      • 1.2.1 资源配置种类
      • 1.2.2 资源配置方式
      • 1.2.3 调优原则
  • 2. 调节并行度
    • 2.1 简介
      • 2.1.1 并行度简介
      • 2.1.2 并行度调优原理
    • 2.2 并行度调节说明
      • 2.2.1 并行度调节方式
      • 2.2.2 并行度调节原则
  • 3. 重构RDD架构与RDD持久化
    • 3.1 说明
    • 3.2 重构与RDD持久化说明
      • 3.2.1 重构说明
      • 3.2.2 持久化说明
      • 3.2.3 持久化级别
        • 3.2.3.1 级别
        • 3.2.3.2 持久化级别选择策略
      • 3.2.4 持久化设置
  • 4. 广播大变量
    • 4.1 说明
    • 4.2 广播大变量说明
      • 4.2.1 大变量说明
      • 4.2.2 广播变量说明
      • 4.2.3 广播变量设置方式

1. 合理规划资源配置


1.1 简介

性能调优最最基础的一步,就是增加和分配更多的资源
  基本上,在一定范围之内,增加资源与性能的提升是成正比的。因此,增加和分配更多的资源,在性能和速度上的提升,是显而易见的。因此,在我们编写完成Spark作业之后,首先第一步,便是要调节最优资源配置在给予程序所能获取到的最大资源之后,才考虑对程序进行其他方面的调优

1.1.1 资源配置调优原理

在这里,我们有一个问题,为什么资源配置对于性能的提升这么明显呢

  要回答这个问题,便涉及到了应用程序对资源的申请和分配了。首先我们知道,Task从生成到执行并返回结果的流程1如下(如下图及说明):

  1. DAGScheduler将一个Job划分为多个Stage,又将Stage划分生成TaskSet。
  2. DAGScheduler将TaskSet交给TaskScheduler去提交。
  3. TaskScheduler将TaskSet中的Task逐个提交到它对应的Executor上去执行。一个Task提交给一个Executor。
  4. Executor从线程池取出一个线程执行,并返回执行结果。(可见,只要线程数(即CPU数)充足,多个Task可以提交给同一个Executor的)

由此,我们可以知道,Task是Spark任务的核心,而Task是在Executor上执行的,因此,对资源的配置,主要是为Executor配置资源


1.2 资源配置说明

1.2.1 资源配置种类

在资源配置时,我我们主要配置以下种类的资源:

Task执行并行度 = Executor数量 * 每个Executor的CPU数量
(当然了,每个Executor的CPU数量可能不同)

RDD 与 Task 的关系说明
  RDD在计算的时候,每个分区都会起一个Task,所以RDD的分区数目决定了总的的Task数目。每个Task执行的结果就是生成了目标RDD的一个Partiton。

类别 说明
Executor数量 从上面的公式可知,如果Executor数量比较少,那么,能够并行执行的Task数量就比较少,就意味着,我们的Application的并行执行的能力就很弱。

  比如有10个Executor,每个Executor有10个CPU core,那么同时能够并行执行的Task就是100个。100个执行完以后,再换下一批100个task。增加了Executor数量意味着增加了能够并行执行的Task数量。比如原先是100个,现在可能可以并行执行200个,2000个,甚至20000个。那么并行能力就比之前提升了数倍,数十倍。相应的,性能(执行的速度),也能提升数倍~数十倍

Executor CPU 同理,增加每个Executor的CPU core,也是增加了执行的并行能力。
Executor Memory 增加每个Executor的内存量。增加了内存量以后,对性能的提升,有三点:

  1、如果需要对RDD进行cache,那么更多的内存,就可以缓存更多的数据,将更少的数据写入磁盘,甚至不写入磁盘。减少了磁盘IO
  2、对于shuffle操作,reduce端,会需要内存来存放拉取的数据并进行聚合。如果内存不够,也会写入磁盘。如果给Executor分配更多内存以后,就有更少的数据,需要写入磁盘,甚至不需要写入磁盘。减少了磁盘IO,提升了性能
  3、对于task的执行,可能会创建很多对象如果内存比较小,可能会频繁导致JVM堆内存满了,然后频繁GC,垃圾回收,minor GC和full GC(速度很慢)。内存加大以后,带来更少的GC,垃圾回收,避免了速度变慢,速度变快了。

Driver Memory 增加Driver的内存量。增加Driver内存量对性能的提升主要体现在:

  1、DAGscheduler在Stage划分过程中,产生大量的Task对象,会占用大量的内存,如果内存不足也会导致频繁GC。SparkContext本身维护了大量的对象,也占用很多内存。

  2、Driver在待计算数据分发和计算数据接收过程中,一方面需要保存这些数据,另外一方面在发送和接收时需要进行序列化和反序列化,同样是非常耗费内存。

1.2.2 资源配置方式

在我们在生产环境中,提交spark作业时,用的spark-submit shell脚本,里面调整对应的参数:

/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \                 //配置executor的数量
--driver-memory 100m \              //配置driver的内存(影响不大)
--executor-memory 100m \            //配置每个executor的内存大小
--executor-cores 3 \                //配置每个executor的cpu core数量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
1.2.3 调优原则

调优原则:为任务分配,可以获取到并使用的最大资源

  • Spark Standalone模式:了解集群中可使用的资源,依据实际情况,去计算这些参数应该设置的数值。
  • Spark Yarn模式:Yarn的是资源队列进行资源的分配和调度,因此,在写编写submit脚本的时候,就根据Spark作业要提交到的资源队列,进行资源分配。(比如资源队列有400G内存,100个CPU core,那么指定50个Executor,每个Executor分配8G内存,2个CPU core。)

2. 调节并行度

2.1 简介

在配置资源之后,便是针对这些资源来调节并行度,以充分的利用资源,从而达到最佳性能。

2.1.1 并行度简介

并行度:指Spark作业中,各个Stage的Task的数量,也就代表了Spark作业在各个阶段的并行度。

如上图,Stage0的并行度为12,Stage1的并行度为4。而这影响到的便是:

  1. Executor数量的设定
  2. Executor中工作CPU数量的设定
  3. 中间文件(Local / Cache)的存储
2.1.2 并行度调优原理

问题:如果不调节并行度,导致并行度过低,会怎么样?
  假设,现在已经在spark-submit脚本里面,给我们的spark作业分配了足够多的资源,比如50个executor每个executor有10G内存每个executor有3个cpu core。(基本已经达到了集群或者yarn队列的资源上限。)

如果并行度没有设置,或者设置的很小(比如只设置并行度为100,即同时执行100个task),就意味着,只有100个task能同时执行,每个executor平均分配到2个task(即每个executor只会并行运行2个task)。而对于现有资源来说(50个executor,每个executor有3个cpu core),当前的Application任何一个stage运行的时候,都有总数在150个cpu core,可以并行运行。即原本每个Executor可以同时执行3个Task,但实际上,每个executor只平均分配到2个task,每个executor剩下的一个cpu core,就浪费掉了

你的资源虽然分配足够了,但是问题是,并行度没有与资源相匹配,导致你分配下去的资源都浪费掉了。

合理的并行度的设置,应该是要设置的足够大,大到可以完全合理的利用你的集群资源;比如上面的例子,总共集群有150个cpu core可用,那么就应该将你的Application的并行度,至少设置成150,才能完全有效的利用你的集群资源,让150个task,并行执行;
而且提高并行度以后,既可以同时并行运行,还可以让每个task要处理的数据量变少;比如总共150G的数据要处理,如果是100个task,每个task计算1.5G的数据;现在增加到150个task,可以并行运行,而且每个task主要处理1G的数据就可以。

2.2 并行度调节说明

2.2.1 并行度调节方式

调节参数:spark.default.parallelism

SparkConf conf = new SparkConf().set("spark.default.parallelism", "500")
2.2.2 并行度调节原则
  1. task数量,至少设置成与可用cpu core数量相同(最理想情况,比如总共150个cpu core,分配了150个task,一起运行,差不多同一时间运行完毕)

  2. 官方推荐,task数量,设置成可用cpu core数量的2 ~ 3倍(比如150个cpu core,基本要设置task数量为300~500)。

关于第 2 点说明实际情况,与理想情况不同的,有些task会运行的快一点,有些task可能会慢一点,所以如果你的task数量,刚好设置的跟cpu core数量相同,可能还是会导致资源的浪费
  比如150个task,10个先运行完了,剩余140个还在运行,但是这个时候,有10个cpu core就空闲出来了,就导致了浪费。
  那如果task数量设置成cpu core总数的2~3倍,那么一个task运行完了以后,另一个task马上可以补上来,就尽量让cpu core不要空闲,同时也是尽量提升spark作业运行的效率和速度,提升性能

3. 重构RDD架构与RDD持久化

3.1 说明

在编写程序过程中:

  • 一方面由于种种原因,导致了原本可以复用的RDD,被重复计算并赋予了不同的名字,导致重复计算;
  • 另外一方面,一些需要多次重复使用的RDD,没有被持久化,导致后面再次用到此RDD时,又要重新从头开始计算,影响性能。如下图:

因此,重构RDD架构与RDD持久化可以减少不必要的重复计算,从而提高性能。

3.2 重构与RDD持久化说明

3.2.1 重构说明

一 、RDD架构重构与优化:尽量去复用RDD,差不多的RDD,可以抽取称为一个共同的RDD,供后面的RDD计算时,反复使用。

3.2.2 持久化说明

关于RDD持久化原理,请参考一下文章:

  • Spark内存管理之存储内存管理
  • Spark存储管理之Storage模块解析

二、持久化公共RDD(务必):对于要多次计算和使用的公共RDD,一定要进行持久化。将RDD的数据缓存到内存中/磁盘中(使用BlockManager进行数据存取操作),以后无论对这个RDD做多少次计算,那么都是直接取这个RDD的持久化的数据,比如从内存中或者磁盘中,直接提取一份数据。

三、以序列化方式进行持久化:如果正常将数据持久化在内存中,那么可能会导致内存的占用过大,这样的话,也许,会导致OOM(Out Of Memory,内存溢出)。当纯内存无法支撑公共RDD数据完全存放的时候,就优先考虑,使用序列化的方式在纯内存中存储。将RDD的每个partition的数据,序列化成一个大的字节数组,就一个对象;序列化后,大大减少内存的空间占用。序列化的方式,唯一的缺点就是,在获取数据的时候,需要反序列化。</font

如果序列化纯内存方式,还是导致OOM,内存溢出;就只能考虑磁盘的方式,内存+磁盘的普通方式(无序列化)。

内存+磁盘,序列化

四、采取双副本机制持久化:为了数据的高可靠性,而且内存充足,可以使用双副本机制,进行持久化

持久化的双副本机制,持久化后的一个副本,因为机器宕机了,副本丢了,就还是得重新计算一次;持久化的每个数据单元,存储一份副本,放在其他节点上面;从而进行容错;一个副本丢了,不用重新计算,还可以使用另外一份副本。

这种方式,仅仅针对你的内存资源极度充足

3.2.3 持久化级别

RDD持久化用于RDD重用和节省重新计算,方便构建迭代算法,缓存粒度为整个RDD,下面看看实际提供的持久化级别有哪些,并分析级别的选择策略。

3.2.3.1 级别
StorageLevel 说明
MEMORY_ONLY 使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化,默认的持久化策略
MEMORY_ONLY_SER RDD的每个partition会被序列化成一个字节数组,节省空间,但读取时更占CPU
MEMORY_AND_DISK 使用未序列化的Java对象格式,优先尝试将数据保存在内存中。
如果内存不够存放所有的数据,会将数据写入磁盘文件中。不会立刻输出到磁盘
MEMORY_AND_DISK_SER 序列化存储超出部分写入磁盘文件
DISK_ONLY 使用未序列化的Java对象格式,将数据全部写入磁盘文件
MEMORY_ONLY_2 对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点
OFF_HEAP RDD序列化存储在分布式存储系统Tachyon中
3.2.3.2 持久化级别选择策略
选择级别 StorageLevel 说明
StorageLevel.MEMORY_ONLY 纯内存
  如果是persist(StorageLevel.MEMORY_ONLY()),纯内存,无序列化,那么就可以用cache()方法来替代
StorageLevel.MEMORY_ONLY_SER() 纯内存+序列化
  如果不能与MEMORY_ONLY很好的契合,建议使用MEMORY_ONLY_SER
StorageLevel.MEMORY_AND_DISK() 内存+磁盘
  尽可能不要存储数据到磁盘上,除非数据集函数计算量特别大,或者它过滤了大量数据,否则从新计算一个分区的速度和从磁盘中读取差不多
StorageLevel.MEMORY_AND_DISK_SER() 内存+磁盘+序列化
  首先,从磁盘读取效率不高,其次,对于序列化和反序列化,其消耗的CPU资源是不容忽视的
StorageLevel.DISK_ONLY() 纯磁盘
  很少使用,适用于那些计算特别消耗CPU,并且本身非常庞大的,长期驻留内存会导致可用内存不足。
  • 如果想拥有快速故障恢复能力,可以使用复制存储级别(_2)

  • 可以自定义存储级别(如复制因子为3),使用StorageLevel单例对象apply()方法

  • 第一选择:StorageLevel.MEMORY_ONLY,优先使用默认持久化级别MEMORY_ONLY如果是persist(StorageLevel.MEMORY_ONLY()),纯内存,无序列化,那么就可以用cache()方法来替代

  • 第二选择:StorageLevel.MEMORY_ONLY_SER(),如果不能与MEMORY_ONLY很好的契合,建议使用MEMORY_ONLY_SER

  • 第三选择:StorageLevel.MEMORY_AND_DISK(),尽可能不要存储数据到磁盘上,除非数据集函数计算量特别大,或者它过滤了大量数据,否则从新计算一个分区的速度和从磁盘中读取差不多

  • 第四选择:StorageLevel.MEMORY_AND_DISK_SER()

  • 如果想拥有快速故障恢复能力,可以使用复制存储级别(_2)

  • 可以自定义存储级别(如复制因子为3),使用StorageLevel单例对象apply()方法

如果是persist(StorageLevel.MEMORY_ONLY()),纯内存,无序列化,那么就可以用cache()方法来替代
* StorageLevel.MEMORY_ONLY_SER(),第二选择 StorageLevel.MEMORY_AND_DISK(),第三选择
* StorageLevel.MEMORY_AND_DISK_SER(),第四选择 StorageLevel.DISK_ONLY(),第五选择
*
* 如果内存充足,要使用双副本高可靠机制 选择后缀带_2的策略 StorageLevel.MEMORY_ONLY_2()

3.2.4 持久化设置

持久化,很简单,就是对RDD调用persist()方法,并传入一个持久化级别

JavaPairRDD<String, Row> RDD = getRDD(actionRDD);
RDD = RDD.persist(StorageLevel.MEMORY_ONLY());

4. 广播大变量

4.1 说明

在Task提交到Executor执行的过程中,可能出现一些大变量。首先,这些大变量如果被Task当中包含的语句使用到,那么,在DAGScheduler划分stage生成Task时,默认情况下会将这些大变量一同封装在Task中,这就会导致以下问题:

默认情况下,变量在Driver与Executor之间的传递过程如下:

虽然一个Task中的的大变量可能只有1~100M,但是如果放眼整个集群,如果Task数量有1000个,那么即使Task只有1M,那么也就是说网络传输开销和内存消耗至少就是近1G;而如果是100M,那么就是100G了,这在有限的资源状况下,是难以接收的。

因此,Spark提供了一种广播变量的方式,即Broadcast,将大变量广播出去,而不是直接使用。

4.2 广播大变量说明

4.2.1 大变量说明
  • 在Java变量中,对于基本的数据类型这类的,往往占用的内存空间很小,因此没有必要广播。
  • 然而,对于一些大的对象/变量,诸如容器类(Map、List、Set)这种,底层基于复杂数据结构的对象,会保存数据量的增加而变大,以及一些自定义的对象,非常有可能占用非常多的内存,在创建Task时,如果也一同写入Task中,毫无疑问是使得Task变得非常大。
4.2.2 广播变量说明
  1. 广播变量,在driver上会有一份初始的副本
  2. Task在运行的时候,想要使用广播变量中的数据
    2.1 首先会在自己本地的Executor对应的BlockManager中,尝试获取变量副本
    2.2 如果本地没有,那么就从Driver远程/其他节点的BlockManager拉取变量副本,并保存在本地的BlockManager中
    2.3 Executor的BlockManager除了从driver上拉取,也可能从其他节点的BlockManager上拉取变量副本,距离越近越好
    (此后这个Executor上的Task,都会直接使用本地的BlockManager中的副本。)

广播变量的好处,不是每个Task一份变量副本,而是变成每个节点的qExecutor才一份副本。这样的话,就可以让变量产生的副本大大减少

设置广播变量之后,大变量从Driver到Executor的传递及使用过程如下:

举例来说,假设有50个executor,1000个task。一个map,10M。

  • 默认情况下,1000个task,1000份副本。10G的数据,网络传输,在集群中,耗费10G的内存资源。
  • 如果使用了广播变量。50个execurtor,50个副本。500M的数据,网络传输,而且不一定都是从Driver传输到每个节点,还可能是就近从最近的节点的executor的bockmanager上拉取变量副本,网络传输速度大大增加;500M的内存消耗。

可见,对性能的提升和影响,还是很客观的。(10000M,500M,20倍。20倍~以上的网络传输性能消耗的降低;20倍的内存消耗的减少)虽然说,不一定会对性能产生决定性的作用。比如运行30分钟的spark作业,可能做了广播变量以后,速度快了2分钟,或者5分钟。但是一点一滴的调优,积少成多。最后还是会有效果的。没有经过任何调优手段的spark作业,16个小时;三板斧下来,就可以到5个小时;然后非常重要的一个调优,影响特别大,shuffle调优,2~3个小时;应用了10个以上的性能调优的技术点,JVM+广播,30分钟。16小时~30分钟。

4.2.3 广播变量设置方式

广播变量:很简单,其实就是调用SparkContextbroadcast()方法,传入你要广播的变量,即可。 这个broadcast()会返回一个Broadcast<T> 类型的变量,这个泛型<T>就是你要广播的变量本身的类型,指定此广播变量实际存储的类型

使用广播变量:直接调用广播变量的value() / getValue()可以获取到之前封装的变量

import org.apache.spark.broadcast.Broadcast;// 1. 定义将要广播的变量
Map<String, IntList> hourMap = new HashMap<String, IntList>();
// 2. 使用SparkContext的broadcast()将此变量广播出去,并使用Brocast<T>类型的变量保存返回结果。
//      这个泛型<T>就是你要广播的变量本身的类型,指定此广播变量实际存储的类型。
//      由于后续需要在匿名内部类访问此外部对象,因此这个hourMapBroadcast需要使用final修饰符。
final Broadcast<Map<String, IntList>> hourMapBroadcast = sc.broadcast(hourMap);
// 3. 使用广播变量的value() / getValue()来获取之前封装的广播变量
Map<String, IntList> HourMap = hourMapBroadcast.value();
再看一个例子:List<Tuple2<Long, Row>> userInfos = userid2InfoRDD.collect();
final Broadcast<List<Tuple2<Long, Row>>> userInfosBroadcast = sc.broadcast(userInfos);
// 得到用户信息map
List<Tuple2<Long, Row>> userInfos = userInfosBroadcast.value();

  1. (源码、原理请参考系列文章):
      Spark 任务调度之Driver send Task
      Spark DAG之SubmitJob
      Spark DAG之SubmitTask
      Spark DAG之SubmitStage
      Spark 任务调度之Executor执行task并返回结果 ↩︎

Spark性能调优 之 合理规划资源配置相关推荐

  1. Spark性能调优之资源分配

    **性能优化王道就是给更多资源!**机器更多了,CPU更多了,内存更多了,性能和速度上的提升,是显而易见的.基本上,在一定范围之内,增加资源与性能的提升,是成正比的:写完了一个复杂的spark作业之后 ...

  2. Spark性能调优-RDD算子调优篇

    Spark性能调优-RDD算子调优篇 RDD算子调优 1. RDD复用 在对RDD进行算子时,要避免相同的算子和计算逻辑之下对RDD进行重复的计算,如下图所示: 对上图中的RDD计算架构进行修改,得到 ...

  3. spark学习-Spark性能调优(1)

    本文要解决的问题: Spark在使用过程中不可避免的需要进行一系列的性能优化,本文就Spark性能调优的基础部分进行总结和归纳(开发调优和资源调优),参考了不少前辈的文章,在此非常感谢. 目的 在大数 ...

  4. 大数据培训:Spark性能调优与参数配置

    Spark性能调优-基础篇 众所周知,正确的参数配置对提升Spark的使用效率具有极大助力,帮助相关数据开发.分析人员更高效地使用Spark进行离线批处理和SQL报表分析等作业. 推荐参数配置模板如下 ...

  5. Spark商业案例与性能调优实战100课》第20课:大数据性能调优的本质和Spark性能调优要点分析

    Spark商业案例与性能调优实战100课>第20课:大数据性能调优的本质和Spark性能调优要点分析 基于本元想办法,大智若愚,大巧若拙!深入彻底的学习spark技术内核!

  6. Spark性能调优总结

    文章授权自 : http://www.6aiq.com/article/1547041120082 使用正确的 transformations操作 虽然开发者达到某一目标,可以通过不同的transfo ...

  7. Sparkamp;Spark性能调优实战

    Spark特别适用于多次操作特定的数据,分mem-only和mem & disk.当中mem-only:效率高,但占用大量的内存,成本非常高;mem & disk:内存用完后,会自己主 ...

  8. 大数据培训:Spark 性能调优详解

    调优概述 有的时候,我们可能会遇到大数据计算中一个最棘手的问题--数据倾斜,此时Spark作业的性能会比期望差很多.数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的 ...

  9. Kafka 性能调优实战:同等资源配置性能提升 20 几倍的秘诀

    作者 | 丁威       责编 | 欧阳姝黎 抛出问题 笔者最近在折腾数据异构体系,在实现MySQL增量数据同步到MQ(Kafka.RocketMQ),本文的故事就从这里开始. 众所周知,为了提高写 ...

最新文章

  1. leetcode-142 环形链表II
  2. Gradle Android客户端程序打包(基于gradle 2.10版本验证通过)
  3. 搬了十次家,总算搬进了自己的家
  4. 图灵机(Turing Machine)
  5. [转载] Python杂谈 | (6) numpy中array()和asarray()的区别
  6. webrtc之SVC实现(十)
  7. python3使用pickle读取文件提示TypeError或者UnicodeDecodeError的解决办法
  8. 【源代码】Image Deformation Using Moving Least Squares算法的实现
  9. iOS 信号量解决-网络异步请求的数据同步返回问题
  10. html颜色奶白色,象牙白rgb值是多少 和乳白哪个更白
  11. 各类识别、深度学习-开源代码文献梳理
  12. c语言是世界上最好的语言搞笑图片,C++是世界上最好的语言!不服来辩! | 爆笑囧图...
  13. 【dubbo系列001】dubbo是什么?dubbo解决什么问题?
  14. 简单计算机基础知识,计算机基础知识讲义
  15. 【水滴石穿】github_popular
  16. wordpress优化之实现百度站内收索
  17. 三维空间中的平面方程
  18. div控制显示与隐藏状态的两种方式
  19. e-office10.0用户连接SqlServer数据库配置说明
  20. 防抖云台-鸡头稳定 简介篇

热门文章

  1. 云主机怎么安装mysql_华为云主机安装Mysql
  2. 开源软件架构 zeromq
  3. STM32硬件I2C与软件模拟I2C超详解
  4. fastjson将date转换成了long
  5. 心肝火旺是夏季宝宝晚上睡不踏实的主要原因
  6. robocup学习篇(一)
  7. 出现“未报告的异常错误,必须对其进行捕获或声明以便抛出”的解决
  8. python3大神器_Python三大神器之pip的安装
  9. 【情景英语】英语自我介绍资料及范文
  10. 主机开机主板cpu灯和dram灯轮流亮的问题解决