一 常规性能调优

1 . 分配更多资源

--num-executors 3 \  配置executor的数量

--driver-memory 100m \  配置driver的内存(影响不大)

--executor-memory 100m \  配置每个executor的内存大小

--executor-cores 3 \  配置每个executor的cpu core数量

增加每个executor的内存量。增加了内存量以后,对性能的提升,有两点:

1、如果需要对RDD进行cache,那么更多的内存,就可以缓存更多的数据,将更少的数据写入磁盘,甚至不写入磁盘。减少了磁盘IO。

2、对于shuffle操作,reduce端,会需要内存来存放拉取的数据并进行聚合。如果内存不够,也会写入磁盘。如果给executor分配更多内存以后,就有更少的数据,需要写入磁盘,甚至不需要写入磁盘。减少了磁盘IO,提升了性能。

3、对于task的执行,可能会创建很多对象。如果内存比较小,可能会频繁导致JVM堆内存满了,然后频繁GC,垃圾回收,minor GC和full GC。(速度很慢)。内存加大以后,带来更少的GC,垃圾回收,避免了速度变慢,速度变快了。

2.调节并行度

很简单的道理,只要合理设置并行度,就可以完全充分利用你的集群计算资源,并且减少每个task要处理的数据量,最终,就是提升你的整个Spark作业的性能和运行速度。


3.重构RDD架构以及RDD持久化

第一,RDD架构重构与优化 尽量去复用RDD,差不多的RDD,可以抽取称为一个共同的RDD,供后面的RDD计算时,反复使用

第二,公共RDD一定要实现持久化 北方吃饺子,现包现煮。你人来了,要点一盘饺子。馅料+饺子皮+水->包好的饺子,对包好的饺子去煮,煮开了以后,才有你需要的熟的,热腾腾的饺子。 现实生活中,饺子现包现煮,当然是最好的了;但是Spark中,RDD要去“现包现煮”,那就是一场致命的灾难。 对于要多次计算和使用的公共RDD,一定要进行持久化。 持久化,也就是说,将RDD的数据缓存到内存中/磁盘中,(BlockManager),以后无论对这个RDD做多少次计算,那么都是直接取这个RDD的持久化的数据,比如从内存中或者磁盘中,直接提取一份数据。

第三,持久化,是可以进行序列化的 如果正常将数据持久化在内存中,那么可能会导致内存的占用过大,这样的话,也许,会导致OOM内存溢出。 当纯内存无法支撑公共RDD数据完全存放的时候,就优先考虑,使用序列化的方式在纯内存中存储。将RDD的每个partition的数据,序列化成一个大的字节数组,就一个对象;序列化后,大大减少内存的空间占用。 序列化的方式,唯一的缺点就是,在获取数据的时候,需要反序列化。 如果序列化纯内存方式,还是导致OOM,内存溢出;就只能考虑磁盘的方式,内存+磁盘的普通方式(无序列化)。 内存+磁盘,序列化

第四,为了数据的高可靠性,而且内存充足,可以使用双副本机制,进行持久化 持久化的双副本机制,持久化后的一个副本,因为机器宕机了,副本丢了,就还是得重新计算一次;持久化的每个数据单元,存储一份副本,放在其他节点上面;从而进行容错;一个副本丢了,不用重新计算,还可以使用另外一份副本。 这种方式,仅仅针对你的内存资源极度充足

4.广播大变量

广播变量,初始的时候,就在Drvier上有一份副本。 task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor对应的BlockManager中,尝试获取变量副本;如果本地没有,那么就从Driver远程拉取变量副本,并保存在本地的BlockManager中;此后这个executor上的task,都会直接使用本地的BlockManager中的副本。 executor的BlockManager除了从driver上拉取,也可能从其他节点的BlockManager上拉取变量副本,举例越近越好。

每个 Executor一个副本,不一定每个节点。

5.使用Kryo序列化

内存占用,网络传输

1、算子函数中使用到的外部变量

2、持久化RDD时进行序列化,StorageLevel.MEMORY_ONLY_SER

3、shuffle

1、算子函数中使用到的外部变量,使用Kryo以后:优化网络传输的性能,可以优化集群中内存的占用和消耗

