在前面的的讨论里已经介绍了CQRS读写分离模式的一些原理和在akka-typed应用中的实现方式。通过一段时间akka-typed的具体使用对一些经典akka应用的迁移升级,感觉最深的是EvenSourcedBehavior和akka-cluster-sharding了。前者是经典akka中persistenceActor的替换,后者是在原有组件基础上在使用方面的升级版。两者都在使用便捷性方面提供了大幅度的提升。在我看来,cluster-sharding是分布式应用的核心,如果能够比较容易掌握,对开发正确的分布式系统有着莫大的裨益。但这篇讨论的重点将会集中在EventSourcedBehavior上,因为它是实现CQRS的关键。而CQRS又是大数据应用数据采集(输入)管理最新的一个重要模式。

EventSourcedBehaviro是akka-typed为event-sourcing事件源模式提供的开发支持。具体的原理和使用方法在前面的博客里都介绍过了,在这篇就不再重复。我们把时间精力放到对event-sourcing的了解和应用上。

可以说,event-sourcing是一种数据库操作的模式。简单来说:event-sourcing的工作原理是把对数据库的操作动作保存起来,不直接对数据库进行即时更新,而是在一个阶段之后通过回溯replay这些动作才对数据库进行实质的更新。event-sourcing与传统数据库操作模式的最大分别就是:event-sourcing对数据库的更新过程可以重复,在一个既定的原点开始重演所有动作可以得出同样的结果,即同样的数据库状态。在大数据、高并发应用中最难控制的应该就是用户操作了。用户可能在任何时间同时对同一项数据进行更新。通用的传统方式是通过“锁”来保证数据的正确性,但“锁”会给系统带来更多的麻烦如响应慢甚至系统锁死。而一旦出现系统锁死重启后并无有效办法恢复数据库正确状态。event-sourcing恰恰就能有针对性的解决这些问题。

感觉到,event-sourcing模式应该可以避免对“锁”的使用:在高并发环境里,event-sourcing系统的每个用户在任何时间都有可能对数据库进行操作。但他们并不直接改变数据库内容,而是将这些对数据库操作的动作保存起来。因为用户保存的是各自的动作,互不关联,所以不需要任何锁机制。当系统完成一个阶段的工作后,从这个阶段的起点开始,把所有用户的动作按发生时间顺序重演并对数据库进行实质的更新。可以看到,这个具体的数据库更新过程是单一用户的,所以不需要“锁”了。阶段的起点是由数据库状态快照来表示。在完成了这个阶段所有动作重演后数据库状态一次性更新。整个过程即是CQRS读写分离模式了,其中:保存动作为写部分,动作重演是读部分。动作重演可以在之后的任何时间进行,因而读、写是完全分离的。实际上CQRS就是一个数据库更新管理的状态机器:从数据起始状态到终结状态的一种过程管理方法。下面就用一个实际的应用设计例子来介绍CQRS在应用系统中的具体使用。

下面讨论一个超市收款机pos软件的例子:

收款流程比较简单:收款员登录=>扫码录入销售项目=>录入折扣=>其它操作=>支付=>打小票

