1.简述

spark 源码分析第一篇,准备从最基本的集群搭建入手,全面剖析spark。希望自己能对spark又更深入的理解。希望对读者有所帮助。言归正传,Spark standalone 模式,架构图:

这里先讨论Master和worker启动,以及之间的通讯:worker向master注册,worker向master发送heartbeat。

2.Master及启动流程

继承ThreadSafeRpcEndpoint类。启动master,会执行它自己的onStart函数。

2.1.执行start-master.sh脚本,->spark-daemon.sh "org.apache.spark.deploy.master.Master" -> spark-class start ->org.apache.spark.launcher.Main ->org.apache.spark.deploy.master.Master

2.2.调用Master的main方法,执行

val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)

在此方法里创建RpcEnv对象和master对象:

def startRpcEnvAndEndpoint( host: String, port: Int, webUiPort: Int, conf: SparkConf): (RpcEnv, Int, Option[Int]) = { val securityMgr = new SecurityManager(conf) val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr) val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf)) val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest) (rpcEnv, portsResponse.webUIPort, portsResponse.restPort) }

2.3.创建master对象,执行onstart方法。

(1).onstart函数做了如下的事情:

启动web UI

发送CheckForWorkerTimeOut消息给自己,移除超时的worker。

根据参数判断是否启动rest的接口,

注册master resources到master MetricsSystem

根据参数指定的recovery mode进行恢复。

(2) receive 函数接收了哪些消息

ElectedLeader

CompleteRecovery

RevokedLeadership 收回leader权力

RegisterWorker 注册worker :注册成功后,发送RegisteredWorker消息给worker

RegisterApplication 注册application

ExecutorStateChanged

DriverStateChanged

Heartbeat

MasterChangeAcknowledged

WorkerSchedulerStateResponse

WorkerLatestState

CheckForWorkerTimeOut

RequestSubmitDriver 请求提交driver

RequestKillDriver

RequestDriverStatus

RequestMasterState

BoundPortsRequest

3.worker及启动流程

同样继承ThreadSafeRpcEndpoint类.

3.1.执行start-slave.sh脚本 -> spark-daemon.sh start "org.apache.spark.deploy.worker.Worker" ->-> spark-class start ->org.apache.spark.launcher.Main ->org.apache.spark.deploy.worker.Worker

3.2.执行Worker的main方法

启动rpcEnv对象和Worker对象。

val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores, args.memory, args.masters, args.workDir, conf = conf)
def startRpcEnvAndEndpoint( host: String, port: Int, webUiPort: Int, cores: Int, memory: Int, masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None, conf: SparkConf = new SparkConf): RpcEnv = {  // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("") val securityMgr = new SecurityManager(conf) val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr) val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_)) rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory, masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr)) rpcEnv }

3.3.创建worker对象,执行onstart方法。

(1).onstart函数做了如下的事情:

在SPAKR_HOME目录下创建work目录

启动外部shuffle服务

启动workerweb UI

注册到master上,根据 val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)获取masterRef。

启动MetricsSystem

3.4.接收到master注册完成的消息RegisteredWorker之后调用handleRegisterResponse方法

