文章目录

  • 简介
  • 创建
  • 主要成员
  • 主要功能
    • 创建本地目录
    • 获取BlockId对应的文件路径
    • 查询BlockId对应的文件是否存在
    • 创建临时Block文件

简介

DiskBlockManager主要用来创建并持有逻辑blocks块与物理磁盘位置之间的映射。逻辑block块通过其BlockId映射到一个磁盘上的文件。Block块文件被hash存储到spark.local.dir(或SPARK_LOCAL_DIRS)配置的列表目录中。

创建

BlockManager初始化时会创建DiskBlockManager:

val diskBlockManager = {// Only perform cleanup if an external service is not serving our shuffle files.val deleteFilesOnStop =!externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIERnew DiskBlockManager(conf, deleteFilesOnStop)
}

主要成员

  • localDirs: Array[File]:创建spark.local.dir配置指定的目录列表。然后在这些目录下创建用来存放中间数据的子目录(为了避免顶层目录的inodes过大),最后会根据文件名将其hash存放到不同的目录下。
  • subDirsPerLocalDir:localDirs代表的各个目录下的子目录的个数,由 spark.diskStore.subDirectories指定,默认为64个。
  • subDirs: Array[Array[File]]:localDirs代表的各个目录下的子目录,子目录个数由 subDirsPerLocalDir指定。subDirs是不可变的,但是其每个成员subDirs(i)是可变的,并且subDirs(i)都是被subDirs(i)的锁保护的。
  • shutdownHook:即关闭钩子,在进程结束时会递归删除localDirs下所有属于该Application的文件。

主要功能

DiskBlockManager管理了每个BlockId到磁盘上文件的映射关系。DiskBlockManager主要有如下几个功能:

  • 创建本地目录;
  • 获取BlockId对应的文件路径;
  • 查询BlockId对应的文件是否存在;
  • 创建临时Block文件;

创建本地目录

负责创建配置的本地节点上的指定列表磁盘目录,用来存储Block数据到指定文件中。当使用外部shuffle服务时,在JVM退出时目录不会被删除。

private def createLocalDirs(conf: SparkConf): Array[File] = {// 获取配置的本地目录列表Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>try {// 创建目录,目录名为rootDir + "blockmgr-" + UUID.randomUUID.toStringval localDir = Utils.createDirectory(rootDir, "blockmgr")logInfo(s"Created local directory at $localDir")Some(localDir)} catch {case e: IOException =>logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e)None}}
}

获取BlockId对应的文件路径

如果Block数据想要落盘,首先需要通过调用getFile方法来分配一个唯一的文件路径(寻找该blockId对应文件也是通过这种方式)。其处理步骤如下:

  • 根据文件名计算哈希值;

  • 根据哈希值与本地文件一级目录的总数求余数,记为dirId;

  • 根据哈希值与本地文件一级目录的总数求商数,此商数与二级目录的数目再求余数,记为subDirId;

  • 如果dirId/subDirId目录存在,则获取dirId/subDirId目录下的文件,否则新建dirId/subDirId目录。

      def getFile(blockId: BlockId): File = getFile(blockId.name)// 通过将文件名hash到本地子目录的方式来查找文件// 该方法应该与"org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile()"方法保持同步def getFile(filename: String): File = {// 得到该文件名hash到的本地目录,以及该目录下的子目录val hash = Utils.nonNegativeHash(filename)val dirId = hash % localDirs.lengthval subDirId = (hash / localDirs.length) % subDirsPerLocalDir// 如果目录不存在则创建该子目录val subDir = subDirs(dirId).synchronized {val old = subDirs(dirId)(subDirId)if (old != null) {old} else {val newDir = new File(localDirs(dirId), "%02x".format(subDirId))if (!newDir.exists() && !newDir.mkdir()) {throw new IOException(s"Failed to create local dir in $newDir.")}subDirs(dirId)(subDirId) = newDirnewDir}}new File(subDir, filename)}
    

查询BlockId对应的文件是否存在

查询BlockId对应的文件是否存在,也是通过获取BlockId对应的文件路径是否存在来实现的。

/** Check if disk block manager has a block. */
def containsBlock(blockId: BlockId): Boolean = {getFile(blockId.name).exists()
}

创建临时Block文件

当ShuffleMapTask运行结束需要把中间结果临时保存,此时就调用createTempShuffleBlock方法创建临时的Block文件,并返回TempShuffleBlockId(TempShuffleBlockId的生成规则:"temp_shuffle_"后加上UUID字符串)与其对应的文件。

/** Produces a unique block id and File suitable for storing shuffled intermediate results. */
def createTempShuffleBlock(): (TempShuffleBlockId, File) = {var blockId = new TempShuffleBlockId(UUID.randomUUID())while (getFile(blockId).exists()) {blockId = new TempShuffleBlockId(UUID.randomUUID())}(blockId, getFile(blockId))
}/** Id associated with temporary shuffle data managed as blocks. Not serializable. */
private[spark] case class TempShuffleBlockId(id: UUID) extends BlockId {override def name: String = "temp_shuffle_" + id
}

