公众号后台回复关键词:pyspark,获取本项目github地址。

Spark程序可以快如闪电⚡️,也可以慢如蜗牛?。

它的性能取决于用户使用它的方式。

一般来说,如果有可能,用户应当尽可能多地使用SparkSQL以取得更好的性能。

主要原因是SparkSQL是一种声明式编程风格,背后的计算引擎会自动做大量的性能优化工作。

基于RDD的Spark的性能调优属于坑非常深的领域,并且很容易踩到。

我们将介绍Spark调优原理,Spark任务监控,以及Spark调优案例。

本文参考了以下文章:

《Spark性能优化指南——基础篇》:https://tech.meituan.com/2016/04/29/spark-tuning-basic.html

《Spark性能优化指南——高级篇》:https://tech.meituan.com/2016/05/12/spark-tuning-pro.html

《spark-调节executor堆外内存》:https://www.cnblogs.com/colorchild/p/12175328.html

import findspark

#指定spark_home为刚才的解压路径,指定python路径spark_home = "/Users/liangyun/ProgramFiles/spark-3.0.1-bin-hadoop3.2"python_path = "/Users/liangyun/anaconda3/bin/python"findspark.init(spark_home,python_path)

import pyspark from pyspark.sql import SparkSession

#SparkSQL的许多功能封装在SparkSession的方法接口中

spark = SparkSession.builder \        .appName("test") \        .config("master","local[4]") \        .enableHiveSupport() \        .getOrCreate()

sc = spark.sparkContext

一,Spark调优原理

可以用下面三个公式来近似估计spark任务的执行时间。

可以用下面二个公式来说明spark在executor上的内存分配。

如果程序执行太慢,调优的顺序一般如下:

1,首先调整任务并行度,并调整partition分区。

2,尝试定位可能的重复计算,并优化之。

3,尝试定位数据倾斜问题或者计算倾斜问题并优化之。

4,如果shuffle过程提示堆外内存不足,考虑调高堆外内存。

5,如果发生OOM或者GC耗时过长,考虑提高executor-memory或降低executor-core。

