《深入理解Spark:核心思想与源码分析》一书前言的内容请看链接《深入理解SPARK:核心思想与源码分析》一书正式出版上市

《深入理解Spark:核心思想与源码分析》一书第一章的内容请看链接《第1章 环境准备》

《深入理解Spark:核心思想与源码分析》一书第二章的内容请看链接《第2章 SPARK设计理念与基本架构》

由于本书的第3章内容较多,所以打算分别开辟四篇随笔分别展现。

《深入理解Spark:核心思想与源码分析》一书第三章第一部分的内容请看链接《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(伯篇)》

《深入理解Spark:核心思想与源码分析》一书第三章第二部分的内容请看链接《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(仲篇)》

本文展现第3章第三部分的内容:

3.8 TaskScheduler的启动

  3.7节介绍了任务调度器TaskScheduler的创建,要想TaskScheduler发挥作用,必须要启动它,代码如下。

taskScheduler.start()

TaskScheduler在启动的时候,实际调用了backend的start方法。

  override def start() {backend.start()}

以LocalBackend为例,启动LocalBackend时向actorSystem注册了LocalActor,见代码清单3-30所示(在《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(中)》一文)。

3.8.1 创建LocalActor

  创建LocalActor的过程主要是构建本地的Executor,见代码清单3-36。

代码清单3-36         LocalActor的实现

private[spark] class LocalActor(scheduler: TaskSchedulerImpl, executorBackend: LocalBackend,private val totalCores: Int) extends Actor with ActorLogReceive with Logging {import context.dispatcher   // to use Akka's scheduler.scheduleOnce()private var freeCores = totalCoresprivate val localExecutorId = SparkContext.DRIVER_IDENTIFIERprivate val localExecutorHostname = "localhost"val executor = new Executor(localExecutorId, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true)override def receiveWithLogging = {case ReviveOffers =>reviveOffers()case StatusUpdate(taskId, state, serializedData) =>scheduler.statusUpdate(taskId, state, serializedData)if (TaskState.isFinished(state)) {freeCores += scheduler.CPUS_PER_TASKreviveOffers()} case KillTask(taskId, interruptThread) =>executor.killTask(taskId, interruptThread)case StopExecutor =>executor.stop()}}

Executor的构建,见代码清单3-37,主要包括以下步骤:

1) 创建并注册ExecutorSource。ExecutorSource是做什么的呢?笔者将在3.10.2节详细介绍。

2) 获取SparkEnv。如果是非local模式,Worker上的CoarseGrainedExecutorBackend向Driver上的CoarseGrainedExecutorBackend注册Executor时,则需要新建SparkEnv。可以修改属性spark.executor.port(默认为0,表示随机生成)来配置Executor中的ActorSystem的端口号。

3) 创建并注册ExecutorActor。ExecutorActor负责接受发送给Executor的消息。

4) urlClassLoader的创建。为什么需要创建这个ClassLoader?在非local模式中,Driver或者Worker上都会有多个Executor,每个Executor都设置自身的urlClassLoader,用于加载任务上传的jar包中的类,有效对任务的类加载环境进行隔离。

5) 创建Executor执行TaskRunner任务(TaskRunner将在5.5节介绍)的线程池。此线程池是通过调用Utils.newDaemonCachedThreadPool创建的,具体实现请参阅附录A。

6) 启动Executor的心跳线程。此线程用于向Driver发送心跳。

此外,还包括Akka发送消息的帧大小(10485760字节)、结果总大小的字节限制(1073741824字节)、正在运行的task的列表、设置serializer的默认ClassLoader为创建的ClassLoader等。

代码清单3-37         Executor的构建

  val executorSource = new ExecutorSource(this, executorId)private val env = {if (!isLocal) {val port = conf.getInt("spark.executor.port", 0)val _env = SparkEnv.createExecutorEnv(conf, executorId, executorHostname, port, numCores, isLocal, actorSystem)SparkEnv.set(_env)_env.metricsSystem.registerSource(executorSource)_env.blockManager.initialize(conf.getAppId)_env} else {SparkEnv.get}}private val executorActor = env.actorSystem.actorOf(Props(new ExecutorActor(executorId)), "ExecutorActor")private val urlClassLoader = createClassLoader()private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)env.serializer.setDefaultClassLoader(urlClassLoader)private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)private val maxResultSize = Utils.getMaxResultSize(conf)val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]startDriverHeartbeater()

