zoukankan      html  css  js  c++  java
  • 数据迁移最快方式,多线程并行执行 Sql插入

    前言:

      由于系统升级,新开发的系统对数据验证,及数据关联做了很多优化,现需要将原历史版本的数据迁移到新系统中;原数据库大约有 1千多万数据,大约 50个表。

      历史数据库命名为:A。 新系统库暂命名为 :B;

      使用  .net 4.5 控制台程序 + EF + MSSQL 数据库,由于有业务逻辑及时序处理,故只能按时序从单表一条条的写入到新库中;

    化化过程:

      1、EF 如果使用多线程会出现 Sql 连接超过,或是连接不上数据库;

      2、EF 优化连接 自定义 SqlConnection,并传到入 多线程中,解决连接不上数据库的问题减少数据库连接数,但由于 EF 在 SaveChangesAsync的时候做了事务提交,但事务是不支持并行操作,故会出现异常;

      3、EF 优化事务,关闭EF默认事务  DbContextConfiguration.EnsureTransactionsForFunctionsAndCommands = false; 这里有个坑  关闭事务对  SaveChangesAsync 无效,问题依然存在;

      4、找了很多资料总算找到可以通过  ExecuteSqlCommandAsync 执行 Sql 语句,可以关闭事务;

      5、优化成执行Sql 语句:await db.Database.ExecuteSqlCommandAsync(TransactionalBehavior.DoNotEnsureTransaction, sql, SqlParameters[]);

      经过以上优化处理后,就开始写代码:

    一、关键的异步锁程序:

        /// <summary>
        /// 提供异步锁
        /// </summary>
        class AsyncRoot : IDisposable
        {
            /// <summary>
            /// 信号量
            /// </summary>
            private readonly SemaphoreSlim semaphoreSlim;
    
            /// <summary>
            /// 异步锁
            /// </summary>
            public AsyncRoot()
                : this(1)
            {
            }
    
            /// <summary>
            /// 异步锁
            /// </summary>
            /// <param name="concurrent">允许并行的线程数</param>
            public AsyncRoot(int concurrent)
            {
                this.semaphoreSlim = new SemaphoreSlim(concurrent, concurrent);
            }
    
            /// <summary>
            /// 锁住代码块
            /// using( asyncRoot.Lock() ){ }
            /// </summary>
            /// <returns></returns>
            public IDisposable Lock()
            {
                this.semaphoreSlim.Wait();
                return new UnLocker(this.semaphoreSlim);
            }
    
            /// <summary>
            /// 锁住代码块
            /// using( await asyncRoot.LockAsync() ){ }
            /// </summary>
            /// <returns></returns>
            public async Task<IDisposable> LockAsync()
            {
                await this.semaphoreSlim.WaitAsync().ConfigureAwait(false);
                return new UnLocker(this.semaphoreSlim);
            }
    
            /// <summary>
            /// 释放资源
            /// </summary>
            public void Dispose()
            {
                this.semaphoreSlim.Dispose();
            }
    
            /// <summary>
            /// 提供解锁
            /// </summary>
            class UnLocker : IDisposable
            {
                /// <summary>
                /// 信号量
                /// </summary>
                private readonly SemaphoreSlim semaphoreSlim;
    
                /// <summary>
                /// 解锁
                /// </summary>
                /// <param name="semaphoreSlim">信号量</param>
                public UnLocker(SemaphoreSlim semaphoreSlim)
                {
                    this.semaphoreSlim = semaphoreSlim;
                }
    
                /// <summary>
                /// 释放锁
                /// </summary>
                public void Dispose()
                {
                    this.semaphoreSlim.Release();
                }
            }
        }
    多线层异常锁

    二、对数据插入到数据库:

     逻辑分析:对传入的 数据集合,拆分为单个实体操作任务,每个任务使用同一个连接独立的数据库上下文,对实体反射为 Sql 语句(其中增加主键,表名、字段名、值的判断验证),

    然后通过 ExecuteSqlCommandAsync 不使用事务的方式执行 Sql 语句;具体代码见下:

    //表示最大线程数
    private readonly AsyncRoot root = new AsyncRoot(50);

        /// <summary>
        /// 多线程工作
        /// </summary>
        public class Workers
        {
            /// <summary>
            /// 多线程锁
            /// </summary>
            private readonly AsyncRoot root = new AsyncRoot(50);
    
            /// <summary>
            /// 执行对象操作
            /// </summary>
            /// <param name="datas"></param>
            /// <returns></returns>
            public async Task RunAsync<T>(IEnumerable<T> datas) where T : class
            {
                //创建 Sql 连接
                var connection = new SqlConnection(System.Configuration.ConfigurationManager.ConnectionStrings["SqlDb"].ConnectionString);
                await connection.OpenAsync();
                var tasks = datas.Select(item => SaveToDbAsync(item, connection));
                await Task.WhenAll(tasks);
            }
    
            /// <summary>
            /// 单条记录保存到数据库
            /// </summary>
            /// <typeparam name="T"></typeparam>
            /// <param name="data"></param>
            /// <param name="connection"></param>
            /// <returns></returns>
            private async Task SaveToDbAsync<T>(T data, DbConnection connection) where T : class
            {
                using (await root.LockAsync())
                {
                    using (var db = new SqlDb(connection))
                    {
                        try
                        {
                            var dbset = db.Set<T>();
                            var tType = typeof(T);
                            var tableName = tType.Name;
                            //获取  TableAttribute 数据库中的表名
                            var tableAtt = Attribute.GetCustomAttribute(tType, typeof(TableAttribute)) as TableAttribute;
                            if (tableAtt != null)
                            {
                                tableName = tableAtt.Name;
                            }
    
                            var sbSql = new StringBuilder();
    
                            sbSql.AppendLine("insert into " + tableName + " (");
                            var plist = new List<string>();
                            var fieldParameters = new List<SqlParameter>();
                            var keyFiled = "ID";
                            foreach (var p in typeof(T).GetProperties())
                            {
                                var pName = p.Name.ToUpper();
                                //获取  ColumnAttribute 数据库中的列名
                                var pAtt = Attribute.GetCustomAttribute(p, typeof(ColumnAttribute)) as ColumnAttribute;
                                if (pAtt != null)
                                {
                                    pName = pAtt.Name.ToUpper();
                                }
    
                                var keyAtt = Attribute.GetCustomAttribute(p, typeof(KeyAttribute)) as KeyAttribute;
                                if (keyAtt != null || p.Name.Equals("ID", StringComparison.OrdinalIgnoreCase))
                                {
                                    keyFiled = pName;
                                }
    
                                var fieldParameter = "@" + pName;
                                //过滤不插入数据库中的字段
                                var mapAtt = Attribute.GetCustomAttribute(p, typeof(NotMappedAttribute));
                                if (mapAtt == null)
                                {
                                    var value = p.GetValue(data, null);
                                    //如果属性值为 Null,不插入数据库
                                    if (value != null)
                                    {
                                        plist.Add(fieldParameter);
                                        fieldParameters.Add(new SqlParameter(fieldParameter, value));
                                    }
                                }
                            }
                            sbSql.Append(string.Join(",", plist.Select(item => item.Replace("@", ""))));
                            sbSql.Append(")values(");
                            sbSql.Append(string.Join(",", plist));
                            sbSql.Append(")");
                            //判断主键是否已经存在,存在就不插入数据
                            var ifSql = "if not exists(select 1 from [" + tableName + "] where " + keyFiled + " = @" + keyFiled + ")";
    
                            var sql = ifSql + sbSql.ToString();
                            await db.Database.ExecuteSqlCommandAsync(TransactionalBehavior.DoNotEnsureTransaction, sql, fieldParameters.ToArray());
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine(ex);
                        }
                    }
                }
            }
        }
    多线程及对象生成 Sql插入数据库
        /// <summary>
        /// Sql数据库
        /// </summary>
        public class SqlDb : DbContext
        {
            /// <summary>
            /// 自定义连接
            /// </summary>
            /// <param name="connection">数据库连接</param>
            public SqlDb(DbConnection connection) :
                base(connection, false)
            {
                if (connection.State != System.Data.ConnectionState.Open)
                {
                    connection.Open();
                }
    
                this.Database.CommandTimeout = 60 * 1000;
                this.Configuration.UseDatabaseNullSemantics = true;
                this.Configuration.EnsureTransactionsForFunctionsAndCommands = false;
                this.Configuration.ValidateOnSaveEnabled = false;
            }
        }
    数据库上下文

    三、注意事项:

      1、如果字段为 geography (地理位置) 类型,会出现异常,希望在使用的时候注意一下;

      2、由于集合为同一个对象,故在每次反射的对象几乎都是重复操作,可以根据实际情况增加缓存;

    其它:

    多线程并行操作小实例源码:https://github.com/intotf/netExample/tree/master/Tool/MultiTaskAsync

      

  • 相关阅读:
    【Ubuntu】Ubuntu使用root登录
    【Ubuntu】在Ubuntu 12.04 LTS上安装JDK6
    2012/12/18 水曜日 感怀
    【Ubuntu】Ubuntu Java aptget安装配置
    GRUB,分区,menu.ls,(hd0,1)【转载】
    【ExtJS】错误:this.config[...].width为空或不是对象
    【Ubuntu】Ubuntu常用文件操作命令
    Win7 64bit OS 安装64bit JDK后 不能安装Spket IDE
    IE中控制焦点(asp.net)
    mac os里各种启动参数的含义
  • 原文地址:https://www.cnblogs.com/intotf/p/11133759.html
Copyright © 2011-2022 走看看