以下是对上述公式中涉及到的一些概念的初步解读。

  • 任务计算总时间:假设由一台无限内存的同等CPU配置的单核机器执行该任务,所需要的运行时间。通过缓存避免重复计算,通过mapPartitions代替map以减少诸如连接数据库,预处理广播变量等重复过程,都是减少任务计算总时间的例子。

  • shuffle总时间:任务因为reduceByKey,join,sortBy等shuffle类算子会触发shuffle操作产生的磁盘读写和网络传输的总时间。shuffle操作的目的是将分布在集群中多个节点上的同一个key的数据,拉取到同一个节点上,以便让一个节点对同一个key的所有数据进行统一处理。shuffle过程首先是前一个stage的一个shuffle write即写磁盘过程,中间是一个网络传输过程,然后是后一个stage的一个shuffle read即读磁盘过程。shuffle过程既包括磁盘读写,又包括网络传输,非常耗时。因此如有可能,应当避免使用shuffle类算子。例如用map+broadcast的方式代替join过程。退而求其次,也可以在shuffle之前对相同key的数据进行归并,减少shuffle读写和传输的数据量。此外,还可以应用一些较为高效的shuffle算子来代替低效的shuffle算子。例如用reduceByKey/aggregateByKey来代替groupByKey。最后,shuffle在进行网络传输的过程中会通过netty使用JVM堆外内存,spark任务中大规模数据的shuffle可能会导致堆外内存不足,导致任务挂掉,这时候需要在配置文件中调大堆外内存。

  • GC垃圾回收总时间:当JVM中execution内存不足时,会启动GC垃圾回收过程。执行GC过程时候,用户线程会终止等待。因此如果execution内存不够充分,会触发较多的GC过程,消耗较多的时间。在spark2.0之后excution内存和storage内存是统一分配的,不必调整excution内存占比,可以提高executor-memory来降低这种可能。或者减少executor-cores来降低这种可能(这会导致任务并行度的降低)。

  • 任务有效并行度:任务实际上平均被多少个core执行。它首先取决于可用的core数量。当partition分区数量少于可用的core数量时,只会有partition分区数量的core执行任务,因此一般设置分区数是可用core数量的2倍以上20倍以下。此外任务有效并行度严重受到数据倾斜和计算倾斜的影响。有时候我们会看到99%的partition上的数据几分钟就执行完成了,但是有1%的partition上的数据却要执行几个小时。这时候一般是发生了数据倾斜或者计算倾斜。这个时候,我们说,任务实际上有效的并行度会很低,因为在后面的这几个小时的绝大部分时间,只有很少的几个core在执行任务。

  • 任务并行度:任务可用core的数量。它等于申请到的executor数量和每个executor的core数量的乘积。可以在spark-submit时候用num-executor和executor-cores来控制并行度。此外,也可以开启spark.dynamicAllocation.enabled根据任务耗时动态增减executor数量。虽然提高executor-cores也能够提高并行度,但是当计算需要占用较大的存储时,不宜设置较高的executor-cores数量,否则可能会导致executor内存不足发生内存溢出OOM。

  • partition分区数量:分区数量越大,单个分区的数据量越小,任务在不同的core上的数量分配会越均匀,有助于提升任务有效并行度。但partition数量过大,会导致更多的数据加载时间,一般设置分区数是可用core数量的2倍以上20倍以下。可以在spark-submit中用spark.default.parallelism来控制RDD的默认分区数量,可以用spark.sql.shuffle.partitions来控制SparkSQL中给shuffle过程的分区数量。

  • 数据倾斜度:数据倾斜指的是数据量在不同的partition上分配不均匀。一般来说,shuffle算子容易产生数据倾斜现象,某个key上聚合的数据量可能会百万千万之多,而大部分key聚合的数据量却只有几十几百个。一个partition上过大的数据量不仅需要耗费大量的计算时间,而且容易出现OOM。对于数据倾斜,一种简单的缓解方案是增大partition分区数量,但不能从根本上解决问题。一种较好的解决方案是利用随机数构造数量为原始key数量1000倍的中间key。大概步骤如下,利用1到1000的随机数和当前key组合成中间key,中间key的数据倾斜程度只有原来的1/1000, 先对中间key执行一次shuffle操作,得到一个数据量少得多的中间结果,然后再对我们关心的原始key进行shuffle,得到一个最终结果。

  • 计算倾斜度:计算倾斜指的是不同partition上的数据量相差不大,但是计算耗时相差巨大。考虑这样一个例子,我们的RDD的每一行是一个列表,我们要计算每一行中这个列表中的数两两乘积之和,这个计算的复杂度是和列表长度的平方成正比的,因此如果有一个列表的长度是其它列表平均长度的10倍,那么计算这一行的时间将会是其它列表的100倍,从而产生计算倾斜。计算倾斜和数据倾斜的表现非常相似,我们会看到99%的partition上的数据几分钟就执行完成了,但是有1%的partition上的数据却要执行几个小时。计算倾斜和shuffle无关,在map端就可以发生。计算倾斜出现后,一般可以通过舍去极端数据或者改变计算方法优化性能。

  • 堆内内存:on-heap memory, 即Java虚拟机直接管理的存储,由JVM负责垃圾回收GC。由多个core共享,core越多,每个core实际能使用的内存越少。core设置得过大容易导致OOM,并使得GC时间增加。

  • 堆外内存:off-heap memory, 不受JVM管理的内存,  可以精确控制申请和释放, 没有GC问题。一般shuffle过程在进行网络传输的过程中会通过netty使用到堆外内存。

二,Spark任务UI监控

Spark任务启动后,可以在浏览器中输入 http://localhost:4040/ 进入到spark web UI 监控界面。

该界面中可以从多个维度以直观的方式非常细粒度地查看Spark任务的执行情况,包括任务进度,耗时分析,存储分析,shuffle数据量大小等。

最常查看的页面是 Stages页面和Excutors页面。

Jobs:每一个Action操作对应一个Job,以Job粒度显示Application进度。有时间轴Timeline。

Stages:Job在遇到shuffle切开Stage,显示每个Stage进度,以及shuffle数据量。

可以点击某个Stage进入详情页,查看其下面每个Task的执行情况以及各个partition执行的费时统计。

Storage:

监控cache或者persist导致的数据存储大小。

Environment:
显示spark和scala版本,依赖的各种jar包及其版本。

Excutors : 监控各个Excutors的存储和shuffle情况。

SQL: 显示各种SQL命令在那些Jobs中被执行。

三,Spark调优案例

下面介绍几个调优的典型案例:

1,资源配置优化

2,利用缓存减少重复计算

