Spark Master Resource Scheduling – Worker Registration with the Master

More Resources

  • github: https://github.com/opensourceteams/spark-scala-maven
  • csdn (video index, watch online): https://blog.csdn.net/thinktothings/article/details/84726769

YouTube Video

  • youtube (Spark Master resource scheduling – worker registration with the master): https://youtu.be/SFqXaIKt-yI

Bilibili Video

  • bilibili (Spark Master resource scheduling – worker registration with the master): https://www.bilibili.com/video/av37442280/

Worker Registration with the Master

The worker sends the registration message (RegisterWorker)

  override def onStart() {
    assert(!registered)
    logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
      host, port, cores, Utils.megabytesToString(memory)))
    logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
    logInfo("Spark home: " + sparkHome)
    createWorkDir()
    shuffleService.startIfEnabled()
    webUi = new WorkerWebUI(this, workDir, webUiPort)
    webUi.bind()
    val scheme = if (webUi.sslOptions.enabled) "https" else "http"
    workerWebUiUrl = s"$scheme://$publicAddress:${webUi.boundPort}"
    registerWithMaster()
    metricsSystem.registerSource(workerSource)
    metricsSystem.start()
    // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
    metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  }
  private def registerWithMaster() {
    // onDisconnected may be triggered multiple times, so don't attempt registration
    // if there are outstanding registration attempts scheduled.
    registrationRetryTimer match {
      case None =>
        registered = false
        registerMasterFutures = tryRegisterAllMasters()
        connectionAttemptCount = 0
        registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
          new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              Option(self).foreach(_.send(ReregisterWithMaster))
            }
          },
          INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
          INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
          TimeUnit.SECONDS))
      case Some(_) =>
        logInfo("Not spawning another attempt to register with the master, since there is an" +
          " attempt scheduled already.")
    }
  }
  private def tryRegisterAllMasters(): Array[JFuture[_]] = {
    masterRpcAddresses.map { masterAddress =>
      registerMasterThreadPool.submit(new Runnable {
        override def run(): Unit = {
          try {
            logInfo("Connecting to master " + masterAddress + "...")
            val masterEndpoint =
              rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
            registerWithMaster(masterEndpoint)
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        }
      })
    }
  }
  private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
    masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
      workerId, host, port, self, cores, memory, workerWebUiUrl))
      .onComplete {
        // This is a very fast action so we can use "ThreadUtils.sameThread"
        case Success(msg) =>
          Utils.tryLogNonFatalError {
            handleRegisterResponse(msg)
          }
        case Failure(e) =>
          logError(s"Cannot register with master: ${masterEndpoint.address}", e)
          System.exit(1)
      }(ThreadUtils.sameThread)
  }
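The key guard in the worker-side flow above is that a retry timer is scheduled only once: later calls to registerWithMaster() (e.g. from repeated onDisconnected events) are no-ops while a timer is outstanding. The following is a minimal standalone sketch of just that guard, using illustrative names rather than Spark's actual Worker class:

```scala
// Minimal model of the registration-retry guard in Worker.registerWithMaster().
// RetryGuardDemo and attemptsScheduled are illustrative, not Spark API.
object RetryGuardDemo {
  var registrationRetryTimer: Option[String] = None
  var attemptsScheduled = 0

  def registerWithMaster(): Unit = registrationRetryTimer match {
    case None =>
      // First call: schedule the periodic ReregisterWithMaster attempts.
      registrationRetryTimer = Some("timer")
      attemptsScheduled += 1
    case Some(_) =>
      // A retry timer already exists; do not spawn another attempt.
      ()
  }

  def main(args: Array[String]): Unit = {
    registerWithMaster()
    registerWithMaster() // second call is a no-op
    println(attemptsScheduled) // prints 1
  }
}
```

In the real Worker, the Some(_) branch additionally logs "Not spawning another attempt to register with the master"; the timer itself is cleared once registration succeeds.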

The Master Handles the Worker's Registration Message

receiveAndReply receives the message

  • Create a new WorkerInfo on the master
  • Register the WorkerInfo with the master (in memory)
  • Persist the WorkerInfo in the master's persistence engine
  • Send the RegisteredWorker message back to the worker
  • Call the master's resource-scheduling method schedule(); this usually happens at worker startup, when no new application has been submitted yet, so the scheduling does not actually allocate any resources
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RegisterWorker(id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => {
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      if (state == RecoveryState.STANDBY) {
        context.reply(MasterInStandby)
      } else if (idToWorker.contains(id)) {
        context.reply(RegisterWorkerFailed("Duplicate worker ID"))
      } else {
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          workerRef, workerWebUiUrl)
        if (registerWorker(worker)) {
          persistenceEngine.addWorker(worker)
          context.reply(RegisteredWorker(self, masterWebUiUrl))
          schedule()
        } else {
          val workerAddress = worker.endpoint.address
          logWarning("Worker registration failed. Attempted to re-register worker at same " +
            "address: " + workerAddress)
          context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
            + workerAddress))
        }
      }
    }
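The master's reply to RegisterWorker is decided by three checks in order: standby state, duplicate worker ID, then the registerWorker address check. A simplified standalone model of that decision (the object and handle function are illustrative, not the real Master class):

```scala
// Illustrative sketch of the decision the Master makes on RegisterWorker.
object RegisterDecisionDemo {
  sealed trait Response
  case object MasterInStandby extends Response
  case class RegisterWorkerFailed(msg: String) extends Response
  case object RegisteredWorker extends Response

  // isStandby models state == RecoveryState.STANDBY;
  // knownIds models the idToWorker map's key set.
  def handle(isStandby: Boolean, knownIds: Set[String], id: String): Response =
    if (isStandby) MasterInStandby
    else if (knownIds.contains(id)) RegisterWorkerFailed("Duplicate worker ID")
    else RegisteredWorker // in the real Master, only after registerWorker() succeeds

  def main(args: Array[String]): Unit = {
    println(handle(isStandby = true, Set.empty, "worker-1"))        // standby master ignores it
    println(handle(isStandby = false, Set("worker-1"), "worker-1")) // duplicate ID rejected
    println(handle(isStandby = false, Set.empty, "worker-1"))       // accepted
  }
}
```

Note that in the real code a successful registration also persists the WorkerInfo and triggers schedule() before replying.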
  private def registerWorker(worker: WorkerInfo): Boolean = {
    // There may be one or more refs to dead workers on this same node (w/ different ID's),
    // remove them.
    workers.filter { w =>
      (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
    }.foreach { w =>
      workers -= w
    }
    val workerAddress = worker.endpoint.address
    if (addressToWorker.contains(workerAddress)) {
      val oldWorker = addressToWorker(workerAddress)
      if (oldWorker.state == WorkerState.UNKNOWN) {
        // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
        // The old worker must thus be dead, so we will remove it and accept the new worker.
        removeWorker(oldWorker)
      } else {
        logInfo("Attempted to re-register worker at same address: " + workerAddress)
        return false
      }
    }
    workers += worker
    idToWorker(worker.id) = worker
    addressToWorker(workerAddress) = worker
    true
  }
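registerWorker's core bookkeeping is the same-address check: a live worker already registered at the host:port blocks the new registration, while DEAD entries are swept away first. A reduced standalone model of that logic (W, the demo object, and its fields are illustrative; the real WorkerInfo and state handling are richer):

```scala
// Sketch of registerWorker's dead-worker cleanup and same-address check.
object RegisterWorkerDemo {
  object WorkerState extends Enumeration { val ALIVE, DEAD, UNKNOWN = Value }
  case class W(id: String, host: String, port: Int, state: WorkerState.Value)

  var workers = Set.empty[W]
  var addressToWorker = Map.empty[(String, Int), W]

  def registerWorker(worker: W): Boolean = {
    // Drop any DEAD workers previously registered on the same host:port.
    workers.filter(w => w.host == worker.host && w.port == worker.port &&
      w.state == WorkerState.DEAD).foreach(w => workers -= w)
    val addr = (worker.host, worker.port)
    addressToWorker.get(addr) match {
      case Some(old) if old.state == WorkerState.UNKNOWN =>
        // Restarted during recovery: remove the old entry, accept the new one.
        workers -= old
        addressToWorker -= addr
      case Some(_) =>
        return false // a live worker is already registered at this address
      case None => ()
    }
    workers += worker
    addressToWorker += addr -> worker
    true
  }

  def main(args: Array[String]): Unit = {
    println(registerWorker(W("w1", "host-a", 7077, WorkerState.ALIVE))) // true
    println(registerWorker(W("w2", "host-a", 7077, WorkerState.ALIVE))) // false: address taken
  }
}
```

This mirrors why a worker restarted on the same machine is only accepted after the master has marked the old entry DEAD or UNKNOWN.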
