zoukankan      html  css  js  c++  java
  • CQRS学习——Storage实现(EF+Code First+DynamicReponsitory)[其四]

    【这里是的实现,指的是针对各个数据访问框架的一个基础实现】

    目标

    •   定义仓储/QueryEntry的基本功能
    •   实现仓储的基本功能,以利于复用
    •   实现一些常用的功能
    •   提供一些便利的功能

    目标框架

    博主使用的ORM框架是EF6.x,使用MAP来配置模型和数据库之间的映射(因为模型是定义在领域层[CQRS]的),所以不打算使用声明式的Attribute。使用code first来生成数据库。

    仓储基本功能

    使用一个泛型接口定义了一个仓储需要实现的功能:

    public interface IBasicReponsitory<T>
        {
            void Insert(T item);
            void Delete(T item);
            void Delete(Guid aggregateId);
            void Update(T category);
            T Fetch(Guid aggregateId);
            T TryFetch(Guid aggregateId);
            bool Exists(Expression<Func<T, bool>> predict);
    
            /*以下是额外的一些接口方法,待商榷*/
            IQueryable<T> Query();
            Task<T> FetchAsync(Guid id);
            Task<T> TryFetchAsync(Guid id);
            Task<IEnumerable<T>> RetriveAsync(Expression<Func<T, bool>> predict);
        }
    View Code

    以及一个QueryEntry需要实现的一些基本功能:

    public interface IQueryEntry<T> where T : IHasPrimaryKey
        {
            T TryFetch(Guid id);
            Task<T> TryFetchAsync(Guid id);
    
            bool Exsits(Guid id);
            bool Exsits(Expression<Func<T, bool>> selector);
        }
    View Code

    随着时间的推移,这个接口会发生变更,添加一些更多的功能。同时,并不要求所有的仓储或者QueryEntry继承接口,基本接口的定义和实现仅仅是为了提供便利。

    为了方便QueryEntry的实现,提供了一个抽象类:

    public abstract class ReponsitoryBasedQueryEntry<T> : IQueryEntry<T> where T : IHasPrimaryKey
        {
            public abstract IBasicReponsitory<T> BasicReponsitory { get; }
    
            public T TryFetch(Guid id)
            {
                return BasicReponsitory.TryFetch(id);
            }
    
            public Task<T> TryFetchAsync(Guid id)
            {
                return BasicReponsitory.TryFetchAsync(id);
            }
    
            public bool Exsits(Guid id)
            {
                return BasicReponsitory.Query().Any(i => i.Id == id);
            }
    
            public bool Exsits(System.Linq.Expressions.Expression<Func<T, bool>> selector)
            {
                return BasicReponsitory.Query().Any(selector);
            }
        }
    View Code

    基本实现

    public class BasicEntityFrameworkReponsitory<T> : IBasicReponsitory<T> where T : class, IHasPrimaryKey
        {
            public BasicEntityFrameworkReponsitory()
            {
                Table = StorageConfiguration.DbContext.Set<T>();
            }
    
            public DbSet<T> Table { get; private set; }
    
            public virtual void Insert(T item)
            {
                Table.Add(item);
            }
    
            public virtual void Delete(T item)
            {
                Table.Remove(item);
            }
    
            public virtual void Delete(Guid aggregateId)
            {
                var item = TryFetch(aggregateId);
                Delete(item);
            }
    
            public virtual void Update(T category)
            {
                //do nothing...
            }
    
            public T Fetch(Guid aggregateId)
            {
                var item = TryFetch(aggregateId);
                if (item == null)
                {
                    throw new AggregateRootNotFoundException(aggregateId);
                }
                return item;
            }
    
            public T TryFetch(Guid aggregateId)
            {
                var item = Query().FirstOrDefault(i => i.Id == aggregateId);
                return item;
            }
    
            public virtual IQueryable<T> Query()
            {
                return Table;
            }
    
            public async Task<T> FetchAsync(Guid id)
            {
                return await Table.FirstAsync(i => i.Id == id);
            }
    
            public async Task<T> TryFetchAsync(Guid id)
            {
                return await Table.FirstOrDefaultAsync(i => i.Id == id);
            }
    
            public bool Exists(Expression<Func<T, bool>> predict)
            {
                return Table.Any(predict);
            }
    
            public async Task<IEnumerable<T>> RetriveAsync(Expression<Func<T, bool>> predict)
            {
                return await Table.Where(predict).ToArrayAsync();
            }
        }
    View Code

    这部分代码表达了个人的几个想法:
    1.DbContext的生命周期是由Storage自行管理的。当然,可以通过一定的方式指定。

    2.提供了基础的Query()方法,并设置为虚方法。个人并不抵制使用IQueryable对象进行查询。我觉得可以把使用IQueryable对象进行查询的代码片段看作匿名方法。

     常用的功能:软删除

    这里是继承基本实现的一个实现:

    public class SoftDeleteEntityFrameworkReponsitory<T> : BasicEntityFrameworkReponsitory<T>
            where T : class, IHasPrimaryKey, ISoftDelete
        {
            public override IQueryable<T> Query()
            {
                return base.Query().Where(i => !i.IsDeleted);
            }
    
            public override void Delete(T item)
            {
                item.IsDeleted = true;
                Update(item);
            }
        }
    View Code

    这里要求仓储对应的模型实现接口ISoftDelete,为软删除提供支持:

    public interface ISoftDelete
        {
            bool IsDeleted { get; set; }
        }

    同时override了Query()方法,过滤了已删除的内容。

    常用的功能:操作跟踪

    好吧,这应该是事件溯源干的事,然而事件溯源目前太难了。原理和软删除差不多:

    /// <summary>
        /// 既然开启了跟踪,那么这条数据必然是不能硬删除的
        /// </summary>
        /// <typeparam name="T"></typeparam>
        public class TraceEnabledEntityFrameworkReponsitory<T> : SoftDeleteEntityFrameworkReponsitory<T>
            where T : class, ISoftDelete, ITrackLastModifying, IHasPrimaryKey
        {
            /// <summary>
            /// 开启跟踪时,不允许匿名操作
            /// </summary>
            [Dependency]
            public IDpfbSession Session { get; set; }
    
            public override void Update(T item)
            {
                if (!Session.UserId.HasValue)
                    throw new Exception(); //todo 提供一个明确的异常
                item.LastModifiedBy = Session.UserId.Value;
                item.LastModifiedTime = DateTime.Now;
            }
    
            public override void Insert(T item)
            {
                if (!Session.UserId.HasValue)
                    throw new Exception(); //todo 提供一个明确的异常
                item.LastModifiedBy = Session.UserId.Value;
                item.LastModifiedTime = DateTime.Now;
                base.Insert(item);
            }
        }
    View Code

    不过这个功能的侵入性很强,Storage应该无法感知“用户”这种概念才对。

    便利的功能:动态仓储(DynamicReponsitory)

    前一篇文章中说过,引入QueryEntry是为了将查询和提交分来,同时为查询操作提供更大的优化空间。在面对数据库的查询中,多表联查是非常普遍的。所以打算针对多表联查提供一个遍历的组件。同时,直接提交语句查询是和数据库相关的,所以要针对不同的数据库提供不同的DynamicReponsitory。

    这个组件解决的问题是:直接提交数据库多表联查,查询结果自动转换模型,提供分页支持。

    模型转换

    先来解决这个比较有趣的问题:将一个DataReader转换为一个值或者一个可枚举的集合。直接上实现代码:

    public class DataReaderTransfer<T> : CacheBlock<string, Func<IDataReader, T>> where T : new()
        {
            protected DataReaderTransfer()
            {
            }
    
            /// <summary>
            /// 
            /// </summary>
            /// <param name="filedsNameArray"></param>
            /// <param name="key">编译缓存所使用的key,建议使用查询字符串的hash</param>
            /// <returns></returns>
            public Func<IDataReader, T> Compile(string[] filedsNameArray, string key)
            {
                var outType = typeof (T);
                var func = ConcurrentDic.GetOrAdd(key, k =>
                {
                    var expressions = new List<Expression>();
                    //public T xxx(IDataReader reader){
                    var param = Expression.Parameter(typeof (IDataReader));
    
                    //var instance = new T();
                    var newExp = Expression.New(outType);
                    var varExp = Expression.Variable(outType, "instance");
                    var varAssExp = Expression.Assign(varExp, newExp);
                    expressions.Add(varAssExp);
    
                    var indexProp = typeof (IDataRecord).GetProperties().Last(p => p.Name == "Item"); //表示 reader[""]
                    foreach (var fieldName in filedsNameArray)
                    {
                        //if(xxx)xxx.xxx=null;else xxx.xxx = (xxx)value;
    
                        var prop = outType.GetProperty(fieldName);
                        if (prop == null)
                            continue;
                        var propExp = Expression.PropertyOrField(varExp, fieldName);
                        Expression value = Expression.MakeIndex(param, indexProp,
                            new Expression[] {Expression.Constant(fieldName)});
    
                        //处理空值
                        var defaultExp = Expression.Default(prop.PropertyType);
                        var isDbNullExp = Expression.TypeIs(value, typeof (DBNull));
    
                        //处理枚举以及可空枚举
                        if (prop.PropertyType.IsEnum ||
                            prop.PropertyType.IsGenericType && prop.PropertyType.GetGenericArguments()[0].IsEnum)
                        {
                            value = Expression.Convert(value, typeof (int));
                        }
                        var convertedExp = Expression.Convert(value, prop.PropertyType);
                        //读取到dbnull的时候,使用一个默认值
                        var condExp = Expression.IfThenElse(isDbNullExp,
                            Expression.Assign(propExp, defaultExp),
                            Expression.Assign(propExp, convertedExp));
    
                        expressions.Add(condExp);
                    }
    
                    //return instance; 
                    var retarget = Expression.Label(outType);
                    var returnExp = Expression.Return(retarget, varExp);
                    expressions.Add(returnExp);
    
                    //}
                    var relabel = Expression.Label(retarget, Expression.Default(outType));
                    expressions.Add(relabel);
    
                    var blockExp = Expression.Block(new[] {varExp}, expressions);
                    var expression = Expression.Lambda<Func<IDataReader, T>>(blockExp, param);
                    return expression.Compile();
                });
                return func;
            }
    
            public Func<IDataReader, T> Compile(IDataReader reader, string key)
            {
                var length = reader.FieldCount;
                var names = Enumerable.Range(1, length).Select(i => reader.GetName(i - 1)).ToArray();
                return Compile(names, key);
            }
    
            public static DataReaderTransfer<T> Instance = new DataReaderTransfer<T>();
    
            //基于反射的映射....
            //private static T DynamicMap<T>(IDataReader reader) where T : new()
            //{
            //    var instance = new T();
            //    var count = reader.FieldCount;
            //    while (count-- > 1)
            //    {
            //        object value = reader[count - 1];
            //        var name = reader.GetName(count - 1);
            //        var prop = typeof (T).GetProperty(name);
            //        if (prop == null)
            //        {
            //            continue;
            //        }
            //        if (value is DBNull)
            //        {
            //            value = null;
            //        }
            //        prop.SetValue(instance, value);
            //    }
            //    return instance;
            //}
        }
    View Code

    主要的思想是:在运行期间对一个特定的模型分析一次,分析构造这个模型需要如何访问DataReader,并将访问操作编译为Func<>,通过一个静态字典缓存。下一次构造的时候,直接访问静态字典的Func<>,将DataReader的行转换为模型。这个耗时,大概是硬编码转换的2倍,可以获得比反射好的性能受益。

    链式调用以及延时查询

    先来看一段调用代码:

    [TestClass]
        public class DynamicReponsitorySamples
        {
            static DynamicReponsitorySamples()
            {
                //DbContext 配置
                StorageConfiguration.Config.Use<DbContext, ObjectManageContext>(new ContainerControlledLifetimeManager());
                //无法使用.UseDbContext<ObjectManageContext>(),因为无法提供基于HTTP生命周期的管理对象
                DynamicReponsitory = new DynamicReponsitory();
            }
    
            public static DynamicReponsitory DynamicReponsitory { get; set; }
    
            [TestMethod]
            public void Query()
            {
                //直接提交一个SQL查询,并映射到实体
                var queryText =
                    "SELECT A.*,D.Name AS DepartmentName FROM [ADMIN] A LEFT JOIN [Department] D ON A.DepartmentCode = D.Code";
                var query = DynamicReponsitory.Query<AdminListItem>(queryText);
                //QueryResult对象遵循延时查询的规则,直到执行枚举才会执行查询操作
                query.Foreach(i => Trace.WriteLine(string.Format("{0}	{1}", i.UserName, i.RealName)));
            }
    
            [TestMethod]
            public void Count()
            {
                //可以直接执行一个COUNT(*)语句
                var countQueryText = "SELECT COUNT(*) FROM [ADMIN]";
                var countQuery = DynamicReponsitory.Count(countQueryText);
                Trace.WriteLine("Count:" + countQuery.Value);
                //可以提供一个SELECT * 语句
                countQueryText = "SELECT * FROM [ADMIN]";
                //但是需要将重载的第二个参数置为true
                countQuery = DynamicReponsitory.Count(countQueryText, true);
                Trace.WriteLine("Count:" + countQuery.Value);
                //可以对一个query对象执行CmountAmount()扩展方法,但是这个query对象代表的查询必须很普通
                var query = DynamicReponsitory.Query<AdminListItem>(
                    "SELECT A.*,D.Name AS DepartmentName FROM [ADMIN] A LEFT JOIN [Department] D ON A.DepartmentCode = D.Code");
                countQuery = query.CountAmount();
                //Value的值同样遵循延时查询的规则,但是重复访问会导致访问内存中缓存的数据
                Trace.WriteLine("Count:" + countQuery.Value);
                Trace.WriteLine("Count:" + countQuery.Value);
                //如果需要重新查询,可以调用Result.ReQuery()方法
                var reQuery = countQuery.ReQuery();
                Trace.WriteLine("Count:" + reQuery.Value);
            }
    
            /// <summary>
            /// 分页调用,支持分页信息和分页列表信息的无序访问
            /// </summary>
            [TestMethod]
            public void Page()
            {
                //可以对所有的query对象执行Page()扩展方法,从而进行分页
                //必须执行要求OrderBy参数的重载,否则会进行内存分页(加载所有行)
                var query = DynamicReponsitory.Query<AdminListItem>(
                    "SELECT A.*,D.Name AS DepartmentName FROM [ADMIN] A LEFT JOIN [Department] D ON A.DepartmentCode = D.Code");
                var paged = query.Page("ORDER BY DepartmentName", 1, 2);
    
                /*
                 * 以下表示支持分页信息和分页列表信息的无序访问
                 * 如果使用一条sql同时返回这些信息,必须先枚举集合才能继续访问分页信息
                 */
    
                Trace.WriteLine(string.Format("从{0}行到{1}行,在所有的{2}行中", paged.From, paged.To, paged.Amount));
                paged.Foreach(i => Trace.WriteLine(string.Format("{0}	{1}", i.UserName, i.RealName)));
                //重复访问会导致访问内存中缓存的数据
                var resultArray = paged.Take(1);
                Trace.WriteLine(string.Format("从{0}行到{1}行,在所有的{2}行中", paged.From, paged.To, paged.Amount));
                resultArray.Foreach(i => Trace.WriteLine(string.Format("{0}	{1}", i.UserName, i.RealName))); 
            }
        }
    View Code

    链式调用是指,我调用了DynamicReponsitory.Query()方法之后,可以紧接着调用Page()或者Count()方法。那么,显而易见,如果查询不是延时的,很容易导致这个问题:我把服务器上1W条数据全down下来了,然后在内存里面数数或者分页。
    为了实现延时查询的目标,引入了这几个类型:

    public class SqlQueryExpression : ICloneable
        {
            public SqlQueryExpression()
            {
                Parameters = new List<object>();
            }
    
            public SqlQueryExpression(string expressionText) : this()
            {
                ExpressionText = expressionText;
            }
    
            public string ExpressionText { get; set; }
            public IList<object> Parameters { get; private set; }
    
            public IDataReader Read(DbConnection connection)
            {
                var parameters = Parameters.ToArray();
                if (connection.State != ConnectionState.Open)
                    connection.Open();
                //查询,开启最低级别的事务隔离,防止默认事务产生争用锁
                var trans = connection.BeginTransaction(IsolationLevel.ReadUncommitted);
                var command = connection.CreateCommand();
                command.CommandType = CommandType.Text;
                command.CommandText = ExpressionText;
                command.Parameters.AddRange(parameters);
                command.Transaction = trans;
                return command.ExecuteReader(CommandBehavior.CloseConnection);
            }
    
            public virtual object Clone()
            {
                //实现拷贝接口
                var cloned = new SqlQueryExpression(ExpressionText);
                Parameters.Foreach(i =>
                {
                    var parameter = (SqlParameter) i;
                    var clonedParameter = new SqlParameter(parameter.ParameterName, parameter.Value);
                    clonedParameter.Direction = parameter.Direction;
                    cloned.Parameters.Add(clonedParameter);
                });
                return cloned;
            }
        }
    View Code
     public class SqlQueryResult
        {
            public SqlQueryExpression SqlQueryExpression { get; private set; }
            public DbConnection DbConnection { get; private set; }
            public virtual bool Enumerated { get; protected set; }
            protected IDataReader DataReader;
    
            protected void Query()
            {
                DataReader = DataReader ?? SqlQueryExpression.Read(DbConnection);
            }
    
            public SqlQueryResult(SqlQueryExpression expression, DbConnection connection)
            {
                SqlQueryExpression = expression;
                DbConnection = connection;
            }
        }
    
        /// <summary>
        /// 代表DynamicReponsitory的查询结果
        /// </summary>
        /// <typeparam name="T">代表需要构造的类型</typeparam>
        public class SqlQueryResult<T> : SqlQueryResult, IEnumerable<T> where T : new()
        {
            public SqlQueryResult(SqlQueryExpression expression, DbConnection connection)
                : base(expression, connection)
            {
    
            }
    
            public IEnumerator<T> GetEnumerator()
            {
                //对于一个Query对象,在第一次访问的时候,要求加载所有数据,防止Skip与Take导致数据丢失
                if (!Enumerated)
                {
                    Query();
                    using (DataReader)
                    {
                        Enumerated = true;
                        var uniqueKey = typeof (T).FullName + SqlQueryExpression.ExpressionText;
                        var func = DataReaderTransfer<T>.Instance.Compile(DataReader, uniqueKey);
                        while (DataReader.Read())
                        {
                            var item = func(DataReader);
                            ResultSet.Add(item);
                            //yield return item;
                        }
                    }
                }
                return ResultSet.GetEnumerator();
                //return ((IEnumerable<T>) ResultSet).GetEnumerator();
            }
    
            protected List<T> ResultSet = new List<T>();
    
            IEnumerator IEnumerable.GetEnumerator()
            {
                return GetEnumerator();
            }
    
            public SqlQueryResult<T> ReQuery()
            {
                var exp = SqlQueryExpression.Clone() as SqlQueryExpression;
                return new SqlQueryResult<T>(exp, DbConnection);
            }
        }
    View Code

    SqlQueryExpression存储了将要执行的查询,而SqlQueryResult则存储了查询返回的结果。同时,SqlQueryExpression实现了拷贝,以支持ReQuery()。
    关于具体的分页支持,实际上是使用了一个开窗函数,通过注入子查询的方式,从而支持了各种查询的分页(不奇葩的查询)。

    为了防止查询被锁住,默认开启了最低的事务隔离级别。

    ...

    【想到什么再补充】

  • 相关阅读:
    背包九讲——动态规划
    Collection、Map、数组 遍历方式
    TCP三次握手与四次挥手
    数据结构——B树、B+树
    数据结构——红黑树
    数据结构——二叉查找树、AVL树
    jquery 抽奖示例
    comebotree树
    初玩Linux部署项目
    springMvc + websocket 实现点对点 聊天通信功能
  • 原文地址:https://www.cnblogs.com/lightluomeng/p/4922671.html
Copyright © 2011-2022 走看看