zoukankan      html  css  js  c++  java
  • EF6的多线程与分库架构设计实现

    1.项目背景 

    这里简单介绍一下项目需求背景,之前公司的项目基于EF++Repository+UnitOfWork的框架设计的,其中涉及到的技术有RabbitMq消息队列,Autofac依赖注入等常用的.net插件。由于公司的发展,业务不断更新,变得复杂起来,对于数据的实时性、存储容量要求也提高了一个新的高度。数据库上下文DbContext设计的是单例模式,基本上告别了多线程高并发的数据读写能力,看了园子里很多大神的博客,均为找到适合自己当前需求的DbContext的管理方式。总结目前主要的管理方式:

    1)DbContext单例模式(长连接)。即公司之前的设计。很明显,这种设计方式无法支持多线程同步数据操作。报各种错误,最常见的,比如:集合已修改,无法进行枚举操作。---弃用

    2)Using模式(短连接)。这种模式适合一些对于外键,导航属性不经常使用的场合,由于导航属性是放在上下文缓存中的,一旦上下文释放掉,导航属性就为null。当然,也尝试了其他大神的做法,比如,在上下文释放之前转换为        ToList或者使用饥饿加载的方式(ps:这种方式很不灵活,你总不可能遇到一个类类型就去利用反射加载找到它具有的导航属性吧或者直接InCluding),这些方法依旧没有办法解决目前的困境。也尝试这直接赋值给一个定义的同类      型的变量,但是对于这种带导航的导航的复杂类的深拷贝,没有找到合适的路子,有知道的可以告诉我,非常感谢!

    以上两种方式及网上寻找的其他方式都没有解决我的问题。这里先上一下之前的Repository:

     1 using System.Data.Entity;
     2 using System.Data.Entity.Validation;
     3 
     4 namespace MM.Data.Library.Entityframework
     5 {
     6     public class EntityFrameworkRepositoryContext : RepositoryContext, IEntityFrameworkRepositoryContext
     7     {
     8         protected DbContext container;
     9 
    10         public EntityFrameworkRepositoryContext(DbContext container)
    11         {
    12             this.container = container;
    13         }
    14 
    15         public override void RegisterNew<TAggregateRoot>(TAggregateRoot obj)
    16         {            
    17             this.container.Set<TAggregateRoot>().Add(obj);
    18             this.IsCommit = false;
    19         }
    20 
    21         public override void RegisterModified<TAggregateRoot>(TAggregateRoot obj)
    22         {
    23             if (this.container.Entry<TAggregateRoot>(obj).State == EntityState.Detached)
    24             {
    25                 this.container.Set<TAggregateRoot>().Attach(obj);
    26             }
    27             this.container.Entry<TAggregateRoot>(obj).State = EntityState.Modified;
    28             this.IsCommit = false;
    29         }
    30 
    31         public override void RegisterDeleted<TAggregateRoot>(TAggregateRoot obj)
    32         {
    33             this.container.Set<TAggregateRoot>().Remove(obj);
    34             this.IsCommit = false;
    35         }
    36 
    37         public override void Rollback()
    38         {
    39             this.IsCommit = false;
    40         }
    41 
    42         protected override void DoCommit()
    43         {
    44             if (!IsCommit)
    45             {
    46                 //var count = container.SaveChanges();
    47                 //IsCommit = true;
    48                 try
    49                 {
    50                     var count = container.SaveChanges();
    51                     IsCommit = true;
    52                 }
    53                 catch (DbEntityValidationException dbEx)
    54                 {
    55                     foreach (var validationErrors in dbEx.EntityValidationErrors)
    56                     {
    57                         foreach (var validationError in validationErrors.ValidationErrors)
    58                         { 
    59                         }
    60                     }
    61                     IsCommit = false;
    62                 }
    63             }
    64         }
    65 
    66         public System.Data.Entity.DbContext DbContext
    67         {
    68             get { return container; }
    69         }
    70 
    71         public override void Dispose()
    72         {
    73             if (container != null)
    74                 container.Dispose();
    75         }
    76     }
    77 }
    View Code

    2.设计思路及方法

     从上下文的单例模式来看,所要解决的问题无非就是在多线程对数据库写操作上面。只要在这上面做手脚,问题应该就能引刃而解。我的想法是将所有的要修改的数据分别放入UpdateList,InsertList,DeleteList三个集合中去,然后提交到数据库保存。至于DbContext的管理,通过一个数据库工厂获取,保证每一个数据库的连接都是唯一的,不重复的(防止发生类似这种错误:正在创建模型,此时不可使用上下文。),用的时候直接去Factory拿。等到数据库提交成功后,清空集合数据。看起来,实现起来很容易,但是因为还涉及到其他技术,比如Redis。所以实现过程费劲。也许我的能力还差很多。总之,废话不多说,直接上部分实现代码:

    数据库上下文建立工厂:

     1     /// <summary>
     2     /// 数据库建立工厂
     3     /// Modify By:
     4     /// Modify Date:
     5     /// Modify Reason:
     6     /// </summary>
     7     public sealed class DbFactory
     8     {
     9         public static IDbContext GetCurrentDbContext(string connectstring,string threadName)
    10         {
    11             lock (threadName)
    12             {
    13                 //CallContext:是线程内部唯一的独用的数据槽(一块内存空间)  
    14                 //传递Context进去获取实例的信息,在这里进行强制转换。  
    15                 var Context = CallContext.GetData("Context") as IDbContext;
    16 
    17                 if (Context == null)  //线程在内存中没有此上下文  
    18                 {
    19                     var Scope = UnitoonIotContainer.Container.BeginLifetimeScope();
    20                     //如果不存在上下文 创建一个(自定义)EF上下文  并且放在数据内存中去  
    21                     Context = Scope.Resolve<IDbContext>(new NamedParameter("connectionString", connectstring));
    22                     CallContext.SetData("Context", Context);
    23                 }
    24                 else
    25                 {
    26 
    27                     if (!Context.ConnectionString.Equals(connectstring))
    28                     {
    29                         var Scope = UnitoonIotContainer.Container.BeginLifetimeScope();
    30                         //如果不存在上下文 创建一个(自定义)EF上下文  并且放在数据内存中去  
    31                         Context = Scope.Resolve<IDbContext>(new NamedParameter("connectionString", connectstring));
    32                         CallContext.SetData("Context", Context);
    33                     }
    34                 }
    35                 return Context;
    36             }
    37         }
    38 
    39     }
    View Code

    Repository:

      1     public class RepositoryBase<T, TContext> : IRepositoryBase<T> where T : BaseEntity
      2 
      3         where TContext : ContextBase, IDbContext, IDisposable, new()
      4     {
      5         public List<T> InsertList { get; set; }
      6         public List<T> DeleteList { get; set; }
      7         public List<T> UpdateList { get; set; }
      8 
      9         #region field
     10 
     11         protected readonly string Connectstring;
     12         ///// <summary>
     13         ///// </summary>
     14         //protected static IDbContext Context;
     15         protected  IDbContext dbContext;
     16         private static readonly ILifetimeScope Scope;
     17         public static int xcount = 0;
     18         /////// <summary>
     19         /////// </summary>
     20         //protected readonly DbSet<T> Dbset;
     21 
     22         #endregion
     23 
     24         #region ctor
     25 
     26         static RepositoryBase()
     27         {
     28             Scope = UnitoonIotContainer.Container.BeginLifetimeScope();
     29         }
     30 
     31         /// <summary>
     32         ///     使用默认连接字符串name=connName
     33         /// </summary>
     34         public RepositoryBase() : this("")
     35         {
     36         }
     37 
     38         /// <summary>
     39         /// 构造函数
     40         /// </summary>
     41         /// <param name="connectionString">连接字符串</param>
     42         public RepositoryBase(string connectionString)
     43         {
     44             InsertList = new List<T>();
     45             DeleteList = new List<T>();
     46             UpdateList = new List<T>();
     47 
     48 
     49             //*****做以下调整,初始化,建立所有数据库连接,保持长连接状态,在用的时候去判断使用连接
     50 
     51 
     52             //todo 待处理
     53 
     54 
     55             if (string.IsNullOrWhiteSpace(connectionString))
     56             {
     57                 var name = DataBase.GetConnectionString(Activator.CreateInstance<T>().DbType);
     58                 //Context= ContextHelper.GetDbContext(Activator.CreateInstance<T>().DbType);
     59                 connectionString = name;
     60             }
     61             Connectstring = connectionString;
     62             
     63            // Context = Scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring));
     64 
     65             //Context = new TContext { ConnectionString = connectionString };
     66 
     67 
     68             //  Dbset = Context.Set<T>();
     69 
     70             //var loggerFactory = ((DbContext)Context).GetService<ILoggerFactory>();
     71             //loggerFactory.AddProvider(new DbLoggerProvider(Console.WriteLine));
     72             //loggerFactory.AddConsole(minLevel: LogLevel.Warning);
     73         }
     74 
     75         //public RepositoryBase(TContext context)
     76         //{
     77         //    Context = context;
     78         //    Dbset = context.Set<T>();
     79         //}
     80 
     81         #endregion
     82 
     83         #region Method
     84 
     85         //public virtual IDbContext GetDbContext(ILifetimeScope scope)
     86         //{
     87 
     88         //}
     89 
     90         #region Check Model
     91 
     92         /// <summary>
     93         ///     校正Model
     94         /// </summary>
     95         protected virtual void ValidModel()
     96         {
     97         }
     98 
     99         #endregion
    100 
    101         #region Update
    102 
    103         public virtual void Update(T entity)
    104         {
    105             Check.NotNull(entity, "entity");
    106             UpdateList.Add(entity);
    107             //context.Set<T>().Update(entity);
    108         }
    109 
    110         public virtual void Update(IEnumerable<T> entities)
    111         {
    112             Check.NotNull(entities, "entities");
    113             UpdateList.AddRange(entities);
    114         }
    115 
    116         #endregion
    117 
    118         #region PageList
    119 
    120         public virtual IEnumerable<T> GetPageList(Expression<Func<T, bool>> where, Expression<Func<T, object>> orderBy,
    121             int pageIndex, int pageSize)
    122         {
    123            //todo
    124         }
    125 
    126         #endregion
    127 
    128         #region Insert
    129 
    130         public virtual void Add(T entity)
    131         {
    132             Check.NotNull(entity, "entity");
    133             //排除已经存在的项(对于多线程没有任何用处)
    134             if (!InsertList.Exists(e => e.Equals(entity)))
    135             {
    136                 InsertList.Add(entity);
    137             }
    138 
    139         }
    140 
    141         public virtual void Add(IEnumerable<T> entities)
    142         {
    143             Check.NotNull(entities, "entities");
    144             InsertList.AddRange(entities);
    145         }
    146 
    147         public void BulkInsert(IEnumerable<T> entities)
    148         {
    149             Check.NotNull(entities, "entities");
    150             InsertList.AddRange(entities);
    151         }
    152 
    153         #endregion
    154 
    155         #region Delete
    156 
    157         public virtual void Delete(int id)
    158         {
    159             var entity = GetById(id);
    160             Delete(entity);
    161             // throw new NotImplementedException("Delete(int id)");
    162         }
    163 
    164         public virtual void Delete(string id)
    165         {
    166             throw new NotImplementedException("Delete(string id)");
    167         }
    168 
    169         public virtual void Delete(T entity)
    170         {
    171             Check.NotNull(entity, "entity");
    172             DeleteList.Add(entity);
    173         }
    174 
    175         public virtual void Delete(IEnumerable<T> entities)
    176         {
    177             Check.NotNull(entities, "entities");
    178             foreach (var x1 in DeleteList)
    179             {
    180                 DeleteList.Add(x1);
    181             }
    182         }
    183 
    184         public virtual void Delete(Expression<Func<T, bool>> where)
    185         {
    186             var list = DeleteList.Where(where.Compile());
    187             Delete(list);
    188         }
    189 
    190         #endregion
    191 
    192         #region Commit
    193 
    194         public int Commit()
    195         {
    196             ValidModel();
    197             //var x = Activator.CreateInstance<T>();
    198             //Context = ContextHelper.GetDbContext(x.DbType);
    199             //using (var scope = UnitoonIotContainer.Container.BeginLifetimeScope())
    200             //{
    201             // var context = scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring));
    202 
    203             //var loggerFactory = Activator.CreateInstance<ILoggerFactory>();// ((DbContext)context).GetService<ILoggerFactory>();
    204             //loggerFactory.AddProvider(new DbLoggerProvider(Console.WriteLine));
    205             dbContext = DbFactory.GetCurrentDbContext(Connectstring, Thread.CurrentThread.Name);
    206             var dbset = dbContext.Set<T>();
    207                 if (InsertList != null && InsertList.Any())
    208                 {
    209                     List<T> InsertNewList = InsertList.Distinct(new PropertyComparer<T>("Id")).ToList();//按照特定规则排除重复项
    210                     dbset.AddRange(InsertNewList);
    211                 }
    212 
    213                 if (DeleteList != null && DeleteList.Any())
    214                     DeleteList.ForEach(t =>
    215                     {
    216                        // Context.Entry(t).State = EntityState.Detached;//将之前上下文跟踪的状态丢弃
    217                         //dbContext.Entry(t).State = EntityState.Detached;//将之前上下文跟踪的状态丢弃
    218                         dbset.Attach(t);
    219                         dbset.Remove(t);
    220                     });
    221                 if (UpdateList != null && UpdateList.Any())
    222                 {
    223                     List<T> UpdateNewList = UpdateList.Distinct(new PropertyComparer<T>("Id")).ToList();//按照特定规则排除重复项
    224                     UpdateNewList.ForEach(t =>
    225                     {
    226                         //Context.Entry(t).State = EntityState.Detached;//将之前上下文跟踪的状态丢弃
    227                        // dbContext.Entry(t).State = EntityState.Detached;//将之前上下文跟踪的状态丢弃
    228                         dbContext.Entry(t).State = EntityState.Modified;
    229                     });//.UpdateRange(UpdateNewList);
    230                 }
    231                 var result = 0;
    232                 try
    233                 {
    234                     result = dbContext.SaveChanges();
    235                 }
    236                 catch (Exception ex)
    237                 {
    238 
    239                     //  throw;
    240                 }
    241 
    242                 if (InsertList != null && InsertList.Any())
    243                     InsertList.Clear();
    244                 if (DeleteList != null && DeleteList.Any())
    245                     DeleteList.Clear();
    246                 if (UpdateList != null && UpdateList.Any())
    247                     UpdateList.Clear();
    248                 return result;
    249             //}
    250         }
    251 
    252         public async Task<int> CommitAsync()
    253         {
    254             ValidModel();
    255            //todo
    256            
    257         }
    258 
    259         #endregion
    260 
    261         #region Query
    262         public IQueryable<T> Get()
    263         {
    264             return GetAll().AsQueryable();
    265         }
    266         //public virtual T Get(Expression<Func<T, bool>> @where)
    267         //{
    268         //    using (var scope = UnitoonIotContainer.Container.BeginLifetimeScope())
    269         //    {
    270 
    271         //        var context = scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring));
    272         //        var dbset = context.Set<T>();
    273         //        return dbset.FirstOrDefault(where);
    274         //    }
    275         //}
    276 
    277         public virtual async Task<T> GetAsync(Expression<Func<T, bool>> @where)
    278         {
    279              //todo
    280         }
    281 
    282         public virtual T GetById(int id)
    283         {
    284             throw new NotImplementedException("GetById(int id)");
    285         }
    286 
    287         public virtual async Task<T> GetByIdAsync(int id)
    288         {
    289             throw new NotImplementedException("GetById(int id)");
    290         }
    291 
    292         public virtual T GetById(string id)
    293         {
    294             throw new NotImplementedException("GetById(int id)");
    295         }
    296 
    297         public virtual async Task<T> GetByIdAsync(string id)
    298         {
    299             throw new NotImplementedException("GetById(int id)");
    300         }
    301 
    302         public virtual T Get(Expression<Func<T, bool>> @where)//, params string[] includeProperties
    303         {
    304             //var scope = UnitoonIotContainer.Container.BeginLifetimeScope();
    305             //{
    306             //    var context = scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring));
    307             //Thread.Sleep(50);
    308             //lock (Context)
    309             {
    310                 dbContext= DbFactory.GetCurrentDbContext(Connectstring, Thread.CurrentThread.Name);
    311                 var dbset = dbContext.Set<T>().Where(e => !e.IsDeleted).AsQueryable();
    312                 var entity = dbset.FirstOrDefault(where);
    313                 //test
    314                 // Context.Entry(entity).State = EntityState.Detached;//将之前上下文跟踪的状态丢弃
    315                 return entity;
    316             }
    317            
    318 
    319             //}
    320         }
    321 
    322         public virtual IEnumerable<T> GetAll()
    323         {
    324             //Thread.Sleep(50);
    325             //lock (Context)
    326             {
    327                 dbContext = DbFactory.GetCurrentDbContext(Connectstring, Thread.CurrentThread.Name);
    328                 var dbset = dbContext.Set<T>().Where(e => !e.IsDeleted);
    329                 //test
    330                 //dbset.ToList().ForEach(t =>
    331                 //{
    332                 //    Context.Entry(t).State = EntityState.Detached;//将之前上下文跟踪的状态丢弃
    333                 //});
    334 
    335                 return dbset;
    336             }
    337                
    338 
    339            
    340             
    341             //var scope = UnitoonIotContainer.Container.BeginLifetimeScope();
    342 
    343             // var context = scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring));
    344            
    345         }
    346 
    347         public async virtual Task<IEnumerable<T>> GetAllAsync()
    348         {
    349             //todo
    350         }
    351 
    352         public virtual IEnumerable<T> GetMany(Expression<Func<T, bool>> where)
    353         {
    354             //using (var scope = UnitoonIotContainer.Container.BeginLifetimeScope())
    355             //{
    356             //    var context = scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring));
    357 
    358             //    var dbset = context.Set<T>();
    359             //Thread.Sleep(50);
    360             //lock (Context)
    361             {
    362                 dbContext = DbFactory.GetCurrentDbContext(Connectstring, Thread.CurrentThread.Name);
    363                 var dbset = dbContext.Set<T>().Where(e => !e.IsDeleted);
    364                 //test
    365                 //dbset.ToList().ForEach(t =>
    366                 //{
    367                 //    Context.Entry(t).State = EntityState.Detached;//将之前上下文跟踪的状态丢弃
    368                 //});
    369                 return dbset.Where(@where).ToList();
    370             }
    371           
    372             //}
    373         }
    374 
    375         public virtual async Task<IEnumerable<T>> GetManyAsync(Expression<Func<T, bool>> where)
    376         {
    377            //todo
    378         }
    379 
    380         public virtual IEnumerable<T> IncludeSubSets(params Expression<Func<T, object>>[] includeProperties)
    381         {
    382            //todo
    383         }
    384 
    385         #region  navigation
    386         /// <summary>
    387         /// 加载导航
    388         /// </summary>
    389         /// <param name="where"></param>
    390         /// <param name="includeProperties"></param>
    391         /// <returns></returns>
    392         //public virtual T Get(Expression<Func<T, bool>> @where, params Expression<Func<T, object>>[] includeProperties)
    393         //{
    394         //    using (var scope = UnitoonIotContainer.Container.BeginLifetimeScope())
    395         //    {
    396 
    397         //        var context = scope.Resolve<IDbContext>(new NamedParameter("connectionString", Connectstring));
    398         //        var dbset = context.Set<T>();
    399         //        var query = includeProperties.Aggregate<Expression<Func<T, object>>, IQueryable<T>>(dbset,
    400         //            (current, includeProperty) => current.Include(includeProperty));
    401         //        return query.FirstOrDefault(where);
    402         //    }
    403         //}
    404 
    405         //public virtual T Get(Expression<Func<T, bool>> @where)//, params string[] includeProperties
    406         //{
    407         //    //反射获取导航
    408         //    var includeProperties =
    409         //        Activator.CreateInstance<T>().GetType().GetProperties().Where(p => p.GetMethod.IsVirtual).Select(e => e.Name).ToArray();
    410 
    411         //todo
    412         //}
    413         #endregion
    414         public List<TDynamicEntity> GetDynamic<TTable, TDynamicEntity>(Expression<Func<TTable, object>> selector,
    415             Func<object, TDynamicEntity> maker) where TTable : class
    416         {
    417              //todo
    418         }
    419 
    420         public List<TDynamicEntity> GetDynamic<TTable, TDynamicEntity>(Func<TTable, object> selector,
    421             Func<object, TDynamicEntity> maker) where TTable : class
    422         {
    423            //todo
    424            
    425         }
    426 
    427         #endregion
    428 
    429         #region Count
    430 
    431         public virtual async Task<int> CountAsync()
    432         {
    433            //todo
    434         }
    435 
    436         public virtual async Task<int> CountByAsync(Expression<Func<T, bool>> where)
    437         {
    438             //todo
    439         }
    440 
    441         #endregion
    442 
    443         #region Exists
    444 
    445         public virtual bool Exists(string id)
    446         {
    447             throw new NotImplementedException();
    448         }
    449 
    450         public virtual bool Exists(int id)
    451         {
    452             throw new NotImplementedException();
    453         }
    454 
    455         public virtual async Task<bool> ExistsAsync(string id)
    456         {
    457             throw new NotImplementedException();
    458         }
    459 
    460         public virtual async Task<bool> ExistsAsync(int id)
    461         {
    462             throw new NotImplementedException();
    463         }
    464 
    465         public virtual bool Exists(Expression<Func<T, bool>> @where)
    466         {
    467             throw new NotImplementedException();
    468         }
    469 
    470         public virtual async Task<bool> ExistsAsync(Expression<Func<T, bool>> @where)
    471         {
    472             throw new NotImplementedException();
    473         }
    474 
    475         #endregion
    476 
    477 
    478         #endregion
    479     }
    View Code

    以上就是EF6的多线程与分库架构设计实现的部分相关内容。

    如果有需要全部源代码或者交流的,直接联系我QQ:694666781  或者: https://item.taobao.com/item.htm?spm=a2oq0.12575281.0.0.3d2a1debWgF2RT&ft=t&id=608713437692

    另外,该场景下,Redis相关使用方法及可能遇到的问题及解决方法我会另写一篇进行展开。如有不妥不正之处,请大神指正,谢谢!

  • 相关阅读:
    PetaPOCO 一对多 多对一 多对多
    PetaPoco使用要点
    MySQL_杭州北仓 12.3-12.7需求活动期间累计下单达到3天及以上的客户_20161212
    Python 2.7_Second_try_爬取阳光电影网_获取电影下载地址并写入文件 20161207
    Python 2.7_First_try_爬取阳光电影网_20161206
    MySQL计算销售员昨日各指标综合得分_20161206
    MySQL_关于用嵌套表计算的可以不用 20161205
    MySQL_财务统计各产品品类各城市上周收入毛利表_20161202
    借助取色工具ColorPix对Pycharm编辑器设定自己喜欢的代码颜色_20161202
    python2.7 爬虫_爬取小说盗墓笔记章节及URL并导入MySQL数据库_20161201
  • 原文地址:https://www.cnblogs.com/gbat/p/6374607.html
Copyright © 2011-2022 走看看