zoukankan      html  css  js  c++  java
  • EventBus模块

    EventBus 是一种事件发布订阅模式,借助 EventBus 我们可以很好的实现组件之间,服务之间,系统之间的解耦以及相互通信的问题。

     

    EventBus 相当于是定义一些抽象接口,可以用 MQ 来实现EventBus

    1、模块的预处理模块,定义预处理方法,增加实现ILocalEventHandler,IDistributedEventHandler的服务增加到配置

    LocalEventBusOptions、DistributedEventBusOptions,分别存储ITypeList<IEventHandler> Handlers { get; }

    2、IEventHandle接口,分ILocalEventHandler<in TEvent>,IDistributedEventHandler

            /// <summary>
            /// Handler handles the event by implementing this method.
            /// </summary>
            /// <param name="eventData">Event data</param>
            Task HandleEventAsync(TEvent eventData);

    IEventHandlerFactory,负责得到或创建事件处理器,三个实现IocEventHandlerFactory,SingleInstanceHandlerFactory,TransientEventHandlerFactory

     IEventHandlerDisposeWrapper GetHandler();
    
    
      public interface IEventHandlerDisposeWrapper : IDisposable
        {
            IEventHandler EventHandler { get; }
        }
    
      public class EventHandlerDisposeWrapper : IEventHandlerDisposeWrapper
        {
            public IEventHandler EventHandler { get; }
    
            private readonly Action _disposeAction;
    
            public EventHandlerDisposeWrapper(IEventHandler eventHandler, Action disposeAction = null)
            {
                _disposeAction = disposeAction;
                EventHandler = eventHandler;
            }
    
            public void Dispose()
            {
                _disposeAction?.Invoke();
            }
        }

    3、EventBus:用来发布/订阅/取消订阅事件

    public abstract Task PublishAsync(Type eventType, object eventData);
    
    public abstract IDisposable Subscribe(Type eventType, IEventHandlerFactory factory);
    
    
     public abstract void Unsubscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class;
    
     public abstract void Unsubscribe(Type eventType, IEventHandler handler);
     public abstract void Unsubscribe(Type eventType, IEventHandlerFactory factory);
    public abstract void Unsubscribe(Type eventType, IEventHandlerFactory factory);
    public abstract void UnsubscribeAll(Type eventType);

    二、AbpRabbitMqModule

     [DependsOn(
            typeof(AbpJsonModule),
            typeof(AbpThreadingModule)
            )]
        public class AbpRabbitMqModule : AbpModule
        {
            public override void ConfigureServices(ServiceConfigurationContext context)
            {
                var configuration = context.Services.GetConfiguration();
                Configure<AbpRabbitMqOptions>(configuration.GetSection("RabbitMQ"));
            }
    
            public override void OnApplicationShutdown(ApplicationShutdownContext context)
            {
                context.ServiceProvider
                    .GetRequiredService<IChannelPool>()
                    .Dispose();
    
                context.ServiceProvider
                    .GetRequiredService<IConnectionPool>()
                    .Dispose();
            }

    三、AbpEventBusRabbitMqModule

    public override Task PublishAsync(Type eventType, object eventData)
            {
                var eventName = EventNameAttribute.GetNameOrDefault(eventType);
                var body = Serializer.Serialize(eventData);
    
                using (var channel = ConnectionPool.Get(RabbitMqEventBusOptions.ConnectionName).CreateModel())
                {
                    channel.ExchangeDeclare(
                        RabbitMqEventBusOptions.ExchangeName,
                        "direct",
                        durable: true
                    );
                    
                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;
    
                    channel.BasicPublish(
                       exchange: RabbitMqEventBusOptions.ExchangeName,
                        routingKey: eventName,
                        mandatory: true,
                        basicProperties: properties,
                        body: body
                    );
                }
    
                return Task.CompletedTask;
            }
            public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
            {
                var handlerFactories = GetOrCreateHandlerFactories(eventType);
                
                handlerFactories.Add(factory);
    
                if (handlerFactories.Count == 1) //TODO: Multi-threading!
                {
                    Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType));
                }
    
                return new EventHandlerFactoryUnregistrar(this, eventType, factory);
            }

    一、AbpLocalEventBusOptions,AbpDistributedEventBusOptions

    IEventHandler是一个空方法,分为ILocalEventHandler<TEvent>,IDistributedEventHandler<TEvent>方法,它需要注入到容器。

    具有HandleEventAsync(EventType eventData))方法

    保存着IEventHandler的实现类型,它通过OnRegistered遍历IServiceCollection服务的委托方法,将ILocalEventHandler<EventType>方法保存在AbpLocalEventBusOptions下列表

    IDistributedEventHandler<EventType>保存在AbpDistributedEventBusOptions下列表里

    而ActionEventHandle<T> 是本地ILocalEventHandle的方法,它是 TransientDependency方法,它执行是委托方法。

      

    二、IEventBus它执行的是发布事件,订阅和取消订阅,EventBus是抽象方法,它的实现有NullDistributedEventBus, 这个不注入容器的

    它的实现方法将Option的IEventHandler列表方法,将IEventHandle<eventType>eventType以及EventHandlerFactory(多个实例),即从容器根据HandlerType获取IEventHandler,进行订阅在EventBus里 

    (1)、Local

    (1)LocalBus:ISingletonDependency
    订阅方法:保存EventType,以及工厂方法保存在字典里

    protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }

    IEventHandleFactory工厂方法GetHandler(),IsInFactories(List(IEventHandlerFactory)),有以下3个
    >>SingleInstanceHandlerFactory:
    IEventHandle来自外部赋值,每次执行都是使用同一个实例
    Subscribe(Type eventType, IEventHandler handler)
    Subscribe<TEvent>(TEvent委托,使用是的ActionEventHandler<TEvent>)

     public IEventHandlerDisposeWrapper GetHandler()
            {
                return new EventHandlerDisposeWrapper(HandlerInstance);
            }
    
    
    >>TransientEventHandlerFactory:每次都创建
    Subscribe<TEvent, THandler>()  
     protected virtual IEventHandler CreateHandler()
            {
                return (IEventHandler) Activator.CreateInstance(HandlerType);
            }
            public virtual IEventHandlerDisposeWrapper GetHandler()
            {
                var handler = CreateHandler();
                return new EventHandlerDisposeWrapper(
                    handler,
                    () => (handler as IDisposable)?.Dispose()
                );
            }
    
    
    
    
    >>IocEventHandlerFactory,Option的默认,在容器里依赖注入服务
    Subscribe(Type eventType, IEventHandlerFactory factory)
         public IEventHandlerDisposeWrapper GetHandler()
            {
                var scope = ScopeFactory.CreateScope();
                return new EventHandlerDisposeWrapper(
                    (IEventHandler) scope.ServiceProvider.GetRequiredService(HandlerType),
                    () => scope.Dispose()
                );
            }
    
    

    如果EventType是泛型,泛型的参数只有一个,而且这个泛型是继承自IEventDataWithInheritableGenericArgument,实现之一>entityEventData<TEntity>,

    entityEventData<TEntity>    实现类型 entityChangedEventData<T> ,EntityChangingEventData<T>

    entityChangedEventData<T>   其实现类型有EntityCreatedEventData<T>、EntityUpdatedEventData<T>、EntityDeletedEventData<T>

    EntityChangingEventData<T>   其实现类型有<T> ,EntityUpdatingEventData<T>,EntityDeletingEventData<T>

    EntityChangeReport 三个List,在AbpDbContext的SaveChangesAsync进行创建,赋值

    1、EntityChangeEntry (类型是否Created、Updated、Deleted),使用的是options.EtoMappings,若没有,使用是 EntityEto

    EntityCreatedEto<EntityEto>,EntityUpdatedEto<EntityEto>,EntityDeletedEto<EntityEto>

    2、DomainEvents    (LocalEvent)

    3、DistributedEvents:  使用的Eto,在聚合根手工赋值,不是使用automaper

    LocalBus提交的类型是EntityCreatingEventData<>,EntityCreatedEventData<>

    DistributedEventBus提交是EntityCreatedEto<>,EntityUpdatedEto<>,EntityDeletedEto<>

    依赖EntityToEtoMapper如何将entity转成dto,再发送到EventBus里

       private void ConfigureDistributedEventBus()
            {
               Configure<AbpDistributedEventBusOptions>(options =>
               {
                   options.EtoMappings.Add<Person, PersonEto>();
               });
            }

    另外是LocalEvent、DistributedEvents

    TEntity泛型参数的BaseType,也要执行一次

    直到底层Object(Object是一切对象的基型),也要执行

     if (eventType.GetTypeInfo().IsGenericType &&
                    eventType.GetGenericArguments().Length == 1 &&
                    typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(eventType))
                {
                    var genericArg = eventType.GetGenericArguments()[0];
                    var baseArg = genericArg.GetTypeInfo().BaseType;
                    if (baseArg != null)
                    {
                        var baseEventType = eventType.GetGenericTypeDefinition().MakeGenericType(baseArg);
                        var constructorArgs = ((IEventDataWithInheritableGenericArgument)eventData).GetConstructorArgs();
                        var baseEventData = Activator.CreateInstance(baseEventType, constructorArgs);
                        await PublishAsync(baseEventType, baseEventData);
                    }
                }

     将Option的handler订阅在EventBus里

     protected virtual void SubscribeHandlers(ITypeList<IEventHandler> handlers)
            {
                foreach (var handler in handlers)
                {
                    var interfaces = handler.GetInterfaces();
                    foreach (var @interface in interfaces)
                    {
                        if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(@interface))
                        {
                            continue;
                        }
    
                        var genericArgs = @interface.GetGenericArguments();
                        if (genericArgs.Length == 1)
                        {
                            Subscribe(genericArgs[0], new IocEventHandlerFactory(ServiceScopeFactory, handler));
                        }
                    }
                }
            }
    取消订阅:取消对应类型TEvent,ACtion方法==》ActionHandle<TEvent> 处理Handle   ====>针对SingleInstanceHandlerFactory
    工厂方法,取消所有订阅,=》对列表的Factories
     public abstract void Unsubscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class;
    public abstract void Unsubscribe(Type eventType, IEventHandler handler); public abstract void Unsubscribe(Type eventType, IEventHandlerFactory factory); public abstract void UnsubscribeAll(Type eventType);
     

    (2)Distributed

    1)LocalDistributedEventBus,它是TryRegister方法,它是SingletonDependency方法,它暴露是的服务接口是IDistributedEventBus以LocalDistributedEventBus

     并没有继承EventBus抽象方法

    三、Publish方法

    Task PublishAsync(Type eventType, object eventData);
    Task PublishAsync<TEvent>(TEvent eventData)
                where TEvent : class;

    执行ILocalEventHandler和 IDistributedEventHandler方法

    查找对应EventType有多少个可执行的注册的IEventHandleFactory, 每个EventType有多个List<IEventHandlerFactory> EventHandlerFactories

    还需注意其基类也要执行(见下红色),比如执行是EntityCreatedEventData,其下的EntityChangedEventData也应该执行

        protected override IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
            {
                var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();
    
                foreach (var handlerFactory in HandlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)))
                {
                    handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
                }
    
                return handlerFactoryList.ToArray();
            }
      private static bool ShouldTriggerEventForHandler(Type targetEventType, Type handlerEventType)
            {
                //Should trigger same type
                if (handlerEventType == targetEventType)
                {
                    return true;
                }
    
                //Should trigger for inherited types
                if (handlerEventType.IsAssignableFrom(targetEventType))
                {
                    return true;
                }
    
                return false;
            }
       await new SynchronizationContextRemover();
    
                foreach (var handlerFactories in GetHandlerFactories(eventType))
                {
                    foreach (var handlerFactory in handlerFactories.EventHandlerFactories)
                    {
                        await TriggerHandlerAsync(handlerFactory, handlerFactories.EventType, eventData, exceptions);
                    }
                }
     protected virtual async Task TriggerHandlerAsync(IEventHandlerFactory asyncHandlerFactory, Type eventType, object eventData, List<Exception> exceptions)
            {
                using (var eventHandlerWrapper = asyncHandlerFactory.GetHandler())
                {
                    try
                    {
                        var handlerType = eventHandlerWrapper.EventHandler.GetType();
    
                        if (ReflectionHelper.IsAssignableToGenericType(handlerType, typeof(ILocalEventHandler<>)))
                        {
                            var method = typeof(ILocalEventHandler<>)
                                .MakeGenericType(eventType)
                                .GetMethod(
                                    nameof(ILocalEventHandler<object>.HandleEventAsync),
                                    new[] { eventType }
                                );
    
                            await (Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData });
                        }
                        else if (ReflectionHelper.IsAssignableToGenericType(handlerType, typeof(IDistributedEventHandler<>)))
                        {
                            var method = typeof(IDistributedEventHandler<>)
                                .MakeGenericType(eventType)
                                .GetMethod(
                                    nameof(IDistributedEventHandler<object>.HandleEventAsync),
                                    new[] { eventType }
                                );
    
                            await (Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData });
                        }
                        else
                        {
                            throw new AbpException("The object instance is not an event handler. Object type: " + handlerType.AssemblyQualifiedName);
                        }
                    }
                    catch (TargetInvocationException ex)
                    {
                        exceptions.Add(ex.InnerException);
                    }
                    catch (Exception ex)
                    {
                        exceptions.Add(ex);
                    }
                }
            }

    四、RabbbitMQ

  • 相关阅读:
    PAT (Advanced Level) Practice 1100 Mars Numbers (20分)
    PAT (Advanced Level) Practice 1107 Social Clusters (30分) (并查集)
    PAT (Advanced Level) Practice 1105 Spiral Matrix (25分)
    PAT (Advanced Level) Practice 1104 Sum of Number Segments (20分)
    PAT (Advanced Level) Practice 1111 Online Map (30分) (两次迪杰斯特拉混合)
    PAT (Advanced Level) Practice 1110 Complete Binary Tree (25分) (完全二叉树的判断+分享致命婴幼儿错误)
    PAT (Advanced Level) Practice 1109 Group Photo (25分)
    PAT (Advanced Level) Practice 1108 Finding Average (20分)
    P6225 [eJOI2019]异或橙子 树状数组 异或 位运算
    P4124 [CQOI2016]手机号码 数位DP
  • 原文地址:https://www.cnblogs.com/cloudsu/p/11230985.html
Copyright © 2011-2022 走看看