3.8.2 ExecutorSource的创建与注册

  ExecutorSource用于测量系统。通过metricRegistry的register方法注册计量,这些计量信息包括threadpool.activeTasks、threadpool.completeTasks、threadpool.currentPool_size、threadpool.maxPool_size、filesystem.hdfs.write_bytes、filesystem.hdfs.read_ops、filesystem.file.write_bytes、filesystem.hdfs.largeRead_ops、filesystem.hdfs.write_ops等,ExecutorSource的实现见代码清单3-38。Metric接口的具体实现,参考附录D。

代码清单3-38         ExecutorSource的实现

private[spark] class ExecutorSource(val executor: Executor, executorId: String) extends Source {private def fileStats(scheme: String) : Option[FileSystem.Statistics] =FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOptionprivate def registerFileSystemStat[T](scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue)})}override val metricRegistry = new MetricRegistry()override val sourceName = "executor"metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {override def getValue: Int = executor.threadPool.getActiveCount()})metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {override def getValue: Long = executor.threadPool.getCompletedTaskCount()})metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {override def getValue: Int = executor.threadPool.getPoolSize()})metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {override def getValue: Int = executor.threadPool.getMaximumPoolSize()})// Gauge for file system stats of this executorfor (scheme <- Array("hdfs", "file")) {registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)}} 

创建完ExecutorSource后,调用MetricsSystem的registerSource方法将ExecutorSource注册到MetricsSystem。registerSource方法使用MetricRegistry的register方法,将Source注册到MetricRegistry,见代码清单3-39。关于MetricRegistry,具体参阅附录D。

代码清单3-39         MetricsSystem注册Source的实现

  def registerSource(source: Source) {sources += sourcetry {val regName = buildRegistryName(source)registry.register(regName, source.metricRegistry)} catch {case e: IllegalArgumentException => logInfo("Metrics already registered", e)}} 

3.8.3 ExecutorActor的构建与注册

  ExecutorActor很简单,当接收到SparkUI发来的消息时,将所有线程的栈信息发送回去,代码实现如下。

  override def receiveWithLogging = {case TriggerThreadDump =>sender ! Utils.getThreadDump()}

3.8.4 Spark自身ClassLoader的创建

  获取要创建的ClassLoader的父加载器currentLoader,然后根据currentJars生成URL数组,spark.files.userClassPathFirst属性指定加载类时是否先从用户的classpath下加载,最后创建ExecutorURLClassLoader或者ChildExecutorURLClassLoader,见代码清单3-40。

代码清单3-40         Spark自身ClassLoader的创建

  private def createClassLoader(): MutableURLClassLoader = {val currentLoader = Utils.getContextOrSparkClassLoaderval urls = currentJars.keySet.map { uri =>new File(uri.split("/").last).toURI.toURL}.toArrayval userClassPathFirst = conf.getBoolean("spark.files.userClassPathFirst", false)userClassPathFirst match {case true => new ChildExecutorURLClassLoader(urls, currentLoader)case false => new ExecutorURLClassLoader(urls, currentLoader)}} 

Utils.getContextOrSparkClassLoader的实现见附录A。ExecutorURLClassLoader或者ChildExecutorURLClassLoader实际上都继承了URLClassLoader,见代码清单3-41。 

代码清单3-41         ChildExecutorURLClassLoader与ExecutorURLClassLoader的实现

