Spark支持三种模式的部署:YARN、Standalone以及Mesos。 Worker节点是Spark的工作节点,用于执行提交的作业。我们先从Worker节点的启动开始介绍。
  Spark中Worker的启动有多种方式,但是最终调用的都是org.apache.spark.deploy.worker.Worker类,启动Worker节点的时候可以传很多的参数:内存、核、工作目录等。如果你不知道如何传递,没关系,help一下即可:

[wyp@iteblogspark]$ ./bin/spark-classorg.apache.spark.deploy.worker.Worker -h
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Usage: Worker [options] <master>
Master must be a URL of the form spark://hostname:port
Options:
  -c CORES, --cores CORES  Number of cores to use
  -m MEM, --memory MEM     Amount of memory to use (e.g. 1000M, 2G)
  -d DIR, --work-dir DIR   Directory to run apps in (default: SPARK_HOME/work)
  -i HOST, --ip IP         Hostname to listen on (deprecated, please use --host or -h)
  -h HOST, --host HOST     Hostname to listen on
  -p PORT, --port PORT     Port to listen on (default: random)
  --webui-port PORT        Portfor web UI (default:8081)

  从上面的输出我们可以看出Worker的启动支持多达7个参数!这样每个都这样输入岂不是很麻烦?其实,我们不用担心,Worker节点启动地时候将先读取conf/spark-env.sh里面的配置,这些参数配置的解析都是由Worker中的WorkerArguments类进行解析的。如果你没有设置内存,那么将会把Worker启动所在机器的所有内存(会预先留下1G内存给操作系统)分给Worker,具体的代码实现如下:

def inferDefaultMemory(): Int = {
    valibmVendor = System.getProperty("java.vendor").contains("IBM")
    vartotalMb = 0
    try{
      valbean = ManagementFactory.getOperatingSystemMXBean()
      if(ibmVendor) {
        valbeanClass = Class.forName("com.ibm.lang.management.OperatingSystemMXBean")
        valmethod = beanClass.getDeclaredMethod("getTotalPhysicalMemory")
        totalMb= (method.invoke(bean).asInstanceOf[Long] /1024 / 1024).toInt
      }else {
        valbeanClass = Class.forName("com.sun.management.OperatingSystemMXBean")
        valmethod = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize")
        totalMb= (method.invoke(bean).asInstanceOf[Long] /1024 / 1024).toInt
      }
    }catch {
      casee: Exception => {
        totalMb= 2*1024
        System.out.println("Failed to get total physical memory. Using "+ totalMb + " MB")
      }
    }
    // Leave out 1 GB for the operating system, but don't return a negative memory size
    math.max(totalMb -1024, 512)
  }

  同样,如果你没设置cores,那么Spark将会获取你机器的所有可用的核作为参数传进去。解析完参数之后,将运行preStart函数,进行一些启动相关的操作,比如判断是否已经向Master注册过,创建工作目录,启动Worker的WEB UI,向Master进行注册等操作,如下:

overridedef preStart() {
  assert(!registered)
  logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
    host, port, cores, Utils.megabytesToString(memory)))
  logInfo("Spark home: "+ sparkHome)
  createWorkDir()
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  webUi= newWorkerWebUI(this, workDir, Some(webUiPort))
  webUi.bind()
  registerWithMaster()
  metricsSystem.registerSource(workerSource)
  metricsSystem.start()
}

  Worker向Master注册的超时时间为20秒,如果在这20秒内没有成功地向Master注册,那么将会进行重试,重试的次数为3,如过重试的次数大于等于3,那么将无法启动Worker,这时候,你就该看看你的网络环境或者你的Master是否存在问题了。
Worker在运行的过程中将会触发许多的事件, 比如:RegisteredWorker、SendHeartbeat、WorkDirCleanup以及MasterChanged等等,收到不同的事件,Worker进行不同的操作。比如,如果需要运行一个作业,Worker将会启动一个或多个ExecutorRunner,具体的代码可参见receiveWithLogging函数:

