任何尝试实施完全符合ACID的系统的人都知道,您需要做很多事情。 您需要确保可以自由创建,修改和删除数据库实体而不会出错,在大多数情况下,解决方案将以性能为代价。 可以用来解决此问题的一种方法是根据一系列事件而不是可变状态来设计系统。 这通常称为事件来源。

在本文中,我将展示一个演示应用程序,该应用程序使用开源工具包Speedment快速启动并运行可扩展的基于事件的数据库应用程序。 示例的完整源代码在此处 。

什么是事件源?

在典型的关系数据库系统中,您将实体的状态存储为数据库中的一行。 状态更改时,应用程序使用UPDATE或DELETE语句修改行。 这种方法的问题在于,当要确保没有更改任何行以致使系统处于非法状态时,它将对数据库增加很多要求。 您不希望任何人提取比他们帐户中更多的钱,或者要竞标已经结束的拍卖。

在基于事件的系统中,我们对此采取了不同的方法。 无需将实体的状态存储在数据库中,而是存储导致该状态的一系列更改 。 事件一旦创建便是不可变的,这意味着您仅需实现两个操作CREATE和READ。 如果实体被更新或删除,则可以通过创建“更新”或“删除”事件来实现。

事件源系统可以轻松扩展以提高性能,因为任何节点都可以简单地下载事件日志并重播当前状态。 由于写入和查询由不同的机器处理,因此您还可以获得更好的性能。 这称为CQRS(命令查询职责隔离)。 正如您将在示例中看到的,使用Speedment工具包,我们可以在极短的时间内获得最终一致的实例化视图并开始运行。

可预订的桑拿

为了展示构建事件源系统的工作流程,我们将创建一个小型应用程序来处理住宅区中共享桑拿浴室的预订。 我们有多个租户有兴趣预订桑拿浴室,但我们需要确保害羞的租户永远不会意外预订它。 我们还希望在同一系统中支持多个桑拿浴室。

为了简化与数据库的通信,我们将使用Speedment工具箱 。 Speedment是一个Java工具,它使我们能够从数据库生成完整的域模型,并且还可以使用优化的Java 8流轻松查询数据库。 在Apache 2-license下可以使用Speedment ,在Github页面上有很多很好的例子说明了不同的用法。

步骤1:定义数据库架构

第一步是定义我们的(MySQL)数据库。 我们仅拥有一张称为“预订”的桌子,用于存储与预订桑拿浴室有关的事件。 请注意,预订是事件而不是实体。 如果我们要取消预订或对其进行更改,则必须将其他更改发布为新行。 我们不允许修改或删除已发布的行。

