一 Dispatchers

1.1 理解分发器

分发器是一个通信协调员的角色,主要负责消息的接收和传递。主要依据一定的分发策略,用于控制执行流程,然后将到来的消息或者请求路由给相关的业务进程。

提供飞机的航空公司类似于Akka的MailBox,机场跑道类似于Akka的线程资源,航空交通管制塔类似于Dispatcher

1.2 分发器的类型

1.2.1 Dispatcher

这是默认的分发器,它是一个基于事件的,绑定了一套Actors到一个线程池中。

特点如下:

# 每一个actor都依赖他们自己的邮箱

# 可以被任何数量的actors共享

# 是对非阻塞的优化

1.2.2 PinnedDispatcher

它对于每一个Actor提供了单独的或者专用的线程,在做一些I/O操作或者执行一些长时间计算任务的task.

# 每一个actor都依赖他们自己的邮箱

# 对于每一个actor的专用线程不能和其他actor共享

# 分发器依赖于线程池的Execcutor

# 是对阻塞操作的优化

1.2.3 BalancingDispatcher

他可以重新分配工作从比较忙的actor到比较空闲的actor,任务的重新分配是有前提的,那就是所有的actor的类型是必须一致的

特点:

# 只有一个邮箱

# 分发器被所有相同类型的actors共享

# 分发器依赖于一个线程池或者fork join pool

1.2.4 CallingThreadDispatcher

他只是在相同的线程上运行任务,不能创建任何新的线程,并且提供了一个执行顺序

特点:

# 每一个actor都依赖于自己的邮箱

# 所有actors共享分发器

# 分发器依赖于当前的调用线程

1.3 Dispatcher的使用

Akka支持的Executor Context:

第一种: Thread PoolExecutor

创建工作者线程池,任务队列分配到池子,如果task数量超过了线程数,那么就会排队等候,直到池子有空闲线程的时候

第二种:Fork JoinExecutor

这是基于分儿治之的前提,他的核心思想就是把一个大的任务划分成多个小任务,最后在进行结果的联合,这些任务是能够独立运行的

对于每一个ExecutorContext, Akka都可以指定一个配置文件参数,这些参数主要定义:

# 分配的最小数量的线程数

# 分配的最大数量的线程数

# 所使用的系数(基于可用的CPU核数)

比如最小线程数为3,系数为2,那么dispatcher总共可用开始6个线程

1.4 如何配置参数

它会默认去读取classpath下的application.conf文件,所以你可以定义一个该文件,然后在里面配置

akka {loglevel = INFO
}# 你的dispatcher名字
my-dispatcher {# 指定Dispatcher类型[Dispatcher/PinnedDispatcher/BalancingDispatcher/CallingThreadDispatcher]type = Dispatcher# 指定使用哪一种 Executor Context [thread-pool-executor/fork-join-executor]executor = "fork-join-executor"# 针对executor context 类型配置参数fork-join-executor {# 最小的并发数parallelism-min = 2# 并发因子可以计算并发总并发数 ceil(available processors * factor)parallelism-factor = 2.0# 最大并发数parallelism-max = 10}thread-pool-executor {# 线程池最小线程数core-pool-size-min = 2# 因子 (有效核数)*因子core-pool-size-factor = 2.0# 线程池最大线程数core-pool-size-max = 10}# 每一个Actor在跳到另外一个Actor之前能处理的最大消息的吞吐量# 设成1,对于每一个Actor都是公平的throughput = 100# 指定mailbox的容量大小,如果设置成0 表示没有边界mailbox-capacity = 100# 指定mailbox的类型,这个应该配合mailbox-capacity# mailbox-type = ""
}

1.5 选择哪一种Dispatcher的策略

# 分发器的选择取决于你的应用程序是阻塞还是非阻塞

# executor的选择取决于你的应用程序逻辑,是不是需要将一个任务分成多个小任务,然后每一个小任务可以并行运行

# 依据CPU核数确定你的线程数的最大值和最小值

# 吞吐量的设置

简单的例子:

