第一步:事件总线和事件聚合器
【1】事件总线 IEventBus
IUnitOfWork.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Keasy5.Infrastructure
{
    /// <summary>
    /// Contract for types that implement the Unit Of Work pattern.
    /// </summary>
    /// <remarks>
    /// For details on the pattern see
    /// http://martinfowler.com/eaaCatalog/unitOfWork.html.
    /// </remarks>
    public interface IUnitOfWork
    {
        /// <summary>
        /// Gets a <see cref="System.Boolean"/> value indicating whether this
        /// Unit Of Work supports the Microsoft distributed transaction mechanism.
        /// </summary>
        bool DistributedTransactionSupported { get; }

        /// <summary>
        /// Gets a <see cref="System.Boolean"/> value indicating whether the
        /// current Unit Of Work transaction has been committed.
        /// </summary>
        bool Committed { get; }

        /// <summary>
        /// Commits the current Unit Of Work transaction.
        /// </summary>
        void Commit();

        /// <summary>
        /// Rolls back the current Unit Of Work transaction.
        /// </summary>
        void Rollback();
    }
}
IBus.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Keasy5.Infrastructure;

namespace Keasy5.Events.Bus
{
    /// <summary>
    /// Represents the message bus. A bus is also a unit of work
    /// (<see cref="IUnitOfWork"/>) and is disposable.
    /// </summary>
    public interface IBus : IUnitOfWork, IDisposable
    {
        /// <summary>
        /// Gets the <see cref="Guid"/> associated with this bus.
        /// </summary>
        Guid ID { get; }

        /// <summary>
        /// Publishes the specified message to the bus.
        /// </summary>
        /// <typeparam name="TMessage">The message type; must be a reference type implementing IEvent.</typeparam>
        /// <param name="message">The message to be published.</param>
        void Publish<TMessage>(TMessage message)
            where TMessage : class, IEvent;

        /// <summary>
        /// Publishes a collection of messages to the bus.
        /// </summary>
        /// <typeparam name="TMessage">The message type; must be a reference type implementing IEvent.</typeparam>
        /// <param name="messages">The messages to be published.</param>
        void Publish<TMessage>(IEnumerable<TMessage> messages)
            where TMessage : class, IEvent;

        /// <summary>
        /// Clears the published messages waiting for commit.
        /// </summary>
        void Clear();
    }
}
接口: IEventBus.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Keasy5.Events.Bus
{
    /// <summary>
    /// The event bus contract. It declares no members of its own; everything
    /// is inherited from <see cref="IBus"/>.
    /// </summary>
    public interface IEventBus : IBus
    {
    }
}
实现类:EventBus
EventBus.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Keasy5.Infrastructure;

namespace Keasy5.Events.Bus
{
    /// <summary>
    /// An <see cref="IEventBus"/> that buffers published events in a queue and
    /// forwards them to an <see cref="IEventAggregator"/> when
    /// <see cref="Commit"/> is called.
    /// </summary>
    /// <remarks>
    /// Both the pending queue and the committed flag are held in
    /// <see cref="ThreadLocal{T}"/> storage, so each thread sees only the
    /// messages it published itself; <see cref="Commit"/>, <see cref="Clear"/>
    /// and <see cref="Rollback"/> act on the calling thread's queue only.
    /// </remarks>
    public class EventBus : DisposableObject, IEventBus
    {
        private readonly Guid id = Guid.NewGuid();
        private readonly ThreadLocal<Queue<object>> messageQueue = new ThreadLocal<Queue<object>>(() => new Queue<object>());
        private readonly IEventAggregator aggregator;
        private readonly ThreadLocal<bool> committed = new ThreadLocal<bool>(() => true);
        private readonly MethodInfo publishMethod;

        /// <summary>
        /// Creates a bus that dispatches committed events through the given aggregator.
        /// </summary>
        /// <param name="aggregator">
        /// The aggregator that receives events on commit, e.g. an
        /// <see cref="EventAggregator"/>.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="aggregator"/> is null.</exception>
        /// <remarks>
        /// Background reading on reflecting over generic methods:
        /// http://www.cnblogs.com/easy5weikai/p/3790589.html
        /// </remarks>
        public EventBus(IEventAggregator aggregator)
        {
            if (aggregator == null)
            {
                throw new ArgumentNullException("aggregator");
            }

            this.aggregator = aggregator;

            // Locate the generic, single-parameter Publish<TEvent>(TEvent) overload.
            // The lookup is done on the interface so that it also works when the
            // aggregator implements the method explicitly, and it is restricted to
            // generic method definitions because MakeGenericMethod is called on it later.
            publishMethod = typeof(IEventAggregator).GetMethods()
                .First(m => m.Name == "Publish"
                            && m.IsGenericMethodDefinition
                            && m.GetParameters().Length == 1);
        }

        /// <summary>
        /// Releases the thread-local queue and committed flag when disposing.
        /// </summary>
        /// <param name="disposing">true when called from an explicit dispose.</param>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                messageQueue.Dispose();
                committed.Dispose();
            }
        }

        #region IBus Members

        /// <summary>
        /// Queues a message on the calling thread and marks the bus as uncommitted.
        /// Nothing is dispatched until <see cref="Commit"/> is called.
        /// </summary>
        /// <param name="message">The message to queue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        public void Publish<TMessage>(TMessage message)
            where TMessage : class, IEvent
        {
            // Reject null here: a queued null would only fail later, inside Commit,
            // far away from the call that caused it.
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            messageQueue.Value.Enqueue(message);
            committed.Value = false;
        }

        /// <summary>
        /// Queues every message in the sequence, in order.
        /// </summary>
        /// <param name="messages">The messages to queue.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="messages"/> is null, or one of its elements is null.
        /// </exception>
        public void Publish<TMessage>(IEnumerable<TMessage> messages)
            where TMessage : class, IEvent
        {
            if (messages == null)
            {
                throw new ArgumentNullException("messages");
            }

            foreach (var message in messages)
            {
                Publish<TMessage>(message);
            }
        }

        /// <summary>
        /// Discards the calling thread's pending messages and marks the bus as committed.
        /// </summary>
        public void Clear()
        {
            messageQueue.Value.Clear();
            committed.Value = true;
        }

        #endregion

        #region IUnitOfWork Members

        /// <summary>
        /// Always false for this bus.
        /// </summary>
        public bool DistributedTransactionSupported
        {
            get { return false; }
        }

        /// <summary>
        /// Gets whether the calling thread has no uncommitted messages outstanding.
        /// </summary>
        public bool Committed
        {
            get { return committed.Value; }
        }

        /// <summary>
        /// Dequeues the calling thread's pending messages one by one and passes each
        /// to the aggregator's Publish method, then marks the bus as committed.
        /// </summary>
        /// <remarks>
        /// The call goes through reflection, so an exception thrown while publishing
        /// surfaces wrapped in a <see cref="TargetInvocationException"/>. In that case
        /// the failing message has already been dequeued, the remaining messages stay
        /// queued, and the committed flag stays false.
        /// </remarks>
        public void Commit()
        {
            var pending = messageQueue.Value;
            while (pending.Count > 0)
            {
                var evnt = pending.Dequeue();

                // Close Publish<TEvent> over the runtime type of the event so the
                // aggregator looks up the handlers registered for that exact type.
                var method = publishMethod.MakeGenericMethod(evnt.GetType());
                method.Invoke(aggregator, new object[] { evnt });
            }

            committed.Value = true;
        }

        /// <summary>
        /// Discards the calling thread's pending messages; equivalent to <see cref="Clear"/>.
        /// </summary>
        public void Rollback()
        {
            Clear();
        }

        /// <summary>
        /// Gets the identifier generated for this bus instance at construction.
        /// </summary>
        public Guid ID
        {
            get { return id; }
        }

        #endregion
    }
}
【2】事件聚合器IEventAggregator
接口:IEventAggregator.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Keasy5.Events
{
    /// <summary>
    /// Contract for an event aggregator: handlers are subscribed per event type
    /// and events are published to the handlers subscribed for them.
    /// </summary>
    public interface IEventAggregator
    {
        /// <summary>Subscribes a single handler for <typeparamref name="TEvent"/>.</summary>
        void Subscribe<TEvent>(IEventHandler<TEvent> domainEventHandler)
            where TEvent : class, IEvent;

        /// <summary>Subscribes a sequence of handlers for <typeparamref name="TEvent"/>.</summary>
        void Subscribe<TEvent>(IEnumerable<IEventHandler<TEvent>> domainEventHandlers)
            where TEvent : class, IEvent;

        /// <summary>Subscribes one or more handlers for <typeparamref name="TEvent"/>.</summary>
        void Subscribe<TEvent>(params IEventHandler<TEvent>[] domainEventHandlers)
            where TEvent : class, IEvent;

        /// <summary>Subscribes an <see cref="Action{T}"/> delegate as a handler for <typeparamref name="TEvent"/>.</summary>
        void Subscribe<TEvent>(Action<TEvent> domainEventHandlerFunc)
            where TEvent : class, IEvent;

        /// <summary>Subscribes a sequence of <see cref="Func{T, TResult}"/> delegates as handlers for <typeparamref name="TEvent"/>.</summary>
        void Subscribe<TEvent>(IEnumerable<Func<TEvent, bool>> domainEventHandlerFuncs)
            where TEvent : class, IEvent;

        /// <summary>Subscribes one or more <see cref="Func{T, TResult}"/> delegates as handlers for <typeparamref name="TEvent"/>.</summary>
        void Subscribe<TEvent>(params Func<TEvent, bool>[] domainEventHandlerFuncs)
            where TEvent : class, IEvent;

        /// <summary>Removes a single handler for <typeparamref name="TEvent"/>.</summary>
        void Unsubscribe<TEvent>(IEventHandler<TEvent> domainEventHandler)
            where TEvent : class, IEvent;

        /// <summary>Removes a sequence of handlers for <typeparamref name="TEvent"/>.</summary>
        void Unsubscribe<TEvent>(IEnumerable<IEventHandler<TEvent>> domainEventHandlers)
            where TEvent : class, IEvent;

        /// <summary>Removes one or more handlers for <typeparamref name="TEvent"/>.</summary>
        void Unsubscribe<TEvent>(params IEventHandler<TEvent>[] domainEventHandlers)
            where TEvent : class, IEvent;

        /// <summary>Removes an <see cref="Action{T}"/> delegate handler for <typeparamref name="TEvent"/>.</summary>
        void Unsubscribe<TEvent>(Action<TEvent> domainEventHandlerFunc)
            where TEvent : class, IEvent;

        /// <summary>Removes a sequence of <see cref="Func{T, TResult}"/> delegate handlers for <typeparamref name="TEvent"/>.</summary>
        void Unsubscribe<TEvent>(IEnumerable<Func<TEvent, bool>> domainEventHandlerFuncs)
            where TEvent : class, IEvent;

        /// <summary>Removes one or more <see cref="Func{T, TResult}"/> delegate handlers for <typeparamref name="TEvent"/>.</summary>
        void Unsubscribe<TEvent>(params Func<TEvent, bool>[] domainEventHandlerFuncs)
            where TEvent : class, IEvent;

        /// <summary>Removes every handler subscribed for <typeparamref name="TEvent"/>.</summary>
        void UnsubscribeAll<TEvent>()
            where TEvent : class, IEvent;

        /// <summary>Removes every handler for every event type.</summary>
        void UnsubscribeAll();

        /// <summary>Gets the handlers currently subscribed for <typeparamref name="TEvent"/>.</summary>
        IEnumerable<IEventHandler<TEvent>> GetSubscriptions<TEvent>()
            where TEvent : class, IEvent;

        /// <summary>Publishes an event to its subscribed handlers.</summary>
        /// <param name="domainEvent">The event to publish.</param>
        void Publish<TEvent>(TEvent domainEvent)
            where TEvent : class, IEvent;

        /// <summary>
        /// Publishes an event to its subscribed handlers and reports the outcome
        /// through a callback.
        /// </summary>
        /// <param name="domainEvent">The event to publish.</param>
        /// <param name="callback">
        /// Invoked with the event, a <see cref="System.Boolean"/> flag and an
        /// <see cref="Exception"/>; the exact meaning of the flag and exception is
        /// defined by the implementation.
        /// </param>
        /// <param name="timeout">Optional time limit; how it is applied is defined by the implementation.</param>
        void Publish<TEvent>(TEvent domainEvent, Action<TEvent, bool, Exception> callback, TimeSpan? timeout = null)
            where TEvent : class, IEvent;
    }
}
实现类:EventAggregator
EventAggregator.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;

namespace Keasy5.Events
{
    /// <summary>
    /// Default <see cref="IEventAggregator"/>: keeps a list of handlers per event
    /// type and invokes them when an event of that type is published.
    /// </summary>
    /// <remarks>
    /// All access to the handler tables is serialized on a private lock. Handlers
    /// themselves are invoked outside the lock, on a snapshot of the handler list,
    /// so a handler may subscribe or unsubscribe while an event is being published.
    /// </remarks>
    public class EventAggregator : IEventAggregator
    {
        #region private property

        private readonly object sync = new object();

        // Event type -> handlers registered for it (IEventHandler<TEvent> instances stored as object).
        private readonly Dictionary<Type, List<object>> eventHandlers = new Dictionary<Type, List<object>>();

        // (event type, Func delegate) -> the ActionDelegatedEventHandler<TEvent> created to wrap that
        // delegate. Remembering the wrapper lets Unsubscribe(Func) remove the very instance that
        // Subscribe(Func) added.
        private readonly Dictionary<Tuple<Type, Delegate>, object> funcHandlers = new Dictionary<Tuple<Type, Delegate>, object>();

        // The open generic Subscribe<TEvent>(IEventHandler<TEvent>) method, used by the
        // object[] constructor to register handlers whose event type is only known at runtime.
        private readonly MethodInfo registerEventHandlerMethod;

        /// <summary>
        /// Decides whether two registered handlers count as the same handler.
        /// Two action-delegated handlers are compared with their own Equals
        /// implementation; any other pair is considered equal when both have the same
        /// runtime type, because one handler type should not be registered more than
        /// once for a given event.
        /// </summary>
        private static bool HandlersMatch(object first, object second)
        {
            var firstType = first.GetType();
            var secondType = second.GetType();

            var bothActionDelegated =
                firstType.IsGenericType
                && firstType.GetGenericTypeDefinition() == typeof(ActionDelegatedEventHandler<>)
                && secondType.IsGenericType
                && secondType.GetGenericTypeDefinition() == typeof(ActionDelegatedEventHandler<>);

            return bothActionDelegated ? first.Equals(second) : firstType == secondType;
        }

        /// <summary>
        /// True for the generic Subscribe overload whose single parameter is IEventHandler&lt;TEvent&gt;.
        /// </summary>
        private static bool IsSingleHandlerSubscribe(MethodInfo method)
        {
            if (method.Name != "Subscribe" || !method.IsGenericMethodDefinition)
            {
                return false;
            }

            var parameters = method.GetParameters();
            if (parameters.Length != 1)
            {
                return false;
            }

            // IsGenericType must be checked first: the params overloads take an array,
            // and GetGenericTypeDefinition throws on a type that is not generic.
            var parameterType = parameters[0].ParameterType;
            return parameterType.IsGenericType
                   && parameterType.GetGenericTypeDefinition() == typeof(IEventHandler<>);
        }

        /// <summary>
        /// Returns a copy of the handlers registered for the given event type
        /// (empty when there are none), taken under the lock.
        /// </summary>
        private object[] SnapshotHandlers(Type eventType)
        {
            lock (sync)
            {
                List<object> handlers;
                if (eventHandlers.TryGetValue(eventType, out handlers) && handlers != null)
                {
                    return handlers.ToArray();
                }

                return new object[0];
            }
        }

        /// <summary>
        /// True when the handler's type is marked with HandlesAsynchronouslyAttribute
        /// (inherited attributes are not considered).
        /// </summary>
        private static bool IsAsynchronous(object eventHandler)
        {
            return eventHandler.GetType().IsDefined(typeof(HandlesAsynchronouslyAttribute), false);
        }

        /// <summary>
        /// Subscribes one Func delegate by wrapping it in an ActionDelegatedEventHandler.
        /// The boolean returned by the delegate is discarded.
        /// </summary>
        private void SubscribeFunc<TEvent>(Func<TEvent, bool> eventHandlerFunc)
            where TEvent : class, IEvent
        {
            if (eventHandlerFunc == null)
            {
                throw new ArgumentNullException("eventHandlerFunc");
            }

            lock (sync)
            {
                var key = Tuple.Create(typeof(TEvent), (Delegate)eventHandlerFunc);
                object wrapper;
                if (!funcHandlers.TryGetValue(key, out wrapper))
                {
                    Action<TEvent> adapter = e => { eventHandlerFunc(e); };
                    wrapper = new ActionDelegatedEventHandler<TEvent>(adapter);
                    funcHandlers.Add(key, wrapper);
                }

                Subscribe<TEvent>((IEventHandler<TEvent>)wrapper);
            }
        }

        /// <summary>
        /// Removes the wrapper previously created for the given Func delegate, if any.
        /// </summary>
        private void UnsubscribeFunc<TEvent>(Func<TEvent, bool> eventHandlerFunc)
            where TEvent : class, IEvent
        {
            if (eventHandlerFunc == null)
            {
                throw new ArgumentNullException("eventHandlerFunc");
            }

            lock (sync)
            {
                var key = Tuple.Create(typeof(TEvent), (Delegate)eventHandlerFunc);
                object wrapper;
                if (funcHandlers.TryGetValue(key, out wrapper))
                {
                    funcHandlers.Remove(key);
                    Unsubscribe<TEvent>((IEventHandler<TEvent>)wrapper);
                }
            }
        }

        #endregion

        #region Ctor

        /// <summary>
        /// Creates an aggregator with no subscriptions.
        /// </summary>
        public EventAggregator()
        {
            registerEventHandlerMethod = this.GetType().GetMethods().First(IsSingleHandlerSubscribe);
        }

        /// <summary>
        /// Creates an aggregator and subscribes each given object for every
        /// IEventHandler&lt;TEvent&gt; interface it implements.
        /// </summary>
        /// <param name="handlers">The handler objects to register.</param>
        /// <exception cref="ArgumentNullException"><paramref name="handlers"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="handlers"/> contains a null element.</exception>
        /// <remarks>
        /// 1. Background reading on reflecting over generic methods:
        ///    http://www.cnblogs.com/easy5weikai/p/3790589.html
        /// 2. Example dependency-injection configuration that supplies the array:
        /// <code><![CDATA[
        /// <!--Event Aggregator-->
        /// <register type="Keasy5.Events.IEventAggregator, Keasy5.Events" mapTo="Keasy5.Events.EventAggregator, Keasy5.Events">
        ///   <constructor>
        ///     <param name="handlers">
        ///       <array>
        ///         <dependency name="orderDispatchedSendEmailHandler" type="Keasy5.Events.IEventHandler`1[[Keasy5.Domain.Events.OrderDispatchedEvent, Keasy5.Domain]], Keasy5.Events" />
        ///         <dependency name="orderConfirmedSendEmailHandler" type="Keasy5.Events.IEventHandler`1[[Keasy5.Domain.Events.OrderConfirmedEvent, Keasy5.Domain]], Keasy5.Events" />
        ///       </array>
        ///     </param>
        ///   </constructor>
        /// </register>
        /// ]]></code>
        /// </remarks>
        public EventAggregator(object[] handlers)
            : this()
        {
            if (handlers == null)
            {
                throw new ArgumentNullException("handlers");
            }

            foreach (var handler in handlers)
            {
                if (handler == null)
                {
                    throw new ArgumentException("The handler array must not contain null elements.", "handlers");
                }

                foreach (var implementedInterface in handler.GetType().GetInterfaces())
                {
                    if (!implementedInterface.IsGenericType
                        || implementedInterface.GetGenericTypeDefinition() != typeof(IEventHandler<>))
                    {
                        continue;
                    }

                    // Close Subscribe<TEvent> over the event type of this interface and call it.
                    var eventType = implementedInterface.GetGenericArguments()[0];
                    var method = registerEventHandlerMethod.MakeGenericMethod(eventType);
                    method.Invoke(this, new object[] { handler });
                }
            }
        }

        #endregion

        #region interface IEventAggregator members

        /// <summary>
        /// Subscribes a handler for <typeparamref name="TEvent"/> unless an equivalent
        /// handler (see <see cref="HandlersMatch"/>) is already registered.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="eventHandler"/> is null.</exception>
        public void Subscribe<TEvent>(IEventHandler<TEvent> eventHandler)
            where TEvent : class, IEvent
        {
            if (eventHandler == null)
            {
                throw new ArgumentNullException("eventHandler");
            }

            lock (sync)
            {
                var eventType = typeof(TEvent);
                List<object> handlers;
                if (!eventHandlers.TryGetValue(eventType, out handlers) || handlers == null)
                {
                    // No usable list yet: create one AND store it, so the handler is kept.
                    eventHandlers[eventType] = new List<object> { eventHandler };
                    return;
                }

                if (!handlers.Exists(registered => HandlersMatch(registered, eventHandler)))
                {
                    handlers.Add(eventHandler);
                }
            }
        }

        /// <summary>Subscribes each handler in the sequence.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="eventHandlers"/> is null.</exception>
        public void Subscribe<TEvent>(IEnumerable<IEventHandler<TEvent>> eventHandlers)
            where TEvent : class, IEvent
        {
            if (eventHandlers == null)
            {
                throw new ArgumentNullException("eventHandlers");
            }

            foreach (var eventHandler in eventHandlers)
            {
                Subscribe<TEvent>(eventHandler);
            }
        }

        /// <summary>Subscribes each handler in the array.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="eventHandlers"/> is null.</exception>
        public void Subscribe<TEvent>(params IEventHandler<TEvent>[] eventHandlers)
            where TEvent : class, IEvent
        {
            if (eventHandlers == null)
            {
                throw new ArgumentNullException("eventHandlers");
            }

            foreach (var eventHandler in eventHandlers)
            {
                Subscribe<TEvent>(eventHandler);
            }
        }

        /// <summary>Subscribes an action delegate by wrapping it in an ActionDelegatedEventHandler.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="eventHandlerFunc"/> is null.</exception>
        public void Subscribe<TEvent>(Action<TEvent> eventHandlerFunc)
            where TEvent : class, IEvent
        {
            if (eventHandlerFunc == null)
            {
                throw new ArgumentNullException("eventHandlerFunc");
            }

            Subscribe<TEvent>(new ActionDelegatedEventHandler<TEvent>(eventHandlerFunc));
        }

        /// <summary>
        /// Subscribes each Func delegate in the sequence. The boolean each delegate
        /// returns is ignored by this aggregator.
        /// </summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="eventHandlerFuncs"/> is null or contains a null delegate.
        /// </exception>
        public void Subscribe<TEvent>(IEnumerable<Func<TEvent, bool>> eventHandlerFuncs)
            where TEvent : class, IEvent
        {
            if (eventHandlerFuncs == null)
            {
                throw new ArgumentNullException("eventHandlerFuncs");
            }

            // Must not call Subscribe<TEvent>(func) here: with a single Func argument,
            // overload resolution binds to the params Func[] overload, which would
            // call back into itself without end.
            foreach (var eventHandlerFunc in eventHandlerFuncs)
            {
                SubscribeFunc<TEvent>(eventHandlerFunc);
            }
        }

        /// <summary>
        /// Subscribes each Func delegate in the array. The boolean each delegate
        /// returns is ignored by this aggregator.
        /// </summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="eventHandlerFuncs"/> is null or contains a null delegate.
        /// </exception>
        public void Subscribe<TEvent>(params Func<TEvent, bool>[] eventHandlerFuncs)
            where TEvent : class, IEvent
        {
            if (eventHandlerFuncs == null)
            {
                throw new ArgumentNullException("eventHandlerFuncs");
            }

            foreach (var eventHandlerFunc in eventHandlerFuncs)
            {
                SubscribeFunc<TEvent>(eventHandlerFunc);
            }
        }

        /// <summary>
        /// Removes the first registered handler for <typeparamref name="TEvent"/> that
        /// matches the given one (see <see cref="HandlersMatch"/>); does nothing when
        /// there is no match.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="eventHandler"/> is null.</exception>
        public void Unsubscribe<TEvent>(IEventHandler<TEvent> eventHandler)
            where TEvent : class, IEvent
        {
            if (eventHandler == null)
            {
                throw new ArgumentNullException("eventHandler");
            }

            lock (sync)
            {
                List<object> handlers;
                if (eventHandlers.TryGetValue(typeof(TEvent), out handlers) && handlers != null)
                {
                    var index = handlers.FindIndex(registered => HandlersMatch(registered, eventHandler));
                    if (index >= 0)
                    {
                        handlers.RemoveAt(index);
                    }
                }
            }
        }

        /// <summary>Unsubscribes each handler in the sequence.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="eventHandlers"/> is null.</exception>
        public void Unsubscribe<TEvent>(IEnumerable<IEventHandler<TEvent>> eventHandlers)
            where TEvent : class, IEvent
        {
            if (eventHandlers == null)
            {
                throw new ArgumentNullException("eventHandlers");
            }

            foreach (var eventHandler in eventHandlers)
            {
                Unsubscribe<TEvent>(eventHandler);
            }
        }

        /// <summary>Unsubscribes each handler in the array.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="eventHandlers"/> is null.</exception>
        public void Unsubscribe<TEvent>(params IEventHandler<TEvent>[] eventHandlers)
            where TEvent : class, IEvent
        {
            if (eventHandlers == null)
            {
                throw new ArgumentNullException("eventHandlers");
            }

            foreach (var eventHandler in eventHandlers)
            {
                Unsubscribe<TEvent>(eventHandler);
            }
        }

        /// <summary>
        /// Unsubscribes an action delegate. Matching is delegated to the Equals
        /// implementation of ActionDelegatedEventHandler.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="eventHandlerFunc"/> is null.</exception>
        public void Unsubscribe<TEvent>(Action<TEvent> eventHandlerFunc)
            where TEvent : class, IEvent
        {
            if (eventHandlerFunc == null)
            {
                throw new ArgumentNullException("eventHandlerFunc");
            }

            Unsubscribe<TEvent>(new ActionDelegatedEventHandler<TEvent>(eventHandlerFunc));
        }

        /// <summary>Unsubscribes each Func delegate in the sequence.</summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="eventHandlerFuncs"/> is null or contains a null delegate.
        /// </exception>
        public void Unsubscribe<TEvent>(IEnumerable<Func<TEvent, bool>> eventHandlerFuncs)
            where TEvent : class, IEvent
        {
            if (eventHandlerFuncs == null)
            {
                throw new ArgumentNullException("eventHandlerFuncs");
            }

            // See Subscribe(IEnumerable<Func<...>>): calling Unsubscribe<TEvent>(func)
            // here would recurse through the params overload without end.
            foreach (var eventHandlerFunc in eventHandlerFuncs)
            {
                UnsubscribeFunc<TEvent>(eventHandlerFunc);
            }
        }

        /// <summary>Unsubscribes each Func delegate in the array.</summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="eventHandlerFuncs"/> is null or contains a null delegate.
        /// </exception>
        public void Unsubscribe<TEvent>(params Func<TEvent, bool>[] eventHandlerFuncs)
            where TEvent : class, IEvent
        {
            if (eventHandlerFuncs == null)
            {
                throw new ArgumentNullException("eventHandlerFuncs");
            }

            foreach (var eventHandlerFunc in eventHandlerFuncs)
            {
                UnsubscribeFunc<TEvent>(eventHandlerFunc);
            }
        }

        /// <summary>Removes every handler registered for <typeparamref name="TEvent"/>.</summary>
        public void UnsubscribeAll<TEvent>()
            where TEvent : class, IEvent
        {
            lock (sync)
            {
                var eventType = typeof(TEvent);

                List<object> handlers;
                if (eventHandlers.TryGetValue(eventType, out handlers) && handlers != null)
                {
                    handlers.Clear();
                }

                // Forget the Func wrappers that belonged to this event type as well.
                var staleKeys = funcHandlers.Keys.Where(key => key.Item1 == eventType).ToList();
                foreach (var key in staleKeys)
                {
                    funcHandlers.Remove(key);
                }
            }
        }

        /// <summary>Removes every handler for every event type.</summary>
        public void UnsubscribeAll()
        {
            lock (sync)
            {
                eventHandlers.Clear();
                funcHandlers.Clear();
            }
        }

        /// <summary>
        /// Returns a copy of the handlers registered for <typeparamref name="TEvent"/>,
        /// or null when no handler list exists for that type.
        /// </summary>
        public IEnumerable<IEventHandler<TEvent>> GetSubscriptions<TEvent>()
            where TEvent : class, IEvent
        {
            lock (sync)
            {
                List<object> handlers;
                if (eventHandlers.TryGetValue(typeof(TEvent), out handlers) && handlers != null)
                {
                    return handlers.Select(handler => handler as IEventHandler<TEvent>).ToList();
                }

                // Kept as null (not an empty sequence) for compatibility with existing callers.
                return null;
            }
        }

        /// <summary>
        /// Invokes every handler registered for the runtime type of <paramref name="evnt"/>.
        /// Handlers whose type carries HandlesAsynchronouslyAttribute are started on a
        /// task and not awaited; all others run inline, in registration order.
        /// </summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="evnt"/> is null, or a registered handler is not an
        /// IEventHandler&lt;TEvent&gt; (this can happen when <typeparamref name="TEvent"/>
        /// differs from the runtime type of the event).
        /// </exception>
        public void Publish<TEvent>(TEvent evnt)
            where TEvent : class, IEvent
        {
            if (evnt == null)
            {
                throw new ArgumentNullException("evnt");
            }

            foreach (var handler in SnapshotHandlers(evnt.GetType()))
            {
                var eventHandler = handler as IEventHandler<TEvent>;
                if (eventHandler == null)
                {
                    throw new ArgumentNullException("eventHandler");
                }

                if (IsAsynchronous(eventHandler))
                {
                    Task.Factory.StartNew((o) => eventHandler.Handle((TEvent)o), evnt);
                }
                else
                {
                    eventHandler.Handle(evnt);
                }
            }
        }

        /// <summary>
        /// Invokes every handler registered for the runtime type of <paramref name="evnt"/>,
        /// waits for the asynchronous ones, and reports the outcome once through
        /// <paramref name="callback"/>.
        /// </summary>
        /// <param name="evnt">The event to publish.</param>
        /// <param name="callback">
        /// Called exactly once with the event, a success flag and an exception:
        /// (true, null) when all handlers completed; (false, exception) when a handler
        /// threw or the wait timed out; (false, null) when no handler is registered.
        /// </param>
        /// <param name="timeout">
        /// Maximum time to wait for asynchronous handlers; null waits indefinitely.
        /// </param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="evnt"/> or <paramref name="callback"/> is null.
        /// </exception>
        public void Publish<TEvent>(TEvent evnt, Action<TEvent, bool, Exception> callback, TimeSpan? timeout = null)
            where TEvent : class, IEvent
        {
            if (evnt == null)
            {
                throw new ArgumentNullException("evnt");
            }

            if (callback == null)
            {
                throw new ArgumentNullException("callback");
            }

            var handlers = SnapshotHandlers(evnt.GetType());
            if (handlers.Length == 0)
            {
                callback(evnt, false, null);
                return;
            }

            var succeeded = true;
            Exception failure = null;
            try
            {
                var tasks = new List<Task>();
                foreach (var handler in handlers)
                {
                    var eventHandler = handler as IEventHandler<TEvent>;
                    if (eventHandler == null)
                    {
                        throw new ArgumentNullException("eventHandler");
                    }

                    if (IsAsynchronous(eventHandler))
                    {
                        tasks.Add(Task.Factory.StartNew((o) => eventHandler.Handle((TEvent)o), evnt));
                    }
                    else
                    {
                        eventHandler.Handle(evnt);
                    }
                }

                if (tasks.Count > 0)
                {
                    if (timeout == null)
                    {
                        Task.WaitAll(tasks.ToArray());
                    }
                    else if (!Task.WaitAll(tasks.ToArray(), timeout.Value))
                    {
                        // WaitAll returns false when the time limit elapsed first;
                        // that must not be reported as success.
                        succeeded = false;
                        failure = new TimeoutException("The asynchronous event handlers did not complete within the specified timeout.");
                    }
                }
            }
            catch (Exception ex)
            {
                succeeded = false;
                failure = ex;
            }

            // Invoked outside the try block so that an exception thrown by the callback
            // itself propagates to the caller instead of triggering a second callback.
            callback(evnt, succeeded, failure);
        }

        #endregion
    }
}
第二步:使用:
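private readonly IEventBus eventBus;
// ……(省略其余代码)
eventBus.Publish<OrderDispatchedEvent>(evnt);
// 注意:Publish 只是把事件放入当前线程的待提交队列,必须在同一线程上调用 eventBus.Commit() 之后,事件才会经由 IEventAggregator 派发给已订阅的处理器。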