知乎导入md文件会失真,无语,将就着看吧!原博客地址为:Spark高性能Job

1.1 Job

遇到一个action算子就会提交一个job,常见的transformation算子以及Action算子:

  • Transformation

    • map, mapPartitions, flatMap, filter, union, groupbyKey, repartition, cache
  • Action
    • reduce, collect, show, count, foreach, save一系列操作

1.2 Task

​ task是spark最小的执行单元,task的数量就是stage的并行度,分配给不同的executor去执行。RDD在计算的时候,每个分区都会启一个task,这就是我们常说的数据并行!在map阶段,partition分区的数量保持不变,在reduce阶段,RDD聚合会触发shuffle操作。

2 Spark Shuffle

shuffle是spark job中一个比较重要的阶段,发生在map与reduce之间。

2.1 举例分析

对于上述的reduceByKey,涉及到需要将相同的key进行聚合。对于Stage1中的每个分区的数据,其输入可能存在于Stage0中的每个分区,因此需要从上游的每一个分区所在的机器拉取数据,这个过程称为shuffle。

2.2 Shuffle Write

shuffle write操作发生在ShuffleMapTask,Spark中的task分为以下两种类型:

  • ShuffleMapTask
  • 负责rdd之间的transform,map的输出也就是shuffle write
  • ResultTask
  • job最后阶段的执行任务,也就是action操作。

2.2.1 shuffle write分析

  • Hash Based Shuffle

​ 上图有四个ShuffleMapTask,假设在这四个都在一个worker node上运行,CPU的核为2,可以同时运行2个task(一个核运行两个线程,可能是超线程技术)。

​ 那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢?很简单,下一个stage的task有多少个,当前stage的每个Map task就要创建多少份磁盘文件。比如下一个stage总共有100个task,那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。

每个task包含R个缓冲区,R=Reducer的个数(也就是下个stage中task的个数),缓冲区被称为bucket。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去

但上述可能会出现下面几个问题:

  • 本地磁盘根据bucket产生的blockfile很多,ShuffleMap task产生R个blockfile,M个ShuffleMapTask产生M×R个文件。一般Spark Job的M与R都很大,因此磁盘上会有大量的blockfile文件
  • 缓冲区内存占用空间大
    每个 ShuffleMapTask 需要开 R 个 bucket,M 个 ShuffleMapTask 就会产生 M * R 个 bucket。实际情况下,在一个worker node上,可以并行运行cores个ShuffleMapTask,一个机器上的bucket个数达到cores×R个,这会占用大量的内存空间。

对于第二个问题,由于从内存往磁盘写数据一定得开缓冲区(内存与磁盘速度不匹配),所以对于第二个问题而言,没有较好的方法解决!但第一个问题可以通过下面的方法解决。

  • Consolidation机制的Shuffle

​ 在一个 core 上连续执行的 ShuffleMapTasks 可以共用一个输出文件 ShuffleFile。先执行完ShuffleMapTask 形成 ShuffleBlock i,后执行的 ShuffleMapTask 可以将输出数据直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i',每个 ShuffleBlock 被称为 FileSegment。下一个 stage 的 reducer 只需要 fetch 整个 ShuffleFile 就行了。这样,每个 worker 持有的文件数降为 cores * R。

​ 假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量。也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。

  • 在map阶段,除了map的业务逻辑外,还有shuffle write的过程,这个过程涉及到序列化、磁盘IO等耗时操作;在reduce阶段,除了reduce的业务逻辑外,还有前面shuffle read过程,这个过程涉及到网络IO、反序列化等耗时操作。

2.3 Shuffle Read

  • 前一个stage的ShuffleMapTask进行shuffle write,把数据存储在blockManager中,并且把数据位置元信息上报到driver的mapOutTrack组件,下一个stage根据数据位置元信息,进行shuffle read,拉取上一个stage的输出数据。
  • shuffle read实际是从上游executor以block为单位获取数据。当数据分布均匀的时候,导致下游某个partition过大。即上游某个block太大,就会出现OOM。

2.3.1 shuffle read分析

​ 执行shuffle read时,要将数据从MapPartitionsRDD 中的数据 fetch 过来。有一个问题?Reducer怎么知道应该去哪里找需要fetch的数据?答案是:在进行shuffle write的时候,会把数据位置元信息上报到driver的mapOutTrack组件,下一个stage根据数据位置元信息,进行shuffle read,拉取上一个stage的输出数据。

2.4 transform算子对应的shuffle read举例

  • reduceByKey