最终结果是在数据库产生了一张销售单,即一组交易数据,是实际反映在交易数据库里的。从CQRS流程来解释:这组销售数据在开单时为空,然后在完成所有单据操作后一次性产生,也就是在CQRS模式的读部分产生的。在这个过程中一直是写部分的操作,不影响交易数据库状态。当然,我们还必须在内存里维护一个模拟的状态来对每项操作进行控制,如:用户未登录时不容许任何操作动作。所以必须有个状态能代表用户登录的,而这个状态应该可以通过动作重演来重现,所以用户登录也是一个必须保存的动作。如此,每张销售单在内存里都应该有一个状态,这个状态包括了单据状态和一个动态的交易项目集合。这个项目集合就代表即将产生的数据库交易数据。下面是单据状态的定义:

  case class VchStates(opr: String = "", //收款员num: Int = 1, //当前单号seq: Int = 1, //当前序号void: Boolean = false, //取消模式refd: Boolean = false, //退款模式susp: Boolean = false, //挂单canc: Boolean = false, //废单due: Boolean = false, //当前余额su: String = "",mbr: String = "",disc: Int = 0, //预设折扣,如:会员折扣mode: Int = 0 //当前操作流程:0=logOff, 1=LogOn, 2=Payment) extends CborSerializable { ... }

交易项目是交易数据的直接对应:

  case class TxnItem(txndate: String = LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd")), txntime: String = LocalDateTime.now.format(dateTimeFormatter).substring(11), opr: String = "" //工号, num: Int = 0 //销售单号, seq: Int = 1 //交易序号, txntype: Int = TXNTYPE.sales //交易类型, salestype: Int = SALESTYPE.nul //销售类型, qty: Int = 1 //交易数量, price: Int = 0 //单价(分), amount: Int = 0 //码洋(分), disc: Int = 0 //折扣率 (%) 100% = 1, dscamt: Int = 0 //折扣额:负值  net实洋 = amount + dscamt, member: String = "" //会员卡号, code: String = "" //编号(商品、部类编号、账户编号、卡号...), refnum: String = "" //参考号,如退货单号, acct: String = "" //账号, dpt: String = "" //部类) extends CborSerializable {

为了提高系统效率,根据操作动作实时对交易项目进行了更新,如遇到折扣动作时需要更新上一条交易项目的优惠金额等。这也是在读部分动作重演必须的,因为CQRS的读部分目的是把正确的交易数据写到数据库里。所以,CQRS的写部分就代表对内存中这个交易项目集的动态更新过程。

单据状态在结单时用EventSourcedBehavior拿了个snapshot作为下一单的起始状态。销售中途出现异常退出后可以在上一单状态快照的基础上实施动作重演把状态恢复到出现异常之前。

由于每个阶段都可以清晰的用一张销售单的生命周期来代表,所以在整单操作完成后就可以进行CQRS的读部分了。操作结束的方式最明显的是单据完成支付操作了,如下:

      case PaymentMade(acct, dpt, num, ref,amount) =>if (curItem.txntype != TXNTYPE.voided) {val due = items.totalSales - items.totalPaidval bal = if (items.totalSales > 0) due - curItem.amount else due + curItem.amountlog.step(s"#${vchState.num} PaymentMade with input totalSales[${items.totalSales}], totalPaid[${items.totalPaid}], txnItems[${items}].")val vchs = vchState.copy(seq = vchState.seq + 1,due = (if ((items.totalPaid.abs + curItem.amount.abs) >= items.totalSales.abs) false else true),mode = (if (items.totalPaid.abs > 0) 2 else 1))val vItems = items.addItem(curItem.copy(salestype = SALESTYPE.ttl,price = due,amount = curItem.amount,dscamt = bal)).txnitemsif (replay) {Voucher(vchs, vItems)} else {if (vchs.due) {val vch = Voucher(vchs,vItems)log.step(s"#${vchState.num} PaymentMade with current item: ${vch.items.head}")vch}else {writerInternal.lastVoucher = Voucher(vchs, vItems)if (!writerInternal.afterRecovery)endVoucher(Voucher(vchs,vItems),TXNTYPE.sales)Voucher(vchs.nextVoucher, List())}}}else {log.step(s"#${vchState.num} PaymentMade with current item: $curItem")Voucher(vchState.copy(seq = vchState.seq + 1), items.addItem(curItem).txnitems)}

确认了完成支付调用endVoucher. endVoucher启动读部分reader, 如下:

  def endVoucher(voucher: Voucher, txntype: Int)(implicit writerInternal: WriterInternal,pid:Messages.PID) = {log.step(s"#${writerInternal.lastVoucher.header.num } ending voucher with state: ${writerInternal.lastVoucher.header}, txns: ${writerInternal.lastVoucher.items}")val readerShard = writerInternal.optSharding.get   //ClusterSharding(writerInternal.actorContext.system)val readerRef = readerShard.entityRefFor(POSReader.EntityKey, s"$pid.shopId:$pid.posId")val eseq = EventSourcedBehavior.lastSequenceNumber(writerInternal.optContext.get)val bseq = eseq - writerInternal.listOfActions.size + 1log.step(s"#${writerInternal.lastVoucher.header.num } sending PerformRead(${pid.shopid}, ${pid.posid},${writerInternal.lastVoucher.header.num},${writerInternal.lastVoucher.header.opr},$bseq,$eseq,$txntype,${writerInternal.expurl},${writerInternal.expacct},${writerInternal.exppass}) ...")//    log.step(s"#${writerInternal.lastVoucher.header.num } ending voucher with actions: ${writerInternal.listOfActions}")readerRef ! Messages.PerformRead(pid.shopid, pid.posid,writerInternal.lastVoucher.header.num,writerInternal.lastVoucher.header.opr,bseq,eseq,txntype,writerInternal.expurl,writerInternal.expacct,writerInternal.exppass)writerInternal.clearListOfAction()log.step(s"#${writerInternal.lastVoucher.header.num } ending voucher with actions: ${writerInternal.listOfActions}")}

reader是在一个sharding上即时构建的一个actor。这个actor的主要功能就是从journal里读出这张单所有动作进行重演得出交易项目集后写进交易数据库:

 def readActions(ctx: ActorContext[Command],vchnum: Int, cshr: String, startSeq: Long, endSeq: Long, trace: Boolean, nodeAddress: String, shopId: String, posId: String, txntype: Int): Future[List[TxnItem]] = {implicit val classicSystem = ctx.system.toClassicimplicit val ec = classicSystem.dispatcherimplicit var vchState = VchStates().copy(num = vchnum, opr = cshr)implicit var vchItems = VchItems()implicit var curTxnItem = TxnItem()implicit val pid = PID(shopId,posId)implicit val writerInternal = new Messages.WriterInternal(nodeAddress = nodeAddress, pid = pid, trace=trace)log.stepOn = tracelog.step(s"POSReader: readActions($vchnum,$cshr,$startSeq,$endSeq,$trace,$nodeAddress,$shopId,$posId), txntype=$txntype")def buildVoucher(actions: List[Any]): List[TxnItem] = {log.step(s"POSReader: read actions: $actions")val (voidtxns,onlytxns) = actions.asInstanceOf[Seq[Action]].pickOut(_.isInstanceOf[Voided])val listOfActions = actions.reverse zip (LazyList from 1)   //zipWithIndexlistOfActions.foreach { case (txn,idx) =>txn.asInstanceOf[Action] match {case ti@_ =>curTxnItem = EventHandlers.buildTxnItem(ti.asInstanceOf[Action],vchState).copy(opr=cshr)if (!ti.isInstanceOf[Voided]) {if (voidtxns.exists(a => a.asInstanceOf[Voided].seq == idx)) {curTxnItem = curTxnItem.copy(txntype = TXNTYPE.voided, opr = cshr)log.step(s"POSReader: voided txnitem: $curTxnItem")}}val vch = EventHandlers.updateState(ti.asInstanceOf[Action],vchState,vchItems,curTxnItem,true)vchState = vch.headervchItems = vch.txnItemslog.step(s"POSReader: built txnitem: ${vchItems.txnitems.head}")}}log.step(s"POSReader: voucher built with state: $vchState, items: ${vchItems.txnitems}")vchItems.txnitems}val query =PersistenceQuery(classicSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)implicit val session = CassandraSessionRegistry(classicSystem).sessionFor("alpakka.cassandra")// issue query to journalval source: Source[EventEnvelope, NotUsed] =query.currentEventsByPersistenceId(s"${pid.shopid}:${pid.posid}", startSeq, endSeq)// materialize stream, consuming eventsval readActions: Future[List[Any]] = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny }for {lst1 <- readActions    //read list from SourcelstTxns <- if (lst1.length < (endSeq -startSeq))    //if imcomplete list read againreadActionselse FastFuture.successful(lst1)items <- FastFuture.successful( buildVoucher(lstTxns) )_ <- JournalTxns.writeTxnsToDB(vchnum,txntype,startSeq,endSeq,items)_ <- session.close(ec)} yield items}

akka-typed(10) - event-sourcing, CQRS实战相关推荐

  1. Event Sourcing和CQRS实现

    Event Sourcing和CQRS实现 文章参考自: https://github.com/soooban/AxonDemo 相关资料: https://blog.csdn.net/quguang ...

  2. 领域驱动设计的实践 – CQRS Event Sourcing

    1.前言 领域驱动(Domain – Driven Design)设计的理念在于建立一系列既符合软件所处领域本身又适合软件分析开发需要的领域模型.命令查询与职责分离(Command Query Res ...

  3. CQRS, Task Based UIs, Event Sourcing agh!

    原文地址:CQRS, Task Based UIs, Event Sourcing agh! Many people have been getting confused over what CQRS ...

  4. [外文理解] DDD创始人Eric Vans:要实现DDD原始意图,必须CQRS+Event Sourcing架构。

    原文:http://www.infoq.com/interviews/Technology-Influences-DDD# 要实现DDD(domain drive  design 领域驱动设计)原始意 ...

  5. java event sourcing_深入浅出Event Sourcing和CQRS

    原标题:深入浅出Event Sourcing和CQRS Event Sourcing也叫事件溯源,是这些年另一个越来越流行的概念,是大神Martin Fowler提出的一种架构模式.简单来说,它有几个 ...

  6. Event Sourcing 和 CQRS

    目录 状态和事件 事件存储(Event Store) 事件朔源(Event Sourcing) CQRS(Command Query Responsibility Segregation) 总结 状态 ...

  7. (转)DDD CQRS和Event Sourcing的案例:足球比赛

    原文链接: https://www.jdon.com/44815 在12月11日新的有关DDD CQRS和Event Sourcing演讲:改变心态- 以更加面向对象视角看待业务领域建模中,作者以足球 ...

  8. cqrs java_深入浅出Event Sourcing和CQRS

    Event Sourcing也叫事件溯源,是这些年另一个越来越流行的概念,是大神Martin Fowler提出的一种架构模式.简单来说,它有几个特点: 整个系统以事件为驱动,所有业务都由事件驱动来完成 ...

  9. 对LMAX架构以及Event Sourcing模式的一些新思考和问题的记录

    最近又学习了一下LMAX架构,让我对该架构以及event sourcing模式又有了很多新的认识和疑问. 注:如果不知道什么是lmax架构和event sourcing模式的看官可以自己先去查查资料: ...

  10. Windows 10企业批量部署实战之刷新并添加启动映像

    相关组件及配置都设置完成后,接下来我们需要对我们的控制台进行刷新操作,以便生成windows PE启动映像并添加到WDS启动映像完成Windows 10企业部署的最后操作.针对x64\x86两启动映像 ...

最新文章

  1. 【洛谷 1345】 奶牛的电信
  2. qlabel可以选中吗_QLabel-标签控件的应用
  3. 令人迷惑的ATT的jmp:直接跳转和间接跳转 [转]
  4. python scrapy 环境搭建_python+scrapy环境搭建步骤描述
  5. Javascript中的arguments数组对象
  6. c构造函数和析构函数_C ++构造函数和析构函数| 查找输出程序| 套装2
  7. 管理运筹学软件计算机解咋看,运筹学软件结果解析总结计划题1-20210618123710.docx-原创力文档...
  8. 在freemarker中使用jsp标签 Using FreeMarker with servlets
  9. html 通用ui css图标,ui-icon.html
  10. LayoutInflater——inflate方法不同参数的区别
  11. php ftp 账号密码修改,月光软件站 - 编程文档 - 其他语言 - 用PHP即时添加,删除FTP帐号和更改FTP密码的函数...
  12. prim算法_数据结构 7.4.1 最小生成树 Prim
  13. 16级大一c语言考试题及答案,2013年计算机二级C语言上机试题十六及答案
  14. 将list中的数据组成用逗号分隔的字符串
  15. SQL Server 版本变更检查
  16. postman接口测试
  17. 卡巴斯基半年激活码免费申请
  18. 游戏美术基础:游戏贴图
  19. 如何解决VMware Workstation上ubuntu出现Host SMbus controller not enabled
  20. MEM/MBA英语基础(10)非谓语动词

热门文章

  1. ARM Cortex-A系列(A53、A57、A73等)处理器性能分类与对比
  2. 代码量怎么计算_怎么样利用南方CASS三角网法和方格网法进行土方量计算
  3. Intel E100 网卡驱动实例分析
  4. ipad上怎么打开html文件,ipad HTML文件怎么打开
  5. Andrew Ng-ML习题答案1
  6. ie上直接打开服务器word文档,win7 ie11直接打开word
  7. fix-下拉出现白条问题
  8. VA_LIST可变参数列表的使用方法与原理
  9. 7岁儿童智力检测题_7岁-11岁儿童智商测试题
  10. clock()、time()、clock_gettime()和gettimeofday()函数的用法和区别【转】