探索cqrs和事件源

任何尝试实施完全符合ACID的系统的人都知道,您需要做很多事情。 您需要确保可以自由创建,修改和删除数据库实体而不会出错,并且在大多数情况下,解决方案将以性能为代价。 可以用来解决此问题的一种方法是根据一系列事件而不是可变状态来设计系统。 这通常称为事件源。

在本文中,我将展示一个演示应用程序,该应用程序使用开源工具包Speedment快速启动并运行可扩展的基于事件的数据库应用程序。 示例的完整源代码在此处 。

什么是事件源?

在典型的关系数据库系统中,您将实体的状态存储为数据库中的一行。 状态改变时,应用程序使用UPDATE或DELETE语句修改行。 这种方法的问题在于,当要确保没有更改任何行以致使系统处于非法状态时,它将对数据库增加很多要求。 您不希望任何人提取比他们帐户中更多的钱或对已经结束的拍卖出价。

在事件源系统中,我们对此采取了不同的方法。 无需将实体的状态存储在数据库中,而是存储导致该状态的一系列更改 。 事件一旦创建便是不可变的,这意味着您仅需实现两个操作CREATE和READ。 如果实体被更新或删除,则可以通过创建“更新”或“删除”事件来实现。

事件源系统可以轻松扩展规模以提高性能,因为任何节点都可以简单地下载事件日志并重播当前状态。 由于写入和查询由不同的机器处理,因此您还可以获得更好的性能。 这称为CQRS(命令查询职责隔离)。 正如您将在示例中看到的,使用Speedment工具包,我们可以在极短的时间内获得最终一致的实例化视图并开始运行。

可预订的桑拿

为了展示构建事件源系统的工作流程,我们将创建一个小型应用程序来处理住宅区中共享桑拿的预订。 我们有多个租户有兴趣预订桑拿房,但我们需要确保害羞的租户永远不会意外预订它。 我们还希望在同一系统中支持多个桑拿浴室。

为了简化与数据库的通信,我们将使用Speedment工具箱 。 Speedment是一个Java工具,它使我们能够从数据库生成完整的域模型,并且还可以使用优化的Java 8流轻松查询数据库。 在Apache 2-license下可以使用Speedment ,在Github页面上有很多很好的例子说明了不同的用法。

步骤1:定义数据库架构

第一步是定义我们的(MySQL)数据库。 我们只是有一张称为“预订”的表,用于存储与预订桑拿有关的事件。 请注意,预订是事件而不是实体。 如果我们要取消预订或对其进行更改,则必须将具有更改的其他事件发布为新行。 我们不允许修改或删除已发布的行。

