zoukankan      html  css  js  c++  java
  • 【DDD-Apworks框架】事件总线和事件聚合器

    第一步:事件总线和事件聚合器

       【1】事件总线 IEventBus

       IUnitOfWork.cs

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace Keasy5.Infrastructure
    {
        /// <summary>
        /// Marks every type that implements this interface as an implementation of the Unit of Work pattern.
        /// </summary>
        /// <remarks>
        /// For details about Unit of Work, see the pattern description at
        /// http://martinfowler.com/eaaCatalog/unitOfWork.html
        /// </remarks>
        public interface IUnitOfWork
        {
            /// <summary>
            /// Gets a <see cref="System.Boolean"/> value indicating whether the current
            /// Unit of Work supports the Microsoft distributed transaction mechanism.
            /// </summary>
            bool DistributedTransactionSupported { get; }
            /// <summary>
            /// Gets a <see cref="System.Boolean"/> value indicating whether the current
            /// Unit of Work transaction has been committed.
            /// </summary>
            bool Committed { get; }
            /// <summary>
            /// Commits the current Unit of Work transaction.
            /// </summary>
            void Commit();
            /// <summary>
            /// Rolls back the current Unit of Work transaction.
            /// </summary>
            void Rollback();
        }
    }
    View Code

       IBus.cs

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using Keasy5.Infrastructure;
    
    namespace Keasy5.Events.Bus
    {
        /// <summary>
        /// Represents the message bus. A bus is also an <see cref="IUnitOfWork"/>
        /// and is disposable.
        /// </summary>
        public interface IBus : IUnitOfWork, IDisposable
        {
            /// <summary>
            /// Gets the <see cref="Guid"/> that identifies this bus.
            /// </summary>
            Guid ID { get; }
            /// <summary>
            /// Publishes the specified message to the bus.
            /// </summary>
            /// <typeparam name="TMessage">The message type; must be a reference type implementing <c>IEvent</c>.</typeparam>
            /// <param name="message">The message to be published.</param>
            void Publish<TMessage>(TMessage message)
                where TMessage : class, IEvent;
            /// <summary>
            /// Publishes a collection of messages to the bus.
            /// </summary>
            /// <typeparam name="TMessage">The message type; must be a reference type implementing <c>IEvent</c>.</typeparam>
            /// <param name="messages">The messages to be published.</param>
            void Publish<TMessage>(IEnumerable<TMessage> messages)
                where TMessage : class, IEvent;
            /// <summary>
            /// Clears the published messages waiting for commit.
            /// </summary>
            void Clear();
        }
    }
    View Code

      接口: IEventBus.cs

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    
    namespace Keasy5.Events.Bus
    {
        /// <summary>
        /// Represents a bus that carries events. The interface declares no members of
        /// its own; everything is inherited from <see cref="IBus"/>.
        /// </summary>
        public interface IEventBus : IBus
        {
        }
    }
    View Code

    实现类:EventBus

    EventBus.cs

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reflection;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Keasy5.Infrastructure;
    
    namespace Keasy5.Events.Bus
    {
        /// <summary>
        /// Event bus that buffers published events and forwards them to an
        /// <see cref="IEventAggregator"/> when <see cref="Commit"/> is called.
        /// </summary>
        /// <remarks>
        /// The pending queue and the committed flag are both kept in <see cref="ThreadLocal{T}"/>
        /// storage, so each thread using the same bus instance works on its own queue and flag:
        /// events published on one thread are only dispatched by a <see cref="Commit"/> on that same thread.
        /// </remarks>
        public class EventBus : DisposableObject, IEventBus
        {
            private readonly Guid id = Guid.NewGuid();
            // Per-thread queue of events published but not yet committed.
            private readonly ThreadLocal<Queue<object>> messageQueue = new ThreadLocal<Queue<object>>(() => new Queue<object>());
            private readonly IEventAggregator aggregator;
            // Per-thread flag; starts as true because an empty queue has nothing left to commit.
            private readonly ThreadLocal<bool> committed = new ThreadLocal<bool>(() => true);
            // Open generic definition of IEventAggregator.Publish<TEvent>(TEvent).
            private readonly MethodInfo publishMethod;

            /// <summary>
            /// Creates a bus that dispatches committed events through the given aggregator.
            /// </summary>
            /// <param name="aggregator">
            /// The aggregator that receives the events on commit, for example an
            /// <see cref="EventAggregator"/>.
            /// </param>
            /// <exception cref="ArgumentNullException"><paramref name="aggregator"/> is null.</exception>
            public EventBus(IEventAggregator aggregator)
            {
                if (aggregator == null)
                    throw new ArgumentNullException("aggregator");

                this.aggregator = aggregator;

                // Resolve the single-parameter generic Publish overload from the interface rather
                // than from the concrete type: that also works when the aggregator implements the
                // interface explicitly, and it cannot pick up an unrelated non-generic Publish
                // method, which MakeGenericMethod would reject later in Commit.
                publishMethod = typeof(IEventAggregator).GetMethods()
                    .First(m => m.Name == "Publish" &&
                                m.IsGenericMethodDefinition &&
                                m.GetParameters().Length == 1);
            }

            /// <summary>
            /// Releases the thread-local storage owned by this bus.
            /// </summary>
            /// <param name="disposing">True when called from an explicit dispose.</param>
            protected override void Dispose(bool disposing)
            {
                if (disposing)
                {
                    messageQueue.Dispose();
                    committed.Dispose();
                }
            }

            #region IBus Members

            /// <summary>
            /// Queues a message on the calling thread and marks that thread's unit of work as uncommitted.
            /// </summary>
            /// <typeparam name="TMessage">The message type.</typeparam>
            /// <param name="message">The message to queue.</param>
            /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
            public void Publish<TMessage>(TMessage message)
                where TMessage : class, IEvent
            {
                // Reject null here; otherwise it would sit in the queue and only fail inside
                // Commit (on evnt.GetType()), far away from the faulty caller.
                if (message == null)
                    throw new ArgumentNullException("message");

                messageQueue.Value.Enqueue(message);
                committed.Value = false;
            }

            /// <summary>
            /// Queues every message of the sequence, in enumeration order.
            /// </summary>
            /// <typeparam name="TMessage">The message type.</typeparam>
            /// <param name="messages">The messages to queue.</param>
            /// <exception cref="ArgumentNullException">
            /// <paramref name="messages"/> is null or contains a null element.
            /// </exception>
            public void Publish<TMessage>(IEnumerable<TMessage> messages)
                where TMessage : class, IEvent
            {
                if (messages == null)
                    throw new ArgumentNullException("messages");

                foreach (var message in messages)
                    Publish(message);
            }

            /// <summary>
            /// Discards the calling thread's queued messages and marks its unit of work as committed.
            /// </summary>
            public void Clear()
            {
                messageQueue.Value.Clear();
                committed.Value = true;
            }

            #endregion

            #region IUnitOfWork Members

            /// <summary>
            /// Always false: this bus does not take part in distributed transactions.
            /// </summary>
            public bool DistributedTransactionSupported
            {
                get { return false; }
            }

            /// <summary>
            /// Gets whether the calling thread has no uncommitted messages.
            /// </summary>
            public bool Committed
            {
                get { return committed.Value; }
            }

            /// <summary>
            /// Dispatches the calling thread's queued events to the aggregator in FIFO order.
            /// </summary>
            /// <remarks>
            /// Each event is dequeued before it is dispatched. If the dispatch throws, the exception
            /// surfaces wrapped in a <see cref="TargetInvocationException"/> (reflection invoke),
            /// the failed event is already removed, the remaining events stay queued and
            /// <see cref="Committed"/> stays false.
            /// </remarks>
            public void Commit()
            {
                var queue = messageQueue.Value;
                while (queue.Count > 0)
                {
                    var evnt = queue.Dequeue();
                    // Close the generic Publish<TEvent> over the event's runtime type so the
                    // aggregator looks up and casts handlers for the concrete event type,
                    // not for the static type the event happened to be published as.
                    var method = publishMethod.MakeGenericMethod(evnt.GetType());
                    method.Invoke(aggregator, new object[] { evnt });
                }
                committed.Value = true;
            }

            /// <summary>
            /// Discards the calling thread's queued events; equivalent to <see cref="Clear"/>.
            /// </summary>
            public void Rollback()
            {
                Clear();
            }

            /// <summary>
            /// Gets the identifier generated for this bus instance at construction.
            /// </summary>
            public Guid ID
            {
                get { return id; }
            }

            #endregion
        }
    }
    View Code

       【2】事件聚合器IEventAggregator

    接口:IEventAggregator.cs

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    
    namespace Keasy5.Events
    {
        /// <summary>
        /// Contract for an event aggregator: handlers are subscribed per event type
        /// and events are published to the handlers of their type.
        /// </summary>
        public interface IEventAggregator
        {
            /// <summary>Subscribes a single handler object for <typeparamref name="TEvent"/>.</summary>
            void Subscribe<TEvent>(IEventHandler<TEvent> domainEventHandler)
                where TEvent : class, IEvent;
            /// <summary>Subscribes every handler object in the sequence for <typeparamref name="TEvent"/>.</summary>
            void Subscribe<TEvent>(IEnumerable<IEventHandler<TEvent>> domainEventHandlers)
                where TEvent : class, IEvent;
            /// <summary>Subscribes every handler object in the argument list for <typeparamref name="TEvent"/>.</summary>
            void Subscribe<TEvent>(params IEventHandler<TEvent>[] domainEventHandlers)
                where TEvent : class, IEvent;
            /// <summary>Subscribes a delegate as a handler for <typeparamref name="TEvent"/>.</summary>
            void Subscribe<TEvent>(Action<TEvent> domainEventHandlerFunc)
                where TEvent : class, IEvent;
            /// <summary>
            /// Subscribes every delegate in the sequence for <typeparamref name="TEvent"/>.
            /// NOTE(review): the meaning of the delegates' boolean result is not defined by this interface - confirm.
            /// </summary>
            void Subscribe<TEvent>(IEnumerable<Func<TEvent, bool>> domainEventHandlerFuncs)
                where TEvent : class, IEvent;
            /// <summary>
            /// Subscribes every delegate in the argument list for <typeparamref name="TEvent"/>.
            /// NOTE(review): the meaning of the delegates' boolean result is not defined by this interface - confirm.
            /// </summary>
            void Subscribe<TEvent>(params Func<TEvent, bool>[] domainEventHandlerFuncs)
                where TEvent : class, IEvent;
            /// <summary>Unsubscribes a single handler object from <typeparamref name="TEvent"/>.</summary>
            void Unsubscribe<TEvent>(IEventHandler<TEvent> domainEventHandler)
                where TEvent : class, IEvent;
            /// <summary>Unsubscribes every handler object in the sequence from <typeparamref name="TEvent"/>.</summary>
            void Unsubscribe<TEvent>(IEnumerable<IEventHandler<TEvent>> domainEventHandlers)
                where TEvent : class, IEvent;
            /// <summary>Unsubscribes every handler object in the argument list from <typeparamref name="TEvent"/>.</summary>
            void Unsubscribe<TEvent>(params IEventHandler<TEvent>[] domainEventHandlers)
                where TEvent : class, IEvent;
            /// <summary>Unsubscribes a delegate handler from <typeparamref name="TEvent"/>.</summary>
            void Unsubscribe<TEvent>(Action<TEvent> domainEventHandlerFunc)
                where TEvent : class, IEvent;
            /// <summary>Unsubscribes every delegate in the sequence from <typeparamref name="TEvent"/>.</summary>
            void Unsubscribe<TEvent>(IEnumerable<Func<TEvent, bool>> domainEventHandlerFuncs)
                where TEvent : class, IEvent;
            /// <summary>Unsubscribes every delegate in the argument list from <typeparamref name="TEvent"/>.</summary>
            void Unsubscribe<TEvent>(params Func<TEvent, bool>[] domainEventHandlerFuncs)
                where TEvent : class, IEvent;
            /// <summary>Removes all handlers subscribed for <typeparamref name="TEvent"/>.</summary>
            void UnsubscribeAll<TEvent>()
                where TEvent : class, IEvent;
            /// <summary>Removes all handlers for all event types.</summary>
            void UnsubscribeAll();
            /// <summary>Returns the handlers currently subscribed for <typeparamref name="TEvent"/>.</summary>
            IEnumerable<IEventHandler<TEvent>> GetSubscriptions<TEvent>()
                where TEvent : class, IEvent;
            /// <summary>Publishes an event to its subscribed handlers.</summary>
            /// <param name="domainEvent">The event to publish.</param>
            void Publish<TEvent>(TEvent domainEvent)
                where TEvent : class, IEvent;
            /// <summary>
            /// Publishes an event to its subscribed handlers and reports the outcome through a callback.
            /// </summary>
            /// <param name="domainEvent">The event to publish.</param>
            /// <param name="callback">Receives the event, a boolean outcome flag and an exception value (which may be null).</param>
            /// <param name="timeout">Optional time limit; defaults to null.</param>
            void Publish<TEvent>(TEvent domainEvent, Action<TEvent, bool, Exception> callback, TimeSpan? timeout = null)
                where TEvent : class, IEvent;
        }
        
    }
    View Code

     

    实现类:EventAggregator

    EventAggregator.cs

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reflection;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace Keasy5.Events
    {
        /// <summary>
        /// In-memory <see cref="IEventAggregator"/>: keeps a list of handlers per event type and
        /// dispatches published events to the handlers registered for the event's runtime type.
        /// </summary>
        /// <remarks>
        /// Handlers whose class carries <see cref="HandlesAsynchronouslyAttribute"/> are run on a
        /// task started through <see cref="Task.Factory"/>; all other handlers run inline on the
        /// publishing thread. All access to the registration tables is serialized on a private lock,
        /// and publishing works on a snapshot, so handlers may subscribe or unsubscribe while an
        /// event is being dispatched.
        /// </remarks>
        public class EventAggregator : IEventAggregator
        {
            #region private property
            // Guards every read and write of the two dictionaries below.
            private readonly object sync = new object();
            // Event type -> handlers registered for exactly that type.
            private readonly Dictionary<Type, List<object>> eventHandlers = new Dictionary<Type, List<object>>();
            // Event type -> (Func<TEvent, bool> delegate -> the Action<TEvent> wrapper registered for it).
            // Remembering the wrapper is what lets Unsubscribe find the handler created by Subscribe.
            private readonly Dictionary<Type, Dictionary<Delegate, Delegate>> funcAdapters = new Dictionary<Type, Dictionary<Delegate, Delegate>>();
            // Decides whether two handlers count as "the same registration". Two action-delegated
            // handlers are compared with Equals (per the original author's note, that override
            // compares the wrapped delegates). Any other pair is equal when the handler classes are
            // the same, because one event never needs two handlers of the same class.
            private readonly Func<object, object, bool> eventHandlerEquals = (o1, o2) =>
            {
                var o1Type = o1.GetType();
                var o2Type = o2.GetType();
                if (o1Type.IsGenericType &&
                    o1Type.GetGenericTypeDefinition() == typeof(ActionDelegatedEventHandler<>) &&
                    o2Type.IsGenericType &&
                    o2Type.GetGenericTypeDefinition() == typeof(ActionDelegatedEventHandler<>))
                    return o1.Equals(o2);
                return o1Type == o2Type;
            };
            #endregion

            #region Ctor

            /// <summary>
            /// Creates an aggregator with no subscriptions.
            /// </summary>
            public EventAggregator()
            {
            }

            /// <summary>
            /// Creates an aggregator and subscribes each given object once for every
            /// <c>IEventHandler&lt;TEvent&gt;</c> interface it implements.
            /// </summary>
            /// <param name="handlers">The handler objects to register.</param>
            /// <remarks>
            /// Intended for dependency-injection containers, for example:
            /// <code><![CDATA[
            /// <!--Event Aggregator-->
            /// <register type="Keasy5.Events.IEventAggregator, Keasy5.Events" mapTo="Keasy5.Events.EventAggregator, Keasy5.Events">
            ///   <constructor>
            ///     <param name="handlers">
            ///       <array>
            ///         <dependency name="orderDispatchedSendEmailHandler" type="Keasy5.Events.IEventHandler`1[[Keasy5.Domain.Events.OrderDispatchedEvent, Keasy5.Domain]], Keasy5.Events" />
            ///         <dependency name="orderConfirmedSendEmailHandler" type="Keasy5.Events.IEventHandler`1[[Keasy5.Domain.Events.OrderConfirmedEvent, Keasy5.Domain]], Keasy5.Events" />
            ///       </array>
            ///     </param>
            ///   </constructor>
            /// </register>
            /// ]]></code>
            /// </remarks>
            /// <exception cref="ArgumentNullException"><paramref name="handlers"/> is null.</exception>
            /// <exception cref="ArgumentException"><paramref name="handlers"/> contains a null entry.</exception>
            public EventAggregator(object[] handlers)
                : this()
            {
                if (handlers == null)
                    throw new ArgumentNullException("handlers");

                foreach (var obj in handlers)
                {
                    if (obj == null)
                        throw new ArgumentException("The handler array must not contain null entries.", "handlers");

                    foreach (var implementedInterface in obj.GetType().GetInterfaces())
                    {
                        if (implementedInterface.IsGenericType &&
                            implementedInterface.GetGenericTypeDefinition() == typeof(IEventHandler<>))
                        {
                            // The event type is the single type argument of IEventHandler<TEvent>.
                            // Registering through the non-generic helper avoids a reflection call
                            // into Subscribe<TEvent>.
                            AddHandler(implementedInterface.GetGenericArguments()[0], obj);
                        }
                    }
                }
            }
            #endregion

            #region private helpers

            // Adds the handler to the list of eventType unless an equal handler is already present.
            private void AddHandler(Type eventType, object eventHandler)
            {
                lock (sync)
                {
                    List<object> handlers;
                    if (!eventHandlers.TryGetValue(eventType, out handlers) || handlers == null)
                    {
                        // Store the new list; otherwise the handler added below would be lost.
                        handlers = new List<object>();
                        eventHandlers[eventType] = handlers;
                    }
                    if (!handlers.Exists(deh => eventHandlerEquals(deh, eventHandler)))
                        handlers.Add(eventHandler);
                }
            }

            // Returns a copy of the handlers registered for eventType, or null when there are none.
            private object[] SnapshotHandlers(Type eventType)
            {
                lock (sync)
                {
                    List<object> handlers;
                    if (eventHandlers.TryGetValue(eventType, out handlers) &&
                        handlers != null &&
                        handlers.Count > 0)
                        return handlers.ToArray();
                    return null;
                }
            }

            // Subscribes one Func<TEvent, bool> by wrapping it in an Action<TEvent> that ignores the result.
            private void SubscribeFunc<TEvent>(Func<TEvent, bool> func)
                where TEvent : class, IEvent
            {
                if (func == null)
                    throw new ArgumentException("The delegate collection must not contain null entries.", "eventHandlerFuncs");

                lock (sync)
                {
                    Dictionary<Delegate, Delegate> adapters;
                    if (!funcAdapters.TryGetValue(typeof(TEvent), out adapters))
                    {
                        adapters = new Dictionary<Delegate, Delegate>();
                        funcAdapters.Add(typeof(TEvent), adapters);
                    }

                    Delegate adapter;
                    if (!adapters.TryGetValue(func, out adapter))
                    {
                        Action<TEvent> created = e => func(e);
                        adapter = created;
                        adapters.Add(func, adapter);
                    }

                    Subscribe<TEvent>((Action<TEvent>)adapter);
                }
            }

            // Unsubscribes the wrapper previously registered for the given Func, if there is one.
            private void UnsubscribeFunc<TEvent>(Func<TEvent, bool> func)
                where TEvent : class, IEvent
            {
                if (func == null)
                    return;

                lock (sync)
                {
                    Dictionary<Delegate, Delegate> adapters;
                    if (!funcAdapters.TryGetValue(typeof(TEvent), out adapters))
                        return;

                    Delegate adapter;
                    if (!adapters.TryGetValue(func, out adapter))
                        return;

                    adapters.Remove(func);
                    Unsubscribe<TEvent>((Action<TEvent>)adapter);
                }
            }

            #endregion

            #region interface IEventAggregator members

            /// <summary>
            /// Subscribes a handler for <typeparamref name="TEvent"/> unless an equal handler
            /// (same handler class, or an equal action-delegated handler) is already subscribed.
            /// </summary>
            /// <exception cref="ArgumentNullException"><paramref name="eventHandler"/> is null.</exception>
            public void Subscribe<TEvent>(IEventHandler<TEvent> eventHandler)
                where TEvent : class, IEvent
            {
                if (eventHandler == null)
                    throw new ArgumentNullException("eventHandler");

                AddHandler(typeof(TEvent), eventHandler);
            }

            /// <summary>Subscribes every handler of the sequence.</summary>
            /// <exception cref="ArgumentNullException"><paramref name="eventHandlers"/> is null.</exception>
            public void Subscribe<TEvent>(IEnumerable<IEventHandler<TEvent>> eventHandlers)
                where TEvent : class, IEvent
            {
                if (eventHandlers == null)
                    throw new ArgumentNullException("eventHandlers");

                foreach (var eventHandler in eventHandlers)
                    Subscribe<TEvent>(eventHandler);
            }

            /// <summary>Subscribes every handler of the argument list.</summary>
            /// <exception cref="ArgumentNullException"><paramref name="eventHandlers"/> is null.</exception>
            public void Subscribe<TEvent>(params IEventHandler<TEvent>[] eventHandlers)
                where TEvent : class, IEvent
            {
                if (eventHandlers == null)
                    throw new ArgumentNullException("eventHandlers");

                foreach (var eventHandler in eventHandlers)
                    Subscribe<TEvent>(eventHandler);
            }

            /// <summary>
            /// Subscribes a delegate by wrapping it in an <c>ActionDelegatedEventHandler&lt;TEvent&gt;</c>.
            /// </summary>
            public void Subscribe<TEvent>(Action<TEvent> eventHandlerFunc)
                where TEvent : class, IEvent
            {
                Subscribe<TEvent>(new ActionDelegatedEventHandler<TEvent>(eventHandlerFunc));
            }

            /// <summary>
            /// Subscribes every delegate of the sequence. The boolean result of the delegates is ignored.
            /// </summary>
            /// <remarks>
            /// Passing a single <c>Func&lt;TEvent, bool&gt;</c> to <c>Subscribe</c> binds to the
            /// <c>params</c> overload, so the former implementation called itself until the stack
            /// overflowed. Each delegate is now handed to a dedicated private helper.
            /// </remarks>
            /// <exception cref="ArgumentNullException"><paramref name="eventHandlerFuncs"/> is null.</exception>
            /// <exception cref="ArgumentException"><paramref name="eventHandlerFuncs"/> contains a null entry.</exception>
            public void Subscribe<TEvent>(IEnumerable<Func<TEvent, bool>> eventHandlerFuncs)
                where TEvent : class, IEvent
            {
                if (eventHandlerFuncs == null)
                    throw new ArgumentNullException("eventHandlerFuncs");

                foreach (var eventHandlerFunc in eventHandlerFuncs)
                    SubscribeFunc<TEvent>(eventHandlerFunc);
            }

            /// <summary>
            /// Subscribes every delegate of the argument list. The boolean result of the delegates is ignored.
            /// </summary>
            /// <exception cref="ArgumentNullException"><paramref name="eventHandlerFuncs"/> is null.</exception>
            /// <exception cref="ArgumentException"><paramref name="eventHandlerFuncs"/> contains a null entry.</exception>
            public void Subscribe<TEvent>(params Func<TEvent, bool>[] eventHandlerFuncs)
                where TEvent : class, IEvent
            {
                if (eventHandlerFuncs == null)
                    throw new ArgumentNullException("eventHandlerFuncs");

                foreach (var eventHandlerFunc in eventHandlerFuncs)
                    SubscribeFunc<TEvent>(eventHandlerFunc);
            }

            /// <summary>
            /// Removes the subscribed handler that equals <paramref name="eventHandler"/>, if any.
            /// A null argument is ignored.
            /// </summary>
            public void Unsubscribe<TEvent>(IEventHandler<TEvent> eventHandler)
                where TEvent : class, IEvent
            {
                if (eventHandler == null)
                    return;

                lock (sync)
                {
                    List<object> handlers;
                    if (eventHandlers.TryGetValue(typeof(TEvent), out handlers) && handlers != null)
                    {
                        // Subscribe never stores two equal handlers, so the first match is the only one.
                        var index = handlers.FindIndex(deh => eventHandlerEquals(deh, eventHandler));
                        if (index >= 0)
                            handlers.RemoveAt(index);
                    }
                }
            }

            /// <summary>Unsubscribes every handler of the sequence.</summary>
            /// <exception cref="ArgumentNullException"><paramref name="eventHandlers"/> is null.</exception>
            public void Unsubscribe<TEvent>(IEnumerable<IEventHandler<TEvent>> eventHandlers)
                where TEvent : class, IEvent
            {
                if (eventHandlers == null)
                    throw new ArgumentNullException("eventHandlers");

                foreach (var eventHandler in eventHandlers)
                    Unsubscribe<TEvent>(eventHandler);
            }

            /// <summary>Unsubscribes every handler of the argument list.</summary>
            /// <exception cref="ArgumentNullException"><paramref name="eventHandlers"/> is null.</exception>
            public void Unsubscribe<TEvent>(params IEventHandler<TEvent>[] eventHandlers)
                where TEvent : class, IEvent
            {
                if (eventHandlers == null)
                    throw new ArgumentNullException("eventHandlers");

                foreach (var eventHandler in eventHandlers)
                    Unsubscribe<TEvent>(eventHandler);
            }

            /// <summary>
            /// Unsubscribes the action-delegated handler that equals a wrapper around the given delegate.
            /// </summary>
            public void Unsubscribe<TEvent>(Action<TEvent> eventHandlerFunc)
                where TEvent : class, IEvent
            {
                Unsubscribe<TEvent>(new ActionDelegatedEventHandler<TEvent>(eventHandlerFunc));
            }

            /// <summary>
            /// Unsubscribes every delegate of the sequence that was subscribed through one of the
            /// <c>Func&lt;TEvent, bool&gt;</c> overloads. Unknown or null delegates are ignored.
            /// </summary>
            /// <exception cref="ArgumentNullException"><paramref name="eventHandlerFuncs"/> is null.</exception>
            public void Unsubscribe<TEvent>(IEnumerable<Func<TEvent, bool>> eventHandlerFuncs)
                where TEvent : class, IEvent
            {
                if (eventHandlerFuncs == null)
                    throw new ArgumentNullException("eventHandlerFuncs");

                foreach (var eventHandlerFunc in eventHandlerFuncs)
                    UnsubscribeFunc<TEvent>(eventHandlerFunc);
            }

            /// <summary>
            /// Unsubscribes every delegate of the argument list that was subscribed through one of the
            /// <c>Func&lt;TEvent, bool&gt;</c> overloads. Unknown or null delegates are ignored.
            /// </summary>
            /// <exception cref="ArgumentNullException"><paramref name="eventHandlerFuncs"/> is null.</exception>
            public void Unsubscribe<TEvent>(params Func<TEvent, bool>[] eventHandlerFuncs)
                where TEvent : class, IEvent
            {
                if (eventHandlerFuncs == null)
                    throw new ArgumentNullException("eventHandlerFuncs");

                foreach (var eventHandlerFunc in eventHandlerFuncs)
                    UnsubscribeFunc<TEvent>(eventHandlerFunc);
            }

            /// <summary>Removes every handler subscribed for <typeparamref name="TEvent"/>.</summary>
            public void UnsubscribeAll<TEvent>()
                where TEvent : class, IEvent
            {
                lock (sync)
                {
                    var eventType = typeof(TEvent);
                    List<object> handlers;
                    if (eventHandlers.TryGetValue(eventType, out handlers) && handlers != null)
                        handlers.Clear();
                    // The wrappers for this event type no longer refer to a registered handler.
                    funcAdapters.Remove(eventType);
                }
            }

            /// <summary>Removes every handler for every event type.</summary>
            public void UnsubscribeAll()
            {
                lock (sync)
                {
                    eventHandlers.Clear();
                    funcAdapters.Clear();
                }
            }

            /// <summary>
            /// Returns a snapshot of the handlers subscribed for <typeparamref name="TEvent"/>.
            /// </summary>
            /// <returns>
            /// A new list of the current handlers, or null when nothing was ever subscribed
            /// for the event type.
            /// </returns>
            public IEnumerable<IEventHandler<TEvent>> GetSubscriptions<TEvent>()
                where TEvent : class, IEvent
            {
                lock (sync)
                {
                    List<object> handlers;
                    if (eventHandlers.TryGetValue(typeof(TEvent), out handlers) && handlers != null)
                        return handlers.Select(p => p as IEventHandler<TEvent>).ToList();
                    return null;
                }
            }

            /// <summary>
            /// Dispatches the event to the handlers registered for its runtime type.
            /// </summary>
            /// <remarks>
            /// Handlers are looked up by <c>evnt.GetType()</c> but invoked as
            /// <c>IEventHandler&lt;TEvent&gt;</c>; publish with <typeparamref name="TEvent"/> equal to
            /// the runtime type to be sure the handlers are compatible. Asynchronous handlers are
            /// started and not awaited.
            /// </remarks>
            /// <exception cref="ArgumentNullException">
            /// <paramref name="evnt"/> is null, or a registered handler cannot be used as
            /// <c>IEventHandler&lt;TEvent&gt;</c> (exception type kept for backward compatibility).
            /// </exception>
            public void Publish<TEvent>(TEvent evnt)
                where TEvent : class, IEvent
            {
                if (evnt == null)
                    throw new ArgumentNullException("evnt");

                // Work on a copy taken under the lock so handlers may (un)subscribe during dispatch.
                var handlers = SnapshotHandlers(evnt.GetType());
                if (handlers == null)
                    return;

                foreach (var handler in handlers)
                {
                    var eventHandler = handler as IEventHandler<TEvent>;
                    if (eventHandler == null)
                        throw new ArgumentNullException("eventHandler");
                    if (eventHandler.GetType().IsDefined(typeof(HandlesAsynchronouslyAttribute), false))
                    {
                        Task.Factory.StartNew((o) => eventHandler.Handle((TEvent)o), evnt);
                    }
                    else
                    {
                        eventHandler.Handle(evnt);
                    }
                }
            }

            /// <summary>
            /// Dispatches the event like <see cref="Publish{TEvent}(TEvent)"/>, waits for the
            /// asynchronous handlers and reports the outcome through <paramref name="callback"/>.
            /// </summary>
            /// <param name="evnt">The event to publish.</param>
            /// <param name="callback">
            /// Invoked exactly once with the event, a success flag and the failure (or null).
            /// It receives <c>(evnt, false, null)</c> when no handler is registered,
            /// <c>(evnt, false, exception)</c> when a handler failed or the timeout elapsed, and
            /// <c>(evnt, true, null)</c> otherwise. Exceptions thrown by the callback itself propagate.
            /// </param>
            /// <param name="timeout">
            /// Maximum time to wait for the asynchronous handlers; null waits without limit.
            /// </param>
            /// <exception cref="ArgumentNullException">
            /// <paramref name="evnt"/> or <paramref name="callback"/> is null.
            /// </exception>
            public void Publish<TEvent>(TEvent evnt,
                Action<TEvent, bool, Exception> callback,
                TimeSpan? timeout = null)
                where TEvent : class, IEvent
            {
                if (evnt == null)
                    throw new ArgumentNullException("evnt");
                if (callback == null)
                    throw new ArgumentNullException("callback");

                var handlers = SnapshotHandlers(evnt.GetType());
                if (handlers == null)
                {
                    callback(evnt, false, null);
                    return;
                }

                var succeeded = false;
                Exception failure = null;
                try
                {
                    var tasks = new List<Task>();
                    foreach (var handler in handlers)
                    {
                        var eventHandler = handler as IEventHandler<TEvent>;
                        if (eventHandler == null)
                            throw new ArgumentNullException("eventHandler");
                        if (eventHandler.GetType().IsDefined(typeof(HandlesAsynchronouslyAttribute), false))
                        {
                            tasks.Add(Task.Factory.StartNew((o) => eventHandler.Handle((TEvent)o), evnt));
                        }
                        else
                        {
                            eventHandler.Handle(evnt);
                        }
                    }

                    var completed = true;
                    if (tasks.Count > 0)
                    {
                        if (timeout == null)
                            Task.WaitAll(tasks.ToArray());
                        else
                            // WaitAll returns false when the timeout elapses first; that must not
                            // be reported as success.
                            completed = Task.WaitAll(tasks.ToArray(), timeout.Value);
                    }

                    if (completed)
                        succeeded = true;
                    else
                        failure = new TimeoutException("The asynchronous event handlers did not complete within the given timeout.");
                }
                catch (Exception ex)
                {
                    failure = ex;
                }

                // Invoked outside the try block so that a throwing callback is not called a second time.
                callback(evnt, succeeded, failure);
            }
            #endregion
        }
    }
    View Code

    第二步:使用:

    private readonly IEventBus eventBus;
    。。。。
    eventBus.Publish<OrderDispatchedEvent>(evnt);
  • 相关阅读:
    Structured streaming
    streaming窗口操作
    scala伴生对象,apply()及单例
    storm集成kafka
    solr简易安装配置
    拦路雨偏似雪花,饮泣的你冻吗?--稍瑞,我是关键字过滤器
    我存在,你深深的循环里--从反射看JSON死循环
    ueditor:原谅我这一生不羁放纵爱独特
    或许你不知道(2):LinkedList
    自定义负载均衡
  • 原文地址:https://www.cnblogs.com/easy5weikai/p/3791005.html
Copyright © 2011-2022 走看看