Scala并发编程之 Akka

  • 概述
    • Akka通信过程
    • Actor Path
  • 入门案例
  • 定时任务案例
  • 两个进程之间的通信案例
  • 简易版 Spark通信框架实现案例

概述

  • Akka是使用 Scala开发的库, 它是基于事件驱动的, 用于构建高并发项目的工具包
  • Akka特性:
  1. 提供基于异步非阻塞, 高性能的事件驱动编程模型
  2. 内置容错机制, 允许 Actor出错时, 进行恢复或重置
  3. 轻量级的事件处理(每 GB堆内存几百万 Actor. *轻量级事件处理和重量级的划分, 主要看是否依赖操作系统和硬件, 依赖是重量级, 不依赖是轻量级
  4. 可在单机上构建高并发应用, 也可在网络中构建分布式应用

Akka通信过程

  1. 学生创建一个 ActorSystem
  2. 通过 ActorSystem来创建一个 ActorRef(老师的引用), 并将消息发送给 ActorRef
  3. ActorRef将消息发送给 Message Dispatcher(消息分发器)
  4. Message Dispatcher将消息按照顺序保存到目标 Actor的 MailBox中
  5. Message Dispatcher将 MailBox放到一个线程中
  6. MailBox按照顺序取出消息, 最终将它递给 TeacherActor接收的方法中

API介绍:

  • ActorSystem: 负责创建和监督 Actor
  1. ActorSystem是一个单例对象, 通过它可创建很多 Actor
  2. 使用 context.system可以获取到管理该 Actor的 ActorSystem的引用
  • 实现 Actor类
  1. 定义类或单例对象继承 Actor(import akka.actor.Actor)
  2. 实现 receive方法接收消息(无需加 loop& react方法)
  3. 可以实现 preStart()方法(可选), 该方法在 Actor对象构建后执行, 在 Actor生命周期中仅执行一次
  • 加载 Actor
  1. 要创建 Akka的 Actor对象, 必须先创建 ActorSystem
  2. 调用 ActorSystem.actorOf(Props(Actor对象), “Actor名称”)来加载 Actor

Actor Path

  • 每一个 Actor都有一个 Path, 这个路径可以被外部引用
类型 路径
本地 Actor akka://actorSystem名称/user/Actor名称
远程 Actor akka.tcp://dest-actorSystem@ip地址:port/user/Actor名称

入门案例

  • 通过 ActorSystem加载两个 Actor(SenderActor& ReceiverActor), 并从 SenderActor发消息, 在 ReceiverActor接收, 再回复消息

package com.akka.ex1/*** 提交任务的消息格式** @param msg 发送信息*/
case class SubmitTaskMessage(msg: String)/*** 提交任务成功后的回执信息的格式** @param msg 回执信息*/
case class SuccessSubmitTaskMessage(msg: String)package com.akka.ex1import akka.actor.Actorobject SenderActor extends Actor {override def receive: Receive = {// 接收 Entrance发的消息: startcase "start" => {println("SenderActor, received: start!")// 获取 ReceiverActor的路径val receiverActor = context.actorSelection("akka://actorSystem/user/receiverActor")// 给 ReceiverActor发送消息receiverActor ! SubmitTaskMessage("Hello ReceiverActor!, This is SenderActor.")}// 接收 ReceiverActor返回的回执信息case SuccessSubmitTaskMessage(msg) => println(s"SenderActor, received: ${msg}")}
}package com.akka.ex1import akka.actor.Actorobject ReceiverActor extends Actor {override def receive: Receive = {// 接收 SenderActor发的消息case SubmitTaskMessage(msg) => {println(s"ReceiverActor, received: ${msg}")// 给 SenderActor回复信息sender ! SuccessSubmitTaskMessage("Hi!, This is ReceiverActor.")}}
}package com.akka.ex1import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactoryobject Entrance {def main(args: Array[String]): Unit = {/*** 创建ActorSystem, 用来负责创建和监督 Actor** @param name : scala.Predef.String 给 ActorSystem设置名字* @param config : com.typesafe.config.Config 配置环境*/val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())/*** 通过 ActorSystem来加载自定义 Actor对象** @param props : akka.actor.Props 指定要管理的 Actor伴生对象* @param name : scala.Predef.String 给指定 Actor对象设置名称*/val senderActor = actorSystem.actorOf(Props(SenderActor), "senderActor")// 必须给每个 Actor设置名称, 否则无法从 SenderActor内部通过 context.actorSelection方式, 获得 ReceiverActor的对象// 而会提示 Actor[akka://actorSystem/user/receiverActor] was not deliveredactorSystem.actorOf(Props(ReceiverActor), "receiverActor")// 给 SenderActor发送 "start"字符串senderActor ! "start"}
}
SenderActor, received: start!
ReceiverActor, received: Hello ReceiverActor!, This is SenderActor.
SenderActor, received: Hi!, This is ReceiverActor.

定时任务案例

  • 通过 ActorSystem.scheduler.schedule()方法, 启动定时任务
  • 使用方式 1:
final def schedule(initialDelay : FiniteDuration, // 首次开始, 按此设定的时间, 延迟后执行interval : FiniteDuration, // 每隔多久执行一次(首次开始, 立马执行, 不延时receiver : ActorRef, // 设置目标接收消息的 Actormessage : Any) // 要发送的消息(implicit executor : ExecutionContext, sender : ActorRef = {}) // 隐式参数, 需导入
  • 使用方式 2:
final def schedule(initialDelay : FiniteDuration, // 首次开始, 按此设定的时间, 延迟后执行interval : FiniteDuration // 每隔多久执行一次(首次开始, 立马执行, 不延时)(f : => Unit) // 定期要执行的函数(消息(implicit executor : ExecutionContext) // 隐式参数, 需导入
package com.akka.ex2import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactoryobject MainActor {object ReceiverActor extends Actor {override def receive: Receive = {case x => println(x)}}def main(args: Array[String]): Unit = {// 创建ActorSystem, 用来负责创建和监督 Actorval actorSystem = ActorSystem("actorSystem", ConfigFactory.load())// 通过 ActorSystem来加载自定义 Actor对象val receiverActor = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")// 导入隐式参数& 转换import actorSystem.dispatcherimport scala.concurrent.duration._// 通过定时器, 定时给 ReceiverActor发送消息// 方式 1: 采用提供的 Any数据类型参数的消息actorSystem.scheduler.schedule(3 seconds, 2 seconds, receiverActor, "Hello ReceiverActor!, 111.")// 方式 2: 采用自定义函数的消息actorSystem.scheduler.schedule(0 seconds, 5 seconds) {receiverActor ! "Hello ReceiverActor!, 222."}}
}

两个进程之间的通信案例

  1. WorkerActor发送 "connect"消息给 MasterActor
  2. MasterActor回复 "success"消息给 WorkerActor
  3. WorkerActor接收并打印接收到的消息
package com.akka.masterimport akka.actor.Actorobject MasterActor extends Actor {override def receive: Receive = {case "setup" => println("MasterActor started!")// 接收 WorkerActor发的消息case "connect" => {println("MasterActor, received: connect!")// 给发送者(WorkerActor)返回的回执信息sender ! "success"}}
}package com.akka.masterimport akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactoryobject Entrance {def main(args: Array[String]): Unit = {val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())val masterActor = actorSystem.actorOf(Props(MasterActor), "masterActor")// 给 MasterActor发送消息masterActor ! "setup"}
}package com.akka.workerimport akka.actor.Actor// WorkerActor的路径: akka.tcp://actorSystem@127.0.0.1:8081/user/workerActor
object WorkerActor extends Actor {override def receive: Receive = {case "setup" => {println("WorkerActor started!")// 远程获取 MasterActorval masterActor = context.system.actorSelection("akka.tcp://actorSystem@127.0.0.1:8080/user/masterActor")// 给 MasterActor发送字符串 connectmasterActor ! "connect"}// 接收 MasterActor发的消息case "success" => println("MasterActor, received: success!")}
}package com.akka.workerimport akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactoryobject Entrance {def main(args: Array[String]): Unit = {val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())val workerActor = actorSystem.actorOf(Props(WorkerActor), "workerActor")// 给 WorkerActor发送消息workerActor ! "setup"}
}

简易版 Spark通信框架实现案例

  • 模拟 Spark的 Master与多个 Worker的通信

  • 运作步骤:
  1. 启动 MasterActor
    1.1 MasterActor对象构建后, 开启定时任务(用于自检, 为移除超时的 WorkerActor

  2. 启动 WorkerActor
    2.1 WorkerActor对象构建后, 将自身信息封装成注册信息后, 发给 MasterActor

  3. MasterActor接收 WorkerActor的注册信息, 并保存
    3.1 给 WorkerActor回执信息

  4. WorkerActor请求注册后, 接收到信息, 并打印 Connection is successful!
    4.1 开启定时任务, 给 MasterActor发心跳消息

  5. MasterActor接收 WorkerActor发来的心跳消息, 将该 WorkerActor的注册信息中的最后心跳时间更新为当前时间

工程名 说明
scala-spark-akka-common 存放公共的消息实体类
scala-spark-akka-master Akka Master节点
scala-spark-akka-worker Akka Worker节点
package com.akka.spark.common/*** 用来保存已注册的 WorkerActor的信息的类** @param workerId : WorkerActor的 Id(UUID* @param cpuCores : WorkerActor的 CPU核数* @param memory : WorkerActor 的内存大小* @param lastHeartBeatTime : 最后一次心跳时间*/
case class WorkerInfo(workerId: String, cpuCores: Int, memory: Int, lastHeartBeatTime: Long)package com.akka.spark.common/*** WorkerActor提交注册信息的类** @param workerId : WorkerActor的 Id(UUID* @param cpuCores : WorkerActor的 CPU核数* @param memory : WorkerActor 的内存大小*/
case class WorkerRegisterMessage(workerId: String, cpuCores: Int, memory: Int)/** 注册成功后回执的单例对象*/
case object RegisterSuccessMessage/*** WorkerActor定时触发心跳到 MasterActor的信息类** @param workerId : WorkerActor的 Id(UUID* @param cpuCores : WorkerActor的 CPU核数* @param memory : WorkerActor 的内存大小*/
case class WorkerHeartBeatMessage(workerId: String, cpuCores: Int, memory: Int)package com.akka.spark.masterimport com.typesafe.config.{Config, ConfigFactory}// 用来读取配置文件信息的类
object ConfigUtils {// 1. 获取配置文件对象private val config: Config = ConfigFactory.load()// 2. 获取检查 WorkerActor心跳的时间间隔val `master.check.heartbeat.interval` = config.getInt("master.check.heartbeat.interval")// 3. 获取 WorkerActor心跳超时时间val `master.check.heartbeat.timeout` = config.getInt("master.check.heartbeat.timeout")
}package com.akka.spark.masterimport akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactoryobject Master {def main(args: Array[String]): Unit = {val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())actorSystem.actorOf(Props(MasterActor), "masterActor")}
}package com.akka.spark.masterimport java.util.Date
import akka.actor.Actor
import com.akka.spark.common.{RegisterSuccessMessage, WorkerHeartBeatMessage, WorkerInfo, WorkerRegisterMessage}object MasterActor extends Actor {// 1. 定义一个可变的 Map集合, 用来保存已注册的 WorkerActor信息private val regWorkerMap = scala.collection.mutable.Map[String, WorkerInfo]()// MasterActor定期检查 WorkerActor心跳, 将超时的 Worker移除override def preStart(): Unit = {// 1. 导入时间隐式参数& 转换import context.dispatcherimport scala.concurrent.duration._// 2. 启动定时任务(MasterActor自检移除超时的 WorkerActor)context.system.scheduler.schedule(0 seconds, ConfigUtils.`master.check.heartbeat.interval` seconds) {// 3. 过滤超时的 WorkerActor(返回被过滤的 workerId集合val timeOutWorkerMap = regWorkerMap.filter {keyVal => // keyVal的数据格式: workerId -> WorkerInfo(workerId, cpuCores, memory, lastHeartBeatTime)// 3.1 获取当前 WorkerActor对象的最后一次心跳时间val lastHeartBeatTime = keyVal._2.lastHeartBeatTime// 3.2 如果超时, 则 true, 否则 false (当前时间 - 最后一次心跳时间) > 最大超时时间 * 1000if ((new Date().getTime - lastHeartBeatTime) > (ConfigUtils.`master.check.heartbeat.timeout` * 1000)) true else false}// 4. 将要移除的已超时 workerId集合if (!timeOutWorkerMap.isEmpty) {regWorkerMap --= timeOutWorkerMap.map(_._1) // ArrayBuffer(5b9feb50-5c33-496b-a325-dd168360281b)}// 5. 有效的 WorkerActor, 按内存大小进行降序排列val workerList = regWorkerMap.map(_._2).toList.sortBy(_.memory).reverseprintln(s"Active WorkerActors: ${workerList}")}}override def receive: Receive = {// 接收 WorkerActor的注册信息case WorkerRegisterMessage(workerId, cpuCores, memory) => {// 打印接收到的注册信息println(s"MasterActor, received info: ${workerId}, ${cpuCores}, ${memory}")// 将注册信息保存到哈希表中& 并记录最后一次心跳时间regWorkerMap += workerId -> WorkerInfo(workerId, cpuCores, memory, new Date().getTime)// 给注册的 WorkerActor回执信息sender ! RegisterSuccessMessage}// 接收 WorkerActor发来的心跳消息case WorkerHeartBeatMessage(workerId, cpuCores, memory) => {println(s"MasterActor, received heartbeat: ${workerId}")// 更新指定的 WorkerActor对象的最后一次心跳时间regWorkerMap += workerId -> WorkerInfo(workerId, cpuCores, memory, new Date().getTime)}}
}package com.akka.spark.workerimport com.typesafe.config.{Config, ConfigFactory}// 用来读取配置文件信息的类
object ConfigUtils {// 1. 获取配置文件对象private val config: Config = ConfigFactory.load()// 2. 获取 WorkerActor心跳的间隔时间val `worker.heartbeat.interval`: Int = config.getInt("worker.heartbeat.interval")
}package com.akka.spark.workerimport akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactoryobject Worker {def main(args: Array[String]): Unit = {val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())actorSystem.actorOf(Props(WorkerActor), "workerActor")}
}package com.akka.spark.workerimport java.util.UUID
import akka.actor.{Actor, ActorSelection}
import com.akka.spark.common.{RegisterSuccessMessage, WorkerHeartBeatMessage, WorkerRegisterMessage}
import scala.util.Randomobject WorkerActor extends Actor {// 表示 MasterActor的引用private var masterActor: ActorSelection = _// WorkerActor的注册信息private var workerId: String = _private var cpuCores: Int = _ // CPU核数private var memory: Int = _ // 内存大小private val cpuCoreList = List(1, 2, 3, 4, 6, 8) // CPU核心数的随机取值范围private val memoryList = List(512, 1024, 2048, 4096) // 内存大小的随机取值范围override def preStart(): Unit = {// 获取 MasterActor的引用masterActor = context.system.actorSelection("akka.tcp://actorSystem@127.0.0.1:8080/user/masterActor")// 随机设置编号workerId = UUID.randomUUID().toString// 随机选择 WorkerActor的 CPU核数和内存大小val r = new Random()cpuCores = cpuCoreList(r.nextInt(cpuCoreList.length))memory = memoryList(r.nextInt(memoryList.length))// 封装 WorkerActor的注册信息val registerMessage = WorkerRegisterMessage(workerId, cpuCores, memory)// 发给 MasterActormasterActor ! registerMessage}override def receive: Receive = {// 注册成功的回执信息case RegisterSuccessMessage => {println("Connection is successful!")// 1. 导入时间隐式参数& 转换import context.dispatcherimport scala.concurrent.duration._// 定时给 MasterActor发心跳消息context.system.scheduler.schedule(0 seconds, ConfigUtils.`worker.heartbeat.interval` seconds) {masterActor ! WorkerHeartBeatMessage(workerId, cpuCores, memory)}}}
}

如果您觉得有帮助,欢迎点赞哦 ~ 谢谢!!

Scala并发编程(二)之 Akka相关推荐

  1. Scala入门到精通——第二十六节 Scala并发编程基础

    本节主要内容 Scala并发编程简介 Scala Actor并发编程模型 react模型 Actor的几种状态 Actor深入使用解析 1. Scala并发编程简介 2003 年,Herb Sutte ...

  2. 【并发编程二】c++创建子进程CreateProcess()

    [并发编程二]c++创建子进程CreateProcess() 一.创建子进程 二.demo 三.构建.编译.运行 四.相关知识介绍 1.CreateProcess 参数介绍 1.1.lpApplica ...

  3. 【并发编程二十】协程(coroutine)_协程库

    [并发编程二十]协程(coroutine) 一.线程的缺点 二.协程 三.优点 四.个人理解 五.协程库 1.window系统 2.unix系统(包括linux的各个版本) 2.1.makeconte ...

  4. 15. Scala并发编程模型Akka

    15.1 Akka介绍 1) Akka是Java虚拟机JVM平台上构建高并发.分布式和容错应用的工具包和运行时,可以理解成Akka是编写并发程序的框架 2) Akka用Scala语言写成,同时提供了S ...

  5. Java 并发编程(二):如何保证共享变量的原子性?

    线程安全性是我们在进行 Java 并发编程的时候必须要先考虑清楚的一个问题.这个类在单线程环境下是没有问题的,那么我们就能确保它在多线程并发的情况下表现出正确的行为吗? 我这个人,在没有副业之前,一心 ...

  6. Java并发编程(二十三)------并发设计模式之生产者消费者模式

    参考文章:Java实现生产者消费者问题与读者写者问题 目录 1. 生产者消费者问题 1.1 wait() / notify()方法 1.2 await() / signal()方法 1.2.1 对sy ...

  7. JUC并发编程二 并发架构--线程运行原理

    sleep与yield的区别 两阶段终止模式

  8. java面试-Java并发编程(二)——重排序

    当我们写一个单线程程序时,总以为计算机会一行行地运行代码,然而事实并非如此. 什么是重排序? 重排序指的是编译器.处理器在不改变程序执行结果的前提下,重新排列指令的执行顺序,以达到最佳的运行效率. 重 ...

  9. java并发编程(二十六)——单例模式的双重检查锁模式为什么必须加 volatile?

    前言 本文我们从一个问题出发来进行探究关于volatile的应用. 问题:单例模式的双重检查锁模式为什么必须加 volatile? 什么是单例模式 单例模式指的是,保证一个类只有一个实例,并且提供一个 ...

最新文章

  1. shell脚本教学进阶——Linux三大文本处理工具之grep
  2. 开源的javascript实现页面打印功能,兼容所有的浏览器(情况属实)
  3. 汇编 db,dw,dd
  4. 国内外软件开发上的差距与分析
  5. kafka直连方式消费多个topic
  6. linux重启网络服务_vm上linux虚拟机NAT模式配置
  7. 8种最坑的SQL错误用法,你有没有踩过坑?
  8. 基于Visual C++2013拆解世界五百强面试题--题8-数组的排序和查找
  9. Unix文件系统基本结构
  10. python读取二进制文件_Python读写二进制文件
  11. 人民币大写转换 java_java人民币转大写中文
  12. Vue项目使用SSR服务器渲染
  13. git命令 之 切糕大全
  14. 刷脸支付便利更好推动普惠金融的落地
  15. oracle表数据导出成unl文件,oracle的文本导入、导出技巧
  16. 关于QQ开心农场外挂 开发
  17. 使用jira管理Scrum敏捷项目实战(四)jira自定义电子看板、敏捷看板、KANBAN配置
  18. 【李佳辉_周报_2022.10.30】
  19. linux寄存器位运算,位运算的一些操作
  20. 能被2、3、4、5、6、7、8、9等数整除的数的特征

热门文章

  1. 开博记录-二零一捌零柒二捌
  2. xbmc_如何在XBMC上获取Hulu和Amazon视频
  3. Unity(设置射线检测对象)
  4. html怎么引入多个字体文件,css – 如何为同一个字体添加多个字体文件?
  5. office2013卸载
  6. Failed to connect to github.com port 443 after 21080 ms
  7. 如何在软件开发团队中进行有效沟通
  8. 说走就走的「Windows」—— Windows To Go 制作详解
  9. Node.js联机游戏——gobang五子棋(客户端+服务端+websocket的双人游戏)
  10. 14.(地图工具篇)ArcMap点图层(shape图层)转Geojson