Spark Worker启动源码
worker的启动流程和master的启动流程相同,区别就是在worker启动完成后,要向Master注册、汇报资源。在Worker的onStart()方法中会调用registerWithMaster()方法来向Master注册:
private def registerWithMaster() {// onDisconnected may be triggered multiple times, so don't attempt registration// if there are outstanding registration attempts scheduled.registrationRetryTimer match {case None =>registered = false//Master高可用,向所有的Master注册registerMasterFutures = tryRegisterAllMasters()connectionAttemptCount = 0registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(new Runnable {override def run(): Unit = Utils.tryLogNonFatalError {Option(self).foreach(_.send(ReregisterWithMaster))}},INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,TimeUnit.SECONDS))case Some(_) =>logInfo("Not spawning another attempt to register with the master, since there is an" +" attempt scheduled already.")}}
需要注意的是,这里要向所有的Master注册,因为Master是高可用的,不止1个Master。注册过程首先要获取到Master,然后调用sendRegisterMessageToMaster方法向Master发送消息。这样Master就能通过receive方法接收到来自Worker的消息,这里会进行模式匹配,匹配到RegisterWorker
case RegisterWorker(id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>logInfo("Registering worker %s:%d with %d cores, %s RAM".format(workerHost, workerPort, cores, Utils.megabytesToString(memory)))if (state == RecoveryState.STANDBY) {workerRef.send(MasterInStandby)} else if (idToWorker.contains(id)) {//重复注册workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))} else {//第一次注册//将worker的id、host、port、cores、memory都统统封装到WorkerInfo中val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,workerRef, workerWebUiUrl)if (registerWorker(worker)) { //Worker注册成功persistenceEngine.addWorker(worker)workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))schedule()} else { //Worker注册失败val workerAddress = worker.endpoint.addresslogWarning("Worker registration failed. Attempted to re-register worker at same " +"address: " + workerAddress)workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "+ workerAddress))}}
这里看第一次注册,会作注册成功的状态判断registerWorker(worker),这里面有最关键的一行代码:
workers += worker
这个workers就是个HashSet:
val workers = new HashSet[WorkerInfo]
将刚刚启动的Worker对象worker,添加到Master的HashSet中去
Spark Worker启动源码相关推荐
- Spark Master启动源码分析
文章目录 一.RPC通信环境 1.创建Netty的RPC通信环境: 2.启动Netty RPC 二.注册EndPoint 总结 创建RPC通信环境 注册EndPoint 感悟 底层使用netty通信, ...
- spark任务运行源码
spark任务运行源码 spark是一个分布式计算引擎,任务的运行是计算引擎的核心. 一个spark任务怎么能运行起来呢? 1 spark服务启动(Master,Worker): 2 应用程序提交 3 ...
- Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法
Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法 1.spreadOutApp尽量平均分配到每个executor上: 2.非spreadO ...
- Spark RPC框架源码分析(二)RPC运行时序
前情提要: Spark RPC框架源码分析(一)简述 一. Spark RPC概述 上一篇我们已经说明了Spark RPC框架的一个简单例子,Spark RPC相关的两个编程模型,Actor模型和Re ...
- Myth源码解析系列之五- 服务启动源码解析
通过前面几篇文章,我们搭建了环境,也进行了分布式事务服务的体验,相信大家对myth也有了一个大体直观的了解,接下来我们将正式步入源码解析之旅~~ order服务启动源码解析(myth-demo-spr ...
- spring-boot启动源码学习-1
2019独角兽企业重金招聘Python工程师标准>>> spring-boot启动源码分析-启动初始化 主要对spring-boot的启动流程中的启动初始化进行学习,学习spring ...
- 【Linux 内核 内存管理】Linux 内核内存布局 ④ ( ARM64 架构体系内存分布 | 内核启动源码 start_kernel | 内存初始化 mm_init | mem_init )
文章目录 一.ARM64 架构体系内存分布 二.Linux 内核启动源码 start_kernel 三.内存初始化源码 mm_init 四.内存初始化源码 mem_init 一.ARM64 架构体系内 ...
- 【Android 启动过程】Activity 启动源码分析 ( ActivityThread 流程分析 二 )
文章目录 前言 一.ActivityManagerService.attachApplicationLocked 二.ActivityStackSupervisor.attachApplication ...
- 【Android 启动过程】Activity 启动源码分析 ( ActivityThread -> Activity、主线程阶段 二 )
文章目录 前言 一.ActivityThread 类 handleLaunchActivity -> performLaunchActivity 方法 二.Instrumentation.new ...
最新文章
- 深入Phtread(三):线程的同步-Condition Variables
- Keil编译产生的RO,RW和ZI是什么
- cudnn v4安装
- Linux学习之crontab定时任务
- C++ Primer 5th笔记(chap 19 特殊工具与技术)类成员指针
- git add后取消_Python 命令行之旅:使用 click 实现 git 命令
- java 布尔表达式_java - 布尔值,条件运算符和自动装箱
- php点击后增加html元素,如何动态生成html元素以及为元素追加属性的方法介绍(附代码)...
- c++ 结构体遍历_PBRT-E4.3-层次包围体(BVH)(一)
- 安装fail2ban
- [tensorflow]tensorflw2.1.0张量和变量介绍
- matlab替换矩阵中元素的值,怎么修改矩阵中的某些元素 或者简单点说保留矩阵中的元素...
- p12文件和mobileprovision文件导入到xcode如何使用
- Arm中国换帅风波始末 | 钛媒体深度
- FS4066耐高压1到4节内置MOS的锂电池充电管理芯片
- 美国苹果股价走势图(抢先看美股三大指数新动态)
- 辛弃疾最经典的10首词
- 微信小程序开发之——Video
- 在Python中输入汉字以及六个撇
- 阿里云ACP大数据专业认证,值得报名吗?