private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized { msg match { case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) => if (preferConfiguredMasterAddress) { logInfo("Successfully registered with master " + masterAddress.toSparkURL) } else { logInfo("Successfully registered with master " + masterRef.address.toSparkURL) } registered = true //设置masterRef changeMaster(masterRef, masterWebUiUrl, masterAddress) //定时发送heartbeat forwordMessageScheduler.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { self.send(SendHeartbeat) } }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS) //是否清理workdir if (CLEANUP_ENABLED) { logInfo( s"Worker cleanup enabled; old application directories will be deleted in: $workDir") forwordMessageScheduler.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { self.send(WorkDirCleanup) } }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS) }  val execs = executors.values.map { e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state) } masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))  case RegisterWorkerFailed(message) => if (!registered) { logError("Worker registration failed: " + message) System.exit(1) }  case MasterInStandby => // Ignore. Master not yet ready. } }

4.总结

最好是跟着上面的讲接把源码理一遍,就清晰很多了, 接下来会分析driver的启动。

spark启动的worker节点是localhost_「Spark源码分析1」Spark standalone模式Master和Worker启动流程...相关推荐

  1. 【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)

    承接[[深度挖掘 RocketMQ底层源码]「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)] pullBlockIfNotFound方法 通过该方法获取 ...

  2. 「从源码中学习」面试官都不知道的Vue题目答案

    前言 当回答面试官问及的Vue问题,我们除了照本宣科的回答外,其实还可以根据少量的源码来秀一把,来体现出你对Vue的深度了解. 本文会陆续更新,此次涉及以下问题: "new Vue()做了什 ...

  3. springmvc项目在启动完成之后执行一次方法_SpringMVC源码分析

    一 SpringMVC运行原理 二 SpingMVC源码分析 1 DispatcherServlet 1.1 DispatcherServlet继承结构 ServletConfig对象获取Init标签 ...

  4. 「Leakcanary 源码分析」看这一篇就够了

    image.png 「Leakcanary 」是我们经常用于检测内存泄漏的工具,简单的使用方式,内存泄漏的可视化,是我们开发中必备的工具之一. 分析源码之前 Leakcanary 大神的 github ...

  5. 自动驾驶 Apollo 源码分析系列,感知篇(二):Perception 如何启动?

    从 Apollo 的官方文档,我们很容易得知 Perception 是核心的组件之一,但像所有的 C++ 程序一样,每个应用都有一个 Main 函数入口,那么引出本文要探索的 2 个问题: Perce ...

  6. java 最少使用(lru)置换算法_「Redis源码分析」Redis中的LRU算法实现

    如果对我的文章感兴趣.希望阅读完可以得到你的一个[三连],这将是对我最大的鼓励和支持. LRU是什么 LRU(least recently used)是一种缓存置换算法.即在缓存有限的情况下,如果有新 ...

  7. Spark 2.2 Core :TimSort 的原理与源码分析

    文章目录 1.想理解TimeSort必须先理解归并排序 2. timsort 其实只是对归并排序进行了一系列的改进. 3.Timsort的核心过程 4.源代码 5.全局流程图 6.演示 1.想理解Ti ...

  8. 「Ceph源码分析」纠删码解码

    存储系统:ceph-14.2.22 操作系统:ubuntu-server-18.04 总体框架 源码分析 ECBackend::objects_read_and_reconstruct [ 文件路径 ...

  9. C/C++编程打造单机麻将「附源码+说明文档」一个不错的入门项目

    项目介绍 基于最新版 Cocos2d-X 3.17 与 cocostudio V3.10 开发的单机麻将游戏,麻将算法为商业级麻将服务器端算法,整个项目代码精简.注释详细. 如果你刚刚接触Cocos2 ...

  10. Android 9(P)之init进程启动源码分析指南之一

         Android 9 之init进程启动源码分析指南之一 Android 9 (P) 系统启动及进程创建源码分析目录: Android 9 (P)之init进程启动源码分析指南之一 Andro ...

最新文章

  1. PowerShell批量修改邮箱配额和已删除保留期
  2. STM32 应用程序加密的一种设计方案
  3. React - S1
  4. mysql or_MySQL中or语句用法示例
  5. 链接详解--共享库命名
  6. delphi ,安装插件
  7. 中国期货交易技术的逆袭之路
  8. Java实现 蓝桥杯VIP 算法训练 会议中心
  9. Cesium 纹理贴图
  10. ArcGIS影像空值填充\插补
  11. [逆向]汇编movs,stos,rep指令讲解
  12. 计算机专业面试银行的自我介绍,银行面试自我介绍范文3分钟
  13. ① 如何优雅快乐打出漂亮LaTeX数学公式——小乐数学zzllrr Mather公式编辑器教程
  14. 【noip模拟题】天神下凡(贪心)
  15. 用pip安装pytorch
  16. 我的世界无限法则服务器怎么用,我的世界无限法则版
  17. 英文邮件模板--向论文作者索要源代码(write an email requesting for code)
  18. 构造方法--带参构造方法
  19. 字符串转换成JSON
  20. 基于GA优化BP神经网络的传感器故障诊断算法matlab仿真

热门文章

  1. iOS开发UI篇—程序启动原理和UIApplication
  2. ubuntu下MySQL的安装及远程连接配置(转)
  3. Error: could not open `C:\Program Files\Java\jre6\lib\i386\jvm.cfg')
  4. pureftp在企业中的应用及配置
  5. 今天博客园肿了吗?希望团队修复一下
  6. 今天写了个查看 ABAP 类层级的程序
  7. 跟我一起学写Makefile-Lesson 8
  8. Python爬虫开发【第1篇】【正则表达式】
  9. Canvas坐标轴中的Y轴距离是X轴的两倍
  10. 【并行计算-CUDA开发】CUDA shared memory bank 冲突