对于一个复杂的RDD,我们如果担心某些关键的,会在后面反复使用的RDD,可能会因为节点的故障,导致持久化数据的丢失,就可以针对该RDD启动checkpoint机制,实现容错和高可用。

在进行checkpoint之前,最好先对RDD执行持久化操作,比如persist(StorageLevel.DISK_ONLY)如果持久化了,就不用再重新计算;否则如果没有持久化RDD,还设置了checkpoint,那么本来job都结束了,但是由于中间的RDD没有持久化,那么checkpoint job想要将RDD数据写入外部文件系统,还得从RDD之前的所有的RDD全部重新计算一次,再进行checkpoint。然后从持久化的RDD磁盘文件读取数据

一 RDD的checkpoint方法

# 如果SparkContext没有设置checkpointDir,则抛出异常

# 如果设置了,则创建RDDCheckpointData,这个类主要负责管理RDD的checkpoint的进程和状态等

# 创建RDDCheckpointData的时候,会初始化checkpoint状态为Initialized

def checkpoint(): Unit = RDDCheckpointData.synchronized {if (context.checkpointDir.isEmpty) {throw new SparkException("Checkpoint directory has not been set in the SparkContext")} else if (checkpointData.isEmpty) {checkpointData = Some(new ReliableRDDCheckpointData(this))}
}
 

二persist 持久化RDD

# 如果该RDD已经有了storage level,但是还和指定的storage level不相等,那么抛出异常,不支持在一个RDD分配了storage level之后再分配一个storage level

# 标记这个RDD为persisting

# 设置RDD的storage level

private def persist(newLevel:StorageLevel, allowOverride: Boolean):this.type = {
  if (storageLevel!= StorageLevel.NONE&& newLevel != storageLevel&& !allowOverride) {
    throw new UnsupportedOperationException(
      "Cannotchange storage level of an RDD after it was already assigned a level")
  }
  // If this isthe first time this RDD is marked for persisting, register it
  // with the SparkContext for cleanupsand accounting. Do this only once.
 
if (storageLevel== StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    sc.persistRDD(this)
  }
  storageLevel = newLevel
  this
}

三 RDD的doCheckpoint方法

当调用DAGScheduler的runJob的时候,开始调用RDD的doCheckpoint方法

# 该rdd是否已经调用doCheckpoint,如果还没有,则开始处理

# 查看是否需要把该rdd的所有依赖即血缘全部checkpoint,如果需要,血缘上的每一个rdd递归调用该方法

# 调用RDDCheckpointData的checkpoint方法

