ContextCleaner是Spark中用来清理无用rdd,broadcast等数据的清理器,其主要用到的是java的weakReference弱引用来达成清理无用数据的目的。

ContextCleaner主要由两个线程两个集合组成。

private val referenceBuffer =Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)private val referenceQueue = new ReferenceQueue[AnyRef]

在ContextCleaner中,需要将来被清理而注册到ContextCleaner的数据都将会被构造成下文的CleanupTaskWeakReference。

private class CleanupTaskWeakReference(val task: CleanupTask,referent: AnyRef,referenceQueue: ReferenceQueue[AnyRef])extends WeakReference(referent, referenceQueue)

task代表具体的清理事情,为case类,根据数据类型区分具体的事件类型以便确定具体的清理方法。

referent为具体的被注册到这里的数据,将会直接被作为弱引用WeakReference构造方法的一员参数被使用,而referenceQueue则是上文中提到的ContextCleaner生效的集合之一,用来构造WeakReference,当被注册的数据只剩下当前这唯一一个弱引用,而在别处没有引用之后,将会准备作为gc的一部分被清理回收,并被放入到此referenceQueue中被获取到。

而上文另一个容器referenceBuffer则用阿里存放CleanupTaskWeakReference,可根据具体的数据类型确定具体的清理步骤。

CleanupTaskWeakReference的注册如下:

def registerRDDCheckpointDataForCleanup[T](rdd: RDD[_], parentId: Int): Unit = {registerForCleanup(rdd, CleanCheckpoint(parentId))
}/** Register an object for cleanup. */
private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
}

另外两个线程如下:

private val cleaningThread = new Thread() { override def run() { keepCleaning() }}private val periodicGCService: ScheduledExecutorService =ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")def start(): Unit = {cleaningThread.setDaemon(true)cleaningThread.setName("Spark Context Cleaner")cleaningThread.start()periodicGCService.scheduleAtFixedRate(new Runnable {override def run(): Unit = System.gc()}, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
}

两个线程都将在ContextCleaner中被开启,其中单个线程池中的线程职责很简单,则是简单的调用System.gc()去开启垃圾回收进行数据清理。

另一个线程会在start()方法中被设置为守护线程,并被启动,其会开始执行keepCleaning()方法。

private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {while (!stopped) {try {val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT)).map(_.asInstanceOf[CleanupTaskWeakReference])// Synchronize here to avoid being interrupted on stop()synchronized {reference.foreach { ref =>logDebug("Got cleaning task " + ref.task)referenceBuffer.remove(ref)ref.task match {case CleanRDD(rddId) =>doCleanupRDD(rddId, blocking = blockOnCleanupTasks)case CleanShuffle(shuffleId) =>doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)case CleanBroadcast(broadcastId) =>doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)case CleanAccum(accId) =>doCleanupAccum(accId, blocking = blockOnCleanupTasks)case CleanCheckpoint(rddId) =>doCleanCheckpoint(rddId)}}}} catch {case ie: InterruptedException if stopped => // ignorecase e: Exception => logError("Error in cleaning thread", e)}}
}

在这里,会不断从上文提到,需要被回收的对象将会在referenceQueue中,从这里取得并从erferenceBuffer中得到对应的case类确定执行清理的具体步骤,并移除。例如如果为rdd,则在这里获得的是CleanRDD,并将调用doCleanRDD()方法根据rddId去回收该rdd。

def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = {try {logDebug("Cleaning RDD " + rddId)sc.unpersistRDD(rddId, blocking)listeners.asScala.foreach(_.rddCleaned(rddId))logInfo("Cleaned RDD " + rddId)} catch {case e: Exception => logError("Error cleaning RDD " + rddId, e)}
}

Rdd的具体回收包含两步,首先从blockManager中移除该数据,之后调用监听器通知rddCleaned()被回收。

