第 6 章 事件溯源与 CQRS

在本章,我们来了解一下随着云平台一同出现的设计模式

我们先探讨事件溯源和命令查询职责分离(CQRS)背后的动机与哲学

事件溯源简介

事实由事件溯源而来

我们大脑就是一种事件溯源系统,接收感官多种形式刺激,大脑负责对这些刺激进行合适排序,大约每隔几百毫秒,对刺激构成的流进行运算,而运算的结果,就是我们所说的事实

事件溯源的定义

传统应用中,状态由一系列零散的数据所管理,如果客户端向我们发送 PUT 或 POST 请求,状态就会改变

这种方式很好地给出了系统当前状态,却不能指示在当前状态之前,系统是如何变化的

事件溯源可以解决这个问题,因为它把状态管理的职责与接收导致状态变更的刺激的职责区分开来

基于事件溯源的系统需要满足一系列要求

  • 有序:有序事件流

  • 幂等:等价多个有序事件流的操作结果相同

  • 独立:不依赖外部信息

  • 过去式:事件发生在过去

流行的区块链技术的基础就是发生在特定私有资源上的安全、可信的事件序列

拥抱最终一致性

一种我们每天都在用的最终一致性的应用,就是社区网络应用

有时你从一个设备发出的评论要花几分钟才能展示在朋友的浏览器或者其他设备上

这是因为,应用的架构人员做了妥协:通过放弃同步操作的即时一致性,在可接受的范围内增加一定的反馈延迟,就能让应用支持巨大的规模与流量

CQRS 模式

如果把我们讨论的模式直接套用到系统中,很快会发现系统必须对输入命令和查询加以区分,这也被称为命令查询职责分离(CQRS)

我们用一个例子来说明这种模式的实际应用

租户通过一个门户网站查看用电情况,每当用户刷新门户页面时,就调用某种数据服务并请求,汇总一段时间内所有度量事件

但这种对于云规模的现代软件开发来说是不可接受的,如果将计算职责推卸给数据库,很快会造成数据库瓶颈

掌握了大多数客户的使用模式,让我们能够利用事件溯源来构建一个合理的 CQRS 实现。

事件处理器每次收到新事件时重新计算已缓存的度量总和

利用这种机制,在查询时,门户上的用户所期望的结果已经存在于数据库或者缓存中

不需要复制的计算,也没有临时的聚合与繁杂的汇总,只需要一个简单的查询

事件溯源于 CQRS 实战--附件的团队成员

接下来要开发的新版实例中,我们将检测成员彼此相距一个较小距离的时刻

系统将支持对这些接近的结果予以响应

例如我们可能希望向附近的团队成员的移动设备发送推送通知,以提醒他们可以约见对方

为了实现这一功能,我们把系统职责划分为以下四个组件:

  • 位置报送服务(命令)

  • 事件处理器(对事件进行溯源)

  • 事实服务(查询)

  • 位置接近监控器(对事件进行溯源)

位置报送服务

收到新报送的位置后,执行下列操作:

  • 验证上报数据

  • 将命令转换为事件

  • 生成事件,并用消息队列发送出去

GitHub 链接:https://github.com/microservices-aspnetcore/es-locationreporter

创建位置报送控制器

using System;
using Microsoft.AspNetCore.Mvc;
using StatlerWaldorfCorp.LocationReporter.Events;
using StatlerWaldorfCorp.LocationReporter.Models;
using StatlerWaldorfCorp.LocationReporter.Services;namespace StatlerWaldorfCorp.LocationReporter.Controllers
{[Route("/api/members/{memberId}/locationreports")]public class LocationReportsController : Controller{private ICommandEventConverter converter;private IEventEmitter eventEmitter;private ITeamServiceClient teamServiceClient;public LocationReportsController(ICommandEventConverter converter,IEventEmitter eventEmitter,ITeamServiceClient teamServiceClient) {this.converter = converter;this.eventEmitter = eventEmitter;this.teamServiceClient = teamServiceClient;}[HttpPost]public ActionResult PostLocationReport(Guid memberId, [FromBody]LocationReport locationReport){MemberLocationRecordedEvent locationRecordedEvent = converter.CommandToEvent(locationReport);locationRecordedEvent.TeamID = teamServiceClient.GetTeamForMember(locationReport.MemberID);eventEmitter.EmitLocationRecordedEvent(locationRecordedEvent);return this.Created($"/api/members/{memberId}/locationreports/{locationReport.ReportID}", locationReport);}}
}

创建 AMQP 事件生成器

