Spark源码分析之BlockManagerMaster
主要负责整个应用程序在运行期间block元数据的管理和维护,以及向从节点发送指令执行命令。
一 核心属性
RpcEndpointRef: driverEndpointBlockManagerMasterEndpoint通信终端
Boolean isDriver: 是否在Driver
二 重要方法
2.1 从driver通信终端删除一个executor
defremoveExecutor(execId:String) {
// 向BlockManagerMasterEndpoint发送RemoveExecutor消息
tell(RemoveExecutor(execId))
logInfo("Removed "+ execId + " successfully in removeExecutor")
}
2.2 向BlockManagerMasterEndpoint发送RegisterBlockManager消息
def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {logInfo("Trying to register BlockManager")// 向BlockManagerMasterEndpoint发送RegisterBlockManager消息tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))logInfo("Registered BlockManager") }
2.3 更新数据块信息
def updateBlockInfo(blockManagerId: BlockManagerId,blockId: BlockId,storageLevel: StorageLevel,memSize: Long,diskSize: Long,externalBlockStoreSize: Long): Boolean = {// 向BlockManagerMasterEndpoint发送UpdateBlockInfo消息,并且返回结果val res = driverEndpoint.askWithRetry[Boolean](UpdateBlockInfo(blockManagerId, blockId, storageLevel,memSize, diskSize, externalBlockStoreSize))logDebug(s"Updated info of block $blockId")res }
2.4 获取某个block的所有BlockManagerId信息
def getLocations(blockId: BlockId): Seq[BlockManagerId] = {// 向BlockManagerMasterEndpoint发送GetLocations消息driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetLocations(blockId)) }
2.5 获取多个block的位置信息
def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {// 向BlockManagerMasterEndpoint发送GetLocationsMultipleBlockIds消息driverEndpoint.askWithRetry[IndexedSeq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds)) }
2.6 检查是否存在该数据块
def contains(blockId: BlockId): Boolean = {!getLocations(blockId).isEmpty }
2.7 获取Executor的非当前BlockManagerId
def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId)) }
2.8 向BlockManagerMasterEndpoint发送RemoveBlock消息删除数据块
def removeBlock(blockId: BlockId) {driverEndpoint.askWithRetry[Boolean](RemoveBlock(blockId)) }
2.9 删除指定RDD的所有数据块
def removeRdd(rddId: Int, blocking: Boolean) {val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))future.onFailure {case e: Exception =>logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)}(ThreadUtils.sameThread)if (blocking) {timeout.awaitResult(future)} }
2.10 删除指定shuffle的所有数据块
def removeShuffle(shuffleId: Int, blocking: Boolean) {val future = driverEndpoint.askWithRetry[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))future.onFailure {case e: Exception =>logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)}(ThreadUtils.sameThread)if (blocking) {timeout.awaitResult(future)} }
2.11 返回每一个block manager的内存状态 def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {driverEndpoint.askWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus) }
2.12 返回存储状态
def getStorageStatus: Array[StorageStatus] = {driverEndpoint.askWithRetry[Array[StorageStatus]](GetStorageStatus) }
2.13 获取block状态
def getBlockStatus(blockId: BlockId,askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = {val msg = GetBlockStatus(blockId, askSlaves)val response = driverEndpoint.askWithRetry[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)val (blockManagerIds, futures) = response.unzipimplicit val sameThread = ThreadUtils.sameThreadval cbf =implicitly[CanBuildFrom[Iterable[Future[Option[BlockStatus]]],Option[BlockStatus],Iterable[Option[BlockStatus]]]]val blockStatus = timeout.awaitResult(Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread))if (blockStatus == null) {throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)}blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>status.map { s => (blockManagerId, s) }}.toMap }
2.14 找到那些executor有缓存的block
def hasCachedBlocks(executorId: String): Boolean = {driverEndpoint.askWithRetry[Boolean](HasCachedBlocks(executorId)) }
Spark源码分析之BlockManagerMaster相关推荐
- Spark 源码分析
2019独角兽企业重金招聘Python工程师标准>>> 一. 启动篇 (一) 引子 在spark-shell终端执行 val arr = Array(1,2,3,4) val rdd ...
- spark 源码分析 Blockmanager
原文链接 参考, Spark源码分析之-Storage模块 对于storage, 为何Spark需要storage模块?为了cache RDD Spark的特点就是可以将RDD cache在memo ...
- Spark源码分析 – DAGScheduler
DAGScheduler的架构其实非常简单, 1. eventQueue, 所有需要DAGScheduler处理的事情都需要往eventQueue中发送event 2. eventLoop Threa ...
- spark 源码分析之十八 -- Spark存储体系剖析
本篇文章主要剖析BlockManager相关的类以及总结Spark底层存储体系. 总述 先看 BlockManager相关类之间的关系如下: 我们从NettyRpcEnv 开始,做一下简单说明. Ne ...
- Spark源码分析之七:Task运行(一)
在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在T ...
- Spark源码分析之九:内存管理模型
Spark是现在很流行的一个基于内存的分布式计算框架,既然是基于内存,那么自然而然的,内存的管理就是Spark存储管理的重中之重了.那么,Spark究竟采用什么样的内存管理模型呢?本文就为大家揭开Sp ...
- spark 源码分析之二十 -- Stage的提交
引言 上篇 spark 源码分析之十九 -- DAG的生成和Stage的划分 中,主要介绍了下图中的前两个阶段DAG的构建和Stage的划分. 本篇文章主要剖析,Stage是如何提交的. rdd的依赖 ...
- spark 源码分析之十九 -- DAG的生成和Stage的划分
上篇文章 spark 源码分析之十八 -- Spark存储体系剖析 重点剖析了 Spark的存储体系.从本篇文章开始,剖析Spark作业的调度和计算体系. 在说DAG之前,先简单说一下RDD. 对RD ...
- Spark源码分析:多种部署方式之间的区别与联系
作者:过往记忆 从官方的文档我们可以知道, Spark 的部署方式有很多种:local.Standalone.Mesos.YARN-..不同部署方式的后台处理进程是不一样的,但是如果我们从代码的角度来 ...
最新文章
- 直接在sublime中运行php
- Popup窗口在XP+SP2下面受到限制
- Java-Java I/O流解读之java.io.PrintStream java.io.PrintWriter
- Enterprise Library学习所得(一):总体概述
- http 二进制_浅谈HTTP协议
- C++ 运算符重载规则
- Python PIP Mysql-python 报错 ERROR: Command errored out with exit status 1: python setup.py egg_info C
- spark报错:invalid token
- Java文件上传实例并解决跨域问题
- 苹果电脑mysql_MacBook 安装 MySQL 5.7.29(新手都看得懂的安装教程)
- 单片机音乐盒c语言程序代码,基于单片机的八音盒电路原理图和完整程序源代码.doc...
- Mac 下解压Android NDK 的 .bin文件
- CI/CD---使用新版云效流水线自动部署Java项目
- 计算机显示器刷新率怎么调,电脑显示器刷新率怎么调
- 2021年西式面点师(初级)考试题及西式面点师(初级)
- 高数:微分中值定理介值定理证明题浅析
- ur机器人编程-程序流程
- IOS上传app store审核截图规格要求
- PM2.5污染物的空间地图分区统计到表(第二种)
- c语言笛卡尔坐标系两点坐标,计算笛卡尔坐标系或极坐标系中2个位置之间的夹角...
热门文章
- matlab 均值滤波_数字图像处理基础 — 高斯滤波
- android必看java_Android开发工程师必看笔试题:Java基础选择题(一)
- 2-2Pytorch1.5环境配置
- Python机器学习:梯度下降法003线性回归中的梯度下降法
- alidata mysql 卸载_mysql相关(一)、基本知识
- c6011取消对null指针的引用_COM编程攻略(二十二 IDL中的枚举,指针,数组)
- arm汇编指令WFI和WFE
- vim配置运行python3快捷键_Vim的Dokuwiki快捷键配置
- php 中间表统计,多对多中间表详解
- SpringBoot项目集成Mybatis Plus(一)多数据源配置