private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)extends MutableURLClassLoader {private object userClassLoader extends URLClassLoader(urls, null){override def addURL(url: URL) {super.addURL(url)}override def findClass(name: String): Class[_] = {super.findClass(name)}}private val parentClassLoader = new ParentClassLoader(parent)override def findClass(name: String): Class[_] = {try {userClassLoader.findClass(name)} catch {case e: ClassNotFoundException => {parentClassLoader.loadClass(name)}}}def addURL(url: URL) {userClassLoader.addURL(url)}def getURLs() = {userClassLoader.getURLs()}
}private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)extends URLClassLoader(urls, parent) with MutableURLClassLoader {override def addURL(url: URL) {super.addURL(url)}}

如果需要REPL交互,还会调用addReplClassLoaderIfNeeded创建replClassLoader,见代码清单3-42。

代码清单3-42         addReplClassLoaderIfNeeded的实现

  private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = {val classUri = conf.get("spark.repl.class.uri", null)if (classUri != null) {logInfo("Using REPL class URI: " + classUri)val userClassPathFirst: java.lang.Boolean =conf.getBoolean("spark.files.userClassPathFirst", false)try {val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader").asInstanceOf[Class[_ <: ClassLoader]]val constructor = klass.getConstructor(classOf[SparkConf], classOf[String],classOf[ClassLoader], classOf[Boolean])constructor.newInstance(conf, classUri, parent, userClassPathFirst)} catch {case _: ClassNotFoundException =>logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")System.exit(1)null}} else {parent}}

3.8.5 启动Executor的心跳线程

  Executor的心跳由startDriverHeartbeater启动,见代码清单3-43。Executor心跳线程的间隔由属性spark.executor.heartbeatInterval配置,默认是10000毫秒。此外,超时时间是30秒,超时重试次数是3次,重试间隔是3000毫秒,使用actorSystem.actorSelection (url)方法查找到匹配的Actor引用, url是akka.tcp://sparkDriver@ $driverHost:$driverPort/user/HeartbeatReceiver,最终创建一个运行过程中,每次会休眠10000到20000毫秒的线程。此线程从runningTasks获取最新的有关Task的测量信息,将其与executorId、blockManagerId封装为Heartbeat消息,向HeartbeatReceiver发送Heartbeat消息。

代码清单3-43         启动Executor的心跳线程

  def startDriverHeartbeater() {val interval = conf.getInt("spark.executor.heartbeatInterval", 10000)val timeout = AkkaUtils.lookupTimeout(conf)val retryAttempts = AkkaUtils.numRetries(conf)val retryIntervalMs = AkkaUtils.retryWaitMs(conf)val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", conf,env.actorSystem)val t = new Thread() {override def run() {// Sleep a random interval so the heartbeats don't end up in syncThread.sleep(interval + (math.random * interval).asInstanceOf[Int])while (!isStopped) {val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()val curGCTime = gcTimefor (taskRunner <- runningTasks.values()) {if (!taskRunner.attemptedTask.isEmpty) {Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>metrics.updateShuffleReadMetricsmetrics.jvmGCTime = curGCTime - taskRunner.startGCTimeif (isLocal) {val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))tasksMetrics += ((taskRunner.taskId, copiedMetrics))} else {// It will be copied by serializationtasksMetrics += ((taskRunner.taskId, metrics))}}}}val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)try {val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,retryAttempts, retryIntervalMs, timeout)if (response.reregisterBlockManager) {logWarning("Told to re-register on heartbeat")env.blockManager.reregister()}} catch {case NonFatal(t) => logWarning("Issue communicating with driver in heartbeater", t)}Thread.sleep(interval)}}}t.setDaemon(true)t.setName("Driver Heartbeater")t.start()}

这个心跳线程的作用是什么呢?其作用有两个:

q  更新正在处理的任务的测量信息;

q  通知BlockManagerMaster,此Executor上的BlockManager依然活着。

下面对心跳线程的实现详细分析下,读者可以自行选择是否需要阅读。

  初始化TaskSchedulerImpl后会创建心跳接收器HeartbeatReceiver。HeartbeatReceiver接受所有分配给当前Driver Application的Executor的心跳,并将Task、Task计量信息、心跳等交给TaskSchedulerImpl和DAGScheduler作进一步处理。创建心跳接收器的代码如下。

  private val heartbeatReceiver = env.actorSystem.actorOf(Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")

HeartbeatReceiver在收到心跳消息后,会调用TaskScheduler的executorHeartbeatReceived方法,代码如下。

  override def receiveWithLogging = {case Heartbeat(executorId, taskMetrics, blockManagerId) =>val response = HeartbeatResponse(!scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))sender ! response}

executorHeartbeatReceived的实现代码如下。

    val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized {taskMetrics.flatMap { case (id, metrics) =>taskIdToTaskSetId.get(id).flatMap(activeTaskSets.get).map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics))}}dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)