reduceByKey是在fetch的同时进行reduce操作。方式是类似于Spark中aggregateByKey的方式,fetch来的数据设置一种数据结构比如HashMap,方便aggregate。reduce阶段fetch来的数据被逐个aggreagte到HashMap中,等所有记录都进入到了HashMap即完成了Reduce任务。注意,在reduce前的map阶段,Spark需要很多小buffer来存储bucket到磁盘!

  • groupByKey
  • distinct
  • cogroup
  • intersection(otherRDD)
  • join(otherRDD, numPartitions)
  • sortByKey

2.5 Shuffle Read中的HashMap

HashMap是Spark Shuffle read过程中频繁使用的、用于aggregate 的数据结构。Spark 设计了两种:一种是全内存的 AppendOnlyMap,另一种是内存+磁盘的 ExternalAppendOnlyMap。(参考链接:https://cloud.tencent.com/developer/article/1085719解释的非常详细)

3 性能优化

3.1 对多次使用的RDD进行持久化

3.2 进来避免使用shuffle算子

  • shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中(也就是我们的blockfile),然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。
  • 常见的shuffle算子有reduceByKey、join、distinct、repartition

3.2.1 举例:使用BroadCast与map来代替join算子

​ 传统的join操作会导致shuffle操作,为什么?不同父RDD中相同的key会通过网络拉取到一个节点上,也就是我们的宽依赖操作然后由该节点上的一个task进行join操作。

改进:

数据量较小的RDD作为广播变量发到每个工作节点,然后在执行join。在rdd1.map中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。然后遍历rdd2的所有数据,若发现某条记录的key与rdd1中当前数据的key相同,则进行join(或者使用rdd2DataBroadcast来与rdd1进行join)。需要注意的是,上述操作,仅仅在rdd2较小(几百兆或者1G)下使用,因为在每个Executor的内存中,都会驻留一份rdd2的全量数据。

val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
val rdd3 = rdd1.map(rdd2DataBroadcast...)

3.3 使用map-side预聚合的shuffle算子

  • map-side预聚合
  • 在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合后,每个节点本地只会有一条相同的key,因为多于相同的key都被聚合起来了。reducer在拉取所有节点上相同的key时,就会大大减少拉取的数据数量。
  • 尽量使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子会使用用户自定义函数对每个节点本地相同的key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。

groupByKey算子示例:

reduceByKey算子示例:

3.4 使用高性能算子

  • 使用mapPartitions代替普通map
  • 一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条。但是单次函数调用就要处理掉一个partition所有的数据,如果内存不够,很可能出现OOM异常

3.5 广播大变量

  • 当算子函数使用外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,每个task都会保留一份变量副本。如果变量本身较大,那么大量的变量副本在网络传输中非常消耗性能(有多少个task就要传递多少个变量副本)。

解决方法:

​ 使用Spark广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本

~~~ // 以下代码在算子函数中,使用了外部的变量。 // 此时没有做任何特殊操作,每个task都会有一份list1的副本。 val list1 = ... rdd1.map(list1...)

// 以下代码将list1封装成了Broadcast类型的广播变量。 // 在算子函数中,使用广播变量时,首先会判断当前task所在Executor内存中,是否有变量副本。 // 如果有则直接使用;如果没有则从Driver或者其他Executor节点上远程拉取一份放到本地Executor内存中。 // 每个Executor内存中,就只会驻留一份广播变量副本。 val list1 = ... val list1Broadcast = sc.broadcast(list1) rdd1.map(list1Broadcast...) ~~~

3.6 spark-submit的参数设置

  • 使用spark-submit提交作业,该作业会启动一个对应的Driver进程。根据部署模式的不同,Driver进程可能会在本地启动(local模式),或者集群中某一节点启动(yarn模式)。
  • Driver向Yarn集群管理器申请运行Spark作业的资源,这里的资源指的是Executor进程。Yarn会根据Spark作业设置的参数,在各个工作节点上,启动一定数量的Executor进程,每个进程都占有一定数量的内存以及cpu core
  • 申请好作业执行资源后,Driver进程会调度执行作业代码!首先,Driver进程将作业分成多个stage,并为每个stage创建一批task,然后将各个task分配到各个Executor中执行(在Executor中执行的每个task可以想象为线程)。当一个stage执行结束,会在各个节点本地进行shuffle write,写入中间计算结果。然后Driver开始调度执行下一个stage。
  • 当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中

task的执行速度是跟每个Executor进程的CPU core数量有直接关系的。一个cpu core同一时间只能执行一个线程,而每个Executor进程上分配得到的多个task,多是以多个线程并发运行的。(多个线程抢占Executor的CPU core运行)。如果Executor上的cpu core << task数量,那么每个cpu core会轮询task,有些task会被抢占等待资源,使得作业变慢。

  • num-executors
  • 设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。
  • executor-memory
  • 如果每个executor的内存太小,可能会出现OOM
  • executor-cores
  • 这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程
  • spark.default.parallelism
  • 设置stage默认的task数量
  • 该参数非常重要,会影响到总的task数量。如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。因为,无论你的Executor进程有多少个,内存与CPU有多大,但是如果task总量少,Executor进程的资源无法得到充分使用!建议:设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源
  • spark.storage.memoryFraction
  • Executor的内存分配
    • 第一块是让task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;第二块是让task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;第三块是让RDD持久化时使用,默认占Executor总内存的60%
  • 该参数在作业中有持久化RDD的时候才需要设置。
  • spark.shuffle.memoryFraction
  • 该参数设置shuffle过程中,一个task拉取到上一个stage的task输出后,进行reduce的时候使用的Executor内存的比例。当设置为0.2,Executor默认只有20%的内存来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
  • 设置的建议:
    如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能

4 性能优化高阶----数据倾斜

4.1什么是数据倾斜

​ 在进行shuffle操作的时候,将各个节点上相同的key拉取到某个节点上的task来执行(shuffle read阶段)。如果某个key对应的数据量非常大,就会发生数据倾斜。比如,大部分key对应只有10条数据,但个别key有100万,那么大部分task可能只会分配到10条数据(这是reduce阶段决定的,相同的key发往同一个task进行reduce),然后1秒运行结束。但个别task会分配超过100万数据,所以要运行很久。

如图所示,在reduce对应的task中,处理hello的task线程处理的数据量很大,需要很长时间的运行。

4.2 可能会发生数据倾斜的算子

数据倾斜会发生在shuffle过程中,一下算子可能会触发shuffle:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition

4.2.1 某个task执行非常慢的情况

​ 某个task执行非常慢的情况:查看作业当前运行的stage下每个task分配的数据量

​ 在Web UI上可以看到每个stage各个task的分配数据量大小,从而进一步确定是不是task分配的数据不均匀导致。(在Web UI stage栏目可以看到每个stage下并行的task分配的数据量)如果不出意外:会看到有的task运行非常快,几秒钟; 有的task运行非常慢,仅仅从时间上已经能看出数据倾斜了!在通过查看每个task处理的数据量,可以看到处理的数据量也有很大的区别。在知道是哪个stage发生了数据倾斜之后,根据stage的划分原理。推算到Spark作业中对应哪行代码有问题!

4.2.2 某个task出现莫名其妙的内存溢出

通过Web UI使用相同的方法查看

4.2.3 查看导致数据倾斜的key的数据分布情况

​ 通过RDD的countByKey,查看RDD中不同key的数据量分布。然后对统计出来的各个key出现的次数,collect/take到客户端打印一下,就可以看到key的分布情况。

val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))

