前面介绍了事件源(EventSource)和集群(cluster),现在到了讨论CQRS的时候了。CQRS即读写分离模式,由独立的写方程序和读方程序组成,具体原理在以前的博客里介绍过了。akka-typed应该自然支持CQRS模式,最起码本身提供了对写方编程的支持,这点从EventSourcedBehavior 可以知道。akka-typed提供了新的EventSourcedBehavior-Actor,极大方便了对persistentActor的应用开发,但同时也给编程者造成了一些限制。如手工改变状态会更困难了、EventSourcedBehavior不支持多层式的persist,也就是说通过persist某些特定的event然后在event-handler程序里进行状态处理是不可能的了。我这里有个例子,是个购物车应用:当完成支付后需要取个快照(snapshot),下面是这个snapshot的代码:

snapshotWhen {

(state,evt,seqNr)=>CommandHandler.takeSnapshot(state,evt,seqNr)

}

...

def takeSnapshot(state: Voucher, evt: Events.Action, lstSeqNr: Long)(implicit pid: PID) ={if(evt.isInstanceOf[Events.PaymentMade]||evt.isInstanceOf[Events.VoidVoucher.type]||evt.isInstanceOf[Events.SuspVoucher.type])if(state.items.isEmpty) {

log.step(s"#${state.header.num} taking snapshot at [$lstSeqNr] ...")true}else

false

else

false}

判断event类型是没有问题的,因为正是当前的事件,但另一个条件是购物车必须是清空了的。这个有点为难,因为这个状态要依赖这几个event运算的结果才能确定,也就是下一步,但确定结果又需要对购物车内容进行计算,好像是个死循环。在akka-classic里我们可以在判断了event运算结果后,如果需要改变状态就再persist一个特殊的event,然后在这个event的handler进行状态处理。没办法,EventSourcedBehavior不支持多层persist,只有这样做:

case PaymentMade(acct, dpt, num, ref,amount) =>...

writerInternal.lastVoucher=Voucher(vchs, vItems)

endVoucher(Voucher(vchs,vItems),TXNTYPE.sales)

Voucher(vchs.nextVoucher, List())

...

我只能先吧当前状态保存下来、进行结单运算、然后清空购物车,这样snapshot就可以顺利进行了。

好了,akka的读方编程是通过PersistentQuery实现的。reader的作用就是把event从数据库读出来后再恢复成具体的数据格式。我们从reader的调用了解一下这个应用里reader的实现细节:

val readerShard = writerInternal.optSharding.getval readerRef= readerShard.entityRefFor(POSReader.EntityKey, s"$pid.shopId:$pid.posId")

readerRef! Messages.PerformRead(pid.shopid, pid.posid,writerInternal.lastVoucher.header.num,writerInternal.lastVoucher.header.opr,bseq,eseq,txntype,writerInternal.expurl,writerInternal.expacct,writerInternal.exppass)

可以看到这个reader是一个集群分片,sharding-entity。想法是每单完成购买后发个消息给一个entity、这个entity再完成reader功能后自动终止,立即释放出占用的资源。reader-actor的定义如下:

objectPOSReader extends LogSupport {

val EntityKey: EntityTypeKey[Command]= EntityTypeKey[Command]("POSReader")

def apply(nodeAddress: String, trace: Boolean): Behavior[Command]={

log.stepOn=traceimplicit var pid: PID = PID("","")

Behaviors.supervise(

Behaviors.setup[Command] { ctx=>Behaviors.withTimers { timer=>

implicit val ec =ctx.executionContext

Behaviors.receiveMessage {case PerformRead(shopid, posid, vchnum, opr, bseq, eseq, txntype, xurl, xacct, xpass) =>pid=PID(shopid, posid)

log.step(s"POSReader: PerformRead($shopid,$posid,$vchnum,$opr,$bseq,$eseq,$txntype,$xurl,$xacct,$xpass)")(PID(shopid, posid))

val futReadSaveNExport= for{

txnitems

_

{if(txntype ==Events.TXNTYPE.voidall)

txnitems.map (_.copy(txntype=Events.TXNTYPE.voidall))elsetxnitems },

trace)(ctx.system.toClassic, pid)

}yield()

ctx.pipeToSelf(futReadSaveNExport) {case Success(_) =>{

timer.startSingleTimer(ReaderFinish(shopid, posid, vchnum), readInterval.seconds)

StopReader

}case Failure(err) =>log.error(s"POSReader: Error: ${err.getMessage}")

timer.startSingleTimer(ReaderFinish(shopid, posid, vchnum), readInterval.seconds)

StopReader

}

Behaviors.samecase StopReader =>Behaviors.samecase ReaderFinish(shopid, posid, vchnum) =>Behaviors.stopped(

()=> log.step(s"POSReader: {$shopid,$posid} finish reading voucher#$vchnum and stopped")(PID(shopid, posid))

)

}

}

}

).onFailure(SupervisorStrategy.restart)

}

