zoukankan      html  css  js  c++  java
  • CQRS架构中同步服务的一种实现方式

    概述

    同步服务(Synchronization Service)在CQRS架构中有着重要的作用。它通过访问事件总线来读取事件数据,并对事件进行派发。应用程序会向同步服务注册事件处理器,以便同步服务在派发事件的过程中,能够通过事件处理器对事件进行处理。在此,我将针对“查询数据库的同步”这一基本的CQRS应用场景,来给出一种最简单的同步服务实现方式。

    image

    回顾一下CQRS架构,在《EntityFramework之领域驱动设计实践【扩展阅读】:CQRS体系结构模式》一文中,我给出了一个简单的CQRS架构模型图,在该图的“事件总线(Event Bus)”与“查询数据库(Query Database)”之间,有一个Denormalizers/Synchronizers的组件,它负责侦听事件总线,并将事件数据同步到查询数据库中。在具体实现上,Denormalizer/Synchronizer通常会以服务(Service)的方式存在,也就是这里所说的“同步服务”。同步服务的实现是多样化的,基本上也都是按照项目和应用程序的具体情况进行设计,不过大体上离不开两种方式,即主动方式和被动方式。主动方式就是同步服务主动监视事件总线,发现有事件到达后便读取事件数据然后更新查询数据库;被动方式则是由事件发起方(通常是领域仓储,或者是基础结构设施所支持的通知服务等)负责通知同步服务,服务接到通知后,再到事件总线获取数据。其实两种方式都各有利弊,主动方式需要定期对事件总线做查询,这个“定期”的度就不太好把握,时间间隔太短会影响性能,间隔太长又会影响实时性;被动方式避免了定期查询带来的系统开销,但同时又加大了Command部分与Query部分之间的耦合,它需要依靠一些技术手段(比如WCF)来实现进程间通信,又或者,还需要利用一些位于基础结构层的系统组件(比如MSMQ Trigger)。我的一个想法是,在实际项目中根据情况对事件进行路由,分别结合两种方式实现事件派发与数据同步,当然,这只是我的一个设想,并没有真正实践过。

    之前,我发布了Apworks开发框架的Alpha版本(地址:http://apworks.codeplex.com),同时也针对这个版本发布了一个基于CQRS架构的演示案例:TinyLibrary CQRS(地址:http://tlibcqrs.codeplex.com)。在Alpha版本中,Apworks仅提供了基于内存对象的“直接事件总线(Direct Event Bus)”,它在获得来自领域仓储的事件后,会直接调用派发器实现事件派发,于是查询数据库也将被同步更新。Direct Event Bus的最大弊病就是要求Event Bus与Command部分在物理上被部署在同一台机器上(因为是直接内存对象实现的),而且其它任何外部系统都无法访问Event Bus,这在系统整合方面就造成了很大的困难。现在,Apworks已经能够支持基于MSMQ的总线机制了,无论是Command Bus还是Event Bus,都可以基于MSMQ来实现。通过使用MSMQ,基于CQRS架构的应用程序在系统整合的方案选取上获得了巨大的发挥空间,比如我们可以使用Biztalk Server的MSMQ Adapter来访问MSMQ。有关Biztalk Server与CQRS架构的整合,我会在另外的文章中讨论,这里不作太多介绍。

    为了使得TinyLibrary CQRS演示案例能够支持当前版本的Apworks,并希望在演示中使用MSMQ替代原有的Direct Event Bus作为事件总线,就需要实现一个具有完整功能的同步服务。对于这个同步服务的实现,我对上述主动与被动两种方式进行了分析,最后决定还是采用主动方式(即定期查询MSMQ)。如果是采用被动方式,那么又有如下三个选项:

    • 使用WCF,在仓储完成Event Store与Event Bus的两次提交(2PC)之后,以WCF客户端的角色,调用同步服务(同时也是WCF服务端的角色)中的方法,并在该方法中完成MSMQ的读取与数据库的同步
    • 使用MSMQ Trigger,但这种方式需要实现并注册COM组件,实现起来不方便
    • 通过Query端的查询请求来通知同步服务完成同步,也就是说仓储不需要对同步服务进行通知,同步服务本身也不去定期地查询MSMQ,而是在出现Query端的查询请求时,触发通知并完成同步任务

    由于Command部分的仓储操作和Query部分的操作是非常频繁的,因此事实上第一个选项和第三个选项会频繁地通知同步服务,造成同步服务不断地读取MSMQ并处理事件同步任务,这又加重了同步服务的负担,降低了系统性能。而基于MSMQ Trigger的方式,实现则相对更为复杂。权衡一下,针对TinyLibrary CQRS这个演示案例,我还是打算采取主动方式实现同步服务。

    TinyLibrary CQRS中基于MSMQ的同步服务的实现方式

    总体上讲,TinyLibrary CQRS演示案例的同步服务的设计,主体上有以下几个方面:

    • 结合Windows Service和控制台应用的实现方式
    • MSMQ的定期查询
    • 事件数据读取

    结合Windows Service和控制台应用的实现方式

    在做服务程序调试的过程中,与读取日志相比,我们更希望能够看到一些实时的结果;而在生产环境中,服务通常以后台的形式运行,并会将一些结果、错误信息写到日志中。TinyLibrary CQRS同步服务结合了这两种方式,在开发的时候可以以控制台方式运行,后台则以Windows Service的形式运行。实现这样的效果其实很简单,首先创建一个控制台应用程序,然后向其中添加一个继承于System.ServiceProcess.ServiceBase的类,并在该类中重写OnStart、OnStop等方法以实现服务运行逻辑。控制台应用程序通常会有一个Main的静态函数作为其执行入口,那么我们只需要在这个Main静态函数中以new关键字创建刚刚新建的类的实例,即可启动服务。大致代码如下:

    01 public sealed class SynchronizationServiceProc : ServiceBase
    02 {
    03 #if !CONSOLE
    04   static void Main()
    05   {
    06       ServiceBase.Run(new SynchronizationServiceProc());
    07   }
    08 #endif
    09      
    10   public void StartProc()
    11   {
    12     // 处理启动逻辑
    13   }
    14    
    15   public void StopProc()
    16   {
    17     // 处理停止逻辑
    18   }
    19    
    20   protected override void OnStart(string[] args)
    21   {
    22     this.StartProc();
    23   }
    24    
    25   protected override void OnStop()
    26   {
    27     this.StopProc();
    28   }
    29 }
    30  
    31 #if CONSOLE
    32 class Program
    33 {
    34   static void Main(string[] args)
    35   {
    36     using (SynchronizationServiceProc proc = new SynchronizationServiceProc())
    37     {
    38       proc.StartProc();
    39       Console.ReadLine();
    40       proc.StopProc();
    41     }
    42   }
    43 }
    44 #endif

    你会发现在上面的代码中有两个Main的静态函数,如果让它们同时存在的话,是无法编译通过的。因此,我在这个控制台程序的Build选项中,向Conditional compilation symbols添加了CONSOLE宏,并在上面的代码中加入了#if/#endif的宏判断以支持两种不同的编译方式。另外,如需通过installutil.exe命令行安装Windows Service的话,还需向这个控制台程序添加Installer Class。在此就不详述这个过程了。

    MSMQ的定期查询

    TinyLibrary CQRS的同步服务中,使用System.Timers.Timer类,实现对MSMQ的定期查询。事实上,TinyLibrary CQRS的同步服务并不是真正在Timer的Elapsed事件被触发的时候进行同步操作的。同步操作会被BackgroundWorker分派到另一个线程中执行,这个待会我会介绍。Timer的Elapsed事件只对MSMQ中是否有消息进行判断,首先,确定BackgroundWorker是空闲的,然后读取MSMQ并判断其中是否有消息,若有,则启动BackgroundWorker进行同步操作,否则直接返回。当下一次间隔时间到来时,如果BackgroundWorker正在处理上一次触发的任务,那么Elapsed处理函数会直接返回,于是就达到了既能持续监听MSMQ,又能有效地处理同步任务的目的。Timer的Elapsed代码如下:

    01 private void timer_Elapsed(object sender, ElapsedEventArgs e)
    02 {
    03     // 如果BackgroundWorker为空闲状态,则
    04     // 查询MSMQ以确定是否有消息
    05     if (!worker.IsBusy)
    06     {
    07         int messageCount = 0;
    08         List<string> messageIds = new List<string>();
    09         using (MessageQueue messageQueue =
    10             new MessageQueue(this.EventMessageQueue))
    11         {
    12             var messages = messageQueue.GetAllMessages();
    13             messageCount = messages.Length;
    14             messageIds = messages.Select(p => p.Id).ToList();
    15             messageQueue.Close();
    16         }
    17         // 如果MSMQ中有消息,则启动BackgroundWorker
    18         // 并将所有消息的ID作为参数传给BackgroundWorker
    19         if (messageCount > 0)
    20         {
    21             worker.RunWorkerAsync(messageIds);
    22         }
    23     }
    24 }

    事件数据读取

    这个功能是在一个单独的线程中完成的。Tiny Library CQRS的同步服务采用Background Worker实现这一机制。在Background Worker的DoWork事件处理函数中,首先读取由Timer传入的消息ID列表,然后使用MSMQ的PeekById方法根据ID读取消息内容,同时对读入的消息进行组织(比如判断消息的正确性、获取消息的二进制代码、将二进制代码反序列化为XML字符串、从XML字符串解析出领域事件的类型以及事件触发时间等信息)。最后,通过这些已组织好的数据信息构建出领域事件的实体,并使用消息派发器(Message Dispatcher)将事件派发出去。

    在这里有两个需要认真思考的问题:

    • 如果事件处理失败怎么办? - 所以我们用的是PeekById,而不是ReceiveById。PeekById只会根据ID从MSMQ读取出消息,而不会将其移除;ReceiveById则会将消息移除
    • Peek、PeekById、Receive、ReceiveById都是阻塞式调用,如果读取消息不成功怎么办? - 有网上资料提议使用异步的方式,比如使用BeginReceive等,但这种方式在异步完成处理时仍需要另一个BeginReceive请求来完成下一个消息的读取操作,从实现上看无非就是多出了几个处理线程,并没有对系统性能带来太大好处,而且增加了实现的复杂度

    Background Worker的DoWork事件处理函数大致如下:

    01 private void worker_DoWork(object sender, DoWorkEventArgs e)
    02 {
    03     BackgroundWorker localWorker = sender as BackgroundWorker;
    04     if (localWorker.CancellationPending)
    05     {
    06         e.Cancel = true;
    07         return;
    08     }
    09     List<string> allMessageIds = e.Argument as List<string>;
    10     var messageCount = allMessageIds.Count;
    11     List<DomainEventMessageContent> messageContents =
    12         new List<DomainEventMessageContent>();
    13     using (MessageQueue messageQueue = new MessageQueue(this.EventMessageQueue))
    14     {
    15         messageQueue.MessageReadPropertyFilter.SentTime = true;
    16         for (int i = 0; i < messageCount; i++)
    17         {
    18             Message message = messageQueue.PeekById(allMessageIds[i],
    19                 this.EventMessageReceiveTimeout);
    20             var messageContent = new DomainEventMessageContent(message);
    21             messageContents.Add(messageContent);
    22         }
    23         messageQueue.Close();
    24     }
    25     var sortedMessageContents = messageContents.OrderBy(mc => mc.SentTime);
    26     foreach (var mc in sortedMessageContents)
    27     {
    28         bool canRemove = true;
    29         try
    30         {
    31             if (!mc.IsValidMessage)
    32                 throw new Exception("Invalid Message Content.");
    33             OnProcessing(mc);
    34             Type eventType = Type.GetType(mc.Type);
    35             if (eventType != null)
    36             {
    37                 DomainEventXmlSerializer xmlSerializer =
    38                     new DomainEventXmlSerializer();
    39                 var domainEvent = xmlSerializer.Deserialize(eventType, mc.Bytes);
    40                 messageDispatcher.DispatchMessage(domainEvent);
    41             }
    42             else
    43                 canRemove = false;
    44         }
    45         catch (Exception ex)
    46         {
    47             OnProcessFailed(mc.MessageId, mc, ex);
    48             canRemove = false;
    49         }
    50         finally
    51         {
    52             if (canRemove)
    53             {
    54                 using (MessageQueue messageQueue =
    55                     new MessageQueue(this.EventMessageQueue))
    56                 {
    57                     try
    58                     {
    59                         messageQueue.ReceiveById(mc.MessageId,
    60                             this.EventMessageReceiveTimeout);
    61                     }
    62                     finally
    63                     {
    64                         messageQueue.Close();
    65                     }
    66                 }
    67             }
    68         }
    69     }
    70 }

     

    从上面的代码可以看到,在处理和派发消息时,如果失败,则会引发ProcessFailed事件,同时会将canRemove设置为false,以防止未成功处理的消息从消息队列中移除,造成数据丢失。在finally代码块中,会对已成功处理的消息进行移除操作。

    此外,在处理所有获得的消息之前,程序会首先根据消息的发送事件对消息进行排序。这样做的目的是确保消息是按照其发布的顺序进行处理的。比如修改客户信息的消息一定是在创建客户信息之后被处理的。貌似MSMQ并不能够100%确保其Send、Receive的操作是FIFO(First In First Out)的,好像是与队列是否为事务性队列有关系,这部分内容还值得继续研究。不管怎样,对消息排序总归还是行得通的。

    运行效果

    • 启动同步服务
    • 向MSMQ中随意发送一条文本消息,同步服务会读取这个消息并试图处理。由于在处理时发现消息格式不正确,同步服务会显示出错误信息,并在MSQM中保留这个消息,以便在下一时间到来时试图再次处理该消息
      image

    • 创建一个UserAccountCreated的领域事件,以表示有一个用户账号被创建。通过发起RegisterUserAccount命令,Command Handler会向领域仓储保存新创建的UserAccount实体。领域仓储在保存实体(确切地说是实体的领域事件序列)时,同时会将领域事件发送到MSMQ事件总线。以下是发起这个RegisterUserAccount命令的测试代码:
    01 [TestMethod]
    02 public void CommandBus_HandleRegisterUserAccountCommandTest()
    03 {
    04     RegisterUserAccountCommand registerUserAccountCommand =
    05         new RegisterUserAccountCommand
    06     {
    07         UserName = "daxnet",
    08         Password="password",
    09         DisplayName="Sunny Chen",
    10         Email = "daxnet@live.com",
    11         ContactPhone = "1234567",
    12         ContactAddressZip="201203",
    13         ContactAddressCity="Shanghai",
    14         ContactAddressState="Shanghai",
    15         ContactAddressCountry="China",
    16         ContactAddressStreet="Zuchongzhi Rd.",
    17     };
    18  
    19     using (ICommandBus commandBus = appIniter
    20         .Application
    21         .ObjectContainer
    22         .GetService<ICommandBus>())
    23     {
    24         commandBus.Publish(registerUserAccountCommand);
    25         commandBus.Commit();
    26     }
    27     long msgCnt = TestEnvironment.GetMessageCount();
    28     int recordCnt = TestEnvironment.GetDomainEventsTableRecordCount();
    29     Assert.AreEqual(1, recordCnt);
    30 }
    • 同步服务在获得了来自Command部分的领域事件消息后,便对消息进行信息提取,然后使用事件派发器派发到相应的事件处理器(Event Handler),我们可以通过同步服务的输出结果看到消息已经被处理:
      image

    • 事件处理器(Event Handler)在获得了来自消息派发器(Event Dispatcher)的事件之后,直接使用SQL语句更新查询数据库。Event Handler代码如下:
    01 public class TinyLibraryCQRSEventHandler : IEventHandler<UserAccountCreatedEvent>
    02 {
    03     private string queryDBConnectionString = null;
    04      
    05     private string QueryDBConnectionString
    06     {
    07         get
    08         {
    09             if (queryDBConnectionString == null)
    10                 queryDBConnectionString = ConfigurationManager
    11                     .ConnectionStrings["QueryDBConnectionString"].ConnectionString;
    12             return queryDBConnectionString;
    13         }
    14     }
    15  
    16     #region IHandler<UserAccountCreatedEvent> Members
    17  
    18     public bool Handle(UserAccountCreatedEvent message)
    19     {
    20         string insertUserAccoutSql = @"INSERT INTO [UserAccounts]
    21 ([UserName], [Password], [DisplayName], [Email], [ContactPhone], [Address_Country],
    22  [Address_State], [Address_Street], [Address_City], [Address_Zip])
    23 VALUES
    24 (@userName, @password, @displayName, @email,
    25 @contactPhone, @country, @state, @street, @city, @zip)";
    26  
    27         var rowsAffected = SqlHelper.ExecuteNonQuery(QueryDBConnectionString,
    28             CommandType.Text, insertUserAccoutSql,
    29             new SqlParameter("@userName", message.UserName),
    30             new SqlParameter("@password", message.Password),
    31             new SqlParameter("@displayName", message.DisplayName),
    32             new SqlParameter("@email", message.Email),
    33             new SqlParameter("@contactPhone", message.ContactPhone),
    34             new SqlParameter("@country", message.ContactAddressCountry),
    35             new SqlParameter("@state", message.ContactAddressState),
    36             new SqlParameter("@street", message.ContactAddressStreet),
    37             new SqlParameter("@city", message.ContactAddressCity),
    38             new SqlParameter("@zip", message.ContactAddressZip));
    39  
    40         return rowsAffected > 0;
    41     }
    42  
    43     #endregion
    44 }
    • 最后,检查查询数据库,我们发现UserAccounts数据表中已经产生了所需的记录:
      image

    总结

    通过这篇文章的介绍,我们不仅了解了Tiny Library CQRS演示案例中同步服务的实现方式,我们还了解了CQRS架构中同步服务的主要任务和大致上的操作过程。当然,本文给出的这种实现方式也不是100%的能够确保所有的消息都能够被准确、正确地处理,或许有可能还是会造成数据丢失,但这至少是一种解决方案,而且还是具有相当的改进余地。针对这种方案,我们会有两个疑惑:1、MSMQ查询频率应该是多少?我在案例中使用的是5秒,太频繁会导致服务器严重过载,但太不频繁又会导致数据的不实时性。对于这个不实时性的处理,我提个方案,就是对领域事件的优先级进行规划,并根据优先级对领域事件进行路由,采用不同的同步服务进行处理。2、对于某些需要多个领域事件进行确认的业务逻辑,很抱歉,本文提供的演示案例暂不支持Saga,Apworks目前的版本也不支持Saga,这个问题我会在后续版本的Apworks框架中逐步解决。

    部分代码示例

    单击此处下载与本文相关的部分代码示例。整个Tiny Library CQRS项目的最新版目前正在进行中,因此在codeplex上并无任何与此版本相关的签入代码。敬请谅解。

  • 相关阅读:
    Spring Boot (20) 拦截器
    Spring Boot (19) servlet、filter、listener
    Spring Boot (18) @Async异步
    Spring Boot (17) 发送邮件
    Spring Boot (16) logback和access日志
    Spring Boot (15) pom.xml设置
    Spring Boot (14) 数据源配置原理
    Spring Boot (13) druid监控
    Spring boot (12) tomcat jdbc连接池
    Spring Boot (11) mybatis 关联映射
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/2162470.html
Copyright © 2011-2022 走看看