磁盘块管理器DiskBlockManager相关推荐

  1. Spark源码剖析 - SparkContext的初始化(八)_初始化管理器BlockManager

    8.初始化管理器BlockManager 无论是Spark的初始化阶段还是任务提交.执行阶段,始终离不开存储体系.Spark为了避免Hadoop读写磁盘的I/O操作成为性能瓶颈,优先将配置信息.计算结 ...

  2. ceph bluestore中的磁盘空间管理

    ceph bluestore摒弃了传统的本地文件系统,而直接使用裸磁盘作为OSD的存储介质,因而需要自行管理磁盘空间的分配与回收 概述 一个设计良好的磁盘空间管理器,需要兼顾空间和时间效率:blues ...

  3. 虚拟磁盘管理器 服务器运行失败,Xp系统运行虚拟磁盘管理器出现故障怎么办

    我们使用的电脑都是需要进行磁盘分区的,这样我们就能够直接的给打开的磁盘管理进行分区.变更盘符或是进行压缩卷等的操作,但有时候在打开时却显示:"RPC服务器不可用"的故障,碰到这样的 ...

  4. 文件系统,磁盘配额,数据存储,lvm 逻辑卷管理器

    文件系统 文件系统包括:ext2 ext3 (比ext2多一个日志)ext4                 iso9660(光盘文件系统)                 vfat  (相当于win ...

  5. 计算机管理器磁盘清理,修复win7提示“windows磁盘空间清理管理器已停止工作”的方法...

    在长期的使用电脑之后,我们运行的程序会给电脑制造很多的垃圾文件,这些文件有的可以使用第三方工具进行清理,有的只能使用系统自带的磁盘清理器进行删除.在win7中运行磁盘清理器时会遇到系统弹出" ...

  6. python 上下文管理器、 else 块、@contextmanager

    文章目录 1. else 2. with上下文管理器 3. contextlib模块实用工具 4. @contextmanager 装饰器 learn from <流畅的python> 1 ...

  7. win10磁盘管理器的用处和意义

    win10磁盘管理器的用处和意义 https://baijiahao.baidu.com/s?id=1623011831942760541&wfr=spider&for=pc 关于电脑 ...

  8. win10计算机的管理在哪里打开,Win10磁盘管理器

    问:Win10的磁盘管理器在哪里打开?我想重新调整Win10电脑上的磁盘分区,但是对Win10系统不是太熟悉,找不到磁盘管理器在哪里! 请问Win10电脑上怎么打开磁盘管理器? 答:Win10系统中, ...

  9. Win10磁盘管理器:轻松和安全地调整Win10的分区大小

    Win10内嵌的磁盘管理工具 微软最新操作系统Win10的拥有了很多先进的功能,如基于触摸的界面Metro风格,Windows商店,等等.然而,与Windows 7相比,它在磁盘管理上没有太大的突破. ...

最新文章

  1. 分享5个我「 最死忠 」的Windows10软件
  2. XCTF-高手进阶区:NewsCenter
  3. linux smb 启动失败,[已解决]windows能看到smb,但是打不开
  4. MAC地址进行验证的方法
  5. Redis(五):List集合数据类型详解
  6. java jeditorpane 自动换行_java – JTextPane JScrollPane自动换行?
  7. java sql date类_java.util.Date和java.sql.Date 一点区别
  8. WebStorm设置px转换rem,亲测有效!
  9. 手把手教你:CSS+JS 打造一个有个性的滚动条
  10. 如何从外网穿透到内网
  11. 【存储器了解 RAM flash和eeprom存储器的区别和作用】
  12. android模拟器访问本地网站
  13. 破解网络尖兵(真正对付限制ADSL路由共享的方法)
  14. UE4 动态修改材质 控制颜色和贴图
  15. 惠普光影精灵5 拆机与配置升级教程
  16. 震旦199打印机扫描A4文件
  17. 信息安全界的巨星Bruce Schneier
  18. 特鲁姆普完胜奥沙利文 首夺斯诺克大师赛冠军
  19. 抽象数据类型:复数COMPLEX
  20. python如何声明函数_python如何声明函数

热门文章

  1. 微信小程序--动态时间实现
  2. 即时聊天工具二次开发
  3. Fiddler原理+雷电模拟器进行APP抓包
  4. 交通期刊JCR(2020)
  5. 阅读小结:MSR:From Captions to Visual Concepts and Back
  6. 基于SSM+Bootstrap【爱校教务系统管理系统】附源码
  7. 一种Δ-Σ模数转换器中梳状滤波器的设计
  8. SQL Server 数据库之视图
  9. 【Hive】常用日期格式转换和计算
  10. 电动汽车充电桩管理平台