zoukankan      html  css  js  c++  java
  • 简单的学习,实现,领域事件,事件存储,事件溯源

    为什么写这篇文章

    自己以前都走了弯路,以为学习战术设计就会DDD了,其实DDD的精华在战略设计,但是对于我们菜鸟来说,学习一些技术概念也是挺好的 经常看到这些术语,概念太多,也想简单学习一下,记忆力比较差记录一下实现的细节

    领域事件

    1.领域事件是过去发生的与业务有关的事实,一但发生就不可更改,所以存储事件时只能追加 3.领域事件具有时间点的特征,所有事件连接起来会形成明显的时间轴 4.领域事件会导致目标对象状态的变化,聚合根的行为会产生领域事件,所以会改变聚合的状态 在聚合根里面维护一个领域事件的聚合,每一个事件对应一个Handle,通过反射维护一个数据字典,通过事件查找到指定的Handle 领域事件实现的方式:目前看到有3种方式,MediatR,消息队列 ,发布订阅模式 eShopOnContainers 中使用的是MediatR ENode 中使用的是EQueue,EQueue是一个纯C#写的消息队列 使用已经写好的消息队列Rabbitmq ,kafka

    事件存储,事件溯源,事件快照

    事件存储:存储所有聚合根里面发生过的事件 1.事件存储中可以做并发的处理,比如Command 重复,领域事件的重复 2.领域事件的重复通过聚合根Id+版本号判断,可以在数据库中建立联合唯一索引,在存储事件时检测重复,记录重复的事件,根据业务做处理 3.这里要保证存储事件与发布领域事件的一致性 如何保证存储事件与发布领域事件的一致性 先存储事件然后在发布领域事件,如果发生异常,就一直重试,一直到成功为止,也可以做一定的处理,比如重试到一定的次数,就通知,进行人工处理 我选择了CAP + Policy + Dapper 事件溯源:在事件存储中记录导致状态变化的一系列领域事件。通过持久化记录改变状态的事件,通过重新播放获得状态改变的历史。 事件回放可以返回系统到任何状态 聚合快照:聚合的生命周期各有长短,有的聚合里面有大量的事件,,事件越多加载事件以及重建聚合的执行效率就会越来越低,快照里面存储的是聚合 1.定时存储整个聚合根:使用定时器每隔一段时间就存储聚合到快照表中 2.定量存储整个聚合根:根据事件存储中的数量来存储聚合到快照表中 事件溯源的实现方式 1.首先我们需要实现聚合In Memory, 2.在CommandHandler中订阅 Command命令, 创建聚合时 ,在内存中维护一个数据字典,key为:聚合根的Id,value为:聚合 修改,删除,聚合时,根据聚合根的Id,查询出聚合 如果内存中聚合不存在时:根据聚合根的Id 从聚合快照表中查询出聚合,然后根据聚合快照存储的时间,聚合根Id,查询事件存储中的所有事件,然后回放事件,得到聚合最终的状态

    记录遇到的问题

    由于基础非常的差,所以实现的方式都是以最简单的方式来写的,存在许多的问题,代码中有问题的地方希望大家提出来,让我学习一下 代码的实现目前还没有写快照的部分,也没有处理EventStorage中的命令重复与聚合根+版本号重复,具体的请看汤总的ENode,里面有全部的实现 1.怎样保证存储事件,发布事件的最终一致性 2.怎么解析EventStorage中的事件,回放事件 先存储事件,当事件存储成功之后,在发布事件 存储事件失败:就一直重试,发布事件失败,使用的是CAP,CAP内部使用的是本地消息表的方式,如果发布事件失败,也一直重试,如果服务器重启了,Rabbitmq里面消息为Ack,消息没有丢,重连后会继续执行 存储事件,发布事件
        /// <summary>
        /// 存储聚合根中的事件到EventStorage 发布事件
        /// </summary>
        /// <typeparam name="TAggregationRoot"></typeparam>
        /// <param name="event"></param>
        /// <returns></returns>
        public async Task AppendEventStoragePublishEventAsync<TAggregationRoot>(TAggregationRoot @event)
            where TAggregationRoot : IAggregationRoot
        {
            var domainEventList = @event.UncommittedEvents.ToList();
            if (domainEventList.Count == 0)
            {
                throw new Exception("请添加事件!");
            }
    
            await TryAppendEventStorageAsync(domainEventList).ContinueWith(async e =>
            {
                if (e.Result == (int)EventStorageStatus.Success)
                {
                    await TryPublishDomainEventAsync(domainEventList).ConfigureAwait(false);
                    @event.ClearEvents();
                }
            });
        }
    
        /// <summary>
        /// 发布领域事件
        /// </summary>
        /// <returns></returns>
        public async Task PublishDomainEventAsync(List<IDomainEvent> domainEventList)
        {
            using (var connection =
                new SqlConnection(ConnectionStr))
            {
                if (connection.State == ConnectionState.Closed)
                {
                    await connection.OpenAsync().ConfigureAwait(false);
                }
                using (var transaction = await connection.BeginTransactionAsync().ConfigureAwait(false))
                {
                    try
                    {
                        if (domainEventList.Count > 0)
                        {
                            foreach (var domainEvent in domainEventList)
                            {
                                await _capPublisher.PublishAsync(domainEvent.GetRoutingKey(), domainEvent).ConfigureAwait(false);
                            }
                        }
                        await transaction.CommitAsync().ConfigureAwait(false);
                    }
                    catch (Exception e)
                    {
                        await transaction.RollbackAsync().ConfigureAwait(false);
                        throw;
                    }
                }
            }
        }
    
        /// <summary>
        /// 发布领域事件重试
        /// </summary>
        /// <param name="domainEventList"></param>
        /// <returns></returns>
        public async Task TryPublishDomainEventAsync(List<IDomainEvent> domainEventList)
        {
            var policy = Policy.Handle<SocketException>().Or<IOException>().Or<Exception>()
                .RetryForeverAsync(onRetry: exception =>
                {
                    Task.Factory.StartNew(() =>
                    {
                        //记录重试的信息
                        _loggerHelper.LogInfo("发布领域事件异常", exception.Message);
                    });
                });
            await policy.ExecuteAsync(async () =>
            {
                await PublishDomainEventAsync(domainEventList).ConfigureAwait(false);
            });
    
        }
    
        /// <summary>
        /// 存储聚合根中的事件到EventStorage中
        /// </summary>
        /// <returns></returns>
        public async Task<int> AppendEventStorageAsync(List<IDomainEvent> domainEventList)
        {
            if (domainEventList.Count == 0)
            {
                throw new Exception("请添加事件!");
            }
            var status = (int)EventStorageStatus.Failure;
            using (var connection = new SqlConnection(ConnectionStr))
            {
                try
                {
                    if (connection.State == ConnectionState.Closed)
                    {
                        await connection.OpenAsync().ConfigureAwait(false);
                    }
                    using (var transaction = await connection.BeginTransactionAsync().ConfigureAwait(false))
                    {
                        try
                        {
                            if (domainEventList.Count > 0)
                            {
                                foreach (var domainEvent in domainEventList)
                                {
                                    EventStorage eventStorage = new EventStorage
                                    {
                                        Id = Guid.NewGuid(),
                                        AggregateRootId = domainEvent.AggregateRootId,
                                        AggregateRootType = domainEvent.AggregateRootType,
                                        CreateDateTime = domainEvent.CreateDateTime,
                                        Version = domainEvent.Version,
                                        EventData = Events(domainEvent)
                                    };
                                    var eventStorageSql =
                                        $"INSERT INTO EventStorageInfo(Id,AggregateRootId,AggregateRootType,CreateDateTime,Version,EventData) VALUES (@Id,@AggregateRootId,@AggregateRootType,@CreateDateTime,@Version,@EventData)";
                                    await connection.ExecuteAsync(eventStorageSql, eventStorage, transaction).ConfigureAwait(false);
                                }
                            }
                            await transaction.CommitAsync().ConfigureAwait(false);
                            status = (int)EventStorageStatus.Success;
                        }
                        catch (Exception e)
                        {
                            await transaction.RollbackAsync().ConfigureAwait(false);
                            throw;
                        }
                    }
    
                }
                catch (Exception e)
                {
                    connection.Close();
                    throw;
                }
            }
            return status;
        }
    
        /// <summary>
        /// AppendEventStorageAsync异常重试
        /// </summary>
        public async Task<int> TryAppendEventStorageAsync(List<IDomainEvent> domainEventList)
        {
            var policy = Policy.Handle<SocketException>().Or<IOException>().Or<Exception>()
                .RetryForeverAsync(onRetry: exception =>
                {
                    Task.Factory.StartNew(() =>
                    {
                        //记录重试的信息
                        _loggerHelper.LogInfo("存储事件异常", exception.Message);
                    });
                });
            var result = await policy.ExecuteAsync(async () =>
              {
                  var resulted = await AppendEventStorageAsync(domainEventList).ConfigureAwait(false);
                  return resulted;
              });
            return result;
        }
    
        /// <summary>
        /// 根据DomainEvent序列化事件Json
        /// </summary>
        /// <param name="domainEvent"></param>
        /// <returns></returns>
        public string Events(IDomainEvent domainEvent)
        {
            ConcurrentDictionary<string, string> dictionary = new ConcurrentDictionary<string, string>();
            //获取领域事件的类型(方便解析Json)
            var domainEventTypeName = domainEvent.GetType().Name;
            var domainEventStr = JsonConvert.SerializeObject(domainEvent);
            dictionary.GetOrAdd(domainEventTypeName, domainEventStr);
            var eventData = JsonConvert.SerializeObject(dictionary);
            return eventData;
        }
    
    解析EventStorage中存储的事件
        public async Task<List<IDomainEvent>> GetAggregateRootEventStorageById(Guid AggregateRootId)
        {
            try
            {
                using (var connection = new SqlConnection(ConnectionStr))
                {
                    var eventStorageList = await connection.QueryAsync<EventStorage>($"SELECT * FROM dbo.EventStorageInfo WHERE AggregateRootId='{AggregateRootId}'");
                    List<IDomainEvent> domainEventList = new List<IDomainEvent>();
                    foreach (var item in eventStorageList)
                    {
                        var dictionaryDomainEvent = JsonConvert.DeserializeObject<Dictionary<string, string>>(item.EventData);
                        foreach (var entry in dictionaryDomainEvent)
                        {
                            var domainEventType = TypeNameProvider.GetType(entry.Key);
                            if (domainEventType != null)
                            {
                                var domainEvent = JsonConvert.DeserializeObject(entry.Value, domainEventType) as IDomainEvent;
                                domainEventList.Add(domainEvent);
                            }
                        }
                    }
                    return domainEventList;
                }
            }
            catch (Exception ex)
            {
                throw;
            }
    

    注意事项

    1.事件没持久化就代表事件还没发生成功,事件存储可能失败,必须先存储事件,在发布事件,保证存储事件与发布事件一致性 1.使用事件驱动,必须要做好冥等的处理 2.如果业务场景中有状态时:通过状态来控制 3.新建一张表,用来记录消费的信息,消费端的代码里面,根据唯一的标识,判断是否处理过该事件 4.Q端的任何更新都应该把聚合根ID和事件版本号作为条件,Q端的更新不用遵循聚合的原则,可以使用最简单的方式处理 5.仓储是用来重建聚合的,它的行为和集合一样只有Get ,Add ,Delete 6.DDD不是技术,是思想,核心在战略模块,战术设计是实现的一种选择,战略设计,需要面向对象的分析能力,职责分配,深层次的分析业务

    感谢

    虽然学习DDD的时间不短了,感觉还是在入门阶段,在学习的过程中有许多的不解,经常问ENode群里面的大佬,也经常@汤总,谢谢大家的帮助与解惑。
  • 相关阅读:
    HBase数据存储格式
    百元买百鸡
    驾驶机动车在高速公路上倒车、逆行、穿越中央分隔带掉头的一次记6分。
    驾驶机动车在高速公路遇到能见度低于200米的气象条件时,最高车速是多少?_2600218
    驾驶人违反交通运输管理法规发生重大事故后,因逃逸致人死亡的,处3年以上7年以下有期徒刑。
    国家级心理咨询师培训(二、三级)_课程开设_北京林业大学在职课程进修班(同等学历)网站
    三级心理咨询师_百度百科
    国家二级心理咨询师_百度百科
    国家二级心理咨询师
    国家二级心理咨询师
  • 原文地址:https://www.cnblogs.com/lifeng618/p/11916831.html
Copyright © 2011-2022 走看看