3,数据倾斜调优

4,broadcast+map代替join

5,reduceByKey/aggregateByKey代替groupByKey

1,资源配置优化

下面是一个资源配置的例子:

优化前:

#提交python写的任务spark-submit --master yarn \--deploy-mode cluster \--executor-memory 12G \--driver-memory 12G \--num-executors 100 \--executor-cores 8 \--conf spark.yarn.maxAppAttempts=2 \--conf spark.task.maxFailures=10 \--conf spark.stage.maxConsecutiveAttempts=10 \--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON = ./anaconda3.zip/anaconda3/bin/python  #cluster模式时候设置--archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境--files  data.csv,profile.txt--py-files  pkg.py,tqdm.pypyspark_demo.py 

优化后:

#提交python写的任务spark-submit --master yarn \--deploy-mode cluster \--executor-memory 12G \--driver-memory 12G \--num-executors 100 \--executor-cores 2 \--conf spark.yarn.maxAppAttempts=2 \--conf spark.default.parallelism=1600 \--conf spark.sql.shuffle.partitions=1600 \--conf spark.memory.offHeap.enabled=true \--conf spark.memory.offHeap.size=2g\--conf spark.task.maxFailures=10 \--conf spark.stage.maxConsecutiveAttempts=10 \--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON = ./anaconda3.zip/anaconda3/bin/python  #cluster模式时候设置--archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境--files  data.csv,profile.txt--py-files  pkg.py,tqdm.pypyspark_demo.py 

这里主要减小了 executor-cores数量,一般设置为1~4,过大的数量可能会造成每个core计算和存储资源不足产生OOM,也会增加GC时间。此外也将默认分区数调到了1600,并设置了2G的堆外内存。

2, 利用缓存减少重复计算

%%time# 优化前:import math rdd_x = sc.parallelize(range(0,2000000,3),3)rdd_y = sc.parallelize(range(2000000,4000000,2),3)rdd_z = sc.parallelize(range(4000000,6000000,2),3)rdd_data = rdd_x.union(rdd_y).union(rdd_z).map(lambda x:math.tan(x))s = rdd_data.reduce(lambda a,b:a+b+0.0)n = rdd_data.count()mean = s/n print(mean)

-1.889935655259299CPU times: user 40.2 ms, sys: 12.4 ms, total: 52.6 msWall time: 2.76 s
%%time # 优化后: import math from  pyspark.storagelevel import StorageLevelrdd_x = sc.parallelize(range(0,2000000,3),3)rdd_y = sc.parallelize(range(2000000,4000000,2),3)rdd_z = sc.parallelize(range(4000000,6000000,2),3)rdd_data = rdd_x.union(rdd_y).union(rdd_z).map(lambda x:math.tan(x)).persist(StorageLevel.MEMORY_AND_DISK)

s = rdd_data.reduce(lambda a,b:a+b+0.0)n = rdd_data.count()mean = s/n rdd_data.unpersist()print(mean)

-1.889935655259299CPU times: user 40.5 ms, sys: 11.5 ms, total: 52 msWall time: 2.18 s

3, 数据倾斜调优

%%time # 优化前: rdd_data = sc.parallelize(["hello world"]*1000000+["good morning"]*10000+["I love spark"]*10000)rdd_word = rdd_data.flatMap(lambda x:x.split(" "))rdd_one = rdd_word.map(lambda x:(x,1))rdd_count = rdd_one.reduceByKey(lambda a,b:a+b+0.0)print(rdd_count.collect()) 

[('good', 10000.0), ('hello', 1000000.0), ('spark', 10000.0), ('world', 1000000.0), ('love', 10000.0), ('morning', 10000.0), ('I', 10000.0)]CPU times: user 285 ms, sys: 27.6 ms, total: 313 msWall time: 2.74 s
%%time # 优化后: import random rdd_data = sc.parallelize(["hello world"]*1000000+["good morning"]*10000+["I love spark"]*10000)rdd_word = rdd_data.flatMap(lambda x:x.split(" "))rdd_one = rdd_word.map(lambda x:(x,1))rdd_mid_key = rdd_one.map(lambda x:(x[0]+"_"+str(random.randint(0,999)),x[1]))rdd_mid_count = rdd_mid_key.reduceByKey(lambda a,b:a+b+0.0)rdd_count = rdd_mid_count.map(lambda x:(x[0].split("_")[0],x[1])).reduceByKey(lambda a,b:a+b+0.0)print(rdd_count.collect())  