class DispatcherActor extends UntypedActor{val x:Int = 2var child:ActorRef = _@throws[Exception](classOf[Exception]) overridedef preStart(): Unit = {child = context.actorOf(Props[MsgEchoActor],name="MsgEchoActor")}@throws[Throwable](classOf[Throwable])override def onReceive(message: Any): Unit = {println("[onReceive] => "+message)var result = 0try {result = x + message.toString.toInt} catch {case t:Throwable => t.printStackTrace()}child.tell(result,sender)}
}class MsgEchoActor extends Actor {def receive:Receive = {case a if 1 to 10 contains(a) =>println(s"$a belongs to [1-10]")case b if 11 to 50 contains(b) =>println(s"$b belongs to [11-50]")case c if 51 to 100 contains(c) =>println(s"$c belongs to [51-100]")case _ =>println("Unknown Range Value")}
}
object DispatcherActor extends App {val _system = ActorSystem("DispatcherActorSystem")val dispatcherActor = _system.actorOf(Props[DispatcherActor].withDispatcher("my-dispatcher"),name="DispatcherActor")dispatcherActor ! 88dispatcherActor ! 20dispatcherActor ! 1Thread.sleep(50)_system.terminate()
}

二 MailBox

邮箱,临时存储消息的地方,然后这些消息会派到Actor中,一般情况下,每一个Actor都会依赖一个Mailbox,但是BalancingPool允许所有actor共用一个MailBox

2.1Mailbox类型

以下是一些默认的邮箱实现

他们依赖于Java的并发库里的队列:

Blocking Queue: 阻塞队列意味着一个队列如果满了,那么该队列就会阻塞,直到有空的空间才可以存放数据

Bounded Queue: 边界队列意味着这个队列有大小限制,如果达到指定的数量,那么就不能继续存数据

Unbounded MailBox: 如果不修改akka.actor.default-mailbox配置,那么它是默认的mailbox type.

配置名称:unbounded 或者 akka.dispatch.UnboundedMainbox

SingleConsumerOnlyUnboundedMailBox:

依赖于多个生产者和单个消费者队列,不能被BalancingDispatcher使用,不是阻塞队列,也没有边界

配置名称:akka.dispatch.SingleConsumerOnlyUnbounded

UnboundedControlAwareMailbox: 非阻塞,无边界,高优先级

配置名字:akka.dispatch.UnboundedControlAwareMailbox

UnboundedPriorityMailbox: 消息投递的顺序等价于优先级,非阻塞,无边界,和UnboundedStablePriorityMaibox相反

配置名称:akka.dispatch.UnboundedPriorityMailbox

UnboundedStablePriorityMaibox:不阻塞,无边界,FIFO投递消息

和UnboundedPriorityMailbox相反。

配置名称:akka.dispatch.UnboundedStablePriorityMaibox

BoundedMailBox:只要达到了边界限制,则不能继续push数据

配置名称:bounded或者akka.dispatch.BoundedMailbox

BoundedPriorityMailbox: 消息投递的顺序等价于优先级,非阻塞,有边界

配置名称:akka.dispatch.BoundedPriorityMailbox

BoundedStablePriorityMailbox: 先进先出的顺序投递消息,非阻塞,有边界

配置名字:akka.dispatch.BoundedStablePriorityMailbox

BoundedControlAwareMailbox: 非阻塞,有边界,高优先级

配置名称:akka.dispatch.BoundedControlAwareMailbox

2.2 如何选择Mailbox类型

# 默认情况是默认的mailbox => akka.actor.default-mailbox被使用

怎么设置默认的mailboxtype呢?

akka.actor.default-mailbox {

mailbox-type= "akka.dispatch.SingleConsumerOnlyUnbounded

Mailbox"

}

# 如果actor的部署配置段包含一个mailbox 的配置段,然后在里面描述的mailbox 的类型,那么就会使用这个类型

# 如果actor的Props#withMailbox也可以设置mailbox类型

# 如果分发器的配置段描述了mailbox-type,那么将会使用这个类型

2.3 定制Mailbox

import java.util.concurrent.ConcurrentLinkedQueue
import akka.actor.{ActorRef, PoisonPill, ActorSystem}
import akka.dispatch._
import com.typesafe.config.Config

trait MyUnboundedMessageQueueSemantics
class MyUnboundedMailBox extends MailboxType with ProducesMessageQueue[MyUnboundedMailBox.MyMessageQueue]{
    import MyUnboundedMailBox._
    def this(settings:ActorSystem.Settings,config:Config){
        this()
    }
    override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = {
        new MyMessageQueue()
    }
}

object MyUnboundedMailBox {
    class MyMessageQueue extends MessageQueue with MyUnboundedMessageQueueSemantics {
        private finalval queue = new ConcurrentLinkedQueue[Envelope]()
        override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
            queue.offer(handle)
        }
        override def numberOfMessages: Int = queue.size
        override def dequeue(): Envelope = queue.poll()
        override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
            while (hasMessages){
                deadLetters.enqueue(owner,dequeue())
            }
        }
        override def hasMessages: Boolean = !queue.isEmpty
    }
}

