1. 优化? Why? How? When? What?

“spark 应用程序也需要优化?”,很多人可能会有这个疑问,“不是已经有代码生成器,执行优化器,pipeline 什么的了的吗?”。是的,spark 的确是有一些列强大的内置工具,让你的代码在执行时更快。但是,如果一切都依赖于工具,框架来做的话,我想那只能说明两个问题:1. 你对这个框架仅仅是知其然,而非知其所以然;2. 看来你也只是照葫芦画瓢而已,没了你,别人也可以轻轻松松的写这样一个 spark 应用程序,so you are replaceable;

在做 spark 应用程序的优化的时候,从下面几个点出发就够了:

  • 为什么:因为你的资源有限,因为你的应用上生产环境了会有很多不稳定的因素,在上生产前做好优化和测试是唯一一个降低不稳定因素影响的办法;
  • 怎么做:web ui + log 是做优化的倚天剑和屠龙刀,能掌握好这两点就可以了;
  • 何时做:应用开发成熟时,满足业务要求时,就可以根据需求和时间安排开始做了;
  • 做什么:一般来说,spark 应用程序 80% 的优化,都是集中在三个地方:内存,磁盘io,网络io。再细点说,就是 driver,executor 的内存,shuffle 的设置,文件系统的配置,集群的搭建,集群和文件系统的搭建[e.g 尽量让文件系统和集群都在一个局域网内,网络更快;如果可以,可以让 driver 和 集群也在一个局域网内,因为有时候需要从 worker 返回数据到 driver]
  • 备注:千万不要一心想着优化都从程序本身入手,虽然大多数时候都是程序自己的原因,但在入手检查程序之前最好先确认所有的 worker 机器情况都正常哦。比如说机器负载,网络情况。

下面这张图来自 databricks 的一个分享 Tuning and Debugging Apache Spark,很有意思,说得非常对啊,哈哈。

OK,下面我们来看看一些常见的优化方法。

2. repartition and coalesce

原文:

 
  1. Spark provides the `repartition()` function, which shuffles the data

  2. across the network to create a new set of partitions. Keep in mind

  3. that repartitioning your data is a fairly expensive operation. Spark

  4. also has an optimized version of `repartition()` called `coalesce()`

  5. that allows avoiding data movement, but only if you are decreasing

  6. the number of RDD partitions. To know whether you can safely call

  7. coalesce(), you can check the size of the RDD using `rdd.partitions.size()`

  8. in Java/Scala and `rdd.getNumPartitions()` in Python and make sure

  9. that you are coalescing it to fewer partitions than it currently has.

总结:当要对 rdd 进行重新分片时,如果目标片区数量小于当前片区数量,那么用coalesce,不要用repartition。关于partition 的更多优化细节,参考chapter 4 of Learning Spark

3. Passing Functions to Spark

In Python, we have three options for passing functions into Spark.

  • lambda expressions
    word = rdd.filter(lambda s: "error" in s)
  • top-level functions
 
  1. import my_personal_lib

  2. word = rdd.filter(my_personal_lib.containsError)

  • locally defined functions
 
  1. def containsError(s):

  2. return "error" in s

  3. word = rdd.filter(containsError)

One issue to watch out for when passing functions is inadvertently serializing the object containing the function. When you pass a function that is the member of an object, or contains references to fields in an object (e.g., self.field), Spark sends the entire object to worker nodes, which can be much larger than the bit of information you need. Sometimes this can also cause your program to fail, if your class contains objects that Python can’t figure out how to pickle.

 
  1. ### wrong way

  2. class SearchFunctions(object):

  3. def __init__(self, query):

  4. self.query = query

  5. def isMatch(self, s):

  6. return self.query in s

  7. def getMatchesFunctionReference(self, rdd):

  8. # Problem: references all of "self" in "self.isMatch"

  9. return rdd.filter(self.isMatch)

  10. def getMatchesMemberReference(self, rdd):

  11. # Problem: references all of "self" in "self.query"

  12. return rdd.filter(lambda x: self.query in x)

  13. ### the right way

  14. class WordFunctions(object):

  15. ...

  16. def getMatchesNoReference(self, rdd):

  17. # Safe: extract only the field we need into a local variable

  18. query = self.query

  19. return rdd.filter(lambda x: query in x)

4. worker 的资源分配:cpu, memroy, executors