4.3 数据倾斜的解决方案

4.3.1 使用Hive ETL预处理数据

4.3.2 过滤少数倾斜的key

​ 如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。

​ 实际操作:采取每次执行前先进行采样,计算出样本中数据量最大的几个key之后,直接在程序中将那些key给过滤掉。

4.3.3 提高shuffle操作的并行度

​ 在对RDD进行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数设置了shuffle算子执行时shuffle read task的数量

​ 原理:让原本分配给一个task的多个keu分配多个task,从而让每个task处理比原来更少的数据。缺点:无法应对极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理,因此注定还是会发生数据倾斜的。

4.3.4 两阶段聚合(局部聚合和全局聚合)

​ 对RDD执行reduceByKey等聚合类shuffle算子时使用。第一阶段聚合,先给每个key都打上一个随机数,此时原来的key就不同了。比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。

​ 原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题

​ 方案缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案

4.3.5 将reduce join改为map join

​ 非常厉害的一个思路!!!!!!!!!!!!!!!!!!!!

​ 使用场景:进行join的两个RDD,其中一个RDD或表的数据量比较小(比如几百M或者一两G)

思路:Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。

​ 优点:对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜

​ 缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。

4.3.4 采样倾斜key并分拆join操作

​ 使用场景:当join的两张表都很大,无法使用上面的方案!如果出现数据倾斜,是因为其中某一个RDD/Hive表中的少数几个key的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均匀,那么采用这个解决方案是比较合适的。

​ 缺点:如果导致倾斜的key特别多的话,比如成千上万个key都导致数据倾斜,那么这种方式也不适合。

4.4 Spark Shuffle的多种机制

  • 未经优化的HashShuffleManager
  • consolidate机制的HashShuffleManager
  • SortShuffleManager
  • 普通运行机制
  • bypass机制
  • spark.shuffle.maneger

默认值是sort,该参数用于设置ShuffleManager的类型

  • spark.shuffle.consolidateFiles

默认值false,如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。

