zoukankan      html  css  js  c++  java
  • CQRS学习——最小单元的Cqrs(CommandEvent)[其一]


    【说明:博主采用边写边思考的方式完成这一系列的博客,所以代码以附件为准,文中代码仅为了说明。】

    结构

    在学习和实现CQRS的过程中,首要参考的项目是这个【http://www.cnblogs.com/yangecnu/p/Introduction-CQRS.html】。所以Dpfb.Cqrs中的整体结构都是参考这个例子来的,在这个基础之上添加和改进。总的来说,.Cqrs项目的整体结构如下所示:

    主要包含了(命令,事件,通信,命令处理,事件处理这几个方面)。具体的角色则如下图所示:

    通信中包含了事件总线和命令总线。由于查询入口(QueryEntry)如何处置暂时没想好,所以先放一个文件夹卖萌。

    实现

    此时的目标是实现一个最小单元的CQRS(主要是命令和事件部分)。其中命令总线和事件总线的实现比较固定,他们的职责就是为命令和事件找到对应的处理类,然后依次调用对象的接口方法,从而将一般的直接调用“打断”,同时提供各种额外操作的注入点。为了更好的控制他们如何寻找处理类,这里引入两个接口ICommandHandlerSearcher以及IEventHandlerSearcher。放在项目的Configuration名称空间下面。现在,可以为这些接口提供一个基础的实现,供测试使用。以下是接口定义:

        public interface ICommand
        {
            Guid Id { get; set; }
        }
    
        public interface IEvent
        {
            Guid Id { get; set; }
        }
    
        public interface IEventHandler<T> where T : IEvent
        {
            void Handle(T @event);
        }
    
        public interface ICommandHandler<T> where T: ICommand
        {
            void Execute(T command);
        }
    
        public interface ICommandBus
        {
            void Send<T>(T command) where T : ICommand;
        }
    
        public interface IEventBus
        {
            void Publish<T>(T @event) where T : IEvent;
        }
    
        public interface ICommandHandlerSearcher
        {
            ICommandHandler<T> Find<T>() where T : ICommand;
        }
    
        public interface IEventHandlerSearcher
        {
            IEnumerable<IEventHandler<T>> Find<T>() where T : IEvent;
        }
    接口定义

    以及通信(Buses)部分的实现:

    public class DpfbEventBus : IEventBus
        {
            void IEventBus.Publish<T>(T @event)
            {
                foreach (var handler in EventHandlerSearcher.Find<T>())
                {
                    handler.Handle(@event);
                }
            }
    
            public IEventHandlerSearcher EventHandlerSearcher { get; set; }
        }
    
    public class DpfbCommandBus : ICommandBus
        {
            void ICommandBus.Send<T>(T command)
            {
                var handler = CommandHandlerSearcher.Find<T>();
                handler.Execute(command);
            }
    
            public ICommandHandlerSearcher CommandHandlerSearcher { get; set; }
        }
    Buses实现

    其中,两个ISearcheres的实现包含了如何寻找Handlers以及到哪里寻找Handlers,这些具体的内容,.Cqrs这层其实并不关心。处于测试的目的,将这些实现代码移到Test名称空间下。并实现本程序集内的查找:

     public class TestCommandHandlerSearcher:ICommandHandlerSearcher
        {
            ICommandHandler<T> ICommandHandlerSearcher.Find<T>()
            {
                var assembly = Assembly.GetCallingAssembly();
                var declaredType = typeof (ICommandHandler<T>);
                var handlers = assembly.GetTypes()
                    .Where(i => declaredType.IsAssignableFrom(i))
                    .Select(i => Activator.CreateInstance(i))
                    .ToArray();
                if (handlers.Count() != 1)
                    throw new Exception();
                return handlers.First() as ICommandHandler<T>;
            }
        }
    
     public class TestEventHandlerSearcher : IEventHandlerSearcher
        {
            IEnumerable<IEventHandler<T>> IEventHandlerSearcher.Find<T>()
            {
                var assembly = Assembly.GetCallingAssembly();
                var declaredType = typeof (IEventHandler<T>);
                var handlers = assembly.GetTypes()
                    .Where(i => declaredType.IsAssignableFrom(i))
                    .Select(i => Activator.CreateInstance(i));
                return handlers.Cast<IEventHandler<T>>();
            }
        }
    Searchers实现

    测试

    接下来添加一些实现代码,以进行测试:

        public class TestEventTwo : IEvent
        {
            Guid IEvent.Id
            {
                get { throw new NotImplementedException(); }
                set { throw new NotImplementedException(); }
            }
        }
    
        public class TestEventOne : IEvent
        {
            Guid IEvent.Id
            {
                get { throw new NotImplementedException(); }
                set { throw new NotImplementedException(); }
            }
        }
    
        public class TestEventHandler : IEventHandler<TestEventOne>,
            IEventHandler<TestEventTwo>
        {
            void IEventHandler<TestEventOne>.Handle(TestEventOne @event)
            {
                Console.WriteLine("处理了事件eventOne");
                Configuration.EventBus.Publish(new TestEventTwo());
                Console.WriteLine("引发了事件eventOne");
            }
    
            void IEventHandler<TestEventTwo>.Handle(TestEventTwo @event)
            {
                Console.WriteLine("处理了事件eventTwo");
            }
        }
    
        public class TestCommandHandler : ICommandHandler<TestCommand>
        {
            void ICommandHandler<TestCommand>.Execute(TestCommand command)
            {
                Console.WriteLine("获取并处理消息:" + command.Message);
                var eventOne = new TestEventOne();
                Configuration.EventBus.Publish(eventOne);
            }
        }
    
        public class TestCommand : ICommand
        {
            Guid ICommand.Id
            {
                get { throw new NotImplementedException(); }
                set { throw new NotImplementedException(); }
            }
    
            public string Message { get; set; }
        }
    
        public static class Configuration
        {
            public static ICommandBus CommandBus = new DpfbCommandBus();
    
            public static IEventBus EventBus = new DpfbEventBus();
        }
    测试代码

    其中,Buses作为单例存在于一个静态类的字段中。
    最后,用一个单元测试运行:

    [TestMethod]
            public void TestMethod1()
            {
                var command = new TestCommand {Message = "国庆记得回家"};
                Configuration.CommandBus.Send(command);
            }
    UnitTest

    结果(倒序是正常的,总的来说,事件链的执行会是一个深度优先的调用):

    审计(以及Session)

    在上个测试例子中,使用了控制台输出来表明各个事件的执行顺序。在正常开发过程中,我们总不能到处输出到控制台,就算输出了也不见得有用。所以我们使用另外的方法来实现这一点。博主第一次接触审计这个概念是在接触一个名为ABP的框架的时候【此处应有链接】,里面包含了很多的信息(用户身份,性能计数,时间等...),当时非常震惊。于是就把ABP源码中的一个文件夹扒了下来自己用。ABP实现了Service方法的审计,用的是动态代理(Castle)。而Cqrs本身就留了无数的注入点,所以实现起来更加直观和方便。另外,博主同时参考了ABP的Session的实现,然后搞出一个土鳖版的Session。虽然比较无耻,但是对于以后ABP的理解应该会有帮助。

    博主当初匆匆忙忙的撸了一个CQRS的框架,只是想试水,所以没有想很多,Auditing部分真的是直接扒下来改了改,现在开始写博客,有更多的时间思考,所以打算从头开始实现一个。首先开始抽象,Auditing是一种消息,同时还要考虑它的存放问题,所以定义了以下两个接口(由于Abudting还包含了其他方面,如web,所有接口定义移到Dpfb层):

    public interface IAuditInfo
        {
    
        }
    
     public interface IAuditStorage
        {
            IEnumerable<IAuditInfo> Retrive();
        }
    Auditing接口

    顺便实现一个基于内存的存储:

     public class MemoryAuditStorage:IAuditStorage
        {
            private List<IAuditInfo> _inMemory = new List<IAuditInfo>();
    
            IEnumerable<IAuditInfo> IAuditStorage.Retrive()
            {
                return _inMemory;
            }
    
    
            void IAuditStorage.Save(IAuditInfo auditInfo)
            {
                _inMemory.Add(auditInfo);
            }
        }
    MemoryAuditStorage

    接下来实现Cqrs的命令和时间的审计对象。由于事件的调用链是棵树,这里在Dpfb层引入一些必须的数据结构(见DaaStructure名称空间)。以下是针对Cqrs的AuditInfo实现:

    public class CommandEventAuditInfo : ExtendedTreeNode<CommandEventAuditInfo>, IAuditInfo
        {
            public DateTime InvokedTime { get; set; }
    
            /// <summary>
            /// 单位为毫秒
            /// </summary>
            public int InvokingDuration { get; set; }
    
            public IDpfbSession DpfbSession { get; set; }
    
            public Type CommandEventHandlerType { get; set; }
    
            public Type CommandEventType { get; set; }
    
            public Type DeclaredHandlerType { get; set; }
    
            //todo 待扩展
            public object ThreadInfo { get; set; }
    
            public bool Stopped { get; private set; }
    
            private Stopwatch _stopwatch = new Stopwatch();
    
            public void Start()
            {
                _stopwatch.Start();
            }
    
            public void Stop()
            {
                _stopwatch.Stop();
                InvokingDuration = (int) _stopwatch.ElapsedMilliseconds;
                Stopped = true;
                Current = Parent;
            }
    
            public static ConcurrentDictionary<int, CommandEventAuditInfo> ConcurrentDic =
                new ConcurrentDictionary<int, CommandEventAuditInfo>();
    
            public override string ToString()
            {
                return PrintTree();
            }
    
            public string PrintTree()
            {
                var sb = new StringBuilder();
                var userStr = DpfbSession != null ? DpfbSession.ToString() : "匿名用户";
                var @abstract = string.Format("命令事件调用分析[{3}]:用户[{1}]引发了[{0}],总耗时[{2}]毫秒。调用链:",
                    CommandEventType.Name, userStr, InvokingDuration, InvokedTime);
                sb.Append(@abstract);
                var recursionDepth = 1;
                return PrintTree(this, ref recursionDepth, sb);
            }
    
            private string PrintTree(CommandEventAuditInfo auditInfo, ref int recursionDepth, StringBuilder sb)
            {
                sb.AppendLine();
                var span = recursionDepth == 0
                    ? ""
                    : string.Join("", new string[recursionDepth].Select(i => "         ").ToArray());
                sb.Append(span + "|---");
                sb.AppendFormat("[{2}]处理[{0}],耗时[{1}毫秒]", auditInfo.CommandEventType.Name,
                    auditInfo.InvokingDuration, auditInfo.CommandEventType.Name);
                if (auditInfo.Children.Any())
                    recursionDepth++;
                foreach (var commandEventAuditInfo in auditInfo.Children)
                {
                    PrintTree(commandEventAuditInfo, ref recursionDepth, sb);
                }
                return sb.ToString();
            }
    
            private static CommandEventAuditInfo CreateUnstoppedOnCurrentThread()
            {
                Func<int, CommandEventAuditInfo> creator = k => new CommandEventAuditInfo()
                {
                    ThreadInfo = k
                };
                var threadName = Thread.CurrentThread.ManagedThreadId;
                var auditInfo = ConcurrentDic.GetOrAdd(threadName, creator);
                if (auditInfo.Stopped)
                {
                    return ConcurrentDic.AddOrUpdate(threadName, creator, (k, nv) => creator((int) nv.ThreadInfo));
                }
                return auditInfo;
            }
    
            public static CommandEventAuditInfo StartNewForCommand<TCommand>(Type handlerType) where TCommand : ICommand
            {
                var root = CreateUnstoppedOnCurrentThread();
                root.InvokedTime = DateTime.Now;
                root.CommandEventHandlerType = handlerType;
                root.CommandEventType = typeof (TCommand);
                root.DeclaredHandlerType = typeof (ICommandHandler<TCommand>);
                Current = root;
                return root;
            }
    
            public static CommandEventAuditInfo StartNewForEvent<TEvent>(Type handlerType) where TEvent : IEvent
            {
                var auditInfo = new CommandEventAuditInfo()
                {
                    CommandEventType = typeof (TEvent),
                    CommandEventHandlerType = handlerType,
                    DeclaredHandlerType = typeof (IEventHandler<TEvent>),
                    InvokedTime = DateTime.Now
                };
                Current.Children.Add(auditInfo);
                auditInfo.Parent = Current;
                return auditInfo;
            }
    
            public static CommandEventAuditInfo Root
            {
                get { return CreateUnstoppedOnCurrentThread(); }
            }
    
            public static CommandEventAuditInfo Current { get; private set; }
        }
    AuditInfo4Cqrs

    稍作修改之后,重新运行单元测试:

    [TestMethod]
            public void TestAuditing()
            {
                var command = new TestCommand {Message = "国庆记得回家"};
                Configuration.CommandBus.Send(command);
                command = new TestCommand {Message = "国庆记得回家"};
                Configuration.CommandBus.Send(command);
                foreach (var auditInfo in Configuration.AuditStorage.Retrive())
                {
                    Console.WriteLine(auditInfo.ToString());
                }
            }
    TestAuditing

    这里是运行结果[事件调用链的关系有误,请移步第二篇查看]:

     至于Session,此时实现并不能表示其作用,所以只是定义了一个接口。

    补丁

    至此,一个可以发挥作用的Cqrs就完成了。同时也遗留下一些需要思考的问题:

    【事件的继承关系】

    在博主的目前实现中,是不考虑事件的继承关系的。

    【事件的先后顺序】

    使用Handler实现方法上的Attribute实现,这个仅仅作为辅助功能实现。逻辑上的先后顺序由事件链指定。

    【CqrsAuditng的生命周期】

    目前博主使用Unity作IoC,基于线程的生命周期管理,会存在并发问题(ASP.NET),需要继续考虑。

    【Searchers的性能优化】

    实际上调用链在编译期间就确定了,所以,可以使用ExpressionTree作缓存。

    【事件/命令异步执行的支持】

    在ASP.NET中,使用基于TPL的一套异步编程框架可以提高吞吐。这个可以大致这么理解:在web系统中,存在两种角色,请求入口(IIS)【标记为A】,以及请求处理者(实现代码)【标记为B】。我们先假定整个系统只有一个A。那么,在改进之前,情况是“A接到了一个请求告诉B,然后看着B把事情干完,再把结果告诉请求方”。改进之后的情况是“A接到了一个请求B,告诉B处理完事情之后给个反馈——A马上开始处理其他请求。”显然,在改进之前A的等待时间很长,改进之后A当了甩手掌柜(只等报告),等待时间很短。于是乎,A单位时间能的处理量就上去了,吞吐就上去了。然而,对于一次请求而言,时长取决于B,除非B的工作效率有所改进,不然并不会提升性能。

    这些是博主的个人理解,并不求多少准确,仅希望能大致描述这么一个意思。

    然而博主做了很多测试,并没有反应出这一点。详细内容,请移步这篇文章:【此处应有链接】

    【Audting的配置】

    这是ABP做的好的地方,也是工作量最大的地方,配置的优先级等。 

    此篇完成时,所使用的代码:【http://pan.baidu.com/s/1o6IeNXK

  • 相关阅读:
    schema的详解
    递归删除文件
    如何写一个schema文件
    如何写一个dtd文件
    WebService随笔记录
    文件分割
    三级数据显示
    数据库锁表查询及解除方法
    list分页
    JXLS模板导出多个sheet文件
  • 原文地址:https://www.cnblogs.com/lightluomeng/p/4738147.html
Copyright © 2011-2022 走看看