Spark Master Resource Scheduling: Worker Registers with the Master
More Resources
- GitHub: https://github.com/opensourceteams/spark-scala-maven
- CSDN (video roundup, watch online): https://blog.csdn.net/thinktothings/article/details/84726769
YouTube Video
- YouTube (Spark Master Resource Scheduling: Worker Registers with the Master): https://youtu.be/SFqXaIKt-yI
Bilibili Video
- Bilibili (Spark Master Resource Scheduling: Worker Registers with the Master): https://www.bilibili.com/video/av37442280/
Worker Registers with the Master
The worker sends a registration message (RegisterWorker) to the master from its onStart() hook:
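Before diving into the source, here is a simplified sketch of the RPC messages exchanged in the handshake. These are illustrative stand-ins only; the real Spark case classes (defined in DeployMessages.scala) carry more fields, such as the worker's RpcEndpointRef.

```scala
// Simplified sketch of the registration handshake messages (not the
// real Spark definitions; fields are trimmed for illustration).
case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int)

sealed trait RegisterWorkerResponse
case class RegisteredWorker(masterWebUiUrl: String)  extends RegisterWorkerResponse
case class RegisterWorkerFailed(message: String)     extends RegisterWorkerResponse
case object MasterInStandby                          extends RegisterWorkerResponse
```

The sealed trait mirrors the three outcomes seen later in the master's receiveAndReply: success, failure (duplicate id or address), or a standby master that ignores registrations.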
```scala
override def onStart() {
  assert(!registered)
  logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
    host, port, cores, Utils.megabytesToString(memory)))
  logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  logInfo("Spark home: " + sparkHome)
  createWorkDir()
  shuffleService.startIfEnabled()
  webUi = new WorkerWebUI(this, workDir, webUiPort)
  webUi.bind()

  val scheme = if (webUi.sslOptions.enabled) "https" else "http"
  workerWebUiUrl = s"$scheme://$publicAddress:${webUi.boundPort}"
  registerWithMaster()

  metricsSystem.registerSource(workerSource)
  metricsSystem.start()
  // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
  metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
```
```scala
private def registerWithMaster() {
  // onDisconnected may be triggered multiple times, so don't attempt registration
  // if there are outstanding registration attempts scheduled.
  registrationRetryTimer match {
    case None =>
      registered = false
      registerMasterFutures = tryRegisterAllMasters()
      connectionAttemptCount = 0
      registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
        new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            Option(self).foreach(_.send(ReregisterWithMaster))
          }
        },
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        TimeUnit.SECONDS))
    case Some(_) =>
      logInfo("Not spawning another attempt to register with the master, since there is an" +
        " attempt scheduled already.")
  }
}
```
```scala
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          logInfo("Connecting to master " + masterAddress + "...")
          val masterEndpoint =
            rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
          registerWithMaster(masterEndpoint)
        } catch {
          case ie: InterruptedException => // Cancelled
          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
        }
      }
    })
  }
}
```
```scala
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
  masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
    workerId, host, port, self, cores, memory, workerWebUiUrl))
    .onComplete {
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) =>
        Utils.tryLogNonFatalError {
          handleRegisterResponse(msg)
        }
      case Failure(e) =>
        logError(s"Cannot register with master: ${masterEndpoint.address}", e)
        System.exit(1)
    }(ThreadUtils.sameThread)
}
```
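The retry behavior above (a fixed-rate timer keeps re-sending the registration until the master acknowledges) can be sketched in isolation. This is a minimal standalone sketch, not Spark code; `sendRegisterWorker` is a hypothetical stand-in for `masterEndpoint.ask[RegisterWorkerResponse](...)` that here simply succeeds on the third attempt.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

object RegistrationRetrySketch {
  val registered = new AtomicBoolean(false)
  val attempts   = new AtomicInteger(0)
  private val done = new CountDownLatch(1)

  // Hypothetical stand-in for the real RPC ask: pretend the master
  // only answers on the third attempt.
  private def sendRegisterWorker(): Boolean = attempts.incrementAndGet() >= 3

  def run(): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    // Mirrors forwordMessageScheduler.scheduleAtFixedRate in the worker:
    // each tick re-attempts registration until it succeeds.
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit =
        if (!registered.get() && sendRegisterWorker()) {
          registered.set(true)
          done.countDown()
        }
    }, 0, 10, TimeUnit.MILLISECONDS)
    done.await()
    scheduler.shutdown()
  }
}
```

The real worker additionally fans the attempt out to every known master address (tryRegisterAllMasters) and caps the number of retries; the sketch keeps only the timer-driven re-send loop.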
The Master Handles the Worker's Registration Message
The receiveAndReply handler receives the RegisterWorker message and:
- creates a new WorkerInfo on the master
- registers the WorkerInfo with the master (in memory)
- persists the WorkerInfo to the master's persistence engine
- replies to the worker with a RegisteredWorker message
- calls the master's resource-scheduling method, schedule(); since this usually happens at worker startup, no new application has been submitted yet, so no resources are actually allocated at this point
```scala
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterWorker(id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => {
    logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
      workerHost, workerPort, cores, Utils.megabytesToString(memory)))
    if (state == RecoveryState.STANDBY) {
      context.reply(MasterInStandby)
    } else if (idToWorker.contains(id)) {
      context.reply(RegisterWorkerFailed("Duplicate worker ID"))
    } else {
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
        workerRef, workerWebUiUrl)
      if (registerWorker(worker)) {
        persistenceEngine.addWorker(worker)
        context.reply(RegisteredWorker(self, masterWebUiUrl))
        schedule()
      } else {
        val workerAddress = worker.endpoint.address
        logWarning("Worker registration failed. Attempted to re-register worker at same " +
          "address: " + workerAddress)
        context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
          + workerAddress))
      }
    }
  }
```
```scala
private def registerWorker(worker: WorkerInfo): Boolean = {
  // There may be one or more refs to dead workers on this same node (w/ different ID's),
  // remove them.
  workers.filter { w =>
    (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
  }.foreach { w =>
    workers -= w
  }

  val workerAddress = worker.endpoint.address
  if (addressToWorker.contains(workerAddress)) {
    val oldWorker = addressToWorker(workerAddress)
    if (oldWorker.state == WorkerState.UNKNOWN) {
      // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
      // The old worker must thus be dead, so we will remove it and accept the new worker.
      removeWorker(oldWorker)
    } else {
      logInfo("Attempted to re-register worker at same address: " + workerAddress)
      return false
    }
  }

  workers += worker
  idToWorker(worker.id) = worker
  addressToWorker(workerAddress) = worker
  true
}
```
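The bookkeeping that registerWorker maintains can be shown with a stripped-down sketch (hypothetical types, not Spark code): a set of workers plus id and address lookup maps, where a second live registration from the same address is rejected.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's WorkerInfo.
case class WorkerInfoSketch(id: String, host: String, port: Int) {
  def address: String = s"$host:$port"
}

class MasterRegistrySketch {
  val workers         = mutable.HashSet[WorkerInfoSketch]()
  val idToWorker      = mutable.HashMap[String, WorkerInfoSketch]()
  val addressToWorker = mutable.HashMap[String, WorkerInfoSketch]()

  /** Returns false when a worker is already registered from the same address. */
  def registerWorker(worker: WorkerInfoSketch): Boolean = {
    if (addressToWorker.contains(worker.address)) return false
    workers += worker
    idToWorker(worker.id) = worker
    addressToWorker(worker.address) = worker
    true
  }
}
```

The real method also purges DEAD workers on the same host/port before the address check, and accepts a re-registration when the old entry is in the UNKNOWN state (a restart during master recovery); the sketch keeps only the three data structures and the duplicate-address rejection.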