1. 整体架构

Spark存储介质包括内存和磁盘等。Spakr的存储采用了主从模式,也就是Master/Slave模式,整个存储模块使用了前面介绍的RPC的通信方式。其中,Master负责整个应用程序运行期间的数据块元数据的管理和维护,而Slave一方面负责本地数据块的状态信息上报给Master,另一方面接受从Master传来的执行命令,如获取数据块状态、删除RDD/数据块等命令。在每个Slave中存在数据传输通道,根据需要在Slave之间进行远程数据的读取和写入

(1)在应用程序启动后,SparkContext会创建Driver端的SpakrEnv,在该SparkEnv中实列化BlockManager和BlockManagerMaster,在BlockManagerMaster内部创建消息通信的终端点BlockManagerMasterEndPoint

在Executor启动时候,也会创建SparkEnv,在该SparkEnv中实列化BlockManger和负责数据传输服务的BlockTransferService终端点的引用。在BlockManger初始化的过程中,一方面会加入BlockManagerMasterEndpoint终端点的引用,另一方面会创建Executor消息通信BlockManagerSlaveEndpoint终端点,并把该终端点的引用注册到Driver中,这样Dirver和Executor相互持有通信终端点的引用,可以在应用程序执行过程中进行消息通信。

// 创建远程数据传输服务,使用Nettyval blockTransferService =new NettyBlockTransferService(conf, securityManager, hostname, numUsableCores)//创建BlockMangerMaster,如果是Dirver端 在BlockMangerMaster内部,则创建终端点BlockManagerMasterEndpoint// 如果是Executor,则创建BlockManagerSlaveEndpoint的引用val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(BlockManagerMaster.DRIVER_ENDPOINT_NAME,new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),conf, isDriver)// NB: blockManager is not valid until initialize() is called later.// 创建BlockManager,如果是Driver端包含BlockManagerMaster,如果是Executor包含的是BlockManagerMaster的引用,另外BlockManager包含了// 数据传输服务,当BlockManager调用initalize()方法初始化时真正生效val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,blockTransferService, securityManager, numUsableCores)

其中BlockManger初始化代码如下,如果是Exeucor创建其消息通信的终端点BlockMangerSlaveEndpoint,并向Driver发送RegisterBlockManger消息,把该Executor的BlockManger和其包含的BlockMangerSlaveEndPoint注册到BlockManagerMaster中。

