worker的启动流程和master的启动流程相同,区别就是在worker启动完成后,要向Master注册、汇报资源。在Worker的onStart()方法中会调用registerWithMaster()方法来向Master注册:

  private def registerWithMaster() {// onDisconnected may be triggered multiple times, so don't attempt registration// if there are outstanding registration attempts scheduled.registrationRetryTimer match {case None =>registered = false//Master高可用,向所有的Master注册registerMasterFutures = tryRegisterAllMasters()connectionAttemptCount = 0registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(new Runnable {override def run(): Unit = Utils.tryLogNonFatalError {Option(self).foreach(_.send(ReregisterWithMaster))}},INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,TimeUnit.SECONDS))case Some(_) =>logInfo("Not spawning another attempt to register with the master, since there is an" +" attempt scheduled already.")}}

需要注意的是,这里要向所有的Master注册,因为Master是高可用的,不止1个Master。注册过程首先要获取到Master,然后调用sendRegisterMessageToMaster方法向Master发送消息。这样Master就能通过receive方法接收到来自Worker的消息,这里会进行模式匹配,匹配到RegisterWorker

case RegisterWorker(id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>logInfo("Registering worker %s:%d with %d cores, %s RAM".format(workerHost, workerPort, cores, Utils.megabytesToString(memory)))if (state == RecoveryState.STANDBY) {workerRef.send(MasterInStandby)} else if (idToWorker.contains(id)) {//重复注册workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))} else {//第一次注册//将worker的id、host、port、cores、memory都统统封装到WorkerInfo中val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,workerRef, workerWebUiUrl)if (registerWorker(worker)) { //Worker注册成功persistenceEngine.addWorker(worker)workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))schedule()} else {  //Worker注册失败val workerAddress = worker.endpoint.addresslogWarning("Worker registration failed. Attempted to re-register worker at same " +"address: " + workerAddress)workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "+ workerAddress))}}

这里看第一次注册,会作注册成功的状态判断registerWorker(worker),这里面有最关键的一行代码:

 workers += worker

这个workers就是个HashSet:

val workers = new HashSet[WorkerInfo]

将刚刚启动的Worker对象worker,添加到Master的HashSet中去

Spark Worker启动源码相关推荐

  1. Spark Master启动源码分析

    文章目录 一.RPC通信环境 1.创建Netty的RPC通信环境: 2.启动Netty RPC 二.注册EndPoint 总结 创建RPC通信环境 注册EndPoint 感悟 底层使用netty通信, ...

  2. spark任务运行源码

    spark任务运行源码 spark是一个分布式计算引擎,任务的运行是计算引擎的核心. 一个spark任务怎么能运行起来呢? 1 spark服务启动(Master,Worker): 2 应用程序提交 3 ...

  3. Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法

    Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法 1.spreadOutApp尽量平均分配到每个executor上: 2.非spreadO ...

  4. Spark RPC框架源码分析(二)RPC运行时序

    前情提要: Spark RPC框架源码分析(一)简述 一. Spark RPC概述 上一篇我们已经说明了Spark RPC框架的一个简单例子,Spark RPC相关的两个编程模型,Actor模型和Re ...

  5. Myth源码解析系列之五- 服务启动源码解析

    通过前面几篇文章,我们搭建了环境,也进行了分布式事务服务的体验,相信大家对myth也有了一个大体直观的了解,接下来我们将正式步入源码解析之旅~~ order服务启动源码解析(myth-demo-spr ...

  6. spring-boot启动源码学习-1

    2019独角兽企业重金招聘Python工程师标准>>> spring-boot启动源码分析-启动初始化 主要对spring-boot的启动流程中的启动初始化进行学习,学习spring ...

  7. 【Linux 内核 内存管理】Linux 内核内存布局 ④ ( ARM64 架构体系内存分布 | 内核启动源码 start_kernel | 内存初始化 mm_init | mem_init )

    文章目录 一.ARM64 架构体系内存分布 二.Linux 内核启动源码 start_kernel 三.内存初始化源码 mm_init 四.内存初始化源码 mem_init 一.ARM64 架构体系内 ...

  8. 【Android 启动过程】Activity 启动源码分析 ( ActivityThread 流程分析 二 )

    文章目录 前言 一.ActivityManagerService.attachApplicationLocked 二.ActivityStackSupervisor.attachApplication ...

  9. 【Android 启动过程】Activity 启动源码分析 ( ActivityThread -> Activity、主线程阶段 二 )

    文章目录 前言 一.ActivityThread 类 handleLaunchActivity -> performLaunchActivity 方法 二.Instrumentation.new ...

最新文章

  1. 深入Phtread(三):线程的同步-Condition Variables
  2. Keil编译产生的RO,RW和ZI是什么
  3. cudnn v4安装
  4. Linux学习之crontab定时任务
  5. C++ Primer 5th笔记(chap 19 特殊工具与技术)类成员指针
  6. git add后取消_Python 命令行之旅:使用 click 实现 git 命令
  7. java 布尔表达式_java - 布尔值,条件运算符和自动装箱
  8. php点击后增加html元素,如何动态生成html元素以及为元素追加属性的方法介绍(附代码)...
  9. c++ 结构体遍历_PBRT-E4.3-层次包围体(BVH)(一)
  10. 安装fail2ban
  11. [tensorflow]tensorflw2.1.0张量和变量介绍
  12. matlab替换矩阵中元素的值,怎么修改矩阵中的某些元素 或者简单点说保留矩阵中的元素...
  13. p12文件和mobileprovision文件导入到xcode如何使用
  14. Arm中国换帅风波始末 | 钛媒体深度
  15. FS4066耐高压1到4节内置MOS的锂电池充电管理芯片
  16. 美国苹果股价走势图(抢先看美股三大指数新动态)
  17. 辛弃疾最经典的10首词
  18. 微信小程序开发之——Video
  19. 在Python中输入汉字以及六个撇
  20. 阿里云ACP大数据专业认证,值得报名吗?

热门文章

  1. 怎么去控制浏览器对资源文件的处理行为
  2. 巧用Eclipse Java编辑器调试
  3. 用vue优雅地编写UI组件的几条指导原则
  4. idea搭建简单spring-boot项目
  5. java IO 解析
  6. CentOS 6.0 VNC远程桌面配置
  7. Exchange禁用中继后仍然被中继的处理方法
  8. Android TextView中设定个别文字字体显示格式
  9. 安装Open Live Writer,添加SyntaxHighlighter实现代码高亮
  10. 大名鼎鼎的Requests库用了什么编码风格?