Spark Notes 16: BlockManager
/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
    val data: Iterator[Any],
    readMethod: DataReadMethod.Value,
    bytes: Long) {
  val inputMetrics = new InputMetrics(readMethod)
  inputMetrics.bytesRead = bytes
}

private[spark] class BlockManager(
    executorId: String,
    actorSystem: ActorSystem,
    val master: BlockManagerMaster,
    defaultSerializer: Serializer,
    maxMemory: Long,
    val conf: SparkConf,
    securityManager: SecurityManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager)
  extends BlockDataProvider with Logging {
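To see how these pieces are used, here is a minimal sketch of a caller consuming a BlockResult. It assumes Spark 1.x internals (BlockManager.get returning Option[BlockResult], and the per-executor instance reachable through SparkEnv.get.blockManager); both classes are private[spark], so this only compiles inside the org.apache.spark package, and the block id and element type are made up.

// Sketch only: Spark 1.x internals; block id and element type are hypothetical.
import org.apache.spark.SparkEnv
import org.apache.spark.storage.RDDBlockId

val blockManager = SparkEnv.get.blockManager   // per-executor BlockManager instance
val blockId = RDDBlockId(0, 0)                 // hypothetical cached RDD partition

blockManager.get(blockId) match {
  case Some(result) =>
    // data is an Iterator[Any]; callers cast it to the element type they expect
    val values = result.data.asInstanceOf[Iterator[String]]
    // inputMetrics records how the block was read and how many bytes were read
    println(s"read ${result.inputMetrics.bytesRead} bytes")
    values.foreach(println)
  case None =>
    println(s"$blockId not found locally or remotely")  // caller would recompute the partition
}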
/**
 * Contains all the state related to a particular shuffle. This includes a pool of unused
 * ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle.
 */
private class ShuffleState(val numBuckets: Int) {
  val nextFileId = new AtomicInteger(0)
  val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
  val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()

  /**
   * The mapIds of all map tasks completed on this Executor for this shuffle.
   * NB: This is only populated if consolidateShuffleFiles is FALSE. We don't need it otherwise.
   */
  val completedMapTasks = new ConcurrentLinkedQueue[Int]()
}
private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]

private val metadataCleaner =
  new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf)
/**
 * Register a completed map without getting a ShuffleWriterGroup. Used by sort-based shuffle
 * because it just writes a single file by itself.
 */
def addCompletedMap(shuffleId: Int, mapId: Int, numBuckets: Int): Unit = {
  shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
  val shuffleState = shuffleStates(shuffleId)
  shuffleState.completedMapTasks.add(mapId)
}
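For context, a hedged sketch of the calling side: with sort-based shuffle the map task writes one consolidated output file itself and only needs to register its completion afterwards. The shuffleBlockManager handle and the ids below are assumptions for illustration.

// Sketch (Spark 1.1-era sort-based shuffle); shuffleBlockManager is assumed to be the
// ShuffleBlockManager owned by the executor's BlockManager, and the ids are made up.
blockManager.shuffleBlockManager.addCompletedMap(shuffleId = 0, mapId = 7, numBuckets = 4)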
/**
 * Initialize the BlockManager. Register to the BlockManagerMaster, and start the
 * BlockManagerWorker actor.
 */
private def initialize(): Unit = {
  master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
  BlockManagerWorker.startBlockManagerWorker(this)
}
/**
 * A short-circuited method to get blocks directly from disk. This is used for getting
 * shuffle blocks. It is safe to do so without a lock on block info since disk store
 * never deletes (recent) items.
 */
def getLocalFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
  diskStore.getValues(blockId, serializer).orElse {
    throw new BlockException(blockId, s"Block $blockId not found on disk, though it should be")
  }
}
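A small sketch of how this might be called when serving a shuffle block from the local disk store; the id values and the serializer variable are placeholders:

// Sketch: read a shuffle block's deserialized values straight from disk.
import org.apache.spark.storage.ShuffleBlockId
val shuffleBlock = ShuffleBlockId(shuffleId = 0, mapId = 7, reduceId = 3)  // hypothetical ids
// Throws BlockException if the block's file is missing; otherwise yields the value iterator.
val values: Option[Iterator[Any]] = blockManager.getLocalFromDisk(shuffleBlock, serializer)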
/** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup {
  val writers: Array[BlockObjectWriter]

  /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
  def releaseWriters(success: Boolean)
}

/**
 * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
 * per reducer (this set of files is called a ShuffleFileGroup).
 *
 * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
 * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
 * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle
 * files, it releases them for another task.
 * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple:
 *   - shuffleId: The unique id given to the entire shuffle stage.
 *   - bucketId: The id of the output partition (i.e., reducer id)
 *   - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
 *     time owns a particular fileId, and this id is returned to a pool when the task finishes.
 * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length)
 * that specifies where in a given file the actual block data is located.
 *
 * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
 * ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a list of offsets for
 * each block stored in each file. In order to find the location of a shuffle block, we search the
 * files within the ShuffleFileGroup associated with the block's reducer.
 */
// TODO: Factor this into a separate class for each ShuffleManager implementation
private[spark]
class ShuffleBlockManager(blockManager: BlockManager,
                          shuffleManager: ShuffleManager) extends Logging {
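The offsets/lengths bookkeeping described above boils down to a running sum per reducer file. A self-contained sketch with toy numbers (the local Segment case class just stands in for org.apache.spark.storage.FileSegment, and the file path is hypothetical):

import java.io.File

// Stand-in for FileSegment(file, offset, length); the values below are illustrative only.
case class Segment(file: File, offset: Long, length: Long)

val combinedFile = new File("/tmp/merged_shuffle_0_3_1.data")  // hypothetical combined shuffle file
// Three map tasks appended blocks of 120, 80 and 200 bytes to this reducer's file.
val lengths = Array(120L, 80L, 200L)
// Each block's offset is the sum of the lengths written before it.
val offsets = lengths.scanLeft(0L)(_ + _).init                 // Array(0, 120, 200)

// The block of the second map task (index 1) starts at offset 120 and spans 80 bytes.
val segment = Segment(combinedFile, offsets(1), lengths(1))    // Segment(..., 120, 80)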
private[spark]
object ShuffleBlockManager {
  /** A group of shuffle files, one per reducer; a mapper is assigned one group to write to. */
  private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
    private var numBlocks: Int = 0

    /**
     * Stores the absolute index of each mapId in the files of this group. For instance,
     * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
     */
    private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()

    /**
     * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
     * position in the file.
     */
    private val blockOffsetsByReducer = Array.fill(files.length)(new PrimitiveVector[Long]())
    private val blockLengthsByReducer = Array.fill(files.length)(new PrimitiveVector[Long]())

    def apply(bucketId: Int) = files(bucketId)

    def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
      assert(offsets.length == lengths.length)
      mapIdToIndex(mapId) = numBlocks
      numBlocks += 1
      for (i <- 0 until offsets.length) {
        blockOffsetsByReducer(i) += offsets(i)
        blockLengthsByReducer(i) += lengths(i)
      }
    }

    /** Returns the FileSegment associated with the given map task, or None if no entry exists. */
    def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
      val file = files(reducerId)
      val blockOffsets = blockOffsetsByReducer(reducerId)
      val blockLengths = blockLengthsByReducer(reducerId)
      val index = mapIdToIndex.getOrElse(mapId, -1)
      if (index >= 0) {
        val offset = blockOffsets(index)
        val length = blockLengths(index)
        Some(new FileSegment(file, offset, length))
      } else {
        None
      }
    }
  }
}
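Tying the two methods together, a commented walk-through (ShuffleFileGroup is private to this object, so the calls are illustrative rather than something user code can compile):

// Suppose a group covers 2 reducer files and map task 5 wrote 100 bytes to reducer 0
// and 50 bytes to reducer 1, both starting at offset 0 of their files:
//   group.recordMapOutput(5, Array(0L, 0L), Array(100L, 50L))
// Looking up map 5's block for reducer 1 then resolves to the second file, offset 0, length 50:
//   group.getFileSegmentFor(5, 1)   // => Some(FileSegment(files(1), 0, 50))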
Reposted from: https://www.cnblogs.com/zwCHAN/p/4253287.html