Scala并发编程(二)之 Akka
Scala并发编程之 Akka
- 概述
- Akka通信过程
- Actor Path
- 入门案例
- 定时任务案例
- 两个进程之间的通信案例
- 简易版 Spark通信框架实现案例
概述
- Akka是使用 Scala开发的库, 它是基于事件驱动的, 用于构建高并发项目的工具包
- Akka特性:
- 提供基于异步非阻塞, 高性能的事件驱动编程模型
- 内置容错机制, 允许 Actor出错时, 进行恢复或重置
- 轻量级的事件处理(每 GB堆内存几百万 Actor. *
轻量级事件处理和重量级的划分, 主要看是否依赖操作系统和硬件, 依赖是重量级, 不依赖是轻量级
- 可在单机上构建高并发应用, 也可在网络中构建分布式应用
Akka通信过程
- 学生创建一个 ActorSystem
- 通过 ActorSystem来创建一个 ActorRef(老师的引用), 并将消息发送给 ActorRef
- ActorRef将消息发送给 Message Dispatcher(消息分发器)
- Message Dispatcher将消息按照顺序保存到目标 Actor的 MailBox中
- Message Dispatcher将 MailBox放到一个线程中
- MailBox按照顺序取出消息, 最终将它递给 TeacherActor接收的方法中
API介绍:
- ActorSystem: 负责创建和监督 Actor
- ActorSystem是一个单例对象, 通过它可创建很多 Actor
- 使用
context.system
可以获取到管理该 Actor的 ActorSystem的引用
- 实现 Actor类
- 定义类或单例对象继承 Actor(
import akka.actor.Actor
)- 实现 receive方法接收消息(无需加 loop& react方法)
- 可以实现 preStart()方法(可选), 该方法在 Actor对象构建后执行, 在 Actor生命周期中仅执行一次
- 加载 Actor
- 要创建 Akka的 Actor对象, 必须先创建 ActorSystem
- 调用 ActorSystem.actorOf(Props(Actor对象), “Actor名称”)来加载 Actor
Actor Path
- 每一个 Actor都有一个 Path, 这个路径可以被外部引用
类型 | 路径 |
---|---|
本地 Actor | akka://actorSystem名称/user/Actor名称 |
远程 Actor | akka.tcp://dest-actorSystem@ip地址:port/user/Actor名称 |
入门案例
- 通过 ActorSystem加载两个 Actor(SenderActor& ReceiverActor), 并从 SenderActor发消息, 在 ReceiverActor接收, 再回复消息
package com.akka.ex1/*** 提交任务的消息格式** @param msg 发送信息*/
case class SubmitTaskMessage(msg: String)/*** 提交任务成功后的回执信息的格式** @param msg 回执信息*/
case class SuccessSubmitTaskMessage(msg: String)package com.akka.ex1import akka.actor.Actorobject SenderActor extends Actor {override def receive: Receive = {// 接收 Entrance发的消息: startcase "start" => {println("SenderActor, received: start!")// 获取 ReceiverActor的路径val receiverActor = context.actorSelection("akka://actorSystem/user/receiverActor")// 给 ReceiverActor发送消息receiverActor ! SubmitTaskMessage("Hello ReceiverActor!, This is SenderActor.")}// 接收 ReceiverActor返回的回执信息case SuccessSubmitTaskMessage(msg) => println(s"SenderActor, received: ${msg}")}
}package com.akka.ex1import akka.actor.Actorobject ReceiverActor extends Actor {override def receive: Receive = {// 接收 SenderActor发的消息case SubmitTaskMessage(msg) => {println(s"ReceiverActor, received: ${msg}")// 给 SenderActor回复信息sender ! SuccessSubmitTaskMessage("Hi!, This is ReceiverActor.")}}
}package com.akka.ex1import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactoryobject Entrance {def main(args: Array[String]): Unit = {/*** 创建ActorSystem, 用来负责创建和监督 Actor** @param name : scala.Predef.String 给 ActorSystem设置名字* @param config : com.typesafe.config.Config 配置环境*/val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())/*** 通过 ActorSystem来加载自定义 Actor对象** @param props : akka.actor.Props 指定要管理的 Actor伴生对象* @param name : scala.Predef.String 给指定 Actor对象设置名称*/val senderActor = actorSystem.actorOf(Props(SenderActor), "senderActor")// 必须给每个 Actor设置名称, 否则无法从 SenderActor内部通过 context.actorSelection方式, 获得 ReceiverActor的对象// 而会提示 Actor[akka://actorSystem/user/receiverActor] was not deliveredactorSystem.actorOf(Props(ReceiverActor), "receiverActor")// 给 SenderActor发送 "start"字符串senderActor ! "start"}
}
SenderActor, received: start!
ReceiverActor, received: Hello ReceiverActor!, This is SenderActor.
SenderActor, received: Hi!, This is ReceiverActor.
定时任务案例
- 通过
ActorSystem.scheduler.schedule()方法
, 启动定时任务 - 使用方式 1:
final def schedule(initialDelay : FiniteDuration, // 首次开始, 按此设定的时间, 延迟后执行interval : FiniteDuration, // 每隔多久执行一次(首次开始, 立马执行, 不延时receiver : ActorRef, // 设置目标接收消息的 Actormessage : Any) // 要发送的消息(implicit executor : ExecutionContext, sender : ActorRef = {}) // 隐式参数, 需导入
- 使用方式 2:
final def schedule(initialDelay : FiniteDuration, // 首次开始, 按此设定的时间, 延迟后执行interval : FiniteDuration // 每隔多久执行一次(首次开始, 立马执行, 不延时)(f : => Unit) // 定期要执行的函数(消息(implicit executor : ExecutionContext) // 隐式参数, 需导入
package com.akka.ex2import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactoryobject MainActor {object ReceiverActor extends Actor {override def receive: Receive = {case x => println(x)}}def main(args: Array[String]): Unit = {// 创建ActorSystem, 用来负责创建和监督 Actorval actorSystem = ActorSystem("actorSystem", ConfigFactory.load())// 通过 ActorSystem来加载自定义 Actor对象val receiverActor = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")// 导入隐式参数& 转换import actorSystem.dispatcherimport scala.concurrent.duration._// 通过定时器, 定时给 ReceiverActor发送消息// 方式 1: 采用提供的 Any数据类型参数的消息actorSystem.scheduler.schedule(3 seconds, 2 seconds, receiverActor, "Hello ReceiverActor!, 111.")// 方式 2: 采用自定义函数的消息actorSystem.scheduler.schedule(0 seconds, 5 seconds) {receiverActor ! "Hello ReceiverActor!, 222."}}
}
两个进程之间的通信案例
- WorkerActor发送 "connect"消息给 MasterActor
- MasterActor回复 "success"消息给 WorkerActor
- WorkerActor接收并打印接收到的消息
package com.akka.masterimport akka.actor.Actorobject MasterActor extends Actor {override def receive: Receive = {case "setup" => println("MasterActor started!")// 接收 WorkerActor发的消息case "connect" => {println("MasterActor, received: connect!")// 给发送者(WorkerActor)返回的回执信息sender ! "success"}}
}package com.akka.masterimport akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactoryobject Entrance {def main(args: Array[String]): Unit = {val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())val masterActor = actorSystem.actorOf(Props(MasterActor), "masterActor")// 给 MasterActor发送消息masterActor ! "setup"}
}package com.akka.workerimport akka.actor.Actor// WorkerActor的路径: akka.tcp://actorSystem@127.0.0.1:8081/user/workerActor
object WorkerActor extends Actor {override def receive: Receive = {case "setup" => {println("WorkerActor started!")// 远程获取 MasterActorval masterActor = context.system.actorSelection("akka.tcp://actorSystem@127.0.0.1:8080/user/masterActor")// 给 MasterActor发送字符串 connectmasterActor ! "connect"}// 接收 MasterActor发的消息case "success" => println("MasterActor, received: success!")}
}package com.akka.workerimport akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactoryobject Entrance {def main(args: Array[String]): Unit = {val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())val workerActor = actorSystem.actorOf(Props(WorkerActor), "workerActor")// 给 WorkerActor发送消息workerActor ! "setup"}
}
简易版 Spark通信框架实现案例
- 模拟 Spark的 Master与多个 Worker的通信
- 运作步骤:
启动 MasterActor
1.1 MasterActor对象构建后,开启定时任务(用于自检, 为移除超时的 WorkerActor
启动 WorkerActor
2.1 WorkerActor对象构建后, 将自身信息封装成注册信息
后, 发给 MasterActorMasterActor接收 WorkerActor的注册信息, 并保存
3.1 给 WorkerActor回执信息WorkerActor请求注册后, 接收到信息, 并打印 Connection is successful!
4.1 开启定时任务, 给 MasterActor发心跳消息MasterActor接收 WorkerActor发来的心跳消息, 将该 WorkerActor的注册信息中的最后心跳时间更新为当前时间
工程名 | 说明 |
---|---|
scala-spark-akka-common | 存放公共的消息实体类 |
scala-spark-akka-master | Akka Master节点 |
scala-spark-akka-worker | Akka Worker节点 |
package com.akka.spark.common/*** 用来保存已注册的 WorkerActor的信息的类** @param workerId : WorkerActor的 Id(UUID* @param cpuCores : WorkerActor的 CPU核数* @param memory : WorkerActor 的内存大小* @param lastHeartBeatTime : 最后一次心跳时间*/
case class WorkerInfo(workerId: String, cpuCores: Int, memory: Int, lastHeartBeatTime: Long)package com.akka.spark.common/*** WorkerActor提交注册信息的类** @param workerId : WorkerActor的 Id(UUID* @param cpuCores : WorkerActor的 CPU核数* @param memory : WorkerActor 的内存大小*/
case class WorkerRegisterMessage(workerId: String, cpuCores: Int, memory: Int)/** 注册成功后回执的单例对象*/
case object RegisterSuccessMessage/*** WorkerActor定时触发心跳到 MasterActor的信息类** @param workerId : WorkerActor的 Id(UUID* @param cpuCores : WorkerActor的 CPU核数* @param memory : WorkerActor 的内存大小*/
case class WorkerHeartBeatMessage(workerId: String, cpuCores: Int, memory: Int)package com.akka.spark.masterimport com.typesafe.config.{Config, ConfigFactory}// 用来读取配置文件信息的类
object ConfigUtils {// 1. 获取配置文件对象private val config: Config = ConfigFactory.load()// 2. 获取检查 WorkerActor心跳的时间间隔val `master.check.heartbeat.interval` = config.getInt("master.check.heartbeat.interval")// 3. 获取 WorkerActor心跳超时时间val `master.check.heartbeat.timeout` = config.getInt("master.check.heartbeat.timeout")
}package com.akka.spark.masterimport akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactoryobject Master {def main(args: Array[String]): Unit = {val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())actorSystem.actorOf(Props(MasterActor), "masterActor")}
}package com.akka.spark.masterimport java.util.Date
import akka.actor.Actor
import com.akka.spark.common.{RegisterSuccessMessage, WorkerHeartBeatMessage, WorkerInfo, WorkerRegisterMessage}object MasterActor extends Actor {// 1. 定义一个可变的 Map集合, 用来保存已注册的 WorkerActor信息private val regWorkerMap = scala.collection.mutable.Map[String, WorkerInfo]()// MasterActor定期检查 WorkerActor心跳, 将超时的 Worker移除override def preStart(): Unit = {// 1. 导入时间隐式参数& 转换import context.dispatcherimport scala.concurrent.duration._// 2. 启动定时任务(MasterActor自检移除超时的 WorkerActor)context.system.scheduler.schedule(0 seconds, ConfigUtils.`master.check.heartbeat.interval` seconds) {// 3. 过滤超时的 WorkerActor(返回被过滤的 workerId集合val timeOutWorkerMap = regWorkerMap.filter {keyVal => // keyVal的数据格式: workerId -> WorkerInfo(workerId, cpuCores, memory, lastHeartBeatTime)// 3.1 获取当前 WorkerActor对象的最后一次心跳时间val lastHeartBeatTime = keyVal._2.lastHeartBeatTime// 3.2 如果超时, 则 true, 否则 false (当前时间 - 最后一次心跳时间) > 最大超时时间 * 1000if ((new Date().getTime - lastHeartBeatTime) > (ConfigUtils.`master.check.heartbeat.timeout` * 1000)) true else false}// 4. 将要移除的已超时 workerId集合if (!timeOutWorkerMap.isEmpty) {regWorkerMap --= timeOutWorkerMap.map(_._1) // ArrayBuffer(5b9feb50-5c33-496b-a325-dd168360281b)}// 5. 有效的 WorkerActor, 按内存大小进行降序排列val workerList = regWorkerMap.map(_._2).toList.sortBy(_.memory).reverseprintln(s"Active WorkerActors: ${workerList}")}}override def receive: Receive = {// 接收 WorkerActor的注册信息case WorkerRegisterMessage(workerId, cpuCores, memory) => {// 打印接收到的注册信息println(s"MasterActor, received info: ${workerId}, ${cpuCores}, ${memory}")// 将注册信息保存到哈希表中& 并记录最后一次心跳时间regWorkerMap += workerId -> WorkerInfo(workerId, cpuCores, memory, new Date().getTime)// 给注册的 WorkerActor回执信息sender ! RegisterSuccessMessage}// 接收 WorkerActor发来的心跳消息case WorkerHeartBeatMessage(workerId, cpuCores, memory) => {println(s"MasterActor, received heartbeat: ${workerId}")// 更新指定的 WorkerActor对象的最后一次心跳时间regWorkerMap += workerId -> WorkerInfo(workerId, cpuCores, memory, new Date().getTime)}}
}package com.akka.spark.workerimport com.typesafe.config.{Config, ConfigFactory}// 用来读取配置文件信息的类
object ConfigUtils {// 1. 获取配置文件对象private val config: Config = ConfigFactory.load()// 2. 获取 WorkerActor心跳的间隔时间val `worker.heartbeat.interval`: Int = config.getInt("worker.heartbeat.interval")
}package com.akka.spark.workerimport akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactoryobject Worker {def main(args: Array[String]): Unit = {val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())actorSystem.actorOf(Props(WorkerActor), "workerActor")}
}package com.akka.spark.workerimport java.util.UUID
import akka.actor.{Actor, ActorSelection}
import com.akka.spark.common.{RegisterSuccessMessage, WorkerHeartBeatMessage, WorkerRegisterMessage}
import scala.util.Randomobject WorkerActor extends Actor {// 表示 MasterActor的引用private var masterActor: ActorSelection = _// WorkerActor的注册信息private var workerId: String = _private var cpuCores: Int = _ // CPU核数private var memory: Int = _ // 内存大小private val cpuCoreList = List(1, 2, 3, 4, 6, 8) // CPU核心数的随机取值范围private val memoryList = List(512, 1024, 2048, 4096) // 内存大小的随机取值范围override def preStart(): Unit = {// 获取 MasterActor的引用masterActor = context.system.actorSelection("akka.tcp://actorSystem@127.0.0.1:8080/user/masterActor")// 随机设置编号workerId = UUID.randomUUID().toString// 随机选择 WorkerActor的 CPU核数和内存大小val r = new Random()cpuCores = cpuCoreList(r.nextInt(cpuCoreList.length))memory = memoryList(r.nextInt(memoryList.length))// 封装 WorkerActor的注册信息val registerMessage = WorkerRegisterMessage(workerId, cpuCores, memory)// 发给 MasterActormasterActor ! registerMessage}override def receive: Receive = {// 注册成功的回执信息case RegisterSuccessMessage => {println("Connection is successful!")// 1. 导入时间隐式参数& 转换import context.dispatcherimport scala.concurrent.duration._// 定时给 MasterActor发心跳消息context.system.scheduler.schedule(0 seconds, ConfigUtils.`worker.heartbeat.interval` seconds) {masterActor ! WorkerHeartBeatMessage(workerId, cpuCores, memory)}}}
}
如果您觉得有帮助,欢迎点赞哦 ~ 谢谢!!
Scala并发编程(二)之 Akka相关推荐
- Scala入门到精通——第二十六节 Scala并发编程基础
本节主要内容 Scala并发编程简介 Scala Actor并发编程模型 react模型 Actor的几种状态 Actor深入使用解析 1. Scala并发编程简介 2003 年,Herb Sutte ...
- 【并发编程二】c++创建子进程CreateProcess()
[并发编程二]c++创建子进程CreateProcess() 一.创建子进程 二.demo 三.构建.编译.运行 四.相关知识介绍 1.CreateProcess 参数介绍 1.1.lpApplica ...
- 【并发编程二十】协程(coroutine)_协程库
[并发编程二十]协程(coroutine) 一.线程的缺点 二.协程 三.优点 四.个人理解 五.协程库 1.window系统 2.unix系统(包括linux的各个版本) 2.1.makeconte ...
- 15. Scala并发编程模型Akka
15.1 Akka介绍 1) Akka是Java虚拟机JVM平台上构建高并发.分布式和容错应用的工具包和运行时,可以理解成Akka是编写并发程序的框架 2) Akka用Scala语言写成,同时提供了S ...
- Java 并发编程(二):如何保证共享变量的原子性?
线程安全性是我们在进行 Java 并发编程的时候必须要先考虑清楚的一个问题.这个类在单线程环境下是没有问题的,那么我们就能确保它在多线程并发的情况下表现出正确的行为吗? 我这个人,在没有副业之前,一心 ...
- Java并发编程(二十三)------并发设计模式之生产者消费者模式
参考文章:Java实现生产者消费者问题与读者写者问题 目录 1. 生产者消费者问题 1.1 wait() / notify()方法 1.2 await() / signal()方法 1.2.1 对sy ...
- JUC并发编程二 并发架构--线程运行原理
sleep与yield的区别 两阶段终止模式
- java面试-Java并发编程(二)——重排序
当我们写一个单线程程序时,总以为计算机会一行行地运行代码,然而事实并非如此. 什么是重排序? 重排序指的是编译器.处理器在不改变程序执行结果的前提下,重新排列指令的执行顺序,以达到最佳的运行效率. 重 ...
- java并发编程(二十六)——单例模式的双重检查锁模式为什么必须加 volatile?
前言 本文我们从一个问题出发来进行探究关于volatile的应用. 问题:单例模式的双重检查锁模式为什么必须加 volatile? 什么是单例模式 单例模式指的是,保证一个类只有一个实例,并且提供一个 ...
最新文章
- shell脚本教学进阶——Linux三大文本处理工具之grep
- 开源的javascript实现页面打印功能,兼容所有的浏览器(情况属实)
- 汇编 db,dw,dd
- 国内外软件开发上的差距与分析
- kafka直连方式消费多个topic
- linux重启网络服务_vm上linux虚拟机NAT模式配置
- 8种最坑的SQL错误用法,你有没有踩过坑?
- 基于Visual C++2013拆解世界五百强面试题--题8-数组的排序和查找
- Unix文件系统基本结构
- python读取二进制文件_Python读写二进制文件
- 人民币大写转换 java_java人民币转大写中文
- Vue项目使用SSR服务器渲染
- git命令 之 切糕大全
- 刷脸支付便利更好推动普惠金融的落地
- oracle表数据导出成unl文件,oracle的文本导入、导出技巧
- 关于QQ开心农场外挂 开发
- 使用jira管理Scrum敏捷项目实战(四)jira自定义电子看板、敏捷看板、KANBAN配置
- 【李佳辉_周报_2022.10.30】
- linux寄存器位运算,位运算的一些操作
- 能被2、3、4、5、6、7、8、9等数整除的数的特征
热门文章
- 开博记录-二零一捌零柒二捌
- xbmc_如何在XBMC上获取Hulu和Amazon视频
- Unity(设置射线检测对象)
- html怎么引入多个字体文件,css – 如何为同一个字体添加多个字体文件?
- office2013卸载
- Failed to connect to github.com port 443 after 21080 ms
- 如何在软件开发团队中进行有效沟通
- 说走就走的「Windows」—— Windows To Go 制作详解
- Node.js联机游戏——gobang五子棋(客户端+服务端+websocket的双人游戏)
- 14.(地图工具篇)ArcMap点图层(shape图层)转Geojson