2、持久化RDD,优化内存的占用和消耗;持久化RDD占用的内存越少,task执行的时候,创建的对象,就不至于频繁的占满内存,频繁发生GC。

3、shuffle:可以优化网络传输的性能

bbg

使用时,要自定义注册类哦



6.使用 fastutil


7.数据本地化的等待时长

Spark在Driver上,对Application的每一个stage的task,进行分配之前,都会计算出每个task要计算的是哪个分片数据,RDD的某个partition;Spark的task分配算法,优先,会希望每个task正好分配到它要计算的数据所在的节点,这样的话,就不用在网络间传输数据; 但是呢,通常来说,有时,事与愿违,可能task没有机会分配到它的数据所在的节点,为什么呢,可能那个节点的计算资源和计算能力都满了;所以呢,这种时候,通常来说,Spark会等待一段时间,默认情况下是3s钟(不是绝对的,还有很多种情况,对不同的本地化级别,都会去等待),到最后,实在是等待不了了,就会选择一个比较差的本地化级别,比如说,将task分配到靠它要计算的数据所在节点,比较近的一个节点,然后进行计算。 但是对于第二种情况,通常来说,肯定是要发生数据传输,task会通过其所在节点的BlockManager来获取数据,BlockManager发现自己本地没有数据,会通过一个getRemote()方法,通过TransferService(网络数据传输组件)从数据所在节点的BlockManager中,获取数据,通过网络传输回task所在节点。 对于我们来说,当然不希望是类似于第二种情况的了。最好的,当然是task和数据在一个节点上,直接从本地executor的BlockManager中获取数据,纯内存,或者带一点磁盘IO;如果要通过网络传输数据的话,那么实在是,性能肯定会下降的,大量网络传输,以及磁盘IO,都是性能的杀手。


二.JVM调优

JVM调优的第一个点:降低cache操作的内存占比 spark中,堆内存又被划分成了两块儿,一块儿是专门用来给RDD的cache、persist操作进行RDD数据缓存用的;另外一块儿,就是我们刚才所说的,用来给spark算子函数的运行使用的,存放函数中自己创建的对象。 默认情况下,给RDD cache操作的内存占比,是0.6,60%的内存都给了cache操作了。但是问题是,如果某些情况下,cache不是那么的紧张,问题在于task算子函数中创建的对象过多,然后内存又不太大,导致了频繁的minor gc,甚至频繁full gc,导致spark频繁的停止工作。性能影响会很大。 针对上述这种情况,大家可以在之前我们讲过的那个spark ui。yarn去运行的话,那么就通过yarn的界面,去查看你的spark作业的运行统计,很简单,大家一层一层点击进去就好。可以看到每个stage的运行情况,包括每个task的运行时间、gc时间等等。如果发现gc太频繁,时间太长。此时就可以适当调价这个比例。 降低cache操作的内存占比,大不了用persist操作,选择将一部分缓存的RDD数据写入磁盘,或者序列化方式,配合Kryo序列化类,减少RDD缓存的内存占用;降低cache操作内存占比;对应的,算子函数的内存占比就提升了。这个时候,可能,就可以减少minor gc的频率,同时减少full gc的频率。对性能的提升是有一定的帮助的。

一句话,让task执行算子函数时,有更多的内存可以使用。

spark.storage.memoryFraction,0.6 -> 0.5 -> 0.4 -> 0.2

--conf spark.yarn.executor.memoryOverhead=2048 spark-submit脚本里面,去用--conf的方式,去添加配置;一定要注意!!!切记,不是在你的spark作业代码中,用new SparkConf().set()这种方式去设置,不要这样去设置,是没有用的!一定要在spark-submit脚本中去设置。 spark.yarn.executor.memoryOverhead(看名字,顾名思义,针对的是基于yarn的提交模式) 默认情况下,这个堆外内存上限大概是300多M;后来我们通常项目中,真正处理大数据的时候,这里都会出现问题,导致spark作业反复崩溃,无法运行;此时就会去调节这个参数,到至少1G(1024M),甚至说2G、4G 通常这个参数调节上去以后,就会避免掉某些JVM OOM的异常问题,同时呢,会让整体spark作业的性能,得到较大的提升。

