Spark运行的时候,会产生一堆临时文件,临时数据,比如持久化的RDD数据在磁盘上,没有持久化的在内存中,比如shuffle的临时数据等,如果每次运行完,或者没有运行完杀掉了,不清理,会产生大量的无用数据,最终造成大数据集群崩溃而死。

初始化
ContextCleaner的初始化是在SparkContext中初始化的,这个功能默认是必须开 
启的。

_cleaner =
      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
    _cleaner.foreach(_.start())
1
2
3
4
5
6
7
初始化 的时候主要newle一个清理线程

// 清理线程===》很重要
  private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
1
2
这个清理线程,主要清理了RDD,shuffle,Broadcast,累加器,检查点等数据

/** Keep cleaning RDD, shuffle, and broadcast state.
    * 保持一个干净的RDD,shuffle和broadcast状态
    *
    * ContextCleaner的工作原理和listenerBus一样,也采用监听器模式,由线程来处理,此线程实际还是那个只是
    * 调用keepCleanning方法。
    * */
  private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
    // 默认一直为真true
    while (!stopped) {
      try {
        val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
          .map(_.asInstanceOf[CleanupTaskWeakReference])
        // Synchronize here to avoid being interrupted on stop()
        synchronized {
          reference.foreach { ref =>
            logDebug("Got cleaning task " + ref.task)
            referenceBuffer.remove(ref)
            // 清除Shuffle和Broadcast相关的数据会分别调用doCleanupShuffle和doCleanupBroadcast函数。根据需要清除数据的类型分别调用
            ref.task match {
              case CleanRDD(rddId) =>
                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
              case CleanShuffle(shuffleId) =>
                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
              case CleanBroadcast(broadcastId) =>
                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
              case CleanAccum(accId) =>
                doCleanupAccum(accId, blocking = blockOnCleanupTasks)
              case CleanCheckpoint(rddId) =>
                doCleanCheckpoint(rddId)
            }
          }
        }
      } catch {
        case ie: InterruptedException if stopped => // ignore
        case e: Exception => logError("Error in cleaning thread", e)
      }
    }
  }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
RDD的清理

/** Perform RDD cleanup.
    * 在ContextCleaner 中会调用RDD.unpersist()来清除已经持久化的RDD数据
    * */
  def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = {
    try {
      logDebug("Cleaning RDD " + rddId)
      // 被SparkContext的unpersistRDD方法
      sc.unpersistRDD(rddId, blocking)
      listeners.asScala.foreach(_.rddCleaned(rddId))
      logInfo("Cleaned RDD " + rddId)
    } catch {
      case e: Exception => logError("Error cleaning RDD " + rddId, e)
    }
  }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
shuffle的清理

/** Perform shuffle cleanup.
    *
    * 清理Shuffle
    * */
  def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
    try {
      logDebug("Cleaning shuffle " + shuffleId)
      // 把mapOutputTrackerMaster跟踪的shuffle数据不注册(具体做了什么,还没处理)
      mapOutputTrackerMaster.unregisterShuffle(shuffleId)
      // 删除shuffle的块数据
      blockManagerMaster.removeShuffle(shuffleId, blocking)
      listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
      logInfo("Cleaned shuffle " + shuffleId)
    } catch {
      case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
    }
  }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
广播的清理

/** Perform broadcast cleanup.
    * 清除广播
    * */
  def doCleanupBroadcast(broadcastId: Long, blocking: Boolean): Unit = {
    try {
      logDebug(s"Cleaning broadcast $broadcastId")
      // 广播管理器 清除广播
      broadcastManager.unbroadcast(broadcastId, true, blocking)
      listeners.asScala.foreach(_.broadcastCleaned(broadcastId))
      logDebug(s"Cleaned broadcast $broadcastId")
    } catch {
      case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
    }
  }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
累加器的清理

