Spark Cleaner 清理器

  • 功能概述
  • Cleaner的创建
  • 清理逻辑
    • RDD的清理
    • Shuffle的清理
    • Broadcast的清理
    • Accum的清理
    • Checkpoint的清理
  • 参考

功能概述

这里使用的是一个弱引用(WeakReference)队列,主要用于对RDD,shuffle和广播状态异步清理。当这些对象被gc回收以后,会被放入待清理队列referenceQueue中等待清理,实际的清理动作是在单独的守护线程完成

Cleaner的创建

SparkContext在初始化时就会创建并启动一个cleaner

 _cleaner =if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {Some(new ContextCleaner(this))} else {None}_cleaner.foreach(_.start())

cleaner内部维护一个缓冲区refercenceBuffer,目的是防止CleanupTaskWeakReference未处理就被垃圾回收器回收,这里存的只是RDD的refercence,并非真正的RDD对象。当然,这里还有一个比较重要的角色referenceQueue,主要作用就是当如RDD这样的对象被gc回收后,能通知到它。那系统何时会做gc呢?其实Spark在这个cleaner中启动了一个定时做垃圾回收单线程context-cleaner-periodic-gc

spark.cleaner.periodicGC.interval=30min 表示每30分钟做一次系统gc
spark.cleaner.referenceTracking.blocking=true 表示清理线程是否等待远端操作的完成,即rpc的返回
spark.cleaner.referenceTracking.blocking.shuffle=false 表示shuffule清理线程是否等待远端操作的完成,即rpc的返回

清理逻辑

cleaner清理的逻辑都在keepCleaning()方法中,当RDD被GC回收后,referenceQueue会收到删除对象的reference,该方法不断从队列中remove reference,然后执行真正的清理 doCleaupXXX()

/** Keep cleaning RDD, shuffle, and broadcast state. */private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {while (!stopped) {try {val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT)).map(_.asInstanceOf[CleanupTaskWeakReference])// Synchronize here to avoid being interrupted on stop()synchronized {reference.foreach { ref =>logDebug("Got cleaning task " + ref.task)referenceBuffer.remove(ref)ref.task match {case CleanRDD(rddId) =>doCleanupRDD(rddId, blocking = blockOnCleanupTasks)case CleanShuffle(shuffleId) =>doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)case CleanBroadcast(broadcastId) =>doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)case CleanAccum(accId) =>doCleanupAccum(accId, blocking = blockOnCleanupTasks)case CleanCheckpoint(rddId) =>doCleanCheckpoint(rddId)}}}} catch {case ie: InterruptedException if stopped => // ignorecase e: Exception => logError("Error in cleaning thread", e)}}}

RDD的清理

RDD的清理直接调用SparkContextunpersistRDD(rddId, blocking)方法,并且会通知到已注册的listeners

这里说明一下blocking这个参数,其主要作用是用于BlockManagerMaster向Driver发送消息时是同步发送还是异步发送,在调用unpersistRDD方法时,SparkContext会调用BlockManagerMasterremoveRdd(rddId, blocking)删除RDD数据(内存或者磁盘,如果该参数是true则代表等待rpc的返回结果,否则不用等待,所以这里可能抛出Exception

 /** Perform RDD cleanup. */def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = {try {logDebug("Cleaning RDD " + rddId)sc.unpersistRDD(rddId, blocking)listeners.asScala.foreach(_.rddCleaned(rddId))logInfo("Cleaned RDD " + rddId)} catch {case e: Exception => logError("Error cleaning RDD " + rddId, e)}}

Shuffle的清理

Shuffle清理会调用到mapOutputTrackerMaster(For Driver)组建的unregisterShuffle(shuffuleId)方法移除Spark关于该shuffle的内部信息,同时会调用到blockManagerMasterremoveShuffle(shuffleId, blocking)方法删除所有属于该shuffleblock块,此处也是向Driver发送删除消息,因此有blocking参数的存在,同时这里也会通知到相关的listener

 /** Perform shuffle cleanup. */def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {try {logDebug("Cleaning shuffle " + shuffleId)mapOutputTrackerMaster.unregisterShuffle(shuffleId)blockManagerMaster.removeShuffle(shuffleId, blocking)listeners.asScala.foreach(_.shuffleCleaned(shuffleId))logInfo("Cleaned shuffle " + shuffleId)} catch {case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)}}

Broadcast的清理

广播变量broadcast的清理则是调用的BroadcastManagerunbroadcast(broadcastId, true, blocking)方法删除所有已经持久化的广播变量状态,第二个参数表示是否从Driver端删除。同样也会通知到所有的listener

/** Perform broadcast cleanup. */def doCleanupBroadcast(broadcastId: Long, blocking: Boolean): Unit = {try {logDebug(s"Cleaning broadcast $broadcastId")broadcastManager.unbroadcast(broadcastId, true, blocking)listeners.asScala.foreach(_.broadcastCleaned(broadcastId))logDebug(s"Cleaned broadcast $broadcastId")} catch {case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)}}

Accum的清理

累加器的清理主要是调用AccumulatorContextremove(accId)方法将累加器从其上下文删除,所以这里用不到blocking参数