这个话题比较深,而且在不同的部署模式也不一样 [standalone, yarn, mesos],这里给不了什么建议。唯一的一个宗旨是,不要一昧考虑把所有资源都独立给到 spark 来用,要考虑到机器本身的一些进程,spark 依赖的一些进程,网络情况,任务情况 [计算密集,IO密集,long-live task]等。

这里只能推荐一些 video,slide 和 blog,具体情况具体分析,以后我遇到资源调优的时候再把实际案例发出来。

  • Top 5 Mistakes When Writing Spark Applications

5. shuffle block size limitation

No Spark shuffle block can be greater than 2 GB — spark shuffle 里的 block size 不能大于2g

Spark 使用一个叫 ByteBuffer 的数据结构来作为 shuffle 数据的缓存,但这个ByteBuffer 默认分配的内存是 2g,所以一旦 shuffle 的数据超过 2g 的时候,shuflle 过程会出错。影响 shuffle 数据大小的因素有以下常见的几个:

  • partition 的数量,partition 越多,分布到每个 partition 上的数据越少,越不容易导致 shuffle 数据过大;
  • 数据分布不均匀,一般是 groupByKey 后,存在某几个 key 包含的数据过大,导致该 key 所在的 partition 上数据过大,有可能触发后期 shuflle block 大于 2g;

一般解决这类办法都是增加 partition 的数量,Top 5 Mistakes When Writing Spark Applications 这里说可以预计让每个 partition 上的数据为 128MB 左右,仅供参考,还是需要具体场景具体分析,这里只把原理讲清楚就行了,并没有一个完美的规范。

  • sc.textfile 时指定一个比较大的 partition number
  • spark.sql.shuffle.partitions
  • rdd.repartition
  • rdd.coalesce

TIPS:

在 partition 小于 2000 和大于 2000 的两种场景下,Spark 使用不同的数据结构来在 shuffle 时记录相关信息,在 partition 大于 2000 时,会有另一种更高效 [压缩] 的数据结构来存储信息。所以如果你的 partition 没到 2000,但是很接近 2000,可以放心的把 partition 设置为 2000 以上。

 
  1. def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {

  2. if (uncompressedSizes.length > 2000) {

  3. HighlyCompressedMapStatus(loc, uncompressedSizes)

  4. } else {

  5. new CompressedMapStatus(loc, uncompressedSizes)

  6. }

  7. }

6. level of parallel - partition

先来看看一个 stage 里所有 task 运行的一些性能指标,其中的一些说明:

  • Scheduler Delay: spark 分配 task 所花费的时间
  • Executor Computing Time: executor 执行 task 所花费的时间
  • Getting Result Time: 获取 task 执行结果所花费的时间
  • Result Serialization Time: task 执行结果序列化时间
  • Task Deserialization Time: task 反序列化时间
  • Shuffle Write Time: shuffle 写数据时间
  • Shuffle Read Time: shuffle 读数据所花费时间

而这里要说的 level of parallel,其实大多数情况下都是指 partition 的数量,partition 数量的变化会影响上面几个指标的变动。我们调优的时候,很多时候都会看上面的指标变化情况。当 partition 变化的时候,上面几个指标变动情况如下:

  • partition 过小[容易引入 data skew 问题]

    • Scheduler Delay: 无明显变化
    • Executor Computing Time: 不稳定,有大有小,但平均下来比较大
    • Getting Result Time: 不稳定,有大有小,但平均下来比较大
    • Result Serialization Time: 不稳定,有大有小,但平均下来比较大
    • Task Deserialization Time: 不稳定,有大有小,但平均下来比较大
    • Shuffle Write Time: 不稳定,有大有小,但平均下来比较大
    • Shuffle Read Time: 不稳定,有大有小,但平均下来比较大
  • partition 过大
    • Scheduler Delay: 无明显变化
    • Executor Computing Time: 比较稳定,平均下来比较小
    • Getting Result Time: 比较稳定,平均下来比较小
    • Result Serialization Time: 比较稳定,平均下来比较小
    • Task Deserialization Time: 比较稳定,平均下来比较小
    • Shuffle Write Time: 比较稳定,平均下来比较小
    • Shuffle Read Time: 比较稳定,平均下来比较小

那应该怎么设置 partition 的数量呢?这里同样也没有专门的公式和规范,一般都在尝试几次后有一个比较优化的结果。但宗旨是:尽量不要导致 data skew 问题,尽量让每一个 task 执行的时间在一段变化不大的区间之内。