spark union 会引起shuffle吗_Spark高性能Job相关推荐

  1. spark union 会引起shuffle吗_脑鸣不是耳鸣 会引起听力下降吗?

    脑鸣也会出现听力下降? 头响即脑鸣的别称.多因脑髓空虚,或因火郁,痰湿阻滞所致,以自觉脑内如虫蛀鸣响为主要表现的脑神疾病.病人自觉脑内有各种各样的声音,如流水声.嗡嗡声.鸟鸣声.虫鸣等,可时响不停,或 ...

  2. spark union 会引起shuffle吗_面肌痉挛会引起肌肉萎缩吗

    民间总是流传「左眼跳财右眼跳灾」的说法,甚至有一首歌也唱道「左眼皮跳跳,好事要来到」.但科学的来说,眼皮跳和运气没什么太大关系,反而预示着某些健康问题. 眼皮跳动其实和面神经有很大关系,大脑又掌控着面 ...

  3. Spark性能优化:Shuffle调优篇

    Spark性能优化:Shuffle调优篇 一.调优概述 大多数Spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘IO.序列化.网络数据传输等操作.因此,如果要让作业的性 ...

  4. Spark性能优化之-shuffle调优

    文章目录 概述 ShuffleManager发展概述 HashShuffleManager运行原理 未经优化的HashShuffleManager 优化后的HashShuffleManager Sor ...

  5. Spark技术内幕:Shuffle Read的整体流程

    回忆一下,每个Stage的上边界,要么需要从外部存储读取数据,要么需要读取上一个Stage的输出:而下边界,要么是需要写入本地文件系统(需要Shuffle),以供childStage读取,要么是最后一 ...

  6. Spark优化一则 - 减少Shuffle

    看了Spark Summit 2014的A Deeper Understanding of Spark Internals,视频(要科学上网)详细讲解了Spark的工作原理,Slides的45页给原始 ...

  7. spark 写mysql 设置主键_Spark Sql 连接mysql

    1.基本概念和用法(摘自spark官方文档中文版) Spark SQL 还有一个能够使用 JDBC 从其他数据库读取数据的数据源.当使用 JDBC 访问其它数据库时,应该首选 JdbcRDD.这是因为 ...

  8. spark 把一列数据合并_Spark Java-合并同一列多行 - java

    我正在使用Java Spark,并且有1个这样的数据框 +---+-----+------+ |id |color|datas | +----------------+ |1 |blue |data1 ...

  9. spark如何解决文件不存在_Spark Read.json无法找到文件

    嘿,我在AWS上拥有1个Master和1个Slave Node Standalone Spark Cluster . 我的主目录名为〜/ Notebooks . 这是我启动jupyter笔记本并在浏览 ...

最新文章

  1. Android渗透测试Android渗透测试入门教程大学霸
  2. 被程序员忽视的位运算
  3. 程序员远程办公需要面临哪些挑战?
  4. 算法高级(36)-如何利用并行提高算法的执行效率?
  5. 04数据库的高级查询
  6. 5个月自学,零基础至日语2级全攻略
  7. 如何调试 chrome插件
  8. 在angular2项目里使用ng-zorro的icon
  9. 与10.110.12.30 mask 255.255.255.224属于同一网段的主机IP地址是?
  10. [C#]SignalR实现扫码登录(B/S,C/S)
  11. 【万里征程——Windows App开发】SemanticZoom视图切换
  12. [leetcode]Unique Paths II
  13. 为什么把文字图片放大后有一圈彩色的像素方块,而不是纯黑色的
  14. 岛屿数量JAVA_LeetCode刷题 463. Island Perimeter 岛屿的周长 Java
  15. 客制化 GH60 XD60 像 Poker 一样的 60% 机械键盘 (2) 采购以及组装
  16. 新时代高校辅导员工作素养的新要求及提升路径(非原创)
  17. 第七章:项目成本管理 - (7.2 估算成本)
  18. 最强S60直板机王!诺基亚N系列三代N73(转载)
  19. flutter 配置charles抓包
  20. 销项发票采集工具需求分析及设计分享

热门文章

  1. PyTorch grad 与 Optimizer(params) 区别
  2. 计算机网络自顶向下方法华为路由器配置OSPFv3路由协议实现端到端的通信
  3. OPPOWatch3什么时候发布 OPPOWatch3配置如何
  4. 用计算机弹出记事本,win7电脑开机就会弹出Desktop.ini记事本怎么办?
  5. 信源编码算法(费诺编码哈夫曼编码)
  6. 有个漂亮女朋友是种怎样的体验?爬取知乎2.2亿的阅读量的话题
  7. php 中等职业教材,中等职业教育计算机专业系列教材
  8. 徐直军、何小鹏等大咖加盟,2021互联网岳麓峰会即将重磅开幕
  9. Unity 粒子制作简单飞舞纸片特效
  10. 关于学校邮箱收不到matlab验证短信