def initialize(appId: String): Unit = {// 在Executor中启动远程数据传输服务blockTransferService,根据配置启动传输服务器BlockTransferService// 该服务器启动后等待其他节点发送请求blockTransferService.init(this)shuffleClient.init(appId)// 获取BlockManager的编号blockManagerId = BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port)// 获取Shuffle服务编号,如果启动外部Shuffle服务,则加入外部Shuffle服务端口信息,// 否则使用BlockManager的编号shuffleServerId = if (externalShuffleServiceEnabled) {logInfo(s"external shuffle service port = $externalShuffleServicePort")BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)} else {blockManagerId}// 把Executor的BlockManager注册到BlockManagerMaster中,启动包括其终端点BlockMangaerSlaveEndPoint的引用,Master端持有该引用可以向Executor发送信息master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)// Register Executors' configuration with the local shuffle service, if one should exist.// 如果外部Shuffle服务启动并且为Executor节点,则注册该外部Shuffle服务if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {registerWithExternalShuffleServer()}}

(2)当写入、更新或删除数据完毕后,发送数据块的最新状态消息UpdateBlockInfo给BlockMangerMasterEndPoint终端点,由其更新数据块的元数据。该终端点的元数据存放在BlockMangerMasterEndPint的3个HashMap中。

 // Mapping from block manager id to the block manager's information.// 该HashMap中存放了BlockMangerId与BlockMangerInfo的对应,其中BlockMangerInfo包含了Executor内存使用情况、数据块的使用情况、已被缓存的数据块和Executor终端点引用// 通过该引用可以向Execuotr发送消息private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]// Mapping from executor ID to block manager ID.// 该HashMap中存放了ExecutorID和BlockMangerID对应列表private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]// Mapping from block id to the set of block managers that have the block.// 该HashMap存放了BlockId和BlockManagerId序列对应的列表,原因在于一个数据块可能存在多个副本,保持在多个Executor中

在更新数据的元数据时,更新BlockManagerInfo和BlockLocations两个列表

  • 在处理BokcMangerInfo时,传入的BlockMangerId、blockId和SotrageLevel等参数,通过这些参数判断数据的操作是插入、更新还是删除操作。
  • 在处理blockLoacations,根据blockId判断blockLocations中是否包含该数据库。如果包含该数据块,则根据数据块的操作,当进行数据更新时,更新数据块所在的BlockMangerid信息,当进行数据删除时,则移除该BlockMangerid信息,在删除过程中判断数据块对应的Executor是否为空,如果为空表示在集群中删除了该数据块,则在blockLoactions删除该数据块信息。

(3)应用程序数据存储后,在获取远程节点数据、获取RDD执行的首选位置等操作时需要根据数据块的编号查询数据块所处的位置,此时发送GetLoacations或获取数据块的位置信息。

(4)Spark提供删除RDD、数据块、广播变量的方式。当数据需要删除的时候,提交删除信息给BlockMangerSlaveEndPoint终端点,在该终端点发起删除操作,删除操作一方面需要删除Driver端元数据信息,另一方面需要发送消息通知Executor,删除对应的物理数据。

首先在SparkContext中调用unpersistRDD方法,在该方法中发送removeRDD消息给BlockMangerMasterEndPoint;然后,该终端点接收到消息时,从blockLocations列表中找出RDD对应的数据存在BlockManagerId列表,查询完毕之后,更新blockLoactions和blockMangerInfo两个元数据列表;最后,把获取的BlockManger列表,发送消息给所在的BlockMangerSlaveEndPoint终端点,通知其删除该Executor上的RDD,删除时调用BlockManager的removeRDD方法,删除Executor上RDD对应的数据块。

private def removeRdd(rddId: Int): Future[Seq[Int]] = {// First remove the metadata for the given RDD, and then asynchronously remove the blocks// from the slaves.// Find all blocks for the given RDD, remove the block from both blockLocations and// the blockManagerInfo that is tracking the blocks.// 在blockLocations和blockManagerInfo中删除该RDD的数据元信息// 首先根据RDD编号获取该RDD存储的数据块信息val blocks = blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)blocks.foreach { blockId =>//然后根据该数据块的信息找出这些数据块在blockManagerId中的列表,遍历这些列表并删除// BlockManager包含该数据块的元数据,同时删除blockLocations对应数据块的元数据val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))blockLocations.remove(blockId)}// Ask the slaves to remove the RDD, and put the result in a sequence of Futures.// The dispatcher is used as an implicit argument into the Future sequence construction.//最后发送RemoveRDD消息给Executor,通知其删除RDDval removeMsg = RemoveRdd(rddId)Future.sequence(blockManagerInfo.values.map { bm =>bm.slaveEndpoint.ask[Int](removeMsg)}.toSeq)}

在实际研究存储首先之前,我们在来看一下Spark存储模块之间的关系,如下图所示,整个模块中BlockManger时其核心,它不仅提供存储模块处理各种存储方式的读写方法,而且为Shuffle模块提供数据处理等操作。

BlockManger存在与Dirver端和每个Executor中,在Driver端的BlockManger保存了数据的元数据信息,而在Executor的BlockManger根据接受到消息进行操作:

  • 当Executor的BlockManger接受到读取数据时,根据数据块所在节点是否为本地使用BlockManger不同的方法进行处理。如果在本地,则直接调用MemeoryStore和DiskStore中的取方法getVlaues/getBytes进行读取;如果在远程,则调用BlockTransferService的服务进行获取远程数据节点上的数据。
  • 当Executor的BlockManger接收到写入数据时,如果不需要创建副本,则调用BlockStore的接口方法进行处理,根据数据写入的存储模型,决定调用对应的写入方法。

2. 存储级别

Spark时基于内存的计算,但是RDD的数据不仅可以存储到内存中,还可以使用persist或者cache 方显示的将RDD的数据存储到内存或者磁盘中。

  private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {// TODO: Handle changes of StorageLevel// 如果RDD指定了非NONE的存储级别,该存储级别不能被修改if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {throw new UnsupportedOperationException("Cannot change storage level of an RDD after it was already assigned a level")}// If this is the first time this RDD is marked for persisting, register it// with the SparkContext for cleanups and accounting. Do this only once.// 当RDD原来的存储级别为None时,可以对RDD进行持久化处理,在处理之前需要先清楚SparkContext中原来的存储元数据,然后加入该持久信息if (storageLevel == StorageLevel.NONE) {sc.cleaner.foreach(_.registerRDDForCleanup(this))sc.persistRDD(this)}// 当RDD原来的存储级别为NONE时,把RDD存储级别修改为传入的新值storageLevel = newLevelthis}

persist操作时控制操作的一种,它只是改变了原有的RDD的元数据信息,并没有进行数据的存储操作操作,正在进行是在RDD的iteratior方法中。对于cache方法而言,它只是persist的特例,即persist的方法参数为MEMORY_ONLY的情况。

在StorageLevel类中,根据useDisk、useMmeory、uesOffHeap、deserialized、replicaiton5个参数的组和。Spakr提供了12中存储级别的缓存策略,这可以将RDD持久化到内存、磁盘和外部存储系统,或者是以序列化的方式持久化到内存等,甚至可以在集群中不同节点之间存储多个副本呢。

self.useDisk = useDisk
self.useMemory = useMemory
self.useOffHeap = useOffHeap
self.deserialized = deserialized
self.replication = replicationStorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)
StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False)
StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False)
StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
StorageLevel.OFF_HEAP = StorageLevel(True, True, True, False, 1)
存储级别 描述
NONE 不进行数据存储
MEMORY_ONLY 将RDD作为反序列化的对象存储JVM中。如果RDD不能被内存装下,一些分区将不会被缓存,并且在需要的时候被重新计算。这是默认的存储级别
MEMORY_AND_DISK 将RDD作为反序列化的对象存储到JVM中。如果RDD不能内存装下,超出的分区将被保存在磁盘上,并且在需要的时候被读取
MEMORY_ONLY_SER 将RDD作为序列化的对象进行存储
MEMORY_AND_DISK_SER 与MEMORY_ONLY_SER类似,但是把超出的内存部分分区将存储到硬盘中而不是每次需要的时候进行重新计算
DISK_ONLY 只将RDD分区存储到硬盘上
DISK_ONLY_2 与上述的存储级别一样,但是将每个分区都复制到两个集群之上
OFF_HEAP 可以将RDD存储到分布式文件系统中,如Alluxio

