Spark计算速度远胜于Hadoop的原因之一就在于中间结果是缓存在内存而不是直接写入到disk,本文尝试分析Spark中存储子系统的构成,并以数据写入和数据读取为例,讲述清楚存储子系统中各部件的交互关系。

存储子系统概览

上图是Spark存储子系统中几个主要模块的关系示意图,现简要说明如下

  • CacheManager  RDD在进行计算的时候,通过CacheManager来获取数据,并通过CacheManager来存储计算结果
  • BlockManager   CacheManager在进行数据读取和存取的时候主要是依赖BlockManager接口来操作,BlockManager决定数据是从内存(MemoryStore)还是从磁盘(DiskStore)中获取
  • MemoryStore   负责将数据保存在内存或从内存读取
  • DiskStore        负责将数据写入磁盘或从磁盘读入
  • BlockManagerWorker  数据写入本地的MemoryStore或DiskStore是一个同步操作,为了容错还需要将数据复制到别的计算结点,以防止数据丢失的时候还能够恢复, 数据复制的操作是异步完成,由BlockManagerWorker来处理这一部分事情
  • ConnectionManager 负责与其它计算结点建立连接,并负责数据的发送和接收
  • BlockManagerMaster 注意该模块只运行在Driver Application所在的Executor,功能是负责记录下所有BlockIds存储在哪个SlaveWorker上,比如RDD Task运行在机器A,所需要的BlockId为3,但在机器A上没有BlockId为3的数值,这个时候Slave worker需要通过BlockManager向BlockManagerMaster询问数据存储的位置,然后再通过 ConnectionManager去获取,具体参看“数据远程获取一节”

支持的操作

由于BlockManager起到实际的存储管控作用,所以在讲支持的操作的时候,以BlockManager中的public api为例

  • put  数据写入
  • get      数据读取
  • remoteRDD 数据删除,一旦整个job完成,所有的中间计算结果都可以删除

启动过程分析

上述的各个模块由SparkEnv来创建,创建过程在SparkEnv.create中完成

    val blockManagerMaster = new BlockManagerMaster(registerOrLookup("BlockManagerMaster",new BlockManagerMasterActor(isLocal, conf)), conf)val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf) val connectionManager = blockManager.connectionManager val broadcastManager = new BroadcastManager(isDriver, conf) val cacheManager = new CacheManager(blockManager) 