using System;
using System.Linq;
using System.Text;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using StatlerWaldorfCorp.LocationReporter.Models;namespace StatlerWaldorfCorp.LocationReporter.Events
{public class AMQPEventEmitter : IEventEmitter{private readonly ILogger logger;private AMQPOptions rabbitOptions;private ConnectionFactory connectionFactory;public AMQPEventEmitter(ILogger<AMQPEventEmitter> logger,IOptions<AMQPOptions> amqpOptions){this.logger = logger;this.rabbitOptions = amqpOptions.Value;connectionFactory = new ConnectionFactory();connectionFactory.UserName = rabbitOptions.Username;connectionFactory.Password = rabbitOptions.Password;connectionFactory.VirtualHost = rabbitOptions.VirtualHost;connectionFactory.HostName = rabbitOptions.HostName;connectionFactory.Uri = rabbitOptions.Uri;logger.LogInformation("AMQP Event Emitter configured with URI {0}", rabbitOptions.Uri);}public const string QUEUE_LOCATIONRECORDED = "memberlocationrecorded";public void EmitLocationRecordedEvent(MemberLocationRecordedEvent locationRecordedEvent){using (IConnection conn = connectionFactory.CreateConnection()) {using (IModel channel = conn.CreateModel()) {channel.QueueDeclare(queue: QUEUE_LOCATIONRECORDED,durable: false,exclusive: false,autoDelete: false,arguments: null);string jsonPayload = locationRecordedEvent.toJson();var body = Encoding.UTF8.GetBytes(jsonPayload);channel.BasicPublish(exchange: "",routingKey: QUEUE_LOCATIONRECORDED,basicProperties: null,body: body);}}}}
}

配置并启动服务

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using System;
using Microsoft.Extensions.Logging;
using System.Linq;
using StatlerWaldorfCorp.LocationReporter.Models;
using StatlerWaldorfCorp.LocationReporter.Events;
using StatlerWaldorfCorp.LocationReporter.Services;namespace StatlerWaldorfCorp.LocationReporter
{public class Startup{public Startup(IHostingEnvironment env, ILoggerFactory loggerFactory){loggerFactory.AddConsole();loggerFactory.AddDebug();var builder = new ConfigurationBuilder().SetBasePath(env.ContentRootPath).AddJsonFile("appsettings.json", optional: false, reloadOnChange: false).AddEnvironmentVariables();Configuration = builder.Build();}public IConfigurationRoot Configuration { get; }public void ConfigureServices(IServiceCollection services){services.AddMvc();services.AddOptions();services.Configure<AMQPOptions>(Configuration.GetSection("amqp"));services.Configure<TeamServiceOptions>(Configuration.GetSection("teamservice"));services.AddSingleton(typeof(IEventEmitter), typeof(AMQPEventEmitter));services.AddSingleton(typeof(ICommandEventConverter), typeof(CommandEventConverter));services.AddSingleton(typeof(ITeamServiceClient), typeof(HttpTeamServiceClient));}public void Configure(IApplicationBuilder app,IHostingEnvironment env,ILoggerFactory loggerFactory,ITeamServiceClient teamServiceClient,IEventEmitter eventEmitter){// Asked for instances of singletons during Startup// to force initialization early.app.UseMvc();}}
}

对 Configure 的两次调用让配置子系统把分别从 amqp 和 teamservice 节加载的配置选项以依赖注入的方式提供出来

这些配置可以由 appsettings.json 文件提供,也可以用环境变量覆盖

{"amqp": {"username": "guest","password": "guest","hostname": "localhost","uri": "amqp://localhost:5672/","virtualhost": "/"},"teamservice": {"url": "http://localhost:5001"}
}

消费团队服务

using System;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using Newtonsoft.Json;
using StatlerWaldorfCorp.LocationReporter.Models;namespace StatlerWaldorfCorp.LocationReporter.Services
{public class HttpTeamServiceClient : ITeamServiceClient{private readonly ILogger logger;private HttpClient httpClient;public HttpTeamServiceClient(IOptions<TeamServiceOptions> serviceOptions,ILogger<HttpTeamServiceClient> logger){this.logger = logger;var url = serviceOptions.Value.Url;logger.LogInformation("Team Service HTTP client using URL {0}", url);httpClient = new HttpClient();httpClient.BaseAddress = new Uri(url);}public Guid GetTeamForMember(Guid memberId){httpClient.DefaultRequestHeaders.Accept.Clear();httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));HttpResponseMessage response = httpClient.GetAsync(String.Format("/members/{0}/team", memberId)).Result;TeamIDResponse teamIdResponse;if (response.IsSuccessStatusCode) {string json = response.Content.ReadAsStringAsync().Result;teamIdResponse = JsonConvert.DeserializeObject<TeamIDResponse>(json);return teamIdResponse.TeamID;}else {return Guid.Empty;}}}public class TeamIDResponse{public Guid TeamID { get; set; }}
}