/** Perform accumulator cleanup. */def doCleanupAccum(accId: Long, blocking: Boolean): Unit = {try {logDebug("Cleaning accumulator " + accId)AccumulatorContext.remove(accId)listeners.asScala.foreach(_.accumCleaned(accId))logInfo("Cleaned accumulator " + accId)} catch {case e: Exception => logError("Error cleaning accumulator " + accId, e)}}

Checkpoint的清理

清理写入磁盘的Checkpoint文件,这里是直接删除的文件,所以也用不到blocking参数

/*** Clean up checkpoint files written to a reliable storage.* Locally checkpointed files are cleaned up separately through RDD cleanups.*/def doCleanCheckpoint(rddId: Int): Unit = {try {logDebug("Cleaning rdd checkpoint data " + rddId)ReliableRDDCheckpointData.cleanCheckpoint(sc, rddId)listeners.asScala.foreach(_.checkpointCleaned(rddId))logInfo("Cleaned rdd checkpoint data " + rddId)}catch {case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e)}}

参考

ContextCleaner Github源码

Spark Cleaner 清理器相关推荐

  1. Spark : ContextCleaner清理器

    1.美图 Spark运行的时候,会产生一堆临时文件,临时数据,比如持久化的RDD数据在磁盘上,没有持久化的在内存中,比如shuffle的临时数据等,如果每次运行完,或者没有运行完杀掉了,不清理,会产生 ...

  2. spark学习:ContextCleaner清理器

    Spark运行的时候,会产生一堆临时文件,临时数据,比如持久化的RDD数据在磁盘上,没有持久化的在内存中,比如shuffle的临时数据等,如果每次运行完,或者没有运行完杀掉了,不清理,会产生大量的无用 ...

  3. 数据清理器:FoneLab iPhone Cleaner Mac

    FoneLab iPhone Cleaner Mac一款专业强大的数据清理器.使您能够彻底删除 iPhone.iPad 或 iPod Touch 中的所有数据.您设备上的所有数据都将被擦除,您的数据将 ...

  4. SparkContext的初始化(伯篇)——执行环境与元数据清理器

    <深入理解Spark:核心思想与源码分析>一书前言的内容请看链接<深入理解SPARK:核心思想与源码分析>一书正式出版上市 <深入理解Spark:核心思想与源码分析> ...

  5. cas入门之二十五:ticket清理器

    当ticket存储器,没有能力管理ticket的状态时,则需要ticket清理器.默认的内存ticket存储器,jpa存储器等都需要ticket清理器;memecached ,ehcache等存储器则 ...

  6. 系统垃圾文件清理器 制作:China Doll (莫增成)

    @echo off ::修正于2018-10-06 color 4a Title 系统垃圾文件清理器 制作:China Doll (莫增成) echo. echo ================== ...

  7. Ubuntu Cleaner清理工具

    Ubuntu Cleaner清理工具 安装sudo add-apt-repository ppa:gerardpuig/ppasudo apt updatesudo apt install ubunt ...

  8. Reg Organizer(注册表文件清理器) v8.30.2中文绿色便携版

    点击下载来源:Reg Organizer(注册表文件清理器) v8.30.2中文绿色便携版 Reg Organizer是一款高品质的注册表文件清理器工具,通过简单的清理和整理窗口注册表项以及卸载程序来 ...

  9. Win日志批量清理器

    如果感觉日志太多了,或仅仅是为了跟踪软件出错报错的跟踪,有时候需要清一下,一个一个手工不是不可以,感觉就是麻烦. 所以单独写个"Win日志清理器",方便操作. 下载: http:/ ...

最新文章

  1. Python进阶01 词典
  2. Rest Framework
  3. jenkins配置记录(2)--代码发布流程
  4. kill apache
  5. django-dynamic-scraper(DDS)配置中的一些问题
  6. libvirt API管理hypervisors
  7. 软件工程实践总结--个人作业
  8. OA系统选型的那些事儿
  9. typora里插入图片,设置图片大小和位置
  10. 基于JQuery网页漂浮广告窗口Js详解
  11. 阿里云大数据ACA总结
  12. js实现表格按行滚动
  13. Appium自动化测试(五)——PO模式(一):短信案例
  14. 如何在ios app 局域网内搜索到设备
  15. Matlab 动态输入变量和嵌套函数、匿名函数
  16. tws耳机哪个品牌好?国产好用的tws耳机推荐
  17. git 远端更新合并到本地
  18. 计算机控制菜单在哪里,电脑菜单在哪里
  19. 小时 分钟 秒 计算
  20. 云服务器安装SSL证书,实现https访问

热门文章

  1. 制作RPG独立游戏练习(二)内置渲染管线中实现风格化PBR效果
  2. Windows系统中cmd命令总结
  3. visual studio软件破解方法
  4. 数字孪生汽车制造工艺车间 | 智慧工厂
  5. Python 算法交易实验56 ADBS:QuantData-灌入离线数据
  6. 不确定的乌卡时代:新国货品牌何去何从?
  7. 2023合肥工业大学计算机考研信息汇总
  8. 事件处理程序的注册this.button1.Click += new System.EventHandler(this.button1_Click)
  9. 十二个鸡蛋三次找出坏鸡蛋
  10. 解决支付宝小程序微信小程序post请求后台接收不到参数的问题