结合DI,实现发布者与订阅者的解耦,属于本次事务的对象主体不应定义为订阅者,因为订阅者不应与发布者产生任何关联
一、发布者订阅者模式
发布者发出一个事件主题,一个或多个订阅者接收这个事件,中间通过事件总线通讯(消息队列),并且发布者与订阅者这两者间是无状态的,根据产品实际场景需要,可以自己实现单机单点的发布订阅,也可选择使用目前流行的分布式消息中间件:
RabbitMQ、ActiveMQ、RocketMQ、kafka等
二、观察者与订阅者的区别
观察者与业务主体是耦合的,并且是即时通知的;订阅者与业务主体完全解耦,只通过中间的信息通道通知,互相不知道对方的存在,可以是同步也可以是异步
三、具体实现
本文主要讲解单点模式,有需要随时可以扩展为分布式方案
发布者接口:
1 namespace Xms.Event.Abstractions 2 { 3 /// <summary> 4 /// 事件发布接口 5 /// </summary> 6 public interface IEventPublisher 7 { 8 /// <summary> 9 /// 发布事件 10 /// </summary> 11 /// <typeparam name="TEvent">事件类型</typeparam> 12 /// <param name="e"></param> 13 void Publish<TEvent>(TEvent e); 14 } 15 }
发布者实现:
1 using System; 2 using System.Linq; 3 using Xms.Event.Abstractions; 4 using Xms.Infrastructure.Inject; 5 using Xms.Logging.AppLog; 6 7 namespace Xms.Event 8 { 9 /// <summary> 10 /// 事件发布者 11 /// </summary> 12 public class EventPublisher : IEventPublisher 13 { 14 private readonly ILogService _logService; 15 private readonly IServiceResolver _serviceResolver; 16 17 public EventPublisher(ILogService logService 18 , IServiceResolver serviceResolver) 19 { 20 _logService = logService; 21 _serviceResolver = serviceResolver; 22 } 23 24 #region Methods 25 26 /// <summary> 27 /// 发布事件 28 /// </summary> 29 /// <typeparam name="TEvent">事件类</typeparam> 30 /// <param name="e">事件对象</param> 31 public virtual void Publish<TEvent>(TEvent e) 32 { 33 //获取所有事件接收者 34 var consumers = _serviceResolver.GetAll<IConsumer<TEvent>>().ToList(); 35 foreach (var consumer in consumers) 36 { 37 try 38 { 39 //处理事件 40 consumer.HandleEvent(e); 41 } 42 catch (Exception exception) 43 { 44 _logService.Error(exception); 45 } 46 } 47 } 48 49 #endregion Methods 50 } 51 }
订阅(消费)者接口:
1 namespace Xms.Event.Abstractions 2 { 3 /// <summary> 4 /// 事件接收接口 5 /// </summary> 6 /// <typeparam name="T"></typeparam> 7 public interface IConsumer<T> 8 { 9 /// <summary> 10 /// 处理事件 11 /// </summary> 12 /// <param name="eventMessage">事件</param> 13 void HandleEvent(T eventMessage); 14 } 15 }
事件(消息):
这里只给出一个作为示例,实际上一般会有记录的:“创建”、“修改”、“删除”,流程相关的:“发起审批”、“审批通过”、“审批完成”等等
1 namespace Xms.Flow.Core.Events 2 { 3 /// <summary> 4 /// 工作流启动后事件 5 /// </summary> 6 public class WorkFlowStartedEvent 7 { 8 public WorkFlowStartUpContext Context { get; set; } 9 public WorkFlowExecutionResult Result { get; set; } 10 } 11 }
服务注册:
详细实现回看.netcore之DI批量注入(支持泛型)
1 using Microsoft.Extensions.Configuration; 2 using Microsoft.Extensions.DependencyInjection; 3 using Xms.Core; 4 using Xms.Infrastructure.Inject; 5 6 namespace Xms.Event 7 { 8 /// <summary> 9 /// 事件模块服务注册 10 /// </summary> 11 public class ServiceRegistrar : IServiceRegistrar 12 { 13 public int Order => 1; 14 15 public void Add(IServiceCollection services, IConfiguration configuration) 16 { 17 //event publisher 18 services.AddScoped<Event.Abstractions.IEventPublisher, Event.EventPublisher>(); 19 //event consumers 20 services.RegisterScope(typeof(Event.Abstractions.IConsumer<>)); 21 } 22 } 23 }
四、应用场景
比如在工作流启动审批后发送通知
1 using System.Collections.Generic; 2 using Xms.Context; 3 using Xms.Event.Abstractions; 4 using Xms.Flow.Core.Events; 5 using Xms.Infrastructure.Utility; 6 using Xms.Localization.Abstractions; 7 using Xms.Notify.Abstractions; 8 using Xms.Notify.Internal; 9 10 namespace Xms.EventConsumers.Notify 11 { 12 /// <summary> 13 /// 工作流启动审批后发送通知 14 /// </summary> 15 public class WorkflowStartedNotify : IConsumer<WorkFlowStartedEvent> 16 { 17 private readonly IAppContext _appContext; 18 private readonly ILocalizedTextProvider _loc; 19 private readonly IEnumerable<INotify> _notifies; 20 21 public WorkflowStartedNotify(IAppContext appContext 22 , IEnumerable<INotify> notifies) 23 { 24 _appContext = appContext; 25 _loc = _appContext.GetFeature<ILocalizedTextProvider>(); 26 _notifies = notifies; 27 } 28 public void HandleEvent(WorkFlowStartedEvent eventMessage) 29 { 30 //当前节点处理人 31 foreach (var handlerId in eventMessage.Result.NextHandlerId) 32 { 33 //通知方式:微信、短信、邮件、系统消息等 34 var msg = _loc["workflow_newtasknotify"].FormatWith(eventMessage.Context.EntityMetaData.LocalizedName); 35 //发送消息 36 foreach (var notifier in _notifies) 37 { 38 notifier.Send(new InternalNotifyBody() 39 { 40 TypeCode = 2 41 , 42 Subject = msg 43 , 44 Content = "到你审批了,快到碗里来" 45 , 46 ToUserId = handlerId 47 , 48 LinkTo = "/entity/create?entityid=" + eventMessage.Context.EntityMetaData.EntityId + "&recordid=" + eventMessage.Context.ObjectId 49 }); 50 } 51 } 52 } 53 } 54 }
四、总结
前面讲解了订阅者模式的基本概念及与观察者的区别,后面展示了具体实现及实际应用场景,大家记住一点就行,这些设计模式最终都是为了达到解藕的目的,要查看完整代码,请回到这一章