http://blog.csdn.net/hammertank/article/details/48346285

此时呢,就会没有响应,无法建立网络连接;会卡住;ok,spark默认的网络连接的超时时长,是60s;如果卡住60s都无法建立连接的话,那么就宣告失败了。 碰到一种情况,偶尔,偶尔,偶尔!!!没有规律!!!某某file。一串file id。uuid(dsfsfd-2342vs--sdf--sdfsd)。not found。file lost。 这种情况下,很有可能是有那份数据的executor在jvm gc。所以拉取数据的时候,建立不了连接。然后超过默认60s以后,直接宣告失败。 报错几次,几次都拉取不到数据的话,可能会导致spark作业的崩溃。也可能会导致DAGScheduler,反复提交几次stage。TaskScheduler,反复提交几次task。大大延长我们的spark作业的运行时间。

实际案例脚本:

/usr/local/spark/bin/spark-submit \

--class com.ibeifeng.sparkstudy.WordCount \

--num-executors 80 \

--driver-memory 6g \

--executor-memory 6g \

--executor-cores 3 \

--master yarn-cluster \

--queue root.default \

--conf spark.yarn.executor.memoryOverhead=2048 \

--conf spark.core.connection.ack.wait.timeout=300 \

/usr/local/spark/spark.jar \

三.Shuffle调优

  1. new SparkConf().set("spark.shuffle.consolidateFiles", "true") 开启shuffle map端输出文件合并的机制;默认情况下,是不开启的,就是会发生如上所述的大量map端输出文件的操作,严重影响性能。

  2. 增大map端溢写的内存缓冲空间,减少溢写次数。  spark.shuffle.file.buffer ,32k--》 64k

  3. 增大reduce端聚合内存,减少读写次数 spark.shuffle.memoryFraction ,0.2--》0.3 尝试性的增加

  4. new SparkConf().set(" spark.shuffle.manager", " hash") :hash(默认)sort(可以排序)tungsten-sort钨丝(1.5版本后才有,不稳定)


四.spark操作调优(算子调优)

1.MapPartitions替代map操作,不过看具体操作,因为Maprtition容易导致OOM哦!

2.filter之后,数据容易倾斜,采用coalesce算子。主要就是用于在filter操作之后,针对每个partition的数据量各不相同的情况,来压缩partition的数量。减少partition的数量,而且让每个partition的数据量都尽量均匀紧凑。 从而便于后面的task进行计算操作,在某种程度上,能够一定程度的提升性能。

3.foreachPartition替代foreach,例如数据库连接操作的时候,是非常好的。在实际生产环境,都是用这个,但数据量特别大,会有oom的可能。

4.repartition,SparkSQL的初始stage受限于hdfs的block数量限制。repartition算子,你用Spark SQL这一步的并行度和task数量,肯定是没有办法去改变了。但是呢,可以将你用Spark SQL查询出来的RDD,使用repartition算子,去重新进行分区,此时可以分区成多个partition,比如从20个partition,分区成100个。

5.reduceBykey,map 端本地聚合。


转载于:https://blog.51cto.com/coollast/1889258

