目录

  • 前言
  • App状态数据的键值对存储
    • KVStore
    • InMemoryStore与InMemoryView
    • ElementTrackingStore
  • App状态监听器
    • LiveEntity
    • 添加清理触发器
    • 监听事件处理方法
  • 基于KVStore和监听器包装AppStatusStore
  • 总结

前言

AppStatusStore这个东西是在Spark 2.3.0版本才加入的,在Spark大家庭中是真正的新面孔。顾名思义,它用来存储Application的状态数据,Spark Web UI及REST API需要的数据都取自它。之前在写度量系统时,我曾经说Web UI的数据全部来自度量系统,这句话是错误的,深表歉意。

在本系列之前的文章中已经不止一次地提到了AppStatusStore,但都是几笔带过,从未认真分析,网上也没有前人发表过高见。本文是这个系列的第一篇番外,作为查漏补缺,今天就来稍微探索一下AppStatusStore的底层细节吧。

App状态数据的键值对存储

由前文的分析,我们已经知道AppStatusStore的构造依赖于两个要素:一为键值对存储KVStore,二为App状态监听器AppStatusListener。本节先来看KVStore,它位于o.a.s.util.kvstore包中,是一个Java接口,作为Spark内App状态数据键值对存储的基类。

KVStore

代码#A.1 - o.a.s.util.kvstore.KVStore接口

@Private
public interface KVStore extends Closeable {<T> T getMetadata(Class<T> klass) throws Exception;void setMetadata(Object value) throws Exception;<T> T read(Class<T> klass, Object naturalKey) throws Exception;void write(Object value) throws Exception;void delete(Class<?> type, Object naturalKey) throws Exception;<T> KVStoreView<T> view(Class<T> type) throws Exception;long count(Class<?> type) throws Exception;long count(Class<?> type, String index, Object indexedValue) throws Exception;
}

顾名思义,该接口定义的大多数方法都是非常常见的,比如read()、write()、delete()、count()等,也符合它作为键值对存储的定位。另外,它还会提供基本的元数据存取(get/setMetadata()方法)及视图操作(view()方法)。

