基于事件驱动架构构建微服务第12部分:向Apache KAFKA生成事件
原文链接:https://logcorner.com/building-microservices-through-event-driven-architecture-part12-produce-events-to-apache-kafka/
在本教程中,我将展示如何将事件发布到apache KAFKA。
当客户端发生命令时,它将产生一个事件(例如:PlaceOrderCommand => OrderCreatedEvent)。新事件由聚合根注册为未提交的事件,并插入到仅附加表(事件存储)中。
现在我必须将这些事件生成到服务总线,以便订阅服务总线的应用程序可以选择事件以处理它们。
在接下来的步骤中,我将有一个使用者来选择事件,并将它们索引到一个高性能的no-sql数据库,该数据库将被我的应用程序的查询端用作后端数据库。
Apache KAFKA简介
Apache Kafka是一个社区分布式事件流平台,能够每天处理数万亿个事件。最初设想为消息队列,Kafka基于分布式提交日志的抽象。自2011年由LinkedIn创建并开源以来,Kafka已迅速从消息队列演变为成熟的事件流平台。
安装
安装Java SE开发工具包
打开以下网址,下载并安装java:https://www.oracle.com/fr/java/technologies/javase/javase-jdk8-downloads.html
安装Apache Kafka
到以下网址下载然后安装Kafka:https://kafka.apache.org/downloads
选择最新的稳定版本,在我的例子中我选择了scala 2.13 kafka_2.13-2.6.0版本
在打开的页面中,我选择建议的镜像来下载二进制文件。
下载.tgz存档文件并将其解压缩到安装文件夹(我的工作站上的C:\KAFKADEMO文件夹)。
你应该在Windows上有以下内容
要验证安装是否正常,请转到C:\KAFKADEMO\kafka_2.13-2.6.0\bin\windows位置并运行以下命令:
kafka-topics.bat
添加环境变量
这一步是可选的,你可以编辑你的环境变量并将你的kafka安装文件夹添加到路径中
添加一个文件夹working_dir和2个子文件夹zookeeper-data和kafka-data,如下图所示
启动zookeeper
要配置zookeeper,请编辑zookeeper.properties文件并按如下方式更新dataDir目录。
编辑C:\KAFKADEMO\kafka_2.13-2.6.0\config\zookeeper.properties
dataDir=C:/KAFKADEMO/kafka_2.13-2.6.0/working_dir/zookeeper-data
运行以下命令启动zookeeper:
zookeeper-server-start.bat config\zookeeper.properties
启动Kafka
要配置Kafka,请编辑server.properties文件并更新log.dirs目录,如下所示。
编辑C:\KAFKADEMO\kafka_2.13-2.6.0\config\server.properties
log.dirs=C:/KAFKADEMO/kafka_2.13-2.6.0/working_dir/kafka-data
运行以下命令启动kafka:
kafka-server-start.bat config\server.properties
创建主题
运行以下命令以创建主题:
kafka-topics –zookeeper 127.0.0.1:2181 –topic eventstream –create –partitions 3 –replication-factor 1
运行以下命令以列出主题:
kafka-topics –zookeeper 127.0.0.1:2181 –list
运行以下命令来描述主题:
kafka-topics –zookeeper 127.0.0.1:2181 –topic eventstream –describe
运行以下命令删除主题:
kafka-topics –zookeeper 127.0.0.1:2181 –topic eventstream –delete
生产者
要创建向apache kafka主题(事件流)生成事件的生产者,请运行以下命令:
kafka-console-producer –broker-list 127.0.0.1:9092 –topic eventstream
消费者
要开始使用在主题(事件流)上生成的事件,请运行以下命令:
kafka-console-consumer –bootstrap-server 127.0.0.1:9092 –topic eventstream
要从第一个事件开始使用在主题(事件流)上生成的所有事件,请运行以下命令:
kafka-console-consumer –bootstrap-server 127.0.0.1:9092 –topic eventstream –from-beginning
Asp.Net Core SignalR介绍
ASP.NET Core SignalR是一个开源库,可简化向应用程序添加实时Web功能的过程。实时Web功能使服务器端代码能够立即将内容推送到客户端。
SignalR的使用场景:
需要从服务器进行高频更新的应用程序。例如游戏、社交网络、投票、拍卖、地图和GPS应用程序。
仪表板和监控应用程序。示例包括公司仪表板、即时销售更新或旅行提醒。
协作应用程序。白板应用程序和团队会议软件是协作应用程序的示例。
需要通知的应用程序。社交网络、电子邮件、聊天、游戏、旅行提醒和许多其他应用程序都使用通知。
创建一个SignalR HUB:
为了创建一个 SignalR Hub,我定义了以下接口以便拥有一个强类型Hub
Task OnPublish(T payload); 在消息发布到中心时获得通知
Task OnPublish(string topic, T payload); 在消息发布到特定主题时获得通知
Task OnSubscribe(string connectionId, string topic); 在客户加入特定主题时收到通知
Task OnUnSubscribe(string connectionId, string topic); 在客户离开特定主题时收到通知
以下接口用于订阅和发布事件
Hub定义如下
ISignalRNotifier是发布和接收消息的接口
将事件发布到SignalR Hub
当命令发生时,它作为事件存储到事件存储中,然后生产者可以从事件存储中选择事件并将其发布到服务总线。我不希望它像那样工作,因为我想知道哪些事件尚未发布(isPublihed = true/false)并相应地更新它。
因此,为了获得更大的灵活性,我将介绍一个SignalR Hub。所以我将实现的场景是:
当命令发生时,它会作为事件存储到事件存储区,然后发布到SignalR Hub主题。因此,对该主题感兴趣的客户将收到通知,然后可以处理该事件。客户端可以是服务总线、移动应用程序、单页应用程序等……
让我们继续从系统的命令端将事件发布到SignalR Hub。
所以我必须更新LogCorner.EduSync.Speech.Application.UseCases.EventSourcingHandler.cs文件的Handle函数并添加以下内容:
_publisher.PublishAsync(Topics.Speech, eventStore);
创建工作服务
让我们创建一个工作服务并添加以下类
ProducerHostedService
ProducerHostedService是承载ProducerService的后台服务
backgroundService是用于实现长时间运行的IHostedService的基类
ProducerService
ProducerService订阅一个signalR主题并处理在该主题上发布的事件。
它使用IServiceBus将接收到的事件发送到服务总线主题
ServiceBus
ServiceBus使用IServiceBusProvider接口向服务总线提供者发送消息。这样我就可以在不改变实现的情况下切换到另一个服务总线提供者(例如:RabbitMq 等)。
KafkaClient
KafkaClient使用Confluent.Kafka向kafka发送消息
测试
启动zookeeper
zookeeper-server-start.bat config\zookeeper.properties
启动Kafka
kafka-server-start.bat config\server.properties
启动消费者
kafka-topics –zookeeper 127.0.0.1:2181 –topic eventstream –create –partitions 3 –replication-factor 1
kafka-console-consumer –bootstrap-server 127.0.0.1:9092 –topic eventstream
启动以下工程:
LogCorner.EduSync.SignalR.Server
LogCorner.EduSync.Speech.Producer
启动以下工程:
LogCorner.EduSync.Speech.Presentation
启动postman并且post一个新的command
您应该在消费者控制台上看到以下输出,使用postman上发布的命令
代码源可在此处获得:
https://github.com/logcorner/LogCorner.EduSync.Speech.Command/tree/Feature/Task/AddSignalR https://github.com/logcorner/LogCorner.EduSync.Speech.ServiceBus/tree/ProduceMessagesTokafka
基于事件驱动架构构建微服务第12部分:向Apache KAFKA生成事件相关推荐
- 基于事件驱动架构构建微服务第1部分:应用程序特定的业务规则
原文链接:https://logcorner.com/building-microservices-through-event-driven-architecture-part1-applicatio ...
- 基于事件驱动架构构建微服务第13部分:使用来自Apache KAFKA的事件并将投影流传输到ElasticSearch...
原文链接:https://logcorner.com/building-microservices-through-event-driven-architecture-part13-read-mode ...
- 基于事件驱动架构构建微服务第16部分:Azure Active Directory B2C
原文链接: https://logcorner.com/building-micro-services-through-event-driven-architecture-part16-azure-a ...
- 基于事件驱动架构构建微服务第14部分:查询API
原文链接:https://logcorner.com/building-micro-services-through-event-driven-architecture-part14-query-ap ...
- 基于事件驱动架构构建微服务第11部分:持续集成
原文链接:https://logcorner.com/building-microservices-through-event-driven-architecture-part12-continuou ...
- 基于事件驱动架构构建微服务第10部分:在docker容器内运行单元测试
原文链接:https://logcorner.com/building-microservices-through-event-driven-architecture-part11-run-unit- ...
- 基于事件驱动架构构建微服务第19部分:使用 SignalR 和 Azure Active Directory 构建和保护实时通信...
原文链接:https://logcorner.com/building-micro-services-through-event-driven-architecture-part19-building ...
- 基于事件驱动架构构建微服务第5部分:容器化(Web Api Core 和 SQL Server Linux)
原文链接:https://logcorner.com/building-microservices-through-event-driven-architecture-part5-dockerizat ...
- 基于事件驱动架构构建微服务第15部分:SPA前端
原文链接 https://logcorner.com/building-micro-services-through-event-driven-architecture-part15-spa-fron ...
最新文章
- keras 的 example 文件 mnist_siamese.py 解析
- JAVA中几种常用JSON库性能比较
- vc中GetDlgItem用法
- https搭建(openssl)
- WPF的ListBox中的RadioButton不能单选问题
- 通过错误的sql来测试推理sql的解析过程
- 三维重建13:点云的局部特征总结
- 福禄克Fluke Pro3000 音频发生器和探头
- 经典面试题:断网排查思路
- win11安装报错0xc1900101怎么办 Windows11安装报错0xc1900101的解决方法
- 不已0开头的数字正则
- QQ空间说说一键批量删除软件2.0使用教程
- LTE-V2X车联网技术、标准、应用
- 3D MAX 插件的基本知识和安装方法
- 绕过preg_match
- 【QT学习六】QTextEdit
- 随机变量的相关性与独立性
- go语言中flag库使用
- android抖音自动刷新,Android 使用SwipeRefreshLayout控件仿抖音做的视频下拉刷新效果...
- 能ping通百度但是浏览器不能访问网页??强哥
热门文章
- Linux 命令(三)--用户管理
- 【转】你没有变强是因为你一直很舒服
- ip_vs实现分析(2)
- 钻石2 D2 让你的diamond2待机2天,甚至2天以上的方法(绝对不是购买电池)
- 计算机python程序设计导论,程序设计导论:Python计算与应用开发实践(原书第2版)...
- linux多线程求和_linux 多线程信号处理总结
- 辉光UIView的category
- DataAdapter.FillSchema 方法
- 在程序员的道路上,义无反顾的努力,有思想的人,很多,好的想法,需要学习。(以此共勉)...
- facebook 邀请好友_如何查看紧急情况下您的Facebook朋友是否安全