最近在看微软eShopOnContainers 项目,看到事件总线觉得不错,和大家分享一下

看完此文你将获得什么?

  1. eShop中是如何设计事件总线的
  2. 实现一个InMemory事件总线eShop中是没有InMemory实现的,这算是一个小小小的挑战

发布订阅模式

发布订阅模式可以让应用程序组件之间解耦,这是我们使用这种模式最重要的理由之一,如果你完全不知道这个东西,建议你先通过搜索引擎了解一下这种模式,网上的资料很多这里就不再赘述了。

eShop中的EventBus就是基于这种模式的发布/订阅
发布订阅模式核心概念有三个:发布者、订阅者、调度中心,这些概念在消息队列中就是生产者、消费者、MQ实例

在eShop中有两个EventBus的实现:

  • 基于RabbitMq的EventBusRabbitMQ
  • 基于AzureServiceBus的EventBusServiceBus

IEventBus开始

先来看一看,所有EventBus的接口IEventBus

public interface IEventBus
{void Publish(IntegrationEvent @event);void Subscribe<T, TH>()where T : IntegrationEventwhere TH : IIntegrationEventHandler<T>;void SubscribeDynamic<TH>(string eventName)where TH : IDynamicIntegrationEventHandler;void UnsubscribeDynamic<TH>(string eventName)where TH : IDynamicIntegrationEventHandler;void Unsubscribe<T, TH>()where TH : IIntegrationEventHandler<T>where T : IntegrationEvent;
}

嗯,乍一看看是有点眼晕的,仔细看它的核心功能只有三个:

  1. Publish 发布
  2. Subscribe 订阅
  3. Unsubscribe 取消订阅

这对应着发布订阅模式的基本概念,不过对于事件总线的接口添加了许多约束:

  1. 发布的内容(消息)必须是IntegrationEvent及其子类
  2. 订阅事件必须指明要订阅事件的类型,并附带处理器类型
  3. 处理器必须是IIntegrationEventHandler的实现类

Ok,看到这里先不要管Dynamic相关的方法,然后记住这个两个关键点:

  1. 事件必须继承IntegrationEvent
  2. 处理器必须实现IIntegrationEventHandler<T>TIntegrationEvent子类

另外,看下 IntegrationEvent有什么

public class IntegrationEvent
{public IntegrationEvent(){Id = Guid.NewGuid();CreationDate = DateTime.UtcNow;}public Guid Id  { get; }public DateTime CreationDate { get; }
}

IEventBusSubscriptionsManager是什么

public interface IEventBusSubscriptionsManager
{bool IsEmpty { get; }event EventHandler<string> OnEventRemoved;void AddDynamicSubscription<TH>(string eventName)where TH : IDynamicIntegrationEventHandler;void AddSubscription<T, TH>()where T : IntegrationEventwhere TH : IIntegrationEventHandler<T>;void RemoveSubscription<T, TH>()where TH : IIntegrationEventHandler<T>where T : IntegrationEvent;void RemoveDynamicSubscription<TH>(string eventName)where TH : IDynamicIntegrationEventHandler;bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent;bool HasSubscriptionsForEvent(string eventName);Type GetEventTypeByName(string eventName);void Clear();IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent;IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName);string GetEventKey<T>();
}

这个接口看起来稍显复杂些,我们来简化下看看:

public interface IEventBusSubscriptionsManager
{void AddSubscription<T, TH>()void RemoveSubscription<T, TH>()IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>()
}

最终,这三个方法就是我们要关注的,添加订阅、移除订阅、获取指定事件的订阅信息。

SubscriptionInfo是什么?

public bool IsDynamic { get; }
public Type HandlerType{ get; }

SubscriptionInfo中只有两个信息,这是不是一个Dynamic类型的Event以及这个Event所对应的处理器的类型。

这是你可能会有另一个疑问:

这个和IEventBus有什么关系?

  1. IEventBusSubscriptionsManager含有更多功能:查看是否有订阅,获取事件的Type,获取事件的处理器等等
  2. IEventBusSubscriptionsManagerIEventBus使用,在RabbitMq和ServiceBus的实现中,都使用Manager去存储事件的信息,例如下面的代码:

     public void Subscribe<T, TH>()where T : IntegrationEventwhere TH : IIntegrationEventHandler<T>{// 查询事件的全名var eventName = _subsManager.GetEventKey<T>();//向mq添加注册DoInternalSubscription(eventName);// 向manager添加订阅_subsManager.AddSubscription<T, TH>();}private void DoInternalSubscription(string eventName){var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);if (!containsKey){if (!_persistentConnection.IsConnected){_persistentConnection.TryConnect();}using (var channel = _persistentConnection.CreateModel()){channel.QueueBind(queue: _queueName,exchange: BROKER_NAME,routingKey: eventName);}}}

    查询事件的名字是manager做的,订阅的时候是先向mq添加订阅,之后又加到manager中,manager管理着订阅的基本信息。

另外一个重要功能是获取事件的处理器信息,在rabbit mq的实现中,ProcessEvent方法中用manager获取了事件的处理器,再用依赖注入获得处理器的实例,反射调用Handle方法处理事件信息:

    private async Task ProcessEvent(string eventName, string message){// 从manager查询信息if (_subsManager.HasSubscriptionsForEvent(eventName)){using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)){// 从manager获取处理器var subscriptions = _subsManager.GetHandlersForEvent(eventName);foreach (var subscription in subscriptions){// Di + 反射调用,处理事件(两个都是,只是针对是否是dynamic做了不同的处理)if (subscription.IsDynamic){ var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;dynamic eventData = JObject.Parse(message);await handler.Handle(eventData);}else{var eventType = _subsManager.GetEventTypeByName(eventName);var integrationEvent = JsonConvert.DeserializeObject(message, eventType);var handler = scope.ResolveOptional(subscription.HandlerType);var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });}}}}}

IEventBusSubscriptionsManager的默认实现

在eShop中只有一个实现就是InMemoryEventBusSubscriptionsManager

这个类中有两个重要的字段

    private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;private readonly List<Type> _eventTypes;

他们分别存储了事件列表和事件处理器信息词典

接下来就是实现一个

基于内存的事件总线

我们要做什么呢?IEventBusSubscriptionsManager 已经有了InMemory的实现了,我们可以直接拿来用,所以我们只需要自己实现一个EventBus就好了

先贴出最终代码:

public class InMemoryEventBus : IEventBus
{private readonly IServiceProvider _provider;private readonly ILogger<InMemoryEventBus> _logger;private readonly ISubscriptionsManager _manager;private readonly IList<IntegrationEvent> _events;public InMemoryEventBus(IServiceProvider provider,ILogger<InMemoryEventBus> logger, ISubscriptionsManager manager){_provider = provider;_logger = logger;_manager = manager;}public void Publish(IntegrationEvent e){var eventType = e.GetType();var handlers = _manager.GetHandlersForEvent(eventType.FullName);foreach (var handlerInfo in handlers){var handler = _provider.GetService(handlerInfo.HandlerType);var method = handlerInfo.HandlerType.GetMethod("Handle");method.Invoke(handler, new object[] { e });}}public void Subscribe<T, TH>()where T : IntegrationEventwhere TH : IIntegrationEventHandler<T>{_manager.AddSubscription<T, TH>();}public void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler{throw new NotImplementedException();}public void Unsubscribe<T, TH>()where T : IntegrationEventwhere TH : IIntegrationEventHandler<T>{_manager.RemoveSubscription<T, TH>();}public void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler{throw new NotImplementedException();}
}

首先构造函数中声明我们要使用的东西:

public InMemoryEventBus(IServiceProvider provider,ILogger<InMemoryEventBus> logger, ISubscriptionsManager manager)
{_provider = provider;_logger = logger;_manager = manager;
}

这里要注意的就是IServiceProvider provider这是 DI容器,当我们在切实处理事件的时候我们选择从DI获取处理器的实例,而不是反射创建,这要做的好处在于,处理器可以依赖于其它东西,并且可以是单例的

public void Subscribe<T, TH>()where T : IntegrationEventwhere TH : IIntegrationEventHandler<T>
{_manager.AddSubscription<T, TH>();}public void Unsubscribe<T, TH>()where T : IntegrationEventwhere TH : IIntegrationEventHandler<T>
{_manager.RemoveSubscription<T, TH>();
}

订阅和取消订阅很简单,因为我们是InMemory的所以只调用了manager的方法。

接下来就是最重要的Publish方法,实现Publish有两种方式:

  1. 使用额外的线程和Queue让发布和处理异步
  2. 为了简单起见,我们先写个简单易懂的同步的

     public void Publish(IntegrationEvent e){// 首先要拿到集成事件的Type信息var eventType = e.GetType();// 获取属于这个事件的处理器列表,可能有很多,注意获得的是SubscriptionInfovar handlers = _manager.GetHandlersForEvent(eventType.FullName);// 不解释循环foreach (var handlerInfo in handlers){// 从DI中获取类型的实例var handler = _provider.GetService(handlerInfo.HandlerType);// 拿到Handle方法var method = handlerInfo.HandlerType.GetMethod("Handle");// 调用方法method.Invoke(handler, new object[] { e });}}

OK,我们的InMemoryEventBus就写好了!

要实践这个InMemoryEventBus,那么还需要一个IntegrationEvent的子类,和一个IIntegrationEventHandler<T>的实现类,这些都不难,例如我们做一个添加用户的事件,A在添加用户后,发起一个事件并将新用户的名字作为事件数据,B去订阅事件,并在自己的处理器中处理名字信息。

思路是这样的:

  1. 写一个 AddUserEvent:IntegrationEvent,里面有一个UserId和一个UserName
  2. 写一个AddUserEventHandler:IIntegrationEventHandler<AddUserEvent>,在Handle方法中输出UserId和Name到日志。
  3. 注册DI,你要注册下面这些服务:

     IEventBus=>InMemoryEventBusISubscriptionsManager=>InMemorySubscriptionsManagerAddUserEventHandler=>AddUserEventHandler
  4. 在Startup中为刚刚写的事件和处理器添加订阅(在这里已经可以获取到IEventBus实例了)
  5. 写一个Api接口或是什么,调用IEventBus的Publish方法,new 一个新的AddUserEvent作为参数传进去。

OK!到这里一个切实可用的InMemoryEventBus就可以使用了。

转载于:https://www.cnblogs.com/rocketRobin/p/8510198.html

看eShopOnContainers学一个EventBus相关推荐

  1. 学python是看书还是看视频-学 Python 你觉得是看书还是看视频?

    大家好,这是首发在我公众号「Python空间」的第 87 篇文章,想看更多的文章或者加我欢迎关注,我们一起交流. 今天有个新关注的读者在后台问了这么一个问题:"我准备开始学 Python,是 ...

  2. python入门学习[看漫画学Python:有趣、有料、好玩、好用读书笔记]

    写在前面:本文中绝大多数图片来源于图书:看漫画学Python:有趣.有料.好玩.好用,本文仅供个人学习使用,如有侵权,请联系删除. 学习编程语言最好的方式就是多写,多写,多写!!!哪有什么快速掌握,能 ...

  3. 跟vczh看实例学编译原理——三:Tinymoe与无歧义语法分析

    文章中引用的代码均来自https://github.com/vczh/tinymoe. 看了前面的三篇文章,大家应该基本对Tinymoe的代码有一个初步的感觉了.在正确分析"print su ...

  4. 跟vczh看实例学编译原理——一:Tinymoe的设计哲学

    自从<序>胡扯了快一个月之后,终于迎来了正片.之所以系列文章叫<看实例学编译原理>,是因为整个系列会通过带大家一步一步实现Tinymoe的过程,来介绍编译原理的一些知识点. 但 ...

  5. 跟vczh看实例学编译原理——零:序言

    在<如何设计一门语言>里面,我讲了一些语言方面的东西,还有痛快的喷了一些XX粉什么的.不过单纯讲这个也是很无聊的,所以我开了这个<跟vczh看实例学编译原理>系列,意在科普一些 ...

  6. 且看且学Gradle--(1)Gradle入门

    介绍之前,先说点背景,其实之前不懂Gradle,听过maven,一直用ant写了好多年,最近上spring网站浏览一下想下个最新的版本.结果摸索,只下了个源文件版的4.0.5,打开里面的,没看懂是什么 ...

  7. 看动画学算法之:二叉搜索树BST

    文章目录 简介 BST的基本性质 BST的构建 BST的搜索 BST的插入 BST的删除 看动画学算法之:二叉搜索树BST 简介 树是类似于链表的数据结构,和链表的线性结构不同的是,树是具有层次结构的 ...

  8. 个盘子的汉诺塔需要移动几步_看漫画学C++039:递归解汉诺塔

    点击蓝字 关注我们 本话内容 请输入 相传在古印度圣庙中,有一种被称为汉诺塔(Hanoi)的游戏.该游戏是在一块铜板装置上,有三根杆(编号A.B.C),在A杆自下而上.由大到小按顺序放置64个金盘(如 ...

  9. 看代码学知识之(2) ListView无数据时显示其他View

    看代码学知识之(2) ListView无数据时显示其他View 今天看的一块布局是这样的: <!--The frame layout is here since we will be showi ...

最新文章

  1. ./configure --with-package=dir指定依赖的软件包
  2. 【SQL】表A多个字段,关联表B一个字段说明
  3. Using the Cordova Camera API
  4. Java 解析 XML
  5. 5h Oralcle进阶直播课,限时免费报名,手慢无!
  6. Generator 实现
  7. 2012-09-16-html
  8. a标签点击不跳转的几种方法
  9. Java程序设计课程设计_《JAVA程序设计》课程设计
  10. 自己动手从零搭建神经网络
  11. Poco Timer
  12. emlog模板酷黑自适应CoolBlack主题 黑的有质感
  13. .net ImageProcessor组件转换图片格式
  14. eclipse怎么配置oracle数据库,Eclipse连接Oracle数据库介绍
  15. GPS授时器(GPS卫星授时器)常用的同步方式
  16. 电子商务格局下的营销未来
  17. 2022 CCF中国软件大会(CCF ChinaSoft)“人工智能安全专刊”论坛成功召开
  18. 要想高效率完成软件测试工作,请牢记以下几点
  19. 分布式理论面试题 一
  20. 你所不知道的良心网站第一弹

热门文章

  1. centos6.5搭建lnmp环境
  2. Java RTTI与反射(参照Java编程思想与新浪博客)
  3. 分享一个java对xml,excel,jdbc.properties,读写文件,读写图片等实现(1)
  4. 安装Open Live Writer,添加SyntaxHighlighter实现代码高亮
  5. [BJOI2015]树的同构
  6. hibernate annotation多对多中间表添加其他字段的第三种方法
  7. RESTful Android
  8. 深入思考全局静态存储区、堆区和栈区
  9. 堆(heap)和栈(stack)有什么区别??
  10. ZedGraph:一个同时支持WinForm和WebForm的开源图表控件(基于LGPL协议,.NET 2.0 C#源代码)...