Task运行的时候是要去获取Parent 的RDD对应的Partition的数据的,即它会调用RDD的iterator方法把对应的Partition的数据集给遍历出来,然后写入存储,这个存储可能是磁盘或者内存,取决于StorageLevel是什么。

如果当前RDD的StorageLevel不为空,则表示已经存持久化了,我们可以直接在内存中获取,而不是去计算Parent RDD。如果没有StorageLevel,则表示没有缓存过,内存中没有,则我们需要运行的数据就需要从Parent RDD计算出来。注意,这里所谓的缓存并不是使用什么cache 组件,而直接是从本地读取,本地没有则从远端,获取的结果直接放入内存存储,方便后续读取,这才是真正cache的地方。

一 RDD的iterator方法

final defiterator(split:Partition, context: TaskContext): Iterator[T] = {
  // 如果StorageLevel不为空,表示该RDD已经持久化过了,可能是在内存,也有可能是在磁盘,
  // 如果是磁盘获取的,需要把block缓存在内存中
 
if (storageLevel!= StorageLevel.NONE) {
    getOrCompute(split,context)
  } else {
    // 进行rdd partition的计算或者根据checkpoint读取数据
   
computeOrReadCheckpoint(split,context)
  }
}

二 RDD的getOrCompute

从内存或者磁盘获取,如果磁盘获取需要将block缓存到内存

private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {// 根据rdd id创建RDDBlockIdval blockId = RDDBlockId(id, partition.index)// 是否从缓存的block读取var readCachedBlock = trueSparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {readCachedBlock = false // 如果调用了这个函数,说明没有获取到block,自然不能从cache中读取// 需要调用该函数重新计算或者从checkpoint读取computeOrReadCheckpoint(partition, context)}) match {// 获取到了结果直接返回case Left(blockResult) =>// 如果从cache读取blockif (readCachedBlock) {val existingMetrics = context.taskMetrics().inputMetricsexistingMetrics.incBytesRead(blockResult.bytes)new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {override def next(): T = {existingMetrics.incRecordsRead(1)delegate.next()}}} else {new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])}case Right(iter) =>new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])}
}

三 BlockManager的getOrElseUpdate方法

如果指定的block存在,则直接获取,否则调用makeIterator方法去计算block,然后持久化最后返回值

