最终效果

master:

worker:

思路分析


Master代码

package cn.zxl.spark.masterimport akka.actor.{Actor, ActorRef, ActorSystem, Props}
import cn.zxl.spark.common.{RegisterWorkerInfo, RegisteredWorkerInfo, WorkerInfo}
import com.typesafe.config.ConfigFactoryimport scala.collection.mutable/*** @description:* @author: zhangxueliang* @create: 2021-05-29 16:37* @version: 1.0* */
class SparkMaster extends Actor {//定义hashmap。管理workersprivate val workers: mutable.Map[String, WorkerInfo] = mutable.Map[String, WorkerInfo]()override def receive: Receive = {case "start" => println("Spark Master服务器启动了!")case RegisterWorkerInfo(id, cpu, ram) => {//接收到worker注册信息if (!workers.contains(id)) {//创建WorkerInfo对象val workerInfo = new WorkerInfo(id, cpu, ram)//加入到workersworkers += (id -> workerInfo)println("服务器当前的workers="+workers)sender() ! RegisteredWorkerInfo}}}
}object SparkMaster {def main(args: Array[String]): Unit = {//先创建ActorSystemval config = ConfigFactory.parseString(s"""|akka.actor.provider="akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname=127.0.0.1|akka.remote.netty.tcp.port=10005
""".stripMargin)val sparkMasterSystem: ActorSystem = ActorSystem("SparkMaster", config)//创建SparkMaster  actorval sparkMasterRef: ActorRef = sparkMasterSystem.actorOf(Props[SparkMaster], "SparkMaster-01")//启动SparkMastersparkMasterRef ! "start"}
}

启动起来,验证下端口号,是否正常启动!

查看端口号:

Worker代码

package cn.zxl.spark.workerimport akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import cn.zxl.spark.common.{RegisterWorkerInfo, RegisteredWorkerInfo}
import com.typesafe.config.ConfigFactory/*** @description:* @author: zhangxueliang* @create: 2021-05-29 16:47* @version: 1.0* */
class SparkWorker(masterHost: String, masterPort: Int) extends Actor {//masterProxy是Master的代理/引用var masterProxy: ActorSelection = _//随机生成一个IDprivate val id: String = java.util.UUID.randomUUID().toStringoverride def preStart(): Unit = {println("preStart()调用")//初始化masterProxymasterProxy = context.actorSelection(s"akka.tcp://SparkMaster@${masterHost}:${masterPort}/user/SparkMaster-01")println("masterProxy=" + masterProxy)}override def receive: Receive = {case "start" => {println("worker启动了")masterProxy ! RegisterWorkerInfo(id, 16, 16 * 1024)}case RegisteredWorkerInfo => {println("workerid=" + id + "注册成功")}}
}object SparkWorker {def main(args: Array[String]): Unit = {val workerHost = "127.0.0.1"val workerPort = 10001val masterHost = "127.0.0.1"val masterPort = 10005val config = ConfigFactory.parseString(s"""|akka.actor.provider="akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname=127.0.0.1|akka.remote.netty.tcp.port=10002""".stripMargin)//创建ActorSystemval sparkWorkerSystem: ActorSystem = ActorSystem("SparkWorker", config)//创建SparkWorker的引用/代理val sparkWorkerRef: ActorRef = sparkWorkerSystem.actorOf(Props(new SparkWorker(masterHost, masterPort)), "SparkWorker-01")sparkWorkerRef ! "start"}
}

RegisterWorkerInfo样例类

package cn.zxl.spark.common/*** @description:* @author: zhangxueliang* @create: 2021-05-29 17:12* @version: 1.0* */
//worker注册信息
case class RegisterWorkerInfo(id:String,cpu:Int,ram:Int)
//这个是WorkerInfo 此信息是用老保存到master的hashmap(管理worker)
//将来这个workerInfo会扩展(如扩展worker上一次的心跳时间)
class WorkerInfo(val id:String,val cpu:Int,val ram:Int)
//当worker注册成功,服务器返回 RegisteredWorkerInfo 对象
case object RegisteredWorkerInfo

pom文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.zxl</groupId><artifactId>scala-demo</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>scala-demo</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><encoding>UTF-8</encoding><scala.version>2.11.12</scala.version><scala.compat.version>2.11</scala.compat.version><akka.version>2.5.12</akka.version><scala.actors.version>2.10.0-M6</scala.actors.version></properties><dependencies><!--scala 2.11起过时 无法使用--><!--<dependency><groupId>org.scala-lang</groupId><artifactId>scala-actors</artifactId><version>${scala.actors.version}</version></dependency>--><!--akka actor依赖--><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_${scala.compat.version}</artifactId><version>${akka.version}</version></dependency><!--多进程之间的Actor通信--><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-remote_${scala.compat.version}</artifactId><version>${akka.version}</version></dependency></dependencies><build><!--指定源码包和测试包的位置--><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><!-- 该插件用于将 Scala 代码编译成 class 文件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><!-- 声明绑定到 maven 的 compile 阶段 --><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_depencencies</arg></args></configuration></execution></executions></plugin><!--maven打包的插件--><!--            maven-assembly-plugin 和 maven-shade-plugin都是打包插件。遇到同名文件assembly是覆盖,shade是追加。所以此处选择shade插件--><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>reference.conf</resource></transformer><!--指定main方法--><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>xxx</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
</project>

