The MemoryManager arbitrates memory use between storage and execution. Specifically, it:

# Tracks how much storage memory and execution memory are in use

# Acquires storage, execution, and unroll memory

# Releases storage and execution memory

execution memory: memory used by computations such as shuffles, joins, sorts, and aggregations

storage memory: memory used to cache data in memory via persist or cache, etc.

unroll memory: memory consumed by unrolling a block itself; much like opening a file, the act of unrolling costs memory

MemoryManager uses the spark.memory.useLegacyMode configuration to decide whether to fall back to the legacy strategy, StaticMemoryManager. By default it does not use StaticMemoryManager, but UnifiedMemoryManager.
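
As a minimal sketch (assuming a Spark version where this flag still exists), switching back to the legacy manager is a single configuration setting:

import org.apache.spark.SparkConf

// Opt back into the legacy StaticMemoryManager; the default ("false")
// selects UnifiedMemoryManager.
val conf = new SparkConf()
  .set("spark.memory.useLegacyMode", "true")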

1 MemoryManager

1.1 Core attributes

Int numCores: the number of cores

Long onHeapStorageMemory: size of the on-heap storage memory

Long onHeapExecutionMemory: size of the on-heap execution memory

StorageMemoryPool onHeapStorageMemoryPool: the on-heap storage memory pool

StorageMemoryPool offHeapStorageMemoryPool: the off-heap storage memory pool

ExecutionMemoryPool onHeapExecutionMemoryPool: the on-heap execution memory pool

ExecutionMemoryPool offHeapExecutionMemoryPool: the off-heap execution memory pool

Long maxOffHeapMemory: maximum off-heap memory size, configured via spark.memory.offHeap.size; the setting only takes effect when spark.memory.offHeap.enabled is turned on (see the sketch after this list)

Long maxOnHeapStorageMemory: maximum on-heap storage memory size

Long maxOffHeapStorageMemory: maximum off-heap storage memory size
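
A minimal sketch of enabling off-heap memory (both settings are needed; the 1g value is just an example):

import org.apache.spark.SparkConf

// Off-heap memory is disabled by default; spark.memory.offHeap.size is
// ignored unless spark.memory.offHeap.enabled is set to true.
val conf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "1g")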

1.2 Key methods

# Release numBytes bytes of execution memory

def releaseExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Unit = synchronized {
  memoryMode match {
    case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
    case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
  }
}

# Release all execution memory held by the given task

private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
  onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
    offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
}

# Release numBytes bytes of storage memory

def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
  memoryMode match {
    case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
    case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
  }
}

# Release all storage memory

final def releaseAllStorageMemory(): Unit = synchronized {
  onHeapStorageMemoryPool.releaseAllMemory()
  offHeapStorageMemoryPool.releaseAllMemory()
}

2 StaticMemoryManager

Under StaticMemoryManager, executor memory has clear-cut boundaries and consists of three parts: execution, storage, and system. Once statically divided, the regions never change size.

# execution: the execution memory size is controlled by spark.shuffle.memoryFraction, default 0.2.

It acts as a buffer so that shuffle, join, sort, and aggregation operations do not write data straight to disk, reducing the number of disk reads and writes.

# storage: the storage memory size is controlled by spark.storage.memoryFraction, default 0.6.

It holds the data stored by explicit user calls such as persist, cache, and broadcast.

# system: the space the program itself needs to run. It stores Spark's internal metadata and the user's data structures, and guards against OOM from unusually large records.

This static division can waste resources: if a job makes little use of cache or persist, the storage memory simply sits idle.

Since many attributes are inherited from the parent class MemoryManager, they are not repeated here.

# maxUnrollMemory

The maximum memory for unrolling blocks; by default 20% of the maximum storage memory.

private val maxUnrollMemory: Long = {
  (maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}
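
As a rough illustration (the 500 MiB figure is made up, not taken from the source):

// If maxOnHeapStorageMemory were 500 MiB, the default unroll quota shared by
// all blocks being unrolled would be 500 MiB * 0.2 = 100 MiB.
val exampleMaxStorage  = 500L * 1024 * 1024
val exampleUnrollQuota = (exampleMaxStorage * 0.2).toLong // ~100 MiB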

# Acquire storage memory; note that StaticMemoryManager does not support off-heap storage memory

override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  require(memoryMode != MemoryMode.OFF_HEAP,
    "StaticMemoryManager does not support off-heap storage memory")
  // A request larger than the maximum storage memory can never succeed
  if (numBytes > maxOnHeapStorageMemory) {
    // Fail fast if the block simply won't fit
    logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
      s"memory limit ($maxOnHeapStorageMemory bytes)")
    false
  } else {
    // Delegate to the StorageMemoryPool to allocate numBytes bytes
    onHeapStorageMemoryPool.acquireMemory(blockId, numBytes)
  }
}

# acquireUnrollMemory acquires memory for unrolling a block

override def acquireUnrollMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  require(memoryMode != MemoryMode.OFF_HEAP,
    "StaticMemoryManager does not support off-heap unroll memory")
  // Storage memory currently used for unrolling blocks
  val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
  // Free memory currently available in the storage pool
  val freeMemory = onHeapStorageMemoryPool.memoryFree
  // The unroll quota still available beyond what is already free; if this is <= 0,
  // no cached blocks may be evicted for this request
  val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
  // Evict only as much as the request still needs after using the free memory
  val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
  onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
}
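
A worked sketch of the eviction math above, with made-up numbers:

val maxUnrollMemory     = 100L // the unroll quota (20% of max storage, say)
val currentUnrollMemory = 30L  // already used for unrolling other blocks
val freeMemory          = 20L  // free space left in the storage pool
val numBytes            = 50L  // this unroll request
val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory) // 50
val numBytesToFree    = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory)) // 30
// Up to 30 units of cached blocks may be evicted to satisfy the 50-unit request.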

# acquireExecutionMemory acquires execution memory

override def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Long = synchronized {
  memoryMode match {
    case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
    case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
  }
}

# getMaxStorageMemory returns the effective maximum storage memory

private def getMaxStorageMemory(conf: SparkConf): Long = {
  // Maximum system memory
  val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
  // Fraction of memory given to storage
  val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
  // Safety fraction
  val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
  (systemMaxMemory * memoryFraction * safetyFraction).toLong
}

# getMaxExecutionMemory returns the maximum execution memory

private def getMaxExecutionMemory(conf: SparkConf): Long = {
  // Maximum system memory
  val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
  // If system memory is below the minimum, throw
  if (systemMaxMemory < MIN_MEMORY_BYTES) {
    throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
      s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
      s"option or spark.driver.memory in Spark configuration.")
  }
  // If executor memory is configured explicitly
  if (conf.contains("spark.executor.memory")) {
    val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
    // If executor memory is below the minimum, throw
    if (executorMemory < MIN_MEMORY_BYTES) {
      throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
        s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
        s"--executor-memory option or spark.executor.memory in Spark configuration.")
    }
  }
  // Fraction of memory given to shuffle
  val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
  // Safety fraction for shuffle memory
  val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
  (systemMaxMemory * memoryFraction * safetyFraction).toLong
}
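
Putting the two formulas together, a back-of-the-envelope calculation (assuming a 1 GiB executor heap and all default fractions) shows how the heap is carved up:

val systemMaxMemory = 1024L * 1024 * 1024                  // 1 GiB heap
val storageMemory   = (systemMaxMemory * 0.6 * 0.9).toLong // ~553 MiB for storage
val executionMemory = (systemMaxMemory * 0.2 * 0.8).toLong // ~164 MiB for execution
// The remaining ~30% of the heap (plus the two safety margins) is the system share.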

3 UnifiedMemoryManager

Because the legacy StaticMemoryManager wastes resources, this MemoryManager was introduced. UnifiedMemoryManager softens the boundary between the execution and storage regions, letting them borrow memory from each other.

Their combined budget is determined by spark.memory.fraction (default 0.6): the usable memory multiplied by that heap fraction. Inside this unified region, the split between execution and storage is in turn determined by spark.memory.storageFraction.
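
A back-of-the-envelope sketch (assuming a 4 GiB heap, the 300 MB reserved memory shown in getMaxMemory below, and a default spark.memory.storageFraction of 0.5):

val systemMemory   = 4L * 1024 * 1024 * 1024
val reservedMemory = 300L * 1024 * 1024
val usableMemory   = systemMemory - reservedMemory
val unifiedRegion  = (usableMemory * 0.6).toLong  // spark.memory.fraction
val storageRegion  = (unifiedRegion * 0.5).toLong // spark.memory.storageFraction
// ~2278 MiB of unified memory, of which ~1139 MiB is the (soft) storage region.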

# Compute the maximum storage memory

Maximum on-heap storage memory = maximum heap memory - on-heap execution memory currently in use

override def maxOnHeapStorageMemory: Long = synchronized {
  maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
}

# Compute the maximum off-heap storage memory

Maximum off-heap storage memory = maximum off-heap memory - off-heap execution memory currently in use

override def maxOffHeapStorageMemory: Long = synchronized {
  maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed
}
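
A quick numeric illustration of why these maxima are dynamic (made-up numbers):

val maxHeapMemory = 2000L
val executionUsed = 600L // onHeapExecutionMemoryPool.memoryUsed at this instant
val maxStorage    = maxHeapMemory - executionUsed // 1400 -- shrinks as execution grows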

# acquireExecutionMemory acquires execution memory

override private[memory] def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Long = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  // Select the pools and sizing constants that match the memory mode
  val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      onHeapStorageRegionSize,
      maxHeapMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      offHeapStorageMemory,
      maxOffHeapMemory)
  }

  // Grow the execution pool by evicting cached blocks, shrinking the storage pool.
  // When acquiring memory for a task, the execution pool may retry several times,
  // and each attempt may reclaim some storage memory.
  def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
    if (extraMemoryNeeded > 0) {
      // Memory execution can reclaim from storage: the larger of the storage pool's
      // free memory and the memory storage has borrowed beyond its own region
      val memoryReclaimableFromStorage = math.max(
        storagePool.memoryFree,
        storagePool.poolSize - storageRegionSize)
      if (memoryReclaimableFromStorage > 0) {
        // Shrink the storage pool, freeing space (possibly by evicting blocks),
        // and hand the reclaimed space to the execution pool
        val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
          math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
        storagePool.decrementPoolSize(spaceToReclaim)
        executionPool.incrementPoolSize(spaceToReclaim)
      }
    }
  }

  // The largest the execution pool can grow to: total memory minus whatever
  // storage is actually using within its own region
  def computeMaxExecutionPoolSize(): Long = {
    maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
  }

  executionPool.acquireMemory(
    numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
}
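
A worked sketch of the maybeGrowExecutionPool arithmetic, with made-up numbers:

val storageRegionSize = 1000L
val storagePoolSize   = 1200L // storage previously borrowed 200 from execution
val storageFree       = 50L
// Reclaimable = max(free, poolSize - regionSize) = max(50, 200) = 200: execution
// can claw back everything storage borrowed beyond its own region, even if
// doing so requires evicting cached blocks.
val memoryReclaimableFromStorage =
  math.max(storageFree, storagePoolSize - storageRegionSize) // 200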

# acquireStorageMemory acquires storage memory

override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  // Select the pools and limits that match the memory mode
  val (executionPool, storagePool, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      maxOnHeapStorageMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      maxOffHeapMemory)
  }
  // If the request exceeds the maximum memory, fail fast and return false
  if (numBytes > maxMemory) {
    logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
      s"memory limit ($maxMemory bytes)")
    return false
  }
  // If the request exceeds the storage pool's free memory, borrow from execution:
  // shrink the execution pool and grow the storage pool by the borrowed amount
  if (numBytes > storagePool.memoryFree) {
    val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
    executionPool.decrementPoolSize(memoryBorrowedFromExecution)
    storagePool.incrementPoolSize(memoryBorrowedFromExecution)
  }
  storagePool.acquireMemory(blockId, numBytes)
}
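
A borrowing sketch with made-up numbers (note the code moves min(executionPool.memoryFree, numBytes), not just the shortfall):

val numBytes      = 500L
val storageFree   = 200L // storagePool.memoryFree
val executionFree = 400L // executionPool.memoryFree
val borrowed = math.min(executionFree, numBytes) // 400 moves from execution to storage
// The storage pool now has 600 free, enough for the 500-unit request without eviction.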

# getMaxMemory returns the maximum memory available to the unified region

private def getMaxMemory(conf: SparkConf): Long = {
  // System memory
  val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
  // Reserved memory
  val reservedMemory = conf.getLong("spark.testing.reservedMemory",
    if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
  // Minimum system memory
  val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
  // If system memory is below the minimum, throw
  if (systemMemory < minSystemMemory) {
    throw new IllegalArgumentException(s"System memory $systemMemory must " +
      s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
      s"option or spark.driver.memory in Spark configuration.")
  }
  // If executor memory is configured explicitly
  if (conf.contains("spark.executor.memory")) {
    val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
    // If executor memory is below the minimum system memory, throw
    if (executorMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
        s"$minSystemMemory. Please increase executor memory using the " +
        s"--executor-memory option or spark.executor.memory in Spark configuration.")
    }
  }
  // Usable memory = system memory - reserved memory
  val usableMemory = systemMemory - reservedMemory
  // Fraction of the usable heap given to the unified region, default 60%
  val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
  // Return usable memory times that fraction
  (usableMemory * memoryFraction).toLong
}
