zoukankan      html  css  js  c++  java
  • CodeSharp.EventSourcing框架介绍如何实现异步事件订阅

    上一篇介绍了CodeSharp.EventSourcing框架的概况和一个简单的也是最常用的使用场景,本篇文章想介绍一下如何实现异步事件订阅。

    异步事件订阅是指:领域层中一个事件发出来后,事件订阅者的响应处理不是在同一个线程中同步执行,而是该事件会被放在消息队列,然后由另外一个线程从队列取出该事件消息,然后响应处理。由于消息的发出和处理不是在同一个线程和事务中实现,所以我们叫做异步事件订阅或处理。

    本文先简单介绍一个例子,然后讲解背后框架实现的思路。

    一般涉及到异步处理的,最好放在两个端点处理,一个端点发出事件消息,另一个端点响应处理事件消息;发出事件消息的端点往队列中发送消息,处理事件消息的端点从队列中取出消息。

    所以我建了两个控制台应用:

    第一个工程负责实现业务逻辑,产生可溯源事件,将事件发送到消息队列;第二个工程负责从队列取出消息,然后处理消息;看一下这两个工程的启动逻辑:

    EventSourcing.Sample.AsyncEventBus:

    与框架平常的初始化过程相比,就是需要再增加一句话,StartAsyncEventPublisherEndpoint(),这句话告诉框架,当前端点需要开启异步发送事件消息的功能,即当前端点是一个异步事件的发送方端点。这个工程的其他代码就和开篇中介绍的架构没什么不同了,我们不需要在该工程中实现事件订阅者,因为它会在下面的工程中实现。

    EventSourcing.Sample.AsyncEventBus.Host:

    与框架平常的初始化过程相比,就是需要再增加一句话,StartAsyncEventSubscriberEndpoint(),这句话告诉框架,当前端点需要开启异步接收事件消息的功能,即当前端点是一个异步事件的订阅方端点。异步事件订阅类的实现代码如下:

    复制代码
    public class NoteEventSubscriber
    {
        private IDbConnectionFactory _connectionFactory;
    
        public NoteEventSubscriber(IDbConnectionFactory connectionFactory)
        {
            _connectionFactory = connectionFactory;
        }
    
        [AsyncEventHandler]
        public void Handle(NoteCreated evnt)
        {
            using (var connection = _connectionFactory.OpenConnection())
            {
                connection.Insert(evnt, "EventSourcing_Sample_Note");
            }
        }
    
        [AsyncEventHandler]
        public void Handle(NoteTitleChanged evnt)
        {
            using (var connection = _connectionFactory.OpenConnection())
            {
                connection.Update(
                new { Title = evnt.Title, UpdatedTime = evnt.UpdatedTime },
                new { Id = evnt.Id },
                "EventSourcing_Sample_Note");
            }
        }
    }
    复制代码

    和同步事件订阅类的实现相比,基本上一致,只有两个地方不同:1)用AsyncEventHandler特性来标记某个方法是异步事件响应方法,同步的时候是用SyncEventHandler特性;2)IDbConnection实例的获取总是通过connectionFactory新建出来一个新的数据库连接,同步的时候是获取一个当前上下文可用的数据库连接;这里为什么我们需要每次open一个新的数据库连接是因为当前事件响应函数所在上下文不存在一个可用的事务对象,并且实际上异步的情况下,我们应该让处理函数总是自己负责连接数据库,更新数据库,最后释放连接。 理论上,一个事件订阅者可能就会是一个独立的端点。

    好了,示例代码介绍完了,简单吧。

    下面谈谈框架如何实现事件消息的异步订阅。

    思路很总要,先说一下思路:

    1. context.SaveChanges时,事件被持久化完成后需要被publish出去,所以我们需要一个IAsyncEventPublisher接口,用来将给定事件分发出去;当前IAsyncEventPublisher默认是被禁用的,只有当用户明确告诉框架当前应用需要支持异步发送消息时(也就是上面例子中调用了StartAsyncEventPublisherEndpoint方法后)才会工作。
    2. IAsyncEventPublisher需要知道该把当前给定的事件发送到哪些端点(endpoint),可能会有多个订阅者端点订阅该事件;所以我们需要一个ISubscriptionStore的东西用来存储事件类型与订阅者端点地址的映射关系,这样我们就可以根据事件的类型知道该把事件发送给哪些端点;这里的一个问题时,如果每次要知道当前消息要发给哪些订阅者端点时都要从数据库查询,那效率一定不高。所以我采用一种折中的做法,就是:第一次加载时从数据库获取所有订阅信息,并缓存在内存,以后每隔30秒去更新缓存,用一个简单的计时器就能实现。这样一来确保高性能,不会频繁查询数据库,二来订阅者信息的延迟也最多只有30秒,一般一个应用的异步订阅者是不会变动的;
    3. 知道了要发给哪些端点后,需要将当前给定的事件封装为一个可在网络上传输(支持序列化和反序列化)的Message对象;目前框架默认使用MSMQ来作为消息队列进行消息传输。
    4. 将Message发送到每个端点,但是这个职责最好不要由IAsyncEventPublisher自己实现,因为如果他自己实现,他就和固定的消息发送逻辑绑定在一起了,以后不利于替换消息发送的逻辑;所以我们应该定义一个IMessageTransport接口,专门用于消息发送、消息传输、取出消息的功能;我们可以设计一个void SendMessage(Message message, Address targetAddress)这样的方法,将指定的消息发送到指定的端点地址去;
    5. 好了,对于消息发送的逻辑到上面这4步就完成了;接下来需要考虑如何实现消息接收和处理的问题。上面第4步提到,IMessageTransport具有消息发送、消息传输、取出消息的功能。但是由谁以及什么时候去触发取出消息这个动作呢,也就是说该由谁负责调用IMessageTransport的PopMessage方法呢?还有另外一个问题是,如何尽可能及时地处理到达队列的消息呢?我们的期望当然是消息一到达就能立即处理,当然这是不可能的,肯定会有一点点延迟。我们先讨论该由谁主动去取出消息的问题。其实很简单,我们前面讨论过,消息发送给了多个订阅者端点,所以我们肯定需要定义一个IAsyncEventSubscriberEndpoint这样的东西,它表示一个事件订阅者端点。它负责定时不间断的去监听是否有新消息到达(访问IMessageTransport.PopMessage方法),一旦有新消息到达,则需要马上进行处理。目前实现监听的方法就是用一个计时器,每隔1秒扫描队列中是否有新消息进来。上面例子中的:StartAsyncEventSubscriberEndpoint方法内部做的事情就是启动当前的事件订阅者端点。端点一启动,它内部的计时器就启动了,然后就开始了自动监听新队列消息的功能;
    6. 消息从队列取出后,接下来就是处理消息了。处理消息的逻辑很简单,和同步事件处理的逻辑一致。思路是:1)根据事件类型找到有哪些处理函数;2)循环调用每个处理函数进行处理即可,这里我们应该给每个处理函数在处理时总是用try/catch,因为我们不希望当一个事件处理函数出异常后别的处理函数无法执行了。这点和同步的事件处理函数有所不同,同步的事件处理函数只要有任何一个挂掉,那整个事务就应该自动回滚。

    总体思路就是这样,这个过程中的一些细节,比如如何存储事件类型与处理函数元数据之间的映射关系,这里就不讨论了,框架已经设计了灵活的接口和默认实现。在后续的例子中会进一步讨论如何设计和存储事件处理函数元数据。

    使用场景:

    我们一般在什么时候用异步事件订阅呢?我的想法是,如果当前业务允许数据有一定延迟,或者数据操作比较耗时,或者应用追求在极高的并发情况时仍要有极高的性能的话,我们应该考虑使用异步的事件订阅机制。实际上,在EventSourcing和CQRS的架构下,异步事件处理是首选,因为只有这样才能突出EventSourcing其中之一的一个价值:高性能。如果CQRS的命令端只需要保存事件,而显示表的数据可以通过异步的方式保存,那整个应用的性能将是非常高效的。当然,如果你觉得异步的方式复杂度太高,那可以采用同步的事件订阅,这样可以确保数据的强一致性;

     
  • 相关阅读:
    SpringMvc 框架
    面试:你最大的长处和弱点分别是什么?这些长处和弱点对你在企业的业绩会有什么样的影响?
    线程、并发、并行、进程是什么,以及如何开启新的线程?
    面向对象三大特性
    一台客户端有三百个客户与三百个客户端有三百个客户对服务器施压,有什么区别?
    JavaScript 引擎
    Spring Data JPA简介 Spring Data JPA特点
    redo log 有什么作用?
    Spring的简介和优点?
    学习笔记——享元模式Flyweight
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/2821030.html
Copyright © 2011-2022 走看看