zoukankan      html  css  js  c++  java
  • 动手造轮子:实现一个简单的 EventBus

    动手造轮子:实现一个简单的 EventBus

    Intro

    EventBus 是一种事件发布订阅模式,通过 EventBus 我们可以很方便的实现解耦,将事件的发起和事件的处理的很好的分隔开来,很好的实现解耦。 微软官方的示例项目 EShopOnContainers 也有在使用 EventBus 。

    这里的 EventBus 实现也是参考借鉴了微软 eShopOnContainers 项目。

    EventBus 处理流程:

    event bus flow

    微服务间使用 EventBus 实现系统间解耦:

    event bus in microservice

    借助 EventBus 我们可以很好的实现组件之间,服务之间,系统之间的解耦以及相互通信的问题。

    起初觉得 EventBus 和 MQ 其实差不多嘛,都是通过异步处理来实现解耦合,高性能。后来看到了下面这张图才算明白为什么要用 EventBus 以及 EventBus 和 MQ 之间的关系,EventBus 是抽象的,可以用MQ来实现 EventBus.

    eventbus implement

    为什么要使用 EventBus

    1. 解耦合(轻松的实现系统间解耦)
    2. 高性能可扩展(每一个事件都是简单独立且不可更改的对象,只需要保存新增的事件,不涉及其他的变更删除操作)
    3. 系统审计(每一个事件都是不可变更的,每一个事件都是可追溯的)
    4. ...

    EventBus 整体架构:

    • IEventBase :所有的事件应该实现这个接口,这个接口定义了事件的唯一id EventId 和事件发生的事件 EventAt

    IEventBase

    • IEventHandler:定义了一个 Handle 方法来处理相应的事件

    IEventHandler

    • IEventStore:所有的事件的处理存储,保存事件的IEventHandler,一般不会直接操作,通过 EventBus 的订阅和取消订阅来操作 EventStore

    IEventStore

    • IEventBus:用来发布/订阅/取消订阅事件,并将事件的某一个 IEventHandler 保存到 EventStore 或从 EventStore 中移除

    IEventBus

    使用示例

    来看一个使用示例,完整代码示例

    internal class EventTest
    {
        public static void MainTest()
        {
            var eventBus = DependencyResolver.Current.ResolveService<IEventBus>();
            eventBus.Subscribe<CounterEvent, CounterEventHandler1>();
            eventBus.Subscribe<CounterEvent, CounterEventHandler2>();
            eventBus.Subscribe<CounterEvent, DelegateEventHandler<CounterEvent>>();
            eventBus.Publish(new CounterEvent { Counter = 1 });
    
            eventBus.Unsubscribe<CounterEvent, CounterEventHandler1>();
            eventBus.Unsubscribe<CounterEvent, DelegateEventHandler<CounterEvent>>();
            eventBus.Publish(new CounterEvent { Counter = 2 });
        }
    }
    
    internal class CounterEvent : EventBase
    {
        public int Counter { get; set; }
    }
    
    internal class CounterEventHandler1 : IEventHandler<CounterEvent>
    {
        public Task Handle(CounterEvent @event)
        {
            LogHelper.GetLogger<CounterEventHandler1>().Info($"Event Info: {@event.ToJson()}, Handler Type:{GetType().FullName}");
            return Task.CompletedTask;
        }
    }
    
    internal class CounterEventHandler2 : IEventHandler<CounterEvent>
    {
        public Task Handle(CounterEvent @event)
        {
            LogHelper.GetLogger<CounterEventHandler2>().Info($"Event Info: {@event.ToJson()}, Handler Type:{GetType().FullName}");
            return Task.CompletedTask;
        }
    }
    

    具体实现

    EventStoreInMemory 实现:

    EventStoreInMemory 是 IEventStore 将数据放在内存中的实现,使用了 ConcurrentDictionary 以及 HashSet 来尽可能的保证高效,具体实现代码如下:

    public class EventStoreInMemory : IEventStore
    {
        private readonly ConcurrentDictionary<string, HashSet<Type>> _eventHandlers = new ConcurrentDictionary<string, HashSet<Type>>();
    
        public bool AddSubscription<TEvent, TEventHandler>()
            where TEvent : IEventBase
            where TEventHandler : IEventHandler<TEvent>
        {
            var eventKey = GetEventKey<TEvent>();
            if (_eventHandlers.ContainsKey(eventKey))
            {
                return _eventHandlers[eventKey].Add(typeof(TEventHandler));
            }
            else
            {
                return _eventHandlers.TryAdd(eventKey, new HashSet<Type>()
                {
                    typeof(TEventHandler)
                });
            }
        }
    
        public bool Clear()
        {
            _eventHandlers.Clear();
            return true;
        }
    
        public ICollection<Type> GetEventHandlerTypes<TEvent>() where TEvent : IEventBase
        {
            if(_eventHandlers.Count == 0)
                return  new Type[0];
            var eventKey = GetEventKey<TEvent>();
            if (_eventHandlers.TryGetValue(eventKey, out var handlers))
            {
                return handlers;
            }
            return new Type[0];
        }
    
        public string GetEventKey<TEvent>()
        {
            return typeof(TEvent).FullName;
        }
    
        public bool HasSubscriptionsForEvent<TEvent>() where TEvent : IEventBase
        {
            if(_eventHandlers.Count == 0)
                return false;
    
            var eventKey = GetEventKey<TEvent>();
            return _eventHandlers.ContainsKey(eventKey);
        }
    
        public bool RemoveSubscription<TEvent, TEventHandler>()
            where TEvent : IEventBase
            where TEventHandler : IEventHandler<TEvent>
        {
            if(_eventHandlers.Count == 0)
                return false;
    
            var eventKey = GetEventKey<TEvent>();
            if (_eventHandlers.ContainsKey(eventKey))
            {
                return _eventHandlers[eventKey].Remove(typeof(TEventHandler));
            }
            return false;
        }
    }
    

    EventBus 的实现,从上面可以看到 EventStore 保存的是 IEventHandler 对应的 Type,在 Publish 的时候根据 Type 从 IoC 容器中取得相应的 Handler 即可,如果没有在 IoC 容器中找到对应的类型,则会尝试创建一个类型实例,然后调用 IEventHandlerHandle 方法,代码如下:

    /// <summary>
    /// EventBus in process
    /// </summary>
    public class EventBus : IEventBus
    {
        private static readonly ILogHelperLogger Logger = Helpers.LogHelper.GetLogger<EventBus>();
    
        private readonly IEventStore _eventStore;
        private readonly IServiceProvider _serviceProvider;
    
        public EventBus(IEventStore eventStore, IServiceProvider serviceProvider = null)
        {
            _eventStore = eventStore;
            _serviceProvider = serviceProvider ?? DependencyResolver.Current;
        }
    
        public bool Publish<TEvent>(TEvent @event) where TEvent : IEventBase
        {
            if (!_eventStore.HasSubscriptionsForEvent<TEvent>())
            {
                return false;
            }
            var handlers = _eventStore.GetEventHandlerTypes<TEvent>();
            if (handlers.Count > 0)
            {
                var handlerTasks = new List<Task>();
                foreach (var handlerType in handlers)
                {
                    try
                    {
                        if (_serviceProvider.GetServiceOrCreateInstance(handlerType) is IEventHandler<TEvent> handler)
                        {
                            handlerTasks.Add(handler.Handle(@event));
                        }
                    }
                    catch (Exception ex)
                    {
                        Logger.Error(ex, $"handle event [{_eventStore.GetEventKey<TEvent>()}] error, eventHandlerType:{handlerType.FullName}");
                    }
                }
                handlerTasks.WhenAll().ConfigureAwait(false);
    
                return true;
            }
            return false;
        }
    
        public bool Subscribe<TEvent, TEventHandler>()
            where TEvent : IEventBase
            where TEventHandler : IEventHandler<TEvent>
        {
            return _eventStore.AddSubscription<TEvent, TEventHandler>();
        }
    
        public bool Unsubscribe<TEvent, TEventHandler>()
            where TEvent : IEventBase
            where TEventHandler : IEventHandler<TEvent>
        {
            return _eventStore.RemoveSubscription<TEvent, TEventHandler>();
        }
    }
    

    项目实例

    来看一个实际的项目中的使用,在我的活动室预约项目中有一个公告的模块,访问公告详情页面,这个公告的访问次数加1,把这个访问次数加1改成了用 EventBus 来实现,实际项目代码:https://github.com/WeihanLi/ActivityReservation/blob/67e2cb8e92876629a7af6dc051745dd8c7e9faeb/ActivityReservation/Startup.cs

    1. 定义 Event 以及 EventHandler
    public class NoticeViewEvent : EventBase
    {
        public Guid NoticeId { get; set; }
    
        // UserId
        // IP
        // ...
    }
    
    public class NoticeViewEventHandler : IEventHandler<NoticeViewEvent>
    {
        public async Task Handle(NoticeViewEvent @event)
        {
            await DependencyResolver.Current.TryInvokeServiceAsync<ReservationDbContext>(async dbContext =>
            {
                var notice = await dbContext.Notices.FindAsync(@event.NoticeId);
                notice.NoticeVisitCount += 1;
                await dbContext.SaveChangesAsync();
            });
        }
    }
    

    这里的 Event 只定义了一个 NoticeId ,其实也可以把请求信息如IP/UA等信息加进去,在 EventHandler里处理以便日后数据分析。

    1. 注册 EventBus 相关服务以及 EventHandlers
    services.AddSingleton<IEventBus, EventBus>();
    services.AddSingleton<IEventStore, EventStoreInMemory>();
    //register EventHandlers
    services.AddSingleton<NoticeViewEventHandler>();
    
    1. 订阅事件
    public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, IEventBus eventBus)
    {
        eventBus.Subscribe<NoticeViewEvent, NoticeViewEventHandler>(); 
        // ...
    }
    
    1. 发布事件
    eventBus.Publish(new NoticeViewEvent { NoticeId = notice.NoticeId });
    

    Reference

  • 相关阅读:
    centos下安装nethogs
    Nginx+Tomcat反向代理利用certbot实现https
    Mysql主从复制读写分离
    短链接及关键字过滤ac自动机设计思路
    接口访问加密和限频方案
    算法笔记
    redis原理及使用
    memcached原理
    Spring中AOP的两种代理方式(Java动态代理和CGLIB代理-转载
    基于注解的Spring AOP的配置和使用--转载
  • 原文地址:https://www.cnblogs.com/weihanli/p/implement-a-simple-event-bus.html
Copyright © 2011-2022 走看看