spark的ContextCleaner清理相关推荐

  1. spark学习:ContextCleaner清理器

    Spark运行的时候,会产生一堆临时文件,临时数据,比如持久化的RDD数据在磁盘上,没有持久化的在内存中,比如shuffle的临时数据等,如果每次运行完,或者没有运行完杀掉了,不清理,会产生大量的无用 ...

  2. Spark : ContextCleaner清理器

    1.美图 Spark运行的时候,会产生一堆临时文件,临时数据,比如持久化的RDD数据在磁盘上,没有持久化的在内存中,比如shuffle的临时数据等,如果每次运行完,或者没有运行完杀掉了,不清理,会产生 ...

  3. Spark Streaming揭秘 Day16 数据清理机制

    Spark Streaming揭秘 Day16 数据清理机制 今天主要来讲下Spark的数据清理机制,我们都知道,Spark是运行在jvm上的,虽然jvm本身就有对象的自动回收工作,但是,如果自己不进 ...

  4. 那些年我们在spark SQL上踩过的坑

    做了一年延云YDB的开发,这一年在使用spark上真心踩了不少坑,总结一下,希望对大家有所帮助. spark 内存泄露 1.高并发情况下的内存泄露的具体表现 很遗憾,spark的设计架构并不是为了高并 ...

  5. [Spark版本更新]--2.3.0发行说明

    自从2017年12月1日发布spark-2.2.1以来,已有3个月时间. 2018年2月28日,spark官方发布了一个大版本Spark-2.3.0,解决了1399个大大小小的问题. 一.DataBr ...

  6. [Spark版本更新]--2.3.0发行说明(二)

    新功能 ·        [ SPARK-3181 ] - 使用Huber估计器添加鲁棒回归算法 ·        [ SPARK-4131 ] - 支持"通过查询将数据写入文件系统&quo ...

  7. [Spark版本更新]--2.3.0发行说明(一)

    自从2017年12月1日发布spark-2.2.1以来,已有3个月时间. 2018年2月28日,spark官方发布了一个大版本Spark-2.3.0,解决了1399个大大小小的问题. 一.DataBr ...

  8. Apache Spark 3.0 结构化Streaming流编程指南

    目录 总览 快速范例 Scala语言 Java语言 Python语言 R语言 程式设计模型 基本概念 处理事件时间和延迟数据 容错语义 使用数据集和数据帧的API 创建流数据框架和流数据集 流数据帧/ ...

  9. Spark1.0.0 属性配置

    1:Spark1.0.0属性配置方式 Spark属性提供了大部分应用程序的控制项,而且能够单独为每一个应用程序进行配置. 在Spark1.0.0提供了3种方式的属性配置: SparkConf方式 Sp ...

最新文章

  1. Linux下修复修改profile文件导致命令不用可的解决方法
  2. iOS Sprite Kit教程之真机测试以及场景的添加与展示
  3. 游戏在ios和android,陈情令手游ios和安卓互通吗 ios和安卓能一起玩吗
  4. 循环队列(0965)
  5. 加法器的verilog实现(串行进位、并联、超前进位、流水线)
  6. OpenFOAM各版本说明介绍
  7. 信息学奥赛一本通(1005:地球人口承载力估计)
  8. python perl正则表达式_python学习笔记(正则表达式)
  9. 两个可用的ntp服务器地址
  10. C++ set 排序 修改元素之后不会改变原来的排序
  11. zookeeper 阿里滴滴 有点用 zookeeper主从选举方式
  12. mro python_一窥Python中MRO排序原理
  13. 扫码点菜系统代码_一顿火锅吃出474万天价?扫码点餐时,千万不要这样做
  14. 卸载centos7自带java,安装oracle的jdk8
  15. Proteus仿真:使用8255输出连续方波
  16. 云计算10个入门基础知识
  17. 英特尔核心显卡控制面板设置自定义分辨率
  18. 苹果关掉200m限制_苹果怎么取消200m限制
  19. android虚拟内置sd卡,安卓虚拟器bluestacks虚拟sd卡打开方法【图解】
  20. 观影《铁拳男人》有感

热门文章

  1. django+nginx+uwsgi项目部署文档整理
  2. vueCli3中使用代理,点击页面的刷新按钮时报错
  3. prestashop 隐藏 index.php,删除PrestaShop中的供应商和制造商页面
  4. python代码加密cython_利用Cython加密python脚本
  5. vue3——ref reactive函数
  6. 计算机应用基础第二版在线作业一,计算机应用基础在线作业一
  7. asp和php功能,asp和php都有什么功能?
  8. rabbitmq-plugins.bat enable rabbitmq_management
  9. leetcode最长递增子序列问题
  10. MySql 5.7 json数据格式 增删改查 操作 (不定时更新)