spark 应用程序性能优化经验相关推荐

  1. spark 应用程序性能优化:12 个优化方法

    1. 优化? Why? How? When? What? "spark 应用程序也需要优化?",很多人可能会有这个疑问,"不是已经有代码生成器,执行优化器,pipelin ...

  2. Hibernate程序性能优化的考虑要点

    Hibernate程序性能优化的考虑要点 Tag:java,j2ee,hibernate,O/R mappling, spring,性能,效率 本文依照HIBERNATE帮助文档,一些网络书籍及项目经 ...

  3. 我的 .NET Core 博客性能优化经验总结

    点击上方蓝字关注"汪宇杰博客" 导语 去年8月,我用 .NET Core 重写了我的博客系统.经过一年多的优化,服务器响应速度从上线时候的 80ms 提高到了现在的 8ms,十倍提 ...

  4. 开源即巅峰,《Java程序性能优化实战》GitHub三小时标星已超34k

    蓦然回首自己做开发已经十年了,这十年中我获得了很多,技术能力.培训.出国.大公司的经历,还有很多很好的朋友.但再仔细一想,这十年中我至少浪费了五年时间,这五年可以足够让自己成长为一个优秀的程序员,可惜 ...

  5. 滴滴架构师被迫离职后,只留下这份731页Java程序性能优化手册

    滴滴资深架构师深度分享Java程序性能优化的宝贵经验,从软件设计.编码和JVM等维度阐述性能优化的方法和技巧. 总览: 篇幅限制,这里就不全部展示出来了.需要获取完整版Java程序性能优化手册的小伙伴 ...

  6. [python]用profile协助程序性能优化

    本文最初发表于恋花蝶的博客http://blog.csdn.net/lanphaday,欢迎转载,但请务必保留原文完整,并保留本声明. [python]用profile协助程序性能优化 上帝说:&qu ...

  7. C++性能优化(一)——应用程序性能优化简介

    一.程序性能优化简介 1.程序性能优化简介 在计算机发展的早期阶段,硬件资源相对而言是非常昂贵的,CPU运行时间与内存容量给程序开发人员设置了极大限制.因此,早期的程序对运行性能和内存空间占用的要求是 ...

  8. 欢乐互娱庞池海:《龙之谷》项目性能优化经验分享

    欢乐互娱庞池海:<龙之谷>项目性能优化经验分享 在5月12日,UNITY 2017案例分享专场上,欢乐互娱技术引擎开发工程师娱庞池海分享了<龙之谷>项目性能优化经验.以下为分享 ...

  9. 网易视频云:游戏开发性能优化经验总结

    网易视频云是网易倾力打造的一款基于云计算的分布式多媒体处理集群和专业音视频技术,为客户提供稳定流畅.低时延.高并发的视频直播.录制.存储.转码及点播等音视频的PaaS服务.在线教育.远程医疗.娱乐秀场 ...

最新文章

  1. switch能使用的数据类型有6种
  2. 16625篇论文揭示25年来AI进化规律!深度学习时代行将结束!
  3. python安装以及版本检测
  4. 17. OD-带有多态、变形的程序进行打补丁去掉nag(分析xor加密解密、自身修改代码的程序)
  5. Binary Tree Zigzag Level Order Traversal
  6. 面试题 计算机安全,XX计算机信息安全工程师面试题路由.doc
  7. CodeForces - 1612A Distance
  8. 大数据面试3分钟自我介绍_如何在面试时,做好三分钟自我介绍
  9. android手机版本为什么打不开,手机打不开,不能打开usb调试,如何刷机
  10. 项目管理相关的考试认证及证书价值介绍
  11. 计算机操作系统 第五版 答案,操作系统-第5版-习题答案.doc
  12. PyAudio库简介
  13. 家庭宽带上网_03_IP城域网
  14. 运行bat时隐藏cmd窗口的方法(bat隐藏窗口 隐藏运行bat文件)
  15. Meta元宇宙副总裁离职了...『Go语言圣经』终于汉化啦;德云社失业警告!AI要说相声了;一键就能AI绘图的网站;前沿论文 | ShowMeAI资讯日报
  16. Freeze the Discriminator a Simple Baseline for Fine-Tuning GANs
  17. 阿里面试官鬼得很,问我为什么他们阿里要禁用Executors创建线程池?
  18. 使用fsck修复文件系统
  19. Color类颜色对照表
  20. F2FS 基础知识二

热门文章

  1. Python fire官方文档教学(自动生成命令行,个人觉得意义不大,不如argparse)
  2. windows10双系统安装ubuntu18.04
  3. Elizabeth Taylor【伊丽莎白·泰勒】
  4. 【洛谷习题】填涂颜色
  5. FreeRTOS队列集
  6. 那些辞职考公的程序员,最后都怎么样了?
  7. 阿里最后一面,高并发下如何设计一个秒杀系统?
  8. 张小龙:做 PC 版微信是一种破坏,本来不想做
  9. Java 14 发布了,再也不怕NullPointerException 了!?
  10. 127.0.0.1和0.0.0.0地址的区别 | 文末送书