下面的这篇文章,从14年9月11日被抄袭,一直抄到2020年+

[1]里面有这么一句话:

稳定性方面,由于代码质量问题,Spark长时间运行会经常出错,在架构方面,由于大量数据被缓存在内存中,Java垃圾回收缓慢的现象严重,导致Spark的性能不稳定,在复杂场景SQL的性能甚至不如现有的Map/Reduce。

这句话直接被抄到了 知乎上面。

那么这句话怎么回事呢?

个人理解,因为spark初期使用的版本中,数据运算时spark占用了太多的内存,

由于工程师没有对剩余内存做好余量预算,运算过程中内存资源没有及时释放,导致内存爆了,从而流传出了"spark不稳定"的说法。

上述问题, 根据[2],项目Project Tachyon正在解决这个问题。

#############################################################################################################################################

Facebook Spark 的使用情况

在介绍下面文章之前我们来看看 Facebook 的 Spark 使用情况:

如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

•Spark 是 Facebook 内部最大的 SQL 查询引擎(按 CPU 使用率计算)•在存储计算分离的集群上使用 Spark,也就是存储和计算资源可以单独扩展升级。•考虑到 Facebook 的规模,效率是 Spark 的首要任务,主要包括以下两个效率:•计算效率:优化CPU和内存使用,CPU 的 40% 时间花在读写上。•存储效率:优化磁盘大小和IOPS:存储格式对磁盘占用大小和 IOPS 有很大影响,Facebook 数据仓库底层存储格式使用的是 ORC。

扩展 Spark Driver
动态资源分配
在 Facebook,Spark 集群启用了动态资源分配(Dynamic Executor Allocation),以便更好的使用集群资源,而且在 Facebook 内部,Spark 是运行在多租户的集群上,所以这个也是非常合适的。比如典型的配置如下:•spark.dynamicAllocation.enabled = true•spark.dynamicAllocation.executorIdleTimeout = 2m•spark.dynamicAllocation.minExecutors = 1•spark.dynamicAllocation.maxExecutors = 2000

如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

多线程事件处理
在 Spark 2.3 版本之前,事件处理是单线程的架构,也就是说,事件队列里面的事件得一个一个处理。如果你的作业很大,并且有很多 tasks,很可能会导致事件处理出现延迟,进一步导致作业性能出现问题,甚至使当前作业失败。为了解决这个问题,SPARK-18838 这个 ISSUE 引入了多线程事件处理架构,每个事件都有其单独的单线程 executor service 去处理,这样就可以大大减少事件处理延时的问题。另外,由于每类事件都有单独的事件队列,所以会增加 Driver 端的内存占用。

如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

更好的 Fetch 失败处理
在 Spark 2.3 版本之前,如果 Spark 探测到 fetch failure,那么它会把产生这个 shuffle 文件的 Executor 移除掉。但是如果这个 Executor 所在的机器有很多 Executor,而且是因为这台机器挂掉导致 fetch failure,那么会导致很多的 fetch 重试,这种处理机制很低下。SPARK-19753 这个 ISSUE 使得 Spark 可以把上述场景所有 Executor 的 shuffle 文件移除,也就是不再去重试就知道 shuffle 文件不可用。

如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

另外,Spark 最大 Fetch 重试次数也可以通过 spark.max.fetch.failures.per.stage 参数配置。

FetchFailed 会在 ShuffleReader 取数据失败 N 次后抛出,然后由 executor 通过 statusUpdate 传到 driver 端,实际的处理会在 DAGScheduler.handleTaskCompletion,它会重新提交该 Stage 和该 Stage 对应的 ShuffleMapStage,重试次数超过 spark.stage.maxConsecutiveAttempts 时会退出。

RPC 服务线程调优
当 Spark 同时运行大量的 tasks 时,Driver 很容易出现 OOM,这是因为在 Driver 端的 Netty 服务器上产生大量 RPC 的请求积压,我们可以通过加大 RPC 服务的线程数解决 OOM 问题,比如 spark.rpc.io.serverThreads = 64。

扩展 Spark Executor
在介绍 Spark Executor 调优之前,我们需要知道 Spark Executor 的内存模型,如下:

如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

默认情况下,Spark 仅仅使用了堆内内存。Executor 端的堆内内存区域大致可以分为以下四大块:

•Execution 内存:主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据•Storage 内存:主要用于存储 spark 的 cache 数据,例如RDD的缓存、unroll数据;•用户内存(User Memory):主要用于存储 RDD 转换操作所需要的数据,例如 RDD 依赖等信息。•预留内存(Reserved Memory):系统预留内存,会用来存储Spark内部对象。这块内容的详细介绍可以参见过往记忆大数据这篇文章:Apache Spark 统一内存管理模型详解。

启用堆外内存

或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

Spark 1.6 开始引入了Off-heap memory(详见SPARK-11389)。这种模式不在 JVM 内申请内存,而是调用 Java 的 unsafe 相关 API 进行诸如 C 语言里面的 malloc() 直接向操作系统申请内存,由于这种方式不经过 JVM 内存管理,所以可以避免频繁的 GC,这种内存申请的缺点是必须自己编写内存申请和释放的逻辑。

默认情况下,堆外内存是关闭的,我们可以通过 spark.memory.offHeap.enabled 参数启用,并且通过 spark.memory.offHeap.size 设置堆外内存大小,单位为字节。如果堆外内存被启用,那么 Executor 内将同时存在堆内和堆外内存,两者的使用互补影响,这个时候 Executor 中的 Execution 内存是堆内的 Execution 内存和堆外的 Execution 内存之和,同理,Storage 内存也一样。相比堆内内存,堆外内存只区分 Execution 内存和 Storage 内存。大家可以根据自己的情况启用堆外内存,进一步减少 GC 的压力。

垃圾收集调优
Spark 内部会分配大量的连续内存缓存,如果对象大小超过32MB (G1GC 的最大区域大小),那么由于大量的分配,G1GC 会遭受碎片问题。所以建议在 Spark 中使用 parallel GC 而不是 G1GC,一个典型的配置如下:

spark.executor.extraJavaOptions = -XX:ParallelGCThreads=4 -XX:+UseParallelGC

Shuffle 调优
Spark Shuffle 是 Spark 的核心部分,下面简单介绍了Sort Shuffle 的过程,主要包括 Sort & Spill to disk,并在磁盘中保存成临时的 spill files,最后合并成 Final shuffle files,并保存在磁盘中。

如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

Shuffle file buffer 调优
我们都知道,磁盘访问比内存访问慢10 - 100K倍,所以我们可以适当调节 Shuffle buffer 的大小,涉及到的参数如:spark.shuffle.file.buffer = 1 MB,该参数用于设置 shuffle write task 的 BufferedOutputStream 的 buffer 缓冲大小。将数据写到磁盘文件之前,会先写入 buffer 缓冲中,待缓冲写满之后,才会溢写到磁盘。如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少 shuffle write 过程中溢写磁盘文件的次数,也就可以减少磁盘 IO 次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。spark.unsafe.sorter.spill.reader.buffer.size 参数也可以适当加大。

另外,Spark 在 Shuffle 的时候也会采用 lz4 (通过 spark.io.compression.codec 参数配置)来压缩文件,而默认的压缩块大小为 32kb,这不是最优的,所以我们可以适当调节 spark.io.compression.snappy.blockSize 参数。比如在 Facebook 这个参数设置为 spark.io.compression.lz4.blockSize = 512KB 最多减少 20% 的 Shuffle/spill 文件。

更多关于 Shuffle 的调优可以参见过往记忆大数据 Spark性能优化:shuffle调优 文章。

扩展 External Shuffle Service
在 Shuffle Server 上缓存索引文件
在 Apache Spark 2.1.0 之前,通过 Shuffle service 获取索引文件时并没有缓存起来,如果访问大量的临时文件时, Shuffle service 会成为瓶颈。SPARK-15074 对索引文件引入了缓存,减少了对索引文件的请求。我们可以通过 spark.shuffle.service.index.cache.entries 参数来设置最多可以缓存索引的个数。

如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

shuffle service 工作线程和 backlog 调优
spark.shuffle.io.serverThreads = 128
spark.shuffle.io.backLog = 8192
Configurable shuffle registration timeout and retry
参见 SPARK-20640

spark.shuffle.registration.timeout = 2m
spark.shuffle.registration.maxAttempts = 5
应用程序调优
这个动机是在资源相同的情况下减少作业的延迟。我们可以根据表的输入大小计算出 mapper 和 reduce 的个数,如下:

// number of mappers
num of mappers = max(256MB, inputTableSize / 50000)
 
 
// number of reducers 
num of reducers  = max(200, min(10000, max(inputTableSize / 256MB * 0.125, 200)))

猜你喜欢

1、刚刚晋升为 Apache 顶级项目的 Hudi 如何在数据湖上玩转增量处理