custom-dispatcher {mailbox-requirement = "com.concurrent.akka.mailbox.MyUnboundedMessageQueueSemantics"
}
akka.actor.mailbox.requirements {"com.concurrent.akka.mailbox.MyUnboundedMessageQueueSemantics" = custom-dispatcher-mainbox
}
custom-dispatcher-mainbox {mailbox-type = "com.concurrent.akka.mailbox.MyUnboundedMailBox"
}

三 路由 Routing

我们知道当大量的actor在并行工作的时候,处理到来的消息流,这时候就需要一个组件或者东西来引导消息从源到目的地Actor,这个组件或者东西就是Router

在Akka中,router也是一种actor 类型,它路由到来的消息到其他的actors,其他那些actors就叫做routees(被路由对象)

3.1Akka router支持以下路由策略

2.11以前的方式=>

# RoundRobinRouter: 轮询

# RadnomRouter: 随机

# SmallestMailboxRouter: 空闲

# BroadcastRouter: 广播,将转发相同消息到每一个routees

2.11之后的提供的路由实现:

# RoundRobinRouteLogic

# RandomRouteLogic

# SmallestMailboxRoutingLogic

# BroadcastRoutingLogic

# TailChoppingRoutingLogic

# ConsistentHashingRoutingLogic

3.2 创建router的2种方法

我们创建router,router可以包含需要路由到的Actor即routees

和路由到这些routees的路由策略

3.2.1 程序动态创建

class Master extends Actor{
    var router:Router = _
    @throws[Exception](classOf[Exception]) override
    def preStart(): Unit = {
        router = {
            /*相当于创建了5个actors作为routees*/
           
var routees = Vector.fill(5){
                val r = context.actorOf(Props[Worker],"WorkActor---"+math.round(math.random * 100) + 1)
                println(r.path)
                context watch r
                ActorRefRoutee(r)
            }
            Router(RoundRobinRoutingLogic(),routees)
        }
    }

def receive: Receive = {
        case a:Int => router.route(a,sender)
        case _ => println("参数异常")
    }
}

结果如下:

3.2.2 根据application.conf配置文件针对actor的配置