7. data skew

大多数时候,我们希望的分布式计算带来的好处应该是像下图这样的效果:

但是,有时候,却是下面这种效果,这就是所谓的 data skew。即数据没有被 大致均匀 的分布到集群中,这样对一个 task 来说,整个 task 的执行时间取决于第一个数据块被处理的时间。在很多分布式系统中,data skew 都是一个很大的问题,比如说分布式缓存,假设有 10 台缓存机器,但有 50% 的数据都落到其中一台机器上,那么当这台机器 down 掉之后,整个缓存的数据就会丢掉一般,缓存命中率至少 [肯定大于] 降低 50%。这也是很多分布式缓存中要引入一致性哈希,要引入虚拟节点 vnode 的原因。

一致性哈希原理图:

回到正题,在 spark 中如何解决 data skew 的问题?首先明确这个问题的发生场景和根源:一般来说,都是 (key, value) 型数据中,key 的分布不均匀,这种场景比较常见的方法是把 key 进行 salt 处理 [不知道 salt 中文应该怎么说],比如说原来有 2 个 key (key1, key2),并且 key1 对应的数据集很大,而 key2 对应的数据集相对较小,可以把 key 扩张成多个 key (key1-1, key1-2, …, key1-n, key2-1, key2-2, …, key2-m) ,并且保证 key1-* 对应的数据都是原始key1 对应的数据集上划分而来的,key2-* 上对应的数据都是原始的key2 对应的数据集上划分而来。这样之后,我们有m+n 个 key,而且每个 key 对应的数据集都相对较小,并行度增加,每个并行程序处理的数据集大小差别不大,可以大大提速并行处理效率。在这两个个分享里都有提到这种方法:

  • Top 5 Mistakes When Writing Spark Applications
  • Sparkling: Speculative Partition of Data for Spark Applications - Peilong Li

8. avoid cartesian operation

rdd.cartesian 操作很耗时,特别是当数据集很大的时候,cartesian 的数量级都是平方级增长的,既耗时也耗空间。

 
  1. >>> rdd = sc.parallelize([1, 2])

  2. >>> sorted(rdd.cartesian(rdd).collect())

  3. [(1, 1), (1, 2), (2, 1), (2, 2)]

9. avoid shuffle when possible

spark 中的 shuffle 默认是把上一个 stage 的数据写到 disk 上,然后下一个 stage 再从 disk 上读取数据。这里的磁盘 IO 会对性能造成很大的影响,特别是数据量大的时候。

10. use reduceByKey instead of GroupByKey when possible

11. use treeReduce instead of reduce when possible

12. use Kryo serializer

spark 应用程序中,在对 RDD 进行 shuffle 和 cache 时,数据都是需要被序列化才可以存储的,此时除了 IO 外,数据序列化也可能是应用程序的瓶颈。这里推荐使用 kryo 序列库,在数据序列化时能保证较高的序列化效率。

 
  1. sc_conf = SparkConf()

  2. sc_conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

13. Next

这些都是一些实际实践中的经验和对一些高质量分享的总结[大多数是来自那些高质量分享],里面可能有说得不完全正确的地方,在未来亲自实践,调试过后会再有一篇性能调试的 blog 的,本篇仅供参考哦。下一次,我们来看看怎么统一部署和配置 spark 的 cluster,那的确几乎来自个人实践经验了。

参考文章

  • chapter 4 of Learning Spark
  • chapter 8 of Learning Spark
  • Top 5 Mistakes When Writing Spark Applications
  • Databricks Spark Knowledge Base
  • Sparkling: Speculative Partition of Data for Spark Applications - Peilong Li
  • Fighting the skew in Spark
  • Tuning and Debugging Apache Spark
  • Tuning Spark
  • Avoid GroupByKey

原文链接:http://litaotao.github.io/boost-spark-application-performance?utm_source=tuicool&utm_medium=referral

