背景以及现象

本文基于 spark 3.1.2
设置spark.driver.memory=2g
在调试spark sql任务的时候,发现有任务产生了200多个exchange,而且任务长期运行不出来。

分析

运行对应的sql(多个连续的join操作,且join的key都不一样),得到如下的物理计划(我们只截取了一部分):

和之前的文章spark task过多导致任务运行过慢甚至超时 做法一样(对应的内存都是调优完后的镜像信息),三步曲如下:
用jstat -gcutil查看一下对应的gc情况,如下:

  S0     S1     E      O      M     CCS    YGC     YGCT    FGC    FGCT     GCT0.00 100.00  96.35  75.69  91.07  93.57    761    8.476    37    1.073    9.548
100.00   0.00  28.08  76.17  91.07  93.57    762    8.509    37    1.073    9.581
100.00   0.00  66.38  76.17  91.07  93.57    762    8.509    38    1.148    9.6560.00  89.11  15.98  60.79  91.04  93.46    763    8.529    38    1.148    9.6760.00  89.11  67.15  60.79  91.04  93.46    763    8.529    38    1.148    9.67653.32   0.00   0.00  60.79  91.04  93.46    764    8.536    38    1.148    9.68353.32   0.00  32.68  60.79  91.04  93.46    764    8.536    38    1.148    9.68353.32   0.00  66.07  60.79  91.04  93.46    764    8.536    38    1.148    9.68353.32   0.00  98.66  60.79  91.04  93.46    764    8.536    38    1.148    9.6830.00  90.36  25.66  60.79  91.04  93.46    765    8.543    38    1.148    9.6910.00  90.36  59.88  60.79  91.04  93.46    765    8.543    38    1.148    9.6910.00  90.36  95.83  60.79  91.04  93.46    765    8.543    38    1.148    9.69197.11   0.00  25.22  61.08  91.04  93.46    766    8.555    38    1.148    9.702

用jmap -heap 命令查看一下对应的堆情况:

Heap Configuration:MinHeapFreeRatio         = 40MaxHeapFreeRatio         = 70MaxHeapSize              = 6442450944 (6144.0MB)NewSize                  = 172621824 (164.625MB)MaxNewSize               = 523436032 (499.1875MB)OldSize                  = 345374720 (329.375MB)NewRatio                 = 2SurvivorRatio            = 8MetaspaceSize            = 21807104 (20.796875MB)CompressedClassSpaceSize = 1073741824 (1024.0MB)MaxMetaspaceSize         = 17592186044415 MBG1HeapRegionSize         = 0 (0.0MB)Heap Usage:
New Generation (Eden + 1 Survivor Space):capacity = 155385856 (148.1875MB)used     = 125461056 (119.64898681640625MB)free     = 29924800 (28.53851318359375MB)80.74161910849853% used
Eden Space:capacity = 138149888 (131.75MB)used     = 108225088 (103.21148681640625MB)free     = 29924800 (28.53851318359375MB)78.33888942421727% used
From Space:capacity = 17235968 (16.4375MB)used     = 17235968 (16.4375MB)free     = 0 (0.0MB)100.0% used
To Space:capacity = 17235968 (16.4375MB)used     = 0 (0.0MB)free     = 17235968 (16.4375MB)0.0% used
concurrent mark-sweep generation:capacity = 1930588160 (1841.15234375MB)used     = 1187447176 (1132.437873840332MB)free     = 743140984 (708.714469909668MB)61.50701639027974% used

再次 我们用jmap -dump:format=b,file=heapdump.hprof命令dump内存的堆信息,我们分析一下,用MAT打开,我们可以看到如下的信息:


可以看到SQLAppStatusListener这个对象占用的内存达到了500多M,而且只是在任务的一开始,后续很长一段时间,该内存会增加,且会持续占用。
我们可以稍微分析一下SQLAppStatusListener这个类:

