zoukankan      html  css  js  c++  java
  • CQRS学习——最小单元的Cqrs(CommandEvent)[其一]


    【说明:博主采用边写边思考的方式完成这一系列的博客,所以代码以附件为准,文中代码仅为了说明。】

    结构

    在学习和实现CQRS的过程中,首要参考的项目是这个【http://www.cnblogs.com/yangecnu/p/Introduction-CQRS.html】。所以Dpfb.Cqrs中的整体结构都是参考这个例子来的,在这个基础之上添加和改进。总的来说,.Cqrs项目的整体结构如下所示:

    主要包含了(命令,事件,通信,命令处理,事件处理这几个方面)。具体的角色则如下图所示:

    通信中包含了事件总线和命令总线。由于查询入口(QueryEntry)如何处置暂时没想好,所以先放一个文件夹卖萌。

    实现

    此时的目标是实现一个最小单元的CQRS(主要是命令和事件部分)。其中命令总线和事件总线的实现比较固定,他们的职责就是为命令和事件找到对应的处理类,然后依次调用对象的接口方法,从而将一般的直接调用“打断”,同时提供各种额外操作的注入点。为了更好的控制他们如何寻找处理类,这里引入两个接口ICommandHandlerSearcher以及IEventHandlerSearcher。放在项目的Configuration名称空间下面。现在,可以为这些接口提供一个基础的实现,供测试使用。以下是接口定义:

        public interface ICommand
        {
            Guid Id { get; set; }
        }
    
        public interface IEvent
        {
            Guid Id { get; set; }
        }
    
        public interface IEventHandler<T> where T : IEvent
        {
            void Handle(T @event);
        }
    
        public interface ICommandHandler<T> where T: ICommand
        {
            void Execute(T command);
        }
    
        public interface ICommandBus
        {
            void Send<T>(T command) where T : ICommand;
        }
    
        public interface IEventBus
        {
            void Publish<T>(T @event) where T : IEvent;
        }
    
        public interface ICommandHandlerSearcher
        {
            ICommandHandler<T> Find<T>() where T : ICommand;
        }
    
        public interface IEventHandlerSearcher
        {
            IEnumerable<IEventHandler<T>> Find<T>() where T : IEvent;
        }
    接口定义

    以及通信(Buses)部分的实现:

    public class DpfbEventBus : IEventBus
        {
            void IEventBus.Publish<T>(T @event)
            {
                foreach (var handler in EventHandlerSearcher.Find<T>())
                {
                    handler.Handle(@event);
                }
            }
    
            public IEventHandlerSearcher EventHandlerSearcher { get; set; }
        }
    
    public class DpfbCommandBus : ICommandBus
        {
            void ICommandBus.Send<T>(T command)
            {
                var handler = CommandHandlerSearcher.Find<T>();
                handler.Execute(command);
            }
    
            public ICommandHandlerSearcher CommandHandlerSearcher { get; set; }
        }
    Buses实现

    其中,两个ISearcheres的实现包含了如何寻找Handlers以及到哪里寻找Handlers,这些具体的内容,.Cqrs这层其实并不关心。处于测试的目的,将这些实现代码移到Test名称空间下。并实现本程序集内的查找:

     public class TestCommandHandlerSearcher:ICommandHandlerSearcher
        {
            ICommandHandler<T> ICommandHandlerSearcher.Find<T>()
            {
                var assembly = Assembly.GetCallingAssembly();
                var declaredType = typeof (ICommandHandler<T>);
                var handlers = assembly.GetTypes()
                    .Where(i => declaredType.IsAssignableFrom(i))
                    .Select(i => Activator.CreateInstance(i))
                    .ToArray();
                if (handlers.Count() != 1)
                    throw new Exception();
                return handlers.First() as ICommandHandler<T>;
            }
        }
    
     public class TestEventHandlerSearcher : IEventHandlerSearcher
        {
            IEnumerable<IEventHandler<T>> IEventHandlerSearcher.Find<T>()
            {
                var assembly = Assembly.GetCallingAssembly();
                var declaredType = typeof (IEventHandler<T>);
                var handlers = assembly.GetTypes()
                    .Where(i => declaredType.IsAssignableFrom(i))
                    .Select(i => Activator.CreateInstance(i));
                return handlers.Cast<IEventHandler<T>>();
            }
        }
    Searchers实现

    测试

    接下来添加一些实现代码,以进行测试:

        public class TestEventTwo : IEvent
        {
            Guid IEvent.Id
            {
                get { throw new NotImplementedException(); }
                set { throw new NotImplementedException(); }
            }
        }
    
        public class TestEventOne : IEvent
        {
            Guid IEvent.Id
            {
                get { throw new NotImplementedException(); }
                set { throw new NotImplementedException(); }
            }
        }
    
        public class TestEventHandler : IEventHandler<TestEventOne>,
            IEventHandler<TestEventTwo>
        {
            void IEventHandler<TestEventOne>.Handle(TestEventOne @event)
            {
                Console.WriteLine("处理了事件eventOne");
                Configuration.EventBus.Publish(new TestEventTwo());
                Console.WriteLine("引发了事件eventOne");
            }
    
            void IEventHandler<TestEventTwo>.Handle(TestEventTwo @event)
            {
                Console.WriteLine("处理了事件eventTwo");
            }
        }
    
        public class TestCommandHandler : ICommandHandler<TestCommand>
        {
            void ICommandHandler<TestCommand>.Execute(TestCommand command)
            {
                Console.WriteLine("获取并处理消息:" + command.Message);
                var eventOne = new TestEventOne();
                Configuration.EventBus.Publish(eventOne);
            }
        }
    
        public class TestCommand : ICommand
        {
            Guid ICommand.Id
            {
                get { throw new NotImplementedException(); }
                set { throw new NotImplementedException(); }
            }
    
            public string Message { get; set; }
        }
    
        public static class Configuration
        {
            public static ICommandBus CommandBus = new DpfbCommandBus();
    
            public static IEventBus EventBus = new DpfbEventBus();
        }
    测试代码

    其中,Buses作为单例存在于一个静态类的字段中。
    最后,用一个单元测试运行:

    [TestMethod]
            public void TestMethod1()
            {
                var command = new TestCommand {Message = "国庆记得回家"};
                Configuration.CommandBus.Send(command);
            }
    UnitTest

    结果(倒序是正常的,总的来说,事件链的执行会是一个深度优先的调用):

    审计(以及Session)

    在上个测试例子中,使用了控制台输出来表明各个事件的执行顺序。在正常开发过程中,我们总不能到处输出到控制台,就算输出了也不见得有用。所以我们使用另外的方法来实现这一点。博主第一次接触审计这个概念是在接触一个名为ABP的框架的时候【此处应有链接】,里面包含了很多的信息(用户身份,性能计数,时间等...),当时非常震惊。于是就把ABP源码中的一个文件夹扒了下来自己用。ABP实现了Service方法的审计,用的是动态代理(Castle)。而Cqrs本身就留了无数的注入点,所以实现起来更加直观和方便。另外,博主同时参考了ABP的Session的实现,然后搞出一个土鳖版的Session。虽然比较无耻,但是对于以后ABP的理解应该会有帮助。

    博主当初匆匆忙忙的撸了一个CQRS的框架,只是想试水,所以没有想很多,Auditing部分真的是直接扒下来改了改,现在开始写博客,有更多的时间思考,所以打算从头开始实现一个。首先开始抽象,Auditing是一种消息,同时还要考虑它的存放问题,所以定义了以下两个接口(由于Abudting还包含了其他方面,如web,所有接口定义移到Dpfb层):

    public interface IAuditInfo
        {
    
        }
    
     public interface IAuditStorage
        {
            IEnumerable<IAuditInfo> Retrive();
        }
    Auditing接口

    顺便实现一个基于内存的存储:

     public class MemoryAuditStorage:IAuditStorage
        {
            private List<IAuditInfo> _inMemory = new List<IAuditInfo>();
    
            IEnumerable<IAuditInfo> IAuditStorage.Retrive()
            {
                return _inMemory;
            }
    
    
            void IAuditStorage.Save(IAuditInfo auditInfo)
            {
                _inMemory.Add(auditInfo);
            }
        }
    MemoryAuditStorage

    接下来实现Cqrs的命令和时间的审计对象。由于事件的调用链是棵树,这里在Dpfb层引入一些必须的数据结构(见DaaStructure名称空间)。以下是针对Cqrs的AuditInfo实现:

    public class CommandEventAuditInfo : ExtendedTreeNode<CommandEventAuditInfo>, IAuditInfo
        {
            public DateTime InvokedTime { get; set; }
    
            /// <summary>
            /// 单位为毫秒
            /// </summary>
            public int InvokingDuration { get; set; }
    
            public IDpfbSession DpfbSession { get; set; }
    
            public Type CommandEventHandlerType { get; set; }
    
            public Type CommandEventType { get; set; }
    
            public Type DeclaredHandlerType { get; set; }
    
            //todo 待扩展
            public object ThreadInfo { get; set; }
    
            public bool Stopped { get; private set; }
    
            private Stopwatch _stopwatch = new Stopwatch();
    
            public void Start()
            {
                _stopwatch.Start();
            }
    
            public void Stop()
            {
                _stopwatch.Stop();
                InvokingDuration = (int) _stopwatch.ElapsedMilliseconds;
                Stopped = true;
                Current = Parent;
            }
    
            public static ConcurrentDictionary<int, CommandEventAuditInfo> ConcurrentDic =
                new ConcurrentDictionary<int, CommandEventAuditInfo>();
    
            public override string ToString()
            {
                return PrintTree();
            }
    
            public string PrintTree()
            {
                var sb = new StringBuilder();
                var userStr = DpfbSession != null ? DpfbSession.ToString() : "匿名用户";
                var @abstract = string.Format("命令事件调用分析[{3}]:用户[{1}]引发了[{0}],总耗时[{2}]毫秒。调用链:",
                    CommandEventType.Name, userStr, InvokingDuration, InvokedTime);
                sb.Append(@abstract);
                var recursionDepth = 1;
                return PrintTree(this, ref recursionDepth, sb);
            }
    
            private string PrintTree(CommandEventAuditInfo auditInfo, ref int recursionDepth, StringBuilder sb)
            {
                sb.AppendLine();
                var span = recursionDepth == 0
                    ? ""
                    : string.Join("", new string[recursionDepth].Select(i => "         ").ToArray());
                sb.Append(span + "|---");
                sb.AppendFormat("[{2}]处理[{0}],耗时[{1}毫秒]", auditInfo.CommandEventType.Name,
                    auditInfo.InvokingDuration, auditInfo.CommandEventType.Name);
                if (auditInfo.Children.Any())
                    recursionDepth++;
                foreach (var commandEventAuditInfo in auditInfo.Children)
                {
                    PrintTree(commandEventAuditInfo, ref recursionDepth, sb);
                }
                return sb.ToString();
            }
    
            private static CommandEventAuditInfo CreateUnstoppedOnCurrentThread()
            {
                Func<int, CommandEventAuditInfo> creator = k => new CommandEventAuditInfo()
                {
                    ThreadInfo = k
                };
                var threadName = Thread.CurrentThread.ManagedThreadId;
                var auditInfo = ConcurrentDic.GetOrAdd(threadName, creator);
                if (auditInfo.Stopped)
                {
                    return ConcurrentDic.AddOrUpdate(threadName, creator, (k, nv) => creator((int) nv.ThreadInfo));
                }
                return auditInfo;
            }
    
            public static CommandEventAuditInfo StartNewForCommand<TCommand>(Type handlerType) where TCommand : ICommand
            {
                var root = CreateUnstoppedOnCurrentThread();
                root.InvokedTime = DateTime.Now;
                root.CommandEventHandlerType = handlerType;
                root.CommandEventType = typeof (TCommand);
                root.DeclaredHandlerType = typeof (ICommandHandler<TCommand>);
                Current = root;
                return root;
            }
    
            public static CommandEventAuditInfo StartNewForEvent<TEvent>(Type handlerType) where TEvent : IEvent
            {
                var auditInfo = new CommandEventAuditInfo()
                {
                    CommandEventType = typeof (TEvent),
                    CommandEventHandlerType = handlerType,
                    DeclaredHandlerType = typeof (IEventHandler<TEvent>),
                    InvokedTime = DateTime.Now
                };
                Current.Children.Add(auditInfo);
                auditInfo.Parent = Current;
                return auditInfo;
            }
    
            public static CommandEventAuditInfo Root
            {
                get { return CreateUnstoppedOnCurrentThread(); }
            }
    
            public static CommandEventAuditInfo Current { get; private set; }
        }
    AuditInfo4Cqrs

    稍作修改之后,重新运行单元测试:

    [TestMethod]
            public void TestAuditing()
            {
                var command = new TestCommand {Message = "国庆记得回家"};
                Configuration.CommandBus.Send(command);
                command = new TestCommand {Message = "国庆记得回家"};
                Configuration.CommandBus.Send(command);
                foreach (var auditInfo in Configuration.AuditStorage.Retrive())
                {
                    Console.WriteLine(auditInfo.ToString());
                }
            }
    TestAuditing

    这里是运行结果[事件调用链的关系有误,请移步第二篇查看]:

     至于Session,此时实现并不能表示其作用,所以只是定义了一个接口。

    补丁

    至此,一个可以发挥作用的Cqrs就完成了。同时也遗留下一些需要思考的问题:

    【事件的继承关系】

    在博主的目前实现中,是不考虑事件的继承关系的。

    【事件的先后顺序】

    使用Handler实现方法上的Attribute实现,这个仅仅作为辅助功能实现。逻辑上的先后顺序由事件链指定。

    【CqrsAuditng的生命周期】

    目前博主使用Unity作IoC,基于线程的生命周期管理,会存在并发问题(ASP.NET),需要继续考虑。

    【Searchers的性能优化】

    实际上调用链在编译期间就确定了,所以,可以使用ExpressionTree作缓存。

    【事件/命令异步执行的支持】

    在ASP.NET中,使用基于TPL的一套异步编程框架可以提高吞吐。这个可以大致这么理解:在web系统中,存在两种角色,请求入口(IIS)【标记为A】,以及请求处理者(实现代码)【标记为B】。我们先假定整个系统只有一个A。那么,在改进之前,情况是“A接到了一个请求告诉B,然后看着B把事情干完,再把结果告诉请求方”。改进之后的情况是“A接到了一个请求B,告诉B处理完事情之后给个反馈——A马上开始处理其他请求。”显然,在改进之前A的等待时间很长,改进之后A当了甩手掌柜(只等报告),等待时间很短。于是乎,A单位时间能的处理量就上去了,吞吐就上去了。然而,对于一次请求而言,时长取决于B,除非B的工作效率有所改进,不然并不会提升性能。

    这些是博主的个人理解,并不求多少准确,仅希望能大致描述这么一个意思。

    然而博主做了很多测试,并没有反应出这一点。详细内容,请移步这篇文章:【此处应有链接】

    【Audting的配置】

    这是ABP做的好的地方,也是工作量最大的地方,配置的优先级等。 

    此篇完成时,所使用的代码:【http://pan.baidu.com/s/1o6IeNXK

  • 相关阅读:
    hibernate_0100_HelloWorld
    MYSQL子查询的五种形式
    JSF是什么?它与Struts是什么关系?
    nop指令的作用
    htmlparser实现从网页上抓取数据(收集)
    The Struts dispatcher cannot be found. This is usually caused by using Struts tags without the associated filter. Struts tags are only usable when the
    FCKeditor 在JSP上的完全安装
    Java遍历文件夹的2种方法
    充电电池和充电时间说明
    吃知了有什么好处
  • 原文地址:https://www.cnblogs.com/lightluomeng/p/4738147.html
Copyright © 2011-2022 走看看