主要负责整个应用程序在运行期间block元数据的管理和维护,以及向从节点发送指令执行命令。

一 核心属性

RpcEndpointRef: driverEndpointBlockManagerMasterEndpoint通信终端

Boolean isDriver: 是否在Driver

二 重要方法

2.1 从driver通信终端删除一个executor

defremoveExecutor(execId:String) {
  // 向BlockManagerMasterEndpoint发送RemoveExecutor消息
 
tell(RemoveExecutor(execId))
  logInfo("Removed "+ execId + " successfully in removeExecutor")
}

2.2 向BlockManagerMasterEndpoint发送RegisterBlockManager消息

def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {logInfo("Trying to register BlockManager")// 向BlockManagerMasterEndpoint发送RegisterBlockManager消息tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))logInfo("Registered BlockManager")
}

2.3 更新数据块信息

def updateBlockInfo(blockManagerId: BlockManagerId,blockId: BlockId,storageLevel: StorageLevel,memSize: Long,diskSize: Long,externalBlockStoreSize: Long): Boolean = {// 向BlockManagerMasterEndpoint发送UpdateBlockInfo消息,并且返回结果val res = driverEndpoint.askWithRetry[Boolean](UpdateBlockInfo(blockManagerId, blockId, storageLevel,memSize, diskSize, externalBlockStoreSize))logDebug(s"Updated info of block $blockId")res
}

2.4 获取某个block的所有BlockManagerId信息

def getLocations(blockId: BlockId): Seq[BlockManagerId] = {// 向BlockManagerMasterEndpoint发送GetLocations消息driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
}

2.5 获取多个block的位置信息

def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {// 向BlockManagerMasterEndpoint发送GetLocationsMultipleBlockIds消息driverEndpoint.askWithRetry[IndexedSeq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
}

2.6  检查是否存在该数据块

def contains(blockId: BlockId): Boolean = {!getLocations(blockId).isEmpty
}

2.7 获取Executor的非当前BlockManagerId

def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId))
}

2.8 向BlockManagerMasterEndpoint发送RemoveBlock消息删除数据块

def removeBlock(blockId: BlockId) {driverEndpoint.askWithRetry[Boolean](RemoveBlock(blockId))
}

2.9 删除指定RDD的所有数据块

def removeRdd(rddId: Int, blocking: Boolean) {val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))future.onFailure {case e: Exception =>logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)}(ThreadUtils.sameThread)if (blocking) {timeout.awaitResult(future)}
}

2.10 删除指定shuffle的所有数据块

def removeShuffle(shuffleId: Int, blocking: Boolean) {val future = driverEndpoint.askWithRetry[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))future.onFailure {case e: Exception =>logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)}(ThreadUtils.sameThread)if (blocking) {timeout.awaitResult(future)}
}
2.11 返回每一个block manager的内存状态
def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {driverEndpoint.askWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
}

2.12 返回存储状态

def getStorageStatus: Array[StorageStatus] = {driverEndpoint.askWithRetry[Array[StorageStatus]](GetStorageStatus)
}

2.13 获取block状态

def getBlockStatus(blockId: BlockId,askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = {val msg = GetBlockStatus(blockId, askSlaves)val response = driverEndpoint.askWithRetry[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)val (blockManagerIds, futures) = response.unzipimplicit val sameThread = ThreadUtils.sameThreadval cbf =implicitly[CanBuildFrom[Iterable[Future[Option[BlockStatus]]],Option[BlockStatus],Iterable[Option[BlockStatus]]]]val blockStatus = timeout.awaitResult(Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread))if (blockStatus == null) {throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)}blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>status.map { s => (blockManagerId, s) }}.toMap
}

2.14 找到那些executor有缓存的block

def hasCachedBlocks(executorId: String): Boolean = {driverEndpoint.askWithRetry[Boolean](HasCachedBlocks(executorId))
}