/** Perform accumulator cleanup.
    * 清除累加器
    * */
  def doCleanupAccum(accId: Long, blocking: Boolean): Unit = {
    try {
      logDebug("Cleaning accumulator " + accId)
      AccumulatorContext.remove(accId)
      listeners.asScala.foreach(_.accumCleaned(accId))
      logInfo("Cleaned accumulator " + accId)
    } catch {
      case e: Exception => logError("Error cleaning accumulator " + accId, e)
    }
  }
1
2
3
4
5
6
7
8
9
10
11
12
13
检查点的清理

/**
   * Clean up checkpoint files written to a reliable storage.
   * Locally checkpointed files are cleaned up separately through RDD cleanups.
    *
    * 清理记录到可靠存储的检查点文件。
    * 局部检查点文件通过RDD清理被单独清理。
   */
  def doCleanCheckpoint(rddId: Int): Unit = {
    try {
      logDebug("Cleaning rdd checkpoint data " + rddId)
      // 这里直接调用文件系统删除  是本地 就本地删除,是hdfs就hdfs删除
      ReliableRDDCheckpointData.cleanCheckpoint(sc, rddId)
      listeners.asScala.foreach(_.checkpointCleaned(rddId))
      logInfo("Cleaned rdd checkpoint data " + rddId)
    }
    catch {
      case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e)
    }
  }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
启动方法
在sparkContext中调用启动方法

_cleaner.foreach(_.start())
1
这里是启动方法