###############   2种配置方式   ###############
###############   单层结构      ###############
# akka.actor.deployment {
#  /MasterActor2/router1{
#    router = round-robin-pool
#    nr-of-instances = 5
#  }
#}###############   多层结构      ###############
akka {actor{deployment {# 注意这里不需要加/user/MasterActor2/router1 {# 路由策略router = round-robin-pool# routees的数量,默认是按照$+字母的方式nr-of-instances = 5}# 远程调用# '/user/actorA/actorB' is a remote deployed actor# /actorA/actorB {#  remote = "akka.tcp://sampleActorSystem@127.0.0.1:2553"# }# 匹配所有子actor# all direct children of '/user/actorC' have a dedicated dispatcher# "/actorC/*" {# dispatcher = my-dispatcher# }# 为某一个actor指定一个邮箱# '/user/actorD/actorE' has a special priority mailbox# /actorD/actorE {#  mailbox = prio-mailbox# }}}
}# my-dispatcher {
#  fork-join-executor.parallelism-min = 10
#  fork-join-executor.parallelism-max = 10
#}
# prio-mailbox {
#   mailbox-type = "a.b.MyPrioMailbox"
# }
class Master2 extends Actor{var router:ActorRef = _@throws[Exception](classOf[Exception]) overridedef preStart(): Unit = {/*根据配置文件Application.conf创建router*/router = context.actorOf(FromConfig.props(Props[Worker]),"router1")}def receive: Receive = {case a:Long => router ! acase _ => println("参数异常")}
}
object Master extends App {val system = ActorSystem("MasterActorSystem")val x = system.actorOf(Props[Master2],name="MasterActor2")for (i <- 0 until 10) {x ! math.round(math.random * 10) + 1}Thread.sleep(20)system.terminate()
}

结果如下:

3.3Router Actor的分类

Pool: Router actors创建routees作为子的actors,然后如果他们结束了,就从router中删除他们

Group: Routee Actor被Router创建,且使用actor selection,router发送消息到指定的路径,不需要监控终止情况

至于router在配置时候的选择:

router.type-mapping {from-code = "akka.routing.NoRouter"round-robin-pool = "akka.routing.RoundRobinPool"round-robin-group = "akka.routing.RoundRobinGroup"random-pool = "akka.routing.RandomPool"random-group = "akka.routing.RandomGroup"balancing-pool = "akka.routing.BalancingPool"smallest-mailbox-pool = "akka.routing.SmallestMailboxPool"broadcast-pool = "akka.routing.BroadcastPool"broadcast-group = "akka.routing.BroadcastGroup"scatter-gather-pool = "akka.routing.ScatterGatherFirstCompletedPool"scatter-gather-group = "akka.routing.ScatterGatherFirstCompletedGroup"tail-chopping-pool = "akka.routing.TailChoppingPool"tail-chopping-group = "akka.routing.TailChoppingGroup"consistent-hashing-pool = "akka.routing.ConsistentHashingPool"consistent-hashing-group = "akka.routing.ConsistentHashingGroup"
}

3.4 pool相关的只是

3.4.1 远程部署Routees

首先需要配置akka.remotesection在配置文件:

akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote {enabled-transports = ["akka.remote.netty.tcp"]netty.tcp {hostname = "127.0.0.1"port = 10000  # 0表示自动选择一个可用的}
}
object Master extends App {val system = ActorSystem("MasterActorSystem")val address = Seq(Address("akka.tcp","RemoteActorSystem","localhost",10000),AddressFromURIString("akka.tcp://MasterActorSystem@localhost:10000"))val remoteRouter = system.actorOf(RemoteRouterConfig(RoundRobinPool(3),address).props(Props[Worker]))remoteRouter ! "xxx"
}

3.4.2Supervision 监管

我们可以通过pool router创建routees,然后作为routers的孩子,因此这个router可以监管这些孩子

监管策略 我们可以通过pool的supervisorStrategy属性设置,如果没有显示提供那么,默认就是always escalate策略,这就意味着错误是传递给routers的监管者来处理

3.5Group相关知识

有时候,我们希望通过传递path参数创建routees,消息最后通过ActorSelection发送

3.5.1application.conf配置

akka.actor.deployment {/master/router {router = broadcast-grouproutees.paths = ["/user/master/worker1","/user/master/worker2","/user/master/worker3"]}
}

3.5.2手动提供path创建routees

class Master3 extends Actor{var router:ActorRef = _@throws[Exception](classOf[Exception]) overridedef preStart(): Unit = {/*自己提供path路径*/val paths = List("/user/master/worker1","/user/master/worker2","/user/master/worker3")router = context.actorOf(BroadcastGroup(paths).props(),"router")context.actorOf(Props[Worker],name="worker1")context.actorOf(Props[Worker],name="worker2")context.actorOf(Props[Worker],name="worker3")}def receive: Receive = {case a:Long => router ! acase _ => println("参数异常")}
}
class Worker extends Actor with ActorLogging{def receive:Receive = {case a if 1 to 10 contains(a) =>log.info(s"$a belongs to [1-10]")case b if 11 to 50 contains(b) =>log.info(s"$b belongs to [11-50]")case c if 51 to 100 contains(c) =>log.info(s"$c belongs to [51-100]")case _ =>log.info("Unknown Range Value")}
}
object Master extends App {val _system = ActorSystem("MasterActorSystem",config = ConfigFactory.load("group"))val master = _system.actorOf(Props[Master3],"master")for (x <- 0 until 2 ) {master ! math.round(math.random * 20) + 1}Thread.sleep(100)_system.terminate()}
 
结果:每一个routees都给广播一条消息
 

3.5.3 加载配置文件创建routees

class Master3 extends Actor{var router:ActorRef = _@throws[Exception](classOf[Exception]) overridedef preStart(): Unit = {/*自己提供path路径val paths = List("/user/master/worker1","/user/master/worker2","/user/master/worker3")router = context.actorOf(BroadcastGroup(paths).props(),"router")*//*读取配置文件参数,创建router*/router = context.actorOf(FromConfig.props(),"router")context.actorOf(Props[Worker],name="worker1")context.actorOf(Props[Worker],name="worker2")context.actorOf(Props[Worker],name="worker3")}def receive: Receive = {case a:Long => router ! acase _ => println("参数异常")}
}

其余步骤一样

akka之分发和路由相关推荐

  1. akka 简介_Akka HTTP路由简介

    akka 简介 by Miguel Lopez 由Miguel Lopez Akka HTTP路由简介 (An introduction to Akka HTTP routing) Akka HTTP ...

  2. 路由-策略Policy(路由重分发、路由过滤、路由策略)

    Preifx-list前缀列表,用于抓取路由: Distribute-list分发列表,用于RIP和EIGRP过滤路由: Filter-list过滤列表,用于OSPF过滤路由: Route-map路由 ...

  3. Django的路由分发与名称空间

    路由分发django每一个app下面都可以有自己的urls.py路由层,templates文件夹,static文件夹项目名下urls.py(总路由)不再做路由与视图函数的匹配关系而是做路由的分发fro ...

  4. Web框架之Django_03 路由层了解(路有层 无名分组、有名分组、反向解析、路由分发 视图层 JsonResponse,FBV、CBV、文件上传)

    阅读目录 一.路由层:(Django的路由系统) 二.伪静态网页和虚拟环境: 三.FBV与CBV.JsonResponse.文件上传 一.路由层:(Django的路由系统) URL配置(Django项 ...

  5. 现实生活中常用的动态路由—OSPF路由重分发

    OSPF路由重分发 一.路由重分发 1.1路由重分发的考虑 1.2重分发到OSPF的路径类型 1.2.1OSPF的路径类型的优先级 2.OSPF重分发配置命令 二.NSSA区域 三.地址汇总的作用 四 ...

  6. 初识OSPF(三)——路由重分发及虚链路

    初识OSPF(三)--路由重分发及虚链路 前言 一.路由重分发 1.理解路由重分发 2.路由重分发的考虑 3.重分发到OSPF域中路由的路径类型 4.配置命令 二.NSSA区域 1.NSSA 2.配置 ...

  7. OSPF——多区域概念及配置、ABR简介、ASBR简介、路由重分发

    目录 部署多区域注意点: ABR路由器(区域边界路由器)简介: ABR路由器满足条件: 路由重分发简介: ASBR路由器简介: OSPF区域:0:骨干区域:非0--非骨干区域 部署多区域注意点: (1 ...

  8. Django-安装/分组命名/路由分发

    一.安装Django 命令行窗口: pycharm安装: 二.创建Django项目 命令行窗口创建项目: 访问地址: 表示访问成功 注意如果我们在命令行窗口创建的应用需要我们手动的在django的se ...

  9. OSPF —— 多区域部署 + ABR + ASBR + 路由重分发

    目录 一.OSPF多区域的部署: (1)部署多区域注意点: (2)单区域缺点: (3)划分区域的好处: 二.ABR || ASBR 路由器: (1)ABR路由器(区域边界路由器)简介: (2)ABR路 ...

最新文章

  1. Tensorflow实战之下载MNIST数据,自动分成train, validation和test三个数据集
  2. @Configuration
  3. tar解压出错:gzip: stdin: unexpected end of file的解决
  4. ctf题目:看不见的flag_记一次江西省信息安全线下CTF比赛
  5. python repusts模块_Python tslearn包_程序模块 - PyPI - Python中文网
  6. emu8086 寻址方式
  7. C9:Unity3D制作智能家居设计软件——导入户型图自动设计(算法剖析+源码实现篇)
  8. 教师管理系统_ER图_功能图_数据字典_数据库脚本
  9. 安卓搞机教程--修改设置里 添加选项 添加文字 修改图标 修改版本号等等 实例解析
  10. 前端需要学习c语言吗,我应该在学习C语言之前学习HTML或CSS吗?
  11. 华为手机usb连接计算机,华为手机USB为什么连接不上电脑(3个方法彻底解决)...
  12. 判断邮箱正确的c语言代码,如何用c语言来识别电子邮箱是否正确
  13. 解决“微信与此IPAD不兼容
  14. 正是岳麓好风景,软件逢君正当时
  15. Centos 7分辨率调整成适应虚拟机屏幕大小
  16. 【算法】判断一个点是否在多边形之内
  17. FT232RL为接口转换芯片
  18. 二级域名做网站有哪些优势?
  19. 2022-12-01:从不订购的客户。找出所有从不订购任何东西的客户,以下数据的答案输出是Henry和Max,sql语句如何写? DROP TABLE IF EXISTS `customers`; C
  20. 【层级文本分类】Constrained Sequence-to-Tree Generation for Hierarchical Text Classification

热门文章

  1. node在regedit配置哪个位置_Spring Boot 2.x基础教程:Spring Data JPA的多数据源配置
  2. kubernetes service 原理解析
  3. python中形参可以使用中文定义嘛_python中函数的参数分类
  4. 机器学习和深度学习_算法测评 | 机器学习VS深度学习
  5. php 求 相似 比,php比较相似字符串的方法
  6. html网页报告怎么导出,cucumber生成html的报告实现步骤
  7. java调用MySQL脚本_Java调用SQL脚本执行常用的方法示例
  8. python import MySQLdb 解决报错 Error:Reason: image not found
  9. 贪心算法求解TSP问题(python)
  10. 昆明理工大学c语言设计大作业,昆明理工大学大一C语言大作业题目