Spark源码分析之BlockManagerMaster相关推荐

  1. Spark 源码分析

    2019独角兽企业重金招聘Python工程师标准>>> 一. 启动篇 (一) 引子 在spark-shell终端执行 val arr = Array(1,2,3,4) val rdd ...

  2. spark 源码分析 Blockmanager

    原文链接 参考, Spark源码分析之-Storage模块 对于storage, 为何Spark需要storage模块?为了cache RDD  Spark的特点就是可以将RDD cache在memo ...

  3. Spark源码分析 – DAGScheduler

    DAGScheduler的架构其实非常简单, 1. eventQueue, 所有需要DAGScheduler处理的事情都需要往eventQueue中发送event 2. eventLoop Threa ...

  4. spark 源码分析之十八 -- Spark存储体系剖析

    本篇文章主要剖析BlockManager相关的类以及总结Spark底层存储体系. 总述 先看 BlockManager相关类之间的关系如下: 我们从NettyRpcEnv 开始,做一下简单说明. Ne ...

  5. Spark源码分析之七:Task运行(一)

    在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在T ...

  6. Spark源码分析之九:内存管理模型

    Spark是现在很流行的一个基于内存的分布式计算框架,既然是基于内存,那么自然而然的,内存的管理就是Spark存储管理的重中之重了.那么,Spark究竟采用什么样的内存管理模型呢?本文就为大家揭开Sp ...

  7. spark 源码分析之二十 -- Stage的提交

    引言 上篇 spark 源码分析之十九 -- DAG的生成和Stage的划分 中,主要介绍了下图中的前两个阶段DAG的构建和Stage的划分. 本篇文章主要剖析,Stage是如何提交的. rdd的依赖 ...

  8. spark 源码分析之十九 -- DAG的生成和Stage的划分

    上篇文章 spark 源码分析之十八 -- Spark存储体系剖析 重点剖析了 Spark的存储体系.从本篇文章开始,剖析Spark作业的调度和计算体系. 在说DAG之前,先简单说一下RDD. 对RD ...

  9. Spark源码分析:多种部署方式之间的区别与联系

    作者:过往记忆 从官方的文档我们可以知道, Spark 的部署方式有很多种:local.Standalone.Mesos.YARN-..不同部署方式的后台处理进程是不一样的,但是如果我们从代码的角度来 ...

最新文章

  1. 直接在sublime中运行php
  2. Popup窗口在XP+SP2下面受到限制
  3. Java-Java I/O流解读之java.io.PrintStream java.io.PrintWriter
  4. Enterprise Library学习所得(一):总体概述
  5. http 二进制_浅谈HTTP协议
  6. C++ 运算符重载规则
  7. Python PIP Mysql-python 报错 ERROR: Command errored out with exit status 1: python setup.py egg_info C
  8. spark报错:invalid token
  9. Java文件上传实例并解决跨域问题
  10. 苹果电脑mysql_MacBook 安装 MySQL 5.7.29(新手都看得懂的安装教程)
  11. 单片机音乐盒c语言程序代码,基于单片机的八音盒电路原理图和完整程序源代码.doc...
  12. Mac 下解压Android NDK 的 .bin文件
  13. CI/CD---使用新版云效流水线自动部署Java项目
  14. 计算机显示器刷新率怎么调,电脑显示器刷新率怎么调
  15. 2021年西式面点师(初级)考试题及西式面点师(初级)
  16. 高数:微分中值定理介值定理证明题浅析
  17. ur机器人编程-程序流程
  18. IOS上传app store审核截图规格要求
  19. PM2.5污染物的空间地图分区统计到表(第二种)
  20. c语言笛卡尔坐标系两点坐标,计算笛卡尔坐标系或极坐标系中2个位置之间的夹角...

热门文章

  1. matlab 均值滤波_数字图像处理基础 — 高斯滤波
  2. android必看java_Android开发工程师必看笔试题:Java基础选择题(一)
  3. 2-2Pytorch1.5环境配置
  4. Python机器学习:梯度下降法003线性回归中的梯度下降法
  5. alidata mysql 卸载_mysql相关(一)、基本知识
  6. c6011取消对null指针的引用_COM编程攻略(二十二 IDL中的枚举,指针,数组)
  7. arm汇编指令WFI和WFE
  8. vim配置运行python3快捷键_Vim的Dokuwiki快捷键配置
  9. php 中间表统计,多对多中间表详解
  10. SpringBoot项目集成Mybatis Plus(一)多数据源配置