


 /**
  * This captures all of the managed state of the [[akka.actor.FSM]]: the state
  * name, the state data, possibly custom timeout, stop reason and replies
  * accumulated while processing the last message.
  */
 case class State[S, D](stateName: S, stateData: D, timeout: Option[FiniteDuration] = None, stopReason: Option[Reason] = None, replies: List[Any] = Nil)


  /**
   * All messages sent to the [[akka.actor.FSM]] will be wrapped inside an
   * `Event`, which allows pattern matching to extract both state and data.
   */
  final case class Event[D](event: Any, stateData: D) extends NoSerializationVerificationNeeded


// Behaviour of a single state: a partial function from an incoming Event to the resulting State.
type StateFunction = scala.PartialFunction[Event, State]


/**
 * An FSM hakker is an awesome dude or dudette who either thinks about hacking or has to eat ;-)
 *
 * @param name  name used in the lines printed to stdout
 * @param left  the chopstick actor on the hakker's left
 * @param right the chopstick actor on the hakker's right
 */
class FSMHakker(name: String, left: ActorRef, right: ActorRef)
    extends Actor with FSM[FSMHakkerState, TakenChopsticks] {

  // All hakkers start waiting
  startWith(Waiting, TakenChopsticks(None, None))

  when(Waiting) {
    case Event(Think, _) =>
      println("%s starts to think".format(name))
      startThinking(5.seconds)
  }

  // When a hakker is thinking it can become hungry
  // and try to pick up its chopsticks and eat
  when(Thinking) {
    case Event(StateTimeout, _) =>
      left ! Take
      right ! Take
      goto(Hungry)
  }

  // When a hakker is hungry it tries to pick up its chopsticks and eat.
  // When it picks one up, it goes into wait for the other.
  // If the hakker's first attempt at grabbing a chopstick fails,
  // it starts to wait for the response of the other grab.
  when(Hungry) {
    case Event(Taken(`left`), _) =>
      goto(WaitForOtherChopstick) using TakenChopsticks(Some(left), None)
    case Event(Taken(`right`), _) =>
      goto(WaitForOtherChopstick) using TakenChopsticks(None, Some(right))
    case Event(Busy(_), _) =>
      goto(FirstChopstickDenied)
  }

  // When a hakker is waiting for the last chopstick it can either obtain it
  // and start eating, or the other chopstick was busy, and the hakker goes
  // back to think about how he should obtain his chopsticks :-)
  when(WaitForOtherChopstick) {
    case Event(Taken(`left`), TakenChopsticks(None, Some(right))) =>
      startEating(left, right)
    case Event(Taken(`right`), TakenChopsticks(Some(left), None)) =>
      startEating(left, right)
    case Event(Busy(_), TakenChopsticks(leftOption, rightOption)) =>
      // Give back whichever chopstick was already obtained before thinking again.
      leftOption.foreach(_ ! Put)
      rightOption.foreach(_ ! Put)
      startThinking(10.milliseconds)
  }

  /** Prints that eating starts and moves to `Eating`, holding both chopsticks for at most 5 seconds. */
  private def startEating(left: ActorRef, right: ActorRef): State = {
    println("%s has picked up %s and %s and starts to eat".format(name, left.path.name, right.path.name))
    goto(Eating) using TakenChopsticks(Some(left), Some(right)) forMax (5.seconds)
  }

  // When the results of the other grab comes back,
  // he needs to put it back if he got the other one.
  // Then go back and think and try to grab the chopsticks again
  when(FirstChopstickDenied) {
    case Event(Taken(secondChopstick), _) =>
      secondChopstick ! Put
      startThinking(10.milliseconds)
    case Event(Busy(_), _) =>
      startThinking(10.milliseconds)
  }

  // When a hakker is eating, he can decide to start to think,
  // then he puts down his chopsticks and starts to think
  when(Eating) {
    case Event(StateTimeout, _) =>
      println("%s puts down his chopsticks and starts to think".format(name))
      left ! Put
      right ! Put
      startThinking(5.seconds)
  }

  // Initialize the hakker
  initialize()

  /** Moves to `Thinking` with no chopsticks held; `duration` is the timeout for that state. */
  private def startThinking(duration: FiniteDuration): State = {
    goto(Thinking) using TakenChopsticks(None, None) forMax duration
  }
}


  /**
   * Set initial state. Call this method from the constructor before the [[#initialize]] method.
   * If different state is needed after a restart this method, followed by [[#initialize]], can
   * be used in the actor life cycle hooks [[akka.actor.Actor#preStart]] and [[akka.actor.Actor#postRestart]].
   *
   * @param stateName initial state designator
   * @param stateData initial state data
   * @param timeout state timeout for the initial state, overriding the default timeout for that state
   */
  final def startWith(stateName: S, stateData: D, timeout: Timeout = None): Unit =currentState = FSM.State(stateName, stateData, timeout)