#作者按:此处仅示范原理,单机上该优化方案难以获得性能优势

[('good', 10000.0), ('hello', 1000000.0), ('spark', 10000.0), ('world', 1000000.0), ('love', 10000.0), ('morning', 10000.0), ('I', 10000.0)]CPU times: user 351 ms, sys: 51 ms, total: 402 msWall time: 7 s

4, broadcast+map代替join

该优化策略一般限于有一个参与join的rdd的数据量不大的情况。

%%time # 优化前:

rdd_age = sc.parallelize([("LiLei",18),("HanMeimei",19),("Jim",17),("LiLy",20)])rdd_gender = sc.parallelize([("LiLei","male"),("HanMeimei","female"),("Jim","male"),("LiLy","female")])rdd_students = rdd_age.join(rdd_gender).map(lambda x:(x[0],x[1][0],x[1][1]))

print(rdd_students.collect())

[('LiLy', 20, 'female'), ('LiLei', 18, 'male'), ('HanMeimei', 19, 'female'), ('Jim', 17, 'male')]CPU times: user 43.9 ms, sys: 11.6 ms, total: 55.6 msWall time: 307 ms
%%time 

# 优化后:rdd_age = sc.parallelize([("LiLei",18),("HanMeimei",19),("Jim",17),("LiLy",20)])rdd_gender = sc.parallelize([("LiLei","male"),("HanMeimei","female"),("Jim","male"),("LiLy","female")],2)ages = rdd_age.collect()broads = sc.broadcast(ages)

def get_age(it):    result = []    ages = dict(broads.value)    for x in it:        name = x[0]        age = ages.get(name,0)        result.append((x[0],age,x[1]))    return iter(result)

rdd_students = rdd_gender.mapPartitions(get_age)

print(rdd_students.collect())

[('LiLei', 18, 'male'), ('HanMeimei', 19, 'female'), ('Jim', 17, 'male'), ('LiLy', 20, 'female')]CPU times: user 14.3 ms, sys: 7.43 ms, total: 21.7 msWall time: 86.3 ms

5,reduceByKey/aggregateByKey代替groupByKey

groupByKey算子是一个低效的算子,其会产生大量的shuffle。其功能可以用reduceByKey和aggreagateByKey代替,通过在每个partition内部先做一次数据的合并操作,大大减少了shuffle的数据量。

%%time # 优化前:rdd_students = sc.parallelize([("class1","LiLei"),("class2","HanMeimei"),("class1","Lucy"),                               ("class1","Ann"),("class1","Jim"),("class2","Lily")])rdd_names = rdd_students.groupByKey().map(lambda t:(t[0],list(t[1])))names = rdd_names.collect()print(names)

[('class1', ['LiLei', 'Lucy', 'Ann', 'Jim']), ('class2', ['HanMeimei', 'Lily'])]CPU times: user 25.3 ms, sys: 7.32 ms, total: 32.6 msWall time: 164 ms
%%time # 优化后:rdd_students = sc.parallelize([("class1","LiLei"),("class2","HanMeimei"),("class1","Lucy"),                               ("class1","Ann"),("class1","Jim"),("class2","Lily")])rdd_names = rdd_students.aggregateByKey([],lambda arr,name:arr+[name],lambda arr1,arr2:arr1+arr2)

names = rdd_names.collect()print(names)
[('class1', ['LiLei', 'Lucy', 'Ann', 'Jim']), ('class2', ['HanMeimei', 'Lily'])]CPU times: user 21.6 ms, sys: 6.63 ms, total: 28.3 msWall time: 118 ms

公众号后台回复关键字:pyspark,获取《eat pyspark in 10 days》 github项目地址。