这个例子中,我们使用 .Result 属性在等待异步方法响应期间强行阻塞了线程

在生产级质量的代码里,很可能对此进行重构,确保在服务边界之内整个调用链都传递异步结果

运行位置报送服务

RabbitMQ 已经启动运行,默认的配置也指向了本地的 RabbitMQ 实例

此时可以使用以下方式启动位置报送服务

(确保位于 src/StatlerWaldorfCorp.LocationReporter 子目录中)

$ dotnet restore
$ dotnet build
$ dotnet run --server.urls=http://0.0.0.0:9090

服务运行后,只要向服务提交请求,就可以体验其功能了

$ curl -X POST -d \
'{"reportID":"...", \
"origin":"...", "latitude":10, "longtitude":20, \
"memberID":"..."}' \
http://...le2 \
/locationreports

提交完成后,应该能从服务获得一个 HTTP 201 响应

事件处理器

它的职责是消费来自流的事件,并执行合适的操作

为确保代码整洁、可测试,我们把事件处理的职责划分为如下部分:

  • 订阅队列并从事件流中获取新的消息

  • 将消息写入事件存储

  • 处理事件流(检测附近的队友)

  • 作为流的处理结果,生成新的消息并发送到队列

  • 作为流的处理结果,向事实服务的服务器 / 缓存提交状态变更情况

GitHub 链接:https://github.com/microservices-aspnetcore/es-eventprocessor

检测附近队友的基于 GPS 工具类的检测器

using System.Collections.Generic;
using StatlerWaldorfCorp.EventProcessor.Location;
using System.Linq;
using System;namespace StatlerWaldorfCorp.EventProcessor.Events
{public class ProximityDetector{/** This method assumes that the memberLocations collection only* applies to members applicable for proximity detection. In other words,* non-team-mates must be filtered out before using this method.* distance threshold is in Kilometers.*/public ICollection<ProximityDetectedEvent> DetectProximityEvents(MemberLocationRecordedEvent memberLocationEvent,ICollection<MemberLocation> memberLocations,double distanceThreshold){GpsUtility gpsUtility = new GpsUtility();GpsCoordinate sourceCoordinate = new GpsCoordinate() {Latitude = memberLocationEvent.Latitude,Longitude = memberLocationEvent.Longitude};return memberLocations.Where(ml => ml.MemberID != memberLocationEvent.MemberID &&gpsUtility.DistanceBetweenPoints(sourceCoordinate, ml.Location) < distanceThreshold).Select( ml => {return new ProximityDetectedEvent() {SourceMemberID = memberLocationEvent.MemberID,TargetMemberID = ml.MemberID,TeamID = memberLocationEvent.TeamID,DetectionTime = DateTime.UtcNow.Ticks,SourceMemberLocation = sourceCoordinate,TargetMemberLocation = ml.Location,MemberDistance = gpsUtility.DistanceBetweenPoints(sourceCoordinate, ml.Location)};}).ToList();}}
}

接着我们就可以用这个方法的结果来产生对应的额外效果,例如可能需要发出一个 ProximityDetectorEvent 事件,并将事件写入事件存储

作为主体的事件处理器代码

using System;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using StatlerWaldorfCorp.EventProcessor.Location;
using StatlerWaldorfCorp.EventProcessor.Queues;namespace StatlerWaldorfCorp.EventProcessor.Events
{public class MemberLocationEventProcessor : IEventProcessor{private ILogger logger;private IEventSubscriber subscriber;private IEventEmitter eventEmitter;private ProximityDetector proximityDetector;private ILocationCache locationCache;public MemberLocationEventProcessor(ILogger<MemberLocationEventProcessor> logger,IEventSubscriber eventSubscriber,IEventEmitter eventEmitter,ILocationCache locationCache){this.logger = logger;this.subscriber = eventSubscriber;this.eventEmitter = eventEmitter;this.proximityDetector = new ProximityDetector();this.locationCache = locationCache;this.subscriber.MemberLocationRecordedEventReceived += (mlre) => {var memberLocations = locationCache.GetMemberLocations(mlre.TeamID);ICollection<ProximityDetectedEvent> proximityEvents =proximityDetector.DetectProximityEvents(mlre, memberLocations, 30.0f);foreach (var proximityEvent in proximityEvents) {eventEmitter.EmitProximityDetectedEvent(proximityEvent);}locationCache.Put(mlre.TeamID, new MemberLocation { MemberID = mlre.MemberID, Location = new GpsCoordinate {Latitude = mlre.Latitude, Longitude = mlre.Longitude} });};}public void Start(){this.subscriber.Subscribe();}public void Stop(){this.subscriber.Unsubscribe();}}
}