3. RDD存储调用

RDD 包含多个Partition,每个Partition对应一个或者多个数据块Block,每个Block,每个Block拥有唯一的编号BlockId,对应于数据块编号规则为:“rdd”+rddId+"_"+splitIndex,其中splitIndex为该数据块对应的Partition序号。

首先RDD通过Transfermation操作,比如map获取flatMap操作,调用RDD构造相应的MapPartitionsRDD。然后通过在提交作业之后,运行相应的Task,执行MapPartitionsRDD的compute方法,在compute方法中调用RDD的iterator方法。

实际发送数据操作是任务执行的时候发生的,RDD调用iterator方法时发生的。在调用过程中,先根据数据块Block的编号在判断是否已经按照指定的存储级别进行存储,如果存在该数据块Block,则从本地或远程节点读取数据;如果不存在该数据块Block,则调用RDD的计算方法输出结果,并把结果按照指定的存储级别进行存储。

iterator 函数实现大体是这么个流程:
1 若标记了有缓存,则取缓存,取不到则进行”计算或读检查点”。完了再存入缓存,以备后续使用。
2 若未标记有缓存,则直接进行”计算或读检查点”。
3 “计算或读检查点”这个过程也做两个判断:有做过checkpoint,没有做过checkpoint。做过checkpoint则可以读取到检查点数据返回。无则调该rdd的实现类的computer函数计算。computer函数实现方式就是向上递归“获取父rdd分区数据进行计算”,直到遇到检查点rdd获取有缓存的rdd。

  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {if (storageLevel != StorageLevel.NONE) {// 如果存在存储级别,尝试读取内存的数据进行迭代计算getOrCompute(split, context)} else {// 如果不存在存储级别,则直接读取数据进行迭代计算或者读取检查点结构进行迭代计算computeOrReadCheckpoint(split, context)}}

其中调用的getOrCompute时方法存储逻辑的核心。

  private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {// 通过RDD编号和partition序号获取数据块的Block的编号val blockId = RDDBlockId(id, partition.index)var readCachedBlock = true// This method is called on executors, so we need call SparkEnv.get instead of sc.env.// 由于该方法由Executor调用,可以使用SparkEnv代替sc.env// 根据数据块Block的编号先读取数据,然后在更新数据,这里是读写数据的入口SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {// 如果数据块不在内存中,则尝试读取检查点结果进行迭代计算readCachedBlock = falsecomputeOrReadCheckpoint(partition, context)}) match {case Left(blockResult) =>// 对getOrElseUpdate返回结果进行处理,该结果表示处理成功,记录结果度量信息if (readCachedBlock) {val existingMetrics = context.taskMetrics().inputMetricsexistingMetrics.incBytesRead(blockResult.bytes)new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {override def next(): T = {existingMetrics.incRecordsRead(1)delegate.next()}}} else {new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])}// 对getOrElseUpdate返回结果进行处理,该结果表示保存失败,例如数据太大无法放到内存中// 并且也无法保存到磁盘中,把该返回结果给调用者,由其决定如何处理。case Right(iter) =>new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])}}

在getOrCompute调用getOrElseUpdate方法,该方法时存储读写数据的入口点。

  def getOrElseUpdate[T](blockId: BlockId,level: StorageLevel,classTag: ClassTag[T],makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {// Attempt to read the block from local or remote storage. If it's present, then we don't need// to go through the local-get-or-put path.// 尝试从本地或远程存储中读取块。 如果它存在,那么我们不需要通过local-get-or-put路径。get[T](blockId)(classTag) match {case Some(block) =>return Left(block)case _ =>// Need to compute the block.}// Initially we hold no locks on this block.// 写数据的入口doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {case None =>// doPut() didn't hand work back to us, so the block already existed or was successfully// stored. Therefore, we now hold a read lock on the block.val blockResult = getLocalValues(blockId).getOrElse {// Since we held a read lock between the doPut() and get() calls, the block should not// have been evicted, so get() not returning the block indicates some internal error.releaseLock(blockId)throw new SparkException(s"get() failed for block $blockId even though we held a lock")}// We already hold a read lock on the block from the doPut() call and getLocalValues()// acquires the lock again, so we need to call releaseLock() here so that the net number// of lock acquisitions is 1 (since the caller will only call release() once).releaseLock(blockId)Left(blockResult)case Some(iter) =>// The put failed, likely because the data was too large to fit in memory and could not be// dropped to disk. Therefore, we need to pass the input iterator back to the caller so// that they can decide what to do with the values (e.g. process them without caching).Right(iter)}}

4. 读数据的过程

BlockManager的get方法是读数据的入口点,在读取时分为本地读取和远程节点读取两个步骤。本地读取使用getLocalValues方法,该方法根据不同的存储级别直接调用不同存储实现方法;而远程读取使用getRemoteValues方法,在getRemoteVulaes方法中调用了GetRemoteBytes方法,在方法中调用远程数据传输类BLockTransferService的fetchBlockSync进行处理,使用Netty的fetchBlocks方法读取数据,整个数据读取类调用如下:

5.写数据过程

前面分析当中,我们了解到BlockManger的doPutIterator方法是写数据的入口。在该方法中,根据数据是否缓存到内存中进行处理。如果不缓存到内存中,则调用BlockManager的putIterator方法直接存储磁盘;如果缓存到内存中,则先判断数据存储级别是否进行了反序列化。如果设置了反序列化,则说明获取的数据为值类型,调用putIteratorAsVaules方法把数据存入内存;如果没有设置反序列化,则获取的数据为字节类型,调用putIteratorAsBytes方法把数据放入内存中。在把数据存入内存中的时候,则需要判断内存中展开数据大小是否满足,当足够调用BlockManger的putArray方法写入内存,否则把数据写入到磁盘中。

在写入数据完成的时候,一方面吧数据块的元数据发送给Driver端的BlockMangerMasterEndPoint终端点,请求其更新数据元数据,另一方面判断是否需要创建数据副本,如果需要调用replicate方法,把数据写到远程节点上,类似于读取远程节点的数据,Spark提供Netty方式写入数据。

Spark详解(九):Spark存储原理分析相关推荐

  1. [网络安全自学篇] 四十五.病毒详解及批处理病毒原理分析(自启动、修改密码、定时关机、蓝屏、进程关闭)

    这是作者的网络安全自学教程系列,主要是关于安全工具和实践操作的在线笔记,特分享出来与博友们学习,希望您们喜欢,一起进步.前文分享了Windows远程桌面服务漏洞(CVE-2019-0708),并详细讲 ...

  2. Kubernetes Service详解(概念、原理、流量分析、代码)

    Kubernetes Service详解(概念.原理.流量分析.代码) 作者: liukuan73 原文:https://blog.csdn.net/liukuan73/article/details ...

  3. otg usb 定位_详解USB OTG工作原理及其应用

    原标题:详解USB OTG工作原理及其应用 1994年,Intel,Compaq等七家软硬件全球知名企业为了突破当时PC使用串口和并口传输速度的限制,成立了通用串行 开发者论坛( Implemente ...

  4. JPEG/Exif/TIFF格式解读(1):JEPG图片压缩与存储原理分析

    JPEG文件简介 JPEG的全称是JointPhotographicExpertsGroup(联合图像专家小组),它是一种常用的图像存储格式, jpg/jpeg是24位的图像文件格式,也是一种高效率的 ...

  5. 详解Oracle架构、原理、进程,学会世间再无复杂架构

    详解Oracle架构.原理.进程,学会世间再无复杂架构 学习是一个循序渐进的过程,从面到点.从宏观到微观,逐步渗透,各个击破,对于Oracle, 怎么样从宏观上来理解呢?先来看一个图,这个图取自于教材 ...

  6. 算法:详解布隆过滤器的原理、使用场景和注意事项@知乎.Young Chen

    算法:详解布隆过滤器的原理.使用场景和注意事项@知乎.Young Chen 什么是布隆过滤器 本质上布隆过滤器是一种数据结构,比较巧妙的概率型数据结构(probabilistic data struc ...

  7. 云原生存储详解:容器存储与 K8s 存储卷

    作者 | 阚俊宝 阿里云技术专家 导读:云原生存储详解系列文章将从云原生存储服务的概念.特点.需求.原理.使用及案例等方面,和大家一起探讨云原生存储技术新的机遇与挑战.本文为该系列文章的第二篇,会对容 ...

  8. FPGA学习之路—接口(3)—SPI详解及Verilog源码分析

    FPGA学习之路--SPI详解及Verilog源码分析 概述 SPI = Serial Peripheral Interface,是串行外围设备接口,是一种高速,全双工,同步的通信总线. 优点 支持全 ...

  9. docker修改镜像的存储位置_云原生存储详解:容器存储与 K8s 存储卷(内含赠书福利)...

    作者 | 阚俊宝  阿里巴巴技术专家 参与文末留言互动,即有机会获得赠书福利! 导读:云原生存储详解系列文章将从云原生存储服务的概念.特点.需求.原理.使用及案例等方面,和大家一起探讨云原生存储技术新 ...

  10. 云原生存储详解:容器存储与K8s存储卷

    作者 | 阚俊宝 阿里云技术专家 导读:云原生存储详解系列文章将从云原生存储服务的概念.特点.需求.原理.使用及案例等方面,和大家一起探讨云原生存储技术新的机遇与挑战.本文为该系列文章的第二篇,会对容 ...

最新文章

  1. 为什么php动态语言,动态语言静态化
  2. 如何使用PXE 安装 Windows XP +PXE安装XP
  3. 《剑指offer》c++版本 5.替换空格
  4. 推荐系统笔记:决策树回归树
  5. Redis性能问题排查解决手册(值得收藏)
  6. 650服务器raid配置_DELL R730服务器配置RAID及安装服务器系统
  7. Swift之深入解析可选类型Optional的底层原理
  8. URI和URLConnection类的区别
  9. 2pc_two phase commit详情
  10. docker 部署Gitlab
  11. 一个好用的不基于时间的同步文件的软件 —— Allway sync 文件同步
  12. C语言从字符串中提取数字
  13. VeriSign SSL证书产品及服务_VeriSign证书|SSL证书|EVSSL证书|服务器证书|数字证书
  14. 制作本地视频网站 苹果cms 超详细
  15. java 怎么去JTF边框_求助 java 如何编写JFrame窗体右上角红色打叉关闭按钮的事件?...
  16. android相机固定焦距,如何找到Android相机的焦距?
  17. CSS:个人常用搜索框样式
  18. 【项目实践】海康威视工业相机SDK开发小白版入门教程(VS2015+OpenCV4.5.1)
  19. source insight 4.0 使用make命令编译
  20. Java经典实验_猴子的经典实验(转载)

热门文章

  1. 什么是同源策略及限制
  2. [大山中学模拟赛] 2016.9.10
  3. 2016百度之星 - 资格赛(Astar Round1)
  4. 继续说一下2016里面的json功能(1)
  5. 漫游Kafka设计篇之性能优化(7)
  6. 应用DIV+CSS编码时容易犯的一些错误
  7. vs2005中文的,可是有180天的适用期,哪位高手能破了啊
  8. 深入浅出 Javascript API(五)--Query Find 查询
  9. java学习与总结:集合类
  10. 解决:RuntimeError: cryptography is required for sha256_password or caching_sha2_password