这段代码容易让人疑惑,看起来像是在所有的cluster node上都创建了BlockManagerMasterActor,其实不然,仔细看registerOrLookup函数的实现。如果当前节点是driver则创建这个actor,否则建立到driver的连接。

    def registerOrLookup(name: String, newActor: => Actor): ActorRef = {if (isDriver) {logInfo("Registering " + name)actorSystem.actorOf(Props(newActor), name = name)} else {val driverHost: String = conf.get("spark.driver.host", "localhost") val driverPort: Int = conf.getInt("spark.driver.port", 7077) Utils.checkHost(driverHost, "Expected hostname") val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name" val timeout = AkkaUtils.lookupTimeout(conf) logInfo(s"Connecting to $name: $url") Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout) } } 

初始化过程中一个主要的动作就是BlockManager需要向BlockManagerMaster发起注册

数据写入过程分析

数据写入的简要流程

  1. RDD.iterator是与storage子系统交互的入口
  2. CacheManager.getOrCompute调用BlockManager的put接口来写入数据
  3. 数据优先写入到MemoryStore即内存,如果MemoryStore中的数据已满则将最近使用次数不频繁的数据写入到磁盘
  4. 通知BlockManagerMaster有新的数据写入,在BlockManagerMaster中保存元数据
  5. 将写入的数据与其它slave worker进行同步,一般来说在本机写入的数据,都会另先一台机器来进行数据的备份,即replicanumber=1

序列化与否

写入的具体内容可以是序列化之后的bytes也可以是没有序列化的value. 此处有一个对scala的语法中Either, Left, Right关键字的理解。

数据读取过程分析

 def get(blockId: BlockId): Option[Iterator[Any]] = {val local = getLocal(blockId)if (local.isDefined) {logInfo("Found block %s locally".format(blockId))return local } val remote = getRemote(blockId) if (remote.isDefined) { logInfo("Found block %s remotely".format(blockId)) return remote } None } 

本地读取

首先在查询本机的MemoryStore和DiskStore中是否有所需要的block数据存在,如果没有则发起远程数据获取。

远程读取

远程获取调用路径, getRemote->doGetRemote, 在doGetRemote中最主要的就是调用BlockManagerWorker.syncGetBlock来从远程获得数据

def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {val blockManager = blockManagerWorker.blockManagerval connectionManager = blockManager.connectionManagerval blockMessage = BlockMessage.fromGetBlock(msg)val blockMessageArray = new BlockMessageArray(blockMessage) val responseMessage = connectionManager.sendMessageReliablySync( toConnManagerId, blockMessageArray.toBufferMessage) responseMessage match { case Some(message) => { val bufferMessage = message.asInstanceOf[BufferMessage] logDebug("Response message received " + bufferMessage) BlockMessageArray.fromBufferMessage(bufferMessage).foreach(blockMessage => { logDebug("Found " + blockMessage) return blockMessage.getData }) } case None => logDebug("No response message received") } null } 

上述这段代码中最有意思的莫过于sendMessageReliablySync,远程数据读取毫无疑问是一个异步i/o操作,这里的代码怎么写起来就像是在进行同步的操作一样呢。也就是说如何知道对方发送回来响应的呢?

别急,继续去看看sendMessageReliablySync的定义

def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message): Future[Option[Message]] = {val promise = Promise[Option[Message]]val status = new MessageStatus(message, connectionManagerId, s => promise.success(s.ackMessage))messageStatuses.synchronized {messageStatuses += ((message.id, status))}sendMessage(connectionManagerId, message)promise.future}

要是我说秘密在这里,你肯定会说我在扯淡,但确实在此处。注意到关键字Promise和Future没。

如果这个future执行完毕,返回s.ackMessage。我们再看看这个ackMessage是在什么地方被写入的呢。看一看ConnectionManager.handleMessage中的代码片段


case bufferMessage: BufferMessage => {if (authEnabled) {val res = handleAuthentication(connection, bufferMessage)if (res == true) { // message was security negotiation so skip the rest logDebug("After handleAuth result was true, returning") return } } if (bufferMessage.hasAckId) { val sentMessageStatus = messageStatuses.synchronized { messageStatuses.get(bufferMessage.ackId) match { case Some(status) => { messageStatuses -= bufferMessage.ackId status } case None => { throw new Exception("Could not find reference for received ack message " + message.id) null } } } sentMessageStatus.synchronized { sentMessageStatus.ackMessage = Some(message) sentMessageStatus.attempted = true sentMessageStatus.acked = true sentMessageStaus.markDone() } 

注意,此处的所调用的sentMessageStatus.markDone就会调用在sendMessageReliablySync中定义的promise.Success. 不妨看看MessageStatus的定义。

 class MessageStatus(val message: Message,val connectionManagerId: ConnectionManagerId,completionHandler: MessageStatus => Unit) {var ackMessage: Option[Message] = None var attempted = false var acked = false def markDone() { completionHandler(this) } } 

我想至此调用关系搞清楚了,scala中的Future和Promise理解起来还有有点费劲。

TachyonStore

在Spark的最新源码中,Storage子系统引入了TachyonStore. TachyonStore是在内存中实现了hdfs文件系统的接口,主要目的就是尽可能的利用内存来作为数据持久层,避免过多的磁盘读写操作。

有关该模块的功能介绍,可以参考http://www.meetup.com/spark-users/events/117307472/

小结

一点点疑问,目前在Spark的存储子系统中,通信模块里传递的数据即有“心跳检测消息”,“数据同步的消息”又有“数据获取之类的信息流”。如果可能的话,要将心跳检测与数据同步即数据获取所使用的网卡分离以提高可靠性。

参考资料

  1. Spark源码分析之-Storage模块 http://jerryshao.me/architecture/2013/10/08/spark-storage-module-analysis/

  2. Tachyon  http://www.slideshare.net/rxin/a-tachyon-2013-0509sparkmeetup?qid=39ee582d-e0bf-41d2-ab01-dc2439abc626&v=default&b=&from_search=2

转载于:https://www.cnblogs.com/captain_ccc/articles/4129318.html

Apache Spark源码走读之6 -- 存储子系统分析相关推荐

  1. Apache Spark源码走读之16 -- spark repl实现详解

    欢迎转载,转载请注明出处,徽沪一郎. 概要 之所以对spark shell的内部实现产生兴趣全部缘于好奇代码的编译加载过程,scala是需要编译才能执行的语言,但提供的scala repl可以实现代码 ...

  2. Apache Spark源码走读(九)如何进行代码跟读使用Intellij idea调试Spark源码

    <一>如何进行代码跟读 概要 今天不谈Spark中什么复杂的技术实现,只稍为聊聊如何进行代码跟读.众所周知,Spark使用scala进行开发,由于scala有众多的语法糖,很多时候代码跟着 ...

  3. Apache Spark源码走读之8 -- Spark on Yarn

    欢迎转载,转载请注明出处,徽沪一郎. 概要 Hadoop2中的Yarn是一个分布式计算资源的管理平台,由于其有极好的模型抽象,非常有可能成为分布式计算资源管理的事实标准.其主要职责将是分布式计算集群的 ...

  4. Apache Spark源码走读之22 -- 浅谈mllib中线性回归的算法实现

    欢迎转载,转载请注明出处,徽沪一郎. 概要 本文简要描述线性回归算法在Spark MLLib中的具体实现,涉及线性回归算法本身及线性回归并行处理的理论基础,然后对代码实现部分进行走读. 线性回归模型 ...

  5. Apache Spark源码走读之3 -- Task运行期之函数调用关系分析

    概要 本篇主要阐述在TaskRunner中执行的task其业务逻辑是如何被调用到的,另外试图讲清楚运行着的task其输入的数据从哪获取,处理的结果返回到哪里,如何返回. 准备 spark已经安装完毕 ...

  6. Apache Spark源码走读之4 -- DStream实时流数据处理

    欢迎转载,转载请注明出处,徽沪一郎. Spark Streaming能够对流数据进行近乎实时的速度进行数据处理.采用了不同于一般的流式数据处理模型,该模型使得Spark Streaming有非常高的处 ...

  7. 编译 Apache Spark 源码报错?那是因为你漏掉了关键操作

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  8. Spark源码走读概述

    Spark代码量 --Spark:20000loc --Hadoop 1.0:90000loc --Hadoop 2.0:220000loc Spark生态系统代码量 Spark生态系统 概述 --构 ...

  9. Spark源码走读1——RDD

    RDD全称Resilient Distributed DataSets,弹性的分布式数据集.是Spark的核心内容. RDD是只读的,不可变的数据集,也拥有很好的容错机制.他有5个主要特性 -A li ...

最新文章

  1. 为什么很难训练深度神经网络?
  2. 关于MPLS和SD-WAN的4大误解—Vecloud微云
  3. left join 临时表_图解SQL的JOIN
  4. linux错误说未声明,如何在Linux 32位计算机上解决REG_EIP未声明(在此函数中首次使用)错误?...
  5. python编辑器_python编辑器,作为小白该如何抉择?
  6. 【拔刀吧少年】之sed编辑器
  7. [dsu on tree]树上启发式合并总结(算法思想及模板附例题练习)
  8. php点击按钮弹窗提示,WEB表单,给出弹出框提示,点击按钮报错
  9. 1029 最大公约数和最小公倍数问题(gcd) luogu洛谷
  10. 【解决】该任务映像已损坏或已篡改。(异常来自HRESULT:0x80041321)
  11. 每天CookBook之JavaScript-018
  12. 雷达线性调频信号的脉冲压缩处理
  13. 远程桌面连接Windows后显示蓝屏
  14. [源码]VB6.0操作注册表
  15. Android底层网络防火墙,Android系统中实现网络防火墙的方法
  16. __FILE__, __FUNCTION__, __LINE__学习篇,谨以此文告别那无知的岁月
  17. 微信扫描二维码网页跳转显示信息
  18. 线性筛(Linear Sieve)
  19. 阿里、华为、腾讯的“第二曲线”,大厂云们的成败得失如何评价?
  20. 分享:虚拟筛选常用化合物库

热门文章

  1. python编程题-python编程练习题
  2. python3爬虫入门教程-有什么好的python3爬虫入门教程或书籍吗?
  3. python绘制条形图-python3使用matplotlib绘制条形图
  4. python教程视频在线-微软再推免费在线Python教程 包含20个视频
  5. 软件测试用python一般用来做什么-如何将Python应用到实际测试工作中?
  6. python打开excel表格-如何从python中用excel打开excel工作表?
  7. python 培训-本人的Python自学历程分享
  8. python最新版下载教程-各种版本的Python下载安装教程
  9. 从零开始学python数据分析-【01】从零开始学Python—数据分析与挖掘概述
  10. python二次开发攻略-ABAQUS Python二次开发攻略