前言
这几天在研究DDD和CQRS。快把我绕晕了。发现国外的好文质量还是挺高的。之所以先体验CQRSlite这个小框架,是因为看了一位大神写的文章:https://www.codeproject.com/articles/991648/cqrs-a-cross-examination-of-how-it-works 。于是乎,下载框架体验一下。
什么是CQRS?
Command Query Responsibility Segregation 的简称。翻译过来就是命令查询职责分离模式。在具体的也就不由我这个小菜鸟去阐述了。根据我的理解,在项目中,我们通常做一些数据保存的工作,但是数据查询的时候可能需要联合查询多张表,为了优化查询速度会通过一些冗余字段或者缓存在或者其他方式去优化。而查询的一个流程基本和添加修改删除没有太大的关系。在平日的开发过程中,基本上,查询和增删改都放在同一个服务中。而CQRS要做的就是让他们分离。命令是命令,查询是查询。而他们之间是如何交互的呢,这就要用到 Pub/Sub 机制了。(不过框架里的实现貌似是通过反射注册一些Handler实现的)下面这个图或许可以帮你理解一下。(图片来源于上文的文章中)
CQRSlite
其他的就不瞎扯了,我也只是通过这个轻量级框架去理解CQRS的实现。所以下面就简单介绍这个框架以及我的学习过程。
首先看一下自带Demo,Demo很简单,就是一个增加,修改和展示。
是不是超简单的Demo。下面我们看一下具体代码。
首先,添加这个动作属于一个命令,那么我们就创建一个Create的命令。然后通过CommandBus发送命令。
/// <summary> /// 【创建一个新项】命令 /// </summary> public class CreateInventoryItem : ICommand { public readonly string Name; public CreateInventoryItem(Guid id, string name) { Id = id; Name = name; } public Guid Id { get; set; } public int ExpectedVersion { get; set; } }
Controller层只要负责发送命令即可
[HttpPost] public async Task<ActionResult> Add(string name, CancellationToken cancellationToken) { await _commandSender.Send(new CreateInventoryItem(Guid.NewGuid(), name), cancellationToken); return RedirectToAction("Index"); }
当 CreateInventoryItem 这个命令发送出去之后,框架就去找匹配的命令处理器。代码如下:
public class InventoryCommandHandlers : ICommandHandler<CreateInventoryItem> { private readonly ISession _session; public InventoryCommandHandlers(ISession session) { _session = session; } public async Task Handle(CreateInventoryItem message) { var item = new InventoryItem(message.Id, message.Name); await _session.Add(item); await _session.Commit(); }
然后通过Handle方法去处理这个命令。可以看到,Handle中调用了session.Add 和Commit方法。
Add方法,就是将这个Aggregate添加到内存缓存中。用于后期版本追踪。
_trackedAggregates = new Dictionary<Guid, AggregateDescriptor>();
然后,Commit方法又调用了CacheRepository,CacheRepository又调用了EventStore的Save方法,看到EventStore这个词就要提起EventSourcing。其实我理解的事件朔源就是说,通过一定顺序的事件序列可以重新得到当前聚合状态。上文中的CacheRepository和EventStore都是CQRSlite框架中的实现。
//Session.cs
public async Task Commit(CancellationToken cancellationToken = default(CancellationToken)) { var tasks = new Task[_trackedAggregates.Count]; var i = 0; foreach (var descriptor in _trackedAggregates.Values) { //这个_repository 是cacheRepository tasks[i] = _repository.Save(descriptor.Aggregate, descriptor.Version, cancellationToken); i++; } await Task.WhenAll(tasks).ConfigureAwait(false); _trackedAggregates.Clear(); }
//CacheRepository.cs
public async Task Save<T>(T aggregate, int? expectedVersion = null, CancellationToken cancellationToken = default(CancellationToken)) where T : AggregateRoot { var @lock = _locks.GetOrAdd(aggregate.Id, CreateLock); await @lock.WaitAsync(cancellationToken).ConfigureAwait(false); try { if (aggregate.Id != Guid.Empty && !await _cache.IsTracked(aggregate.Id).ConfigureAwait(false)) { await _cache.Set(aggregate.Id, aggregate).ConfigureAwait(false); } //这里的_repository是Domain.Repository await _repository.Save(aggregate, expectedVersion, cancellationToken).ConfigureAwait(false); } catch (Exception) { await _cache.Remove(aggregate.Id).ConfigureAwait(false); throw; } finally { @lock.Release(); } }
//Repository.cs
public async Task Save<T>(T aggregate, int? expectedVersion = null, CancellationToken cancellationToken = default(CancellationToken)) where T : AggregateRoot { if (expectedVersion != null && (await _eventStore.Get(aggregate.Id, expectedVersion.Value, cancellationToken).ConfigureAwait(false)).Any()) { throw new ConcurrencyException(aggregate.Id); } var changes = aggregate.FlushUncommitedChanges(); //最后调用EventStore的Save方法。也就是只存储事件 await _eventStore.Save(changes, cancellationToken).ConfigureAwait(false); if (_publisher != null) { foreach (var @event in changes) { await _publisher.Publish(@event, cancellationToken).ConfigureAwait(false); } } }
//实现IEventStore接口的自定义EventStore
public async Task Save(IEnumerable<IEvent> events, CancellationToken cancellationToken = default(CancellationToken)) { foreach (var @event in events) { _inMemoryDb.TryGetValue(@event.Id, out var list); if (list == null) { list = new List<IEvent>(); _inMemoryDb.Add(@event.Id, list); } list.Add(@event); //调用事件发布 await _publisher.Publish(@event, cancellationToken); } }
在当前的这个例子中,事件是Created
public class InventoryItemCreated : IEvent { public readonly string Name; public InventoryItemCreated(Guid id, string name) { Id = id; Name = name; } public Guid Id { get; set; } public int Version { get; set; } public DateTimeOffset TimeStamp { get; set; } }
最后呢,View层接收到事件,进行处理就OK了。
public class InventoryItemDetailView : ICancellableEventHandler<InventoryItemCreated>, ICancellableEventHandler<InventoryItemDeactivated>, ICancellableEventHandler<InventoryItemRenamed>, ICancellableEventHandler<ItemsRemovedFromInventory>, ICancellableEventHandler<ItemsCheckedInToInventory> { public Task Handle(InventoryItemCreated message, CancellationToken token) { InMemoryDatabase.Details.Add(message.Id, new InventoryItemDetailsDto(message.Id, message.Name, 0, message.Version)); return Task.CompletedTask; }
相信小伙伴们读到这里还是一脸懵逼。没关系,上文中的简化版流程如下:
由于这里是同步的,所以在视图上展示是没有什么问题的,但是真正使用的时候大部分视图展示可能由于异步处理事件更新View,所以展示上会有延迟。CQRS和ES使用上还是和普通的服务开发有些区别的。不过作为入门,我能学到的目前就这么多,里面肯定还有更大的空间去发掘。
总结
本文不是一个CQRS的介绍,也不是一篇科普文章,只是一个小菜鸟的学习过程。有错误之处在所难免。