BlockManager is the unified external interface for accessing blocks. An instance exists on both the master and each slave; it provides methods for reading and writing data, and delegates to a different BlockStore depending on the StorageLevel.

When an application starts, SparkContext creates the driver-side SparkEnv, which instantiates a BlockManager and a BlockManagerMaster; the BlockManagerMaster internally creates the message endpoint BlockManagerMasterEndpoint.

When an executor starts, it also creates its own SparkEnv, which instantiates a BlockManager and a BlockTransferService. During BlockManager initialization, a BlockManagerSlaveEndpoint message endpoint is created and a reference to it is registered with the driver, so that the driver and the executors each hold a reference to the other's communication endpoint.

// create the BlockManagerMaster
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
    BlockManagerMaster.DRIVER_ENDPOINT_NAME,
    // create the BlockManagerMasterEndpoint
    new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf, isDriver)

// NB: blockManager is not valid until initialize() is called later.
// create the BlockManager
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
  serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
  numUsableCores)

1 Core attributes

String executorId: the executor id (or the driver id)

RpcEnv rpcEnv: the RPC communication environment

BlockManagerMaster master: manages and maintains block metadata for the whole application at runtime, and sends commands

Serializer defaultSerializer: the default serializer

Long maxMemory: the maximum memory available to this BlockManager

Int numUsableCores: the number of usable CPU cores

MapOutputTracker mapOutputTracker: tracks the output status of the map side of a shuffle

ShuffleManager shuffleManager: the shuffle manager

BlockTransferService blockTransferService: transfers data between nodes; used to fetch and upload blocks

DiskBlockManager diskBlockManager: manages blocks on disk and their corresponding files and directories

TimeStampedHashMap[BlockId, BlockInfo] blockInfo: maintains the <blockId, blockInfo> mapping

Boolean externalBlockStoreInitialized: whether an external block store is in use

MemoryStore memoryStore: the in-memory store

DiskStore diskStore: the on-disk store

ExternalBlockStore externalBlockStore: the external store

BlockManagerId blockManagerId: the id of this BlockManager

ShuffleClient shuffleClient: the client for reading shuffle files from other executors; it may be an external service or the standard block transfer service. If the external shuffle service is enabled an ExternalShuffleClient is created, otherwise the BlockTransferService is used.

Boolean compressBroadcast: whether to compress broadcast data

Boolean compressShuffle: whether to compress map output files; generally recommended, but not worth enabling if CPU is the bottleneck

Boolean compressRdds: whether to compress serialized RDD partitions

Boolean compressShuffleSpill: whether to compress temporary files spilled on the map side

BlockManagerSlaveEndpoint slaveEndpoint: the BlockManagerSlaveEndpoint communication endpoint held by this BlockManager
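As a rough illustration of how the three store objects above relate to a StorageLevel, the following sketch (the `Level` type and `chooseStore` function are ours, not Spark's) mirrors the selection order that doPut uses later: memory first, then off-heap, then disk.

```scala
// Hypothetical stand-in for StorageLevel's three placement flags.
case class Level(useMemory: Boolean, useOffHeap: Boolean, useDisk: Boolean)

// Pick the store in the same priority order doPut uses.
def chooseStore(level: Level): String =
  if (level.useMemory) "MemoryStore"
  else if (level.useOffHeap) "ExternalBlockStore"
  else if (level.useDisk) "DiskStore"
  else throw new IllegalArgumentException("invalid storage level")
```

So a MEMORY_AND_DISK-style level goes to the MemoryStore first; the DiskStore only becomes the primary store when memory is not requested at all.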

2 Important methods

2.1 initialize — initializes the BlockManager with the given application id

# initialize the BlockTransferService (builds the rpc server, etc.); it is used to transfer data across nodes

# initialize the ShuffleClient, which reads shuffle files from other executors

# build the blockManagerId

# build the shuffleServerId

# register the BlockManager with the BlockManagerMaster

# if the external shuffle service is enabled and this is an executor node, register with the external shuffle service

def initialize(appId: String): Unit = {
  // initialize the BlockTransferService (builds the rpc server, etc.); it transfers data across nodes
  blockTransferService.init(this)
  // initialize the ShuffleClient, which reads shuffle files from other executors
  shuffleClient.init(appId)
  // build the blockManagerId
  blockManagerId = BlockManagerId(
    executorId, blockTransferService.hostName, blockTransferService.port)
  // build the shuffleServerId
  shuffleServerId = if (externalShuffleServiceEnabled) {
    BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
  } else {
    blockManagerId
  }
  // register the BlockManager with the BlockManagerMaster, passing slaveEndpoint
  // so the master can talk back to this BlockManager
  master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
  // if the external shuffle service is enabled and this is an executor node, register with it
  if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
    registerWithExternalShuffleServer()
  }
}

2.2 reportAllBlocks — reports all of this BlockManager's blocks to the master

private def reportAllBlocks(): Unit = {
  logInfo(s"Reporting ${blockInfo.size} blocks to the master.")
  for ((blockId, info) <- blockInfo) {
    // get the current status of each block
    val status = getCurrentBlockStatus(blockId, info)
    if (!tryToReportBlockStatus(blockId, info, status)) {
      logError(s"Failed to report $blockId to master; giving up.")
      return
    }
  }
}

private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
  info.synchronized {
    info.level match {
      case null =>
        BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
      case level =>
        val inMem = level.useMemory && memoryStore.contains(blockId)
        val inExternalBlockStore = level.useOffHeap && externalBlockStore.contains(blockId)
        val onDisk = level.useDisk && diskStore.contains(blockId)
        val deserialized = if (inMem) level.deserialized else false
        val replication = if (inMem || inExternalBlockStore || onDisk) level.replication else 1
        val storageLevel =
          StorageLevel(onDisk, inMem, inExternalBlockStore, deserialized, replication)
        val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
        val externalBlockStoreSize =
          if (inExternalBlockStore) externalBlockStore.getSize(blockId) else 0L
        val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
        BlockStatus(storageLevel, memSize, diskSize, externalBlockStoreSize)
    }
  }
}

2.3 reregister — re-registers the BlockManager with the master and reports the status of all its blocks

def reregister(): Unit = {
  // TODO: We might need to rate limit re-registering.
  logInfo("BlockManager re-registering with master")
  master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
  reportAllBlocks()
}

2.4 getBlockData — gets the data of a local block

# if the block is a shuffle block, fetch it via the ShuffleBlockResolver; otherwise call doGetLocal to fetch it locally

# wrap the result in a buffer, create a NioManagedBuffer and return it

override def getBlockData(blockId: BlockId): ManagedBuffer = {
  // first check whether this is a shuffle block
  if (blockId.isShuffle) {
    // if so, fetch the block data via the ShuffleBlockResolver
    shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
  } else {
    // fetch the local block data
    val blockBytesOpt = doGetLocal(blockId, asBlockResult = false)
      .asInstanceOf[Option[ByteBuffer]]
    if (blockBytesOpt.isDefined) {
      // take the buffer and wrap it in a NioManagedBuffer
      val buffer = blockBytesOpt.get
      new NioManagedBuffer(buffer)
    } else {
      throw new BlockNotFoundException(blockId.toString)
    }
  }
}

# doGetLocal: fetches the local block data for the given blockId. If the block is in memory it is read from memory; if it is on disk it is read from disk. With MEMORY_AND_DISK, data read from disk is first cached in memory and then returned, so the next read can be served from memory; otherwise the data is returned directly.

private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
  // look up the BlockInfo for this blockId
  val info = blockInfo.get(blockId).orNull
  if (info != null) {
    info.synchronized {
      // double-check that the block has not been removed in the meantime
      if (blockInfo.get(blockId).isEmpty) {
        logWarning(s"Block $blockId had been removed")
        return None
      }
      // if another thread is still writing this block, wait until it is ready
      if (!info.waitForReady()) {
        // If we get here, the block write failed.
        logWarning(s"Block $blockId was marked as failure.")
        return None
      }
      // the storage level of this block
      val level = info.level
      logDebug(s"Level for block $blockId is $level")
      // if the level uses memory, look the block up in the MemoryStore
      if (level.useMemory) {
        logDebug(s"Getting block $blockId from memory")
        val result = if (asBlockResult) {
          memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
        } else {
          memoryStore.getBytes(blockId)
        }
        result match {
          case Some(values) =>
            return result
          case None =>
            logDebug(s"Block $blockId not found in memory")
        }
      }
      // if the level uses off-heap storage, look the block up in the ExternalBlockStore
      if (level.useOffHeap) {
        logDebug(s"Getting block $blockId from ExternalBlockStore")
        if (externalBlockStore.contains(blockId)) {
          val result = if (asBlockResult) {
            externalBlockStore.getValues(blockId)
              .map(new BlockResult(_, DataReadMethod.Memory, info.size))
          } else {
            externalBlockStore.getBytes(blockId)
          }
          result match {
            case Some(values) =>
              return result
            case None =>
              logDebug(s"Block $blockId not found in ExternalBlockStore")
          }
        }
      }
      // if the level uses disk, look the block up in the DiskStore
      if (level.useDisk) {
        logDebug(s"Getting block $blockId from disk")
        val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
          case Some(b) => b
          case None =>
            throw new BlockException(
              blockId, s"Block $blockId not found on disk, though it should be")
        }
        assert(0 == bytes.position())
        if (!level.useMemory) {
          // If the block shouldn't be stored in memory, we can just return it
          if (asBlockResult) {
            return Some(new BlockResult(
              dataDeserialize(blockId, bytes), DataReadMethod.Disk, info.size))
          } else {
            return Some(bytes)
          }
        } else {
          // otherwise cache the data read from disk in memory, so the next read
          // of this block can be served from memory and is faster
          if (!level.deserialized || !asBlockResult) {
            memoryStore.putBytes(blockId, bytes.limit, () => {
              // putting the whole block into memory may OOM, so the copy is
              // wrapped in a lazily evaluated ByteBuffer
              val copyForMemory = ByteBuffer.allocate(bytes.limit)
              copyForMemory.put(bytes)
            })
            bytes.rewind()
          }
          if (!asBlockResult) {
            return Some(bytes)
          } else {
            // a BlockResult is wanted: deserialize the bytes
            val values = dataDeserialize(blockId, bytes)
            // if the level stores deserialized values
            if (level.deserialized) {
              // cache the deserialized values in memory before returning them
              val putResult = memoryStore.putIterator(
                blockId, values, level, returnValues = true, allowPersistToDisk = false)
              putResult.data match {
                case Left(it) =>
                  return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
                case _ =>
                  // This only happens if we dropped the values back to disk (which is never)
                  throw new SparkException("Memory store did not return an iterator!")
              }
            } else {
              return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
            }
          }
        }
      }
    }
  } else {
    logDebug(s"Block $blockId not registered locally")
  }
  None
}
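Stripped of the caching and synchronization, the lookup order in doGetLocal is a simple cascade: memory, then off-heap, then disk, first hit wins. A condensed sketch with stand-in stores (the maps and the `getLocal` helper are ours, not Spark's):

```scala
// Hypothetical stand-in stores: each may or may not hold a block's bytes.
val memory  = Map("rdd_0_0" -> "bytes-in-memory")
val offHeap = Map("rdd_0_1" -> "bytes-off-heap")
val disk    = Map("rdd_0_0" -> "bytes-on-disk", "rdd_0_2" -> "bytes-on-disk")

// Try each store in the same order doGetLocal uses; the first hit wins.
def getLocal(blockId: String): Option[String] =
  memory.get(blockId)
    .orElse(offHeap.get(blockId))
    .orElse(disk.get(blockId))
```

Note that "rdd_0_0" is served from memory even though it also exists on disk, which is exactly the MEMORY_AND_DISK fast path.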

2.5 getLocal — gets a block from the local BlockManager

def getLocal(blockId: BlockId): Option[BlockResult] = {
  logDebug(s"Getting local block $blockId")
  doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
}

2.6 getLocalBytes — gets a block from the local BlockManager as serialized bytes

def getLocalBytes(blockId: BlockId): Option[ByteBuffer] = {
  logDebug(s"Getting local block $blockId as bytes")
  if (blockId.isShuffle) {
    val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
    // TODO: This should gracefully handle case where local block is not available. Currently
    // downstream code will throw an exception.
    Option(
      shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
  } else {
    doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
  }
}

2.7 doGetRemote — fetches block data from a remote node

private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
  require(blockId != null, "BlockId is null")
  // get the BlockManagerIds holding this block from the master, in random order
  val locations = Random.shuffle(master.getLocations(blockId))
  // try each BlockManagerId in turn, fetching the data synchronously
  // via blockTransferService.fetchBlockSync
  for (loc <- locations) {
    logDebug(s"Getting remote block $blockId from $loc")
    val data = blockTransferService.fetchBlockSync(
      loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
    if (data != null) {
      if (asBlockResult) {
        return Some(new BlockResult(
          dataDeserialize(blockId, data),
          DataReadMethod.Network,
          data.limit()))
      } else {
        return Some(data)
      }
    }
    logDebug(s"The value of block $blockId is null")
  }
  logDebug(s"Block $blockId not found")
  None
}
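Shuffling the location list spreads fetches across the replicas of a block instead of always hitting the first holder. The try-each-location pattern can be sketched as follows (the `fetch` function is a stand-in for `blockTransferService.fetchBlockSync`, and the host names are made up):

```scala
import scala.util.Random

// Stand-in for a remote fetch: only some hosts actually hold the block.
def fetch(host: String): Option[String] =
  if (host == "host-b" || host == "host-c") Some(s"bytes-from-$host") else None

// Randomize the order, then return the first successful fetch.
def getRemote(locations: Seq[String]): Option[String] =
  Random.shuffle(locations).iterator.map(fetch).collectFirst { case Some(data) => data }
```

Whichever of "host-b" or "host-c" comes first after shuffling serves the request; a host that does not hold the block is simply skipped.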

2.8 doPut — stores data according to the StorageLevel

private def doPut(
    blockId: BlockId,
    data: BlockValues,
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
  // check that the BlockId and StorageLevel are valid
  require(blockId != null, "BlockId is null")
  require(level != null && level.isValid, "StorageLevel is null or invalid")
  effectiveStorageLevel.foreach { level =>
    require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
  }
  // buffer collecting the (blockId, blockStatus) pairs updated by this put
  val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
  // get the BlockInfo for this put
  val putBlockInfo = {
    // create a new BlockInfo and insert it only if no entry exists yet;
    // putIfAbsent returns the previous value if there was one, else None
    val tinfo = new BlockInfo(level, tellMaster)
    val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
    if (oldBlockOpt.isDefined) {
      // an old BlockInfo exists: if its write already completed, do not re-add the block
      if (oldBlockOpt.get.waitForReady()) {
        logWarning(s"Block $blockId already exists on this machine; not re-adding it")
        return updatedBlocks
      }
      oldBlockOpt.get
    } else {
      tinfo
    }
  }
  val startTimeMs = System.currentTimeMillis
  var valuesAfterPut: Iterator[Any] = null
  var bytesAfterPut: ByteBuffer = null
  // size of the block
  var size = 0L
  // the storage level actually used for this put
  val putLevel = effectiveStorageLevel.getOrElse(level)
  // start a thread that prepares the replication data asynchronously, before the
  // local store completes; this helps speed up sending the data out
  val replicationFuture = data match {
    case b: ByteBufferValues if putLevel.replication > 1 =>
      // duplicate the buffer for the replication thread
      val bufferView = b.buffer.duplicate()
      Future {
        // copy the data to other nodes
        replicate(blockId, bufferView, putLevel)
      }(futureExecutionContext)
    case _ => null
  }
  // synchronize so no other thread can put this block until `marked` is set
  putBlockInfo.synchronized {
    logTrace("Put for block %s took %s to get into synchronized block"
      .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
    var marked = false
    try {
      // returnValues: whether to return the values of the put operation
      // blockStore: the store to write to
      val (returnValues, blockStore: BlockStore) = {
        if (putLevel.useMemory) {
          // memory storage: return true and the MemoryStore
          (true, memoryStore)
        } else if (putLevel.useOffHeap) {
          // off-heap storage: return false and the ExternalBlockStore
          (false, externalBlockStore)
        } else if (putLevel.useDisk) {
          // disk storage: return true only if the replication factor is greater than 1
          (putLevel.replication > 1, diskStore)
        } else {
          assert(putLevel == StorageLevel.NONE)
          throw new BlockException(
            blockId, s"Attempted to put block $blockId without specifying storage level!")
        }
      }
      // dispatch on the type of the values being put
      val result = data match {
        // iterator values: use BlockStore.putIterator
        case IteratorValues(iterator) =>
          blockStore.putIterator(blockId, iterator, putLevel, returnValues)
        // array values: use BlockStore.putArray
        case ArrayValues(array) =>
          blockStore.putArray(blockId, array, putLevel, returnValues)
        // byte buffer values: use BlockStore.putBytes
        case ByteBufferValues(bytes) =>
          bytes.rewind()
          blockStore.putBytes(blockId, bytes, putLevel)
      }
      // size of the stored result
      size = result.size
      result.data match {
        case Left(newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
        case Right(newBytes) => bytesAfterPut = newBytes
        case _ =>
      }
      // if memory storage was used, add any blocks that were dropped to disk
      // to the updatedBlocks collection
      if (putLevel.useMemory) {
        result.droppedBlocks.foreach { updatedBlocks += _ }
      }
      // get the current status of the block
      val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
      if (putBlockStatus.storageLevel != StorageLevel.NONE) {
        // mark the put as successful
        marked = true
        // mark the block's write as complete via BlockInfo.markReady
        putBlockInfo.markReady(size)
        // report the block status to the master
        if (tellMaster) {
          reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
        }
        // record the update
        updatedBlocks += ((blockId, putBlockStatus))
      }
    } finally {
      // If we failed in putting the block to memory/disk, notify other possible readers
      // that it has failed, and then remove it from the block info map.
      if (!marked) {
        // Note that the remove must happen before markFailure otherwise another thread
        // could've inserted a new BlockInfo before we remove it.
        blockInfo.remove(blockId)
        putBlockInfo.markFailure()
        logWarning(s"Putting block $blockId failed")
      }
    }
  }
  logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
  // if the replication factor is greater than 1, replicate the data asynchronously
  if (putLevel.replication > 1) {
    data match {
      case ByteBufferValues(bytes) =>
        if (replicationFuture != null) {
          Await.ready(replicationFuture, Duration.Inf)
        }
      case _ =>
        val remoteStartTime = System.currentTimeMillis
        // Serialize the block if not already done
        if (bytesAfterPut == null) {
          if (valuesAfterPut == null) {
            throw new SparkException(
              "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
          }
          bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
        }
        replicate(blockId, bytesAfterPut, putLevel)
        logDebug("Put block %s remotely took %s"
          .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
    }
  }
  // if the buffer is memory-mapped, dispose of it
  BlockManager.dispose(bytesAfterPut)
  if (putLevel.replication > 1) {
    logDebug("Putting block %s with replication took %s"
      .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
  } else {
    logDebug("Putting block %s without replication took %s"
      .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
  }
  updatedBlocks
}

2.9 getPeers — gets all BlockManagerIds in the cluster other than the current one and the driver's

private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
  peerFetchLock.synchronized {
    val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
    val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
    if (cachedPeers == null || forceFetch || timeout) {
      cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
      lastPeerFetchTime = System.currentTimeMillis
      logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
    }
    cachedPeers
  }
}
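The caching above is a generic TTL pattern: the expensive call to the master is repeated only when the cache is empty, a refresh is forced, or the entry is older than the TTL. A small stand-alone sketch of the same logic (class and parameter names are ours; the clock is passed in explicitly to make the behavior easy to check):

```scala
// Generic TTL cache: `fetch` is the expensive call (e.g. asking the master for peers).
class TtlCache[T](ttlMs: Long, fetch: () => T) {
  private var cached: Option[T] = None
  private var lastFetchTime = 0L

  def get(forceFetch: Boolean = false, now: Long = System.currentTimeMillis): T = synchronized {
    val expired = now - lastFetchTime > ttlMs
    if (cached.isEmpty || forceFetch || expired) {
      cached = Some(fetch())
      lastFetchTime = now
    }
    cached.get
  }
}
```

In getPeers, `forceFetch = true` is used after a replication failure (see replicate below) so that a stale peer list does not keep producing dead targets.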

2.10 replicate — replicates a block to other nodes

private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
  // maximum number of replication failures allowed
  val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
  // number of peers the block must be replicated to
  val numPeersToReplicateTo = level.replication - 1
  // candidate peers to replicate to
  val peersForReplication = new ArrayBuffer[BlockManagerId]
  // peers the block has been replicated to
  val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
  // peers replication has failed to
  val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
  // storage level used on the remote node (replication factor 1)
  val tLevel = StorageLevel(
    level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
  val startTime = System.currentTimeMillis
  val random = new Random(blockId.hashCode)
  var replicationFailed = false
  var failures = 0
  var done = false
  // all executor BlockManagerIds other than the current one
  peersForReplication ++= getPeers(forceFetch = false)
  // pick a random peer
  def getRandomPeer(): Option[BlockManagerId] = {
    // after a failure, refresh the peer list from the master and exclude both
    // the peers already replicated to and the peers that failed
    if (replicationFailed) {
      peersForReplication.clear()
      peersForReplication ++= getPeers(forceFetch = true)
      peersForReplication --= peersReplicatedTo
      peersForReplication --= peersFailedToReplicateTo
    }
    // if any candidates remain, pick one at random
    if (!peersForReplication.isEmpty) {
      Some(peersForReplication(random.nextInt(peersForReplication.size)))
    } else {
      None
    }
  }
  // keep looping until replication is done
  while (!done) {
    getRandomPeer() match {
      case Some(peer) =>
        try {
          val onePeerStartTime = System.currentTimeMillis
          data.rewind()
          logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
          // synchronously upload the block via BlockTransferService.uploadBlockSync
          blockTransferService.uploadBlockSync(
            peer.host, peer.port, peer.executorId, blockId, new NioManagedBuffer(data), tLevel)
          logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %s ms"
            .format(System.currentTimeMillis - onePeerStartTime))
          // record the successful peer and remove it from the candidates so the
          // block is not replicated to the same BlockManager twice
          peersReplicatedTo += peer
          peersForReplication -= peer
          replicationFailed = false
          // replication is done once enough peers hold a copy
          if (peersReplicatedTo.size == numPeersToReplicateTo) {
            done = true
          }
        } catch {
          case e: Exception =>
            logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
            failures += 1
            replicationFailed = true
            peersFailedToReplicateTo += peer
            if (failures > maxReplicationFailures) { // too many failures in replicating to peers
              done = true
            }
        }
      case None => // no peer left to replicate to
        done = true
    }
  }
  val timeTakeMs = (System.currentTimeMillis - startTime)
  logDebug(s"Replicating $blockId of ${data.limit()} bytes to " +
    s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
  if (peersReplicatedTo.size < numPeersToReplicateTo) {
    logWarning(s"Block $blockId replicated to only " +
      s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
  }
}

2.11 dropFromMemory — drops a block from memory, e.g. when memory is full and the block is better kept on disk

def dropFromMemory(
    blockId: BlockId,
    data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
  logInfo(s"Dropping block $blockId from memory")
  // look up the BlockInfo
  val info = blockInfo.get(blockId).orNull
  // If the block has not already been dropped
  if (info != null) {
    info.synchronized {
      if (!info.waitForReady()) {
        logWarning(s"Block $blockId was marked as failure. Nothing to drop")
        return None
      } else if (blockInfo.get(blockId).isEmpty) {
        logWarning(s"Block $blockId was already dropped.")
        return None
      }
      var blockIsUpdated = false
      val level = info.level
      // if the level uses disk and the block is not on disk yet, write it to disk
      if (level.useDisk && !diskStore.contains(blockId)) {
        logInfo(s"Writing block $blockId to disk")
        data() match {
          case Left(elements) =>
            diskStore.putArray(blockId, elements, level, returnValues = false)
          case Right(bytes) =>
            diskStore.putBytes(blockId, bytes, level)
        }
        blockIsUpdated = true
      }
      // size of the block in memory, if present
      val droppedMemorySize =
        if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
      // remove the block from the MemoryStore
      val blockIsRemoved = memoryStore.remove(blockId)
      if (blockIsRemoved) {
        blockIsUpdated = true
      } else {
        logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
      }
      // recompute the current block status
      val status = getCurrentBlockStatus(blockId, info)
      if (info.tellMaster) {
        // report the block status to the master
        reportBlockStatus(blockId, info, status, droppedMemorySize)
      }
      // if the level does not use disk, remove the entry from the <blockId, blockInfo> map
      if (!level.useDisk) {
        // The block is completely gone from this node; forget it so we can put() it again later.
        blockInfo.remove(blockId)
      }
      if (blockIsUpdated) {
        return Some(status)
      }
    }
  }
  None
}

2.12 removeRdd — removes all blocks belonging to the given RDD

def removeRdd(rddId: Int): Int = {
  // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
  logInfo(s"Removing RDD $rddId")
  val blocksToRemove = blockInfo.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
  blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
  blocksToRemove.size
}
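The linear scan works because RDD block ids carry the RDD id in their name (an RDD block is named `rdd_<rddId>_<splitIndex>`). The scan-and-filter step can be sketched over plain string ids (the example ids and the helper are ours):

```scala
// a few block ids as they might appear as keys of blockInfo
val blockIds = Seq("rdd_3_0", "rdd_3_1", "rdd_7_0", "broadcast_2")

// keep only the blocks of the given RDD, mirroring removeRdd's filter
def blocksOfRdd(rddId: Int): Seq[String] = {
  val prefix = s"rdd_${rddId}_"
  blockIds.filter(_.startsWith(prefix))
}
```

Each matching id would then be passed to removeBlock with `tellMaster = false`, since the master initiated the removal and already knows about it.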

2.13 removeBlock — removes a block from memory and disk

def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
  logDebug(s"Removing block $blockId")
  // look up the BlockInfo for this blockId
  val info = blockInfo.get(blockId).orNull
  if (info != null) {
    info.synchronized {
      // remove the block from memory
      val removedFromMemory = memoryStore.remove(blockId)
      // remove the block from disk
      val removedFromDisk = diskStore.remove(blockId)
      // remove the block from the ExternalBlockStore
      val removedFromExternalBlockStore =
        if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
      if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
        logWarning(s"Block $blockId could not be removed as it was not found in either " +
          "the disk, memory, or external block store")
      }
      // remove the entry from the <blockId, blockInfo> map
      blockInfo.remove(blockId)
      // report the block status to the master if required
      if (tellMaster && info.tellMaster) {
        val status = getCurrentBlockStatus(blockId, info)
        reportBlockStatus(blockId, info, status)
      }
    }
  } else {
    // The block has already been removed; do nothing.
    logWarning(s"Asked to remove block $blockId, which does not exist")
  }
}