spark 应用程序性能优化:12 个优化方法相关推荐

  1. spark 应用程序性能优化经验

    一 常规性能调优 1 . 分配更多资源 --num-executors 3 \  配置executor的数量 --driver-memory 100m \  配置driver的内存(影响不大) --e ...

  2. Python-OpenCV 杂项(三): 程序性能的检测和优化

    在图像处理的中每秒钟都要进行大量的运算,所以对程序的要求不仅要能给出正确的结果,同时还需要快. 0x00. 获取程序执行时间 cv2.getTickCount 函数返回从参考点到这个函数被执行的时钟数 ...

  3. Python-OpenCV 杂项(二)(三): 鼠标事件、 程序性能的检测和优化

    0x00. 查看鼠标事件 下面的方法可以查看OpenCV支持的所有鼠标事件: import cv2 events=[i for i in dir(cv2) if 'EVENT'in i] print ...

  4. 深入理解计算机系统第五章------优化程序性能

    课程网站 引言 编写高效程序的要点: 1,选择适当的算法和数据结构:2,编写出编译器能够有效优化以转换成高效可执行代码的源代码:3,将计算量特别大的任务分解成多个部分,使其可以在多核或者多处理器上并行 ...

  5. Project Tungsten:让Spark将硬件性能压榨到极限

     Project Tungsten:让Spark将硬件性能压榨到极限 摘要:对于Spark来说,通用只是其目标之一,更好的性能同样是其赖以生存的立足之本.北京时间4月28日晚,Databricks ...

  6. 调整Apache Spark应用程序的Java垃圾收集

    调整Apache Spark应用程序的Java垃圾收集 王道远黄洁 由王道远和黄杰 发表于公司博客 2015年5月28日 这是来自英特尔SSG STO大数据技术小组的朋友的客座文章. 来源地址: ht ...

  7. 多个goruntine 性能变慢_提高 JavaScript 性能的 12 个技巧

    作者丨Liz Parody 在创建 Web 应用程序时应始终考虑性能.为了帮助你开始,本文列举了有效提高应用程序性能的 12 种方法. 性能是创建网页或应用程序时最重要的一个方面.没有人想要应用程序崩 ...

  8. WP7 App性能优化(12):检测应用程序性能(Ⅴ)

    透视填充率示例 该示例展示了平面投影.透视转换对应用程序性能的影响.设计师创建的XAML经常会包含很多的透视转换用以创建漂亮的视觉效果,但是这会影响应用程序的性能.该示例演示了透视和动画的缓存行为.不 ...

  9. Spark程序性能优化之persist()

    Spark的RDD Persistence,是一个重要的能力,可以将中间结果保存,提供复用能力,加速基于中间结果的后续计算,经常可以提高10x以上的性能.在PySpark的DataFrame中同样适用 ...

最新文章

  1. 15DOM之获取元素方法
  2. VALSE 2020来了!60位计算机视觉、模式识别领域的优秀青年学者齐聚一堂
  3. html标签 marquee 滚动
  4. MONGODB 数据的存储顺序发现不是按_ID 的顺序存储的原因
  5. 数博会重磅活动:第二届大数据科学与工程国际会议议程
  6. Redis如何实现刷抖音不重复-布隆过滤器(Bloom Filter)
  7. SAP License:SD与COPA集成
  8. 每天生成 45 亿字符的 GPT-3,真能取代人工?
  9. python中不同进制的整数之间可以直接运算_python学习第三天总结(进制与位运算,)...
  10. ROS机器人操作系统资料与资讯(2018年6月)
  11. 卡诺模型案例分析_设计师必修课:KANO 模型的讲解与案例分析
  12. R语言-缺失值判断以及处理
  13. debian上虚拟机安装系统
  14. 手把手教你App推广时如何能找到100个以上渠道!
  15. 利用JavaScript实现BMI指数计算
  16. 2023年全国最新工会考试精选真题及答案51
  17. linux操作系统下 c语言编程入门
  18. 电脑管家怎么关闭右键深度加速(小火箭)功能
  19. 如何形成统一设计风格 - 实践篇
  20. {转}tbl语言简介

热门文章

  1. Android 自动向上滚动,android – Recyclerview在插入数据时自动向上滚动
  2. xavier初始化_深入解读xavier初始化(附源码)
  3. 手工搭建APACHE服务
  4. linux性能测试 瓶颈,性能测试——瓶颈分析方法
  5. java有效索引范围_java – 索引范围的上限始终假定为独占?
  6. union和union all有什么区别_Django基础(29):select_related和prefetch_related的用法与区别...
  7. java map判断是否有键_检查Java HashMap中是否存在给定键
  8. python中的常量_Python中的变量和常量
  9. java仿聊天室项目总结_Java团队课程设计-socket聊天室(个人总结)
  10. mysql 排序 删除_是否可以删除mysql表排序规则?