根据其注释中的描述,KVStore的子类可以(不是必须)支持以下两个特性:

  • 序列化:如果数据需要序列化及压缩存储,可以利用定义好的KVStoreSerializer序列化器来做。
  • Key自动管理:KVStore可以自动利用各个类型的名字(也就是代码#A.1中的klass或者type)来作为键,与自然键(naturalKey)配合即可唯一确定一条记录。如果为写入KVStore的数据结构的字段加上@KVIndex注解的话,就会为这些字段创建索引,方便排序。

下图示出以KVStore接口为中心的类结构。

图#A.1 - KVStore的继承体系

其中,InMemoryStore是在内存中维护的键值对存储;LevelDB则是借助Google开源的KV数据库来实现,可以持久化到磁盘。ElementTrackingStore额外加上了跟踪元素个数的功能,可以根据元素个数阈值触发特定的操作,但它更多地是个包装类,需要依赖于InMemoryStore或者LevelDB。

AppStatusStore的实现依赖于InMemoryStore和ElementTrackingStore,下面分别来看。

InMemoryStore与InMemoryView

由于不需要持久化,该类只是用一个ConcurrentHashMap保存对象类型与对象列表的映射,即:
private ConcurrentMap<Class<?>, InstanceList> data = new ConcurrentHashMap<>();

对象列表由静态内部类InstanceList来实现。看官如果有兴趣,可以自行查看InstanceList的源码,其中涉及到较多Java反射方面的知识。以下则是InMemoryStore实现的read()、write()和view()方法。

代码#A.2 - o.a.s.util.kvstore.InMemoryStore.read()/write()/view()方法

  @Overridepublic <T> T read(Class<T> klass, Object naturalKey) {InstanceList list = data.get(klass);Object value = list != null ? list.get(naturalKey) : null;if (value == null) {throw new NoSuchElementException();}return klass.cast(value);}@Overridepublic void write(Object value) throws Exception {InstanceList list = data.computeIfAbsent(value.getClass(), key -> {try {return new InstanceList(key);} catch (Exception e) {throw Throwables.propagate(e);}});list.put(value);}@Overridepublic <T> KVStoreView<T> view(Class<T> type){InstanceList list = data.get(type);return list != null ? list.view(type): new InMemoryView<>(type, Collections.<T>emptyList(), null);}

read()和write()方法自不必多说,view()方法则是通过调用InstanceList.view()方法生成当前KVStore的某一个类型对应的视图InMemoryView。InMemoryView继承自抽象类KVStoreView,并且只能通过内部自定义的迭代器InMemoryIterator来访问。下面是获取InMemoryIterator的方法。

代码#A.3 - o.a.s.util.kvstore.InMemoryStore.InMemoryView.iterator()方法

    @Overridepublic Iterator<T> iterator() {if (elements.isEmpty()) {return new InMemoryIterator<>(elements.iterator());}try {KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null;int modifier = ascending ? 1 : -1;final List<T> sorted = copyElements();Collections.sort(sorted, (e1, e2) -> modifier * compare(e1, e2, getter));Stream<T> stream = sorted.stream();if (first != null) {stream = stream.filter(e -> modifier * compare(e, getter, first) >= 0);}if (last != null) {stream = stream.filter(e -> modifier * compare(e, getter, last) <= 0);}if (skip > 0) {stream = stream.skip(skip);}if (max < sorted.size()) {stream = stream.limit((int) max);}return new InMemoryIterator<>(stream.iterator());} catch (Exception e) {throw Throwables.propagate(e);}}

由上可知,视图内的数据首先会被排序,然后利用Java Stream API做一些过滤的操作,最后返回代表有序数据的InMemoryIterator。在AppStatusStore中,从KVStore取数据经常会用到视图,这也就解释了为什么我们在Spark UI中看到的很多信息(比如Task、Stage等)都是已经排好序的。

ElementTrackingStore

ElementTrackingStore的初始化依赖于InMemoryStore,因此它的多数方法都是直接代理了InMemoryStore的方法。为了实现跟踪元素数并触发操作的功能,其内部维护了一个类型与触发器(通过内部样例类Trigger定义)的映射关系,添加触发器的方法如下。

代码#A.4 - o.a.s.status.ElementTrackingStore.addTrigger()方法

  def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = {val existing = triggers.getOrElse(klass, Seq())triggers(klass) = existing :+ Trigger(threshold, action)}

其中,threshold是一个整形值,表示对应类型元素个数的阈值。action是一个偏函数,表示到达阈值之后需要执行的操作。在其重载的write()方法中,我们能清楚地看到这个逻辑。

代码A.5 - o.a.s.status.ElementTrackingStore.write()方法

  def write(value: Any, checkTriggers: Boolean): Unit = {write(value)if (checkTriggers && !stopped) {triggers.get(value.getClass()).foreach { list =>doAsync {val count = store.count(value.getClass())list.foreach { t =>if (count > t.threshold) {t.action(count)}}}}}}

需要注意的是,可以通过checkTriggers参数来控制是否触发。另外,可以通过配置项spark.appStateStore.asyncTracking.enable设置是否异步触发操作(代码中的doAsync()方法),默认值为true。

通过以上分析,我们对App状态数据的键值对存储有了大致的了解。接下来轮到AppStatusListener了。

App状态监听器

AppStatusListener类继承自SparkListener类,因此实现了很多SparkListener中定义的监听事件处理方法。我们不急着研究这些方法,而先来看一些前置的东西。

LiveEntity

所谓LiveEntity,可以直译为“活动实体”,指的是那些在Application运行过程中状态在不断变化的Spark内部组件,比如Job、Stage、Task、Executor、RDD,它们会向ElementTrackingStore更新自己的状态数据。LiveEntity是一个抽象类,其定义如下,比较简单,不过多解释。

代码#A.6 - o.a.s.status.LiveEntity抽象类

private[spark] abstract class LiveEntity {var lastWriteTime = -1Ldef write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {store.write(doUpdate(), checkTriggers || lastWriteTime == -1L)lastWriteTime = now}protected def doUpdate(): Any
}

所有LiveEntity的实现类(比如LiveJob、LiveTask等)都包含了大量的监控信息和度量信息,监控信息来自AppStatusListener,度量信息来自MetricsSystem。并且它们都需要实现doUpdate()方法,该方法负责将LiveEntity最新的状况反映给ElementTrackingStore。在AppStatusListener类中,也预先定义了各个LiveEntity的缓存。

代码#A.7 - o.a.s.status.AppStatusListener中LiveEntity的缓存

  private val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]()private val liveJobs = new HashMap[Int, LiveJob]()private val liveExecutors = new HashMap[String, LiveExecutor]()private val liveTasks = new HashMap[Long, LiveTask]()private val liveRDDs = new HashMap[Int, LiveRDD]()

添加清理触发器

在AppStatusListener的构造方法中,首先就调用了ElementTrackingStore.addTrigger()方法添加触发器,代码如下。

代码#A.8 - o.a.s.status.AppStatusListener添加触发器

  kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS)){ count => cleanupExecutors(count) }kvstore.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) { count =>cleanupJobs(count)}kvstore.addTrigger(classOf[StageDataWrapper], conf.get(MAX_RETAINED_STAGES)) { count =>cleanupStages(count)}