reader就是一个普通的actor。值得注意的是读方程序可能是一个庞大复杂的程序,肯定需要分割成多个模块,所以我们可以按照流程顺序进行模块功能切分:这样下面的模块可能会需要上面模块产生的结果才能继续。记住,在actor中绝对避免阻塞线程,所有的模块都返回Future, 然后用for-yield串起来。上面我们用了ctx.pipeToSelf 在Future运算完成后发送ReaderFinish消息给自己,通知自己停止。

在这个例子里我们把reader任务分成:

1、从数据库读取事件

2、事件重演一次产生状态数据(购物车内容)

3、将形成的购物车内容作为交易单据项目存入数据库

4、向用户提供的restapi输出交易数据

event读取是通过cassandra-persistence-plugin实现的:

val query =PersistenceQuery(classicSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)//issue query to journal

val source: Source[EventEnvelope, NotUsed] =query.currentEventsByPersistenceId(s"${pid.shopid}:${pid.posid}", startSeq, endSeq)//materialize stream, consuming events

val readActions: Future[List[Any]] = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny }

这部分比较简单:定义一个PersistenceQuery,用它产生一个Source,然后run这个Source获取Future[List[Any]]。

重演事件产生交易数据:

def buildVoucher(actions: List[Any]): List[TxnItem] ={

log.step(s"POSReader: read actions: $actions")

val (voidtxns,onlytxns)=actions.asInstanceOf[Seq[Action]].pickOut(_.isInstanceOf[Voided])

val listOfActions= onlytxns.reverse zip (LazyList from 1) //zipWithIndex

listOfActions.foreach { case (txn,idx) =>txn.asInstanceOf[Action] match {case Voided(_) =>

case ti@_ =>curTxnItem= EventHandlers.buildTxnItem(ti.asInstanceOf[Action],vchState).copy(opr=cshr)if(voidtxns.exists(a => a.asInstanceOf[Voided].seq ==idx)) {

curTxnItem= curTxnItem.copy(txntype = TXNTYPE.voided, opr=cshr)

log.step(s"POSReader: voided txnitem: $curTxnItem")

}

val vch= EventHandlers.updateState(ti.asInstanceOf[Action],vchState,vchItems,curTxnItem,true)

vchState=vch.header

vchItems=vch.txnItems

log.step(s"POSReader: built txnitem: ${vchItems.txnitems.head}")

}

}

log.step(s"POSReader: voucher built with state: $vchState, items: ${vchItems.txnitems}")

vchItems.txnitems

}

重演List[Event],产生了List[TxnItem]。

向数据库里写List[TxnItem]:

def writeTxnsToDB(vchnum: Int, txntype: Int, bseq: Long, eseq: Long, txns: List[TxnItem])(implicit system: akka.actor.ActorSystem, session: CassandraSession, pid: PID): Future[Seq[TxnItem]] = ???

注意返回结果类型Future[Seq[TxnItem]]。我们用for-yield把这几个动作串起来:

val txnitems: Future[List[Events.TxnItem]] = for{

lst1

lstTxns

readActionselseFastFuture.successful(lst1)

items

_

_

}yield items

注意返回结果类型Future[Seq[TxnItem]]。我们用for-yield把这几个动作串起来:

val txnitems: Future[List[Events.TxnItem]] = for{

lst1

lstTxns

readActionselseFastFuture.successful(lst1)

items

_

_

}yield items

注意:这个for返回的Future[List[TxnItem]],是提供给restapi输出功能的。在那里List[TxnItem]会被转换成json作为post的包嵌数据。

现在所有子任务的返回结果类型都是Future了。我们可以再用for来把它们串起来:

val futReadSaveNExport = for{

txnitems

_

{if(txntype ==Events.TXNTYPE.voidall)

txnitems.map (_.copy(txntype=Events.TXNTYPE.voidall))elsetxnitems },

trace)(ctx.system.toClassic, pid)

}yield ()

说到EventSourcedBehavior,因为用了cassandra-plugin,忽然想起配置文件里新旧有很大区别。现在这个application.conf是这样的:

akka {

loglevel=INFO

actor {

provider=cluster

serialization-bindings {"com.datatech.pos.cloud.CborSerializable" = jackson-cbor

}

}

remote {

artery {

canonical.hostname= "192.168.11.189"canonical.port= 0}

}

cluster {

seed-nodes =["akka://cloud-pos-server@192.168.11.189:2551"]

sharding {

passivate-idle-entity-after = 5m

}

}

# use Cassandra to store both snapshots and the events of the persistent actors

persistence {

journal.plugin= "akka.persistence.cassandra.journal"snapshot-store.plugin = "akka.persistence.cassandra.snapshot"}

}

akka.persistence.cassandra {

# don't use autocreate in production

journal.keyspace = "poc2g"journal.keyspace-autocreate =on

journal.tables-autocreate =on

snapshot.keyspace= "poc2g_snapshot"snapshot.keyspace-autocreate =on

snapshot.tables-autocreate =on

}

datastax-java-driver {

basic.contact-points = ["192.168.11.189:9042"]

basic.load-balancing-policy.local-datacenter = "datacenter1"}