CREATE DATABASE `sauna`;CREATE TABLE `sauna`.`booking` (`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,`booking_id` BIGINT NOT NULL,`event_type` ENUM('CREATE', 'UPDATE', 'DELETE') NOT NULL,`tenant` INT NULL,`sauna` INT NULL,`booked_from` DATE NULL,`booked_to` DATE NULL,PRIMARY KEY (`id`)
);

“ id”列是一个递增的整数,每次将新事件发布到日志时都会自动分配。 “ booking_id”告诉我们我们指的是哪个预订。 如果两个事件共享相同的预订ID,则它们引用相同的实体。 我们还有一个名为“ event_type”的枚举,它描述了我们试图执行的操作。 之后是属于预订的信息。 如果列为NULL,则与任何先前值相比,我们将其视为未修改的。

步骤2:使用加速生成代码

下一步是使用Speedment为项目生成代码。 只需创建一个新的maven项目并将以下代码添加到pom.xml文件即可。

pom.xml

<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><speedment.version>3.0.0-EA2</speedment.version><mysql.version>5.1.39</mysql.version>
</properties><build><plugins><plugin><groupId>com.speedment</groupId><artifactId>speedment-maven-plugin</artifactId><version>${speedment.version}</version><dependencies><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency></dependencies></plugin></plugins>
</build><dependencies><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><dependency><groupId>com.speedment</groupId><artifactId>runtime</artifactId><version>${speedment.version}</version><type>pom</type></dependency>
</dependencies>

如果生成项目,则IDE 中将出现一个新的maven目标,称为speedment:tool 。 运行它以启动Speedment用户界面。 在其中,连接到Sauna数据库并使用默认设置生成代码。 现在应在项目中填充源文件。

提示:如果对数据库进行了更改,则可以使用speedment:reload -goal下载新配置,并使用speedment:generate 重新生成源。 无需重新启动该工具!

步骤3:创建物化视图

物化视图是一个组件,该组件定期轮询数据库以查看是否已添加任何新行,如果有,则以正确的顺序下载并将它们合并到视图中。 由于轮询有时会花费很多时间,因此我们希望此过程在单独的线程中运行。 我们可以使用Java Timer和TimerTask来实现。

轮询数据库? 真? 好吧,要考虑的重要一点是,只有服务器才能轮询数据库,而不是客户端。 这给我们提供了很好的可伸缩性,因为我们可以让少数服务器轮询数据库,从而服务于成千上万的租户。 将此与常规系统进行比较,在常规系统中,每个客户端都会从服务器请求资源,然后服务器又与数据库进行联系。

BookingView.java

public final class BookingView {...public static BookingView create(BookingManager mgr) {final AtomicBoolean working = new AtomicBoolean(false);final AtomicLong last  = new AtomicLong();final AtomicLong total = new AtomicLong();final String table = mgr.getTableIdentifier().getTableName();final String field = Booking.ID.identifier().getColumnName();final Timer timer = new Timer();final BookingView view = new BookingView(timer);final TimerTask task = ...;timer.scheduleAtFixedRate(task, 0, UPDATE_EVERY);return view;}
}

计时器任务是匿名定义的,这就是轮询逻辑所在的位置。

final TimerTask task = new TimerTask() {@Overridepublic void run() {boolean first = true;// Make sure no previous task is already inside this block.if (working.compareAndSet(false, true)) {try {// Loop until no events was merged // (the database is up to date).while (true) {// Get a list of up to 25 events that has not yet // been merged into the materialized object view.final List added = unmodifiableList(mgr.stream().filter(Booking.ID.greaterThan(last.get())).sorted(Booking.ID.comparator()).limit(MAX_BATCH_SIZE).collect(toList()));if (added.isEmpty()) {if (!first) {System.out.format("%s: View is up to date. A total of " + "%d rows have been loaded.%n",System.identityHashCode(last),total.get());}break;} else {final Booking lastEntity = added.get(added.size() - 1);last.set(lastEntity.getId());added.forEach(view::accept);total.addAndGet(added.size());System.out.format("%s: Downloaded %d row(s) from %s. " + "Latest %s: %d.%n", System.identityHashCode(last),added.size(),table,field,Long.parseLong("" + last.get()));}first = false;}// Release this resource once we exit this block.} finally {working.set(false);}}}
};

有时,合并任务所花费的时间可能会超过计时器的时间间隔。 为了避免这引起问题,我们使用AtomicBoolean进行检查并确保只能同时执行一个任务。 这类似于信号量,不同之处在于我们希望删除没有时间的任务而不是排队,因为我们实际上不需要执行所有任务,因此只需一秒钟即可完成一个新任务。

构造函数和基本成员方法相当容易实现。 我们将传递给类的计时器作为参数存储在构造函数中,以便在需要停止时可以取消该计时器。 我们还存储了一张地图,将所有预订的当前视图保存在内存中。

private final static int MAX_BATCH_SIZE = 25;
private final static int UPDATE_EVERY   = 1_000; // Millisecondsprivate final Timer timer;
private final Map<Long, Booking> bookings;private BookingView(Timer timer) {this.timer    = requireNonNull(timer);this.bookings = new ConcurrentHashMap<>();
}public Stream<Booking> stream() {return bookings.values().stream();
}public void stop() {timer.cancel();
}

BookingView类的最后一个缺失部分是合并过程中上面使用的accept()方法。 在这里考虑新事件并将其合并到视图中。

private boolean accept(Booking ev) {final String type = ev.getEventType();// If this was a creation eventswitch (type) {case "CREATE" :// Creation events must contain all information.if (!ev.getSauna().isPresent()||  !ev.getTenant().isPresent()||  !ev.getBookedFrom().isPresent()||  !ev.getBookedTo().isPresent()||  !checkIfAllowed(ev)) {return false;}// If something is already mapped to that key, refuse the // event.return bookings.putIfAbsent(ev.getBookingId(), ev) == null;case "UPDATE" :// Create a copy of the current statefinal Booking existing = bookings.get(ev.getBookingId());// If the specified key did not exist, refuse the event.if (existing != null) {final Booking proposed = new BookingImpl();proposed.setId(existing.getId());// Update non-null valuesproposed.setSauna(ev.getSauna().orElse(unwrap(existing.getSauna())));proposed.setTenant(ev.getTenant().orElse(unwrap(existing.getTenant())));proposed.setBookedFrom(ev.getBookedFrom().orElse(unwrap(existing.getBookedFrom())));proposed.setBookedTo(ev.getBookedTo().orElse(unwrap(existing.getBookedTo())));// Make sure these changes are allowed.if (checkIfAllowed(proposed)) {bookings.put(ev.getBookingId(), proposed);return true;}}return false;case "DELETE" :// Remove the event if it exists, else refuse the event.return bookings.remove(ev.getBookingId()) != null;default :System.out.format("Unexpected type '%s' was refused.%n", type);return false;}
}

在事件源系统中,规则在收到事件时不执行,但在实现时才执行。 基本上,任何人都可以在表的末尾插入新事件到系统中。 在这种方法中,我们选择丢弃不遵循规则设置的事件。

步骤4:用法示例

在此示例中,我们将使用标准的Speedment API将三个新的预订插入到数据库中,其中两个有效,而第三个与先前的一个相交。 然后,我们将等待视图更新并打印出所有预订。

public static void main(String... params) {final SaunaApplication app = new SaunaApplicationBuilder().withPassword("password").build();final BookingManager bookings = app.getOrThrow(BookingManager.class);final SecureRandom rand = new SecureRandom();rand.setSeed(System.currentTimeMillis());// Insert three new bookings into the system.bookings.persist(new BookingImpl().setBookingId(rand.nextLong()).setEventType("CREATE").setSauna(1).setTenant(1).setBookedFrom(Date.valueOf(LocalDate.now().plus(3, DAYS))).setBookedTo(Date.valueOf(LocalDate.now().plus(5, DAYS))));bookings.persist(new BookingImpl().setBookingId(rand.nextLong()).setEventType("CREATE").setSauna(1).setTenant(2).setBookedFrom(Date.valueOf(LocalDate.now().plus(1, DAYS))).setBookedTo(Date.valueOf(LocalDate.now().plus(2, DAYS))));bookings.persist(new BookingImpl().setBookingId(rand.nextLong()).setEventType("CREATE").setSauna(1).setTenant(3).setBookedFrom(Date.valueOf(LocalDate.now().plus(2, DAYS))).setBookedTo(Date.valueOf(LocalDate.now().plus(7, DAYS))));final BookingView view = BookingView.create(bookings);// Wait until the view is up-to-date.try { Thread.sleep(5_000); }catch (final InterruptedException ex) {throw new RuntimeException(ex);}System.out.println("Current Bookings for Sauna 1:");final SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd");final Date now = Date.valueOf(LocalDate.now());view.stream().filter(Booking.SAUNA.equal(1)).filter(Booking.BOOKED_TO.greaterOrEqual(now)).sorted(Booking.BOOKED_FROM.comparator()).map(b -> String.format("Booked from %s to %s by Tenant %d.", dt.format(b.getBookedFrom().get()),dt.format(b.getBookedTo().get()),b.getTenant().getAsInt())).forEachOrdered(System.out::println);System.out.println("No more bookings!");view.stop();
}

如果运行它,将得到以下输出:

677772350: Downloaded 3 row(s) from booking. Latest id: 3.
677772350: View is up to date. A total of 3 rows have been loaded.
Current Bookings for Sauna 1:
Booked from 2016-10-11 to 2016-10-12 by Tenant 2.
Booked from 2016-10-13 to 2016-10-15 by Tenant 1.
No more bookings!

我的GitHub页面上提供了此演示应用程序的完整源代码。 在这里您还可以找到许多其他示例,这些示例说明了如何在各种情况下使用Speedment快速开发数据库应用程序。

摘要

在本文中,我们在数据库表上开发了一个物化视图,该视图可评估物化而不是插入时的事件。 这样就可以启动应用程序的多个实例,而不必担心对其进行同步,因为它们最终将保持一致。 然后,我们通过展示如何使用Speedment API查询实例化视图以生成当前预订列表来结束。

感谢您的阅读,请在Github页面上查看更多Speedment示例 !

翻译自: https://www.javacodegeeks.com/2016/10/event-sourcing-cqrs-practise.html

探索cqrs和事件源

探索cqrs和事件源_实践中的事件源和CQRS相关推荐

  1. 实践中的事件源和CQRS

    任何尝试实施完全符合ACID的系统的人都知道,您需要做很多事情. 您需要确保可以自由创建,修改和删除数据库实体而不会出错,在大多数情况下,解决方案将以性能为代价. 可以用来解决此问题的一种方法是根据一 ...

  2. 探索cqrs和事件源_编写基于事件的CQRS读取模型

    探索cqrs和事件源 关于事件源和CQRS的讨论似乎通常集中在CQRS上下文中的整体系统架构或领域驱动设计的各种形式. 但是,尽管也有一些有趣的考虑,但读取模型经常被忽略. 在本文中,我们将介绍通过使 ...

  3. 探索cqrs和事件源_假人的CQRS和事件源

    探索cqrs和事件源 CQRS(命令和查询责任隔离)和事件源不是什么新概念. 除了NoSql,Functional Programming和Microservices之外,这些复兴概念由于能够应对现代 ...

  4. ios开发 mvp实践_实践中开发人员的工作流程-我们如何在30天内建立​​MVP

    ios开发 mvp实践 by Léna Faure 莱娜·福雷(LénaFaure) 实践中开发人员的工作流程-我们如何在30天内建立​​MVP (The developer's workflow i ...

  5. mysql not in优化_实践中如何优化MySQL(收藏)

    SQL语句的优化: 1.尽量避免使用子查询 3.用IN来替换OR 4.LIKE前缀%号.双百分号._下划线查询非索引列或*无法使用到索引,如果查询的是索引列则可以 5.读取适当的记录LIMIT M,N ...

  6. tmux系统剪切板_实践中的tmux:与系统剪贴板集成

    tmux系统剪切板 by Alexey Samoshkin 通过阿列克谢·萨莫什金(Alexey Samoshkin) 在实践中使用tmux:与系统剪贴板集成 (tmux in practice: i ...

  7. 工厂模式 构建者模式_实践中的构建者模式

    工厂模式 构建者模式 我将不深入讨论该模式,因为已经有大量的文章和书籍对此进行了详细的解释. 相反,我将告诉您为什么以及何时应该考虑使用它. 但是,值得一提的是,这种模式与< 四人帮>一书 ...

  8. tmux 上滚_实践中的tmux:回滚缓冲区

    tmux 上滚 by Alexey Samoshkin 通过阿列克谢·萨莫什金(Alexey Samoshkin) 实践中的tmux:回滚缓冲区 (tmux in practice: the scro ...

  9. pygame系列_游戏中的事件

    先看一下我做的demo: 当玩家按下键盘上的:上,下,左,右键的时候,后台会打印出玩家所按键的数字值,而图形会随之移动 这是客观上面存在的现象. 那么啥是事件呢? 你叫我做出定义,我不知道,我只能举个 ...

最新文章

  1. vue-threeJS数据驱动的三维图形可视化
  2. 1. 训练集、开发集、测试集(Train/Dev/Test sets)
  3. C# “不支持给定路径的格式”异常处理
  4. HBase在淘宝的应用和优化小结
  5. GPS nmealib学习笔记
  6. python3精要(35)-wxPython(1)-简介与开源协议
  7. redis种类型对应java类型_Redis的五种基本数据类型介绍
  8. CentOs7.2编译安装Nginx服务器
  9. 深入研究微服务架构——第二部分
  10. 索尼推出 PlayStation 漏洞奖励计划,最严重漏洞5万美元起步
  11. 图像超分工具,在线工具
  12. delphi pid判断进程结束_有两个这样的进程:僵尸进程amp;孤儿进程,蓝瘦香菇
  13. MonoCSharp Evaluator Extension
  14. 【金融科技前沿】【长文】金融监管、监管科技以及银行业监管报送概述
  15. Laravel框架教程 入门篇(一)
  16. 数学建模之传染病SIR模型(新冠真实数据)
  17. UWP开发入门(八)——聊天窗口和ItemTemplateSelector
  18. android ndk 怎样调用第三方的so库文件。
  19. 浅谈人脸识别在公共安全领域的应用
  20. 第三章 本地锁和分布式锁的区别

热门文章

  1. YbtOJ#643-机器决斗【贪心,李超树】
  2. P4630-[APIO2018]Duathlon铁人两项【圆方树】
  3. 【bfs】WZK旅游(jzoj 1996)
  4. SpringBoot集成Flowable
  5. 汇编语言(三十二)之读写文件
  6. 两年摸爬滚打 Spring Boot,总结了这 16 条最佳实践
  7. 一道非常棘手的 Java 面试题:i++ 是线程安全的吗
  8. 【深入理解JVM】:类加载器与双亲委派模型
  9. Oracle入门(十四.15)之捕获Oracle服务器异常
  10. 坑爹的日志无法按天切割问题