spark 算子例子_Spark性能调优方法相关推荐

  1. 浅谈Spark应用程序的性能调优

    浅谈Spark应用程序的性能调优 :http://geek.csdn.net/news/detail/51819 下面列出的这些API会导致Shuffle操作,是数据倾斜可能发生的关键点所在  1. ...

  2. linux mysql数据库优化_MySQL_Linux下MySQL数据库性能调优方法,以下的环境具备一定的代表性 - phpStudy...

    Linux下MySQL数据库性能调优方法 以下的环境具备一定的代表性,可以说是中小企业一般配置和工作环境.希望通过本文能让大家理解Linux下MySQL数据库性能调优方法. 硬件准备环境: 硬盘: 1 ...

  3. 透明大页相关内核参数_Alibaba Cloud Linux 2系统中与透明大页THP相关的性能调优方法...

    免责声明:本文档可能包含第三方产品信息,该信息仅供参考.阿里云对第三方产品的性能.可靠性以及操作可能带来的潜在影响,不做任何暗示或其他形式的承诺. 概述 本文主要介绍在Alibaba Cloud Li ...

  4. Spark商业案例与性能调优实战100课》第16课:商业案例之NBA篮球运动员大数据分析系统架构和实现思路

    Spark商业案例与性能调优实战100课>第16课:商业案例之NBA篮球运动员大数据分析系统架构和实现思路 http://www.basketball-reference.com/leagues ...

  5. Spark商业案例与性能调优实战100课》第2课:商业案例之通过RDD实现分析大数据电影点评系统中电影流行度分析

    Spark商业案例与性能调优实战100课>第2课:商业案例之通过RDD实现分析大数据电影点评系统中电影流行度分析 package com.dt.spark.coresimport org.apa ...

  6. 《Spark商业案例与性能调优实战100课》第17课:商业案例之NBA篮球运动员大数据分析系统代码实战

    <<<Spark商业案例与性能调优实战100课>第17课:商业案例之NBA篮球运动员大数据分析系统代码实战

  7. 《Spark商业案例与性能调优实战100课》第18课:商业案例之NBA篮球运动员大数据分析代码实战之核心基础数据项编写

    <Spark商业案例与性能调优实战100课>第18课:商业案例之NBA篮球运动员大数据分析代码实战之核心基础数据项编写

  8. 《Spark商业案例与性能调优实战100课》第15课:商业案例之纯粹通过DataSet进行电商交互式分析系统中各种类型TopN分析实战详解

    <Spark商业案例与性能调优实战100课>第15课:商业案例之纯粹通过DataSet进行电商交互式分析系统中各种类型TopN分析实战详解

  9. 《Spark商业案例与性能调优实战100课》第25课:Spark Hash Shuffle源码解读与剖析

    <Spark商业案例与性能调优实战100课>第25课:Spark Hash Shuffle源码解读与剖析

最新文章

  1. 解决win7不能上网的问题
  2. 云VS本地,一言难尽的ERP
  3. 2017 Multi-University Training Contest - Team 2——HDU6045HDU6047HDU6055
  4. python合并视频和音频_真没想到,Python 还能实现 5 毛特效
  5. 漫步数理统计二十一——变换:随机向量
  6. Django框架详细介绍---认证系统
  7. php检查数组下标是否,php检查数组下标是否存在
  8. Go panic的学习
  9. 安装Aira2的三种方法(包括一键安装命令,epel源安装,源码安装)
  10. 如何用PYTHON代码写出音乐
  11. John Peluso、Chris 谈话筒设计及应用
  12. 深度学习撞上推荐系统——02 GFM MLP NCF Pytorch代码实现
  13. matlab实现从s域变成z域、matlab实现长除法逆z变换实例
  14. Linux:makefile简易入门
  15. 五步看平台,选好安全的MT4交易外汇平台
  16. “自动修复“无法修复你的电脑,日志文件:C:\Windows\System32\Logfiles\Srt\SrtTrail.txt
  17. Cadence Allegro修改大小十字光标方法图文教程及视频演示
  18. php mhash,PHP加密扩展库-Mhash扩展库实例用法详解
  19. R中如何下载pima-indians-diabetes数据库
  20. JAVA毕业设计汉字幼教系统计算机源码+lw文档+系统+调试部署+数据库

热门文章

  1. 1 jquery对checkbox的简单操作
  2. 关于DateTime和String转换的容易犯得错误
  3. java I O类大全_Java I/O最简单的几个类
  4. 7-3 堆中的路径 (25 分)
  5. linux重启mysql一直_LINUX重启MYSQL的命令
  6. python 导入包 作用域_Python 包、模块、函数、变量作用域
  7. 真实AIS数据,解码,可视化
  8. mysql获取当前时间毫秒_MySQL性能指标及计算方法
  9. pytorch卷积神经网络_资源|卷积神经网络迁移学习pytorch实战推荐
  10. matlab 条形图横坐标,Matlab条形图bar横坐标间距设置