前言
基础篇已经更新完了,从本篇开始我们进入,中级篇(学习部分源代码)我会挑一些我个人认为比较重要的知识点结合部分开源项目进行源码讲解,咱们废话不说直接上车。
Abp vNext的事件总线分2种,一种是本地事件总线,一种是分布式事件总线,本节主要讲解本地事件总线,下一节讲分布式事件总线。
事件总线所处模块在 Volo.Abp.EventBus 中。
正文
看Abp源代码我们先从Module
下手,这块代码实现的比较简单,就是在类注册的时候判断是否继承ILocalEventHandler/IDistributedEventHandler
,将其记录下来并赋值给对应的Options
配置类当中。
也就是说当我们定义如下Handler
的时候,会在依赖注入的时候,都会将其类型 (Type) 添加到事件总线的配置类当中。
public class ProjectDeleteHandler : ILocalEventHandler<ProjectDeleteEvent>,
ITransientDependency
{
// ...
}
在单元测试EventBusTestBase
类我们找一个LocalEventBus的调用类,直接看PublishAsync
的调用来自于IEventBus
,根据继承我们先找到ILocalEventBus
,也就是下面这些,这里注意EventBusBase
类型,Abp有分布式事件总线和本地事件总线,这里抽离了一个事件基类提供使用。
public interface IEventBus
{
/// <summary>
/// Triggers an event.
/// </summary>
/// <typeparam name="TEvent">Event type</typeparam>
/// <param name="eventData">Related data for the event</param>
/// <param name="onUnitOfWorkComplete">True, to publish the event at the end of the current unit of work, if available</param>
/// <returns>The task to handle async operation</returns>
Task PublishAsync<TEvent>(TEvent eventData, bool onUnitOfWorkComplete = true)
where TEvent : class;
/// <summary>
/// Triggers an event.
/// </summary>
/// <param name="eventType">Event type</param>
/// <param name="eventData">Related data for the event</param>
/// <param name="onUnitOfWorkComplete">True, to publish the event at the end of the current unit of work, if available</param>
/// <returns>The task to handle async operation</returns>
Task PublishAsync(Type eventType, object eventData, bool onUnitOfWorkComplete = true);
/// ......
}
还是顺着我们的PublishAsync
向下看,我们可以看到调用了TriggerHandlersAsync
函数名字就是触发处理器
这个Trigger感觉有点js的味道.
public virtual async Task PublishAsync(LocalEventMessage localEventMessage)
{
await TriggerHandlersAsync(localEventMessage.EventType, localEventMessage.EventData);
}
TriggerHandlersAsync
内部主要就是调用GetHandlerFactories
获取事件工厂,而GetHandlerFactories
内部的数据来自于HandlerFactories
HandlerFactories
的数据内容在LocalEventBus
构造函数中通过调用SubscribeHandlers
函数进行填充,最后在TriggerHandlerAsync
中完成调用。
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
{
var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();
foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)))
{
handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
}
return handlerFactoryList.ToArray();
}
var method = typeof(ILocalEventHandler<>)
.MakeGenericType(eventType)
.GetMethod(
nameof(ILocalEventHandler<object>.HandleEventAsync),
new[] { eventType }
);
await ((Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData }));
在整套的运行中还牵扯到异步等待,异常处理,工作单元,构造事件处理器工厂,事件销毁等。不过核心的处理顺序就这样。
解析
这里也想大家推荐本系列的讲解仓库Dppt
(https://github.com/MrChuJiu/Dppt/tree/master/src)该仓库源于Easy.Core.Flow
是我20年创建的用于对知识点的解析,后面代码都会上传到这个仓库.
接下来对上面的Abp代码进行一个简单的分析,简单来看本地EventBus就是通过某个key来反射调用某个函数.简单点直接开整.
我们也写一个IEventBus
然后让LocalEventBus
继承IEventBus
,这里我不需要EventBusBase和ILocalEventBus
因为我就是一个本地evetbus的包,总代码300行左右我就不全部拿上来了,大家可以去仓库自己下载。
public interface IEventBus
{
Task PublishAsync<TEvent>(TEvent eventData);
void Unsubscribe(Type eventType, IEventHandlerFactory factory);
}
public class LocalEventBus : IEventBus
{
public async Task PublishAsync<TEvent>(TEvent eventData)
{
await TriggerHandlersAsync(typeof(TEvent), eventData);
}
protected virtual async Task TriggerHandlersAsync(Type eventType, object eventData)
{
foreach (var handlerFactories in GetHandlerFactories(eventType))
{
foreach (var handlerFactory in handlerFactories.EventHandlerFactories)
{
await TriggerHandlerAsync(handlerFactory, handlerFactories.EventType, eventData);
}
}
}
protected virtual async Task TriggerHandlerAsync(IEventHandlerFactory asyncHandlerFactory, Type eventType, object eventData)
{
using (var eventHandlerWrapper = asyncHandlerFactory.GetHandler())
{
var handlerType = eventHandlerWrapper.EventHandler.GetType();
var method = typeof(ILocalEventHandler<>)
.MakeGenericType(eventType)
.GetMethod(
nameof(ILocalEventHandler<object>.HandleEventAsync),
new[] { eventType }
);
await ((Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData }));
}
}
}
因为我们我们既没有模块化也没Autofac我们就简单点来注册Handler,让外部把Handler告诉我们,当然你扫描程序集也是可以的,但是没必要,这里我们的步骤等于Abp在模块里面的配置Options。
public static class DpptEventBusRegistrar
{
public static void AddDpptEventBus(this IServiceCollection services, List<Type> types) {
services.AddSingleton<IEventBus, LocalEventBus>();
var localHandlers = types;/*.Where(s => typeof(ILocalEventHandler<>).IsAssignableFrom(s)).ToList();*/
foreach (var item in localHandlers)
{
services.AddTransient(item);
}
services.Configure<LocalEventBusOptions>(options =>
{
options.Handlers.AddIfNotContains(localHandlers);
});
}
}
源码里面要注意好以下几点,EventBus是单例的因为全局引用,但是Handler是瞬时,因为每次执行完就可以了。
测试
新建一个空项目注册一个Handler.测试结果放在下面了
public class WeatherQueryHandler : ILocalEventHandler<WeatherQueryEto>
{
public Task HandleEventAsync(WeatherQueryEto eventData)
{
Console.WriteLine(eventData.Name);
return Task.CompletedTask;
}
}
public class WeatherQueryEto
{
public string Name { get; set; }
}
[HttpGet]
public IEnumerable<WeatherForecast> Get()
{
var rng = new Random();
// 测试发送
_eventBus.PublishAsync(new WeatherQueryEto() { Name = "测试"+ rng.Next(-20, 55) });
return Enumerable.Range(1, 5).Select(index => new WeatherForecast
{
Date = DateTime.Now.AddDays(index),
TemperatureC = rng.Next(-20, 55),
Summary = Summaries[rng.Next(Summaries.Length)]
})
.ToArray();
}
结语
本次挑选了一个比较简单的示例来讲,整个EventBus我应该分成3篇 下一篇我来讲分布式EventBus。
最后欢迎各位读者关注我的博客, https://github.com/MrChuJiu/Dppt/tree/master/src 欢迎大家Star
另外这里有个社区地址(https://github.com/MrChuJiu/Dppt/discussions),如果大家有技术点希望我提前档期可以写在这里,希望本项目助力我们一起成长