spark学习:ContextCleaner清理器
Spark运行的时候,会产生一堆临时文件,临时数据,比如持久化的RDD数据在磁盘上,没有持久化的在内存中,比如shuffle的临时数据等,如果每次运行完,或者没有运行完杀掉了,不清理,会产生大量的无用数据,最终造成大数据集群崩溃而死。
初始化
ContextCleaner的初始化是在SparkContext中初始化的,这个功能默认是必须开
启的。
_cleaner =
if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
} else {
None
}
_cleaner.foreach(_.start())
1
2
3
4
5
6
7
初始化 的时候主要newle一个清理线程
// 清理线程===》很重要
private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
1
2
这个清理线程,主要清理了RDD,shuffle,Broadcast,累加器,检查点等数据
/** Keep cleaning RDD, shuffle, and broadcast state.
* 保持一个干净的RDD,shuffle和broadcast状态
*
* ContextCleaner的工作原理和listenerBus一样,也采用监听器模式,由线程来处理,此线程实际还是那个只是
* 调用keepCleanning方法。
* */
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
// 默认一直为真true
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference])
// Synchronize here to avoid being interrupted on stop()
synchronized {
reference.foreach { ref =>
logDebug("Got cleaning task " + ref.task)
referenceBuffer.remove(ref)
// 清除Shuffle和Broadcast相关的数据会分别调用doCleanupShuffle和doCleanupBroadcast函数。根据需要清除数据的类型分别调用
ref.task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
case CleanCheckpoint(rddId) =>
doCleanCheckpoint(rddId)
}
}
}
} catch {
case ie: InterruptedException if stopped => // ignore
case e: Exception => logError("Error in cleaning thread", e)
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
RDD的清理
/** Perform RDD cleanup.
* 在ContextCleaner 中会调用RDD.unpersist()来清除已经持久化的RDD数据
* */
def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = {
try {
logDebug("Cleaning RDD " + rddId)
// 被SparkContext的unpersistRDD方法
sc.unpersistRDD(rddId, blocking)
listeners.asScala.foreach(_.rddCleaned(rddId))
logInfo("Cleaned RDD " + rddId)
} catch {
case e: Exception => logError("Error cleaning RDD " + rddId, e)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
shuffle的清理
/** Perform shuffle cleanup.
*
* 清理Shuffle
* */
def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
try {
logDebug("Cleaning shuffle " + shuffleId)
// 把mapOutputTrackerMaster跟踪的shuffle数据不注册(具体做了什么,还没处理)
mapOutputTrackerMaster.unregisterShuffle(shuffleId)
// 删除shuffle的块数据
blockManagerMaster.removeShuffle(shuffleId, blocking)
listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
logInfo("Cleaned shuffle " + shuffleId)
} catch {
case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
广播的清理
/** Perform broadcast cleanup.
* 清除广播
* */
def doCleanupBroadcast(broadcastId: Long, blocking: Boolean): Unit = {
try {
logDebug(s"Cleaning broadcast $broadcastId")
// 广播管理器 清除广播
broadcastManager.unbroadcast(broadcastId, true, blocking)
listeners.asScala.foreach(_.broadcastCleaned(broadcastId))
logDebug(s"Cleaned broadcast $broadcastId")
} catch {
case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
累加器的清理
/** Perform accumulator cleanup.
* 清除累加器
* */
def doCleanupAccum(accId: Long, blocking: Boolean): Unit = {
try {
logDebug("Cleaning accumulator " + accId)
AccumulatorContext.remove(accId)
listeners.asScala.foreach(_.accumCleaned(accId))
logInfo("Cleaned accumulator " + accId)
} catch {
case e: Exception => logError("Error cleaning accumulator " + accId, e)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
检查点的清理
/**
* Clean up checkpoint files written to a reliable storage.
* Locally checkpointed files are cleaned up separately through RDD cleanups.
*
* 清理记录到可靠存储的检查点文件。
* 局部检查点文件通过RDD清理被单独清理。
*/
def doCleanCheckpoint(rddId: Int): Unit = {
try {
logDebug("Cleaning rdd checkpoint data " + rddId)
// 这里直接调用文件系统删除 是本地 就本地删除,是hdfs就hdfs删除
ReliableRDDCheckpointData.cleanCheckpoint(sc, rddId)
listeners.asScala.foreach(_.checkpointCleaned(rddId))
logInfo("Cleaned rdd checkpoint data " + rddId)
}
catch {
case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
启动方法
在sparkContext中调用启动方法
_cleaner.foreach(_.start())
1
这里是启动方法
/** Start the cleaner.
* 开始清理
* */
def start(): Unit = {
// 设置清理线程为守护进程
cleaningThread.setDaemon(true)
// 设置守护进程的名称
cleaningThread.setName("Spark Context Cleaner")
// 启动守护进程
cleaningThread.start()
// scheduleAtFixedRate 在给定的初始延迟之后,并随后在给定的时间内,创建并执行一个已启用的周期操作
// periodicGCInterval=30分钟 也就是没=每过30分钟运行一次清理线程清理垃圾
periodicGCService.scheduleAtFixedRate(new Runnable {
// 执行系统的垃圾清理
override def run(): Unit = System.gc()
}, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
这里启动线程 // 启动守护进程 cleaningThread.start(),这里自我感觉一下,因为下面调用System.gc()是清理垃圾,所以这个cleaningThread线程应该是收集那些需要清理的数据,保存它的引用(引用就是一个地址,一个指针,指向要删除的数据),最后调用System.gc()方法,才真正清理。
结束
最后是关闭这个应用的时候,调用Stop()方法
/**
* Stop the cleaning thread and wait until the thread has finished running its current task.
* 停止清理线程并等待线程完成其当前任务。
*/
def stop(): Unit = {
stopped = true
// Interrupt the cleaning thread, but wait until the current task has finished before
// doing so. This guards against the race condition where a cleaning thread may
// potentially clean similarly named variables created by a different SparkContext,
// resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
// 中断清理线程,但等待当前任务完成后再执行。
// This guards against the race condition where a cleaning thread may
// potentially clean similarly named variables created by a different SparkContext,
// ,导致其他令人费解的块未发现异常(spark-6132)。
synchronized {
// 打断线程
cleaningThread.interrupt()
}
// 设置0 等待这个线程死掉
cleaningThread.join()
// 关闭垃圾清理
periodicGCService.shutdown()
}
spark学习:ContextCleaner清理器相关推荐
- spark的ContextCleaner清理
ContextCleaner是Spark中用来清理无用rdd,broadcast等数据的清理器,其主要用到的是java的weakReference弱引用来达成清理无用数据的目的. ContextCle ...
- Spark : ContextCleaner清理器
1.美图 Spark运行的时候,会产生一堆临时文件,临时数据,比如持久化的RDD数据在磁盘上,没有持久化的在内存中,比如shuffle的临时数据等,如果每次运行完,或者没有运行完杀掉了,不清理,会产生 ...
- Spark Cleaner 清理器
Spark Cleaner 清理器 功能概述 Cleaner的创建 清理逻辑 RDD的清理 Shuffle的清理 Broadcast的清理 Accum的清理 Checkpoint的清理 参考 功能概述 ...
- SparkContext的初始化(伯篇)——执行环境与元数据清理器
<深入理解Spark:核心思想与源码分析>一书前言的内容请看链接<深入理解SPARK:核心思想与源码分析>一书正式出版上市 <深入理解Spark:核心思想与源码分析> ...
- spark学习笔记!!!!
文章目录 1.spark概述 2.spark运行架构 2.1.核心组件 2.2.核心概念 2.3.Spark提交流程 3.spark核心编程 3.1.RDD 3.1.1.什么是RDD 3.1.2.RD ...
- spark学习-43-Spark的BlockManager
1.美图 1.简介: BlockManager负责对Block的管理,只有在BlockManager的痴实话方法initialize被调用之后,它才是有效的.Blockmanager作为存储系统的一部 ...
- spark 学习笔记
spark 学习笔记 spark介绍 Spark是是一种快速通用的集群计算系统,它的主要特点是能够在内存中进行计算.它包含了 spark 核心组件 spark-core,用于 SQL 和结构化处理数据 ...
- Spark学习之路一——Spark基础及环境搭建
Spark学习之路一--Spark基础及环境搭建 文章目录 一. Spark 概述 1.1 概述 1.2 优势特性 1.2.1 运行速度快 1.2.2 容易使用 1.2.3 通用性 1.2.4 运行模 ...
- jvm学习 Shenandoah垃圾收集器
系统学习请点击jvm学习目录 建议学习Shenandoah之前先学习G1垃圾收集器 前言 Shenandoah垃圾收集器是一个很有意思的垃圾收集器,它是第一款非Oracle公司开发的HotSpot垃圾 ...
最新文章
- java培训教程:什么是匿名内部类?怎样创建匿名内部类?
- openstack高可用方案
- PHP安装编译错误及解决办法
- Docker 核心概念、安装、端口映射及常用操作命令,详细到令人发指。
- Cocos2d—android 中常用的工具类
- 高露洁、悦诗风吟、Benefit,618大促的数字化难题都是如何解决的?
- CodeForces - 724C Ray Tracing(扩展欧几里得解方程)
- C#多线程编程系列(五)- C# ConcurrentBag的实现原理
- spring官方文档列表
- 在Win8.1系统下如何安装运行SQL Server 2005 (以及安装SQL Server 2005 Express打补丁)...
- python读取HDF文件
- java简易计算器程序框图_简易计算器程序设计思路及流程图
- and design 如何引入阿里图表矢量库 创建自定义icon
- I5 4590 台式机安装黑苹果最新版笔记
- 吾爱破解python百度文库下载源码_python版百度音乐下载软件和源码
- 文件服务器限制流量,盘点天翼云盘,限制虽紧依然堪用,几个使用小技巧
- 对于硬盘做了raid的Windows server 2016服务器重置密码
- freebsd的swatch安装和使用
- SF14 | Supertrend“超级趋势线”指标魔改升级(源码)
- 空光盘复制后到另外计算机无法读取,解决方法:无法读取计算机CD的解决方案...
热门文章
- 百度api语音识别一直“无内容”_PHP开发语音识别功能
- vue 左右滑动菜单_Vue实现左右菜单联动实现代码
- mysql 事物状态有几种_10分钟梳理MySQL核心知识点
- 搭建AWStats日志分析系统
- 大数据常见组件的访问页面总结
- rust布料怎么弄_布料“难弄”,你需要从这六方面解决!
- python怎样打开加密的文件_python基础教程如何用Python 加密文件
- phpstorm安装_快速打造自己的PHPStorm主题
- python中size的用法_在Python中PyArray_SIZE的正确用法是什么?
- 金古桥机器人_《泽塔奥特曼》奥特曼憋屈了,被机器人保护,金古桥可能才是主角...