这段程序通过遍历taskMetrics,依据taskIdToTaskSetId和activeTaskSets找到TaskSetManager。然后将taskId、TaskSetManager.stageId、TaskSetManager .taskSet.attempt、TaskMetrics封装到Array[(Long, Int, Int, TaskMetrics)]的数组metricsWithStageIds中。最后调用了dagScheduler的executorHeartbeatReceived方法,其实现如下。

    listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))implicit val timeout = Timeout(600 seconds)Await.result(blockManagerMaster.driverActor ? BlockManagerHeartbeat(blockManagerId),timeout.duration).asInstanceOf[Boolean]

dagScheduler将executorId、metricsWithStageIds封装为SparkListenerExecutorMetricsUpdate事件,并post到listenerBus中,此事件用于更新Stage的各种测量数据。最后给BlockManagerMaster持有的BlockManagerMasterActor发送BlockManagerHeartbeat消息。BlockManagerMasterActor在收到消息后会匹配执行heartbeatReceived方法(会在4.3.1节介绍)。heartbeatReceived最终更新BlockManagerMaster对BlockManager最后可见时间(即更新BlockManagerId对应的BlockManagerInfo的_lastSeenMs,见代码清单3-44)。

代码清单3-44         BlockManagerMasterActor的心跳处理

private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {if (!blockManagerInfo.contains(blockManagerId)) {blockManagerId.isDriver && !isLocal} else {blockManagerInfo(blockManagerId).updateLastSeenMs()true}}

local模式下Executor的心跳通信过程,可以用图3-3来表示。

图3-3       Executor的心跳通信过程

注意:在非local模式中Executor发送心跳的过程是一样的,主要的区别是Executor进程与Driver不在同一个进程,甚至不在同一个节点上。

 

接下来会初始化块管理器BlockManager,代码如下。

env.blockManager.initialize(applicationId)

具体的初始化过程,请参阅第4章。

未完待续。。。

后记:自己牺牲了7个月的周末和下班空闲时间,通过研究Spark源码和原理,总结整理的《深入理解Spark:核心思想与源码分析》一书现在已经正式出版上市,目前亚马逊、京东、当当、天猫等网站均有销售,欢迎感兴趣的同学购买。我开始研究源码时的Spark版本是1.2.0,经过7个多月的研究和出版社近4个月的流程,Spark自身的版本迭代也很快,如今最新已经是1.6.0。目前市面上另外2本源码研究的Spark书籍的版本分别是0.9.0版本和1.2.0版本,看来这些书的作者都与我一样,遇到了这种问题。由于研究和出版都需要时间,所以不能及时跟上Spark的脚步,还请大家见谅。但是Spark核心部分的变化相对还是很少的,如果对版本不是过于追求,依然可以选择本书。

京东(现有满100减30活动):http://item.jd.com/11846120.html

当当:http://product.dangdang.com/23838168.html

转载于:https://www.cnblogs.com/jiaan-geng/p/5249682.html

