1.样例
        使用akka来模拟yarn集群的通信。流程图如下:

完整代码如下:

MyResourceManager.scala

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactoryimport scala.collection.mutable
import scala.concurrent.duration._class MyResourceManager extends Actor{//将Worker信息存到map集合中val map = new mutable.HashMap[String, WorkerInfo]()override def preStart(): Unit = {println("preStart")import context.dispatchercontext.system.scheduler.schedule(0.millisecond,5000.millisecond,self,CheckTimeOutWorker)}//消息的接受  此函数为偏函数 不需要写matchoverride def receive: Receive = {//做模式匹配case "hello" =>{println("hello")}case CheckTimeOutWorker =>{val deadWorker: Iterable[WorkerInfo] = map.values.filter(x => System.currentTimeMillis() - x.lastHeartbeatTime > 15000)deadWorker.foreach(m =>{map -= m.workerId})println(s"current alive worker size: ${map.size}")}//匹配worker发送来的注册信息case Conf(workerId, memory, cores) => {//      println(s"workerId:$workerId,memory:$memory,cores:$cores")//将Worker的数据存到内存中val workInfo = new WorkerInfo(workerId, memory, cores)map(workerId) = workInfo//给发送者返回一个注册成功的响应sender() ! Confed}//接受Worker的心跳 并更新心跳时间case HeartBeat(workerId) =>   {//根据workerId查找对应的Worker信息if (map.contains(workerId)){val workerInfo: WorkerInfo = map(workerId)//更新Worker心跳的时间 更新到当前时间workerInfo.lastHeartbeatTime = System.currentTimeMillis()}}}
}
object MyResourceManager{def main(args: Array[String]): Unit = {val host = "localhost"val port = 1111val configstr =s"""|akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = $host|akka.remote.netty.tcp.port = $port|""".stripMarginval config = ConfigFactory.parseString(configstr)//创建MasterActorSystem(单例)val masterActorSystem = ActorSystem("MasterActorSystem", config)//通过Master创建Actorval masterActor = masterActorSystem.actorOf(Props[MyResourceManager], "MasterActor")//自己给自己发消息masterActor ! "hello"}
}

MyNodeManager.scala

import java.util.UUIDimport akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactoryimport scala.concurrent.duration._class MyNodeManager extends Actor {var workerId = UUID.randomUUID().toStringvar selection: ActorSelection = _//是在构造方法之后,receive方法之前一定会调用一次override def preStart(): Unit = {//与master建立连接selection = context.actorSelection("akka.tcp://MasterActorSystem@localhost:1111/user/MasterActor")//向master注册信息selection ! Conf(workerId, 2040, 4)}//消息的接受  此函数为偏函数 不需要写matchoverride def receive: Receive = {//做模式匹配case "hello" => {println("hello")}//向Master发送一个心跳case SendHeartBeat => {selection ! HeartBeat(workerId)}//启动定时器case Confed => {//导包 隐式转换import context.dispatchercontext.system.scheduler.schedule(0.millisecond, 10000.millisecond, self, SendHeartBeat)}}
}object MyNodeManager {def main(args: Array[String]): Unit = {val host = "localhost"val port = 2222val configStr =s"""|akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = $host|akka.remote.netty.tcp.port = $port|""".stripMarginval config = ConfigFactory.parseString(configStr)//创建WorkerActorSystemval workerActorSystem = ActorSystem("WorkerActorSysytem", config)//通过WorkActorSystem创建WorkerActorworkerActorSystem.actorOf(Props[MyNodeManager], "WorkerActor")}
}

Message.scala

case class Conf(workerId: String, memory: Int, cores: Int)case object Confedcase class WorkerInfo(workerId: String, memory: Int, cores: Int) {var lastHeartbeatTime: Long = _
}case object SendHeartBeatcase class HeartBeat(workerId: String)case object CheckTimeOutWorker

Akka(二):使用Akka模拟yarn相关推荐

  1. Scala并发编程(二)之 Akka

    Scala并发编程之 Akka 概述 Akka通信过程 Actor Path 入门案例 定时任务案例 两个进程之间的通信案例 简易版 Spark通信框架实现案例 概述 Akka是使用 Scala开发的 ...

  2. 2021年大数据Hadoop(二十七):YARN运行流程

    全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 Yarn运行流程 本系列历史文章 2021年大数据Hado ...

  3. java akka_Akka系列(九):Akka分布式之Akka Remote

    Akka作为一个天生用于构建分布式应用的工具,当然提供了用于分布式组件即Akka Remote,那么我们就来看看如何用Akka Remote以及Akka Serialization来构建分布式应用. ...

  4. php 读取mysql 二维数组_PHP操作 二维数组模拟mysql函数

    PHP操作 二维数组模拟mysql函数 public function monimysqltest(){ $testarray=array( array('ss'=>'1','dd'=>' ...

  5. java模拟微信抢红包金额算法规则二倍均值法模拟(满满的注释)

    二倍均值法模拟微信抢红包金额算法规则 ```java /*** 二倍均值法* @param amount 总金额* @param min 最小金额* @param num 个数* 本帖只提供思路,实际 ...

  6. 硬件电路设计入门之三路二选一模拟开关电路芯片--74HCT4053

    74HCT4053是一款三路二选一模拟开关电路,它的优点是能够通过交流信号, 笔者一般用它切换不同的音频通路或者直流控制电压. 如上图,X0,X1,X为一组:Y0,Y1,Y为一组:Z0,Z1,Z为一组 ...

  7. c# CAD二次开发 模拟CAD移动图形, 通过圆现在注记,改变图形颜色

    c# CAD二次开发 模拟CAD移动图形, 通过圆现在注记,改变图形颜色 using Autodesk.AutoCAD.DatabaseServices; using Autodesk.AutoCAD ...

  8. Java爬虫(二)-- httpClient模拟Http请求+jsoup页面解析

    博客 学院 下载 GitChat TinyMind 论坛 APP 问答 商城 VIP会员 活动 招聘 ITeye 写博客 发Chat 传资源 登录注册 原 Java爬虫(二)-- httpClient ...

  9. python拟合三维平面_tensorflow实现二维平面模拟三维数据教程

    我就废话不多说了,直接上代码吧! #!/bin/bash # -*-coding=utf-8-*- import re import os import sys import numpy as np ...

最新文章

  1. Effective Java 读书笔记----第三章
  2. 请求一个action,将图片的二进制字节字符串在视图页面以图片形式输出
  3. html5画布可以p图,HTML5图像适合发布在画布上
  4. 201703-4 地铁修建
  5. Qt / 如何解决移动了 Qt 的项目,但是 Qt 在原目录下生成编译文件的问题
  6. pandas中的sample方法
  7. java散列法的运用实例,Java HashMap compute() 使用方法及示例
  8. sql 时态表的意义_SQL Server 2016中的时态表的概念和基础
  9. layui form模块
  10. nginx配置注意事项1
  11. 机器学习中应用到的各种距离介绍(附上Matlab代码)
  12. ASP.NET之MVC 微信公众号授权给第三方平台的技术实现流程一(获取第三方平台access_token)...
  13. 致远getshell
  14. LINUX创建桌面运行快捷方式
  15. JAVA练习216-整数的英语表示
  16. 彻底搞懂数据库内连接、外连接
  17. 梦三国服务器维护多久,《梦三国2》天命地图开启时间变更公告
  18. Adobe illustrator/Ai 2019 软件安装包
  19. Access数据库的.ldb文件
  20. Xiuno 开发手册正式发布。

热门文章

  1. 完了!服务器沦为肉鸡了!排查过程!
  2. 基于目标导向行为和空间拓扑记忆的视觉导航方法
  3. Java for循环-水仙花数
  4. [oh-my-zsh] 提示检测到不安全的完成相关目录的问题解决
  5. 单元测试技巧之PowerMock
  6. ubuntu 破解Beyond compare 4
  7. repos install.packages()安装镜像
  8. python的endswith()的用法及实例
  9. 使用IIS部署网站步骤
  10. Promise的作用及用法