class SQLAppStatusListener(conf: SparkConf,kvstore: ElementTrackingStore,live: Boolean) extends SparkListener with Logging {

该类继承自SparkListener,并且该类会在
SharedState中会被初始化:

val statusStore: SQLAppStatusStore = {val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]val listener = new SQLAppStatusListener(conf, kvStore, live = true)sparkContext.listenerBus.addToStatusQueue(listener)val statusStore = new SQLAppStatusStore(kvStore, Some(listener))sparkContext.ui.foreach(new SQLTab(statusStore, _))statusStore}

SQLAppStatusListener 会被加到数据总线中,也就是说所有的event的事件都会被接受,只不过可以自己进行过滤。
而最终改listener存储的状态都会被ui给展示出来。

SQLAppStatusListener存储的组件最主要的也是stageMetrics,也是在内存中占用比较多的对象。

查看stageMetrics被调用的地方,主要是在onStageSubmitted方法,onExecutorMetricsUpdate方法,onExecutionEnd方法中:

override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {if (!isSQLStage(event.stageInfo.stageId)) {return}// Reset the metrics tracking object for the new attempt.Option(stageMetrics.get(event.stageInfo.stageId)).foreach { stage =>if (stage.attemptId != event.stageInfo.attemptNumber) {stageMetrics.put(event.stageInfo.stageId,new LiveStageMetrics(event.stageInfo.stageId, event.stageInfo.attemptNumber,stage.numTasks, stage.accumIdsToMetricType))}}}...private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {val SparkListenerSQLExecutionEnd(executionId, time) = eventOption(liveExecutions.get(executionId)).foreach { exec =>exec.completionTime = Some(new Date(time))update(exec)// Aggregating metrics can be expensive for large queries, so do it asynchronously. The end// event count is updated after the metrics have been aggregated, to prevent a job end event// arriving during aggregation from cleaning up the metrics data.kvstore.doAsync {exec.metricsValues = aggregateMetrics(exec)removeStaleMetricsData(exec)exec.endEvents.incrementAndGet()update(exec, force = true)}}}private def removeStaleMetricsData(exec: LiveExecutionData): Unit = {// Remove stale LiveStageMetrics objects for stages that are not active anymore.val activeStages = liveExecutions.values().asScala.flatMap { other =>if (other != exec) other.stages else Nil}.toSetstageMetrics.keySet().asScala.filter(!activeStages.contains(_)).foreach(stageMetrics.remove)}

这里只截取了onStageSubmitted方法和onExecutionEnd方法,
因为onStageSubmitted这是在有stage提交的时候,spark会发出SparkListenerJobStart事件,这会往stageMetrics写入对应的信息。
而onExecutionEnd方法是在executor被移除的时候,spark会发出SparkListenerSQLExecutionEnd事件,这个时候会清除stageMetrics对应的信息。

所以说在stage很多的情况下(也就是exchange很多的情况下),stageMetrics会存储大量的信息,而这种情况下,executor会被长期占用而得不到释放,
所以导致了driver端内存持续增加。

结论以及解决方法

所以在这种情况下,如果业务上改变不了,我们就得增加内存,在笔者的情况下,增加driver内存到6g就能很好的解决,且运行的速度很顺畅。

spark.driver.memory=6g

spark shuffle(ExchangeExec)过多导致任务运行过慢甚至超时相关推荐

  1. Electron+Vue3+Vite+Element-Plus,保持软后台全速运行(解决循环过多导致的界面不刷新问题,保证窗口失去焦点后setTimeOut可用)

    文章目录 Electron+Vue3+Vite+Element-Plus,保持软后台全速运行(解决循环过多导致的界面不刷新问题,保证窗口失去焦点后setTimeOut可用) 问题描述 问题一 大循环界 ...

  2. Spark Shuffle运行原理

    1.什么是spark shuffle? Shuffle中文意思就是"洗牌",在Spark中Shuffle的目的是为了保证每一个key所对应的value都会汇聚到同一个分区上去聚合和 ...

  3. Spark Shuffle原理解析

    Spark Shuffle原理解析 一:到底什么是Shuffle? Shuffle中文翻译为"洗牌",需要Shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节 ...

  4. Spark Shuffle两种Manager

    文章目录 前言 hashShuffleManager 1.普通机制 缺陷 2.合并机制-优化 sortShuffleManager 1.普通机制 2.byPass机制 总结: 前言 reduceByK ...

  5. spark shuffle 内幕彻底解密

    一:到底什么是Shuffle? Shuffle中文翻译为"洗牌",需要Shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节点上进行计算. 二:Shuffle可 ...

  6. Spark shuffle机制演进史及原理说明(sort-based/hash-based/bypassShuffleManager)

    spark shuffle 演进的历史 Spark 0.8及以前 Hash Based Shuffle Spark 0.8.1 为Hash Based Shuffle引入File Consolidat ...

  7. Spark shuffle调优

    Spark shuffle是什么 Shuffle在Spark中即是把父RDD中的KV对按照Key重新分区,从而得到一个新的RDD.也就是说原本同属于父RDD同一个分区的数据需要进入到子RDD的不同的分 ...

  8. 阿里云Spark Shuffle的优化

    转自:大数据技术与架构 本次分享者:辰石,来自阿里巴巴计算平台事业部EMR团队技术专家,目前从事大数据存储以及Spark相关方面的工作. Spark Shuffle介绍 Smart Shuffle设计 ...

  9. Spark(Shuffle)

    2019独角兽企业重金招聘Python工程师标准>>> Shuffle Shuffle是Spark对各分区的数据进行重新分布的机制,是一个复杂而且代价较高的操作, 因为一般需要在执行 ...

最新文章

  1. 文档模型(JSON)使用介绍
  2. aws dynamodb_使用适用于Java 2的AWS开发工具包的AWS DynamoDB版本字段
  3. 双非同学,自学编程,毕业一年逆袭百度!
  4. node.js之require
  5. ISV客户博客系列:iVoteSports通过Windows Azure扩展它的面向棒球的移动游戏应用程序...
  6. JAVA无法加载此类文件,ORA-00376: 此时无法读取文件问题处理
  7. 从5个函数带你理解K8s DeltaFIFO
  8. Java中含有泛型的 JSON 反序列化问题
  9. 2022年全网首发|大数据专家级技能模型与学习指南(胜天半子篇)
  10. 阿里云大学生领取免费ECS服务器——测试题答案
  11. js表单提交enter触发提交
  12. 60、JAVA的map集合
  13. Python去除小数点后面多余的0
  14. 关于周报的写法和原则
  15. 蚂蚁金服旗下网商银行招聘了
  16. 深度学习的loss变小梯度是否变小
  17. LeetCode 28. 找出字符串中第一个匹配项的下标 -- 字符串编码成数字匹配
  18. [044] 微信公众平台开发教程第20篇-新手解惑40则
  19. Nuget的使用说明
  20. python学习笔记 os.scandir遍历目录

热门文章

  1. 行会最大上限是多少人?如何提升人数?
  2. 蓝桥杯单片机学习过程记录(二十九)第八届国赛超声波测距机
  3. 英语不好计算机好学吗,英语很差能学计算机专业吗
  4. 全年GMV突破40亿元关口,为何如涵仍难造出第二个“张大奕”?
  5. MediaPlayer详解
  6. Mysql之统计函数
  7. java box类定义三变量_01.类的成员变量:\n设计一个立方体类Box,定义三个属性,分别是长,宽,高。定义二个方法,分别计...
  8. iOS删除项目中未使用的图片资源
  9. SIC8833芯片开发厨房电子秤方案
  10. c语言控制树莓派蓝牙,方法2-树莓派3B蓝牙rfcomm通信调试