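EventBus is an implementation of the publish/subscribe pattern. With it, components, services, and systems can be decoupled from one another while still communicating.
EventBus essentially defines a set of abstract interfaces, which can be implemented on top of a message queue (MQ).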
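1. In the module's pre-configuration method, services that implement ILocalEventHandler or IDistributedEventHandler are added to the options
LocalEventBusOptions and DistributedEventBusOptions, each of which stores them in ITypeList<IEventHandler> Handlers { get; }
2. The IEventHandler interface, which is split into ILocalEventHandler<in TEvent> and IDistributedEventHandler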
/// <summary>
/// Handler handles the event by implementing this method.
/// </summary>
/// <param name="eventData">Event data</param>
Task HandleEventAsync(TEvent eventData);
IEventHandlerFactory is responsible for obtaining or creating event handlers. It has three implementations: IocEventHandlerFactory, SingleInstanceHandlerFactory, and TransientEventHandlerFactory.
IEventHandlerDisposeWrapper GetHandler();
public interface IEventHandlerDisposeWrapper : IDisposable
{
    IEventHandler EventHandler { get; }
}
public class EventHandlerDisposeWrapper : IEventHandlerDisposeWrapper
{
    public IEventHandler EventHandler { get; }
    private readonly Action _disposeAction;
    public EventHandlerDisposeWrapper(IEventHandler eventHandler, Action disposeAction = null)
    {
        _disposeAction = disposeAction;
        EventHandler = eventHandler;
    }
    public void Dispose()
    {
        _disposeAction?.Invoke();
    }
}
3. EventBus: used to publish, subscribe to, and unsubscribe from events.
public abstract Task PublishAsync(Type eventType, object eventData);
public abstract IDisposable Subscribe(Type eventType, IEventHandlerFactory factory);
public abstract void Unsubscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class;
public abstract void Unsubscribe(Type eventType, IEventHandler handler);
public abstract void Unsubscribe(Type eventType, IEventHandlerFactory factory);
public abstract void UnsubscribeAll(Type eventType);
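The three operations above can be illustrated with a minimal in-memory sketch. This is not ABP's EventBus: `MiniEventBus`, `Unregistrar`, and `OrderPlaced` are hypothetical names invented for the example, and it only matches exact event types. What it shares with the signatures above is the shape of the API, in particular that Subscribe returns an IDisposable whose disposal unsubscribes the handler.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical, simplified bus: publish, subscribe, unsubscribe only.
public sealed class MiniEventBus
{
    private readonly ConcurrentDictionary<Type, List<Func<object, Task>>> _handlers =
        new ConcurrentDictionary<Type, List<Func<object, Task>>>();

    // Registers a delegate for TEvent; disposing the result removes it again.
    public IDisposable Subscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class
    {
        Func<object, Task> wrapper = e => action((TEvent)e);
        var list = _handlers.GetOrAdd(typeof(TEvent), _ => new List<Func<object, Task>>());
        lock (list) { list.Add(wrapper); }
        return new Unregistrar(() => { lock (list) { list.Remove(wrapper); } });
    }

    // Invokes every handler registered for exactly typeof(TEvent), in order.
    public async Task PublishAsync<TEvent>(TEvent eventData) where TEvent : class
    {
        if (!_handlers.TryGetValue(typeof(TEvent), out var list)) { return; }
        Func<object, Task>[] snapshot;
        lock (list) { snapshot = list.ToArray(); }
        foreach (var handler in snapshot)
        {
            await handler(eventData);
        }
    }

    // Drops all handlers for TEvent.
    public void UnsubscribeAll<TEvent>() where TEvent : class
    {
        if (_handlers.TryGetValue(typeof(TEvent), out var list))
        {
            lock (list) { list.Clear(); }
        }
    }

    private sealed class Unregistrar : IDisposable
    {
        private readonly Action _onDispose;
        public Unregistrar(Action onDispose) { _onDispose = onDispose; }
        public void Dispose() { _onDispose(); }
    }
}

// Hypothetical event type used only for illustration.
public sealed class OrderPlaced
{
    public int OrderId { get; set; }
}
```

A caller would keep the value returned by Subscribe and dispose it when the subscription is no longer needed; events published afterwards no longer reach that delegate.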
II. AbpRabbitMqModule
[DependsOn(
    typeof(AbpJsonModule),
    typeof(AbpThreadingModule)
)]
public class AbpRabbitMqModule : AbpModule
{
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        var configuration = context.Services.GetConfiguration();
        Configure<AbpRabbitMqOptions>(configuration.GetSection("RabbitMQ"));
    }
    public override void OnApplicationShutdown(ApplicationShutdownContext context)
    {
        context.ServiceProvider
            .GetRequiredService<IChannelPool>()
            .Dispose();
        context.ServiceProvider
            .GetRequiredService<IConnectionPool>()
            .Dispose();
    }
}
III. AbpEventBusRabbitMqModule
public override Task PublishAsync(Type eventType, object eventData)
{
    var eventName = EventNameAttribute.GetNameOrDefault(eventType);
    var body = Serializer.Serialize(eventData);
    using (var channel = ConnectionPool.Get(RabbitMqEventBusOptions.ConnectionName).CreateModel())
    {
        channel.ExchangeDeclare(
            RabbitMqEventBusOptions.ExchangeName,
            "direct",
            durable: true
        );
        var properties = channel.CreateBasicProperties();
        properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;
        channel.BasicPublish(
            exchange: RabbitMqEventBusOptions.ExchangeName,
            routingKey: eventName,
            mandatory: true,
            basicProperties: properties,
            body: body
        );
    }
    return Task.CompletedTask;
}
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
    var handlerFactories = GetOrCreateHandlerFactories(eventType);
    handlerFactories.Add(factory);
    if (handlerFactories.Count == 1) //TODO: Multi-threading!
    {
        Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType));
    }
    return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}
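I. AbpLocalEventBusOptions and AbpDistributedEventBusOptions
IEventHandler is an empty marker interface with two specializations, ILocalEventHandler<TEvent> and IDistributedEventHandler<TEvent>. Implementations must be registered in the container.
Both specializations declare the method HandleEventAsync(TEvent eventData).
The options store the implementation types of IEventHandler. A callback registered through OnRegistered inspects the services added to IServiceCollection and saves types that implement ILocalEventHandler<TEvent> in the list on AbpLocalEventBusOptions,
and types that implement IDistributedEventHandler<TEvent> in the list on AbpDistributedEventBusOptions.
ActionEventHandler<T> is a local handler (an ILocalEventHandler implementation) registered as ITransientDependency; handling an event simply invokes the delegate it wraps.
II. IEventBus publishes events and handles subscribing and unsubscribing. EventBus is an abstract class. Its implementations include NullDistributedEventBus, which is not registered in the container.
Each implementation takes the IEventHandler list from its options and subscribes every handler on the event bus, pairing the eventType of the handler's IEventHandler<eventType> interface with an EventHandlerFactory (one factory instance per handler) that resolves the IEventHandler from the container by HandlerType.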
(1) Local
1) LocalEventBus: ISingletonDependency
Subscribe: stores the event type together with its handler factories in a dictionary.
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
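IEventHandlerFactory exposes GetHandler() and IsInFactories(List<IEventHandlerFactory>). There are three implementations:
>>SingleInstanceHandlerFactory:
The IEventHandler is supplied from outside, and every invocation uses that same instance. It backs these overloads:
Subscribe(Type eventType, IEventHandler handler)
Subscribe<TEvent>(Func<TEvent, Task> action), which wraps the delegate in an ActionEventHandler<TEvent>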
public IEventHandlerDisposeWrapper GetHandler()
{
    return new EventHandlerDisposeWrapper(HandlerInstance);
}
>>TransientEventHandlerFactory: creates a new handler instance on every call
Subscribe<TEvent, THandler>()
protected virtual IEventHandler CreateHandler()
{
    return (IEventHandler) Activator.CreateInstance(HandlerType);
}
public virtual IEventHandlerDisposeWrapper GetHandler()
{
    var handler = CreateHandler();
    return new EventHandlerDisposeWrapper(
        handler,
        () => (handler as IDisposable)?.Dispose()
    );
}
>>IocEventHandlerFactory: the default for handlers listed in the options; it resolves the handler from the dependency injection container
Subscribe(Type eventType, IEventHandlerFactory factory)
public IEventHandlerDisposeWrapper GetHandler()
{
    var scope = ScopeFactory.CreateScope();
    return new EventHandlerDisposeWrapper(
        (IEventHandler) scope.ServiceProvider.GetRequiredService(HandlerType),
        () => scope.Dispose()
    );
}
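The difference between the single-instance and transient factories can be seen in a small, container-free sketch. All names here (`IHandler`, `CountingHandler`, `HandlerWrapper`, `SingleInstanceFactory`, `TransientFactory<THandler>`) are hypothetical stand-ins rather than ABP types, and the IoC variant is left out because it needs a service scope. The point is only the lifetime behaviour: one factory hands out the same object with no cleanup, the other creates a fresh object and disposes it when the wrapper is disposed.

```csharp
using System;

public interface IHandler { }

// Counts constructions and disposals so the lifetimes are observable.
public sealed class CountingHandler : IHandler, IDisposable
{
    public static int Created;
    public static int Disposed;
    public CountingHandler() { Created++; }
    public void Dispose() { Disposed++; }
}

// Pairs a handler with the cleanup to run after one invocation.
public sealed class HandlerWrapper : IDisposable
{
    private readonly Action _disposeAction;
    public IHandler Handler { get; }

    public HandlerWrapper(IHandler handler, Action disposeAction = null)
    {
        Handler = handler;
        _disposeAction = disposeAction;
    }

    public void Dispose() { _disposeAction?.Invoke(); }
}

public interface IHandlerFactory
{
    HandlerWrapper GetHandler();
}

// Always returns the instance it was given; nothing is disposed per call.
public sealed class SingleInstanceFactory : IHandlerFactory
{
    private readonly IHandler _instance;
    public SingleInstanceFactory(IHandler instance) { _instance = instance; }
    public HandlerWrapper GetHandler() { return new HandlerWrapper(_instance); }
}

// Creates a new handler per call and disposes it together with the wrapper.
public sealed class TransientFactory<THandler> : IHandlerFactory
    where THandler : IHandler, new()
{
    public HandlerWrapper GetHandler()
    {
        IHandler handler = new THandler();
        return new HandlerWrapper(handler, () => (handler as IDisposable)?.Dispose());
    }
}
```

Wrapping each `GetHandler()` call in a `using` block, as the trigger code in these notes does, is what makes the transient handler's cleanup run right after the event has been handled.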
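A special case applies when the event type is generic, has exactly one generic argument, and implements IEventDataWithInheritableGenericArgument. One such implementation is EntityEventData<TEntity>.
EntityEventData<TEntity> is extended by EntityChangedEventData<T> and EntityChangingEventData<T>.
EntityChangedEventData<T> is extended by EntityCreatedEventData<T>, EntityUpdatedEventData<T>, and EntityDeletedEventData<T>.
EntityChangingEventData<T> is extended by EntityCreatingEventData<T>, EntityUpdatingEventData<T>, and EntityDeletingEventData<T>.
EntityChangeReport holds three lists. It is created and populated in AbpDbContext.SaveChangesAsync:
1. EntityChangeEntry (with a change type of Created, Updated, or Deleted). The ETO type is taken from options.EtoMappings; if no mapping exists, EntityEto is used:
EntityCreatedEto<EntityEto>, EntityUpdatedEto<EntityEto>, EntityDeletedEto<EntityEto>
2. DomainEvents (local events)
3. DistributedEvents: these use ETOs that are assigned by hand in the aggregate root rather than produced by AutoMapper.
The local event bus publishes types such as EntityCreatingEventData<> and EntityCreatedEventData<>.
The distributed event bus publishes EntityCreatedEto<>, EntityUpdatedEto<>, and EntityDeletedEto<>.
It relies on EntityToEtoMapper to convert the entity into a transfer object before sending it to the event bus.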
private void ConfigureDistributedEventBus()
{
    Configure<AbpDistributedEventBusOptions>(options =>
    {
        options.EtoMappings.Add<Person, PersonEto>();
    });
}
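The other two lists are the local events and the distributed events.
For event types with an inheritable generic argument, the event is published once more for the BaseType of the TEntity argument,
and so on up the chain until Object (the base type of every object) has been handled as well.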
if (eventType.GetTypeInfo().IsGenericType &&
    eventType.GetGenericArguments().Length == 1 &&
    typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(eventType))
{
    var genericArg = eventType.GetGenericArguments()[0];
    var baseArg = genericArg.GetTypeInfo().BaseType;
    if (baseArg != null)
    {
        var baseEventType = eventType.GetGenericTypeDefinition().MakeGenericType(baseArg);
        var constructorArgs = ((IEventDataWithInheritableGenericArgument)eventData).GetConstructorArgs();
        var baseEventData = Activator.CreateInstance(baseEventType, constructorArgs);
        await PublishAsync(baseEventType, baseEventData);
    }
}
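To make the base-type walk concrete, here is a small sketch that performs the same reflection steps without any ABP types. `Animal`, `Dog`, `Created<T>`, and `BaseTypeChain` are hypothetical names; the sketch reads the constructor argument from an `Entity` property instead of calling GetConstructorArgs(), and it uses a loop where the excerpt above recurses through PublishAsync. It returns the sequence of event objects that would be published for one original event.

```csharp
using System;
using System.Collections.Generic;

public class Animal { }
public class Dog : Animal { }

// Stand-in for an event type with one inheritable generic argument.
public class Created<T>
{
    public T Entity { get; }
    public Created(T entity) { Entity = entity; }
}

public static class BaseTypeChain
{
    // Returns the original event followed by one rebuilt event per base type
    // of the generic argument, ending with the event closed over object.
    public static List<object> Expand(object eventData)
    {
        var result = new List<object>();
        var eventType = eventData.GetType();
        var entity = eventType.GetProperty("Entity").GetValue(eventData);

        while (true)
        {
            result.Add(eventData);

            var baseArg = eventType.GetGenericArguments()[0].BaseType;
            if (baseArg == null)
            {
                break; // object has no base type, so the chain ends here
            }

            // Close the same generic definition over the base type and
            // rebuild the event from the same constructor argument.
            eventType = eventType.GetGenericTypeDefinition().MakeGenericType(baseArg);
            eventData = Activator.CreateInstance(eventType, new object[] { entity });
        }

        return result;
    }
}
```

For `new Created<Dog>(new Dog())` the chain is Created<Dog>, then Created<Animal>, then Created<object>, which is why a handler subscribed to the event for a base entity type also sees events raised for derived entities.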
The handlers listed in the options are subscribed on the event bus as follows:
protected virtual void SubscribeHandlers(ITypeList<IEventHandler> handlers)
{
    foreach (var handler in handlers)
    {
        var interfaces = handler.GetInterfaces();
        foreach (var @interface in interfaces)
        {
            if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(@interface))
            {
                continue;
            }
            var genericArgs = @interface.GetGenericArguments();
            if (genericArgs.Length == 1)
            {
                Subscribe(genericArgs[0], new IocEventHandlerFactory(ServiceScopeFactory, handler));
            }
        }
    }
}
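The interface scan in SubscribeHandlers can be tried in isolation. The sketch below is an assumption-laden miniature: `IMarkerHandler`, `ITypedHandler<TEvent>`, `UserCreated`, `UserDeleted`, `UserAuditHandler`, and `HandlerScanner` are hypothetical, and instead of subscribing anything it just returns the (event type, handler type) pairs that would be subscribed.

```csharp
using System;
using System.Collections.Generic;

public interface IMarkerHandler { }
public interface ITypedHandler<in TEvent> : IMarkerHandler { }

public sealed class UserCreated { }
public sealed class UserDeleted { }

// One class can handle several event types by implementing several interfaces.
public sealed class UserAuditHandler : ITypedHandler<UserCreated>, ITypedHandler<UserDeleted> { }

public static class HandlerScanner
{
    // For every handler type, finds each handler interface with exactly one
    // generic argument and records that argument as the handled event type.
    public static List<KeyValuePair<Type, Type>> Scan(IEnumerable<Type> handlerTypes)
    {
        var pairs = new List<KeyValuePair<Type, Type>>();
        foreach (var handlerType in handlerTypes)
        {
            foreach (var @interface in handlerType.GetInterfaces())
            {
                if (!typeof(IMarkerHandler).IsAssignableFrom(@interface))
                {
                    continue; // unrelated interface such as IDisposable
                }

                var genericArgs = @interface.GetGenericArguments();
                if (genericArgs.Length == 1)
                {
                    pairs.Add(new KeyValuePair<Type, Type>(genericArgs[0], handlerType));
                }
            }
        }
        return pairs;
    }
}
```

The non-generic marker interface itself also shows up in GetInterfaces(), but it has no generic arguments and is skipped by the length check, so a handler implementing two typed interfaces yields exactly two pairs.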
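Unsubscribe: unsubscribing an action for a given TEvent looks for the ActionEventHandler<TEvent> that wraps that action, so it applies to SingleInstanceHandlerFactory entries.
The overload that takes a factory, and UnsubscribeAll, operate directly on the list of factories.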
public abstract void Unsubscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class;
public abstract void Unsubscribe(Type eventType, IEventHandler handler);
public abstract void Unsubscribe(Type eventType, IEventHandlerFactory factory);
public abstract void UnsubscribeAll(Type eventType);
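(2) Distributed
1) LocalDistributedEventBus: it is registered with TryRegister as a singleton (ISingletonDependency) and is exposed under the service types IDistributedEventBus and LocalDistributedEventBus.
It does not inherit from the abstract EventBus class.
III. The Publish method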
Task PublishAsync(Type eventType, object eventData);
Task PublishAsync<TEvent>(TEvent eventData)
where TEvent : class;
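Publishing invokes the matching ILocalEventHandler and IDistributedEventHandler implementations.
It first looks up the registered IEventHandlerFactory instances that apply to the event type; each event type maps to a List<IEventHandlerFactory> EventHandlerFactories.
Note that handlers registered for base types must run too: for example, publishing EntityCreatedEventData should also trigger the handlers for its base type EntityChangedEventData.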
protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
{
    var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();
    foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)))
    {
        handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
    }
    return handlerFactoryList.ToArray();
}
private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
{
    //Should trigger same type
    if (handlerEventType == targetEventType)
    {
        return true;
    }
    //Should trigger for inherited types
    if (handlerEventType.IsAssignableFrom(targetEventType))
    {
        return true;
    }
    return false;
}
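The matching rule can be checked with a few plain types. In this sketch `EntityChanged`, `EntityCreated`, and `HandlerMatching` are hypothetical stand-ins; only `Type.IsAssignableFrom` is the real API. Since IsAssignableFrom already returns true when both types are identical, the sketch collapses the two checks into one.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class EntityChanged { }
public class EntityCreated : EntityChanged { }

public static class HandlerMatching
{
    // A handler registered for handlerEventType fires when the published
    // type is that type or derives from it.
    public static bool ShouldTrigger(Type publishedType, Type handlerEventType)
    {
        return handlerEventType.IsAssignableFrom(publishedType);
    }

    // Filters the registered event types down to those whose handlers
    // should run for the published type.
    public static List<Type> Match(Type publishedType, IEnumerable<Type> registered)
    {
        return registered.Where(t => ShouldTrigger(publishedType, t)).ToList();
    }
}
```

Publishing `EntityCreated` matches handlers registered for both `EntityCreated` and `EntityChanged`, while publishing `EntityChanged` matches only the latter: the rule runs from derived event to base handler, never the other way.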
await new SynchronizationContextRemover();
foreach (var handlerFactories in GetHandlerFactories(eventType))
{
    foreach (var handlerFactory in handlerFactories.EventHandlerFactories)
    {
        await TriggerHandlerAsync(handlerFactory, handlerFactories.EventType, eventData, exceptions);
    }
}
protected virtual async Task TriggerHandlerAsync(IEventHandlerFactory asyncHandlerFactory, Type eventType, object eventData, List<Exception> exceptions)
{
    using (var eventHandlerWrapper = asyncHandlerFactory.GetHandler())
    {
        try
        {
            var handlerType = eventHandlerWrapper.EventHandler.GetType();
            if (ReflectionHelper.IsAssignableToGenericType(handlerType, typeof(ILocalEventHandler<>)))
            {
                var method = typeof(ILocalEventHandler<>)
                    .MakeGenericType(eventType)
                    .GetMethod(
                        nameof(ILocalEventHandler<object>.HandleEventAsync),
                        new[] { eventType }
                    );
                await (Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData });
            }
            else if (ReflectionHelper.IsAssignableToGenericType(handlerType, typeof(IDistributedEventHandler<>)))
            {
                var method = typeof(IDistributedEventHandler<>)
                    .MakeGenericType(eventType)
                    .GetMethod(
                        nameof(IDistributedEventHandler<object>.HandleEventAsync),
                        new[] { eventType }
                    );
                await (Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData });
            }
            else
            {
                throw new AbpException("The object instance is not an event handler. Object type: " + handlerType.AssemblyQualifiedName);
            }
        }
        catch (TargetInvocationException ex)
        {
            exceptions.Add(ex.InnerException);
        }
        catch (Exception ex)
        {
            exceptions.Add(ex);
        }
    }
}
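The reflective call at the heart of TriggerHandlerAsync is easier to follow with a single handler interface. The following sketch assumes a hypothetical `IHandle<TEvent>` interface, a `Greeting` event, and a `GreetingHandler`; the reflection calls (`MakeGenericType`, `GetMethod`, `MethodInfo.Invoke`) are the standard .NET ones. As in the method above, failures are collected into a list instead of being rethrown.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading.Tasks;

public interface IHandle<in TEvent>
{
    Task HandleEventAsync(TEvent eventData);
}

public sealed class Greeting
{
    public string Text { get; set; } = "";
}

public sealed class GreetingHandler : IHandle<Greeting>
{
    public string Last { get; private set; } = "";

    public Task HandleEventAsync(Greeting eventData)
    {
        Last = eventData.Text;
        return Task.CompletedTask;
    }
}

public static class ReflectiveInvoker
{
    // Calls handler.HandleEventAsync(eventData) when the event type is only
    // known at run time, collecting any failure into the exceptions list.
    public static async Task InvokeAsync(object handler, Type eventType, object eventData, List<Exception> exceptions)
    {
        try
        {
            // Close IHandle<> over the runtime event type, then pick the
            // overload whose single parameter is that event type.
            MethodInfo method = typeof(IHandle<>)
                .MakeGenericType(eventType)
                .GetMethod(nameof(IHandle<object>.HandleEventAsync), new[] { eventType });

            await (Task)method.Invoke(handler, new[] { eventData });
        }
        catch (TargetInvocationException ex)
        {
            // A synchronous throw inside the handler arrives wrapped; unwrap it.
            exceptions.Add(ex.InnerException);
        }
        catch (Exception ex)
        {
            // Faulted tasks and reflection errors arrive unwrapped.
            exceptions.Add(ex);
        }
    }
}
```

The two catch blocks mirror the original: an exception thrown before the handler returns its Task surfaces as a TargetInvocationException from Invoke, whereas an exception from an awaited, faulted Task is rethrown by `await` as the original exception.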
IV. RabbitMQ