akka.persitence.cassandra段落里可以定义keyspace名称,这样新旧版本应用可以共用一个cassandra,同时在线。

akka typed mysql_akka-typed(8) - CQRS读写分离模式相关推荐

  1. CQRS读写职责分离模式(Command and Query Responsibility Segregation (CQRS) Pattern)

    此文翻译自msdn,侵删. 原文地址:https://msdn.microsoft.com/en-us/library/dn568103.aspx 通过使用不同的接口来分离读和写操作,这种模式最大化了 ...

  2. Mycat实现读写分离

    概述 通过Mycat和MySQL的主从复制配合搭建数据库的读写分离,实现MySQL的高可用性.我们将搭建:一主一从.双主双从两种读写分离模式. 一主一从模式 一主一从模式是指一个主机用于处理所有写请求 ...

  3. 读写分离,读写分离死锁解决方案,事务发布死锁解决方案,发布订阅死锁解决方案|事务(进程 ID *)与另一个进程被死锁在 锁 资源上,并且已被选作死锁牺牲品。请重新运行该事务...

    前言:         由于网站访问压力的问题,综合分析各种因素后结合实际情况,采用数据库读写分离模式来解决当前问题.实际方案中采用"事务发布"模式实现主数据库和只读数据库的同步, ...

  4. 【项目升级】单库、多库、读写分离 · 任你选

    本期配套视频: https://www.bilibili.com/video/BV1BJ411B7mn?p=6 (点击阅读原文,可看,如果没有,最晚下午可看) 继上周增加[任务调度]以后,继续对项目进 ...

  5. mybatis 自动填充无效_开发小知识-mybatis-plus自动填充与读写分离

    mybatis-plus 自动填充 说明 我们在设计表结构的时候,往往会额外添多如下几个字段 create_time[表字段]-- createTime[实体字段] : 创建时间 update_tim ...

  6. 数据源管理 | 主从库动态路由,AOP模式读写分离

    本文源码:GitHub·点这里 || GitEE·点这里 一.多数据源应用 1.基础描述 在相对复杂的应用服务中,配置多个数据源是常见现象,例如常见的:配置主从数据库用来写数据,再配置一个从库读数据, ...

  7. mycat读写分离与主从切换

    1,分库分表的优缺点.以及为什么分表方式无法成为主流? 分表:在台服务器上,优点是易维护,类似表分区,缺点是在一台db服务器上,无法分担IO.负载集中. 分库:在多台服务器上,优点是分担IO.负载均衡 ...

  8. mysql读写分离中间件有哪些

    mysql中间件有哪些 mysql-proxy是官方提供的mysql中间件产品可以实现负载平衡,读写分离,failover等,但其不支持大数据量的分库分表且性能较差.下面介绍几款能代替其的mysql开 ...

  9. MySQL-Mycat读写分离

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 MySQL-Mycat读写分离 一.读写分离 1.什么是读写分离 2.读写分离的好处 Mycat是什么? 架构 部署环境 部署myca ...

  10. php mycat 读写分离,MyCAT读写分离以及参数调配

    MyCat的说明文档请参见 主要使用到得几个配置文件有schema.xml.rule.xml.server.xml MYCAT_HOME/conf/schema.xml 中定义逻辑库,表.分片节点等内 ...

最新文章

  1. 在.net中读写config文件的各种方法(转载)
  2. android安卓机版市场,安卓各版本市场份额数据更新 安卓8.0暴增
  3. mysql数据库密码错误_MySQL数据库经典错误六 数据库密码忘记的问题
  4. 旅行报告:JavaOne 2013 –重归荣耀
  5. React之JSX入门
  6. 【iCore3 双核心板_ uC/OS-III】例程四:时间管理
  7. Java应用线上CPU飙高
  8. 记录——《C Primer Plus (第五版)》第十章编程练习第十题
  9. ICMP数据包结构(转)
  10. Adobe FLASH CS6 安装错误解决方法
  11. oracle表空间加密
  12. windows pe安装系统
  13. 在固定宽高内显示固定数量的最大正方形
  14. 虚拟机快照、迁移、删除
  15. 723. PUM(DAY 13)
  16. win10系统找不到telnet服务器,win10找不到telnet服务怎么办_win10没有telnet服务如何找回...
  17. Prior-Induced Information Alignment for Image Matting
  18. Unity自带GL详解
  19. 5G技术的原理(转)
  20. python设置电脑ip代理_设置代理IP在Python中使用

热门文章

  1. 什么是即时通讯(IM)?
  2. 火狐浏览器打开IE窗口/IE跳谷歌页面等 --- 自定义协议---手动执行注册表
  3. python批量删除txt文本前面几行和最后几行
  4. ECMAScript相关知识介绍
  5. Euclid‘s Game(博弈)
  6. 中国医科大学计算机应用本科作业答案,17秋中国医科大学《计算机应用基础(本科)》在线作业100分标准答案...
  7. XMLHTTP的ReadyState与Statu详解
  8. python入门神器_Python入门之神器,助你快速上手!
  9. Hexo添加Icarus主题
  10. Eclipse 代码整理