private[spark] def doCheckpoint(): Unit = {RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {// 该rdd是否已经调用doCheckpoint,如果还没有,则开始处理if (!doCheckpointCalled) {doCheckpointCalled = true// 判断RDDCheckpointData是否已经定义了,如果已经定义了if (checkpointData.isDefined) {// 查看是否需要把该rdd的所有依赖即血缘全部checkpointif (checkpointAllMarkedAncestors) {// 血缘上的每一个rdd递归调用该方法dependencies.foreach(_.rdd.doCheckpoint())}// 调用RDDCheckpointData的checkpoint方法checkpointData.get.checkpoint()} else {dependencies.foreach(_.rdd.doCheckpoint())}}}
}

四RDDCheckpointData的checkpoint

# 将checkpoint的状态从Initialized置为CheckpointingInProgress

# 调用子类的doCheckpoint,创建一个新的CheckpointRDD

# 将checkpoint状态置为Checkpointed状态,并且改变rdd之前的依赖,设置父rdd为新创建的CheckpointRDD

final def checkpoint(): Unit = {// 将checkpoint的状态从Initialized置为CheckpointingInProgressRDDCheckpointData.synchronized {if (cpState == Initialized) {cpState = CheckpointingInProgress} else {return}}// 调用子类的doCheckpoint,我们以ReliableCheckpointRDD为例,创建一个新的CheckpointRDDval newRDD = doCheckpoint()// 将checkpoint状态置为Checkpointed状态,并且改变rdd之前的依赖,设置父rdd为新创建的CheckpointRDDRDDCheckpointData.synchronized {cpRDD = Some(newRDD)cpState = Checkpointedrdd.markCheckpointed()}
}

五RDDCheckpointData的doCheckpoint

我们以ReliableCheckpointRDD为例,将rdd的数据写入HDFS中checkpoint目录,并且创建CheckpointRDD

protected override def doCheckpoint(): CheckpointRDD[T] = {// 将rdd的数据写入HDFS中checkpoint目录,并且创建CheckpointRDDval newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {rdd.context.cleaner.foreach { cleaner =>cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)}}logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")newRDD
}

六ReliableCheckpointRDD的writeRDDToCheckpointDirectory

将rdd的数据写入HDFS中checkpoint目录,并且创建CheckpointRDD

def writeRDDToCheckpointDirectory[T: ClassTag](originalRDD: RDD[T],checkpointDir: String,blockSize: Int = -1): ReliableCheckpointRDD[T] = {val sc = originalRDD.sparkContext// 创建checkpoint输出目录val checkpointDirPath = new Path(checkpointDir)// 获取HDFS文件系统API接口val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)// 创建目录if (!fs.mkdirs(checkpointDirPath)) {throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")}// 将配置文件信息广播到所有节点val broadcastedConf = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))// 重新启动一个job,将rdd的分区数据写入HDFSsc.runJob(originalRDD,writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)// 如果rdd的partitioner不为空,则将partitioner写入checkpoint目录if (originalRDD.partitioner.nonEmpty) {writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)}// 创建一个CheckpointRDD,该分区数目应该和原始的rdd的分区数是一样的val newRDD = new ReliableCheckpointRDD[T](sc, checkpointDirPath.toString, originalRDD.partitioner)if (newRDD.partitions.length != originalRDD.partitions.length) {throw new SparkException(s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")}newRDD
}

七 RDD的iterator方法

# 当持久化RDD的时候,执行task的时候,会遍历RDD指定分区的数据,在持久的时候,因为指定了storage level,所以我们会调用getOrCompute获取数据,由于第一次还没有持久化过,所以会先计算。但是数据还没有被持久化,所以此时先把数据持久化到磁盘(假设持久化时就指定了StorageLevel=DISK_ONLY),然后再把block数据缓存到本地内存

# 进行checkpoint操作时,会启动一个新的job来处理checkpoint任务。当执行checkpoint的任务来执行RDD的iterator方法时,此时我们知道该RDD的持久化级别不为空,则从BlockManager获取出结果来,因为已经持久化过了所以不需要进行计算。如果持久化的数据此时已经丢失呢,怎么办呢?即storage level为空了,这此时就会调用computeOrReadCheckpoint方法,重新计算结果,然后写入checkpoint目录

# 如果已经持久化和checkpoint了,那么此时如果有任务在iterator获取不到block,那么就会调用computeOrReadCheckpoint方法,此时已经物化过了,所以直接从原始RDD对应的父RDD(CheckpointRDD)的iterator方法,此时已经没有持久化级别,所以CheckpointRDD的iterator方法就会调用CheckpointRDD的compute方法从checkpoint文件读取数据

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {// 如果StorageLevel不为空,表示该RDD已经持久化过了,可能是在内存,也有可能是在磁盘,// 如果是磁盘获取的,需要把block缓存在内存中if (storageLevel != StorageLevel.NONE) {getOrCompute(split, context)} else {// 进行rdd partition的计算或者根据checkpoint读取数据computeOrReadCheckpoint(split, context)}
}

八  RDD的computeOrReadCheckpoint方法

# 如果checkpoint状态已经置为checkpointed了,表示checkpoint已经完成,这时候从checkpoint获取;如果还是checkpointInProgress,则表示持久化数据丢失,或者根本就没有持久化,所以需要原来的RDD的compute方法重新计算结果

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{// 当前rdd是否已经checkpoint和物化了,如果已经checkpoint,则调用父类的CheckpointRDD的iterator方法获取// 如果没有则开始计算if (isCheckpointedAndMaterialized) {firstParent[T].iterator(split, context)} else {// 则调用rdd的compute方法开始计算,返回一个Iterator对象compute(split, context)}
}

九CheckpointRDD的compute方法

# 创建checkpoint文件

#从HDFS上的checkpoint文件读取checkpoint过的数据

override def compute(split: Partition, context: TaskContext): Iterator[T] = {// 创建checkpoint文件val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))// 从HDFS上的checkpoint文件读取checkpoint过的数据ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
}

Spark源码分析之Checkpoint机制相关推荐

  1. spark 源码分析之二十 -- Stage的提交

    引言 上篇 spark 源码分析之十九 -- DAG的生成和Stage的划分 中,主要介绍了下图中的前两个阶段DAG的构建和Stage的划分. 本篇文章主要剖析,Stage是如何提交的. rdd的依赖 ...

  2. Spark 源码分析

    2019独角兽企业重金招聘Python工程师标准>>> 一. 启动篇 (一) 引子 在spark-shell终端执行 val arr = Array(1,2,3,4) val rdd ...

  3. spark 源码分析之十八 -- Spark存储体系剖析

    本篇文章主要剖析BlockManager相关的类以及总结Spark底层存储体系. 总述 先看 BlockManager相关类之间的关系如下: 我们从NettyRpcEnv 开始,做一下简单说明. Ne ...

  4. spark 源码分析之十九 -- DAG的生成和Stage的划分

    上篇文章 spark 源码分析之十八 -- Spark存储体系剖析 重点剖析了 Spark的存储体系.从本篇文章开始,剖析Spark作业的调度和计算体系. 在说DAG之前,先简单说一下RDD. 对RD ...

  5. Spark源码分析之Sort-Based Shuffle读写流程

    一 概述 我们知道Spark Shuffle机制总共有三种: # 未优化的Hash Shuffle:每一个ShuffleMapTask都会为每一个ReducerTask创建一个单独的文件,总的文件数是 ...

  6. Spark源码分析之七:Task运行(一)

    在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在T ...

  7. Spark源码分析之九:内存管理模型

    Spark是现在很流行的一个基于内存的分布式计算框架,既然是基于内存,那么自然而然的,内存的管理就是Spark存储管理的重中之重了.那么,Spark究竟采用什么样的内存管理模型呢?本文就为大家揭开Sp ...

  8. spark 源码分析 Blockmanager

    原文链接 参考, Spark源码分析之-Storage模块 对于storage, 为何Spark需要storage模块?为了cache RDD  Spark的特点就是可以将RDD cache在memo ...

  9. Spark源码分析 – DAGScheduler

    DAGScheduler的架构其实非常简单, 1. eventQueue, 所有需要DAGScheduler处理的事情都需要往eventQueue中发送event 2. eventLoop Threa ...

最新文章

  1. 百度之星12月30号题目之维基解密
  2. BZOJ 2257: [Jsoi2009]瓶子和燃料【数论:裴蜀定理】
  3. 《实例化需求》读书笔记
  4. [Vue warn]: Duplicate keys detected: ‘0‘. This may cause an update error.
  5. 守护线程Daemon的理解
  6. jstack调试_增压的jstack:如何以100mph的速度调试服务器
  7. python https协议和InsecurePlatformWarning问题
  8. 经常吃番茄对身体有什么影响?
  9. freemarker mysql 生成bean_基于数据库的代码自动生成工具,生成JavaBean、生成数据库文档、生成前后端代码等(v6.6.6版)...
  10. 网易开源分布式存储系统 Curve,性能彪悍!这是要吊打阿里?
  11. SpringBoot项目启动报错
  12. rabbitmq消息队列原理
  13. VMware虚拟机中激活Windows Server 2008的具体步骤
  14. 【C++】给定两个没有刻度的容器,对于任意给定的容积,求出如何只用两个瓶装出L升的水
  15. 赫茨伯格的双因素理论
  16. HUAWEI Mate40Pro解除账号忘记密码ID强制刷机鸿蒙系统激活锁能解开吗
  17. ω一致的故事和符号世界的对应——哥德尔读后之24
  18. Excel教程:数值为0不显示的三种解决方法介绍
  19. 3D Models (3D 模型)
  20. 一个屌丝程序猿的人生(八十二)

热门文章

  1. Zookeeper常用命令详解(Zookeeper3.6)
  2. java如何画百分比圆环_canvas绘制旋转的圆环百分比进度条
  3. python如何在文本内排序_在python中对文本文件中的项进行排序
  4. 14Linux远程登录And15Linux远程文件传输
  5. linux history 用法,Linux之History的使用
  6. python数字转拼音输出_Python 将中文转拼音
  7. ajax 请求成功 再执行javascript,jquery中ajax请求后台数据成功后既不执行success也不执行error的完美解决方法...
  8. 解决python读取pickle报错ValueError: unsupported pickle protocol: 5
  9. python用代码执行另一个python文件
  10. Linux/Mac 配置安装scala