CREATE DATABASE `sauna`;CREATE TABLE `sauna`.`booking` (`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,`booking_id` BIGINT NOT NULL,`event_type` ENUM('CREATE', 'UPDATE', 'DELETE') NOT NULL,`tenant` INT NULL,`sauna` INT NULL,`booked_from` DATE NULL,`booked_to` DATE NULL,PRIMARY KEY (`id`)
);

“ id”列是一个递增的整数,每次将新事件发布到日志时都会自动分配。 “ booking_id”告诉我们我们指的是哪个预订。 如果两个事件共享相同的预订ID,则它们引用相同的实体。 我们还有一个名为“ event_type”的枚举,它描述了我们试图执行的操作。 之后是属于预订的信息。 如果列为NULL,则与任何先前值相比,我们将其视为未修改。

步骤2:使用加速生成代码

下一步是使用Speedment为项目生成代码。 只需创建一个新的maven项目并将以下代码添加到pom.xml文件即可。

pom.xml

<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><speedment.version>3.0.0-EA2</speedment.version><mysql.version>5.1.39</mysql.version>
</properties><build><plugins><plugin><groupId>com.speedment</groupId><artifactId>speedment-maven-plugin</artifactId><version>${speedment.version}</version><dependencies><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency></dependencies></plugin></plugins>
</build><dependencies><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><dependency><groupId>com.speedment</groupId><artifactId>runtime</artifactId><version>${speedment.version}</version><type>pom</type></dependency>
</dependencies>

如果生成项目,则IDE 中将出现一个名为speedment:tool的新maven目标。 运行它以启动Speedment用户界面。 在其中,连接到Sauna数据库并使用默认设置生成代码。 现在应在项目中填充源文件。

提示:如果对数据库进行了更改,则可以使用speedment:reload -goal下载新配置,并使用speedment:generate 重新生成源。 无需重新启动该工具!

步骤3:创建物化视图

物化视图是一个组件,该组件定期轮询数据库以查看是否已添加任何新行,如果已添加,则以正确的顺序下载并将它们合并到视图中。 由于轮询有时会花费很多时间,因此我们希望此过程在单独的线程中运行。 我们可以使用Java Timer和TimerTask来实现。

轮询数据库? 真? 好吧,需要考虑的重要一点是,只有服务器才能轮询数据库,而不会轮询客户端。 这给我们提供了很好的可伸缩性,因为我们可以让少数服务器轮询数据库,从而为成千上万的租户提供服务。 将此与常规系统进行比较,在常规系统中,每个客户端都会从服务器请求资源,然后服务器又会联系数据库。

BookingView.java

public final class BookingView {...public static BookingView create(BookingManager mgr) {final AtomicBoolean working = new AtomicBoolean(false);final AtomicLong last  = new AtomicLong();final AtomicLong total = new AtomicLong();final String table = mgr.getTableIdentifier().getTableName();final String field = Booking.ID.identifier().getColumnName();final Timer timer = new Timer();final BookingView view = new BookingView(timer);final TimerTask task = ...;timer.scheduleAtFixedRate(task, 0, UPDATE_EVERY);return view;}
}

计时器任务是匿名定义的,这就是轮询逻辑所在的位置。

final TimerTask task = new TimerTask() {@Overridepublic void run() {boolean first = true;// Make sure no previous task is already inside this block.if (working.compareAndSet(false, true)) {try {// Loop until no events was merged // (the database is up to date).while (true) {// Get a list of up to 25 events that has not yet // been merged into the materialized object view.final List added = unmodifiableList(mgr.stream().filter(Booking.ID.greaterThan(last.get())).sorted(Booking.ID.comparator()).limit(MAX_BATCH_SIZE).collect(toList()));if (added.isEmpty()) {if (!first) {System.out.format("%s: View is up to date. A total of " + "%d rows have been loaded.%n",System.identityHashCode(last),total.get());}break;} else {final Booking lastEntity = added.get(added.size() - 1);last.set(lastEntity.getId());added.forEach(view::accept);total.addAndGet(added.size());System.out.format("%s: Downloaded %d row(s) from %s. " + "Latest %s: %d.%n", System.identityHashCode(last),added.size(),table,field,Long.parseLong("" + last.get()));}first = false;}// Release this resource once we exit this block.} finally {working.set(false);}}}
};

有时,合并任务可能需要花费比计时器间隔更多的时间。 为避免此问题,我们使用AtomicBoolean进行检查并确保只能同时执行一个任务。 这类似于信号量,不同之处在于我们想要删除没有时间的任务而不是排队,因为我们确实不需要执行所有任务,因此只需一秒钟即可完成一个新任务。

构造函数和基本成员方法很容易实现。 我们将传递给类的计时器作为参数存储在构造函数中,以便在需要停止时可以取消该计时器。 我们还会存储一张地图,以将所有预订的当前视图保存在内存中。

private final static int MAX_BATCH_SIZE = 25;
private final static int UPDATE_EVERY   = 1_000; // Millisecondsprivate final Timer timer;
private final Map<Long, Booking> bookings;private BookingView(Timer timer) {this.timer    = requireNonNull(timer);this.bookings = new ConcurrentHashMap<>();
}public Stream<Booking> stream() {return bookings.values().stream();
}public void stop() {timer.cancel();
}

BookingView类的最后一个缺少的部分是合并过程中上面使用的accept()方法。 在这里考虑新事件并将其合并到视图中。

private boolean accept(Booking ev) {final String type = ev.getEventType();// If this was a creation eventswitch (type) {case "CREATE" :// Creation events must contain all information.if (!ev.getSauna().isPresent()||  !ev.getTenant().isPresent()||  !ev.getBookedFrom().isPresent()||  !ev.getBookedTo().isPresent()||  !checkIfAllowed(ev)) {return false;}// If something is already mapped to that key, refuse the // event.return bookings.putIfAbsent(ev.getBookingId(), ev) == null;case "UPDATE" :// Create a copy of the current statefinal Booking existing = bookings.get(ev.getBookingId());// If the specified key did not exist, refuse the event.if (existing != null) {final Booking proposed = new BookingImpl();proposed.setId(existing.getId());// Update non-null valuesproposed.setSauna(ev.getSauna().orElse(unwrap(existing.getSauna())));proposed.setTenant(ev.getTenant().orElse(unwrap(existing.getTenant())));proposed.setBookedFrom(ev.getBookedFrom().orElse(unwrap(existing.getBookedFrom())));proposed.setBookedTo(ev.getBookedTo().orElse(unwrap(existing.getBookedTo())));// Make sure these changes are allowed.if (checkIfAllowed(proposed)) {bookings.put(ev.getBookingId(), proposed);return true;}}return false;case "DELETE" :// Remove the event if it exists, else refuse the event.return bookings.remove(ev.getBookingId()) != null;default :System.out.format("Unexpected type '%s' was refused.%n", type);return false;}
}

在事件源系统中,规则在收到事件时不执行,但在实现时才执行。 基本上,任何人都可以在表的末尾插入新事件到系统中。 在这种方法中,我们选择丢弃不遵循规则设置的事件。

步骤4:示例用法

在此示例中,我们将使用标准的Speedment API将三个新的预订插入到数据库中,其中两个有效,第三个与先前的一个相交。 然后,我们将等待视图更新并打印出所有预订。

public static void main(String... params) {final SaunaApplication app = new SaunaApplicationBuilder().withPassword("password").build();final BookingManager bookings = app.getOrThrow(BookingManager.class);final SecureRandom rand = new SecureRandom();rand.setSeed(System.currentTimeMillis());// Insert three new bookings into the system.bookings.persist(new BookingImpl().setBookingId(rand.nextLong()).setEventType("CREATE").setSauna(1).setTenant(1).setBookedFrom(Date.valueOf(LocalDate.now().plus(3, DAYS))).setBookedTo(Date.valueOf(LocalDate.now().plus(5, DAYS))));bookings.persist(new BookingImpl().setBookingId(rand.nextLong()).setEventType("CREATE").setSauna(1).setTenant(2).setBookedFrom(Date.valueOf(LocalDate.now().plus(1, DAYS))).setBookedTo(Date.valueOf(LocalDate.now().plus(2, DAYS))));bookings.persist(new BookingImpl().setBookingId(rand.nextLong()).setEventType("CREATE").setSauna(1).setTenant(3).setBookedFrom(Date.valueOf(LocalDate.now().plus(2, DAYS))).setBookedTo(Date.valueOf(LocalDate.now().plus(7, DAYS))));final BookingView view = BookingView.create(bookings);// Wait until the view is up-to-date.try { Thread.sleep(5_000); }catch (final InterruptedException ex) {throw new RuntimeException(ex);}System.out.println("Current Bookings for Sauna 1:");final SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd");final Date now = Date.valueOf(LocalDate.now());view.stream().filter(Booking.SAUNA.equal(1)).filter(Booking.BOOKED_TO.greaterOrEqual(now)).sorted(Booking.BOOKED_FROM.comparator()).map(b -> String.format("Booked from %s to %s by Tenant %d.", dt.format(b.getBookedFrom().get()),dt.format(b.getBookedTo().get()),b.getTenant().getAsInt())).forEachOrdered(System.out::println);System.out.println("No more bookings!");view.stop();
}

如果运行它,将得到以下输出:

677772350: Downloaded 3 row(s) from booking. Latest id: 3.
677772350: View is up to date. A total of 3 rows have been loaded.
Current Bookings for Sauna 1:
Booked from 2016-10-11 to 2016-10-12 by Tenant 2.
Booked from 2016-10-13 to 2016-10-15 by Tenant 1.
No more bookings!

我的GitHub页面上提供了此演示应用程序的完整源代码。 在这里您还可以找到许多其他示例,这些示例说明了如何在各种情况下使用Speedment快速开发数据库应用程序。

摘要

在本文中,我们在数据库表上开发了一个物化视图,该视图可评估物化而不是插入时的事件。 这样就可以启动应用程序的多个实例,而不必担心对其进行同步,因为它们最终将保持一致。 然后,我们通过展示如何使用Speedment API查询实例化视图来生成当前预订列表来结束。

感谢您的阅读,请在Github页面上查看更多Speedment示例 !

翻译自: https://www.javacodegeeks.com/2016/10/event-sourcing-cqrs-practise.html

实践中的事件源和CQRS相关推荐

  1. 探索cqrs和事件源_实践中的事件源和CQRS

    探索cqrs和事件源 任何尝试实施完全符合ACID的系统的人都知道,您需要做很多事情. 您需要确保可以自由创建,修改和删除数据库实体而不会出错,并且在大多数情况下,解决方案将以性能为代价. 可以用来解 ...

  2. 探索cqrs和事件源_编写基于事件的CQRS读取模型

    探索cqrs和事件源 关于事件源和CQRS的讨论似乎通常集中在CQRS上下文中的整体系统架构或领域驱动设计的各种形式. 但是,尽管也有一些有趣的考虑,但读取模型经常被忽略. 在本文中,我们将介绍通过使 ...

  3. 编写基于事件的CQRS读取模型

    关于事件源和CQRS的讨论似乎通常集中在CQRS上下文中的整体系统架构或领域驱动设计的各种形式. 但是,尽管也有一些有趣的考虑,但读取模型经常被忽略. 在本文中,我们将展示一个通过使用事件流填充视图模 ...

  4. 高可用 Prometheus 架构实践中的踩坑集锦

    监控系统的历史悠久,是一个很成熟的方向,而 Prometheus 作为新生代的开源监控系统,慢慢成为了云原生体系的事实标准,也证明了其设计很受欢迎. 本文主要分享在 Prometheus 实践中遇到的 ...

  5. sessionlistener方法中获取session中存储的值报空指针异常_从Golang实践中得到的教训...

    当使用复杂的分布式系统时,可能会遇到并发处理的需求.我们知道golang的协程是处理并发的利器之一,加上Golang为静态类型和编译型使得其在企业中使用越来越广泛.Mode.net公司系统每天要处理实 ...

  6. 5种避免C#.NET中因事件造成内存泄漏的技术

    原文来自互联网,由长沙DotNET技术社区编译. 5种避免C#.NET中事件造成的内存泄漏的技术 C#(通常是.NET)中的事件注册是内存泄漏的最常见原因.至少从我的经验来看.实际上,我从事件中看到了 ...

  7. tmux 上滚_实践中的tmux:回滚缓冲区

    tmux 上滚 by Alexey Samoshkin 通过阿列克谢·萨莫什金(Alexey Samoshkin) 实践中的tmux:回滚缓冲区 (tmux in practice: the scro ...

  8. 机器学习与R语言(原书第2版)》一1.4 实践中的机器学习

    本节书摘来自华章出版社<机器学习与R语言(原书第2版)>一书中的第1章,第1.4节,美] 布雷特·兰茨(Brett Lantz) 著,李洪成 许金炜 李舰 译更多章节内容可以访问云栖社区& ...

  9. php循环套循环_PHP中的事件循环简介

    php循环套循环 PHP developers are always waiting for something. Sometimes we're waiting for requests to re ...

最新文章

  1. java移动端接口测试_移动端质量体系之性能测试(上)
  2. Asp.Net页面生命周期(多图)
  3. 聊聊 MySql 索引那些事儿
  4. SAP CRM系统里Opportunity预期销售金额和货币相关的自动转换
  5. supervisord安装使用简记
  6. 圣诞祝福网页_平安夜圣诞节适合发朋友圈的文案,快快收藏起来!
  7. MongoDB学习笔记lt;七gt;
  8. Spring Security 3多用户登录实现之二 多登录界面展示
  9. Tomcat startup.bat 后台运行,不再弹出 Dos 黑框
  10. plc tcp ip通讯怎么只能连一个客户端_Kepware V5如何实现与PLC的通讯
  11. 十大优秀 Windows开放源代码软件简介
  12. ORID方法在敏捷中的利用
  13. Sutherland-Hodgeman 多边形裁剪算法
  14. MYSQL之如何列转行
  15. 自动化立体库能力分析——堆垛机(单深单货位Case1,双循环)
  16. python attention机制_[深度应用]·Keras实现Self-Attention文本分类(机器如何读懂人心)...
  17. 富斯 fs-i6s 通过PPM连接高频头
  18. pytorch出现RuntimeError: CUDA out of memory的问题解决方法
  19. Kubernetes 核心组件 kubelet
  20. 考研计算机组成原理难度,考研计算机组成原理特点及复习攻略

热门文章

  1. Intellij IDEA 那些隐藏好用的小技巧
  2. 英语不会读怎么办?它来教你……
  3. Hibernate中使用Criteria查询及注解——(HibernateUtil)
  4. linux 安装mysql 8.0_Linux安装mysql 8.0的详细方法介绍(代码示例)
  5. python大神交流网站_学习Python必去的8个网站
  6. JavaWeb项目:简易小米商城系统
  7. 如何用css3实现简单旋转的风车
  8. Flex布局 让你的布局更完美
  9. java 示例_功能Java示例 第4部分–首选不变性
  10. 机器学习java_如何开始使用Java机器学习