工作原理图

源代码分析

包名:org.apache.spark.deploy.worker

启动driver入口点:registerWithMaster方法中的case LaunchDriver

case LaunchDriver(driverId, driverDesc) => {
    logInfo(s"Asked to launch driver $driverId")
    // 创建DriverRunner对象启动Driver
    val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
10      self,
11      akkaUrl)
12      // 将driver加入本地缓存
13      drivers(driverId) = driver
14      driver.start()
15   
16      // 增加已使用core
17      coresUsed += driverDesc.cores
18      // 增加已使用内存
19      memoryUsed += driverDesc.mem
20  }

DriverRunner

管理一个driver的执行,包括失败时自动重启driver,这种方式仅仅适用于standalone集群部署模式

DriverRunner类中start方法实现

def start() = {
    // 创建新线程
    new Thread("DriverRunner for " + driverId) {
      override def run() {
        try {
          // 创建driver工作目录
          val driverDir = createWorkingDirectory()
          // 下载应用所需的的Jar包
          val localJarFilename = downloadUserJar(driverDir)
10   
11            def substituteVariables(argument: String): String = argument match {
12              case "{{WORKER_URL}}" => workerUrl
13              case "{{USER_JAR}}" => localJarFilename
14              case other => other
15            }
16   
17            // TODO: If we add ability to submit multiple jars they should also be added here
18            // 构建ProcessBuilder对象,传入启动driver命令(所需内存大小)
19            val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
20              sparkHome.getAbsolutePath, substituteVariables)
21            // 启动driver进程
22            launchDriver(builder, driverDir, driverDesc.supervise)
23          }
24          catch {
25            case e: Exception => finalException = Some(e)
26          }
27   
28          // Driver退出状态处理
29          val state =
30            if (killed) {
31              DriverState.KILLED
32            } else if (finalException.isDefined) {
33              DriverState.ERROR
34            } else {
35              finalExitCode match {
36                case Some(0) => DriverState.FINISHED
37                case _ => DriverState.FAILED
38              }
39            }
40   
41          finalState = Some(state)
42          // 向Driver所属worker发送DriverStateChanged消息
43          worker ! DriverStateChanged(driverId, state, finalException)
44        }
45      }.start()
46  }

LaunchExecutor

管理LaunchExecutor的启动

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
    if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
    } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
 
      // Create the executor's working directory
      // 创建executor本地工作目录
10        val executorDir = new File(workDir, appId + "/" + execId)
11        if (!executorDir.mkdirs()) {
12          throw new IOException("Failed to create directory " + executorDir)
13        }
14   
15        // Create local dirs for the executor. These are passed to the executor via the
16        // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
17        // application finishes.
18        val appLocalDirs = appDirectories.get(appId).getOrElse {
19          Utils.getOrCreateLocalRootDirs(conf).map { dir =>
20            Utils.createDirectory(dir).getAbsolutePath()
21          }.toSeq
22        }
23        appDirectories(appId) = appLocalDirs
24        // 创建ExecutorRunner对象
25        val manager = new ExecutorRunner(
26          appId,
27          execId,
28          appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
29          cores_,
30          memory_,
31          self,
32          workerId,
33          host,
34          webUi.boundPort,
35          publicAddress,
36          sparkHome,
37          executorDir,
38          akkaUrl,
39          conf,
40          appLocalDirs, ExecutorState.LOADING)
41        // executor加入本地缓存
42        executors(appId + "/" + execId) = manager
43        manager.start()
44        // 增加worker已使用core
45        coresUsed += cores_
46        // 增加worker已使用memory
47        memoryUsed += memory_
48        // 通知master发送ExecutorStateChanged消息
49        master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
50      }
51      // 异常情况处理,通知master发送ExecutorStateChanged FAILED消息
52      catch {
53        case e: Exception => {
54          logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
55          if (executors.contains(appId + "/" + execId)) {
56            executors(appId + "/" + execId).kill()
57            executors -= appId + "/" + execId
58          }
59          master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
60            Some(e.toString), None)
61        }
62      }
63  }

总结

1、Worker、Driver、Application启动后都会向Master进行注册,并缓存到Master内存数据模型中
2、完成注册后发送LaunchExecutor、LaunchDriver到Worker
3、Worker收到消息后启动executor和driver进程,并调用Worker的ExecutorStateChanged和DriverStateChanged方法
4、发送ExecutorStateChanged和DriverStateChanged消息到Master的,根据各自的状态信息进行处理,最重要的是会调用schedule方法进行资源的重新调度

转载于:https://www.cnblogs.com/jianyuan/p/Spark%E7%B3%BB%E5%88%97%E4%B9%8BWorker%E5%B7%A5%E4%BD%9C%E5%8E%9F%E7%90%86.html

