EventBus/EventQueue 再思考

Intro

之前写过两篇文章,造轮子系列的 EventBus/ EventQueue,回想起来觉得当前的想法有点问题,当时对 EvenStore 可能有点误解,有兴趣可以参考

动手造轮子:实现一个简单的 EventBus

动手造轮子:实现简单的 EventQueue,

最近把 Event 相关的逻辑做了一个重构,修改 EventStore,重新设计了 Event 相关的组件

重构后的 Event

  • Event: 事件的抽象定义

  • EventHandler:事件处理器抽象定义

  • EventHandlerFactory:事件处理器工厂,用来根据事件类型获取事件处理器(新增)

  • EventPublisher:事件发布器,用于事件发布

  • EventSubscriber:事件订阅器,用于管理事件的订阅

  • EventSubscriptionManager:事件订阅管理器,在 EventSubscriber 的基础上增加了一个根据事件类型获取事件订阅器类型的方法

  • EventBus:事件总线,由 EventPubliser 和 EventSubscriber 组合而成,用来比较方便的做事件发布和订阅

  • EventQueue:事件队列,希望某些消息顺序处理的时候可以考虑用 EventQueue 的模式

  • EventStore:事件存储,事件的持久化存储(在之前的版本里,EventStore 实际作用是一个 EventSubscriptionManager,在最近的版本更新中已修改)

以上 EventSubscriberEventSubscriptionManager 一般不直接用,一般用 EventBus 来处理即可

EventHandlerFactory

这次引入了 EventHandlerFactory 用来抽象获取 EventHandler 的逻辑,原来的设计里是在处理 Event 的时候获取 EventHandler 的类型,然后从依赖注入框架中获取或创建新的 event handler 实例之后再调用 EventHandler 的 Handle 方法处理事件,有一些冗余

使用 EventHandlerFactory 之后就可以直接获取一个 EventHandler 实例集合,具体是实例化还是从依赖注入中获取就由 EventHandlerFactory 来决定了,这样就可以对依赖注入很友好,对于基于内存的简单 EventBus 来说,在服务注册之后就不需要再调用 Subscribe 去显式订阅了,因为再注册服务的时候就已经隐式实现了订阅的逻辑,这样实际就不需要 EventSubscriptionManager 来管理订阅了,订阅信息都在依赖注入框架内部,比如说 CounterEvent,要获取它的订阅信息,我只需要从依赖注入框架中获取 IEventHandler<CounterEvent> 的实例即可,实际就代替了原先 “EventStoreInMemory”,现在的 EventSubscriptionManagerInMemory

基于依赖注入的 EventHandlerFactory 定义:

public sealed class DependencyInjectionEventHandlerFactory : IEventHandlerFactory
{private readonly IServiceProvider _serviceProvider;public DependencyInjectionEventHandlerFactory(IServiceProvider serviceProvider = null){_serviceProvider = serviceProvider ?? DependencyResolver.Current;}public ICollection<IEventHandler> GetHandlers(Type eventType){var eventHandlerType = typeof(IEventHandler<>).MakeGenericType(eventType);return _serviceProvider.GetServices(eventHandlerType).Cast<IEventHandler>().ToArray();}
}

如果不使用依赖注入,也可以根据 IEventSubscriptionManager 订阅信息来实现:

public sealed class DefaultEventHandlerFactory : IEventHandlerFactory
{private readonly IEventSubscriptionManager _subscriptionManager;private readonly ConcurrentDictionary<Type, ICollection<IEventHandler>> _eventHandlers = new ConcurrentDictionary<Type, ICollection<IEventHandler>>();private readonly IServiceProvider _serviceProvider;public DefaultEventHandlerFactory(IEventSubscriptionManager subscriptionManager, IServiceProvider serviceProvider = null){_subscriptionManager = subscriptionManager;_serviceProvider = serviceProvider ?? DependencyResolver.Current;}public ICollection<IEventHandler> GetHandlers(Type eventType){var eventHandlers = _eventHandlers.GetOrAdd(eventType, type =>{var handlerTypes = _subscriptionManager.GetEventHandlerTypes(type);var handlers = handlerTypes.Select(t => (IEventHandler)_serviceProvider.GetServiceOrCreateInstance(t)).ToArray();return handlers;});return eventHandlers;}
}

