CQRS

CQRS 的意思是“命令-查询责任隔离”。我们分离了命令(写请求)和查询(读请求)之间的责任。写请求和读请求由不同的对象处理。

就是这样。我们可以进一步分割数据存储,使用单独的读写存储。一旦发生这种情况,可能会有许多读取存储,这些存储针对处理不同类型的查询或跨越多个边界上下文进行了优化。虽然经常讨论与 CQRS 相关的单独读写存储,但这并不是 CQRS 本身。CQRS 只是命令和查询的第一部分。

术语

Command

该命令是一个简单的数据结构,表示执行某些操作的请求。

Command Bus

完整源码:

  • github.com/ThreeDotsLabs/watermill/components/cqrs/command_bus.go
// ...
// CommandBus 将命令(commands)传输到命令处理程序(command handlers)。
type CommandBus struct {
// ...

Command Processor

完整源码:

  • github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
// CommandProcessor 决定哪个 CommandHandler 应该处理这个命令
received from the command bus.
type CommandProcessor struct {
// ...

Command Handler

完整源码:

  • github.com/ThreeDotsLabs/watermill/components/cqrs/command_processor.go
// ...
// CommandHandler 接收由 NewCommand 定义的命令,并使用 Handle 方法处理它。
// 如果使用 DDD, CommandHandler 可以修改并持久化聚合。
//
// 与 EvenHandler 相反,每个命令必须只有一个 CommandHandler。
//
// 在处理消息期间使用 CommandHandler 的一个实例。
// 当同时发送多个命令时,Handle 方法可以同时执行多次。
// 因此,Handle 方法必须是线程安全的!
type CommandHandler interface {
// ...

Event

该事件表示已经发生的事情。 事件是不可变的。

Event Bus

完整源码:

  • github.com/ThreeDotsLabs/watermill/components/cqrs/event_bus.go
// ...
// EventBus 将事件传输到事件处理程序。
type EventBus struct {
// ...

Event Processor

完整源码:

  • github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
// EventProcessor 确定哪个 EventHandler 应该处理从事件总线接收到的事件。
type EventProcessor struct {
// ...

Event Handler

完整源码:

  • github.com/ThreeDotsLabs/watermill/components/cqrs/event_processor.go
// ...
// EventHandler 接收由 NewEvent 定义的事件,并使用其 Handle 方法对其进行处理。
// 如果使用 DDD,CommandHandler 可以修改并保留聚合。
// 它还可以调用流程管理器、saga 或仅仅构建一个读取模型。
// 与 CommandHandler 相比,每个 Event 可以具有多个 EventHandler。
//
// 在处理消息时使用 EventHandler 的一个实例。
// 当同时传递多个事件时,Handle 方法可以同时执行多次。
// 因此,Handle 方法必须是线程安全的!
type EventHandler interface {
// ...

CQRS Facade

完整源码:

  • github.com/ThreeDotsLabs/watermill/components/cqrs/cqrs.go
// ...
// Facade 是用于创建 Command 和 Event buses 及 processors 的 facade。
// 创建它是为了在以标准方式使用 CQRS 时避免使用 boilerplate。
// 您还可以手动创建 buses 和 processors,并从 NewFacade 中获得灵感。
type Facade struct {
// ...

Command and Event Marshaler

完整源码:

  • github.com/ThreeDotsLabs/watermill/components/cqrs/marshaler.go
// ...
// CommandEventMarshaler 将命令和事件 marshal 给 Watermill 的消息,反之亦然。
// 该命令的有效载荷需要 marshal 至 []bytes。
type CommandEventMarshaler interface {// Marshal marshal 命令或事件给 Watermill 的消息。Marshal(v interface{}) (*message.Message, error)// Unmarshal Unmarshal watermill的信息给 v Command 或 Event。Unmarshal(msg *message.Message, v interface{}) (err error)// Name 返回命令或事件的名称。// Name 用于确定接收到的命令或事件是我们想要处理的事件。Name(v interface{}) string// NameFromMessage 从 Watermill 的消息(由 Marshal 生成)中返回命令或事件的名称。// //// 当我们将 Command 或 Event marshal 到 Watermill 的消息中时,// 我们应该使用 NameFromMessage 而不是 Name 来避免不必要的 unmarshaling。NameFromMessage(msg *message.Message) string
}
// ...

用法

Example domain(领域模型示例)

作为示例,我们将使用一个简单的 domain,它负责处理酒店的房间预订。

我们将使用 Event Storming 表示法来展示这个 domain 的模型。

Legend:

  • blue(蓝色)便利贴是命令
  • orange(橙色)便利贴是事件
  • green(绿色)便利贴是读取模型,从事件异步生成
  • violet(紫色)便利贴是策略,由事件触发并产生命令
  • pink(粉色)便利贴是热点(hot-spots);我们标记经常发生问题的地方

domain(领域模型)很简单:

  • 客人可以预订房间(book a room)。
  • 每当预订房间时,我们都会为客人订购啤酒(Whenever a room is booked, we order a beer)(因为我们爱客人)。
    • 我们知道有时候啤酒不够(not enough beers)。
  • 我们根据预订生成财务报告(financial report)。

Sending a command(发送命令)

首先,我们需要模拟访客的动作。

完整源码:

  • github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...bookRoomCmd := &BookRoom{RoomId:    fmt.Sprintf("%d", i),GuestName: "John",StartDate: startDate,EndDate:   endDate,}if err := commandBus.Send(context.Background(), bookRoomCmd); err != nil {panic(err)}
// ...

Command handler

BookRoomHandler 将处理我们的命令。

完整源码:

  • github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookRoomHandler 是一个命令处理程序,它处理 BookRoom 命令并发出 RoomBooked。
//
// 在 CQRS 中,一个命令只能由一个处理程序处理。
// 将具有此命令的另一个处理程序添加到命令处理器时,将返回错误。
type BookRoomHandler struct {eventBus *cqrs.EventBus
}func (b BookRoomHandler) HandlerName() string {return "BookRoomHandler"
}// NewCommand 返回该 handle 应该处理的命令类型。它必须是一个指针。
func (b BookRoomHandler) NewCommand() interface{} {return &BookRoom{}
}func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {// c 始终是 `NewCommand` 返回的类型,因此强制转换始终是安全的cmd := c.(*BookRoom)// 一些随机的价格,在生产中你可能会用更明智的方式计算price := (rand.Int63n(40) + 1) * 10log.Printf("Booked %s for %s from %s to %s",cmd.RoomId,cmd.GuestName,time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),)// RoomBooked 将由 OrderBeerOnRoomBooked 事件处理程序处理,// 将来,RoomBooked 可能由多个事件处理程序处理if err := b.eventBus.Publish(ctx, &RoomBooked{ReservationId: watermill.NewUUID(),RoomId:        cmd.RoomId,GuestName:     cmd.GuestName,Price:         price,StartDate:     cmd.StartDate,EndDate:       cmd.EndDate,}); err != nil {return err}return nil
}// OrderBeerOnRoomBooked 是事件处理程序,它处理 RoomBooked 事件并发出 OrderBeer 命令。
// ...

Event handler

如前所述,我们希望每次预订房间时都点一杯啤酒(“每次预订房间时”便笺)。我们通过使 OrderBeer 命令来实现。

完整源码:

  • github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// OrderBeerOnRoomBooked 是事件处理程序,它处理 RoomBooked 事件并发出 OrderBeer 命令。
type OrderBeerOnRoomBooked struct {commandBus *cqrs.CommandBus
}func (o OrderBeerOnRoomBooked) HandlerName() string {// 此名称传递给 EventsSubscriberConstructor 并用于生成队列名称return "OrderBeerOnRoomBooked"
}func (OrderBeerOnRoomBooked) NewEvent() interface{} {return &RoomBooked{}
}func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {event := e.(*RoomBooked)orderBeerCmd := &OrderBeer{RoomId: event.RoomId,Count:  rand.Int63n(10) + 1,}return o.commandBus.Send(ctx, orderBeerCmd)
}// OrderBeerHandler 是命令处理程序,它处理 OrderBeer 命令并发出 BeerOrdered。
// ...

OrderBeerHandler 与 BookRoomHandler 非常相似。唯一的区别是,当啤酒不够时,它有时会返回一个错误,这将导致重新交付命令。您可以在示例源代码中找到整个实现。

使用事件处理程序构建读取模型

完整源码:

  • github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
// BookingsFinancialReport 是一个读取模型,用于计算我们可以从预订中赚取多少钱。
// 与 OrderBeerOnRoomBooked 一样,它侦听 RoomBooked 事件。
//
// 此实现只是写入内存。在生产中,您可能会使用一些持久性存储。
type BookingsFinancialReport struct {handledBookings map[string]struct{}totalCharge     int64lock            sync.Mutex
}func NewBookingsFinancialReport() *BookingsFinancialReport {return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}func (b BookingsFinancialReport) HandlerName() string {// 此名称传递给 EventsSubscriberConstructor 并用于生成队列名称return "BookingsFinancialReport"
}func (BookingsFinancialReport) NewEvent() interface{} {return &RoomBooked{}
}func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {// Handle 可以被并发调用,因此它必须是线程安全的。b.lock.Lock()defer b.lock.Unlock()event := e.(*RoomBooked)// 当我们使用不提供一次精确交付语义的 Pub/Sub 时,我们需要对消息进行重复数据删除。// GoChannel Pub/Sub 提供了精确的一次交付,// 但是让我们为其他 Pub/Sub 实现准备好这个示例。if _, ok := b.handledBookings[event.ReservationId]; ok {return nil}b.handledBookings[event.ReservationId] = struct{}{}b.totalCharge += event.Pricefmt.Printf(">>> Already booked rooms for $%d\n", b.totalCharge)return nil
}var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"func main() {
// ...

将其连接起来——CQRS facade

我们拥有构建 CQRS 应用程序的所有块。 现在,我们需要使用某种胶水将其连接起来。

我们将使用最简单的内存消息传递基础设施: GoChannel。

在后台,CQRS 正在使用 Watermill 的消息路由器。 如果您不熟悉它,并且想了解它的工作原理,则应查阅《入门指南》。 它还将向您展示如何使用一些标准的消息传递模式,例如 metrics,poison queue,throttling,correlation 以及每个消息驱动的应用程序使用的其他工具。那些内置于 Watermill 中。

让我们回到 CQRS。如您所知,CQRS 是由多个组件构建的,如命令(Command)或事件总线(Event buses)、处理程序(handlers)、处理器(processors)等。为了简化所有这些构建块的创建,我们创建了 cqrs.Facade,它创建所有这些。

完整源码:

  • github.com/ThreeDotsLabs/watermill/_examples/basic/5-cqrs-protobuf/main.go
// ...
func main() {logger := watermill.NewStdLogger(false, false)cqrsMarshaler := cqrs.ProtobufMarshaler{}// 您可以从此处使用任何 Pub/Sub 实现:https://watermill.io/docs/pub-sub-implementations/// 详细的 RabbitMQ 实现: https://watermill.io/docs/pub-sub-implementations/#rabbitmq-amqp// 命令将被发送到队列,因为它们需要被使用一次。commandsAMQPConfig := amqp.NewDurableQueueConfig(amqpAddress)commandsPublisher, err := amqp.NewPublisher(commandsAMQPConfig, logger)if err != nil {panic(err)}commandsSubscriber, err := amqp.NewSubscriber(commandsAMQPConfig, logger)if err != nil {panic(err)}// 事件将被发布到配置了 PubSu b的 Rabbit,因为它们可能被多个使用者使用。// (在这种情况下,BookingsFinancialReport 和 OrderBeerOnRoomBooked).eventsPublisher, err := amqp.NewPublisher(amqp.NewDurablePubSubConfig(amqpAddress, nil), logger)if err != nil {panic(err)}// CQRS 建立在消息路由器上。详细文档:https://watermill.io/docs/messages-router/router, err := message.NewRouter(message.RouterConfig{}, logger)if err != nil {panic(err)}// 简单的中间件,可以从事件或命令处理程序中 recover panics。// 您可以在文档中找到有关路由器中间件的更多信息:// https://watermill.io/docs/messages-router/#middleware//// 您可以在 message/router/middleware 中找到可用的中间件列表。router.AddMiddleware(middleware.Recoverer)// cqrs.Facade是命令和事件总线与处理器的 facade。// 您可以使用 facade,或者手动创建总线和处理器(您可以使用 cqrs.NewFacade 激发灵感)cqrsFacade, err := cqrs.NewFacade(cqrs.FacadeConfig{GenerateCommandsTopic: func(commandName string) string {// 我们正在使用RabbitMQ队列配置,因此我们需要按命令类型指定主题 topicreturn commandName},CommandHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.CommandHandler {return []cqrs.CommandHandler{BookRoomHandler{eb},OrderBeerHandler{eb},}},CommandsPublisher: commandsPublisher,CommandsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {// 我们可以重用订阅者(subscriber),因为所有命令都有各自的主题(topics)return commandsSubscriber, nil},GenerateEventsTopic: func(eventName string) string {// 因为我们使用的是PubSub RabbitMQ配置,所以我们可以对所有事件使用一个主题(topic)return "events"// 我们还可以按事件类型使用主题(topic)// return eventName},EventHandlers: func(cb *cqrs.CommandBus, eb *cqrs.EventBus) []cqrs.EventHandler {return []cqrs.EventHandler{OrderBeerOnRoomBooked{cb},NewBookingsFinancialReport(),}},EventsPublisher: eventsPublisher,EventsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {config := amqp.NewDurablePubSubConfig(amqpAddress,amqp.GenerateQueueNameTopicNameWithSuffix(handlerName),)return amqp.NewSubscriber(config, logger)},Router:                router,CommandEventMarshaler: cqrsMarshaler,Logger:                logger,})if err != nil {panic(err)}// 每秒发布 BookRoom 命令以模拟传入流量go publishCommands(cqrsFacade.CommandBus())// 处理器(processors)是基于路由器(router)的,所以当路由器启动时,处理器就会工作if err := router.Run(context.Background()); err != nil {panic(err)}
}
// ...

就这样。 我们有一个正在运行的 CQRS 应用程序。

我是为少。微信:uuhells123。公众号:黑客下午茶。

谢谢点赞支持

利用 Watermill 实现 Golang CQRS相关推荐

  1. 巧妙利用channel进行golang并发式爬虫

    上面一篇博客的代码是串行化,没有最大利用go语言的天生协程特性,如下代码使用channel管道加上goroutine实现高并发编程: package mainimport ("fmt&quo ...

  2. 服务器推送_初探 Watermill 构建 Golang 事件驱动程序,SSE 进行 HTTP 服务器推送

    使用 SSE(Server-Sent Events) 进行 HTTP 服务器推送 这个示例是一个类似 twitter 的 web 应用程序,使用 Server-Sent Events 来支持实时刷新. ...

  3. 如何利用docker 构建golang线上部署环境

    公司最近开发了一个项目是用golang 写的,现在要部署到线上环境去,又不想在服务器上装单独的golang,决定用docker 封装下,直接打到镜像里面,然后就直接在hub.docker.com上面搜 ...

  4. 利用redis实现golang的分布式锁

    go使用redis锁 基于Redis的SetNX方法,创建并使用redis锁 曾经在一便文档中,有一句话,引发的我的思考:如果公司内已有可以使用的ZooKeeper.etcd或者Redis集群,那么就 ...

  5. linux qml 环境,利用Qml与Golang打造Gui客户端(二)qamel环境安装

    由于种种原因,放弃了therecipe/qt,转向了更为轻量级的qamel,这个库的安装非常简单,跟随着以下步骤就能解决 安装 安装qamel非常简单,只需要go get -v github.com/ ...

  6. Linux下golang开发环境搭建

    对于golang开发来说,Windows下可以用vscode或者liteide都不错,但是Linux下的开发也就只有vim了,所以怎么搞笑的利用vim进行golang开发呢? 参考官方推荐的一个插件: ...

  7. Golang优雅之道

    借助一些设计模式.流式编程.函数编程的方法可以让我们的Golang代码更清晰优雅,本文中描述了在错误处理.可选配置.并发控制等方面的优化手段. 链式错误处理 很多人不喜欢Go的错误处理,需要写大量if ...

  8. 猜谜游戏、彩云词典爬虫、SOCKS5代理的 Go(Golang) 小实践,附带全代码解释

    猜谜游戏在编程语言实践都已经和 HelloWord 程序成为必不可少的新手实践环节,毕竟,它能够让我们基本熟悉 for 循环.变量定义.打印.if else 语句等等的使用,当我们基本熟悉该语言基础之 ...

  9. 前端竟然用Golang 动态生成图片?

    作者:阅文前端团队 原文:https://mp.weixin.qq.com/s/0dWfL3ChIceH6rQ8-Oh6pg 一.背景 在业务需求中,根据返回数据动态生成图片分享是很常见的场景.比如在 ...

最新文章

  1. 大型运输行业实战_day14_1_webserivce简单入门
  2. 实录分享 | 计算未来轻沙龙:对话系统研究进展(视频 + PPT)
  3. Hybris Administration console功能一览
  4. jmeter生成html报告修改,Jmeter生成html报告(示例代码)
  5. 框架less和sass
  6. 通信风口下,App 即将消亡?
  7. CentOS 6.2目录服务之LDAP(一)
  8. 从Android发展看Meego
  9. windows10操作系统开启以及关闭测试模式
  10. OpenDaylight VTN 项目指南
  11. JAVA编写一个三棱柱求体积,三棱柱以及多棱柱的实现
  12. java用socket解析16进制数据_浅析Java基于Socket的文件传输案例
  13. CodRED: A Cross-Document Relation Extraction Dataset for Acquiring Knowledge in the Wild
  14. 2022-2028全球与中国商用车辆HMI解决方案市场现状及未来发展趋势
  15. 微信小程序请求后台接口(完整版)
  16. python实现——文件操作(超详细)
  17. 团队合作难,归根到底是老板领导力不行
  18. 健身类小程序前后端源码
  19. VMware安装虚拟机
  20. CF 732F Tourist Reform——v-SCC+dfs

热门文章

  1. 如何完美卸载pads 9.5/vx
  2. 使用CSS使div块内容垂直居中的方法
  3. 2020年最新移动CRM免费下载
  4. SKG、倍轻松“亮红灯”,网红按摩仪难逃“过气命”?
  5. notes获取计算机名,在Lotus Notes数据库中获取最近读取的文档(Get recently read document in Lotus Notes Database)...
  6. 精选收集50个计算机热门视频教程免费下载
  7. 从入门到精通 网吧免费上网狙击战(转)
  8. 回来了,我的motherland
  9. 深度学习分类任务常用评估指标——总结(重点)
  10. 4天完成一个物联网项目