2、Apache Spark 在eBay 的优化

3、Elasticsearch如何做到亿级数据查询毫秒级返回?

4、大数据平台架构设计没思路?来看这篇就知道了!

################################################################################################################################################

Reference:

[1]星环:如何构建企业级Hadoop/Spark分析平台

[2]Spark特点及缺点?

为什么说spark不稳定相关推荐

  1. 使用spark.streaming.kafka.consumer.poll.ms和reconnect.backoff.ms解决spark streaming消费kafka时任务不稳定的问题

    问题描述 在用spark streaming程序消费kafka的数据时,遇到了一个神奇的现象:同样的数据量.相似的数据,在消费时,有些批次的数据在做map操作时神奇的多了40多秒,具体看下面的数据:在 ...

  2. Spark Streaming实践和优化

    2019独角兽企业重金招聘Python工程师标准>>> Spark Streaming实践和优化 博客分类: spark 在流式计算领域,Spark Streaming和Storm时 ...

  3. Spark shuffle调优

    Spark shuffle是什么 Shuffle在Spark中即是把父RDD中的KV对按照Key重新分区,从而得到一个新的RDD.也就是说原本同属于父RDD同一个分区的数据需要进入到子RDD的不同的分 ...

  4. 从Storm和Spark 学习流式实时分布式计算的设计

    转自:http://www.dataguru.cn/thread-341168-1-1.html 流式实时分布式计算系统在互联网公司占有举足轻重的地位,尤其在在线和近线的海量数据处理上.而处理这些海量 ...

  5. Apache Spark Jobs 性能调优

    当你开始编写 Apache Spark 代码或者浏览公开的 API 的时候,你会遇到各种各样术语,比如transformation,action,RDD(resilient distributed d ...

  6. spark Tachyon

    该论文来自Berkeley实验室,英文标题为:Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cl ...

  7. Spark Streaming学习笔记

    特点: Spark Streaming能够实现对实时数据流的流式处理,并具有很好的可扩展性.高吞吐量和容错性. Spark Streaming支持从多种数据源提取数据,如:Kafka.Flume.Tw ...

  8. 【转载】Apache Spark Jobs 性能调优(二)

    调试资源分配   Spark 的用户邮件邮件列表中经常会出现 "我有一个500个节点的集群,为什么但是我的应用一次只有两个 task 在执行",鉴于 Spark 控制资源使用的参数 ...

  9. Spark _20 _Spark_Shuffle调优

    SparkShuffle调优配置项如何使用? 在代码中,不推荐使用,硬编码. new SparkConf().set("spark.shuffle.file.buffer",&qu ...

最新文章

  1. Android SharedPreferences 的使用
  2. python饼图怎么显示中文_python生成饼图解决中文乱码
  3. Python导出Excel图表
  4. nodejs全局安装和本地安装的区别
  5. 实验二 网络嗅探与欺骗
  6. SQL Server 2012安装图解
  7. shiro学习(21):动态添加验证规则1
  8. Nginx学习总结(12)——Nginx各项配置总结
  9. 【GIS教程】土地利用转移矩阵、土地利用面积变化
  10. 免费好用的十二个SCADA组态软件
  11. win10 如何启用虚拟化 Hyper-V
  12. eclipse官方下载安装、JDK官方下载安装和环境变量配置
  13. 【问链财经-区块链基础知识系列】 第四十三课 区块链+保险,最全最深入的设计思路就在这儿了!
  14. reporting service odbc mysql_Reporting Services
  15. array()、range() 和 arange()函数的区别和用法
  16. 最简单的24点游戏c语言,C语言解24点游戏程序
  17. 学习笔记-Windows 安全
  18. Java web学习日志第一天
  19. 安装activex控件(64位机器MSComm32)
  20. 阿里云网站ICP备案一定要知道的东西

热门文章

  1. 【EntityFramework 6.1.3】个人理解与问题记录
  2. SpringBoot使用velocity模板引擎
  3. iOS中安全结束 子线程 的方法
  4. linq to sql (Group By/Having/Count/Sum/Min/Max/Avg操作符)
  5. 庐山真面-Oxite的HelloWorld
  6. 如何限制SELECT-OPTIONS的选择屏幕的OPTION
  7. kotlin android获取按钮,Kotlin 实现按钮点击跳转监听事件方式
  8. LeetCode-38 报数
  9. react 常用规范和经验
  10. 单行文字垂直居中,多行文字垂直居中