《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(叔篇)——TaskScheduler的启动...相关推荐

  1. 《深入理解Spark:核心思想与源码分析》——1.2节Spark初体验

    本节书摘来自华章社区<深入理解Spark:核心思想与源码分析>一书中的第1章,第1.2节Spark初体验,作者耿嘉安,更多章节内容可以访问云栖社区"华章社区"公众号查看 ...

  2. 《深入理解Spark:核心思想与源码分析》——第1章环境准备

    本节书摘来自华章社区<深入理解Spark:核心思想与源码分析>一书中的第1章环境准备,作者耿嘉安,更多章节内容可以访问云栖社区"华章社区"公众号查看 第1章 环 境 准 ...

  3. 《深入理解Spark:核心思想与源码分析》——3.10节创建和启动ExecutorAllocationManager...

    本节书摘来自华章社区<深入理解Spark:核心思想与源码分析>一书中的第3章,第3.10节创建和启动ExecutorAllocationManager,作者耿嘉安,更多章节内容可以访问云栖 ...

  4. 《深入理解Spark:核心思想与源码分析》——1.3节阅读环境准备

    本节书摘来自华章社区<深入理解Spark:核心思想与源码分析>一书中的第1章,第1.3节阅读环境准备,作者耿嘉安,更多章节内容可以访问云栖社区"华章社区"公众号查看 1 ...

  5. 深入理解Spark:核心思想与源码分析

    大数据技术丛书 深入理解Spark:核心思想与源码分析 耿嘉安 著 图书在版编目(CIP)数据 深入理解Spark:核心思想与源码分析/耿嘉安著. -北京:机械工业出版社,2015.12 (大数据技术 ...

  6. 《深入理解SPARK:核心思想与源码分析》(第1章)

    自己牺牲了7个月的周末和下班空闲时间,通过研究Spark源码和原理,总结整理的<深入理解Spark:核心思想与源码分析>一书现在已经正式出版上市,目前亚马逊.京东.当当.天猫等网站均有销售 ...

  7. 深入理解Spark:核心思想与源码分析. 3.9 启动测量系统MetricsSystem

    3.9 启动测量系统MetricsSystem MetricsSystem使用codahale提供的第三方测量仓库Metrics,有关Metrics的具体信息可以参考附录D.MetricsSystem ...

  8. spring源码分析第五天------springAOP核心原理及源码分析

    spring源码分析第五天------springAOP核心原理及源码分析 1. 面向切面编程.可以通过预 编译方式和运行期动态代理实现在不修改源代码的情况下给程序动态统一添加功能的一种技术 切面(A ...

  9. spring源码分析第四天------springmvc核心原理及源码分析

    spring源码分析第四天------springmvc核心原理及源码分析 1.基础知识普及 2. SpringMVC请求流程 3.SpringMVC代码流程 4.springMVC源码分析 4.1 ...

最新文章

  1. 基于超声波升压中周构建的150kHz的单管选频放大电路
  2. SAP创建生产订单时要求输入销售订单
  3. C++骑士走棋盘Knight tour算法(附完整源码)
  4. 安装lxml,抓取、解析网页
  5. 诗与远方:无题(七十一)- 雨季来了
  6. 这样讲闭包,你终生难忘
  7. 还在手写 Nginx 配置?试试这款可视化配置神器,太强了!
  8. 基于Proteus仿真8253音乐发生器
  9. 不用USB连接线或没有ADB驱动如何调试安卓
  10. 中国旅行包行业市场供需与战略研究报告
  11. wince tfp telnet
  12. Java 操作excel 插入删除列,插入删除图片
  13. python3.6实现随机森林算法(可视化)机器学习算法(赵志勇)学习笔记
  14. android 打开网页的两种方式.
  15. 最新Google 15道古怪面试题
  16. 可以说:未来10年这个行业依然值得进,天花板很高,月薪至少3W
  17. Parity Game(并查集)
  18. Redis(16) -- Redis集群
  19. 敏捷开发中的站立会应该怎么开?
  20. HTB-Entity

热门文章

  1. Python源码深度解析—float空闲对象缓存池
  2. [paper reading] CenterNet (Object as Points)
  3. jquery中ajax应用中的通用ajax()函数
  4. python+tkinter 输入框及Label
  5. 【nand2tetris实验0】windows找不到文件javaw的解决办法
  6. quartus调用D触发器DFF和JK触发器JKFF
  7. matlab矩阵的LU分解
  8. Python 解决面试题47 不用加减乘除做加法
  9. 洛谷 P1187 3D模型
  10. [bzoj 1452] [JSOI2009]Count