// The state the FSM is currently in (name, data, timeout, ...). It has the default
// (uninitialized) value until `startWith` assigns it; `makeTransition` replaces it afterwards.
private var currentState: State = _


/**
 * Insert a new StateFunction at the end of the processing chain for the
 * given state. If the stateTimeout parameter is set, entering this state
 * without a differing explicit timeout setting will trigger a StateTimeout
 * event; the same is true when using #stay.
 *
 * A `null` stateTimeout (the default) is turned into `None` via `Option(...)`
 * before it is handed to `register`.
 *
 * @param stateName designator for the state
 * @param stateTimeout default state timeout for this state
 * @param stateFunction partial function describing response to input
 */
final def when(stateName: S, stateTimeout: FiniteDuration = null)(stateFunction: StateFunction): Unit =register(stateName, stateFunction, Option(stateTimeout))

/**
 * Records the handler and default timeout for a state.
 *
 * If the state already has a handler, the new one is chained behind it with
 * `orElse`, and the new timeout takes precedence over the stored one when it is
 * defined. Otherwise both are stored as given.
 *
 * @param name     designator of the state being registered
 * @param function handler to add for that state
 * @param timeout  default timeout for that state
 */
private def register(name: S, function: StateFunction, timeout: Timeout): Unit =
  stateFunctions.get(name) match {
    case Some(existing) =>
      stateFunctions(name) = existing orElse function
      stateTimeouts(name) = timeout orElse stateTimeouts(name)
    case None =>
      stateFunctions(name) = function
      stateTimeouts(name) = timeout
  }

/**
 * State definitions
 */
// Handler chain per state name; filled by `register`.
private val stateFunctions = mutable.Map[S, StateFunction]()
// Default timeout per state name; filled by `register`.
private val stateTimeouts = mutable.Map[S, Timeout]()


/**
 * Produce transition to other state.
 * Return this from a state function in order to effect the transition.
 *
 * This method always triggers transition events, even for `A -> A` transitions.
 * If you want to stay in the same state without triggering a state transition event use [[#stay]] instead.
 *
 * The returned descriptor carries over the current state data.
 *
 * @param nextStateName state designator for the next state
 * @return state transition descriptor
 */
final def goto(nextStateName: S): State = FSM.State(nextStateName, currentState.stateData)


/**
 * Produce "empty" transition descriptor.
 * Return this from a state function when no state change is to be effected.
 *
 * No transition event will be triggered by [[#stay]].
 * If you want to trigger an event like `S -> S` for `onTransition` to handle use `goto` instead.
 *
 * Implemented as a `goto` to the current state name with notification switched off.
 *
 * @return descriptor for staying in current state
 */
final def stay(): State = goto(currentState.stateName).withNotification(false)


 /**
  * INTERNAL API
  *
  * Returns a descriptor with the same fields as this one: a `SilentState` when
  * `notifies` is false, otherwise a plain `State`.
  */
 private[akka] def withNotification(notifies: Boolean): State[S, D] =
   if (!notifies) new SilentState(stateName, stateData, timeout, stopReason, replies)
   else State(stateName, stateData, timeout, stopReason, replies)
// NOTE(review): the brace below appears to close the enclosing State class, whose
// opening brace is not part of this excerpt - confirm against the full source.
}


/**
 * INTERNAL API
 *
 * A `State` whose `notifies` is always false. `copy` is overridden so that
 * copying a `SilentState` yields a `SilentState` again.
 */
private[akka] class SilentState[S, D](
  _stateName:  S,
  _stateData:  D,
  _timeout:    Option[FiniteDuration],
  _stopReason: Option[Reason],
  _replies:    List[Any])
  extends State[S, D](_stateName, _stateData, _timeout, _stopReason, _replies) {

  /**
   * INTERNAL API
   */
  private[akka] override def notifies: Boolean = false

  override def copy(
    stateName:  S                      = stateName,
    stateData:  D                      = stateData,
    timeout:    Option[FiniteDuration] = timeout,
    stopReason: Option[Reason]         = stopReason,
    replies:    List[Any]              = replies): State[S, D] =
    new SilentState(stateName, stateData, timeout, stopReason, replies)
}



/**
 * Message handler of the FSM actor.
 *
 * Timeout markers and timer messages are checked against their generation
 * before being processed; subscribe/unsubscribe messages maintain `listeners`;
 * every other message cancels a pending state timeout, bumps `generation` and
 * is passed to `processMsg`.
 */
