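BlockManagerMasterEndpoint runs on the driver, tracks the state of every BlockManager, and sends commands to each BlockManagerSlaveEndpoint. This article looks at which messages the two endpoints accept and how each message is handled once it arrives.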

1 BlockManagerMasterEndpoint

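It maintains three important maps:

A <BlockManagerId, BlockManagerInfo> map: the registered BlockManagers and their state

An <ExecutorId, BlockManagerId> map: which BlockManager belongs to which executor

A <BlockId, Set<BlockManagerId>> map: every BlockManager that holds a given block, since one block can live on several BlockManagers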
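To make the relationship between the three maps concrete, here is a minimal sketch. The types BmId, BlkId, BmInfo and MasterState and the method addBlock are hypothetical stand-ins, not Spark classes, and the sketch uses Scala mutable maps throughout for brevity.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Spark's BlockManagerId, BlockId and BlockManagerInfo.
final case class BmId(executorId: String, host: String, port: Int)
final case class BlkId(name: String)
final class BmInfo(val id: BmId) {
  val blocks: mutable.Set[BlkId] = mutable.Set.empty[BlkId]
}

final class MasterState {
  // <BlockManagerId, BlockManagerInfo>
  val blockManagerInfo = mutable.HashMap.empty[BmId, BmInfo]
  // <ExecutorId, BlockManagerId>
  val blockManagerIdByExecutor = mutable.HashMap.empty[String, BmId]
  // <BlockId, Set<BlockManagerId>>
  val blockLocations = mutable.HashMap.empty[BlkId, mutable.HashSet[BmId]]

  // Record that `bm` holds `block`, creating the map entries on first use.
  def addBlock(bm: BmId, block: BlkId): Unit = {
    blockManagerIdByExecutor(bm.executorId) = bm
    blockManagerInfo.getOrElseUpdate(bm, new BmInfo(bm)).blocks += block
    blockLocations.getOrElseUpdate(block, mutable.HashSet.empty[BmId]) += bm
  }
}
```

After two BlockManagers report the same block, blockLocations holds one entry whose set contains both ids, while blockManagerInfo holds one entry per BlockManager.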

1.1 receiveAndReply: receiving messages

// Receive a message and reply with the result
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // Register a BlockManager
  case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
    register(blockManagerId, maxMemSize, slaveEndpoint)
    context.reply(true)

  // Update the information of a block
  case _updateBlockInfo @ UpdateBlockInfo(
      blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) =>
    context.reply(updateBlockInfo(
      blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize))
    listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))

  // Return every BlockManagerId that holds the given blockId
  case GetLocations(blockId) =>
    context.reply(getLocations(blockId))

  // For a list of blockIds, return the BlockManagerIds that hold each of them
  case GetLocationsMultipleBlockIds(blockIds) =>
    context.reply(getLocationsMultipleBlockIds(blockIds))

  // Return the BlockManagers that run on executors, excluding the given blockManagerId
  case GetPeers(blockManagerId) =>
    context.reply(getPeers(blockManagerId))

  // Return the RPC host and port of the given executor
  case GetRpcHostPortForExecutor(executorId) =>
    context.reply(getRpcHostPortForExecutor(executorId))

  // Return the memory status
  case GetMemoryStatus =>
    context.reply(memoryStatus)

  // Return the storage status
  case GetStorageStatus =>
    context.reply(storageStatus)

  // Return the status of the block on every block manager
  case GetBlockStatus(blockId, askSlaves) =>
    context.reply(blockStatus(blockId, askSlaves))

  // Return the blockIds that match the filter
  case GetMatchingBlockIds(filter, askSlaves) =>
    context.reply(getMatchingBlockIds(filter, askSlaves))

  // Remove all blocks that belong to the given RDD
  case RemoveRdd(rddId) =>
    context.reply(removeRdd(rddId))

  // Remove all blocks that belong to the given shuffle
  case RemoveShuffle(shuffleId) =>
    context.reply(removeShuffle(shuffleId))

  // Remove the blocks of the given broadcast variable
  case RemoveBroadcast(broadcastId, removeFromDriver) =>
    context.reply(removeBroadcast(broadcastId, removeFromDriver))

  // Remove the block from the worker (slave) nodes
  case RemoveBlock(blockId) =>
    removeBlockFromWorkers(blockId)
    context.reply(true)

  // Try to remove the executor from the BlockManagerMaster
  case RemoveExecutor(execId) =>
    removeExecutor(execId)
    context.reply(true)

  // Stop this endpoint; the reply is sent before stop() is called
  case StopBlockManagerMaster =>
    context.reply(true)
    stop()

  // Handle a heartbeat from a BlockManager
  case BlockManagerHeartbeat(blockManagerId) =>
    context.reply(heartbeatReceived(blockManagerId))

  // Check whether the BlockManager of the given executor holds any cached block
  case HasCachedBlocks(executorId) =>
    blockManagerIdByExecutor.get(executorId) match {
      case Some(bm) =>
        if (blockManagerInfo.contains(bm)) {
          val bmInfo = blockManagerInfo(bm)
          context.reply(bmInfo.cachedBlocks.nonEmpty)
        } else {
          context.reply(false)
        }
      case None => context.reply(false)
    }
}

1.2 removeRdd: remove all blocks of an RDD

The master first deletes its own metadata about the RDD, then sends RemoveRdd to every BlockManager slave endpoint, which performs the actual deletion.

private def removeRdd(rddId: Int): Future[Seq[Int]] = {
  // Convert every blockId that can be an RDD block into an RDDBlockId,
  // then keep only the blocks whose rddId equals the given rddId
  val blocks = blockLocations.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
  // For each block of this RDD, remove it from the BlockManagerInfo of every
  // BlockManager that holds it, and drop it from blockLocations as well
  blocks.foreach { blockId =>
    val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
    bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
    blockLocations.remove(blockId)
  }
  // Then send RemoveRdd to every slave through its BlockManagerSlaveEndpoint
  val removeMsg = RemoveRdd(rddId)
  Future.sequence(
    blockManagerInfo.values.map { bm =>
      bm.slaveEndpoint.ask[Int](removeMsg)
    }.toSeq
  )
}
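The two phases (drop the metadata locally, then fan the request out and gather the replies) can be tried in isolation with the sketch below. RddBlock, Slave, Master and askRemoveRdd are hypothetical stand-ins for illustration; only Future.sequence is the same standard-library call that the listing above uses.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object RemoveRddSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical simplified block id: (rddId, partition).
  final case class RddBlock(rddId: Int, split: Int)

  // Hypothetical stand-in for a slave endpoint: it owns blocks and removes them on request.
  final class Slave(val name: String) {
    val stored = mutable.Set.empty[RddBlock]
    // Replies asynchronously with the number of blocks it removed.
    def askRemoveRdd(rddId: Int): Future[Int] = Future {
      stored.synchronized {
        val doomed = stored.filter(_.rddId == rddId).toList
        stored --= doomed
        doomed.size
      }
    }
  }

  final class Master(slaves: Seq[Slave]) {
    // block -> names of the slaves that hold it (master-side metadata only)
    val blockLocations = mutable.HashMap.empty[RddBlock, mutable.HashSet[String]]

    def removeRdd(rddId: Int): Future[Seq[Int]] = {
      // Phase 1: forget the RDD's blocks in the master's own bookkeeping.
      val blocks = blockLocations.keys.filter(_.rddId == rddId).toList
      blocks.foreach(b => blockLocations.remove(b))
      // Phase 2: ask every slave to delete its copies and collect the per-slave counts.
      Future.sequence(slaves.map(_.askRemoveRdd(rddId)))
    }
  }
}
```

The caller receives a single Future that completes once every slave has answered, which is why the master can reply to the RemoveRdd message with that Future instead of blocking its message loop.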

1.3 removeShuffle: remove the blocks of a shuffle

The master keeps no metadata for shuffle blocks, so it only sends RemoveShuffle to the slaves and lets them delete the shuffle's blocks.

private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
  // Only send RemoveShuffle to every slave; the slaves delete the shuffle's blocks
  val removeMsg = RemoveShuffle(shuffleId)
  Future.sequence(
    blockManagerInfo.values.map { bm =>
      bm.slaveEndpoint.ask[Boolean](removeMsg)
    }.toSeq
  )
}

1.4 removeBlockManager: remove a BlockManager

private def removeBlockManager(blockManagerId: BlockManagerId) {
  // Look up the BlockManagerInfo by blockManagerId
  val info = blockManagerInfo(blockManagerId)
  // Remove the executorId of this block manager from <ExecutorId, BlockManagerId>
  blockManagerIdByExecutor -= blockManagerId.executorId
  // Remove this BlockManager from <BlockManagerId, BlockManagerInfo>
  blockManagerInfo.remove(blockManagerId)
  // Walk over every block that this BlockManager held
  val iterator = info.blocks.keySet.iterator
  while (iterator.hasNext) {
    // Take the next blockId
    val blockId = iterator.next
    // Get all BlockManagers that hold this block from <BlockId, Set<BlockManagerId>>
    val locations = blockLocations.get(blockId)
    // Remove the BlockManager being deleted from that set
    locations -= blockManagerId
    // If the set is now empty, no BlockManager holds the block any more,
    // so remove the blockId from <BlockId, Set<BlockManagerId>>
    if (locations.size == 0) {
      blockLocations.remove(blockId)
    }
  }
  listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
  logInfo(s"Removing block manager $blockManagerId")
}

1.5 removeBlockFromWorkers: remove a block from the worker (slave) nodes

private def removeBlockFromWorkers(blockId: BlockId) {
  // Get the BlockManagerIds that hold this block
  val locations = blockLocations.get(blockId)
  if (locations != null) {
    // For each BlockManagerId, look up its BlockManagerInfo; if it is registered,
    // send RemoveBlock to that slave. The returned Future is not awaited.
    locations.foreach { blockManagerId: BlockManagerId =>
      val blockManager = blockManagerInfo.get(blockManagerId)
      if (blockManager.isDefined) {
        blockManager.get.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))
      }
    }
  }
}

1.6 blockStatus: return the status of a block on every block manager

private def blockStatus(
    blockId: BlockId,
    askSlaves: Boolean): Map[BlockManagerId, Future[Option[BlockStatus]]] = {
  // Build the GetBlockStatus message
  val getBlockStatus = GetBlockStatus(blockId)
  // For every registered BlockManagerInfo: if askSlaves is set, send GetBlockStatus to its
  // BlockManagerSlaveEndpoint; otherwise wrap the status the master already holds in a Future.
  // The results are collected into a Map[BlockManagerId, Future[Option[BlockStatus]]].
  blockManagerInfo.values.map { info =>
    val blockStatusFuture =
      if (askSlaves) {
        info.slaveEndpoint.ask[Option[BlockStatus]](getBlockStatus)
      } else {
        Future { info.getStatus(blockId) }
      }
    (info.blockManagerId, blockStatusFuture)
  }.toMap
}

1.7 register: register a BlockManager

private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {
  val time = System.currentTimeMillis()
  // Only act if this BlockManagerId has not been registered yet
  if (!blockManagerInfo.contains(id)) {
    // Look up the BlockManagerId currently recorded for this executor
    blockManagerIdByExecutor.get(id.executorId) match {
      // The executor already has a different BlockManager registered, so that one
      // is stale: remove the executor before registering the new BlockManager
      case Some(oldId) =>
        logError("Got two different block manager registrations on same executor - "
          + s" will replace old one $oldId with new one $id")
        // Drop the executor and its old BlockManager from the in-memory maps
        removeExecutor(id.executorId)
      case None =>
    }
    logInfo("Registering block manager %s with %s RAM, %s".format(
      id.hostPort, Utils.bytesToString(maxMemSize), id))
    // Add this BlockManagerId to <ExecutorId, BlockManagerId>
    blockManagerIdByExecutor(id.executorId) = id
    // Create the BlockManagerInfo and add it to <BlockManagerId, BlockManagerInfo>
    blockManagerInfo(id) = new BlockManagerInfo(
      id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
  }
  listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
}
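The "one BlockManager per executor" rule that register enforces can be sketched as follows. RegisterSketch, its simplified BmId and the Long registration time are assumptions made for illustration; the real method also posts a listener event and stores a full BlockManagerInfo.

```scala
import scala.collection.mutable

object RegisterSketch {
  // Hypothetical simplified id: an executor plus the port its BlockManager listens on.
  final case class BmId(executorId: String, port: Int)

  val blockManagerInfo = mutable.HashMap.empty[BmId, Long] // id -> registration time
  val blockManagerIdByExecutor = mutable.HashMap.empty[String, BmId]

  // Returns the id that was evicted when the executor re-registers under a new id.
  def register(id: BmId, now: Long): Option[BmId] = {
    if (blockManagerInfo.contains(id)) {
      None // already registered: nothing to do
    } else {
      val stale = blockManagerIdByExecutor.get(id.executorId)
      // An executor owns a single BlockManager, so the old entry is dropped first.
      stale.foreach { old =>
        blockManagerIdByExecutor -= old.executorId
        blockManagerInfo -= old
      }
      blockManagerIdByExecutor(id.executorId) = id
      blockManagerInfo(id) = now
      stale
    }
  }
}
```

Registering the same id twice changes nothing, while registering a new id for an executor that is already known replaces the old entry in both maps.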

1.8 updateBlockInfo: update the information of a block

private def updateBlockInfo(
    blockManagerId: BlockManagerId,
    blockId: BlockId,
    storageLevel: StorageLevel,
    memSize: Long,
    diskSize: Long,
    externalBlockStoreSize: Long): Boolean = {
  // The blockManagerId is not registered: return immediately
  if (!blockManagerInfo.contains(blockManagerId)) {
    // The driver's BlockManager is deliberately left unregistered outside local mode,
    // so an update from it is not treated as a failure
    if (blockManagerId.isDriver && !isLocal) {
      // We intentionally do not register the master (except in local mode),
      // so we should not indicate failure.
      return true
    } else {
      return false
    }
  }
  // No block to update: only refresh the last-seen timestamp and return
  if (blockId == null) {
    blockManagerInfo(blockManagerId).updateLastSeenMs()
    return true
  }
  // Let the BlockManagerInfo update its record of the block
  blockManagerInfo(blockManagerId).updateBlockInfo(
    blockId, storageLevel, memSize, diskSize, externalBlockStoreSize)
  var locations: mutable.HashSet[BlockManagerId] = null
  // If blockLocations already tracks the blockId, reuse its set of BlockManagers;
  // otherwise create an empty set and put it into blockLocations
  if (blockLocations.containsKey(blockId)) {
    locations = blockLocations.get(blockId)
  } else {
    locations = new mutable.HashSet[BlockManagerId]
    blockLocations.put(blockId, locations)
  }
  // A valid storage level means the BlockManager holds the block, so add it to the set;
  // an invalid one means the block is gone from that BlockManager, so remove it
  if (storageLevel.isValid) {
    locations.add(blockManagerId)
  } else {
    locations.remove(blockManagerId)
  }
  // If the set is empty, no BlockManager holds the block: remove the blockId from blockLocations
  if (locations.size == 0) {
    blockLocations.remove(blockId)
  }
  true
}
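The location bookkeeping at the end of updateBlockInfo can be exercised on its own with the sketch below. It assumes plain strings in place of BlockId and BlockManagerId, and a Boolean named valid in place of storageLevel.isValid; LocationsSketch and updateLocation are hypothetical names.

```scala
import java.util.{HashMap => JHashMap}
import scala.collection.mutable

object LocationsSketch {
  // Plain strings stand in for BlockId and BlockManagerId (an assumption of this sketch).
  val blockLocations = new JHashMap[String, mutable.HashSet[String]]

  // `valid` plays the role of storageLevel.isValid.
  def updateLocation(blockId: String, bmId: String, valid: Boolean): Unit = {
    // Reuse the existing set of holders, or start tracking the block with an empty set.
    var locations = blockLocations.get(blockId)
    if (locations == null) {
      locations = new mutable.HashSet[String]
      blockLocations.put(blockId, locations)
    }
    // A valid level adds the holder, an invalid level removes it.
    if (valid) locations.add(bmId) else locations.remove(bmId)
    // No holder left: stop tracking the block altogether.
    if (locations.isEmpty) blockLocations.remove(blockId)
  }
}
```

Calling updateLocation with valid = false for the last remaining holder removes the key itself, so blockLocations never keeps a block with an empty holder set.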

1.9 getPeers: return the BlockManagers that run on executors, excluding the given blockManagerId

private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
  // All registered BlockManagerIds
  val blockManagerIds = blockManagerInfo.keySet
  // Only answer if the given blockManagerId is registered
  if (blockManagerIds.contains(blockManagerId)) {
    // Keep the executors' BlockManagers, then drop the caller's own blockManagerId
    blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
  } else {
    Seq.empty
  }
}
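A small sketch of the same filter, assuming a simplified BmId whose isDriver just compares the executor id with the string "driver" (an assumption of this sketch, not Spark's exact definition):

```scala
object PeersSketch {
  // Hypothetical simplified id; the "driver" check is an assumption of this sketch.
  final case class BmId(executorId: String) {
    def isDriver: Boolean = executorId == "driver"
  }

  // Peers of `self` are all registered executor BlockManagers other than `self`.
  def getPeers(all: Set[BmId], self: BmId): Seq[BmId] =
    if (all.contains(self)) all.filterNot(_.isDriver).filterNot(_ == self).toSeq
    else Seq.empty
}
```

An unregistered caller gets an empty list rather than the full set of executors, which matches the else branch of the listing above.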

2 BlockManagerSlaveEndpoint

It receives the commands sent by BlockManagerMasterEndpoint and executes them on the local BlockManager.

2.1 receiveAndReply: receiving messages

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // RemoveBlock sent by the master
  case RemoveBlock(blockId) =>
    doAsync[Boolean]("removing block " + blockId, context) {
      // Ask the BlockManager to remove the block
      blockManager.removeBlock(blockId)
      true
    }

  // RemoveRdd sent by the master
  case RemoveRdd(rddId) =>
    doAsync[Int]("removing RDD " + rddId, context) {
      // Ask the BlockManager to remove the blocks of the RDD
      blockManager.removeRdd(rddId)
    }

  // RemoveShuffle sent by the master
  case RemoveShuffle(shuffleId) =>
    doAsync[Boolean]("removing shuffle " + shuffleId, context) {
      // First unregister the shuffleId from the MapOutputTracker
      if (mapOutputTracker != null) {
        mapOutputTracker.unregisterShuffle(shuffleId)
      }
      // Then let the ShuffleManager remove the shuffle's metadata
      SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
    }

  // RemoveBroadcast sent by the master
  case RemoveBroadcast(broadcastId, _) =>
    doAsync[Int]("removing broadcast " + broadcastId, context) {
      // Call the BlockManager's removeBroadcast
      blockManager.removeBroadcast(broadcastId, tellMaster = true)
    }

  // GetBlockStatus: answer with blockManager.getStatus
  case GetBlockStatus(blockId, _) =>
    context.reply(blockManager.getStatus(blockId))

  // GetMatchingBlockIds: answer with blockManager.getMatchingBlockIds
  case GetMatchingBlockIds(filter, _) =>
    context.reply(blockManager.getMatchingBlockIds(filter))
}
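The listing calls a doAsync helper whose body is not shown in this article. The sketch below illustrates one way such a helper could work: run the body on another thread and answer the caller when it finishes. It is an illustration under stated assumptions, not Spark's implementation; ReplyContext is a hypothetical stand-in for RpcCallContext backed by a Promise.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object DoAsyncSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for RpcCallContext: a reply channel backed by a Promise.
  final class ReplyContext {
    private val promise = Promise[Any]()
    def reply(response: Any): Unit = { promise.trySuccess(response); () }
    def sendFailure(t: Throwable): Unit = { promise.tryFailure(t); () }
    def result: Future[Any] = promise.future
  }

  // Run `body` off the calling thread, then answer the caller with its result or its error.
  def doAsync[T](actionMessage: String, context: ReplyContext)(body: => T): Unit = {
    Future(body).onComplete {
      case Success(response) => context.reply(response)
      case Failure(t) =>
        context.sendFailure(new RuntimeException("Error in " + actionMessage, t))
    }
  }
}
```

With this shape the message loop returns immediately after scheduling the work, so a slow block removal does not delay the handling of the next message.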