/** Start the cleaner.
    * 开始清理
    * */
  def start(): Unit = {
    // 设置清理线程为守护进程
    cleaningThread.setDaemon(true)
    // 设置守护进程的名称
    cleaningThread.setName("Spark Context Cleaner")
    // 启动守护进程
    cleaningThread.start()

// scheduleAtFixedRate 在给定的初始延迟之后,并随后在给定的时间内,创建并执行一个已启用的周期操作
    // periodicGCInterval=30分钟 也就是没=每过30分钟运行一次清理线程清理垃圾
    periodicGCService.scheduleAtFixedRate(new Runnable {
      // 执行系统的垃圾清理
      override def run(): Unit = System.gc()
    }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
  }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
这里启动线程 // 启动守护进程 cleaningThread.start(),这里自我感觉一下,因为下面调用System.gc()是清理垃圾,所以这个cleaningThread线程应该是收集那些需要清理的数据,保存它的引用(引用就是一个地址,一个指针,指向要删除的数据),最后调用System.gc()方法,才真正清理。

结束
最后是关闭这个应用的时候,调用Stop()方法

/**
   * Stop the cleaning thread and wait until the thread has finished running its current task.
    * 停止清理线程并等待线程完成其当前任务。
   */
  def stop(): Unit = {
    stopped = true
    // Interrupt the cleaning thread, but wait until the current task has finished before
    // doing so. This guards against the race condition where a cleaning thread may
    // potentially clean similarly named variables created by a different SparkContext,
    // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
    // 中断清理线程,但等待当前任务完成后再执行。
    // This guards against the race condition where a cleaning thread may
    // potentially clean similarly named variables created by a different SparkContext,
    // ,导致其他令人费解的块未发现异常(spark-6132)。
    synchronized {
      // 打断线程
      cleaningThread.interrupt()
    }
    // 设置0 等待这个线程死掉
    cleaningThread.join()
    // 关闭垃圾清理
    periodicGCService.shutdown()
  }

spark学习:ContextCleaner清理器相关推荐

  1. spark的ContextCleaner清理

    ContextCleaner是Spark中用来清理无用rdd,broadcast等数据的清理器,其主要用到的是java的weakReference弱引用来达成清理无用数据的目的. ContextCle ...

  2. Spark : ContextCleaner清理器

    1.美图 Spark运行的时候,会产生一堆临时文件,临时数据,比如持久化的RDD数据在磁盘上,没有持久化的在内存中,比如shuffle的临时数据等,如果每次运行完,或者没有运行完杀掉了,不清理,会产生 ...

  3. Spark Cleaner 清理器

    Spark Cleaner 清理器 功能概述 Cleaner的创建 清理逻辑 RDD的清理 Shuffle的清理 Broadcast的清理 Accum的清理 Checkpoint的清理 参考 功能概述 ...

  4. SparkContext的初始化(伯篇)——执行环境与元数据清理器

    <深入理解Spark:核心思想与源码分析>一书前言的内容请看链接<深入理解SPARK:核心思想与源码分析>一书正式出版上市 <深入理解Spark:核心思想与源码分析> ...

  5. spark学习笔记!!!!

    文章目录 1.spark概述 2.spark运行架构 2.1.核心组件 2.2.核心概念 2.3.Spark提交流程 3.spark核心编程 3.1.RDD 3.1.1.什么是RDD 3.1.2.RD ...

  6. spark学习-43-Spark的BlockManager

    1.美图 1.简介: BlockManager负责对Block的管理,只有在BlockManager的痴实话方法initialize被调用之后,它才是有效的.Blockmanager作为存储系统的一部 ...

  7. spark 学习笔记

    spark 学习笔记 spark介绍 Spark是是一种快速通用的集群计算系统,它的主要特点是能够在内存中进行计算.它包含了 spark 核心组件 spark-core,用于 SQL 和结构化处理数据 ...

  8. Spark学习之路一——Spark基础及环境搭建

    Spark学习之路一--Spark基础及环境搭建 文章目录 一. Spark 概述 1.1 概述 1.2 优势特性 1.2.1 运行速度快 1.2.2 容易使用 1.2.3 通用性 1.2.4 运行模 ...

  9. jvm学习 Shenandoah垃圾收集器

    系统学习请点击jvm学习目录 建议学习Shenandoah之前先学习G1垃圾收集器 前言 Shenandoah垃圾收集器是一个很有意思的垃圾收集器,它是第一款非Oracle公司开发的HotSpot垃圾 ...

最新文章

  1. java培训教程:什么是匿名内部类?怎样创建匿名内部类?
  2. openstack高可用方案
  3. PHP安装编译错误及解决办法
  4. Docker 核心概念、安装、端口映射及常用操作命令,详细到令人发指。
  5. Cocos2d—android 中常用的工具类
  6. 高露洁、悦诗风吟、Benefit,618大促的数字化难题都是如何解决的?
  7. CodeForces - 724C Ray Tracing(扩展欧几里得解方程)
  8. C#多线程编程系列(五)- C# ConcurrentBag的实现原理
  9. spring官方文档列表
  10. 在Win8.1系统下如何安装运行SQL Server 2005 (以及安装SQL Server 2005 Express打补丁)...
  11. python读取HDF文件
  12. java简易计算器程序框图_简易计算器程序设计思路及流程图
  13. and design 如何引入阿里图表矢量库 创建自定义icon
  14. I5 4590 台式机安装黑苹果最新版笔记
  15. 吾爱破解python百度文库下载源码_python版百度音乐下载软件和源码
  16. 文件服务器限制流量,盘点天翼云盘,限制虽紧依然堪用,几个使用小技巧
  17. 对于硬盘做了raid的Windows server 2016服务器重置密码
  18. freebsd的swatch安装和使用
  19. SF14 | Supertrend“超级趋势线”指标魔改升级(源码)
  20. 空光盘复制后到另外计算机无法读取,解决方法:无法读取计算机CD的解决方案...

热门文章

  1. 百度api语音识别一直“无内容”_PHP开发语音识别功能
  2. vue 左右滑动菜单_Vue实现左右菜单联动实现代码
  3. mysql 事物状态有几种_10分钟梳理MySQL核心知识点
  4. 搭建AWStats日志分析系统
  5. 大数据常见组件的访问页面总结
  6. rust布料怎么弄_布料“难弄”,你需要从这六方面解决!
  7. python怎样打开加密的文件_python基础教程如何用Python 加密文件
  8. phpstorm安装_快速打造自己的PHPStorm主题
  9. python中size的用法_在Python中PyArray_SIZE的正确用法是什么?
  10. 金古桥机器人_《泽塔奥特曼》奥特曼憋屈了,被机器人保护,金古桥可能才是主角...