EventQueue Demo

来看一下 EventQueue 的示例,示例基于 asp.net core 的,定义了一个 HostedService 来实现一个 EventConsumer 来消费 EventQueue 中的事件信息

EventConsumer 定义如下:

public class EventConsumer : BackgroundService
{private readonly IEventQueue _eventQueue;private readonly IEventHandlerFactory _eventHandlerFactory;public EventConsumer(IEventQueue eventQueue, IEventHandlerFactory eventHandlerFactory){_eventQueue = eventQueue;_eventHandlerFactory = eventHandlerFactory;}protected override async Task ExecuteAsync(CancellationToken stoppingToken){while (!stoppingToken.IsCancellationRequested){var queues = await _eventQueue.GetQueuesAsync();if (queues.Count > 0){await queues.Select(async q =>{var @event = await _eventQueue.DequeueAsync(q);if (null != @event){var handlers = _eventHandlerFactory.GetHandlers(@event.GetType());if (handlers.Count > 0){await handlers.Select(h => h.Handle(@event)).WhenAll();}}}).WhenAll();}await Task.Delay(1000, stoppingToken);}}
}

定义 PageViewEventPageViewEventHandler,用来记录和处理请求访问记录

public class PageViewEvent : EventBase
{
}
public class PageViewEventHandler : EventHandlerBase<PageViewEvent>
{public static int Count;public override Task Handle(PageViewEvent @event){Interlocked.Increment(ref Count);return Task.CompletedTask;}
}

事件很简单,事件处理也只是增加了 PageViewEventHandler 内定义的 Count。

服务注册:

// 注册事件核心组件
// 会注册 EventBus、EventHandlerFactory、EventQueue 等
services.AddEvents()// 注册 EventHanlder.AddEventHandler<PageViewEvent, PageViewEventHandler>();
// 注册 EventQueuePubliser,默认注册的 IEventPublisher 是 EventBus
services.AddSingleton<IEventPublisher, EventQueuePublisher>();
// 注册 EventConsumer
services.AddHostedService<EventConsumer>();

事件发布,定义了一个中间件来发布 PageViewEvent,定义如下:

// pageView middleware
app.Use((context, next) =>{var eventPublisher = context.RequestServices.GetRequiredService<IEventPublisher>();eventPublisher.Publish(new PageViewEvent());return next();});

然后定义一个接口来获取上面定义的 PageViewEventHandler 中的 Count

[Route("api/[controller]")]
public class EventsController : ControllerBase
{[HttpGet("pageViewCount")]public IActionResult Count(){return Ok(new { Count = PageViewEventHandler.Count });}
}

运行起来之后,访问几次接口,看上面的接口返回 Count 是否会增加,正常的话每访问一次接口就会增加 1,并发访问问题也不大,因为每个事件都是顺序处理的,即使并发访问也没有关系,事件发布之后,在队列里都是顺序处理的,这也就是引入事件队列的目的(好像上面的原子递增没什么用了...) 如果没看到了增加,稍等一会儿再访问试试,事件处理会迟到,但总会处理,毕竟是异步处理的,有些延迟很正常,而且上面我们还有一个 1s 的延迟

More

作者水平有限,如果上述有哪些不对的地方还望指出,万分感谢

Reference

  • https://github.com/WeihanLi/WeihanLi.Common/tree/dev/src/WeihanLi.Common/Event

  • https://github.com/WeihanLi/WeihanLi.Common/blob/dev/samples/AspNetCoreSample/Startup.cs

  • https://www.cnblogs.com/weihanli/p/implement-a-simple-event-bus.html

  • https://www.cnblogs.com/weihanli/p/implement-event-queue.html