这三个触发器分别用于在Job、Stage和Executor信息的数量超过SparkConf内规定的数值(名称均为spark.ui.retained***)时,将那些较旧的信息删除掉,Task信息则会随着Stage清除。以清理Job的cleanupJobs()方法为例,代码如下。

代码#A.9 - o.a.s.status.AppStatusListener.cleanupJobs()方法

  private def cleanupJobs(count: Long): Unit = {val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_JOBS))if (countToDelete <= 0L) {return}val view = kvstore.view(classOf[JobDataWrapper]).index("completionTime").first(0L)val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j =>j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN}toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }}

其具体逻辑是:先计算出需要删除的记录数目,然后从KVStore中获取Job信息包装类JobDataWrapper对应的视图,并按照Job完成时间的索引排序。接下来取出要删除的Job记录(Job不能是在执行,也不能是未知状态),再调用KVStore.delete()方法删除之。

监听事件处理方法

由于AppStatusListener监听的事件甚多,所以我们只选取其中一个有代表性的onJobStart()方法来看一看。

代码#A.10 - o.a.s.status.AppStatusListener.onJobStart()方法

  override def onJobStart(event: SparkListenerJobStart): Unit = {val now = System.nanoTime()val numTasks = {val missingStages = event.stageInfos.filter(_.completionTime.isEmpty)missingStages.map(_.numTasks).sum}val lastStageInfo = event.stageInfos.sortBy(_.stageId).lastOptionval lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")val jobGroup = Option(event.properties).flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) }val job = new LiveJob(event.jobId,lastStageName,if (event.time > 0) Some(new Date(event.time)) else None,event.stageIds,jobGroup,numTasks)liveJobs.put(event.jobId, job)liveUpdate(job, now)event.stageInfos.foreach { stageInfo =>val stage = getOrCreateStage(stageInfo)stage.jobs :+= jobstage.jobIds += event.jobIdliveUpdate(stage, now)}event.stageInfos.foreach { stage =>val graph = RDDOperationGraph.makeOperationGraph(stage, maxGraphRootNodes)val uigraph = new RDDOperationGraphWrapper(stage.stageId,graph.edges,graph.outgoingEdges,graph.incomingEdges,newRDDOperationCluster(graph.rootCluster))kvstore.write(uigraph)}}

在接收到表示Job启动的SparkListenerJobStart事件后,该方法的大致流程如下:

  • 根据该Job的Stage信息,估算出一个大致的Task数目,获取其最后一个Stage的名称;
  • 封装一个LiveJob实例,将其放入缓存,并调用liveUpdate()方法向KVStore更新状态。liveUpdate()方法最终调用的就是LiveEntity.write()方法;
  • 调用getOrCreateStage()方法生成LiveStage实例,同样向KVStore更新状态;
  • 生成RDD的DAG表示,并写入KVStore中。

AppStatusListener监听的每个事件都会采用类似上面的逻辑来处理,将数据写入KVStore之后,就可以通过AppStatusStore将它们取出并且展示了。

基于KVStore和监听器包装AppStatusStore

有了存储数据的ElementTrackingStore和监听并写入数据的AppStatusListener,AppStatusStore的实现就会非常简单,只需要调用read()或view()从ElementTrackingStore中以一定的规则取出数据进行包装即可。

例如,在讲解SparkUI的文章#14中已经提到,构造EnvironmentPage时(参见代码清单#14.8),就会调用AppStatusStore.environmentInfo()方法。

代码#A.11 - o.a.s.status.AppStatusStore.environmentInfo()方法

  def environmentInfo(): v1.ApplicationEnvironmentInfo = {val klass = classOf[ApplicationEnvironmentInfoWrapper]store.read(klass, klass.getName()).info}

再举一例,如果要展示Stage相关的信息,就会调用stageData()方法,实现起来同样简单方便。

代码#A.12 - o.a.s.status.AppStatusStore.stageData()方法

  def stageData(stageId: Int, details: Boolean = false): Seq[v1.StageData] = {store.view(classOf[StageDataWrapper]).index("stageId").first(stageId).last(stageId).asScala.map { s =>if (details) stageWithDetails(s.info) else s.info}.toSeq}

总结

本文首先了解了Spark内的键值对存储KVStore的部分细节,探究了与AppStatusStore相关的具体实现类InMemoryStore与ElementTrackingStore。然后通过阅读监听器AppStatusListener的部分代码,明确了AppStatusStore内状态数据的来源。最后将两者结合在一起,简述了AppStatusStore是如何将数据反馈到前端的。

Spark Core源码精读计划 番外篇A:AppStatusStore的底层实现相关推荐

  1. clang static analyzer源码分析(番外篇):RegionStore以及evalCall()中的conservativeEvalCall

    引子 我们在上一篇文章<clang static analyzer源码分析(番外篇):evalCall()中的inline机制>中提及了clang如何创建CallGraph,如何进行函数i ...

  2. 30天搞定spark源码系列-RDD番外篇-shuffledRDD

    阅读这篇文章,你应该能得到这样几个问题的答案: 什么是spark的shuffle? 典型的shuffle类算子 spark shuffle在实战中的优化方向 shuffledRDD的基本流程和代码框架 ...

  3. Soul网关源码阅读番外篇(一) HTTP参数请求错误

    Soul网关源码阅读番外篇(一) HTTP参数请求错误 共同作者:石立 萧 * 简介     在Soul网关2.2.1版本源码阅读中,遇到了HTTP请求加上参数返回404的错误,此篇文章基于此进行探索 ...

  4. 番外篇——直流电机桥源码分析LED驱动例程开发

    [番外篇]直流电机桥源码分析&LED驱动例程开发 直流电机桥测试代码分析 直流电机桥驱动代码分析 仿写HBLED驱动程序 利用芯片手册修改设备树 利用原理图寻找引脚 仿写代码 仿写HBLED测 ...

  5. SparkSQL 之 Shuffle Join 内核原理及应用深度剖析-Spark商业源码实战

    本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客.版权声明:禁止转载,欢迎学习.QQ邮箱 ...

  6. 【源码阅读计划】浅析 Java 线程池工作原理及核心源码

    [源码阅读计划]浅析 Java 线程池工作原理及核心源码 为什么要用线程池? 线程池的设计 线程池如何维护自身状态? 线程池如何管理任务? execute函数执行过程(分配) getTask 函数(获 ...

  7. 03-做一个通读Vue源码的计划吧

    先梳理一下此时我的理解. src目录: core Vue的核心,专注于数据驱动,响应式 platform 各个平台的移植,负责模板的真正渲染工作 compiler 编译器,把字符串模板转换成rende ...

  8. tensorflow源码精读之graph

      本节介绍tensorflow中的graph,在c_api.cc中有创建graph的例子,我们从这个为切入点,探索graph的使用. 创建一个图   在c_api.cc中,创建graph的代码如下: ...

  9. spark最新源码下载并导入到开发环境下助推高质量代码(Scala IDEA for Eclipse和IntelliJ IDEA皆适用)(以spark2.2.0源码包为例)(图文详解)...

    不多说,直接上干货! 前言   其实啊,无论你是初学者还是具备了有一定spark编程经验,都需要对spark源码足够重视起来. 本人,肺腑之己见,想要成为大数据的大牛和顶尖专家,多结合源码和操练编程. ...

最新文章

  1. Android 取得 ListView中每个Item项目的值
  2. 如何定义和搭建可靠人工智能系统的规则?
  3. shell 读取文件
  4. 有趣又好玩的glm库
  5. 安装+wordpress+出现403+forbidden_教程篇 | WordPress网站搭建详细教程
  6. angularjs的$on、$emit、$broadcast
  7. HBase phoenix二级索引
  8. Hibernate- QBC-基本查询
  9. rxjs里merge operators的用法
  10. bufferedimage生成的图片模糊_Kaptcha图片验证码工具
  11. c语言加密shell脚本,shell脚本加密
  12. nexus+7+android+5.0++wifi+代理,谷歌Nexus5吃上安卓8.0:除了WiFi全不能正常工作
  13. java jsonobject date_如何将Json Passed Date Value分配给Java Date Object
  14. 开启TOGAF架构之路
  15. chinapub matlab,MATLAB 5手册
  16. 2021-08-08 解决“These dependencies were not found“的报错
  17. 四阶龙格库塔法的基本思想_经典四阶龙格库塔法解1阶微分方程组.doc
  18. CCNA考试题库中英文翻译版及答案10
  19. 易基因 | 转录组测序在原核生物研究中的应用(4)| 文献科普
  20. Email,电子邮箱免费注册流程

热门文章

  1. 【MindManager软件常用快捷键】Mindjet MindManager快捷键教程
  2. 机器学习及人工智能发展史
  3. android 定时更新banner图片,Android 用banner简单实现图片无限循环
  4. java.sql.Date和java.sql.Timestamp转换
  5. 中国高校鄙视链指南!
  6. CFS(完全公平调度)
  7. smb.conf 中文man页面(1)
  8. 北大教授:只剩下学术的生活是危险的
  9. VLOOKUP函数反向查找(需要用IF函数把数据源倒置一下)
  10. iPhone4 FaceTime功能激活和使用方法