Scala基于Akka模拟Spark Master Worker进程间通信(一):Worker向Master注册相关推荐

  1. Scala基于Akka模拟Spark Master Worker进程间通信(二):Worker定时向Master心跳

    最终效果 Master package cn.zxl.spark.masterimport akka.actor.{Actor, ActorRef, ActorSystem, Props} impor ...

  2. Storm通信机制,Worker进程间通信,Worker进程间通信分析,Worker进程间技术(Netty、ZeroMQ),Worker 内部通信技术(Disruptor)(来自学习资料)

    Storm通信机制 Worker间的通信经常需要通过网络跨节点进行,Storm使用ZeroMQ或Netty(0.9以后默认使用)作为进程间通信的消息框架. Worker进程内部通信:不同worker的 ...

  3. 基于Kubernetes的Spark部署完全指南

    基于Kubernetes的Spark部署完全指南 [编者的话]本文是在Kubernets上搭建Spark集群的操作指南,同时提供了Spark测试任务及相关的测试数据,通过阅读本文,你可以实践从制作Sp ...

  4. Akka(二):使用Akka模拟yarn

    1.样例         使用akka来模拟yarn集群的通信.流程图如下: 完整代码如下: MyResourceManager.scala import akka.actor.{Actor, Act ...

  5. 基于Hadoop安装spark集群

    基于Hadoop的spark环境搭建 已有环境情况 Hadoop HA Java 软件版本 Hadoop 2.7.2 Java 1.8.0_301 Scala 2.11.8 Spark 2.1.0 下 ...

  6. 命令行中只用scala来运行一个spark应用

    由于intellij十分消耗内存, 并且在概念上来讲,scala导入jar包应该和在集成开发环境中导入jar包是等效的. 所以我想,能否纯命令行,不用spark-submit的情况下来运行呢? 折腾了 ...

  7. 基于Zookeeper的Spark HA配置说明

    默认情况下在Spark standalone集群中进行计算时,由于是RDD的计算模型,所以可以认为worker 已经是有HA特性的了,但是负责资源调度的Master节点有可能出现单点故障.所以为了保证 ...

  8. scala akka_如何对Scala和Akka HTTP应用程序进行Docker化-简单的方法

    scala akka by Miguel Lopez 由Miguel Lopez 如何对Scala和Akka HTTP应用程序进行Docker化-简单的方法 (How to Dockerise a S ...

  9. 基于案例贯通 Spark Streaming 流计算框架的运行源码

    本期内容 : Spark Streaming+Spark SQL案例展示 基于案例贯穿Spark Streaming的运行源码 一. 案例代码阐述 : 在线动态计算电商中不同类别中最热门的商品排名,例 ...

最新文章

  1. Vue2.0使用嵌套路由实现页面内容切换/公用一级菜单控制页面内容切换
  2. asp.net ajax1.0基础回顾(六):调用ASPX页面方法
  3. 01_反射_02_反射类的构造方法
  4. [css] 举例说明你对相邻兄弟选择器的理解
  5. C# 中 NPOI 库读写 Excel 文件的方法【摘】
  6. HTML/CSS面试题(收集)
  7. 博图/博途(TIA)V13 V14 V15 V16 软件安装教程,适用于新手的傻瓜式安装方法,强推!!!!
  8. luogu P5336 [THUSC2016]成绩单
  9. A Magic Lamp
  10. 体系结构实验(4)—— Tomasulo算法
  11. 在python中读取文件时如何去除行末的换行符以及在Windows与Linux中的区别
  12. leetcode 183. Customers Who Never Order
  13. 每日三思:微信小程序多层级父子组件如何在子组件滚动加载
  14. webstorm 扩大内存
  15. dis反汇编文件的分析理解
  16. redis热key监控
  17. python 涨停统计_python+tushare获取股票和基金每日涨跌停价格
  18. P2P、消费贷和现金贷的区别
  19. ABBYY Screenshot Reader功能详解
  20. 公理定理定律的区别与联系

热门文章

  1. sympy随笔-python符号计算
  2. 【深度学习】百度:YOLOX和NanoDet都没我优秀!轻量型实时目标检测模型PP-PicoDet开源...
  3. 概率论回顾.pptx
  4. 【论文解读】DeepFM论文总结
  5. 【算法基础】数据结构导论第七章-排序.pptx
  6. 【机器学习基础】一文读懂用于序列标注的条件随机场(CRF)模型
  7. 【NLP】通俗讲解从Transformer到BERT模型!
  8. AAAI2020录用论文汇总(三)
  9. 一文看尽 CVPR2022 最新 22 篇论文(附打包下载)
  10. 2021年中国计算机视觉人才调研开启啦,诚邀各位开发者们参与~