Earlier we discussed FSM, an Actor designed specifically for maintaining internal state, whose distinguishing feature is a special DSL that makes state transitions easy to express. The FSM state-transition model maps well onto real-world business processes, because its DSL describes business functionality quite vividly. To make an FSM highly available, it also needs a self-healing capability. PersistentFSM combines FSM with PersistentActor: on top of the state-machine model it persists state-transition events, so the internal state can be rebuilt after a failure. PersistentFSM also adds one more element to the FSM structure, the domain event, which is what gets persisted in the event-sourcing pattern. The PersistentFSM trait is defined as follows:

/**
 * A FSM implementation with persistent state.
 *
 * Supports the usual [[akka.actor.FSM]] functionality with additional persistence features.
 * `PersistentFSM` is identified by 'persistenceId' value.
 * State changes are persisted atomically together with domain events, which means that either both succeed or both fail,
 * i.e. a state transition event will not be stored if persistence of an event related to that change fails.
 * Persistence execution order is: persist -> wait for ack -> apply state.
 * Incoming messages are deferred until the state is applied.
 * State Data is constructed based on domain events, according to user's implementation of applyEvent function.
 */
trait PersistentFSM[S <: FSMState, D, E] extends PersistentActor with PersistentFSMBase[S, D, E] with ActorLogging {...}

As we can see, PersistentFSM extends PersistentActor, which gives it the event persistence and journal recovery capabilities of the event-sourcing pattern. The other parent type, PersistentFSMBase, is a redefinition of the FSM trait that provides a DSL for persistent state transitions. The three type parameters S, D and E of PersistentFSM stand for the state type (State), the state data (Data) and the domain event (Event). Compared with FSM, besides the added event parameter, the State type must be based on FSMState, which makes it easy to serialize the State:

/**
 * FSM state and data snapshot
 *
 * @param stateIdentifier FSM state identifier
 * @param data FSM state data
 * @param timeout FSM state timeout
 * @tparam D state data type
 */
@InternalApi
private[persistence] case class PersistentFSMSnapshot[D](stateIdentifier: String, data: D, timeout: Option[FiniteDuration]) extends Message

/**
 * FSMState base trait, makes possible for simple default serialization by conversion to String
 */
trait FSMState {
  def identifier: String
}
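To make the role of `identifier` concrete, here is a minimal sketch in plain Scala with no Akka dependency. The `FSMState` trait below is a local stand-in that mirrors the one quoted above, and `DemoState`, `statesByIdentifier` and `resolve` are hypothetical names introduced only for illustration. The idea is that only the string identifier has to be stored, and a lookup table maps it back to the state object on recovery.

```scala
object StateIdentifierSketch {
  // Local stand-in for the FSMState trait quoted above (assumption: same shape)
  trait FSMState { def identifier: String }

  sealed trait DemoState extends FSMState
  case object Idle extends DemoState { override def identifier: String = "Idle" }
  case object Busy extends DemoState { override def identifier: String = "Busy" }

  // Lookup table from the persisted string back to the state object
  val statesByIdentifier: Map[String, DemoState] =
    Seq[DemoState](Idle, Busy).map(s => s.identifier -> s).toMap

  // Resolve a stored identifier; None if no known state carries it
  def resolve(id: String): Option[DemoState] = statesByIdentifier.get(id)
}
```

Because the identifier is what ends up in storage, renaming it later would presumably make previously stored states unresolvable, so it is worth treating it as a stable key.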

A PersistentFSM program is structured much like an FSM:

class PersistentFSMActor extends PersistentFSM[StateType,DataType,EventType] {
  startWith(initState,initData)  // initial state
  when(stateA) {...}             // handle each state
  when(stateB) {...}
  whenUnhandled {...}            // handle events common to all states
  onTransition {...}             // track state transitions
}

Judging from this structure, the receiveRecover function that handles journal recovery must already be implemented inside the trait:

/**
 * After recovery events are handled as in usual FSM actor
 */
override def receiveCommand: Receive = {
  super[PersistentFSMBase].receive
}

/**
 * Discover the latest recorded state
 */
override def receiveRecover: Receive = {
  case domainEventTag(event) ⇒ startWith(stateName, applyEvent(event, stateData))
  case StateChangeEvent(stateIdentifier, timeout) ⇒ startWith(statesMap(stateIdentifier), stateData, timeout)
  case SnapshotOffer(_, PersistentFSMSnapshot(stateIdentifier, data: D, timeout)) ⇒ startWith(statesMap(stateIdentifier), data, timeout)
  case RecoveryCompleted ⇒
    initialize()
    onRecoveryCompleted()
}

Note that initialize is deprecated and should no longer be called from user code; we can override onRecoveryCompleted() to do initialization work instead. So where are events written to the journal?

/**
 * Persist FSM State and FSM State Data
 */
override private[akka] def applyState(nextState: State): Unit = {
  var eventsToPersist: immutable.Seq[Any] = nextState.domainEvents.toList

  //Prevent StateChangeEvent persistence when staying in the same state, except when state defines a timeout
  if (nextState.notifies || nextState.timeout.nonEmpty) {
    eventsToPersist = eventsToPersist :+ StateChangeEvent(nextState.stateName.identifier, nextState.timeout)
  }

  if (eventsToPersist.isEmpty) {
    //If there are no events to persist, just apply the state
    super.applyState(nextState)
  } else {
    //Persist the events and apply the new state after all event handlers were executed
    var nextData: D = stateData
    var handlersExecutedCounter = 0

    def applyStateOnLastHandler() = {
      handlersExecutedCounter += 1
      if (handlersExecutedCounter == eventsToPersist.size) {
        super.applyState(nextState using nextData)
        currentStateTimeout = nextState.timeout
        nextState.afterTransitionDo(stateData)
      }
    }

    persistAll[Any](eventsToPersist) {
      case domainEventTag(event) ⇒
        nextData = applyEvent(event, nextData)
        applyStateOnLastHandler()
      case StateChangeEvent(stateIdentifier, timeout) ⇒
        applyStateOnLastHandler()
    }
  }
}
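The first half of applyState decides what goes into the journal. The rule can be isolated as a pure function, sketched here in plain Scala under simplifying assumptions: `StateChange` is a stand-in for the internal StateChangeEvent, and `eventsToPersist` is a hypothetical helper whose parameters replace the fields read from `nextState`. It is not Akka's code, only an illustration of the selection logic quoted above.

```scala
import scala.concurrent.duration.FiniteDuration

object EventsToPersistSketch {
  // Stand-in for the internal StateChangeEvent (assumption: simplified shape)
  final case class StateChange(stateIdentifier: String, timeout: Option[FiniteDuration])

  // Domain events come first; a state-change marker is appended only when
  // the transition notifies (goto) or the next state defines a timeout.
  def eventsToPersist(
      domainEvents: Seq[Any],
      stateIdentifier: String,
      notifies: Boolean,
      timeout: Option[FiniteDuration]): Seq[Any] =
    if (notifies || timeout.nonEmpty) domainEvents :+ StateChange(stateIdentifier, timeout)
    else domainEvents
}
```

Under this rule a plain `stay()` with no domain events and no timeout yields an empty list, which is the case where the state is applied directly without touching the journal.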

Note that this internal applyState overrides the applyState of the parent PersistentFSMBase:

/*
 * *******************************************
 *       Main actor receive() method
 * *******************************************
 */
override def receive: Receive = {
  case TimeoutMarker(gen) ⇒
    if (generation == gen) {
      processMsg(StateTimeout, "state timeout")
    }
  case t @ Timer(name, msg, repeat, gen) ⇒
    if ((timers contains name) && (timers(name).generation == gen)) {
      if (timeoutFuture.isDefined) {
        timeoutFuture.get.cancel()
        timeoutFuture = None
      }
      generation += 1
      if (!repeat) {
        timers -= name
      }
      processMsg(msg, t)
    }
  case SubscribeTransitionCallBack(actorRef) ⇒
    // TODO Use context.watch(actor) and receive Terminated(actor) to clean up list
    listeners.add(actorRef)
    // send current state back as reference point
    actorRef ! CurrentState(self, currentState.stateName, currentState.timeout)
  case Listen(actorRef) ⇒
    // TODO Use context.watch(actor) and receive Terminated(actor) to clean up list
    listeners.add(actorRef)
    // send current state back as reference point
    actorRef ! CurrentState(self, currentState.stateName, currentState.timeout)
  case UnsubscribeTransitionCallBack(actorRef) ⇒
    listeners.remove(actorRef)
  case Deafen(actorRef) ⇒
    listeners.remove(actorRef)
  case value ⇒
    if (timeoutFuture.isDefined) {
      timeoutFuture.get.cancel()
      timeoutFuture = None
    }
    generation += 1
    processMsg(value, sender())
}

private def processMsg(value: Any, source: AnyRef): Unit = {
  val event = Event(value, currentState.stateData)
  processEvent(event, source)
}

private[akka] def processEvent(event: Event, source: AnyRef): Unit = {
  val stateFunc = stateFunctions(currentState.stateName)
  val nextState = if (stateFunc isDefinedAt event) {
    stateFunc(event)
  } else {
    // handleEventDefault ensures that this is always defined
    handleEvent(event)
  }
  applyState(nextState)
}

private[akka] def applyState(nextState: State): Unit = {
  nextState.stopReason match {
    case None ⇒ makeTransition(nextState)
    case _ ⇒
      nextState.replies.reverse foreach { r ⇒ sender() ! r }
      terminate(nextState)
      context.stop(self)
  }
}

The abstract function receiveCommand is implemented in the PersistentFSM trait by delegating directly to receive in PersistentFSMBase:

/**
 * After recovery events are handled as in usual FSM actor
 */
override def receiveCommand: Receive = {
  super[PersistentFSMBase].receive
}

A PersistentFSM must also implement the abstract function applyEvent:

/**
 * Override this handler to define the action on Domain Event
 *
 * @param domainEvent domain event to apply
 * @param currentData state data of the previous state
 * @return updated state data
 */
def applyEvent(domainEvent: E, currentData: D): D

The main job of this function is to transform the current state data in response to an event. The other abstract member that has to be implemented is domainEventClassTag. It is a ClassTag[E] instance, needed so that values of the generic type E can be pattern matched despite type erasure:

/**
 * Enables to pass a ClassTag of a domain event base type from the implementing class
 *
 * @return [[scala.reflect.ClassTag]] of domain event base type
 */
implicit def domainEventClassTag: ClassTag[E]

/**
 * Domain event's [[scala.reflect.ClassTag]]
 * Used for identifying domain events during recovery
 */
val domainEventTag = domainEventClassTag
...
/**
 * Discover the latest recorded state
 */
override def receiveRecover: Receive = {
  case domainEventTag(event) ⇒ startWith(stateName, applyEvent(event, stateData))
...
persistAll[Any](eventsToPersist) {
  case domainEventTag(event) ⇒
    nextData = applyEvent(event, nextData)
    applyStateOnLastHandler()
  case StateChangeEvent(stateIdentifier, timeout) ⇒
    applyStateOnLastHandler()
}
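The `case domainEventTag(event)` pattern works because a ClassTag can be used as an extractor. The following self-contained sketch shows the same trick outside Akka. `Classifier`, `classify`, and the two sample message types are hypothetical names made up for this illustration.

```scala
import scala.reflect.ClassTag

object ClassTagMatchSketch {
  sealed trait DomainEvent
  final case class ItemAdded(name: String) extends DomainEvent
  final case class StateChange(id: String) // deliberately not a DomainEvent

  // E is erased at runtime, so a pattern like `case e: E` cannot be checked.
  // A ClassTag carries the runtime class and its unapply does the check.
  class Classifier[E](implicit eventTag: ClassTag[E]) {
    private val tag = eventTag // stable identifier, usable as an extractor

    def classify(msg: Any): String = msg match {
      case tag(e) => s"domain event: $e"
      case other  => s"something else: $other"
    }
  }
}
```

Creating `new ClassTagMatchSketch.Classifier[ClassTagMatchSketch.DomainEvent]` lets the compiler supply the ClassTag implicitly, which plays the same role as the `classTag[DomainEvent]` that an implementing class returns from domainEventClassTag.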

The example in the official Akka PersistentFSM documentation is quite representative, so I will base this demonstration on it. It models an e-commerce shopping cart. The biggest advantage of implementing it with PersistentFSM is that the cart contents stay consistent even across crashes and restarts. In addition, the full history of a user's selections is saved automatically, which is handy for later data analysis and has become common practice; even a cart abandoned midway can be restored automatically at the next login. Let's study the example, starting with the data structures:

import akka.persistence.fsm.PersistentFSM._

object WebShopping {
  sealed trait UserState extends FSMState  // state type
  case object LookingAround extends UserState {  // browsing, can move to Shopping
    override def identifier: String = "Looking Around"
  }
  case object Shopping extends UserState {  // picking items, can move to Paid or time out to Inactive
    override def identifier: String = "Shopping"
  }
  case object Inactive extends UserState {  // idle, can move back to Shopping
    override def identifier: String = "Inactive"
  }
  case object Paid extends UserState {   // checked out, can only query the result or leave
    override def identifier: String = "Paid"
  }

  // price is a Double so that literals such as 12.50 in the demo compile
  case class Item(id: String, name: String, price: Double)

  // state data
  sealed trait ShoppingCart {  // true functional structure
    def addItem(item: Item): ShoppingCart
    def removeItem(id: String): ShoppingCart
    def empty(): ShoppingCart
  }
  case class LoadedCart(items: Seq[Item]) extends ShoppingCart {
    override def addItem(item: Item): ShoppingCart = LoadedCart(items :+ item)
    override def removeItem(id: String): ShoppingCart = {
      val newItems = items.filter { item => item.id != id }
      if (newItems.length > 0)
        LoadedCart(newItems)
      else
        EmptyCart
    }
    override def empty() = EmptyCart
  }
  case object EmptyCart extends ShoppingCart {
    override def addItem(item: Item) = LoadedCart(item :: Nil)
    override def empty() = this
    override def removeItem(id: String): ShoppingCart = this
  }
}

UserState is the current state of the FSM. The states represent the flow of the FSM, and each state runs its own business logic:

when(LookingAround) {...}             // handle each state
when(Shopping) {...}
when(Inactive) {...}
when(Paid) {...}
...

ShoppingCart is the data of the FSM's current state. Each state may carry different data. Note that ShoppingCart is a typical functional data structure: it is immutable, and every update returns a new structure. The state data ShoppingCart is updated in the abstract function applyEvent. Here is the signature of applyEvent again:

/**
 * Override this handler to define the action on Domain Event
 *
 * @param domainEvent domain event to apply
 * @param currentData state data of the previous state
 * @return updated state data
 */
def applyEvent(domainEvent: E, currentData: D): D

The user has to supply the implementation: given an event and the current state data, produce the new state data. applyEvent is called in two places, during recovery and after a successful persist:

override def receiveRecover: Receive = {
  case domainEventTag(event) ⇒ startWith(stateName, applyEvent(event, stateData))
  case StateChangeEvent(stateIdentifier, timeout) ⇒ startWith(statesMap(stateIdentifier), stateData, timeout)
  case SnapshotOffer(_, PersistentFSMSnapshot(stateIdentifier, data: D, timeout)) ⇒ startWith(statesMap(stateIdentifier), data, timeout)
  case RecoveryCompleted ⇒
    initialize()
    onRecoveryCompleted()
}
...
/**
 * Persist FSM State and FSM State Data
 */
override private[akka] def applyState(nextState: State): Unit = {
  var eventsToPersist: immutable.Seq[Any] = nextState.domainEvents.toList

  //Prevent StateChangeEvent persistence when staying in the same state, except when state defines a timeout
  if (nextState.notifies || nextState.timeout.nonEmpty) {
    eventsToPersist = eventsToPersist :+ StateChangeEvent(nextState.stateName.identifier, nextState.timeout)
  }

  if (eventsToPersist.isEmpty) {
    //If there are no events to persist, just apply the state
    super.applyState(nextState)
  } else {
    //Persist the events and apply the new state after all event handlers were executed
    var nextData: D = stateData
    var handlersExecutedCounter = 0

    def applyStateOnLastHandler() = {
      handlersExecutedCounter += 1
      if (handlersExecutedCounter == eventsToPersist.size) {
        super.applyState(nextState using nextData)
        currentStateTimeout = nextState.timeout
        nextState.afterTransitionDo(stateData)
      }
    }

    persistAll[Any](eventsToPersist) {
      case domainEventTag(event) ⇒
        nextData = applyEvent(event, nextData)
        applyStateOnLastHandler()
      case StateChangeEvent(stateIdentifier, timeout) ⇒
        applyStateOnLastHandler()
    }
  }
}
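Seen from the data side, recovery amounts to folding the journaled domain events over an initial state with applyEvent. The sketch below shows that idea in plain Scala. It assumes a simplified cart (a plain `List[Item]` rather than the ShoppingCart trait) and a hypothetical `replay` helper; the real trait does this incrementally inside receiveRecover.

```scala
object ReplaySketch {
  final case class Item(id: String, name: String, price: Double)

  sealed trait DomainEvent
  final case class ItemAdded(item: Item) extends DomainEvent
  final case class ItemRemoved(id: String) extends DomainEvent
  case object OrderClosed extends DomainEvent

  // Simplified state data: an immutable list stands in for ShoppingCart
  type Cart = List[Item]

  // Pure transition: (event, old data) => new data
  def applyEvent(event: DomainEvent, cart: Cart): Cart = event match {
    case ItemAdded(item) => cart :+ item
    case ItemRemoved(id) => cart.filterNot(_.id == id)
    case OrderClosed     => Nil
  }

  // Recovery: start from the empty cart and apply journaled events in order
  def replay(journal: Seq[DomainEvent]): Cart =
    journal.foldLeft(List.empty[Item])((cart, event) => applyEvent(event, cart))
}
```

Because applyEvent has no side effects, replaying the same journal always rebuilds the same cart, which is what makes restoring state from the journal reliable.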

State transitions are expressed with stay, goto and stop:

/**
 * Produce transition to other state.
 * Return this from a state function in order to effect the transition.
 *
 * This method always triggers transition events, even for `A -> A` transitions.
 * If you want to stay in the same state without triggering an state transition event use [[#stay]] instead.
 *
 * @param nextStateName state designator for the next state
 * @return state transition descriptor
 */
final def goto(nextStateName: S): State = PersistentFSM.State(nextStateName, currentState.stateData)()

/**
 * Produce "empty" transition descriptor.
 * Return this from a state function when no state change is to be effected.
 *
 * No transition event will be triggered by [[#stay]].
 * If you want to trigger an event like `S -> S` for `onTransition` to handle use `goto` instead.
 *
 * @return descriptor for staying in current state
 */
final def stay(): State = goto(currentState.stateName).withNotification(false) // cannot directly use currentState because of the timeout field

/**
 * Produce change descriptor to stop this FSM actor with reason "Normal".
 */
final def stop(): State = stop(Normal)

Changes to the state data are expressed with applying:

/**
 * Specify domain events to be applied when transitioning to the new state.
 */
@varargs def applying(events: E*): State[S, D, E] = {
  copy(domainEvents = domainEvents ++ events)
}

/**
 * Register a handler to be triggered after the state has been persisted successfully
 */
def andThen(handler: D ⇒ Unit): State[S, D, E] = {
  copy(afterTransitionDo = handler)
}

applying operates on the type State[S,D,E], which is defined as follows:

/**
 * This captures all of the managed state of the [[akka.actor.FSM]]: the state
 * name, the state data, possibly custom timeout, stop reason, replies
 * accumulated while processing the last message, possibly domain event and handler
 * to be executed after FSM moves to the new state (also triggered when staying in the same state)
 */
final case class State[S, D, E](
  stateName:         S,
  stateData:         D,
  timeout:           Option[FiniteDuration] = None,
  stopReason:        Option[Reason]         = None,
  replies:           List[Any]              = Nil,
  domainEvents:      Seq[E]                 = Nil,
  afterTransitionDo: D ⇒ Unit               = { _: D ⇒ })(private[akka] val notifies: Boolean = true) {

  /**
   * Copy object and update values if needed.
   */
  @InternalApi
  private[akka] def copy(stateName: S = stateName, stateData: D = stateData, timeout: Option[FiniteDuration] = timeout, stopReason: Option[Reason] = stopReason, replies: List[Any] = replies, notifies: Boolean = notifies, domainEvents: Seq[E] = domainEvents, afterTransitionDo: D ⇒ Unit = afterTransitionDo): State[S, D, E] = {
    State(stateName, stateData, timeout, stopReason, replies, domainEvents, afterTransitionDo)(notifies)
  }
  ...

applying only appends the events to the domainEvents list; they are actually applied later, when applyState is called:

/**
 * Persist FSM State and FSM State Data
 */
override private[akka] def applyState(nextState: State): Unit = {
  var eventsToPersist: immutable.Seq[Any] = nextState.domainEvents.toList
...

PersistentFSM inherits the event-sourcing pattern from PersistentActor. Here are the type definitions for commands and events:

sealed trait Command
case class AddItem(item: Item) extends Command
case class RemoveItem(id: String) extends Command
case object Buy extends Command
case object Leave extends Command
case object GetCart extends Command

sealed trait DomainEvent
case class ItemAdded(item: Item) extends DomainEvent
case class ItemRemoved(id: String) extends DomainEvent
case object OrderClosed extends DomainEvent

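As we know, DomainEvents are what get written to the journal. Their relation to Commands is this: processing certain Commands produces DomainEvents, and those DomainEvents are then written to the journal.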
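The command-to-event relation can be sketched as a plain function. This is a deliberate simplification: in the actual FSM the mapping also depends on the current state (for example, Buy is only handled while Shopping), and `eventsFor` is a hypothetical helper name, not part of Akka or of the example's code.

```scala
object CommandToEventSketch {
  final case class Item(id: String, name: String, price: Double)

  sealed trait Command
  final case class AddItem(item: Item) extends Command
  final case class RemoveItem(id: String) extends Command
  case object Buy extends Command
  case object GetCart extends Command

  sealed trait DomainEvent
  final case class ItemAdded(item: Item) extends DomainEvent
  final case class ItemRemoved(id: String) extends DomainEvent
  case object OrderClosed extends DomainEvent

  // Commands that change the cart produce events to be journaled;
  // a pure query such as GetCart produces none.
  def eventsFor(command: Command): Seq[DomainEvent] = command match {
    case AddItem(item)  => Seq(ItemAdded(item))
    case RemoveItem(id) => Seq(ItemRemoved(id))
    case Buy            => Seq(OrderClosed)
    case GetCart        => Nil
  }
}
```

Keeping commands (requests that may be rejected) separate from events (facts that have already happened) is what allows the journal to be replayed without re-running any validation.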

Let's start designing this PersistentFSM:

class WebShopping(webUserId: String) extends PersistentFSM[UserState,ShoppingCart,DomainEvent] {
  override def persistenceId: String = webUserId
  override def domainEventClassTag: ClassTag[DomainEvent] = classTag[DomainEvent]
  override def applyEvent(domainEvent: DomainEvent, currentCart: ShoppingCart): ShoppingCart =
    domainEvent match {
      case ItemAdded(item) => currentCart.addItem(item)
      case ItemRemoved(id) => currentCart.removeItem(id)
      case OrderClosed => currentCart.empty()
    }
}

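We first implement the abstract members of the trait. persistenceId is the user id of the current shopper, so each user's shopping process is written to the journal under that id. Consider what this means: each user's shopping session is handled by its own Actor. Actors need very few resources yet process messages efficiently; given enough memory, a single server can host hundreds of thousands or even millions of Actor instances. Add akka-cluster and, almost without noticing, we have the basis for an e-commerce site that can serve millions of users.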

Now let's look at the complete business flow of this PersistentFSM:

class WebShopping(webUserId: String) extends PersistentFSM[UserState,ShoppingCart,DomainEvent] {
  override def persistenceId: String = webUserId
  override def domainEventClassTag: ClassTag[DomainEvent] = classTag[DomainEvent]
  override def applyEvent(event: DomainEvent, currentCart: ShoppingCart): ShoppingCart =
    event match {
      case ItemAdded(item) => currentCart.addItem(item)
      case ItemRemoved(id) => currentCart.removeItem(id)
      case OrderClosed => currentCart.empty()  // empty the ShoppingCart after a successful checkout
    }

  startWith(LookingAround,EmptyCart)  // state on first login

  when(LookingAround) {   // while browsing, adding an item moves to Shopping
    case Event(AddItem(item),currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"LookingAround-Adding Item: $item",currentCart))
      goto(Shopping) applying ItemAdded(item) forMax(1 second)
    case Event(GetCart,currentCart) =>
      stay replying currentCart
  }

  when(Shopping) {
    case Event(AddItem(item),currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Shopping-Adding Item: $item",currentCart))
      stay applying ItemAdded(item) forMax (1 second) andThen {
        case cart @ _ =>
          context.system.eventStream.publish(CurrentCart(s"Shopping-after adding Item: $item",cart))
      }
    case Event(RemoveItem(id),currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Shopping-Removing Item: $id",currentCart))
      stay applying ItemRemoved(id) forMax (1 second) andThen {
        case cart @ _ =>
          context.system.eventStream.publish(CurrentCart(s"Shopping-after removing Item: $id",cart))
      }
    case Event(Buy,currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Shopping-Buying",currentCart))
      goto(Paid) applying OrderClosed forMax (1 second) andThen {
        case cart @ _ =>
          saveStateSnapshot()
          context.system.eventStream.publish(CurrentCart(s"Shopping-after paid",cart))
      }
    case Event(Leave,currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Shopping-Leaving",currentCart))
      stop()
    case Event(StateTimeout,currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Shopping-Timeout",currentCart))
      goto(Inactive) forMax(1 second)
    case Event(GetCart,currentCart) =>
      stay replying currentCart
  }

  when(Inactive) {
    case Event(AddItem(item),currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Inactive-Adding Item: $item",currentCart))
      goto(Shopping) applying ItemAdded(item) forMax(1 second)
    case Event(StateTimeout,currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Inactive-Timeout",currentCart))
      stop()
  }

  when(Paid) {
    case Event(Leave,currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Paid-Leaving",currentCart))
      stop()
    case Event(GetCart,currentCart) =>
      stay replying currentCart
  }
}

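As we can see, the FSM DSL hides the technical details of PersistentActor and FSM. What the programmer sees is a description of the business flow, which keeps the code close to the real-world application and easy to understand.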

The following demonstrates the methods for snapshots, journal maintenance and process tracking:

whenUnhandled {
  case Event(SaveSnapshotSuccess(metadata),currentCart) =>
    context.system.eventStream.publish(CurrentCart("Successfully saved snapshot",currentCart))
    // if the shopping history does not need to be kept, the journal and old snapshots can be cleaned up
    deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = metadata.sequenceNr - 1))
    deleteMessages(metadata.sequenceNr)
    stay()
  case Event(SaveSnapshotFailure(metadata, reason),currentCart) =>
    context.system.eventStream.publish(CurrentCart(s"Fail to save snapshot for $reason",currentCart))
    stay()
  case Event(DeleteMessagesSuccess(toSeq),currentCart) =>
    context.system.eventStream.publish(CurrentCart(s"Successfully deleted journal upto: $toSeq",currentCart))
    stay()
  case Event(DeleteMessagesFailure(cause,toSeq),currentCart) =>
    context.system.eventStream.publish(CurrentCart(s"Failed to delete journal upto: $toSeq because: $cause",currentCart))
    stay()
  case Event(DeleteSnapshotsSuccess(crit),currentCart) =>
    context.system.eventStream.publish(CurrentCart(s"Successfully deleted snapshots for $crit",currentCart))
    stay()
  case Event(DeleteSnapshotsFailure(crit,cause),currentCart) =>
    context.system.eventStream.publish(CurrentCart(s"Failed to delete snapshots $crit because: $cause",currentCart))
    stay()
}

onTransition {
  case LookingAround -> Shopping =>
    context.system.eventStream.publish(CurrentCart("LookingAround -> Shopping",stateData))
  case Shopping -> Inactive =>
    context.system.eventStream.publish(CurrentCart("Shopping -> Inactive",stateData))
  case Shopping -> Paid =>
    context.system.eventStream.publish(CurrentCart("Shopping -> Paid",stateData))
  case Inactive -> Shopping =>
    context.system.eventStream.publish(CurrentCart("Inactive -> Shopping",stateData))
}

override def onRecoveryCompleted(): Unit =
  context.system.eventStream.publish(CurrentCart("OnRecoveryCompleted",stateData))

override def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit =
  context.system.eventStream.publish(CurrentCart(s"onPersistFailure ${cause.getMessage}",stateData))

override def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit =
  context.system.eventStream.publish(CurrentCart(s"onPersistRejected ${cause.getMessage}",stateData))

override def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit =
  context.system.eventStream.publish(CurrentCart(s"onRecoveryFailure ${cause.getMessage}",stateData))

Here is the code for the process tracker:

package persistentfsm.tracker

import akka.actor._
import persistentfsm.cart.WebShopping

object EventTracker {
  def props = Props(new EventTracker)
}

class EventTracker extends Actor {
  override def preStart(): Unit = {
    context.system.eventStream.subscribe(self,classOf[WebShopping.CurrentCart])
    super.preStart()
  }

  override def postStop(): Unit = {
    context.system.eventStream.unsubscribe(self)
    super.postStop()
  }

  override def receive: Receive = {
    case WebShopping.CurrentCart(loc,cart) =>
      println(loc)
      cart match {
        case WebShopping.EmptyCart => println("empty cart!")
        case WebShopping.LoadedCart(items) => println(s"Current content in cart: $items")
      }
  }
}

Use the following code for a test run:

package persistentfsm.demo

import persistentfsm.cart._
import persistentfsm.tracker._
import akka.actor._
import WebShopping._

object PersistentFSMDemo extends App {
  val pfSystem = ActorSystem("persistentfsm-system")
  val trackerActor = pfSystem.actorOf(EventTracker.props,"tracker")
  val cart123 = pfSystem.actorOf(WebShopping.props("123"))

  cart123 ! GetCart
  cart123 ! AddItem(Item("001","Cigar",12.50))
  cart123 ! AddItem(Item("002","Wine",18.30))
  cart123 ! AddItem(Item("003","Coffee",5.50))
  cart123 ! GetCart
  cart123 ! RemoveItem("001")
  cart123 ! Buy
  cart123 ! GetCart
  cart123 ! AddItem(Item("004","Bread",3.25))
  cart123 ! AddItem(Item("005","Cake",5.25))

  scala.io.StdIn.readLine()
  pfSystem.terminate()
}

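Running the demo repeatedly shows that the items added after checkout are restored on the next run. If the program exits abnormally midway, the items already placed in the cart are still retained.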

Below is the complete source code for this demonstration:

build.sbt

name := "persistent-fsm"

version := "1.0"

scalaVersion := "2.11.9"

sbtVersion := "0.13.5"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor"       % "2.5.3",
  "com.typesafe.akka" %% "akka-persistence" % "2.5.3",
  "ch.qos.logback" % "logback-classic" % "1.1.7",
  "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.54",
  "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.54" % Test
)

application.conf

akka {
  persistence {
    journal.plugin = "cassandra-journal"
    snapshot-store.plugin = "cassandra-snapshot-store"
    fsm {
      snapshot-after = 10
    }
  }
}
akka.actor.warn-about-java-serializer-usage = off

WebShopping.scala

package persistentfsm.cart

import WebShopping._
import akka.persistence.fsm._
import akka.persistence.fsm.PersistentFSM._
import akka.persistence._
import akka.actor._
import scala.concurrent.duration._
import scala.reflect._

object WebShopping {
  sealed trait UserState extends FSMState  // state type
  case object LookingAround extends UserState {  // browsing, can move to Shopping
    override def identifier: String = "Looking Around"
  }
  case object Shopping extends UserState {  // picking items, can move to Paid or time out to Inactive
    override def identifier: String = "Shopping"
  }
  case object Inactive extends UserState {  // idle, can move back to Shopping
    override def identifier: String = "Inactive"
  }
  case object Paid extends UserState {   // checked out, can only query the result or leave
    override def identifier: String = "Paid"
  }

  case class Item(id: String, name: String, price: Double)

  // state data
  sealed trait ShoppingCart {  // true functional structure
    def addItem(item: Item): ShoppingCart
    def removeItem(id: String): ShoppingCart
    def empty(): ShoppingCart
  }
  case class LoadedCart(items: Seq[Item]) extends ShoppingCart {
    override def addItem(item: Item): ShoppingCart = LoadedCart(items :+ item)
    override def removeItem(id: String): ShoppingCart = {
      val newItems = items.filter { item => item.id != id }
      if (newItems.length > 0)
        LoadedCart(newItems)
      else
        EmptyCart
    }
    override def empty() = EmptyCart
  }
  case object EmptyCart extends ShoppingCart {
    override def addItem(item: Item) = LoadedCart(item :: Nil)
    override def empty() = this
    override def removeItem(id: String): ShoppingCart = this
  }

  sealed trait Command
  case class AddItem(item: Item) extends Command
  case class RemoveItem(id: String) extends Command
  case object Buy extends Command
  case object Leave extends Command
  case object GetCart extends Command

  sealed trait DomainEvent
  case class ItemAdded(item: Item) extends DomainEvent
  case class ItemRemoved(id: String) extends DomainEvent
  case object OrderClosed extends DomainEvent

  // logging message type
  case class CurrentCart(location: String, cart: ShoppingCart)

  def props(uid: String) = Props(new WebShopping(uid))
}

class WebShopping(webUserId: String) extends PersistentFSM[UserState,ShoppingCart,DomainEvent] {
  override def persistenceId: String = webUserId
  override def domainEventClassTag: ClassTag[DomainEvent] = classTag[DomainEvent]
  override def applyEvent(event: DomainEvent, currentCart: ShoppingCart): ShoppingCart =
    event match {
      case ItemAdded(item) => currentCart.addItem(item)
      case ItemRemoved(id) => currentCart.removeItem(id)
      case OrderClosed => currentCart.empty()  // empty the ShoppingCart after a successful checkout
    }

  startWith(LookingAround,EmptyCart)  // state on first login

  when(LookingAround) {   // while browsing, adding an item moves to Shopping
    case Event(AddItem(item),currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"LookingAround-Adding Item: $item",currentCart))
      goto(Shopping) applying ItemAdded(item) forMax(1 second)
    case Event(GetCart,currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"LookingAround-Showing",currentCart))
      stay replying currentCart
  }

  when(Shopping) {
    case Event(AddItem(item),currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Shopping-Adding Item: $item",currentCart))
      stay applying ItemAdded(item) forMax (1 second) andThen {
        case cart @ _ =>
          context.system.eventStream.publish(CurrentCart(s"Shopping-after adding Item: $item",cart))
      }
    case Event(RemoveItem(id),currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Shopping-Removing Item: $id",currentCart))
      stay applying ItemRemoved(id) forMax (1 second) andThen {
        case cart @ _ =>
          context.system.eventStream.publish(CurrentCart(s"Shopping-after removing Item: $id",cart))
      }
    case Event(Buy,currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Shopping-Buying",currentCart))
      goto(Paid) applying OrderClosed forMax (1 second) andThen {
        case cart @ _ =>
          saveStateSnapshot()
          context.system.eventStream.publish(CurrentCart(s"Shopping-after paid",cart))
      }
    case Event(Leave,currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Shopping-Leaving",currentCart))
      stop()
    case Event(StateTimeout,currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Shopping-Timeout",currentCart))
      goto(Inactive) forMax(1 second)
    case Event(GetCart,currentCart) =>
      // label corrected: the original published "LookingAround-Showing" from the Shopping state
      context.system.eventStream.publish(CurrentCart(s"Shopping-Showing",currentCart))
      stay replying currentCart
  }

  when(Inactive) {
    case Event(AddItem(item),currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Inactive-Adding Item: $item",currentCart))
      goto(Shopping) applying ItemAdded(item) forMax(1 second)
    case Event(StateTimeout,currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Inactive-Timeout",currentCart))
      stop()
  }

  when(Paid) {
    case Event(Leave,currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Paid-Leaving",currentCart))
      stop()
    case Event(GetCart,currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Paid-Showing",currentCart))
      stay replying currentCart
    case Event(AddItem(item),currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Paid-Adding Item: $item",currentCart))
      goto(Shopping) applying ItemAdded(item) forMax(1 second)
  }

  whenUnhandled {
    case Event(SaveSnapshotSuccess(metadata),currentCart) =>
      context.system.eventStream.publish(CurrentCart("Successfully saved snapshot",currentCart))
      // if the shopping history does not need to be kept, the journal and old snapshots can be cleaned up
      deleteSnapshots(SnapshotSelectionCriteria(maxSequenceNr = metadata.sequenceNr - 1))
      deleteMessages(metadata.sequenceNr)
      stay()
    case Event(SaveSnapshotFailure(metadata, reason),currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Fail to save snapshot for $reason",currentCart))
      stay()
    case Event(DeleteMessagesSuccess(toSeq),currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Successfully deleted journal upto: $toSeq",currentCart))
      stay()
    case Event(DeleteMessagesFailure(cause,toSeq),currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Failed to delete journal upto: $toSeq because: $cause",currentCart))
      stay()
    case Event(DeleteSnapshotsSuccess(crit),currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Successfully deleted snapshots for $crit",currentCart))
      stay()
    case Event(DeleteSnapshotsFailure(crit,cause),currentCart) =>
      context.system.eventStream.publish(CurrentCart(s"Failed to delete snapshots $crit because: $cause",currentCart))
      stay()
    case _ => goto(LookingAround)
  }

  onTransition {
    case LookingAround -> Shopping =>
      context.system.eventStream.publish(CurrentCart("State changed: LookingAround -> Shopping",stateData))
    case Shopping -> Inactive =>
      context.system.eventStream.publish(CurrentCart("State changed: Shopping -> Inactive",stateData))
    case Shopping -> Paid =>
      context.system.eventStream.publish(CurrentCart("State changed: Shopping -> Paid",stateData))
    case Inactive -> Shopping =>
      context.system.eventStream.publish(CurrentCart("State changed: Inactive -> Shopping",stateData))
    case Paid -> LookingAround =>
      context.system.eventStream.publish(CurrentCart("State changed: Paid -> LookingAround",stateData))
  }

  override def onRecoveryCompleted(): Unit = {
    context.system.eventStream.publish(CurrentCart("OnRecoveryCompleted", stateData))
  }

  override def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit =
    context.system.eventStream.publish(CurrentCart(s"onPersistFailure ${cause.getMessage}",stateData))

  override def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit =
    context.system.eventStream.publish(CurrentCart(s"onPersistRejected ${cause.getMessage}",stateData))

  override def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit =
    context.system.eventStream.publish(CurrentCart(s"onRecoveryFailure ${cause.getMessage}",stateData))
}

EventTracker.scala

package persistentfsm.tracker

import akka.actor._
import persistentfsm.cart.WebShopping

object EventTracker {
  def props = Props(new EventTracker)
}

class EventTracker extends Actor {
  override def preStart(): Unit = {
    context.system.eventStream.subscribe(self,classOf[WebShopping.CurrentCart])
    super.preStart()
  }

  override def postStop(): Unit = {
    context.system.eventStream.unsubscribe(self)
    super.postStop()
  }

  override def receive: Receive = {
    case WebShopping.CurrentCart(loc,cart) =>
      println(loc)
      cart match {
        case WebShopping.EmptyCart => println("empty cart!")
        case WebShopping.LoadedCart(items) => println(s"Current content in cart: $items")
      }
  }
}

PersistentFSMDemo.scala

package persistentfsm.demo

import persistentfsm.cart._
import persistentfsm.tracker._
import akka.actor._
import WebShopping._

object PersistentFSMDemo extends App {
  val pfSystem = ActorSystem("persistentfsm-system")
  val trackerActor = pfSystem.actorOf(EventTracker.props,"tracker")
  val cart123 = pfSystem.actorOf(WebShopping.props("123"))

  cart123 ! GetCart
  cart123 ! AddItem(Item("001","Cigar",12.50))
  cart123 ! AddItem(Item("002","Wine",18.30))
  cart123 ! AddItem(Item("003","Coffee",5.50))
  cart123 ! GetCart
  cart123 ! RemoveItem("001")
  cart123 ! Buy
  cart123 ! GetCart
  cart123 ! AddItem(Item("004","Bread",3.25))
  cart123 ! AddItem(Item("005","Cake",5.25))

  scala.io.StdIn.readLine()
  pfSystem.terminate()
}

Reposted from: https://www.cnblogs.com/tiger-xc/p/7323832.html
