zoukankan      html  css  js  c++  java
  • 看eShopOnContainers学一个EventBus

    最近在看微软eShopOnContainers 项目,看到事件总线觉得不错,和大家分享一下

    看完此文你将获得什么?

    1. eShop中是如何设计事件总线的
    2. 实现一个InMemory事件总线eShop中是没有InMemory实现的,这算是一个小小小的挑战

    发布订阅模式

    发布订阅模式可以让应用程序组件之间解耦,这是我们使用这种模式最重要的理由之一,如果你完全不知道这个东西,建议你先通过搜索引擎了解一下这种模式,网上的资料很多这里就不再赘述了。

    eShop中的EventBus就是基于这种模式的发布/订阅
    发布订阅模式核心概念有三个:发布者、订阅者、调度中心,这些概念在消息队列中就是生产者、消费者、MQ实例

    在eShop中有两个EventBus的实现:

    • 基于RabbitMq的EventBusRabbitMQ
    • 基于AzureServiceBus的EventBusServiceBus

    IEventBus开始

    先来看一看,所有EventBus的接口IEventBus

    public interface IEventBus
    {
        void Publish(IntegrationEvent @event);
    
        void Subscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>;
    
        void SubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler;
    
        void UnsubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler;
    
        void Unsubscribe<T, TH>()
            where TH : IIntegrationEventHandler<T>
            where T : IntegrationEvent;
    }
    

    嗯,乍一看看是有点眼晕的,仔细看它的核心功能只有三个:

    1. Publish 发布
    2. Subscribe 订阅
    3. Unsubscribe 取消订阅

    这对应着发布订阅模式的基本概念,不过对于事件总线的接口添加了许多约束:

    1. 发布的内容(消息)必须是IntegrationEvent及其子类
    2. 订阅事件必须指明要订阅事件的类型,并附带处理器类型
    3. 处理器必须是IIntegrationEventHandler的实现类

    Ok,看到这里先不要管Dynamic相关的方法,然后记住这个两个关键点:

    1. 事件必须继承IntegrationEvent
    2. 处理器必须实现IIntegrationEventHandler<T>TIntegrationEvent子类

    另外,看下 IntegrationEvent有什么

    public class IntegrationEvent
    {
        public IntegrationEvent()
        {
            Id = Guid.NewGuid();
            CreationDate = DateTime.UtcNow;
        }
    
        public Guid Id  { get; }
        public DateTime CreationDate { get; }
    }
    

    IEventBusSubscriptionsManager是什么

    public interface IEventBusSubscriptionsManager
    {
        bool IsEmpty { get; }
        event EventHandler<string> OnEventRemoved;
        void AddDynamicSubscription<TH>(string eventName)
           where TH : IDynamicIntegrationEventHandler;
    
        void AddSubscription<T, TH>()
           where T : IntegrationEvent
           where TH : IIntegrationEventHandler<T>;
    
        void RemoveSubscription<T, TH>()
             where TH : IIntegrationEventHandler<T>
             where T : IntegrationEvent;
        void RemoveDynamicSubscription<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler;
    
        bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent;
        bool HasSubscriptionsForEvent(string eventName);
        Type GetEventTypeByName(string eventName);
        void Clear();
        IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent;
        IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName);
        string GetEventKey<T>();
    }
    

    这个接口看起来稍显复杂些,我们来简化下看看:

    public interface IEventBusSubscriptionsManager
    {
        void AddSubscription<T, TH>()
        void RemoveSubscription<T, TH>()
        IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() 
    }
    

    最终,这三个方法就是我们要关注的,添加订阅、移除订阅、获取指定事件的订阅信息。

    SubscriptionInfo是什么?

    public bool IsDynamic { get; }
    public Type HandlerType{ get; }
    

    SubscriptionInfo中只有两个信息,这是不是一个Dynamic类型的Event以及这个Event所对应的处理器的类型。

    这是你可能会有另一个疑问:

    这个和IEventBus有什么关系?

    1. IEventBusSubscriptionsManager含有更多功能:查看是否有订阅,获取事件的Type,获取事件的处理器等等

    2. IEventBusSubscriptionsManagerIEventBus使用,在RabbitMq和ServiceBus的实现中,都使用Manager去存储事件的信息,例如下面的代码:

       public void Subscribe<T, TH>()
           where T : IntegrationEvent
           where TH : IIntegrationEventHandler<T>
       {
           // 查询事件的全名
           var eventName = _subsManager.GetEventKey<T>();
      
           //向mq添加注册
           DoInternalSubscription(eventName);
      
           // 向manager添加订阅
           _subsManager.AddSubscription<T, TH>();
       }
      
       private void DoInternalSubscription(string eventName)
       {
           var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
           if (!containsKey)
           {
               if (!_persistentConnection.IsConnected)
               {
                   _persistentConnection.TryConnect();
               }
      
               using (var channel = _persistentConnection.CreateModel())
               {
                   channel.QueueBind(queue: _queueName,
                                       exchange: BROKER_NAME,
                                       routingKey: eventName);
               }
           }
       }
      

    查询事件的名字是manager做的,订阅的时候是先向mq添加订阅,之后又加到manager中,manager管理着订阅的基本信息。

    另外一个重要功能是获取事件的处理器信息,在rabbit mq的实现中,ProcessEvent方法中用manager获取了事件的处理器,再用依赖注入获得处理器的实例,反射调用Handle方法处理事件信息:

        private async Task ProcessEvent(string eventName, string message)
        {
            // 从manager查询信息
            if (_subsManager.HasSubscriptionsForEvent(eventName))
            {
                using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
                {
    
                    // 从manager获取处理器
                    var subscriptions = _subsManager.GetHandlersForEvent(eventName);
                    foreach (var subscription in subscriptions)
                    {
    
                        // Di + 反射调用,处理事件(两个都是,只是针对是否是dynamic做了不同的处理)
                        if (subscription.IsDynamic)
                        { 
                            var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                            dynamic eventData = JObject.Parse(message);
                            await handler.Handle(eventData);
                        }
                        else
                        {
                            var eventType = _subsManager.GetEventTypeByName(eventName);
                            var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                            var handler = scope.ResolveOptional(subscription.HandlerType);
                            var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                            await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                        }
                    }
                }
            }
        }
    

    IEventBusSubscriptionsManager的默认实现

    在eShop中只有一个实现就是InMemoryEventBusSubscriptionsManager

    这个类中有两个重要的字段

        private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
        private readonly List<Type> _eventTypes;
    

    他们分别存储了事件列表和事件处理器信息词典

    接下来就是实现一个

    基于内存的事件总线

    我们要做什么呢?IEventBusSubscriptionsManager 已经有了InMemory的实现了,我们可以直接拿来用,所以我们只需要自己实现一个EventBus就好了

    先贴出最终代码:

    public class InMemoryEventBus : IEventBus
    {
        private readonly IServiceProvider _provider;
        private readonly ILogger<InMemoryEventBus> _logger;
        private readonly ISubscriptionsManager _manager;
        private readonly IList<IntegrationEvent> _events;
        public InMemoryEventBus(
            IServiceProvider provider,
            ILogger<InMemoryEventBus> logger, 
            ISubscriptionsManager manager)
        {
            _provider = provider;
            _logger = logger;
            _manager = manager;
        }
    
        public void Publish(IntegrationEvent e)
        {
    
            var eventType = e.GetType();
            var handlers = _manager.GetHandlersForEvent(eventType.FullName);
    
            foreach (var handlerInfo in handlers)
            {
                var handler = _provider.GetService(handlerInfo.HandlerType);
    
                var method = handlerInfo.HandlerType.GetMethod("Handle");
    
                method.Invoke(handler, new object[] { e });
            }
        }
    
        public void Subscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
    
            _manager.AddSubscription<T, TH>();
    
        }
    
        public void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
        {
            throw new NotImplementedException();
        }
    
        public void Unsubscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            _manager.RemoveSubscription<T, TH>();
        }
    
        public void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
        {
            throw new NotImplementedException();
        }
    }
    

    首先构造函数中声明我们要使用的东西:

    public InMemoryEventBus(
        IServiceProvider provider,
        ILogger<InMemoryEventBus> logger, 
        ISubscriptionsManager manager)
    {
        _provider = provider;
        _logger = logger;
        _manager = manager;
    }
    

    这里要注意的就是IServiceProvider provider这是 DI容器,当我们在切实处理事件的时候我们选择从DI获取处理器的实例,而不是反射创建,这要做的好处在于,处理器可以依赖于其它东西,并且可以是单例的

    public void Subscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {
    
        _manager.AddSubscription<T, TH>();
    
    }
    
    public void Unsubscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {
        _manager.RemoveSubscription<T, TH>();
    }
    

    订阅和取消订阅很简单,因为我们是InMemory的所以只调用了manager的方法。

    接下来就是最重要的Publish方法,实现Publish有两种方式:

    1. 使用额外的线程和Queue让发布和处理异步

    2. 为了简单起见,我们先写个简单易懂的同步的

       public void Publish(IntegrationEvent e)
       {
           // 首先要拿到集成事件的Type信息
           var eventType = e.GetType();
      
           // 获取属于这个事件的处理器列表,可能有很多,注意获得的是SubscriptionInfo
           var handlers = _manager.GetHandlersForEvent(eventType.FullName);
      
           // 不解释循环
           foreach (var handlerInfo in handlers)
           {
               // 从DI中获取类型的实例
               var handler = _provider.GetService(handlerInfo.HandlerType);
      
               // 拿到Handle方法
               var method = handlerInfo.HandlerType.GetMethod("Handle");
      
               // 调用方法
               method.Invoke(handler, new object[] { e });
           }
       }
      

    OK,我们的InMemoryEventBus就写好了!

    要实践这个InMemoryEventBus,那么还需要一个IntegrationEvent的子类,和一个IIntegrationEventHandler<T>的实现类,这些都不难,例如我们做一个添加用户的事件,A在添加用户后,发起一个事件并将新用户的名字作为事件数据,B去订阅事件,并在自己的处理器中处理名字信息。

    思路是这样的:

    1. 写一个 AddUserEvent:IntegrationEvent,里面有一个UserId和一个UserName

    2. 写一个AddUserEventHandler:IIntegrationEventHandler<AddUserEvent>,在Handle方法中输出UserId和Name到日志。

    3. 注册DI,你要注册下面这些服务:

       IEventBus=>InMemoryEventBus
       ISubscriptionsManager=>InMemorySubscriptionsManager
       AddUserEventHandler=>AddUserEventHandler
      
    4. 在Startup中为刚刚写的事件和处理器添加订阅(在这里已经可以获取到IEventBus实例了)

    5. 写一个Api接口或是什么,调用IEventBus的Publish方法,new 一个新的AddUserEvent作为参数传进去。

    OK!到这里一个切实可用的InMemoryEventBus就可以使用了。

  • 相关阅读:
    Key Figure、Exception Aggreagion、Non-Cumulative KeyFigure
    特征创建:Reference Characteristic、Template
    Compounding绑定属性
    特征的SID表、M表、P表、Q表、X表、Y表、T表
    特征的Attribute Only选项
    将InfoObject作为信息提供者Characteristic is InfoProvider
    滚动RollUp、压缩
    Aggregation 聚集
    VirtualProvider (VirtualCube)虚拟立方体
    自建数据源(RSO2)、及数据源增强
  • 原文地址:https://www.cnblogs.com/rocketRobin/p/8510198.html
Copyright © 2011-2022 走看看