事件处理服务唯一的额外职责是需要将收到的每个事件都写入事件存储

这样做到原因有很多,包括向其他服务提供可供搜索的历史记录

如果缓存崩溃、数据丢失、事件存储也可用于重建事实缓存

请记住,缓存在架构里仅提供便利性,我们不应该在缓存中存储任何无法从其他位置重建的数据

我们要给服务里每一个团队创建一个 Redis 哈希(hash)

在哈希中,把团队成员的位置经序列化得到的 JSON 正文存储为字段(团队成员的 ID 用作键)

这样就能轻松地并发更新多个团队成员地位置而不会覆盖数据,同时也很容易查询给定的任意团队的位置列表,因为团队就是一个个哈希

事实服务

事实服务负责维护每个团队成员的位置,不过这些位置只代表最近从一些应用那里收到的位置

关于事实服务的这类服务,有两条重要的提醒需要记住:

  • 事实服务并不是事件存储

  • 事实服务是不可依赖服务

位置接近监控器

位置接近监控器的代码包括

  • 基本的微服务结构

  • 一个队列消费端,订阅 ProximityDetectedEvent 事件到达的消息

  • 调用一些第三方或云上的服务来发送推动通知

运行示例项目

下面列出运行本章示例的依赖项:

  • RabbitMQ 服务器

  • Redis 服务器

所有依赖项都启动运行后,可从 GitHub 拉取 es-locationreporter 和 es-eventprocessor 两个服务的代码

此外需要一份 teamservice 服务

请确保获取的是 master 分支,因为在测试期间只需要用到内存存储

要启动团队服务,在命令行中转到 src/StatlerWaldorfCorp.TeamService 目录并运行以下命令

$ dotnet run --server.urls=http://0.0.0.:5001

要启动位置报送服务,在命令行中转到 src/StatlerWaldorfCorp.LocationReporter 目录下并运行以下命令

$ dotnet run --server.urls=http://0.0.0:5002

启动事件处理器(从 src/StatlerWaldorfCorp.EventProcessor 目录运行)

$ dotnet run --server.urls=http://0.0.0.:5003

可用下列步骤端到端地检验整个事件溯源/CQRS系统:

  • (1)向 http://localhost:5001/teams 发送一个 POST 请求,创建一个新团队

  • (2)向 http://localhost:5001/teams/

    /members 发送一个 POST 请求,往团队中添加一个成员

  • (3)向 http://localhost:5002/api/members/

    /locationreports 发送一个 POST 请求,报送团队成员位置

  • (4)观察由报送的位置转换而成、被放到对应队列中的 MemberLocationReportedEvent 事件

  • (5)再重复几次第 3 步,添加一些相距较远的位置,确保不会触发并被检测到位置接近事件

  • (6)重复第 2 步,往第一名测试成员所在团队添加一名新成员

  • (7)为第二名成员再次重复第 3 步,添加一个于第一名成员最近的位置相距几公里以内的位置

  • (8)现在应该能够在 proximitydetected 队列中看到一条新消息

  • (9)可用直接查询 Redis 缓存,也可以利用事实服务来查看各团队成员最新的位置状态

手动操作几次后,大多数团队会花些时间把这一过程自动化

借助 docker compose 之类的工具,或者创建 Kubernetes 部署,或者其他容器编排环境,可自动将所有服务部署到集成测试环境

接着用脚本发送 REST 请求

待测试运行完成后,断言出现了正确的接近检测的次数,值也是正确的

