在上篇讨论里我们提到了become/unbecome。由于它们本质上是堆栈操作,所以只能在较少的状态切换下才能保证堆栈操作的协调及维持程序的清晰逻辑。对于比较复杂的程序流程,Akka提供了FSM:一种通过状态变化进行功能切换的Actor。FSM模式的状态转变特别适合对应现实情况中的程序流程,我们可以用每一种状态来代表一个程序流程。FSM是个trait,定义如下:

trait FSM[S, D] extends Actor with Listeners with ActorLogging {...}

我们看到:FSM就是一个特殊的Actor。带着两个类型参数:S代表状态类型,D代表状态数据类型。实际上S和D结合起来就是FSM的内部状态,即:SomeState+DataX 和 SomeState+DataY分别代表不同的Actor内部状态,这点从State定义可以得到信息:

/*** This captures all of the managed state of the [[akka.actor.FSM]]: the state* name, the state data, possibly custom timeout, stop reason and replies* accumulated while processing the last message.*/case class State[S, D](stateName: S, stateData: D, timeout: Option[FiniteDuration] = None, stopReason: Option[Reason] = None, replies: List[Any] = Nil) {...}

我们可以用下面的表达式来代表FSM功能:

State(SA) x Event(E) -> Actions (A), State(SB)

意思是:假如在状态SA发生了事件E,那么FSM应该实施操作A并把状态转换到SB。这里面操作Action代表某项功能,事件Event是个新的类型,定义如下:

/*** All messages sent to the [[akka.actor.FSM]] will be wrapped inside an* `Event`, which allows pattern matching to extract both state and data.*/final case class Event[D](event: Any, stateData: D) extends NoSerializationVerificationNeeded

Event[D]是个包嵌消息和数据的类型。理论上FSM是通过接收Event来确定运行功能和下一个状态转换,就像普通的Actor接收Message原理一样。我们用上一篇里的FillSeasons作为这次FSM示范的例子。首先定义State,Data:

trait Seasons   //States
case object Spring extends Seasons
case object Summer extends Seasons
case object Fall extends Seasons
case object Winter extends Seasonsclass SeasonInfo(talks: Int, month: Int)  //Data
case object BeginSeason extends SeasonInfo(0,1)

四个状态分别是:春夏秋冬。SeasonInfo代表数据类型,含被问候次数talks,季中月份month两个参数,每季含1,2,3三个月份。目前我们只支持两种功能消息:

object FillSeasons {sealed trait Messages    //功能消息case object HowYouFeel extends Messagescase object NextMonth extends Messages
}