def getOrElseUpdate[T](blockId: BlockId,level: StorageLevel,classTag: ClassTag[T],makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {// 尝试从本地获取数据,如果获取不到则从远端获取get[T](blockId)(classTag) match {case Some(block) =>return Left(block)case _ =>// Need to compute the block.}// 如果本地化和远端都没有获取到数据,则调用makeIterator计算,最后将结果写入blockdoPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {// 表示写入成功case None =>// 从本地获取数据块val blockResult = getLocalValues(blockId).getOrElse {releaseLock(blockId)throw new SparkException(s"get() failed for block $blockId even though we held a lock")}releaseLock(blockId)Left(blockResult)// 如果写入失败case Some(iter) =>// 如果put操作失败,表示可能是因为数据太大,无法写入内存,又无法被磁盘drop,因此我们需要返回这个iterator给// 调用者以至于他们能够做出决定这个值是什么,怎么办Right(iter)}
}

四 BlockManager的get方法

先从本地获取数据,如果没有则从远端获取

def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {// 从本地获取blockval local = getLocalValues(blockId)// 如果本地获取到了则返回if (local.isDefined) {logInfo(s"Found block $blockId locally")return local}// 如果本地没有获取到则从远端获取val remote = getRemoteValues[T](blockId)// 如果远端获取到了则返回,没有返回Noneif (remote.isDefined) {logInfo(s"Found block $blockId remotely")return remote}None
}

五 BlockManager的getLocalValues方法

从本地获取block,如果存在返回BlockResult,不存在返回None;如果storage level是磁盘,则还需将得到的block缓存到内存存储,方便下次读取

def getLocalValues(blockId: BlockId): Option[BlockResult] = {logDebug(s"Getting local block $blockId")// 调用block info manager,锁定该block,然后读取block,返回该block 元数据block infoblockInfoManager.lockForReading(blockId) match {// 没有读取到则返回Nonecase None =>logDebug(s"Block $blockId was not found")None// 读取到block元数据case Some(info) =>// 获取存储级别storage levelval level = info.levellogDebug(s"Level for block $blockId is $level")// 如果使用内存,且内存memory store包含这个block idif (level.useMemory && memoryStore.contains(blockId)) {// 判断是不是storage level是不是反序列化的,如果死反序列化的,则调用MemoryStore的getValues方法// 否则调用MemoryStore的getBytes然后反序列输入流返回数据作为迭代器val iter: Iterator[Any] = if (level.deserialized) {memoryStore.getValues(blockId).get} else {serializerManager.dataDeserializeStream(blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)}val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))// 构建一个BlockResult对象返回,这个对象包括数据,读取方式以及字节大小Some(new BlockResult(ci, DataReadMethod.Memory, info.size))}// 如果使用磁盘存储,且disk store包含这个block则从磁盘获取,并且把结果放入内存else if (level.useDisk && diskStore.contains(blockId)) {// 调用DiskStore的getBytes方法,如果需要反序列化,则进行反序列val iterToReturn: Iterator[Any] = {val diskBytes = diskStore.getBytes(blockId)if (level.deserialized) {val diskValues = serializerManager.dataDeserializeStream(blockId,diskBytes.toInputStream(dispose = true))(info.classTag)// 尝试将从磁盘读的溢写的值加载到内存,方便后续快速读取maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)} else {// 如果不需要反序列化,首先将读取到的流加载到内存,方便后续快速读取val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes).map {_.toInputStream(dispose = false)}.getOrElse { diskBytes.toInputStream(dispose = true) }// 然后再返回反序列化之后的数据serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)}}// 构建BlockResult返回val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))Some(new BlockResult(ci, DataReadMethod.Disk, info.size))} else {// 处理本地读取block失败,报告driver这是一个无效的block,将会删除这个blockhandleLocalReadFailure(blockId)}}
}

五 BlockManager的getRemoteValues方法

从block所存放的其他block manager(其他节点)获取block

private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {val ct = implicitly[ClassTag[T]]// 将远程fetch的结果进行反序列化,然后构建BlockResult返回getRemoteBytes(blockId).map { data =>val values =serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)new BlockResult(values, DataReadMethod.Network, data.size)}
}
def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {logDebug(s"Getting remote block $blockId")require(blockId != null, "BlockId is null")var runningFailureCount = 0var totalFailureCount = 0// 首先根据blockId获取当前block存在在哪些block manager上val locations = getLocations(blockId)// 最大允许的获取block的失败次数为该block对应的block manager数量val maxFetchFailures = locations.sizevar locationIterator = locations.iterator// 开始遍历block managerwhile (locationIterator.hasNext) {val loc = locationIterator.next()logDebug(s"Getting remote block $blockId from $loc")// 通过调用BlockTransferSerivce的fetchBlockSync方法从远端获取blockval data = try {blockTransferService.fetchBlockSync(loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()} catch {case NonFatal(e) =>runningFailureCount += 1totalFailureCount += 1// 如果总的失败数量大于了阀值则返回Noneif (totalFailureCount >= maxFetchFailures) {logWarning(s"Failed to fetch block after $totalFailureCount fetch failures. " +s"Most recent failure cause:", e)return None}logWarning(s"Failed to fetch remote block $blockId " +s"from $loc (failed attempt $runningFailureCount)", e)if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {locationIterator = getLocations(blockId).iteratorlogDebug(s"Refreshed locations from the driver " +s"after ${runningFailureCount} fetch failures.")runningFailureCount = 0}// This location failed, so we retry fetch from a different one by returning null herenull}// 返回ChunkedByteBufferif (data != null) {return Some(new ChunkedByteBuffer(data))}logDebug(s"The value of block $blockId is null")}logDebug(s"Block $blockId not found")None
}

六RDD的computeOrReadCheckpoint

如果block没有被持久化,即storage level为None,我们就需要进行计算或者从Checkpoint读取数据;如果已经checkpoint了,则调用ietrator去读取block数据,否则调用Parent的RDD的compute方法

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{// 当前rdd是否已经checkpoint和物化了,如果已经checkpoint,则调用第一个parent rdd的iterator方法获取// 如果没有则开始计算if (isCheckpointedAndMaterialized) {firstParent[T].iterator(split, context)} else {// 则调用rdd的compute方法开始计算,返回一个Iterator对象compute(split, context)}
}

Spark源码分析之cahce原理分析相关推荐

  1. 老李推荐:第5章5节《MonkeyRunner源码剖析》Monkey原理分析-启动运行: 获取系统服务引用 1...

    老李推荐:第5章5节<MonkeyRunner源码剖析>Monkey原理分析-启动运行: 获取系统服务引用 上一节我们描述了monkey的命令处理入口函数run是如何调用optionPro ...

  2. 老李推荐:第6章1节《MonkeyRunner源码剖析》Monkey原理分析-事件源-事件源概览 1...

    老李推荐:第6章1节<MonkeyRunner源码剖析>Monkey原理分析-事件源-事件源概览 在上一章中我们有简要的介绍了事件源是怎么一回事,但是并没有进行详细的描述.那么往下的这几个 ...

  3. 老李推荐:第6章6节《MonkeyRunner源码剖析》Monkey原理分析-事件源-事件源概览-命令队列...

    老李推荐:第6章6节<MonkeyRunner源码剖析>Monkey原理分析-事件源-事件源概览-命令队列 事件源在获得字串命令并把它翻译成对应的MonkeyEvent事件后,会把这些事件 ...

  4. Spark源码解读之Shuffle原理剖析与源码分析

    在前面几篇文章中,介绍了Spark的启动流程Spark内核架构流程深度剖析,Spark源码分析之DAGScheduler详解,Spark源码解读之Executor以及Task工作原理剖析,Spark源 ...

  5. 顺序线性表 ---- ArrayList 源码解析及实现原理分析

    原创播客,如需转载请注明出处.原文地址:http://www.cnblogs.com/crawl/p/7738888.html ------------------------------------ ...

  6. android 实例源码解释,Android Handler 原理分析及实例代码

    Android Handler 原理分析 Handler一个让无数android开发者头疼的东西,希望我今天这边文章能为您彻底根治这个问题 今天就为大家详细剖析下Handler的原理 Handler使 ...

  7. spring源码阅读--aop实现原理分析

    aop实现原理简介 首先我们都知道aop的基本原理就是动态代理思想,在设计模式之代理模式中有介绍过这两种动态代理的使用与基本原理,再次不再叙述. 这里分析的是,在spring中是如何基于动态代理的思想 ...

  8. Spark源码阅读02-Spark核心原理之调度算法

    Spark核心原理之调度算法 Spark核心原理之调度算法 应用程序之间 作业及调度阶段之间 1.创建调度池 2.调度池加入调度内容 3.提供已排序的任务集管理器 任务之间 1.数据本地性 2.延迟执 ...

  9. Spark源码阅读03-Spark存储原理之存储分析

    Spark存储分析 整体框架 存储级别 RDD存储调用 读数据过程 本地读取 远程读取 写数据过程 写入内存 写入磁盘 整体框架 Spark的存储采取了主从模式,即Master / Slave模式,整 ...

最新文章

  1. vue element 关闭当前tab 跳转到上一路由
  2. 首届中国IT架构大师高峰论坛(十年架构之路汇成一句话!)
  3. 排优解难 网上邻居常遇故障解决方法
  4. php中插入表格 标签,PHP_HTML中的表格元素,一,table标签。tablegt - phpStudy
  5. 重磅!!kaggle训练, 终于不用怕断网了
  6. mvc 404错误 php,ASP.NET MVC实现404跳转的代码实例
  7. c语言中逐个检索字符的库函数,C语言库函数strstr的实现
  8. android litepal可以指定存储目录吗,Android数据库LitePal的基本用法详解
  9. 64位Ubuntu kylin 16.04显示CPU内存使用率
  10. Variable Assembly Language可变汇编语言
  11. Spring 3.1和Hibernate的持久层
  12. 数据库过滤操作中 != 或者 指定操作数并不能改匹配到NULL值
  13. Python 语法糖
  14. cassss服务未启动_aws启动ssserver
  15. 51单片机之定时器/计数器0中断程序
  16. moss下载_无法为增值税MOSS混乱提供“简单的技术解决方案”
  17. 机器学习实战-决策树 java版代码开发实现
  18. imx6上调用 vpu
  19. input输入框只能输入字母
  20. 20220614 笔记

热门文章

  1. 小程序统一服务消息_微信团队发布小程序模板消息能力调整通知:小程序订阅消息接口正式上线...
  2. phphstudy运行不了网站_传统企业网站运营分析:这些弊端你了解吗
  3. java量_Java 2. 量与常量
  4. 牛客网 java刷题_牛客网刷题(纯java题型 1~30题)
  5. java singletonlist_Java Collections singletonList()方法及示例
  6. php购物网站类的继承和多态,类的继承与多态
  7. ArcEngine和GDAL读写栅格数据机制对比(一)
  8. python输出输入的指定位数的密码_用python生成指定位数的密码
  9. python读取多个文件夹中的音频文件_Python3.7 读取音频根据文件名生成脚本的代码...
  10. 关闭惠普计算机通电启动注册表,惠普10代cpu电脑装win7卡logo(安装程序正在更新注册表设置)解决方法...