读写分离(CQRS)
CQRS的高大上的理论和文档一大堆,在这里就不累述了,后面的技术也是一样的,只谈我的理解,不会有很多文字描述,如果想了解更多请移步到别的文章哈。
是什么?
代码层分离数据库读写操作,搭配数据库的读写分离功能
为什么?
可以达到提高数据库执行效率,并且可以额外附带事件回溯功能
怎么做?
架构图
internal class ConnectionStrings { public string Key { get; set; } public string Value { get; set; } } internal class ConnectionStringConfig { /// <summary> /// 写库连接配置 /// </summary> public ConnectionStrings CommandDB { get; set; } /// <summary> /// 读库连接配置 /// </summary> public List<ConnectionStrings> QueryDB { get; set; } }
配置文件 ↑
分配管理↓
/// <summary> /// DC分配工厂 /// </summary> public interface IDCDistributiveFactory { /// <summary> /// 选择为读操作的库 /// </summary> string SelectQuery(string QueryKey); /// <summary> /// 选择为写操作的库 /// </summary> string SelectCommand(); } public class DCDistributiveFactory : IDCDistributiveFactory { IServiceProvider sp; ConnectionStringConfig config; public DCDistributiveFactory(IServiceProvider sp) { this.sp = sp; var _config = sp.GetService<IConfiguration>(); config = _config.GetSection("ConnectionStringConfig").Get<ConnectionStringConfig>(); } /// <summary> /// 分配为写操作 /// </summary> public string SelectCommand() { //更换连接串为写库连接串 return this.config.CommandDB.Value; } /// <summary> /// 分配为读操作 /// </summary> /// <param name="QueryKey"></param> public string SelectQuery(string QueryKey) { string result = string.Empty; //更换连接串为写库连接串 if (string.IsNullOrWhiteSpace(QueryKey)) { result = this.config.QueryDB.OrderBy(x => Guid.NewGuid()).FirstOrDefault()?.Value; } else { result = config.QueryDB.FirstOrDefault(x => x.Key.Equals(QueryKey))?.Value; } if (result == null) throw new Exception($"操作库选择错误,不存在Key为{QueryKey}的读取库"); return result; } }
应该叫 IDCDistributive 的(*^_^*) 我懒得改了,IDCDistributiveFactory 也行,只是难免会让人联想到工厂上
它负责管理数据库连接串
CQRS的标记特性和Chloe的拦截器 ↓
特性
/// <summary> /// 本次请求将会操作数据库 /// </summary> public class CommandAttribute: Attribute { } /// <summary> /// 本次访问只有读取操作 /// </summary> public class QueryAttribute:Attribute { /// <summary> /// 指定的读库Sql连接字符串 /// </summary> public string QueryKey { get; } /// <summary> /// 使用随机的KEY /// </summary> public QueryAttribute() { } /// <summary> /// 使用指定的Key /// 若不存在则会抛出异常 /// </summary> /// <param name="QueryKey">数据库配置文件中的Key</param> public QueryAttribute(string QueryKey) { this.QueryKey = QueryKey; } }
拦截器:
/// <summary> /// 写操作拦截器 /// </summary> public class CQRSDCCommandInterceptor : IDbCommandInterceptor { IMediator mediator; public CQRSDCCommandInterceptor(IMediator mediator) { this.mediator = mediator; } public void NonQueryExecuted(IDbCommand command, DbCommandInterceptionContext<int> interceptionContext) { StringBuilder executeSql =new StringBuilder(command.CommandText); foreach (MySql.Data.MySqlClient.MySqlParameter item in command.Parameters) { executeSql.Replace(item.ParameterName, item.Value.ToString()); } mediator.Publish<DCCQRSCommandEvent>(new DCCQRSCommandEvent(executeSql.ToString(), DateTime.Now)); } public void NonQueryExecuting(IDbCommand command, DbCommandInterceptionContext<int> interceptionContext) { } public void ReaderExecuted(IDbCommand command, DbCommandInterceptionContext<IDataReader> interceptionContext) { } public void ReaderExecuting(IDbCommand command, DbCommandInterceptionContext<IDataReader> interceptionContext) { } public void ScalarExecuted(IDbCommand command, DbCommandInterceptionContext<object> interceptionContext) { } public void ScalarExecuting(IDbCommand command, DbCommandInterceptionContext<object> interceptionContext) { } }
/// <summary> /// 读操作拦截器 /// </summary> public class CQRSDCQueryInterceptor : IDbCommandInterceptor { public void NonQueryExecuted(IDbCommand command, DbCommandInterceptionContext<int> interceptionContext) { } public void NonQueryExecuting(IDbCommand command, DbCommandInterceptionContext<int> interceptionContext) { command.Cancel(); throw new Exception("当前DC上下文不允许执行DB操作,但是却在方法中执行了DB操作,如要更改请在执行的控制器方法上标有CommandAttribute特性"); } public void ReaderExecuted(IDbCommand command, DbCommandInterceptionContext<IDataReader> interceptionContext) { } public void ReaderExecuting(IDbCommand command, DbCommandInterceptionContext<IDataReader> interceptionContext) { } public void ScalarExecuted(IDbCommand command, DbCommandInterceptionContext<object> interceptionContext) { } public void ScalarExecuting(IDbCommand command, DbCommandInterceptionContext<object> interceptionContext) { } }
NonQueryExecuted 非查询操作执行后
NonQueryExecuting 非查询操作执行前
不知道还有人记得我上一文说的的话嘛,不要记得哈,啪啪啪~
在写操作完成后通过mediator对象发布一个事件,告诉其他服务这里有个写事件
mediator对象是EventBus服务维护的,现在还没说到那
在这有几个问题
1、我破坏了原则,动了ORM的工作(ORM服务怒目而视>_<)
2、EventBus没有被封装,暴露了mediator对象
解释:
1、CQRS引用了ORM和EventBus服务,CQRS是他们的孩子,我想就暂时这么改吧。另一种方法可以在IDCScoped中加上IDbCommandInterceptor的对象,这样的话也不算太违背服务原则吧
2、这个纯粹是我偷懒了,后续会改好的
重点↓
/// <summary> /// 用于辨别CQRS的操作对象 /// </summary> public class ChloeCQRSMiddleware { private readonly RequestDelegate _next; public ChloeCQRSMiddleware(RequestDelegate next) { _next = next; } public Task Invoke(HttpContext httpContext) { ReadOrWrite(httpContext); // Endpoint will be null if the URL didn't match an action, e.g. a 404 response return _next(httpContext); } public static Endpoint GetEndpoint(HttpContext context) { return context.Features.Get<IEndpointFeature>()?.Endpoint; } /// <summary> /// 读写判断 /// </summary> public static void ReadOrWrite(HttpContext httpContext) { var endpoint = GetEndpoint(httpContext); var mediator = httpContext.RequestServices.GetService<IMediator>(); IDbCommandInterceptor dbCommand; string connectionStr = string.Empty; DCOperationType OperationType = DCOperationType.Query; if (endpoint != null) { var command = endpoint.Metadata.GetMetadata<CommandAttribute>(); if (command != null) { connectionStr = httpContext.RequestServices.GetService<IDCDistributiveFactory>().SelectCommand(); OperationType = DCOperationType.Command; dbCommand = new CQRSDCCommandInterceptor(mediator); } else { var query = endpoint.Metadata.GetMetadata<QueryAttribute>(); connectionStr= httpContext.RequestServices.GetService<IDCDistributiveFactory>().SelectQuery(query?.QueryKey); OperationType = DCOperationType.Query; dbCommand = new CQRSDCQueryInterceptor(); } httpContext.RequestServices.GetService<IDCScoped>().SelectOpeation(connectionStr, OperationType); var context = httpContext.RequestServices.GetService<IDbContext>(); //添加操作拦截器 context.Session.AddInterceptor(dbCommand); } } } // Extension method used to add the middleware to the HTTP request pipeline. public static class ChloeCQRSMiddlewareExtensions { public static IApplicationBuilder UseChloeCQRSMiddleware(this IApplicationBuilder builder) { return builder.UseMiddleware<ChloeCQRSMiddleware>(); } }
中间件首次出现
它辨别方法上的操作特性头并操作DC选择数据库连接串和拦截器对象
其中IEndpointFeature服务这个需要AspNetCore2.2版本上
并且在CQRS中间件上加入一层app.UseEndpointRouting();
最后
public static class Startup { public static void ConfigureServices(IServiceCollection services) { services.AddSingleton<IDCDistributiveFactory, DCDistributiveFactory>(); } public static void Configure(IApplicationBuilder app, IHostingEnvironment env) { app.UseChloeCQRSMiddleware(); } }
OK,这次我们介绍了CQRS并且实现了它,
还是有两个问题哈,以后再来改