《ASP.NET Core 微服务实战》-- 读书笔记(第6章)相关推荐

  1. 《ASP.NET Core 微服务实战》译者序

    最近,我将<ASP.NET Core 微服务实战>一书由英文翻译为中文.这本书是由清华大学出版社引进的,目前还处于最后的排版校对过程中,现将该书的译者序发表于此. 以下为译者译全文: &q ...

  2. 《ASP.NET Core 微服务实战》送书结果公告

    如何构建基于.NET Core和云环境下的微服务技术体系?的送书抽奖结果已经出来了: 当前只有一位同学填写了地址.其他几位同学抓紧填写,3/9 日还没有完成填写将作废,奖品可是热门的<ASP.N ...

  3. 《ASP.NET Core 微服务实战》-- 读书笔记(第10章)

    第 10 章 应用和微服务安全 云应用意味着应用运行所在的基础设施无法掌控,因此安全不能再等到事后再考虑,也不能只是检查清单上毫无意义的复选框 由于安全与云原生应用密切相关,本章将讨论安全话题,并用示 ...

  4. 《ASP.NET Core 微服务实战》-- 读书笔记(第7章)

    第 7 章 开发 ASP.NET Core Web 应用 ASP.NET Core 基础 在本章,我们将从一个命令行应用开始,并且在不借助任何模板,脚手架和向导的情况下,最终得到一个功能完整的 Web ...

  5. 《ASP.NET Core 微服务实战》-- 读书笔记(第3章)

    第 3 章 使用 ASP.NET Core 开发微服务 微服务定义 微服务是一个支持特定业务场景的独立部署单元.它借助语义化版本管理.定义良好的 API 与其他后端服务交互.它的天然特点就是严格遵守单 ...

  6. 《ASP.NET Core 微服务实战》-- 读书笔记(第1章 、第2章)

    译者序 微服务设计方法清晰定义了各个开发团队的业务边界,微服务框架以不同方式实现了服务之间的协作与集成. .NET Core 作为全新的 .NET 技术,它不仅完全开源.跨平台,更面向云原生开发进行了 ...

  7. 《ASP.NET Core 微服务实战》-- 读书笔记(第9章)

    第 9 章 微服务系统的配置 微服务系统中的配置需要关注更多其他方面的因素,包括: 配置值的安全读写 值变更的审计能力 配置信息源本身的韧性和可靠性 少量的环境变量难以承载大型.复杂的配置信息 应用要 ...

  8. 《ASP.NET Core 微服务实战》-- 读书笔记(第12章)

    第 12 章 设计汇总 微服务开发并不是要学习 C#.Java 或者 Go 编程--而是要学习如何开发应用以适应并充分利用弹性伸缩环境的优势,它们对托管环境没有偏好,并能瞬间启停 换句话说,我们要学习 ...

  9. 《ASP.NET Core 微服务实战》-- 读书笔记(第11章)

    第 11 章 开发实时应用和服务 在本章,我们将讨论"实时"的准确含义,以及在大部分消费者看来应该属于这一范畴的应用类型 接着,我们将探讨 WebSocket,并分析为什么传统的 ...

最新文章

  1. 【Sql Server】DateBase-视频总结
  2. Javascript——说说js的调试
  3. 【GDAL】GDAL栅格数据结构学习笔记(一): 关于Metadata
  4. C++实现简单选择排序
  5. 二嗨租车系统java_java第二季租车系统作业
  6. python中素数怎么求_用python怎么求素数
  7. python机器人编程教程入门_机器人编程怎么入门?
  8. 【高性能计算背景】《并行计算教程简介》翻译 - 中文 - 1 / 4
  9. Ubuntu软件中心的完全启用
  10. python绘制图像频谱_python傅里叶变换FFT绘制频谱图
  11. 【多媒体】媒体的概念和分类
  12. 【人工智能Prolog】Prolog解决数独问题
  13. 推荐这几个不错的提取伴奏在线软件给大家
  14. YOLOv7来临:论文解读附代码解析
  15. chapter8 Pull Complexity Downwards
  16. HTML+JS 前端雪花飘落
  17. 心理学和人工智能 第一部分 心理学(一)—— 心理学的研究范围
  18. python批量下载微博贴评论图片
  19. fastjson byte[]转json字符串
  20. IP周边创作交流#创作者的个人影响力

热门文章

  1. 高并发高可靠性系统思考1
  2. 超棒的在线Bootstrap主题编辑工具 - lollytin
  3. ASP.NET GridView控件匯出EXCEL-移除控件,只是顯示文本
  4. 世界顶级精英们的人生哲学 【转】
  5. 从别的地方转来的 网址
  6. 如何在Chrome工具栏中固定和取消固定扩展程序
  7. 如何破解您忘记的Windows密码
  8. pkpm板按弹性计算还是塑性_双向板按弹性方法还是按塑性方法计算
  9. Java并发(二十一):线程池实现原理
  10. IE8采用IE7模式