动手造轮子:实现简单的 EventQueue
动手造轮子:实现简单的 EventQueue
Intro
最近项目里有遇到一些并发的问题,想实现一个队列来将并发的请求一个一个串行处理,可以理解为使用消息队列处理并发问题,之前实现过一个简单的 EventBus
,于是想在 EventBus
的基础上改造一下,加一个队列,改造成类似消息队列的处理模式。消息的处理(Consumer)直接使用 .netcore 里的 IHostedService
来实现了一个简单的后台任务处理。
初步设计
Event 抽象的事件
EventHandler 处理 Event 的方法
EventStore 保存订阅 Event 的 EventHandler
EventQueue 保存 Event 的队列
EventPublisher 发布 Event
EventConsumer 处理 Event 队列里的 Event
EventSubscriptionManager 管理订阅 Event 的 EventHandler
实现代码
EventBase 定义了基本事件信息,事件发生时间以及事件的id:
public abstract class EventBase
{
[JsonProperty]
public DateTimeOffset EventAt { get; private set; }
[JsonProperty]
public string EventId { get; private set; }
protected EventBase()
{
this.EventId = GuidIdGenerator.Instance.NewId();
this.EventAt = DateTimeOffset.UtcNow;
}
[JsonConstructor]
public EventBase(string eventId, DateTimeOffset eventAt)
{
this.EventId = eventId;
this.EventAt = eventAt;
}
}
EventHandler 定义:
public interface IEventHandler
{
Task Handle(IEventBase @event);
}
public interface IEventHandler<in TEvent> : IEventHandler where TEvent : IEventBase
{
Task Handle(TEvent @event);
}
public class EventHandlerBase<TEvent> : IEventHandler<TEvent> where TEvent : EventBase
{
public virtual Task Handle(TEvent @event)
{
return Task.CompletedTask;
}
public Task Handle(IEventBase @event)
{
return Handle(@event as TEvent);
}
}
EventStore:
public class EventStore
{
private readonly Dictionary<Type, Type> _eventHandlers = new Dictionary<Type, Type>();
public void Add<TEvent, TEventHandler>() where TEventHandler : IEventHandler<TEvent> where TEvent : EventBase
{
_eventHandlers.Add(typeof(TEvent), typeof(TEventHandler));
}
public object GetEventHandler(Type eventType, IServiceProvider serviceProvider)
{
if (eventType == null || !_eventHandlers.TryGetValue(eventType, out var handlerType) || handlerType == null)
{
return null;
}
return serviceProvider.GetService(handlerType);
}
public object GetEventHandler(EventBase eventBase, IServiceProvider serviceProvider) =>
GetEventHandler(eventBase.GetType(), serviceProvider);
public object GetEventHandler<TEvent>(IServiceProvider serviceProvider) where TEvent : EventBase =>
GetEventHandler(typeof(TEvent), serviceProvider);
}
EventQueue 定义:
public class EventQueue
{
private readonly ConcurrentDictionary<string, ConcurrentQueue<EventBase>> _eventQueues =
new ConcurrentDictionary<string, ConcurrentQueue<EventBase>>();
public ICollection<string> Queues => _eventQueues.Keys;
public void Enqueue<TEvent>(string queueName, TEvent @event) where TEvent : EventBase
{
var queue = _eventQueues.GetOrAdd(queueName, q => new ConcurrentQueue<EventBase>());
queue.Enqueue(@event);
}
public bool TryDequeue(string queueName, out EventBase @event)
{
var queue = _eventQueues.GetOrAdd(queueName, q => new ConcurrentQueue<EventBase>());
return queue.TryDequeue(out @event);
}
public bool TryRemoveQueue(string queueName)
{
return _eventQueues.TryRemove(queueName, out _);
}
public bool ContainsQueue(string queueName) => _eventQueues.ContainsKey(queueName);
public ConcurrentQueue<EventBase> this[string queueName] => _eventQueues[queueName];
}
EventPublisher:
public interface IEventPublisher
{
Task Publish<TEvent>(string queueName, TEvent @event)
where TEvent : EventBase;
}
public class EventPublisher : IEventPublisher
{
private readonly EventQueue _eventQueue;
public EventPublisher(EventQueue eventQueue)
{
_eventQueue = eventQueue;
}
public Task Publish<TEvent>(string queueName, TEvent @event)
where TEvent : EventBase
{
_eventQueue.Enqueue(queueName, @event);
return Task.CompletedTask;
}
}
EventSubscriptionManager:
public interface IEventSubscriptionManager
{
void Subscribe<TEvent, TEventHandler>()
where TEvent : EventBase
where TEventHandler : IEventHandler<TEvent>;
}
public class EventSubscriptionManager : IEventSubscriptionManager
{
private readonly EventStore _eventStore;
public EventSubscriptionManager(EventStore eventStore)
{
_eventStore = eventStore;
}
public void Subscribe<TEvent, TEventHandler>()
where TEvent : EventBase
where TEventHandler : IEventHandler<TEvent>
{
_eventStore.Add<TEvent, TEventHandler>();
}
}
EventConsumer:
public class EventConsumer : BackgroundService
{
private readonly EventQueue _eventQueue;
private readonly EventStore _eventStore;
private readonly int maxSemaphoreCount = 256;
private readonly IServiceProvider _serviceProvider;
private readonly ILogger _logger;
public EventConsumer(EventQueue eventQueue, EventStore eventStore, IConfiguration configuration, ILogger<EventConsumer> logger, IServiceProvider serviceProvider)
{
_eventQueue = eventQueue;
_eventStore = eventStore;
_logger = logger;
_serviceProvider = serviceProvider;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
using (var semaphore = new SemaphoreSlim(Environment.ProcessorCount, maxSemaphoreCount))
{
while (!stoppingToken.IsCancellationRequested)
{
var queues = _eventQueue.Queues;
if (queues.Count > 0)
{
await Task.WhenAll(
queues
.Select(async queueName =>
{
if (!_eventQueue.ContainsQueue(queueName))
{
return;
}
try
{
await semaphore.WaitAsync(stoppingToken);
//
if (_eventQueue.TryDequeue(queueName, out var @event))
{
var eventHandler = _eventStore.GetEventHandler(@event, _serviceProvider);
if (eventHandler is IEventHandler handler)
{
_logger.LogInformation(
"handler {handlerType} begin to handle event {eventType}, eventId: {eventId}, eventInfo: {eventInfo}",
eventHandler.GetType().FullName, @event.GetType().FullName,
@event.EventId, JsonConvert.SerializeObject(@event));
try
{
await handler.Handle(@event);
}
catch (Exception e)
{
_logger.LogError(e, "event {eventId} handled exception", @event.EventId);
}
finally
{
_logger.LogInformation("event {eventId} handled", @event.EventId);
}
}
else
{
_logger.LogWarning(
"no event handler registered for event {eventType}, eventId: {eventId}, eventInfo: {eventInfo}",
@event.GetType().FullName, @event.EventId,
JsonConvert.SerializeObject(@event));
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "error running EventConsumer");
}
finally
{
semaphore.Release();
}
})
);
}
await Task.Delay(50, stoppingToken);
}
}
}
}
为了方便使用定义了一个 Event 扩展方法:
public static IServiceCollection AddEvent(this IServiceCollection services)
{
services.TryAddSingleton<EventStore>();
services.TryAddSingleton<EventQueue>();
services.TryAddSingleton<IEventPublisher, EventPublisher>();
services.TryAddSingleton<IEventSubscriptionManager, EventSubscriptionManager>();
services.AddSingleton<IHostedService, EventConsumer>();
return services;
}
使用示例
定义 PageViewEvent
记录请求信息:
public class PageViewEvent : EventBase
{
public string Path { get; set; }
}
这里作为示例只记录了请求的Path信息,实际使用可以增加更多需要记录的信息
定义 PageViewEventHandler
,处理 PageViewEvent
public class PageViewEventHandler : EventHandlerBase<PageViewEvent>
{
private readonly ILogger _logger;
public PageViewEventHandler(ILogger<PageViewEventHandler> logger)
{
_logger = logger;
}
public override Task Handle(PageViewEvent @event)
{
_logger.LogInformation($"handle pageViewEvent: {JsonConvert.SerializeObject(@event)}");
return Task.CompletedTask;
}
}
这个 handler 里什么都没做只是输出一个日志
这个示例项目定义了一个记录请求路径的事件以及一个发布请求记录事件的中间件
// 发布 Event 的中间件
app.Use(async (context, next) =>
{
var eventPublisher = context.RequestServices.GetRequiredService<IEventPublisher>();
await eventPublisher.Publish("pageView", new PageViewEvent() { Path = context.Request.Path.Value });
await next();
});
Startup 配置:
public void ConfigureServices(IServiceCollection services)
{
// ...
services.AddEvent();
services.AddSingleton<PageViewEventHandler>();// 注册 Handler
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env, IEventSubscriptionManager eventSubscriptionManager)
{
eventSubscriptionManager.Subscribe<PageViewEvent, PageViewEventHandler>();
app.Use(async (context, next) =>
{
var eventPublisher = context.RequestServices.GetRequiredService<IEventPublisher>();
await eventPublisher.Publish("pageView", new PageViewEvent() { Path = context.Request.Path.Value });
await next();
});
// ...
}
使用效果:
More
注:只是一个初步设计,基本可以实现功能,还是有些不足,实际应用的话还有一些要考虑的事情
Consumer
消息逻辑,现在的实现有些问题,我们的应用场景目前比较简单还可以满足,如果事件比较多就会而且每个事件可能处理需要的时间长短不一样,会导致在一个批次中执行的Event
中已经完成的事件要等待其他还没完成的事件完成之后才能继续取下一个事件,理想的消费模式应该是各个队列相互独立,在同一个队列中保持顺序消费即可上面示例的
EventStore
的实现只是简单的实现了一个事件一个Handler
的处理情况,实际业务场景中很可能会有一个事件需要多个Handler
的情况这个实现是基于内存的,如果要在分布式场景下使用就不适用了,需要自己实现一下基于redis或者数据库的以满足分布式的需求
and more...
上面所有的代码可以在 Github 上获取,示例项目 Github 地址:https://github.com/WeihanLi/AspNetCorePlayground/tree/master/TestWebApplication
Reference
https://github.com/WeihanLi/AspNetCorePlayground/tree/master/TestWebApplication/Event
动手造轮子:实现简单的 EventQueue相关推荐
- 动手造轮子:实现一个简单的基于 Console 的日志输出
动手造轮子:实现一个简单的基于 Console 的日志输出 Intro 之前结合了微软的 Logging 框架和 Serilog 写了一个简单的日志框架,但是之前的用法都是基于 log4net.ser ...
- 动手造轮子:实现一个简单的依赖注入(二) --- 服务注册优化
动手造轮子:实现一个简单的依赖注入(二) --- 服务注册优化 Intro 之前实现的那版依赖注入框架基本可用,但是感觉还是不够灵活,而且注册服务和解析服务在同一个地方感觉有点别扭,有点职责分离不够. ...
- 动手造轮子:实现一个简单的依赖注入(一)
动手造轮子:实现一个简单的依赖注入(一) Intro 在上一篇文章中主要介绍了一下要做的依赖注入的整体设计和大概编程体验,这篇文章要开始写代码了,开始实现自己的依赖注入框架. 类图 首先来温习一下上次 ...
- 动手造轮子:实现一个简单的依赖注入(零)
动手造轮子:实现一个简单的依赖注入(零) Intro 依赖注入为我们写程序带来了诸多好处,在微软的 .net core 出来的同时也发布了微软开发的依赖注入框架 Microsoft.Extension ...
- 动手造轮子:实现一个简单的 EventBus
动手造轮子:实现一个简单的 EventBus Intro EventBus 是一种事件发布订阅模式,通过 EventBus 我们可以很方便的实现解耦,将事件的发起和事件的处理的很好的分隔开来,很好的实 ...
- 动手造轮子:写一个日志框架
动手造轮子:写一个日志框架 Intro 日志框架有很多,比如 log4net / nlog / serilog / microsoft.extensions.logging 等,如何在切换日志框架的时 ...
- 动手造轮子:基于 Redis 实现 EventBus
动手造轮子:基于 Redis 实现 EventBus Intro 上次我们造了一个简单的基于内存的 EventBus,但是如果要跨系统的话就不合适了,所以有了这篇基于 Redis 的 EventBus ...
- python安装轮子_自己动手造“轮子”---python常用的几个方法
前言:由于工作内容的原因,经常需要些python脚本,久而久之,发现有一些方法经常用到,于是就自己动手编辑了一些常用的.大众的.通用的方法.小弟不才,但也希望能为开源做做贡献. 最后再附上代码哈: 一 ...
- 造轮子之图片轮播组件(swiper)
图片轮播是种很常见的场景和功能,一般移动网站首页的轮播 banner,商品详情页的商品图片等位置都会用到此功能 像这种常用的场景功能肯定是有人早就写好插件了的,所以遇到这种场景,一般都遵循以下三步: ...
最新文章
- 标志位和中断位的区别:USART_ClearFlag和USART_ClearITPendingBit
- 人类血液中首次发现微塑料颗粒,饮料瓶塑料袋化妆品都是来源
- 数据结构例程——线性表顺序存储的应用
- 不同级别UI设计师的区别有哪些?
- Oracle 存储过程错误之PLS-00201: 必须声明标识符
- linux中时间命令详解
- centos下openfire安装(转)
- 应届生想要获取web前端开发岗位?这份技能攻略,面试攻略别错过
- 什么才是一份好的AI求职简历?
- 2015年第六届蓝桥杯国赛试题(JavaA组)
- IE设置自动获得代理(ISA20042006中设置相应项)
- maven指定配置文件
- fst和skiplist
- win7计算机怎么初始化,Win7系统网络初始化的设置方法
- 随机梯度下降法(stochastic gradient descent,SGD)
- 腾讯位置大数据中区域热力图数据的数据解析
- PyPDF2--如何使用python操作你的PDF文档
- C语言 输出Sn = a + aa + aaa + aaaa + ······
- 使用python画函数图像
- jQuery实现一组图片的循环滚动