EventBus/EventQueue 再思考相关推荐

  1. 2d 蓝图_“蓝图”卷积--对深度可分离卷积的再思考

    论文:Rethinking Depthwise Separable Convolutions: How Intra-Kernel Correlations Lead to Improved Mobil ...

  2. 任何举动之前,先思考,思考,再思考

    任何举动之前,先思考,思考,再思考--<Windows用户态程序高效排错>之读书笔记 之所以会读<Windows用户态程序高效排错>这本书,是因为某个卖存储的技术大拿不停地给我 ...

  3. 论文解读二十七:文本行识别模型的再思考

    摘要:本文研究了两个解码器(CTC[1]和 Transformer[2])和三个编码器模块(双向LSTM[3].Self-Attention[4]和GRCL[5]),通过大量实验在广泛使用的场景和手写 ...

  4. SPOOLing技术的再思考

    SPOOLing技术的再思考 @(OS) 首先看什么是SPOOLing. Simultaneous Peripheral Operation On-Line 通过这个名称基本上就可以知道很多事情.外部 ...

  5. 光通信的再思考:5G流量爆发下的数据密度革命

    来源:未来智库 1.投资要件 区别于市场的观点: (1)市场对 5G 时代流量爆发的认知不足.市场认为当前大带宽高流量新应用的爆发趋势尚不明显,当前流量增长需求不清.我们认为,5G 时代的流量的爆发将 ...

  6. 关于第一型曲面积分的再思考

    关于第一型曲面积分的再思考 @(微积分) 有些问题,看着复杂,却很好解.同样,有些问题看着很简单,但是却很难下手.举一个关于第一型曲面积分计算的例子. 第一型曲面积分基础解法要干三件事: 投影 代入 ...

  7. 重读GPDB 和 TiDB 论文引发的 HTAP 数据库再思考

    为什么要再思考? 大家好,我是阿福,之前我在社区 Paper Reading 活动中分享了 Greenplum 团队在 2021年 SIGMOD 上发表的论文:<Greenplum: A Hyb ...

  8. 【秒杀下单再思考 】

    秒杀下单再思考 - 由reids数据倾斜问题如何解决引发的思考 问题背景:用db扣减库存+redis.lock解决超卖问题,并发量并不能提很高,考虑redis中扣减库存,秒杀方案:redis.get( ...

  9. (2019.09) 区块链与游戏结合的再思考

    区块链与游戏结合的再思考 2015年2月份,我在知乎上回答了一个问题:如果使用电子加密貨幣來充當遊戲貨幣體系的一環,會對遊戲有什麽影響? 限于我本人当时的认识,那篇文章只是浅尝辄止了一下各种可能性.在 ...

最新文章

  1. TimerHandler的简单应用
  2. 【风险管理】策略开发流程
  3. STM32:Flash擦除与读写操作(HAL库)
  4. Windows Phone 7“芒果”更新带来浏览器重大升级:IE Mobile 9
  5. js cookies 存数组_用一个例子理解JS函数的底层处理机制
  6. localstorage || globalStorage || userData
  7. datagridview实时更新数据_旭诺云盒|智能办公新趋势进出口数据自动提取,通关状态实时更新...
  8. java int字母,从Java中获取int,也包含字母
  9. 2192-Zipper 求最长公共子序列的解题报告
  10. Android端公司通讯录开发与实现(一)
  11. 程序员英文简历范例(前端)
  12. 通达oa系统怎么转移到服务器,通达OA升级心通达OA操作步骤规范
  13. 超赞!设计师完全自学指南
  14. JavaScript验证手机号码、电子邮箱格式
  15. 高端游戏计算机配置单,高端稳定有保障 6000元i7-7700配GTX1060游戏配置推荐
  16. canbus是什么意思_can-bus总线是什么意思?
  17. 为什么阿里不推荐使用MySQL分区表?
  18. AD9854 MSP430 代码总结
  19. mmdetection config文件中几个参数的理解(anchor_scales,anchor_ratios,anchor_strides)
  20. 关于Likelihood 和 Probability的差别

热门文章

  1. SQL 分页查询语句大全即(查找第N到M条记录的方法)
  2. arm开发tq2440上的c++裸奔程序
  3. (转)程序员的十层楼11层(上帝)
  4. pkpm板按弹性计算还是塑性_双向板按弹性方法还是按塑性方法计算
  5. mysql 面试知识点笔记(七)RR如何避免幻读及非阻塞读、范式
  6. 在Data Lake Analytics中使用视图
  7. 操作系统与多核处理器
  8. ARM再出手,软银攀登新高山
  9. 段落排版--行间距, 行高(line-height)
  10. 纯css3实现的鼠标悬停动画按钮