override def receive: Receive = {
  case TimeoutMarker(gen) =>
    // Only a marker carrying the current generation is processed.
    if (generation == gen) processMsg(StateTimeout, "state timeout")

  case t @ Timer(name, msg, repeat, gen, owner) =>
    if ((owner eq this) && (timers contains name) && (timers(name).generation == gen)) {
      timeoutFuture.foreach(_.cancel())
      timeoutFuture = None
      generation += 1
      if (!repeat) timers -= name
      processMsg(msg, t)
    }

  case SubscribeTransitionCallBack(actorRef) =>
    // TODO Use context.watch(actor) and receive Terminated(actor) to clean up list
    listeners.add(actorRef)
    // send current state back as reference point
    actorRef ! CurrentState(self, currentState.stateName)

  case Listen(actorRef) =>
    // TODO Use context.watch(actor) and receive Terminated(actor) to clean up list
    listeners.add(actorRef)
    // send current state back as reference point
    actorRef ! CurrentState(self, currentState.stateName)

  case UnsubscribeTransitionCallBack(actorRef) =>
    listeners.remove(actorRef)

  case Deafen(actorRef) =>
    listeners.remove(actorRef)

  case value =>
    timeoutFuture.foreach(_.cancel())
    timeoutFuture = None
    generation += 1
    processMsg(value, sender())
}

  我们只关注最后一段代码processMsg(value, sender())。

/** Wraps `value` and the current state data in an `Event` and passes it to `processEvent`. */
private def processMsg(value: Any, source: AnyRef): Unit =
  processEvent(Event(value, currentState.stateData), source)

/**
 * Runs the handler registered for the current state on `event` and applies the
 * resulting state. When that handler is not defined at `event`, `handleEvent`
 * is used instead.
 */
private[akka] def processEvent(event: Event, source: AnyRef): Unit = {
  val handler = stateFunctions(currentState.stateName)
  val resultingState =
    if (handler isDefinedAt event) handler(event)
    else handleEvent(event) // handleEventDefault ensures that this is always defined
  applyState(resultingState)
}

/**
 * Applies a state descriptor: without a stop reason it is handed to
 * `makeTransition`; with one, the replies are sent (in reverse list order),
 * `terminate` is called and the actor stops itself.
 */
private[akka] def applyState(nextState: State): Unit =
  nextState.stopReason match {
    case None => makeTransition(nextState)
    case _ =>
      nextState.replies.reverse foreach { reply => sender() ! reply }
      terminate(nextState)
      context.stop(self)
  }

/**
 * Performs the move to `nextState`.
 *
 * Terminates with a `Failure` when the target state has no registered handler.
 * Otherwise it sends the replies, runs the transition handling and `gossip`
 * when the state name changes or the descriptor notifies, stores the new
 * current state and schedules the state timeout.
 */
private[akka] def makeTransition(nextState: State): Unit = {
  if (stateFunctions.contains(nextState.stateName)) {
    nextState.replies.reverse foreach { reply => sender() ! reply }

    if (currentState.stateName != nextState.stateName || nextState.notifies) {
      // The field is set only while the transition handling and gossip run.
      this.nextState = nextState
      handleTransition(currentState.stateName, nextState.stateName)
      gossip(Transition(self, currentState.stateName, nextState.stateName))
      this.nextState = null
    }
    currentState = nextState

    def scheduleTimeout(d: FiniteDuration): Some[Cancellable] = {
      import context.dispatcher
      Some(context.system.scheduler.scheduleOnce(d, self, TimeoutMarker(generation)))
    }

    currentState.timeout match {
      case SomeMaxFiniteDuration => // effectively disable stateTimeout
      case Some(d: FiniteDuration) if d.length >= 0 =>
        timeoutFuture = scheduleTimeout(d)
      case _ =>
        // Fall back to the default timeout registered for the state, if any.
        stateTimeouts(currentState.stateName).foreach { d => timeoutFuture = scheduleTimeout(d) }
    }
  } else {
    terminate(stay().withStopReason(Failure("Next state %s does not exist".format(nextState.stateName))))
  }
}



  其实分析到这里,FSM就没必要再深入研究了,非常简单。就是定义了一些DSL,用trait Actor的receive函数进行行为函数的调用和状态转移,如果你的状态不多,转移条件简单,完全没必要用FSM,自己用become/unbecome实现就行了。而且官方的FSM是用一个函数调用列表(或者说是一个函数指针,C++里面经常这样实现)实现的,并没有用become/unbecome,这一点倒让我挺意外的。当然了,如果你的场景确实是FSM,那最好还是用官方的实现,毕竟它已经把FSM可能遇到的问题都帮你处理好了。