overridedef receiveWithLogging= {
    caseRegisteredWorker(masterUrl, masterWebUiUrl) =>
    caseSendHeartbeat =>
    caseWorkDirCleanup =>
    caseMasterChanged(masterUrl, masterWebUiUrl) =>
    caseHeartbeat =>
     
    caseRegisterWorkerFailed(message) =>
     
    caseLaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_)=>
      
    caseExecutorStateChanged(appId, execId, state, message, exitStatus)=>
       
    caseKillExecutor(masterUrl, appId, execId) =>
       
    caseLaunchDriver(driverId, driverDesc) => {
      
    caseKillDriver(driverId) => {
    caseDriverStateChanged(driverId, state, exception) => {
      
    casex: DisassociatedEvent if x.remoteAddress == masterAddress =>
    
    caseRequestWorkerState => {
  }

  上面的代码是经过处理的,其实receiveWithLogging 方法是从ActorLogReceive继承下来的。
  当Worker节点Stop的时候,将会执行postStop函数,如下:

overridedef postStop() {
  metricsSystem.report()
  registrationRetryTimer.foreach(_.cancel())
  executors.values.foreach(_.kill())
  drivers.values.foreach(_.kill())
  webUi.stop()
  metricsSystem.stop()
}

  杀掉所有还未执行完的executors、drivers等,操作。这方法也是从Actor继承下来的。

Spark源码分析之Worker相关推荐

  1. Spark源码分析之Worker启动通信机制

    Worker是spark的工作节点,主要负责接受Master指令,启动或者杀掉Executor,Driver等;汇报Driver或者Executor状态到Master;发送心跳请求到Master等等 ...

  2. Spark源码分析之七:Task运行(一)

    在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在T ...

  3. spark 源码分析之十九 -- DAG的生成和Stage的划分

    上篇文章 spark 源码分析之十八 -- Spark存储体系剖析 重点剖析了 Spark的存储体系.从本篇文章开始,剖析Spark作业的调度和计算体系. 在说DAG之前,先简单说一下RDD. 对RD ...

  4. Spark 源码分析

    2019独角兽企业重金招聘Python工程师标准>>> 一. 启动篇 (一) 引子 在spark-shell终端执行 val arr = Array(1,2,3,4) val rdd ...

  5. Spark源码分析之九:内存管理模型

    Spark是现在很流行的一个基于内存的分布式计算框架,既然是基于内存,那么自然而然的,内存的管理就是Spark存储管理的重中之重了.那么,Spark究竟采用什么样的内存管理模型呢?本文就为大家揭开Sp ...

  6. spark 源码分析 Blockmanager

    原文链接 参考, Spark源码分析之-Storage模块 对于storage, 为何Spark需要storage模块?为了cache RDD  Spark的特点就是可以将RDD cache在memo ...

  7. Spark源码分析 – DAGScheduler

    DAGScheduler的架构其实非常简单, 1. eventQueue, 所有需要DAGScheduler处理的事情都需要往eventQueue中发送event 2. eventLoop Threa ...

  8. spark 源码分析之十八 -- Spark存储体系剖析

    本篇文章主要剖析BlockManager相关的类以及总结Spark底层存储体系. 总述 先看 BlockManager相关类之间的关系如下: 我们从NettyRpcEnv 开始,做一下简单说明. Ne ...

  9. spark 源码分析之二十 -- Stage的提交

    引言 上篇 spark 源码分析之十九 -- DAG的生成和Stage的划分 中,主要介绍了下图中的前两个阶段DAG的构建和Stage的划分. 本篇文章主要剖析,Stage是如何提交的. rdd的依赖 ...

最新文章

  1. 1-NET UX1000-实战-配置-Lync Server 2010-集成
  2. 一生都要Debug,我们最需要掌握哪些硬技能?
  3. PKI/CA (4)根CA信任模型“概述”
  4. RabbitMQ路由模式
  5. C# 并行运算方法简析
  6. 7-19 求链式线性表的倒数第K项 (20 分)(思路分析+极简代码+超容易理解)
  7. P3449-[POI2006]PAL-Palindromes【结论题,字符串hash】
  8. 【博弈论】【SG函数】bzoj1777 [Usaco2010 Hol]rocks 石头木头
  9. 值得电商美工借鉴的购物APP页面设计,让人无法自拔
  10. 确认无疑,.NET 6是迄今为止最快的.NET
  11. echo 在shell及脚本中显示色彩及闪烁警告效果
  12. SpringBoot日志logback-spring.xml分环境
  13. 电子秤PCBA方案的功能及设计
  14. 线性代数之——矩阵乘法和逆矩阵
  15. 爬虫项目5[爬取拉钩网招聘数据]
  16. 你知道吗?一个比房地产更大的超级泡沫正风靡全国!
  17. 广东省清远市谷歌卫星地图下载
  18. 2021年安徽省职业院校技能大赛网络搭建与应用竞赛
  19. 人生25句最美丽的名句
  20. 开课吧JAVAEE学习首周感受

热门文章

  1. eclipse怎么导出一个Java项目(莫要错过,最详细教程!)
  2. java 捕获特定异常_java – 使用特定消息捕获异常
  3. mac VMware Fusion 虚拟机键盘可以使用,鼠标无法使用排查思路及解决方法
  4. 教室工资管理系统c语言课程设计csdn,工资管理系统(C编写)
  5. 均匀白噪声的定义及特点_噪声的物理本质是什么?
  6. php5.5 集成环境,windows下配置php5.5开发环境及开发扩展_PHP
  7. python 3.6.0新语法_详解Python3.6正式版新特性
  8. java servlet获取url参数_Java Servlet如何获取请求的参数值?
  9. java程序设计基础29_java程序设计基础实验29
  10. c语言课程设计平时成绩,计算中心