akka之分发和路由
一 Dispatchers
1.1 理解分发器
分发器是一个通信协调员的角色,主要负责消息的接收和传递。主要依据一定的分发策略,用于控制执行流程,然后将到来的消息或者请求路由给相关的业务进程。
提供飞机的航空公司类似于Akka的MailBox,机场跑道类似于Akka的线程资源,航空交通管制塔类似于Dispatcher
1.2 分发器的类型
1.2.1 Dispatcher
这是默认的分发器,它是一个基于事件的,绑定了一套Actors到一个线程池中。
特点如下:
# 每一个actor都依赖他们自己的邮箱
# 可以被任何数量的actors共享
# 是对非阻塞的优化
1.2.2 PinnedDispatcher
它对于每一个Actor提供了单独的或者专用的线程,在做一些I/O操作或者执行一些长时间计算任务的task.
# 每一个actor都依赖他们自己的邮箱
# 对于每一个actor的专用线程不能和其他actor共享
# 分发器依赖于线程池的Execcutor
# 是对阻塞操作的优化
1.2.3 BalancingDispatcher
他可以重新分配工作从比较忙的actor到比较空闲的actor,任务的重新分配是有前提的,那就是所有的actor的类型是必须一致的
特点:
# 只有一个邮箱
# 分发器被所有相同类型的actors共享
# 分发器依赖于一个线程池或者fork join pool
1.2.4 CallingThreadDispatcher
他只是在相同的线程上运行任务,不能创建任何新的线程,并且提供了一个执行顺序
特点:
# 每一个actor都依赖于自己的邮箱
# 所有actors共享分发器
# 分发器依赖于当前的调用线程
1.3 Dispatcher的使用
Akka支持的Executor Context:
第一种: Thread PoolExecutor
创建工作者线程池,任务队列分配到池子,如果task数量超过了线程数,那么就会排队等候,直到池子有空闲线程的时候
第二种:Fork JoinExecutor
这是基于分儿治之的前提,他的核心思想就是把一个大的任务划分成多个小任务,最后在进行结果的联合,这些任务是能够独立运行的
对于每一个ExecutorContext, Akka都可以指定一个配置文件参数,这些参数主要定义:
# 分配的最小数量的线程数
# 分配的最大数量的线程数
# 所使用的系数(基于可用的CPU核数)
比如最小线程数为3,系数为2,那么dispatcher总共可用开始6个线程
1.4 如何配置参数
它会默认去读取classpath下的application.conf文件,所以你可以定义一个该文件,然后在里面配置
akka {loglevel = INFO }# 你的dispatcher名字 my-dispatcher {# 指定Dispatcher类型[Dispatcher/PinnedDispatcher/BalancingDispatcher/CallingThreadDispatcher]type = Dispatcher# 指定使用哪一种 Executor Context [thread-pool-executor/fork-join-executor]executor = "fork-join-executor"# 针对executor context 类型配置参数fork-join-executor {# 最小的并发数parallelism-min = 2# 并发因子可以计算并发总并发数 ceil(available processors * factor)parallelism-factor = 2.0# 最大并发数parallelism-max = 10}thread-pool-executor {# 线程池最小线程数core-pool-size-min = 2# 因子 (有效核数)*因子core-pool-size-factor = 2.0# 线程池最大线程数core-pool-size-max = 10}# 每一个Actor在跳到另外一个Actor之前能处理的最大消息的吞吐量# 设成1,对于每一个Actor都是公平的throughput = 100# 指定mailbox的容量大小,如果设置成0 表示没有边界mailbox-capacity = 100# 指定mailbox的类型,这个应该配合mailbox-capacity# mailbox-type = "" }
1.5 选择哪一种Dispatcher的策略
# 分发器的选择取决于你的应用程序是阻塞还是非阻塞
# executor的选择取决于你的应用程序逻辑,是不是需要将一个任务分成多个小任务,然后每一个小任务可以并行运行
# 依据CPU核数确定你的线程数的最大值和最小值
# 吞吐量的设置
简单的例子:
class DispatcherActor extends UntypedActor{val x:Int = 2var child:ActorRef = _@throws[Exception](classOf[Exception]) overridedef preStart(): Unit = {child = context.actorOf(Props[MsgEchoActor],name="MsgEchoActor")}@throws[Throwable](classOf[Throwable])override def onReceive(message: Any): Unit = {println("[onReceive] => "+message)var result = 0try {result = x + message.toString.toInt} catch {case t:Throwable => t.printStackTrace()}child.tell(result,sender)} }class MsgEchoActor extends Actor {def receive:Receive = {case a if 1 to 10 contains(a) =>println(s"$a belongs to [1-10]")case b if 11 to 50 contains(b) =>println(s"$b belongs to [11-50]")case c if 51 to 100 contains(c) =>println(s"$c belongs to [51-100]")case _ =>println("Unknown Range Value")} } object DispatcherActor extends App {val _system = ActorSystem("DispatcherActorSystem")val dispatcherActor = _system.actorOf(Props[DispatcherActor].withDispatcher("my-dispatcher"),name="DispatcherActor")dispatcherActor ! 88dispatcherActor ! 20dispatcherActor ! 1Thread.sleep(50)_system.terminate() }
二 MailBox
邮箱,临时存储消息的地方,然后这些消息会派到Actor中,一般情况下,每一个Actor都会依赖一个Mailbox,但是BalancingPool允许所有actor共用一个MailBox
2.1Mailbox类型
以下是一些默认的邮箱实现
他们依赖于Java的并发库里的队列:
Blocking Queue: 阻塞队列意味着一个队列如果满了,那么该队列就会阻塞,直到有空的空间才可以存放数据
Bounded Queue: 边界队列意味着这个队列有大小限制,如果达到指定的数量,那么就不能继续存数据
Unbounded MailBox: 如果不修改akka.actor.default-mailbox配置,那么它是默认的mailbox type.
配置名称:unbounded 或者 akka.dispatch.UnboundedMainbox
SingleConsumerOnlyUnboundedMailBox:
依赖于多个生产者和单个消费者队列,不能被BalancingDispatcher使用,不是阻塞队列,也没有边界
配置名称:akka.dispatch.SingleConsumerOnlyUnbounded
UnboundedControlAwareMailbox: 非阻塞,无边界,高优先级
配置名字:akka.dispatch.UnboundedControlAwareMailbox
UnboundedPriorityMailbox: 消息投递的顺序等价于优先级,非阻塞,无边界,和UnboundedStablePriorityMaibox相反
配置名称:akka.dispatch.UnboundedPriorityMailbox
UnboundedStablePriorityMaibox:不阻塞,无边界,FIFO投递消息
和UnboundedPriorityMailbox相反。
配置名称:akka.dispatch.UnboundedStablePriorityMaibox
BoundedMailBox:只要达到了边界限制,则不能继续push数据
配置名称:bounded或者akka.dispatch.BoundedMailbox
BoundedPriorityMailbox: 消息投递的顺序等价于优先级,非阻塞,有边界
配置名称:akka.dispatch.BoundedPriorityMailbox
BoundedStablePriorityMailbox: 先进先出的顺序投递消息,非阻塞,有边界
配置名字:akka.dispatch.BoundedStablePriorityMailbox
BoundedControlAwareMailbox: 非阻塞,有边界,高优先级
配置名称:akka.dispatch.BoundedControlAwareMailbox
2.2 如何选择Mailbox类型
# 默认情况是默认的mailbox => akka.actor.default-mailbox被使用
怎么设置默认的mailboxtype呢?
akka.actor.default-mailbox {
mailbox-type= "akka.dispatch.SingleConsumerOnlyUnbounded
Mailbox"
}
# 如果actor的部署配置段包含一个mailbox 的配置段,然后在里面描述的mailbox 的类型,那么就会使用这个类型
# 如果actor的Props#withMailbox也可以设置mailbox类型
# 如果分发器的配置段描述了mailbox-type,那么将会使用这个类型
2.3 定制Mailbox
import java.util.concurrent.ConcurrentLinkedQueue
import akka.actor.{ActorRef, PoisonPill, ActorSystem}
import akka.dispatch._
import com.typesafe.config.Config
trait MyUnboundedMessageQueueSemantics
class MyUnboundedMailBox extends MailboxType with ProducesMessageQueue[MyUnboundedMailBox.MyMessageQueue]{
import MyUnboundedMailBox._
def this(settings:ActorSystem.Settings,config:Config){
this()
}
override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = {
new MyMessageQueue()
}
}
object MyUnboundedMailBox {
class MyMessageQueue extends MessageQueue with MyUnboundedMessageQueueSemantics {
private finalval queue = new ConcurrentLinkedQueue[Envelope]()
override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
queue.offer(handle)
}
override def numberOfMessages: Int = queue.size
override def dequeue(): Envelope = queue.poll()
override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
while (hasMessages){
deadLetters.enqueue(owner,dequeue())
}
}
override def hasMessages: Boolean = !queue.isEmpty
}
}
custom-dispatcher {mailbox-requirement = "com.concurrent.akka.mailbox.MyUnboundedMessageQueueSemantics" } akka.actor.mailbox.requirements {"com.concurrent.akka.mailbox.MyUnboundedMessageQueueSemantics" = custom-dispatcher-mainbox } custom-dispatcher-mainbox {mailbox-type = "com.concurrent.akka.mailbox.MyUnboundedMailBox" }
三 路由 Routing
我们知道当大量的actor在并行工作的时候,处理到来的消息流,这时候就需要一个组件或者东西来引导消息从源到目的地Actor,这个组件或者东西就是Router
在Akka中,router也是一种actor 类型,它路由到来的消息到其他的actors,其他那些actors就叫做routees(被路由对象)
3.1Akka router支持以下路由策略
2.11以前的方式=>
# RoundRobinRouter: 轮询
# RadnomRouter: 随机
# SmallestMailboxRouter: 空闲
# BroadcastRouter: 广播,将转发相同消息到每一个routees
2.11之后的提供的路由实现:
# RoundRobinRouteLogic
# RandomRouteLogic
# SmallestMailboxRoutingLogic
# BroadcastRoutingLogic
# TailChoppingRoutingLogic
# ConsistentHashingRoutingLogic
3.2 创建router的2种方法
我们创建router,router可以包含需要路由到的Actor即routees
和路由到这些routees的路由策略
3.2.1 程序动态创建
class Master extends Actor{
var router:Router = _
@throws[Exception](classOf[Exception]) override
def preStart(): Unit = {
router = {
/*相当于创建了5个actors作为routees*/
var routees = Vector.fill(5){
val r = context.actorOf(Props[Worker],"WorkActor---"+math.round(math.random * 100) + 1)
println(r.path)
context watch r
ActorRefRoutee(r)
}
Router(RoundRobinRoutingLogic(),routees)
}
}
def receive: Receive = {
case a:Int => router.route(a,sender)
case _ => println("参数异常")
}
}
结果如下:
3.2.2 根据application.conf配置文件针对actor的配置
############### 2种配置方式 ############### ############### 单层结构 ############### # akka.actor.deployment { # /MasterActor2/router1{ # router = round-robin-pool # nr-of-instances = 5 # } #}############### 多层结构 ############### akka {actor{deployment {# 注意这里不需要加/user/MasterActor2/router1 {# 路由策略router = round-robin-pool# routees的数量,默认是按照$+字母的方式nr-of-instances = 5}# 远程调用# '/user/actorA/actorB' is a remote deployed actor# /actorA/actorB {# remote = "akka.tcp://sampleActorSystem@127.0.0.1:2553"# }# 匹配所有子actor# all direct children of '/user/actorC' have a dedicated dispatcher# "/actorC/*" {# dispatcher = my-dispatcher# }# 为某一个actor指定一个邮箱# '/user/actorD/actorE' has a special priority mailbox# /actorD/actorE {# mailbox = prio-mailbox# }}} }# my-dispatcher { # fork-join-executor.parallelism-min = 10 # fork-join-executor.parallelism-max = 10 #} # prio-mailbox { # mailbox-type = "a.b.MyPrioMailbox" # }
class Master2 extends Actor{var router:ActorRef = _@throws[Exception](classOf[Exception]) overridedef preStart(): Unit = {/*根据配置文件Application.conf创建router*/router = context.actorOf(FromConfig.props(Props[Worker]),"router1")}def receive: Receive = {case a:Long => router ! acase _ => println("参数异常")} } object Master extends App {val system = ActorSystem("MasterActorSystem")val x = system.actorOf(Props[Master2],name="MasterActor2")for (i <- 0 until 10) {x ! math.round(math.random * 10) + 1}Thread.sleep(20)system.terminate() }
结果如下:
3.3Router Actor的分类
Pool: Router actors创建routees作为子的actors,然后如果他们结束了,就从router中删除他们
Group: Routee Actor被Router创建,且使用actor selection,router发送消息到指定的路径,不需要监控终止情况
至于router在配置时候的选择:
router.type-mapping {from-code = "akka.routing.NoRouter"round-robin-pool = "akka.routing.RoundRobinPool"round-robin-group = "akka.routing.RoundRobinGroup"random-pool = "akka.routing.RandomPool"random-group = "akka.routing.RandomGroup"balancing-pool = "akka.routing.BalancingPool"smallest-mailbox-pool = "akka.routing.SmallestMailboxPool"broadcast-pool = "akka.routing.BroadcastPool"broadcast-group = "akka.routing.BroadcastGroup"scatter-gather-pool = "akka.routing.ScatterGatherFirstCompletedPool"scatter-gather-group = "akka.routing.ScatterGatherFirstCompletedGroup"tail-chopping-pool = "akka.routing.TailChoppingPool"tail-chopping-group = "akka.routing.TailChoppingGroup"consistent-hashing-pool = "akka.routing.ConsistentHashingPool"consistent-hashing-group = "akka.routing.ConsistentHashingGroup" }
3.4 pool相关的只是
3.4.1 远程部署Routees
首先需要配置akka.remotesection在配置文件:
akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.remote {enabled-transports = ["akka.remote.netty.tcp"]netty.tcp {hostname = "127.0.0.1"port = 10000 # 0表示自动选择一个可用的} }
object Master extends App {val system = ActorSystem("MasterActorSystem")val address = Seq(Address("akka.tcp","RemoteActorSystem","localhost",10000),AddressFromURIString("akka.tcp://MasterActorSystem@localhost:10000"))val remoteRouter = system.actorOf(RemoteRouterConfig(RoundRobinPool(3),address).props(Props[Worker]))remoteRouter ! "xxx" }
3.4.2Supervision 监管
我们可以通过pool router创建routees,然后作为routers的孩子,因此这个router可以监管这些孩子
监管策略 我们可以通过pool的supervisorStrategy属性设置,如果没有显示提供那么,默认就是always escalate策略,这就意味着错误是传递给routers的监管者来处理
3.5Group相关知识
有时候,我们希望通过传递path参数创建routees,消息最后通过ActorSelection发送
3.5.1application.conf配置
akka.actor.deployment {/master/router {router = broadcast-grouproutees.paths = ["/user/master/worker1","/user/master/worker2","/user/master/worker3"]} }
3.5.2手动提供path创建routees
class Master3 extends Actor{var router:ActorRef = _@throws[Exception](classOf[Exception]) overridedef preStart(): Unit = {/*自己提供path路径*/val paths = List("/user/master/worker1","/user/master/worker2","/user/master/worker3")router = context.actorOf(BroadcastGroup(paths).props(),"router")context.actorOf(Props[Worker],name="worker1")context.actorOf(Props[Worker],name="worker2")context.actorOf(Props[Worker],name="worker3")}def receive: Receive = {case a:Long => router ! acase _ => println("参数异常")} }
class Worker extends Actor with ActorLogging{def receive:Receive = {case a if 1 to 10 contains(a) =>log.info(s"$a belongs to [1-10]")case b if 11 to 50 contains(b) =>log.info(s"$b belongs to [11-50]")case c if 51 to 100 contains(c) =>log.info(s"$c belongs to [51-100]")case _ =>log.info("Unknown Range Value")} }
object Master extends App {val _system = ActorSystem("MasterActorSystem",config = ConfigFactory.load("group"))val master = _system.actorOf(Props[Master3],"master")for (x <- 0 until 2 ) {master ! math.round(math.random * 20) + 1}Thread.sleep(100)_system.terminate()}
结果:每一个routees都给广播一条消息
3.5.3 加载配置文件创建routees
class Master3 extends Actor{var router:ActorRef = _@throws[Exception](classOf[Exception]) overridedef preStart(): Unit = {/*自己提供path路径val paths = List("/user/master/worker1","/user/master/worker2","/user/master/worker3")router = context.actorOf(BroadcastGroup(paths).props(),"router")*//*读取配置文件参数,创建router*/router = context.actorOf(FromConfig.props(),"router")context.actorOf(Props[Worker],name="worker1")context.actorOf(Props[Worker],name="worker2")context.actorOf(Props[Worker],name="worker3")}def receive: Receive = {case a:Long => router ! acase _ => println("参数异常")} }
其余步骤一样
akka之分发和路由相关推荐
- akka 简介_Akka HTTP路由简介
akka 简介 by Miguel Lopez 由Miguel Lopez Akka HTTP路由简介 (An introduction to Akka HTTP routing) Akka HTTP ...
- 路由-策略Policy(路由重分发、路由过滤、路由策略)
Preifx-list前缀列表,用于抓取路由: Distribute-list分发列表,用于RIP和EIGRP过滤路由: Filter-list过滤列表,用于OSPF过滤路由: Route-map路由 ...
- Django的路由分发与名称空间
路由分发django每一个app下面都可以有自己的urls.py路由层,templates文件夹,static文件夹项目名下urls.py(总路由)不再做路由与视图函数的匹配关系而是做路由的分发fro ...
- Web框架之Django_03 路由层了解(路有层 无名分组、有名分组、反向解析、路由分发 视图层 JsonResponse,FBV、CBV、文件上传)
阅读目录 一.路由层:(Django的路由系统) 二.伪静态网页和虚拟环境: 三.FBV与CBV.JsonResponse.文件上传 一.路由层:(Django的路由系统) URL配置(Django项 ...
- 现实生活中常用的动态路由—OSPF路由重分发
OSPF路由重分发 一.路由重分发 1.1路由重分发的考虑 1.2重分发到OSPF的路径类型 1.2.1OSPF的路径类型的优先级 2.OSPF重分发配置命令 二.NSSA区域 三.地址汇总的作用 四 ...
- 初识OSPF(三)——路由重分发及虚链路
初识OSPF(三)--路由重分发及虚链路 前言 一.路由重分发 1.理解路由重分发 2.路由重分发的考虑 3.重分发到OSPF域中路由的路径类型 4.配置命令 二.NSSA区域 1.NSSA 2.配置 ...
- OSPF——多区域概念及配置、ABR简介、ASBR简介、路由重分发
目录 部署多区域注意点: ABR路由器(区域边界路由器)简介: ABR路由器满足条件: 路由重分发简介: ASBR路由器简介: OSPF区域:0:骨干区域:非0--非骨干区域 部署多区域注意点: (1 ...
- Django-安装/分组命名/路由分发
一.安装Django 命令行窗口: pycharm安装: 二.创建Django项目 命令行窗口创建项目: 访问地址: 表示访问成功 注意如果我们在命令行窗口创建的应用需要我们手动的在django的se ...
- OSPF —— 多区域部署 + ABR + ASBR + 路由重分发
目录 一.OSPF多区域的部署: (1)部署多区域注意点: (2)单区域缺点: (3)划分区域的好处: 二.ABR || ASBR 路由器: (1)ABR路由器(区域边界路由器)简介: (2)ABR路 ...
最新文章
- Tensorflow实战之下载MNIST数据,自动分成train, validation和test三个数据集
- @Configuration
- tar解压出错:gzip: stdin: unexpected end of file的解决
- ctf题目:看不见的flag_记一次江西省信息安全线下CTF比赛
- python repusts模块_Python tslearn包_程序模块 - PyPI - Python中文网
- emu8086 寻址方式
- C9:Unity3D制作智能家居设计软件——导入户型图自动设计(算法剖析+源码实现篇)
- 教师管理系统_ER图_功能图_数据字典_数据库脚本
- 安卓搞机教程--修改设置里 添加选项 添加文字 修改图标 修改版本号等等 实例解析
- 前端需要学习c语言吗,我应该在学习C语言之前学习HTML或CSS吗?
- 华为手机usb连接计算机,华为手机USB为什么连接不上电脑(3个方法彻底解决)...
- 判断邮箱正确的c语言代码,如何用c语言来识别电子邮箱是否正确
- 解决“微信与此IPAD不兼容
- 正是岳麓好风景,软件逢君正当时
- Centos 7分辨率调整成适应虚拟机屏幕大小
- 【算法】判断一个点是否在多边形之内
- FT232RL为接口转换芯片
- 二级域名做网站有哪些优势?
- 2022-12-01:从不订购的客户。找出所有从不订购任何东西的客户,以下数据的答案输出是Henry和Max,sql语句如何写? DROP TABLE IF EXISTS `customers`; C
- 【层级文本分类】Constrained Sequence-to-Tree Generation for Hierarchical Text Classification
热门文章
- node在regedit配置哪个位置_Spring Boot 2.x基础教程:Spring Data JPA的多数据源配置
- kubernetes service 原理解析
- python中形参可以使用中文定义嘛_python中函数的参数分类
- 机器学习和深度学习_算法测评 | 机器学习VS深度学习
- php 求 相似 比,php比较相似字符串的方法
- html网页报告怎么导出,cucumber生成html的报告实现步骤
- java调用MySQL脚本_Java调用SQL脚本执行常用的方法示例
- python import MySQLdb 解决报错 Error:Reason: image not found
- 贪心算法求解TSP问题(python)
- 昆明理工大学c语言设计大作业,昆明理工大学大一C语言大作业题目