Spark系列(八)Worker工作原理相关推荐

  1. Git使用 从入门到入土 收藏吃灰系列(四) Git工作原理

    文章目录 一.前言 一.Git基本理论(核心) 1.1工作区 1.2工作流程 一.前言 参考安装Git 详细安装教程 参考视频B站 Git最新教程通俗易懂,这个有点长,感觉讲的精华不多 参考视频『Gi ...

  2. Git使用 从入门到入土 收藏吃灰系列 (八) 什么是分支 分支的作用

    文章目录 一.前言 二.Git分支 2.1什么是分支? 2.2 分支有什么用? 一.前言 参考安装Git 详细安装教程 参考视频B站 Git最新教程通俗易懂,这个有点长,感觉讲的精华不多 参考视频『G ...

  3. JAVA开发运维(nginx工作原理)

    nginx源码目录结构: . ├── auto 自动检测系统环境以及编译相关的脚本 │ ├── cc 关于编译器相关的编译选项的检测脚本 │ ├── lib nginx编译所需要的一些库的检测脚本 │ ...

  4. iommu 工作原理解析之dma remapping

    深入了解iommu系列二:iommu 工作原理解析之dma remapping: https://zhuanlan.zhihu.com/p/479963917

  5. spark任务shell运行_大数据系列:Spark的工作原理及架构

    介绍 本Apache Spark教程将说明Apache Spark的运行时架构以及主要的Spark术语,例如Apache SparkContext,Spark shell,Apache Spark应用 ...

  6. 深入浅出理解 Spark:环境部署与工作原理

    一.Spark 概述 Spark 是 UC Berkeley AMP Lab 开源的通用分布式并行计算框架,目前已成为 Apache 软件基金会的顶级开源项目.Spark 支持多种编程语言,包括 Ja ...

  7. 菜鸟学Kubernetes(K8s)系列——(七)关于Kubernetes底层工作原理

    菜鸟学Kubernetes(K8s)系列--(七)关于Kubernetes底层工作原理 Kubernetes系列文章 主要内容 菜鸟学Kubernetes(K8s)系列--(一)关于Pod和Names ...

  8. Spark工作原理入门

    Spark工作原理入门 文章目录 Spark工作原理入门 1.功能概要 基本描述 运用场景 实际使用 2.模块组成 HDFS MLlib Mesos Tachyon GraphX Spark SQL ...

  9. 【Nginx系列】Nginx配置使用与工作原理

    热门系列: [Linux系列]Linux实践(一):linux常用命令 程序人生,精彩抢先看 目录 1.Nginx介绍 1.1 什么是Nginx? 1.2 Nginx能做什么 1.3 为什么要选择用N ...

最新文章

  1. 创建一个水平盒子java_盒子模型理解
  2. 登录form php一个页面跳转页面,form表单页面跳转方式提交练习
  3. vue方法传值到data_Vue 组件传值几种常用方法【总结】
  4. Spork: Pig on Spark实现分析
  5. 让无线网卡同时工作在 AP 和 STA 模式
  6. 算数运算符与关系运算符_【Flutter 110】Flutter手把手教程Dart语言——运算符
  7. Html代码打包后如何修改,html代码打包封装成APP教程
  8. BZOJ5312 冒险 势能分析、线段树
  9. 如何速成java_极*Java速成教程 - (2)
  10. c语言一个字符串怎么做除法,c语言实数除法怎样保留小数部分
  11. Excel如何构建简单的透视表
  12. 谈谈反向代理Nginx
  13. 如何做客户分析?客户分析的内容有哪些?
  14. 2 什么是计算机网络的拓扑结构,什么是网络拓扑?
  15. ArcGIS中消除两幅卫星影像之间色带问题
  16. LeetCode:20 vaild parentless
  17. PPT文件不能编辑,什么情况?
  18. join语句的执行顺序
  19. codeforces E. Sum of Digits
  20. 危险漫步_有关2010年糖尿病漫步的详细信息和感谢

热门文章

  1. (easyui datagrid+mvc+json)之asp.net分页查询
  2. 查询数据库,处理NULL值问题
  3. 分布式经典书籍--深入分布式缓存 从原理到实践
  4. 后台开发经典书籍--高性能mysql
  5. c:线性表的链式表示
  6. es6 Set的几种使用场景
  7. iOS_“图片浏览选择”功能的编写思路
  8. django 文档生成器
  9. python Gevent – 高性能的Python并发框架
  10. 一脸懵逼学习Hadoop中的序列化机制——流量求和统计MapReduce的程序开发案例——流量求和统计排序...