本文从源码角度分析spark统一内存管理的实现原理。

统一内存管理对象的创建

统一内存管理对象在SparkEnv中进行创建和管理,这样内存管理就在Driver和Executor端中都可以使用。在SparkEnv的create函数中,创建内存管理对象的实现代码如下:

   val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)  val memoryManager: MemoryManager =      if (useLegacyMemoryManager) {        new StaticMemoryManager(conf, numUsableCores)      } else { // spark2默认使用统一内存管理模式,所以执行这里        UnifiedMemoryManager(conf, numUsableCores)      }

从以上代码片段可知,使用静态内存管理还是统一内存管理,是由参数:spark.memory.useLegacyMode决定的。从spark-2.0开始默认都是使用统一内存管理,一般不会修改该参数。

所以,一般情况下,默认会创建统一内存管理:UnifiedMemoryManager对象。这几个对象之间的关系,如图1所示:

图1 内存管理对象和SparkContext

统一内存管理初始化

在创建统一内存管理对象时,会进行初始化操作。为了便于管理和分配内存,在初始化初始化时会把内存分成几个部分:预留内存,用户内存,执行和存储内存。

统一内存管理对象初始化时的主要步骤如下:

(1)计算JVM可用的最大内存,保存在变量:systemMemory中,默认从参数spark.testing.memory获取值但一般不设置,所以会获取:Runtime.getRuntime.maxMemory的值。

(2)计算需要预留的内存数:reservedMemory,先取参数:spark.testing.reservedMemory的值,但一般不设置,此时使用默认值:300M。

(3)计算系统使用内存的最小值,它是预留内存的1.5倍,也就是:minSystemMemory=reservedMemory * 1.5,若系统使用内存比这个值小:systemMemory < minSystemMemory,则报错:请增加spark.driver.memory的值。

(4)获取executor的内存值:val executorMemory = conf.getSizeAsBytes("spark.executor.memory"),若executorMemory < minSystemMemory,则报错:请增加spark.executor.memory的值。

(5)计算系统可用内存的总量,系统内存-预留内存,得到spark可以使用的总内存:usableMemory = systemMemory - reservedMemory

(6)计算任务执行和存储可用内存总量。计算公式是:usableMemory * memoryFraction。其中memoryFraction是一个小数,是配置项spark.memory.fraction的值,默认值是0.6。

(7)最大可用内存已经计算出来了,此时可以创建UnifiedMemoryManager对象了,代码如下:

    new UnifiedMemoryManager(      conf,      maxHeapMemory = maxMemory,      onHeapStorageRegionSize =        (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,      numCores = numCores)

从创建统一内存管理对象的代码中可以看出,默认情况下任务的执行内存和存储内存是个占50%。可以通过参数spark.memory.storageFraction来调整执行内存和存储内存的占比。

完成统一内存初始化后,内存的划分情况如图2所示:

图2 统一内存初始化内存分布

统一内存管理的实现

前面已经说明,统一内存管理是在UnifiedMemoryManager类中实现的。下面我们来分析统一内存管理的实现逻辑。

该类的声明如下:

 private[spark] class UnifiedMemoryManager private[memory] (    conf: SparkConf,    val maxHeapMemory: Long,    onHeapStorageRegionSize: Long,    numCores: Int)

统一内存管理为spark提供了灵活使用内存的机制。它把一块大的可使用的内存分成执行内存和存储内存。执行内存主要被Executor在执行任务时使用,而存储内存主要用来存储数据块。

该类的成员变量说明如下

  • onHeapStorageRegionSize堆内内存区的大小,以字节为单位。该内存区不是静态保留的; 执行器可以在必要时进行借用。仅当实际存储内存使用量超过此区域时,才能清除缓存块。

  • maxHeapMemory最大可用堆内存。该成员变量是通过函数getMaxMemory计算而来的,具体的计算方法见下面的分析。

  • numCores核数。

获取执行内存

在执行当前任务内存不足时会需要申请执行内存。申请内存的过程可能会向存储内存池(StorageMemoryPool)借用一部分内存,并把这部分内存添加到执行内存池(ExecutionMemoryPool)中。能够向存储内存池借用内存必须满足以下条件之一:

(1)存储池的空闲内存大于0;

(2)存储是否已经借用了执行池的内存。通过:存储内存池目前的大小减去初始化设置的存储内存池的大小是否大于0来进行判断,也就是计算storagePool.poolSize - storageRegionSize是否大于0。若大于0(已借用)表示可以分配。

在借用存储内存时,可能会把存储池中的内存释放一部分,若这部分内存的rdd设置了useDisk级别,还会把这些内存的数据写入磁盘,否则,这些内存中的存储数据就丢失了。

内存块的释放是在MemoryStore对象中完成(后面的文章会详细分析这实现),官方文档中提到过,释放老的内存块的算法是LRU(最近最少使用),这是由于在MemoryStore中内存块是以LinkedHashMap的结构组织的,在链表的头部就是“最近最少使用”的内存块。这部分内容在分析MemoryStore的实现时再继续讲解。

下面分析获取执行内存操作的实现逻辑。

acquireExecutionMemory函数

在统一内存管理中实现获取执行内存的函数是:acquireExecutionMemory。该函数的原型如下:

   override private[memory] def acquireExecutionMemory(      numBytes: Long,      taskAttemptId: Long,      memoryMode: MemoryMode): Long = synchronized {...}

该函数尝试为目前的执行任务获取numBytes执行内存。对于该函数需要注意以下几点:

(1)它尝试获取numBytes字节大小的内存,返回能够获取的字节数,若返回0,则表示无法分配内存;

(2)它是同步函数,所以当有多个任务调用该函数时可能会阻塞,直到有足够的内存,这样做是为了在把数据进行持久化之前,让每个任务都有机会获取到1/2N的内存(其中N是运行的任务数)。

(3)当老的任务占用很多内存,而新任务数又不断增加时,阻塞就可能会发生。

实现逻辑

获取执行内存操作的实现逻辑如下:

(1)根据参数memoryMode的值来选择操作:若是堆内模式(ON_HEAP),获取堆内的执行和存储池总量和堆内可用存储内存总量,以及总的堆内内存大小。若是堆外模式(OFF_HEAP),获取堆外的执行和存储池总量和堆外可用存储内存总量,以及总的堆外内存大小。这一步的代码实现如下:

     val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {      // 堆内模式      case MemoryMode.ON_HEAP => (        onHeapExecutionMemoryPool,        onHeapStorageMemoryPool,        onHeapStorageRegionSize,        maxHeapMemory)      // 堆外模式      case MemoryMode.OFF_HEAP => (        offHeapExecutionMemoryPool,        offHeapStorageMemoryPool,        offHeapStorageMemory,        maxOffHeapMemory)    }

(2)判断是否需要增加执行内存池(ExecutionPool)。当执行内存池中空闲内存量小于需要申请的内存量时,则会尝试增加执行池。尝试增加执行池的过程,本质上就是向存储池StorageMemoryPool借用内存的过程。能够成功借用存储池的内存,需要满足以下两个条件之一:1)存储池有空闲内存;2)存储池的量大于初始化的量。(也就是说,已经向执行内存池借用了一些内存,存储池大小增加了)

另外,这个过程可能执行多次,每次尝试都必须能够获取到一些内存,可能会清除掉一些内存中的数据块,以防其他任务在缓存大的数据块和清除数据之间进行反复。那么,为什么每次只能清除一些内存呢?这是因为在MemoryStore中,内存是以MemoryEntry对象来组织和管理的,清理时也是以这个为单位进行的,而每个这样的对象的大小是不同的。

尝试增加执行内存池大小的实现代码如下:

 def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {      if (extraMemoryNeeded > 0) {        // 可以分配内存的条件:1.存储池有空闲内存 或 2.存储池已经借用了执行池的内存        val memoryReclaimableFromStorage = math.max(          storagePool.memoryFree,          storagePool.poolSize - storageRegionSize)                if (memoryReclaimableFromStorage > 0) {          // 通过下面的函数来释放存储内存池的内存,减少存储内存池的大小。          val spaceToReclaim = storagePool.freeSpaceToShrinkPool(            math.min(extraMemoryNeeded, memoryReclaimableFromStorage))          // 到这里,说明存储内存池的空间已经释放,这一步只需要减少存储内存池的大小即可          storagePool.decrementPoolSize(spaceToReclaim)          // 增加执行内存池大小的量          executionPool.incrementPoolSize(spaceToReclaim)        }      }    }

要注意的是,执行内存池将借用的内存均匀地分配给活动任务,以限制每个任务的执行内存分配。保持这个大于执行池大小是很重要的,这不考虑可以通过清除存储而释放的潜在内存。另外,这个数量应该保持在“maxMemory”以下,以便在任务中执行内存分配的公平性,否则,任务可能占用超过其平均份额的执行内存。

(3)然后调用executionPool.acquireMemory来获取内存,该函数的声明如下:

 private[memory] def acquireMemory(      numBytes: Long, // 想要获取的内存数      taskAttemptId: Long, // 想要获取内存的任务数      maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit, // 一个回调函数,用来增长内存池的大小      computeMaxPoolSize: () => Long = () => poolSize // 回调函数,用来获取某个时刻允许获取内存的最大值      ): Long = lock.synchronized 

该函数尝试为给定任务获取numBytes大小的内存,并返回获取到的内存大小。该函数可能会阻塞,直到有足够的内存再返回。该函数的执行逻辑大致如下:

  • 添加任务到taskMemory这个map中,该map保存了任务id和申请的内存大小的对应关系。

  • 调用maybeGrowExecutionPool回调函数来向storeage申请内存,若内存不够该函数会释放掉一些存储内存。一次释放的内存可能不够,所以该函数可能会尝试多次。

  • maybeGrowExecutionPool会调用memoryStore.evictBlocksToFreeSpace函数,在该函数中会根据rdd和内存模式等参数来清除一些内存块,释放对应大小的内存,具体的实现过程在后面分析。

获取存储内存

获取存储内存的过程比获取执行内存的过程要相对简单。因为,获取存储内存时不会强制释放正在使用的执行内存,而只能从执行池的空闲内存中申请。

所以,申请存储内存的步骤主要是以下几步:

(1)判断需要申请的内存数量,是否大于存储池的空闲内存量。若大于(存储池的内存量不够),则向执行池的空闲内存申请一部分内存。要注意:可能执行池的空闲内存也不够,或根本就没有空闲内存。

(2)调用存储池的内存获取函数获取内存,若空闲内存不够,则需要从存储池中按LRU算法释放一部分内存。

获取存储内存是在函数acquireStorageMemory中实现,下面我们来分析一下该函数的具体实现。

acquireStorageMemory函数

该函数的原型如下:

  override def acquireStorageMemory(      blockId: BlockId,      numBytes: Long,      memoryMode: MemoryMode): Boolean = synchronized {...}

该函数的参数:

  • memoryMode: MemoryMode:该参数是内存的模式,主要有两种:ON_HEAP或OFF_HEAP。

  • numBytes:需要申请的内存大小,单位是bytes

  • blockId:数据块的ID,也是可能会被释放的数据块。若该id为空,则会通过LRU算法寻找需要释放块对应的内存。

该函数是一个同步函数,若是多个线程同时调用该函数,可能会阻塞。

实现分析

该函数的主要实现逻辑如下:

(1)根据参数memoryMode来获取此种模式下的最大可以用存储内存,保存在变量maxMemory中。

(2)判断内存申请量(即参数numBytes)是否大于maxMemory,若申请内存大于最大可用内存,会失败。报错:该blockid的数据块需要的内存超过最大使用内存。

(3)若申请的内存大小:numBytes大于存储池的空闲内存大小,则需要从执行池中“借用”一些空闲内存。借用的意思是,从执行池的空闲内存中获取一部分内存,但要注意:最多从执行池中借用空闲内存量,不会释放任务正在使用的执行内存。实现代码如下:

 if (numBytes > storagePool.memoryFree) {       // 所需内存量大于可用存储空闲内存量,需要从执行池中申请一部分内存      val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree,        numBytes - storagePool.memoryFree) //最多获取执行池中空闲的内存量大小      executionPool.decrementPoolSize(memoryBorrowedFromExecution)  // 执行内存池减少内存数      storagePool.incrementPoolSize(memoryBorrowedFromExecution) // 存储内存池增加对应内存    }

注意:这一步是体现统一内存思想的重要的一步。

(4)若能够从执行内存池中借用成功,这一步就直接在存储内存池中申请内存了。代码很简单,就是调用存储内存池的内存申请函数:

     storagePool.acquireMemory(blockId, numBytes)
storagePool#acquireMemory函数

该函数来完成存储池的内存申请工作。要注意,此时的存储池可能有空闲的内存,也可能没有空闲内存。当存储池没有空闲内存时,需要把已有的某些数据块从存储池中清除,以满足当前数据块的存储需要。

该函数的实现逻辑如下:

(1) 计算需要释放的内存量

需要申请的内存量减去空闲的内存量,就是需要释放的内存量。也就是说,需要从已经使用的存储内存块中释放一部分内存。

     val numBytesToFree = math.max(0, numBytes - memoryFree)

(2) 第(1)步已经计算出来需要释放的内存量了。下面调用StorageMemoryPool.acquireMemory函数来申请内存,释放一定的数据块。该函数会调用MemoryStore.evictBlocksToFreeSpace来清除数据块。会被清除的数据块的判断如下:

 def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {     //存储模式相同,且blockId没有被RDD占用 或则不是要替换相同RDD的不同数据块    entry.memoryMode == memoryMode &&     (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))}

若有可以释放的数据块,还需要获取一把写锁,加锁的目的是防止目前还有其他的线程在读该数据块。当锁获取成功后,就可以开始删除数据块了,具体的删除过程是通过blockInfoManager.removeBlock来进行的。该函数会把需要清除的元数据和数据块从blockManager中删除。

释放内存块:MemoryStore#evictBlocksToFreeSpace函数

这是MemoryStore类的成员函数,该函数完成内存块的释放,若存储级别包含useDisk还会把内存中的数据保存到磁盘中。该函数的原型如下:

   private[spark] def evictBlocksToFreeSpace(      blockId: Option[BlockId],      space: Long,      memoryMode: MemoryMode): Long = {

其中的blockId是数据块的id,每个id都对应一个内存块。释放内存块的逻辑如下:

(1)遍历内存块的队列。这是一个LinkedHashMap,最后一次被访问的内存块节点会放到链表的后面,这样最近没有被访问的内存块就在队列的头部。

(2)检查内存块是否可以被释放。释放内存块需要满足以下条件:

1)内存块的模式必须和参数中memoryMode的值相等;

2)该blockId对应的内存块没有被其他RDD占用,或则不是要替换相同RDD的不同数据块。

(3)若满足以上两个条件,就会释放该内存块。释放内存块的过程如下:

1)确认内存块的写锁已经锁上了;

2)通过blockId的信息检查存储级别是否包含useDisk,若包含则把内存的数据写入到磁盘上。写入磁盘 的过程是通过DiskStore对象来完成的。

(4)由于实际的内存是通过MemoryStore来管理的,所以,最后一步就是从memoryStore中删除并释放blockId对应的内存块,并减少MemoryStore的内存数量。到此,就完成了内存释放的整个过程。至于MemoryStore是如何释放内存的,会在分析MemoryStore时进行分析。

计算可用堆内存储内存:maxOnHeapStorageMemory函数

该函数用来计算堆内可用内存,逻辑很简单:就是使用总的堆内存储内存-为执行器可分配的堆内内存:

   override def maxOnHeapStorageMemory: Long = synchronized { // 计算可用堆内内存    maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed  }
计算对外存储内存:maxOffHeapStorageMemory函数

该函数用来计算可用堆外内存:使用总堆外内存-为执行器分配的堆外内存:

  override def maxOffHeapStorageMemory: Long = synchronized { // 计算可用堆外内存    maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed  }

小结

本文讲述了spark统一内存管理的实现原理。从实现层面来看,Spark的统一内存管理都是在UnifiedMemoryManager类中实现。不管是执行还是存储内存不足时,都可以向对方借用内存。但内存不足时,可以根据LRU来释放存储正在使用的内存,但不能释放执行时正在使用的内存。

另外,最终的内存块释放和数据块的持久化是通过MemoryStore,DiskStore以及BlockManager这几个系统来完成的,这些组件的原理会在后面的文章中继续分析。

内存参数 计算_Spark统一内存管理的实现相关推荐

  1. spark 序列化错误 集群提交时_Spark统一内存管理机制

    一.内存的分配 预留内存:300m 可用内存 = 系统内存 -  预留内存 可用内存 = 统一内存(60%) + 其他 (40%) 统一内存 = 存储内存(Storage)50%  + 执行内存(Ex ...

  2. 【Android 内存优化】Bitmap 内存占用计算 ( Bitmap 图片内存占用分析 | Bitmap 内存占用计算 | Bitmap 不同像素密度间的转换 )

    文章目录 一.Bitmap 内存占用 二.Bitmap 内存占用计算示例 三.Bitmap 内存占用与像素密度 四.Bitmap 内存占用与像素密度示例 一.Bitmap 内存占用 在 Android ...

  3. linux 真实内存,Linux计算真实可用内存

    ---恢复内容开始--- free命令显示了Linux系统中物理内存.交换分区的使用统计信息. 可用内存 = free + buffers + cached 第一行为 物理内存使用统计: 标题 说明 ...

  4. Linux计算内存,正确计算linux系统内存使用率

    对操作系统来说,Buffers和Cached是已经被使用的(上图Mem:这一行) MemFree=total-used 314952=24946552-24631600 对应用程序来说(上图对应-/+ ...

  5. Spark统一内存划分

    文章目录 1. Executor内存逻辑架构 2. Executor 界面内存计算 3. UnrollMemory理解 4. 参考 1. Executor内存逻辑架构 堆内存,由JVM分配和回收,由s ...

  6. 掌握 Spring Boot 运行内存及内存参数设置:助力高效应用部署与优化

    pring Boot 是当今非常流行的 Java 应用框架之一,在企业级应用开发中被广泛使用.应用部署和优化是企业级应用开发的一个非常重要的方面.在这篇博客中,我们将学习如何掌握 Spring Boo ...

  7. Spark内存管理(3)—— 统一内存管理设计理念

    Spark内存管理系列文章:  Spark内存管理(1)-- 静态内存管理  Spark内存管理(2)-- 统一内存管理 在本文中,将会对各个内存的分布以及设计原理进行详细的阐述  相对于静态内存模型 ...

  8. Spark内存管理(2)—— 统一内存管理

    Spark内存管理系列文章:  Spark内存管理(1)-- 静态内存管理 堆内内存 Spark 1.6之后引入的统一内存管理机制,与静态内存管理的区别在于Storage和Execution共享同一块 ...

  9. Apache Spark统一内存管理模型详解

    本文将对Spark的内存管理模型进行分析,下面的分析全部是基于ApacheSpark2.x进行的.文章仅对统一内存管理模块(UnifiedMemoryManager)进行分析,如对之前的静态内存管理感 ...

最新文章

  1. 一些关于找工作的书籍
  2. db2 springboot 整合_springboot的yml配置文件通过db2的方式整合mysql的教程
  3. iOS用户设计指南 - 平台特征
  4. Kotlin实战指南十一:扩展函数
  5. 【整理】MySQL 之 autocommit
  6. MuiPlayer视频播放组件入门
  7. xmpppy获取服务器版本信息,为什么XMPP? - 今幕明的个人页面 - OSCHINA - 中文开源技术交流社区...
  8. 电梯实时智能监测与诊断:应用人工智能的案例研究和解决方案
  9. 如何利用软文营销将好品牌故事?
  10. vue按钮字体大小设置_vue添加文字怎么设置时长
  11. UE4 Lights UWorld to FScene [1]
  12. AndroidStudio Kotlin Analysis 卡住(无限Performing...)
  13. python陆股通_要闻:11月“陆股通”渠道外资净买入A股579亿元
  14. 解决import org.junit.Test 和@Test报错
  15. JMS介绍:我对JMS的理解和认识
  16. 关闭烦人的Windows XP系统哔哔声
  17. Python 基础篇(三)--初步编程总结,各种常识
  18. pyppeteer报错:NoneType‘ object has no attribute ‘goto‘
  19. allegro模块布局+交互布局详解
  20. java设备imei号_Java IMEI串号生成规则

热门文章

  1. Flume环境搭建_五种案例(转)
  2. js学习总结----编写简单的ajax方法库
  3. 数据结构--------------静态表的希尔排序
  4. nlp中的经典深度学习模型(一)
  5. [剑指offer][JAVA]面试题第[18]题[删除链表的节点]
  6. 南师大632c语言程序设计,单片机c语言学习心得632.docx
  7. python3 unicode字符串_【已解决】Python3中如何声明字符串是unicode类型以避免log日志打印出错...
  8. java classpath bat_tomcat启动批处理——setclasspath.bat | 学步园
  9. i9 9900k mysql_i9-9900K和9900KS有什么区别?i9-9900KS和i9-9900K区别对比评测
  10. 检测同心圆_(二)光线如何被眼睛检测到?