这种普通Actor的消息类型对应到FSM的Event类型中的event:Any。也就是说FSM在收到功能消息后需要构建一个Event类型实例并把消息包嵌在里面。因为FSM继承了Actor,所以它必须实现receive函数。下面是FSM.receive的源代码:

 /** ********************************************       Main actor receive() method* ********************************************/override def receive: Receive = {case TimeoutMarker(gen) ⇒if (generation == gen) {processMsg(StateTimeout, "state timeout")}case t @ Timer(name, msg, repeat, gen) ⇒if ((timers contains name) && (timers(name).generation == gen)) {if (timeoutFuture.isDefined) {timeoutFuture.get.cancel()timeoutFuture = None}generation += 1if (!repeat) {timers -= name}processMsg(msg, t)}case SubscribeTransitionCallBack(actorRef) ⇒// TODO Use context.watch(actor) and receive Terminated(actor) to clean up list
      listeners.add(actorRef)// send current state back as reference pointactorRef ! CurrentState(self, currentState.stateName)case Listen(actorRef) ⇒// TODO Use context.watch(actor) and receive Terminated(actor) to clean up list
      listeners.add(actorRef)// send current state back as reference pointactorRef ! CurrentState(self, currentState.stateName)case UnsubscribeTransitionCallBack(actorRef) ⇒listeners.remove(actorRef)case Deafen(actorRef) ⇒listeners.remove(actorRef)case value ⇒ {if (timeoutFuture.isDefined) {timeoutFuture.get.cancel()timeoutFuture = None}generation += 1processMsg(value, sender())}

除timer,subscription等特殊功能外,case value => ... 就是处理自定义消息的地方了。我们看到FSM是用processMsg(value, sender())来处理消息的。processMsg又调用了processEvent:

  private def processMsg(value: Any, source: AnyRef): Unit = {val event = Event(value, currentState.stateData)processEvent(event, source)}private[akka] def processEvent(event: Event, source: AnyRef): Unit = {val stateFunc = stateFunctions(currentState.stateName)val nextState = if (stateFunc isDefinedAt event) {stateFunc(event)} else {// handleEventDefault ensures that this is always definedhandleEvent(event)}applyState(nextState)}

在processEvent里的stateFunction是个Map,以stateName为主键存放StateFunction:

 /** State definitions*/private val stateFunctions = mutable.Map[S, StateFunction]()

而StateFuction是:

 type StateFunction = scala.PartialFunction[Event, State]

FSM的receive函数在收到消息后把消息包嵌入新构建的Event然后在processEvent里通过stateName取出相应的StateFunction后传入Event产生新的状态State。用户提供的StateFunction是通过FSM的when函数压进stateFunction Map里的:

  /*** Insert a new StateFunction at the end of the processing chain for the* given state. If the stateTimeout parameter is set, entering this state* without a differing explicit timeout setting will trigger a StateTimeout* event; the same is true when using #stay.** @param stateName designator for the state* @param stateTimeout default state timeout for this state* @param stateFunction partial function describing response to input*/final def when(stateName: S, stateTimeout: FiniteDuration = null)(stateFunction: StateFunction): Unit =register(stateName, stateFunction, Option(stateTimeout))private def register(name: S, function: StateFunction, timeout: Timeout): Unit = {if (stateFunctions contains name) {stateFunctions(name) = stateFunctions(name) orElse functionstateTimeouts(name) = timeout orElse stateTimeouts(name)} else {stateFunctions(name) = functionstateTimeouts(name) = timeout}}

我们看到when调用了register在stateFunction Map中按stateName放置StateFunction。FSM的这个stateFunction Map解决了become/unbecome产生的堆栈问题。FSM有个比较规范的结构,拿上面例子的FeelingSeasons结构做个示范:

class FillSeasons extends FSM[Seasons,SeasonInfo] with ActorLogging {import FillSeasons._startWith(Spring,SeasonInfo(0,1))  //起始状态when(Spring) {   //状态在春季case Event(HowYouFeel,seasonInfo) => ...}when(Summer) {  //夏季状态case Event(HowYouFeel,_) =>}when(Fall) {  //秋季状态case Event(HowYouFeel,SeasonInfo(tks,mnth)) =>}when(Winter) {  //冬季状态case Event(HowYouFeel,si@ SeasonInfo(tks,_)) =>}whenUnhandled {  //所有状态未处理的Eventcase Event(NextMonth,seasonInfo) =>}onTransition {case Spring -> Summer => log.info("Season changed from Spring to Summer month 1")case Summer -> Fall => log.info("Season changed from Summer to Fall month 1")case Fall -> Winter => log.info("Season changed from Fall to Winter month 1")case Winter -> Spring => log.info("Season changed from Winter to Spring month 1")}initialize()  //设定起始状态}

基本上是按照各状态定义事件处理函数StateFunction的。也可以包括状态转换处理函数TransitionHandler:

  type TransitionHandler = PartialFunction[(S, S), Unit]

最后,initialize()确定起始状态是否安排正确:

  /*** Verify existence of initial state and setup timers. This should be the* last call within the constructor, or [[akka.actor.Actor#preStart]] and* [[akka.actor.Actor#postRestart]]** An initial `currentState -> currentState` notification will be triggered by calling this method.** @see [[#startWith]]*/final def initialize(): Unit =if (currentState != null) makeTransition(currentState)else throw new IllegalStateException("You must call `startWith` before calling `initialize`")

完整的FSM FeelingSeasons定义如下:

class FillSeasons extends FSM[Seasons,SeasonInfo] with ActorLogging {import FillSeasons._startWith(Spring,SeasonInfo(0,1))  //起始状态when(Spring) {   //状态在春季case Event(HowYouFeel,seasonInfo) =>val numtalks = seasonInfo.talks + 1log.info(s"It's ${stateName.toString}, feel so gooood! You've asked me ${numtalks}times.")stay using seasonInfo.copy(talks = numtalks)}when(Summer) {  //夏季状态case Event(HowYouFeel,_) =>val numtalks = stateData.talks + 1log.info(s"It's ${stateName.toString}, it's so hot! You've asked me ${numtalks}times")stay().using(stateData.copy(talks = numtalks))}when(Fall) {  //秋季状态case Event(HowYouFeel,SeasonInfo(tks,mnth)) =>val numtalks = tks + 1log.info(s"It's ${stateName.toString}, it's no so bad. You've asked me ${numtalks}times.")stay using SeasonInfo(numtalks,mnth)}when(Winter) {  //冬季状态case Event(HowYouFeel,si@ SeasonInfo(tks,_)) =>val numtalks = tks + 1log.info(s"It's ${stateName.toString}, it's freezing cold! You've asked me ${numtalks}times.")stay using si.copy(talks = numtalks)}whenUnhandled {  //所有状态未处理的Eventcase Event(NextMonth,seasonInfo) =>val mth = seasonInfo.monthif (mth <= 3) {log.info(s"It's month ${mth+1} of ${stateName.toString}")stay using seasonInfo.copy(month = mth + 1)}else {goto(nextSeason(stateName)) using SeasonInfo(0,1)}}onTransition {case Spring -> Summer => log.info(s"Season changed from Spring to Summer month ${nextStateData.month}")case Summer -> Fall => log.info(s"Season changed from Summer to Fall month ${nextStateData.month}")case Fall -> Winter => log.info(s"Season changed from Fall to Winter month ${nextStateData.month}")case Winter -> Spring => log.info(s"Season changed from Winter to Spring month ${nextStateData.month}")}initialize()  //设定起始状态log.info(s"It's month 1 of ${stateName.toString}")//季节转换顺序def nextSeason(season: Seasons): Seasons =season match {case Spring => Summercase Summer => Fallcase Fall => Wintercase Winter => Spring}
}

首先注意StateFunction中SeasonInfo的各种意思同等的表达方式及nextStateData。FSM状态数据用不可变对象(immutable object)最安全,所以在更新时必须用case class 的copy或直接构建新的SeasonInfo实例。

我们再来看看processEvent的作业流程:

 private[akka] def processEvent(event: Event, source: AnyRef): Unit = {val stateFunc = stateFunctions(currentState.stateName)val nextState = if (stateFunc isDefinedAt event) {stateFunc(event)} else {// handleEventDefault ensures that this is always definedhandleEvent(event)}applyState(nextState)}

先运算用户定义的StateFunction处理事件Event获取新的状态State。然后调用applyState运算makeTransition处理状态转换(currentState = nextState):

  private[akka] def applyState(nextState: State): Unit = {nextState.stopReason match {case None ⇒ makeTransition(nextState)case _ ⇒nextState.replies.reverse foreach { r ⇒ sender() ! r }terminate(nextState)context.stop(self)}}private[akka] def makeTransition(nextState: State): Unit = {if (!stateFunctions.contains(nextState.stateName)) {terminate(stay withStopReason Failure("Next state %s does not exist".format(nextState.stateName)))} else {nextState.replies.reverse foreach { r ⇒ sender() ! r }if (currentState.stateName != nextState.stateName || nextState.notifies) {this.nextState = nextStatehandleTransition(currentState.stateName, nextState.stateName)gossip(Transition(self, currentState.stateName, nextState.stateName))this.nextState = null}currentState = nextStatedef scheduleTimeout(d: FiniteDuration): Some[Cancellable] = {import context.dispatcherSome(context.system.scheduler.scheduleOnce(d, self, TimeoutMarker(generation)))}currentState.timeout match {case SomeMaxFiniteDuration                    ⇒ // effectively disable stateTimeoutcase Some(d: FiniteDuration) if d.length >= 0 ⇒ timeoutFuture = scheduleTimeout(d)case _ ⇒val timeout = stateTimeouts(currentState.stateName)if (timeout.isDefined) timeoutFuture = scheduleTimeout(timeout.get)}}}

我们用FSM DSL的stay, goto,using来取得新的FSM状态和数据:

 /*** Produce transition to other state.* Return this from a state function in order to effect the transition.** This method always triggers transition events, even for `A -> A` transitions.* If you want to stay in the same state without triggering an state transition event use [[#stay]] instead.** @param nextStateName state designator for the next state* @return state transition descriptor*/final def goto(nextStateName: S): State = FSM.State(nextStateName, currentState.stateData)/*** Produce "empty" transition descriptor.* Return this from a state function when no state change is to be effected.** No transition event will be triggered by [[#stay]].* If you want to trigger an event like `S -> S` for `onTransition` to handle use `goto` instead.** @return descriptor for staying in current state*/final def stay(): State = goto(currentState.stateName).withNotification(false) // cannot directly use currentState because of the timeout field

stay,goto返回结果都是State[S,D]类型。using是State类型的一个方法:

   /*** Modify state transition descriptor with new state data. The data will be* set when transitioning to the new state.*/def using(@deprecatedName('nextStateDate) nextStateData: D): State[S, D] = {copy(stateData = nextStateData)}

我们看到using的主要作用是把当前状态数据替换成新状态的数据。

Akka的FSM是一个功能强大的Actor类型,所以配备了一套完整的DSL来方便FSM编程。FSM的DSL语句包括:

  final def startWith(stateName: S, stateData: D, timeout: Timeout = None): Unit =currentState = FSM.State(stateName, stateData, timeout)final def goto(nextStateName: S): State = FSM.State(nextStateName, currentState.stateData)final def stay(): State = goto(currentState.stateName).withNotification(false) final def stop(): State = stop(Normal)final def transform(func: StateFunction): TransformHelper = new TransformHelper(func)...

State[S,D]也提供了一些比较实用的方法函数:

case class State[S, D](stateName: S, stateData: D, timeout: Option[FiniteDuration] = None, stopReason: Option[Reason] = None, replies: List[Any] = Nil) {...// defined here to be able to override it in SilentStatedef copy(stateName: S = stateName, stateData: D = stateData, timeout: Option[FiniteDuration] = timeout, stopReason: Option[Reason] = stopReason, replies: List[Any] = replies): State[S, D] = {new State(stateName, stateData, timeout, stopReason, replies)}/*** Modify state transition descriptor to include a state timeout for the* next state. This timeout overrides any default timeout set for the next* state.** Use Duration.Inf to deactivate an existing timeout.*/def forMax(timeout: Duration): State[S, D] = timeout match {case f: FiniteDuration ⇒ copy(timeout = Some(f))case Duration.Inf      ⇒ copy(timeout = SomeMaxFiniteDuration) // we map the Infinite duration to a special marker,case _                 ⇒ copy(timeout = None) // that means "cancel stateTimeout". This marker is needed} // so we do not have to break source/binary compat.// TODO: Can be removed once we can break State#timeout signature to `Option[Duration]`/*** Send reply to sender of the current message, if available.** @return this state transition descriptor*/def replying(replyValue: Any): State[S, D] = {copy(replies = replyValue :: replies)}/*** Modify state transition descriptor with new state data. The data will be* set when transitioning to the new state.*/def using(@deprecatedName('nextStateDate) nextStateData: D): State[S, D] = {copy(stateData = nextStateData)}...}

FSM DSL中的transform是这样定义的:

  final class TransformHelper(func: StateFunction) {def using(andThen: PartialFunction[State, State]): StateFunction =func andThen (andThen orElse { case x ⇒ x })}final def transform(func: StateFunction): TransformHelper = new TransformHelper(func)

我们看到TransformHelper用using对入参func:StateFunction施用用户提供的andThen: PartialFunction[State,State]后返回新的状态State。这个using与State.using是不同的。下面是一个transform用法例子:

when(Running) {transform {case Event(m, Target(Actor.noSender)) =>goto(Uninitialised) using NoConfigcase Event(m, Target(ref)) =>ref ! mstay} using targetTransformer}def targetTransformer: PartialFunction[State, State] = {case s @ State(stateName, Target(ref), _, _, _) if ref.path.name.startsWith("testActor") =>log.debug("Setting target to dead letters")s.using(Target(Actor.noSender))}

transform{...}产生的State传给了targetTransformer然后经过模式匹配拆分出properties后用s.using更新stateData。

与become/unbecome相同,我们也可以在FSM里使用stashing。下面是一个用例:

when(Uninitialised) {case Event(Config(ref), _) =>goto(Running) using Target(ref)case Event(_, _) =>stashstay}when(Running) {case Event(m, Target(ref)) =>ref ! mstay}onTransition {case Uninitialised -> Running => unstashAll()}

当然,还有如stop,setTimer,replying,forMax,onTermination等方法和函数,这里就不一一详述了,有兴趣可以直接查询Akka/actor/FSM.scala。

下面是本次讨论的示范源码:

import akka.actor._sealed trait Seasons   //States
case object Spring extends Seasons
case object Summer extends Seasons
case object Fall extends Seasons
case object Winter extends Seasons//sealed trait SeasonData  //Data
case class SeasonInfo(talks: Int, month: Int)object FillSeasons {sealed trait Messages    //功能消息case object HowYouFeel extends Messagescase object NextMonth extends Messagesdef props = Props(new FillSeasons)
}class FillSeasons extends FSM[Seasons,SeasonInfo] with ActorLogging {import FillSeasons._startWith(Spring,SeasonInfo(0,1))  //起始状态when(Spring) {   //状态在春季case Event(HowYouFeel,seasonInfo) =>val numtalks = seasonInfo.talks + 1log.info(s"It's ${stateName.toString}, feel so gooood! You've asked me ${numtalks} times.")stay using seasonInfo.copy(talks = numtalks)}when(Summer) {  //夏季状态case Event(HowYouFeel,_) =>val numtalks = stateData.talks + 1log.info(s"It's ${stateName.toString}, it's so hot! You've asked me ${numtalks} times")stay().using(stateData.copy(talks = numtalks))}when(Fall) {  //秋季状态case Event(HowYouFeel,SeasonInfo(tks,mnth)) =>val numtalks = tks + 1log.info(s"It's ${stateName.toString}, it's no so bad. You've asked me ${numtalks} times.")stay using SeasonInfo(numtalks,mnth)}when(Winter) {  //冬季状态case Event(HowYouFeel,si@ SeasonInfo(tks,_)) =>val numtalks = tks + 1log.info(s"It's ${stateName.toString}, it's freezing cold! You've asked me ${numtalks} times.")stay using si.copy(talks = numtalks)}whenUnhandled {  //所有状态未处理的Eventcase Event(NextMonth,seasonInfo) =>val mth = seasonInfo.monthif (mth <= 3) {log.info(s"It's month ${mth+1} of ${stateName.toString}")stay using seasonInfo.copy(month = mth + 1)}else {goto(nextSeason(stateName)) using SeasonInfo(0,1)}}onTransition {case Spring -> Summer => log.info(s"Season changed from Spring to Summer month ${nextStateData.month}")case Summer -> Fall => log.info(s"Season changed from Summer to Fall month ${nextStateData.month}")case Fall -> Winter => log.info(s"Season changed from Fall to Winter month ${nextStateData.month}")case Winter -> Spring => log.info(s"Season changed from Winter to Spring month ${nextStateData.month}")}initialize()  //设定起始状态log.info(s"It's month 1 of ${stateName.toString}")//季节转换顺序def nextSeason(season: Seasons): Seasons =season match {case Spring => Summercase Summer => Fallcase Fall => Wintercase Winter => Spring}
}object FSMDemo extends App {import scala.util.Randomval fsmSystem = ActorSystem("fsmSystem")val fsmActor = fsmSystem.actorOf(FillSeasons.props,"fsmActor")(1 to 15).foreach { _ =>(1 to Random.nextInt(3)).foreach{ _ =>fsmActor ! FillSeasons.HowYouFeel}fsmActor ! FillSeasons.NextMonth}scala.io.StdIn.readLine()fsmSystem.terminate()
}

转载于:https://www.cnblogs.com/tiger-xc/p/6999548.html

Akka(7): FSM:通过状态变化来转换运算行为相关推荐

  1. java二进制计算_Java 二进制,八进制,十进制,十六进制转换运算

    Java 二进制,八进制,十进制,十六进制转换运算 Java进制转换方法 十进制转成十六进制: Integer.toHexString(inti) 十进制转成八进制 Integer.toOctalSt ...

  2. cstart做int型转换运算Java,Java实验练习题目-供练习参考

    { message.setText("请输入数字字符"); } } textResult.requestFocus(); validate(); } } MainClass.jav ...

  3. html拖放数据库字段,HTML5 拖放(Drag 和 Drop)

    拖放是一种常见的特性,即抓取对象以后拖到另一个位置. 在 HTML5 中,拖放是标准的一部分,任何元素都能够拖放. #div1 {width:350px;height:70px;padding:10p ...

  4. Akka FSM 源代码分析

    Akka FSM 源代码分析 萧猛 <simonxiao@qq.com> 啰嗦几句 有限状态机本身不是啥新奇东西,在GoF的设计模式一书中就有状态模式, 也给出了实现的建议.各种语言对状态 ...

  5. Akka(6): become/unbecome:运算行为切换

    通过一段时间的学习了解,加深了一些对Akka的认识,特别是对于Akka在实际编程中的用途方面.我的想法,或者我希望利用Akka来达到的目的是这样的:作为传统方式编程的老兵,我们已经习惯了直线流程方式一 ...

  6. 【akka】akka源码 Akka源码分析-FSM

    1.概述 转载自己学习,建议直接看原文:Akka源码分析-FSM akka还有一个不常使用.但我觉得比较方便的一个模块,那就是FSM(有限状态机).我们知道了akka中Actor模型的具体实现之后,就 ...

  7. Akka源码分析-FSM

    akka还有一个不常使用.但我觉得比较方便的一个模块,那就是FSM(有限状态机).我们知道了akka中Actor模型的具体实现之后,就会发现,Akka的actor可以非常方便的实现FSM.其实在akk ...

  8. Akka(16): 持久化模式:PersistentFSM-可以自动修复的状态机器

    前面我们讨论过FSM,一种专门为维护内部状态而设计的Actor,它的特点是一套特殊的DSL能很方便地进行状态转换.FSM的状态转换模式特别适合对应现实中的业务流程,因为它那套DSL可以更形象的描述业务 ...

  9. akka typed mysql_akka-typed(2) - typed-actor交流方式和交流协议

    akka系统是一个分布式的消息驱动系统.akka应用由一群负责不同运算工作的actor组成,每个actor都是被动等待外界的某种消息来驱动自己的作业.所以,通俗点描述:akka应用就是一群actor相 ...

最新文章

  1. centos7+ansible自动化工具使用
  2. [力扣] 二叉树的层序遍历
  3. linux c socket ip地址 字符串 数字 转换 inet_addr inet_ntoa
  4. 第二百一十一天 how can i 坚持
  5. SAPCAR 压缩解压软件的使用方法
  6. MySQL5.6解压版详细安装教程(附安装配置、MySQL数据库设置root管理员密码,MySQL字符集设置问题及解决办法)
  7. 我被认定为高层次人才了!
  8. Java并发编程之Semaphore信号量
  9. (qsf文件 、 tcl文件 和 csv(txt)文件的区别) FPGA管脚分配文件保存、导入导出方法...
  10. linux 进程 cpu 100,清理linux中占用CPU 100%的病毒
  11. 使用jQuery高效制作网页特效 第六章习题
  12. 固态硬盘测试软件有哪些,常用的固态硬盘测试软件有哪几种
  13. 【网络安全】DRIDEX木马巧用VEH混淆API调用流程
  14. 皮卡丘是如何发电的?
  15. 什么是 SQL 注入速查表
  16. 强化学习 - Deep RL开源项目总结
  17. 虚拟化与元宇宙:人类文明演化的奇点与治理
  18. 杨澜:25岁后女孩子必需要懂14件事
  19. 一起学英语第三期,things change,people change
  20. Android集成QQ登录

热门文章

  1. 用计算机怎样打印,电脑怎么使用虚拟打印机?
  2. 我在百度Python小白逆袭大神课程中“打怪”欢乐之旅
  3. 世间什么才是最珍贵的
  4. 淘宝的商品json文件, 保存电脑是用json后缀保存,文件名要跟代码引用文件名一样
  5. 用友u8多出现未记账凭证 以及无法通过总账工具导入凭证,未出现错误提示
  6. win10永久关闭实时保护
  7. 电感纹波电流最大时占空比是多少?
  8. optitrack学习(4):动补系统marker点设置与刚体建立
  9. 转一位好友的《中国软件业的断想》
  10. Revit Navisworks 二次开发—获取材质贴图