zoukankan      html  css  js  c++  java
  • EF批量插入太慢?那是你的姿势不对

    大概所有的程序员应该都接触过批量插入的场景,我也相信任何的程序员都能写出可正常运行的批量插入的代码。但怎样实现一个高效、快速插入的批量插入功能呢?

    由于每个人的工作履历,工作年限的不同,在实现这样的一个需求时,可能技术选型各有不同,有直接生成insert语句的,有用EF的或者其他的orm框架的。其实不管是手写insert还是使用EF,最终交给数据库执行的还是insert语句。下面是EF批量插入的示例代码:

    var list = new List<Student>();
    for (int i = 0; i < 100; i++)
    {
        list.Add(new Student { CreateTime = DateTime.Now, Name = "zjjjjjj" });
    }
    
    await _context.Students.AddRangeAsync(list);
    await _context.SaveChangesAsync();
    

    生成的脚本截图如下:
    zzz

    这种实现方式在数据量100以内时,耗时还算可以。但如果要批量导入的数据达到万级的时候,那耗时简直是灾难。我测试的数据如下(测试数据库为mysql,具体配置不详):

    数据量 耗时(s)
    10 0.028
    1w 3.929
    10w 31.280

    10w的数据已经耗时超过了30s,我没有勇气测试100w数据的耗时,有兴趣的可以自行测试下。

    下面就应该进入正题了,对于较大数据量(1000以上)场景下的批量插入,各个数据库应该都提供了相关的解决方案,由于工作所限,目前笔者仅接触过mysql和mssql。

    mysql的实现方案是LOAD DATA命令,此命令接收一个csv文件,然后将文件上传到数据库服务器后,解析数据后插入。好在MySqlConnector提供了相关的封装,不用咱们去熟悉那么复杂的命令参数。

    mssql实现的方案是使用SqlBulkCopy类,不过此类仅接收DataTable类型的数据,所以,在批量插入的时候,需要将数据源转换成DataTable。

    综上所示,不管是mysql,还是mssql,均需要将数据源转换成指定的格式才可以使用批量导入的功能,所以这一块的主要核心就是转换数据源格式。mysql需要转换成csv,mssql需要转换成DataTable。下面就来一起看看具体的转换的方法。

    以下代码是转换csv和DataTable相关方法:

    namespace FL.DbBulk
    {
        public static class Extension
        {
            /// <summary>
            /// 获取实体影射的表名
            /// </summary>
            /// <param name="type"></param>
            /// <returns></returns>
            public static string GetMappingName(this System.Type type)
            {
                var key = $"batch{type.FullName}";
    
                var tableName = CacheService.Get(key);
                if (string.IsNullOrEmpty(tableName))
                {
                    var tableAttr = type.GetCustomAttribute<TableAttribute>();
                    if (tableAttr != null)
                    {
                        tableName = tableAttr.Name;
                    }
                    else
                    {
                        tableName = type.Name;
                    }
                    CacheService.Add(key, tableName);
                }
                return tableName;
            }
    
            public static List<EntityInfo> GetMappingProperties(this System.Type type)
            {
                var key = $"ICH.King.DbBulk{type.Name}";
                var list = CacheService.Get<List<EntityInfo>>(key);
                if (list == null)
                {
                    list = new List<EntityInfo>();
                    foreach (var propertyInfo in type.GetProperties())
                    {
                        if (!propertyInfo.PropertyType.IsValueType &&
                            propertyInfo.PropertyType.Name != "Nullable`1" && propertyInfo.PropertyType != typeof(string)) continue;
                        var temp = new EntityInfo();
                        temp.PropertyInfo = propertyInfo;
                        temp.FieldName = propertyInfo.Name;
                        var attr = propertyInfo.GetCustomAttribute<ColumnAttribute>();
                        if (attr != null)
                        {
                            temp.FieldName = attr.Name;
                        }
                        temp.GetMethod = propertyInfo.CreateGetter();
                        list.Add(temp);
                    }
                    CacheService.Add(key, list);
                }
    
                return list;
            }
    
            /// <summary>
            /// 创建cvs字符串
            /// </summary>
            /// <typeparam name="T"></typeparam>
            /// <param name="entities"></param>
            /// <param name="primaryKey"></param>
            /// <returns></returns>
            public static string CreateCsv<T>(this IEnumerable<T> entities, string primaryKey = "")
            {
                var sb = new StringBuilder();
                var properties = typeof(T).GetMappingProperties().ToArray();
                foreach (var entity in entities)
                {
                    for (int i = 0; i < properties.Length; i++)
                    {
                        var ele = properties[i];
                        if (i != 0) sb.Append(",");
                        var value = ele.Get(entity);
                        if (ele.PropertyInfo.PropertyType.Name == "Nullable`1")
                        {
                            if (ele.PropertyInfo.PropertyType.GenericTypeArguments[0] == typeof(DateTime))
                            {
                                if (value == null)
                                {
                                    sb.Append("NULL");
                                }
                                else
                                {
                                    sb.Append(Convert.ToDateTime(value).ToString("yyyy-MM-dd HH:mm:ss"));
                                }
                                continue;
                            }
                        }
    
                        if (ele.PropertyInfo.PropertyType == typeof(DateTime))
                        {
                            sb.Append(Convert.ToDateTime(value).ToString("yyyy-MM-dd HH:mm:ss"));
                            continue;
                        }
                        //如果是主键&&string类型,且值不为空
                        if (ele.FieldName == primaryKey && ele.PropertyInfo.PropertyType == typeof(string))
                        {
                            sb.Append(Guid.NewGuid().ToString());
                            continue;
                        }
                        if (value == null)
                        {
                            continue;
                        }
                        if (ele.PropertyInfo.PropertyType == typeof(string))
                        {
                            var vStr = value.ToString();
                            if (vStr.Contains("""))
                            {
                                vStr = vStr.Replace(""", """");
                            }
                            if (vStr.Contains(",") || vStr.Contains("
    ") || vStr.Contains("
    "))
                            {
                                vStr = $""{vStr}"";
                            }
                            sb.Append(vStr);
                        }
                        else sb.Append(value);
                    }
                    sb.Append(IsWin() ? "
    " : "
    ");
                    //sb.AppendLine();
                }
    
                return sb.ToString();
            }
    
            public static bool IsWin()
            {
                return RuntimeInformation.IsOSPlatform(OSPlatform.Windows);
            }
    
            public static string CreateCsv(this DataTable table)
            {
                StringBuilder sb = new StringBuilder();
                DataColumn colum;
                foreach (DataRow row in table.Rows)
                {
                    for (int i = 0; i < table.Columns.Count; i++)
                    {
                        colum = table.Columns[i];
                        if (i != 0) sb.Append(",");
                        if (colum.DataType == typeof(string))
                        {
                            var vStr = row[colum].ToString();
                            if (vStr.Contains("""))
                            {
                                vStr = vStr.Replace(""", """");
                            }
                            if (vStr.Contains(",") || vStr.Contains("
    ") || vStr.Contains("
    "))
                            {
                                vStr = $""{vStr}"";
                            }
                            sb.Append(vStr);
                        }
                        else sb.Append(row[colum]);
                    }
                    sb.Append(IsWin() ? "
    " : "
    ");
                }
                return sb.ToString();
            }
    
    
            public static DataTable ToDataTable<T>(this IEnumerable<T> list, string primaryKey = "")
            {
                var type = typeof(T);
                //获取实体映射的表名
                var mappingName = type.GetMappingName();
                var dt = new DataTable(mappingName);
                //获取实体映射的属性列表
                var columns = type.GetMappingProperties();
                dt.Columns.AddRange(columns.Select(x => new DataColumn(x.FieldName)).ToArray());
                foreach (var data in list)
                {
                    var row = dt.NewRow();
                    foreach (var entityInfo in columns)
                    {
                        var value = entityInfo.Get(data);
                        if (primaryKey == entityInfo.FieldName && entityInfo.PropertyInfo.PropertyType == typeof(string))
                        {
                            row[entityInfo.FieldName] = value ?? Guid.NewGuid().ToString();
                        }
                        else
                        {
                            row[entityInfo.FieldName] = value;
                        }
                    }
                    dt.Rows.Add(row);
                }
    
                return dt;
            }
    
           
        }
    }
    

    转换成DataTable方法相对简单,但这里我做了个优化下,当判断主键是string类型,且值为空时,会自动生成一个GUID,并给其赋值,这样做的目的是为了和EF原生的插入功能兼容。

    生成Csv的相对比较麻烦,因为Csv是用逗号以及其他符号来区分每一行、每一列数据,但经常会存在要插入的数据包含了csv的特殊符号,这样情况下就需要做转义。另外,还有一个需要考虑的问题,linux和windows默认的换行符是有区别的,windows的换行符为 ,而linux默认的是 ,所以在生成csv时,需要根据不同的系统进行处理。

    下面来看下具体怎么调用相关的插入方法,首先看下mysql的,主要代码如下所示:

    private async Task InsertCsvAsync(string csv, string tableName, List<string> columns)
    {
        var fileName = Path.GetTempFileName();
        await File.WriteAllTextAsync(fileName, csv);
        var conn = _context.Database.GetDbConnection() as MySqlConnection;
        var loader = new MySqlBulkLoader(conn)
        {
            FileName = fileName,
            Local = true,
            LineTerminator = Extension.IsWin() ? "
    " : "
    ",
            FieldTerminator = ",",
            TableName = tableName,
            FieldQuotationCharacter = '"',
            EscapeCharacter = '"',
            CharacterSet = "UTF8"
        };
        loader.Columns.AddRange(columns);
        await loader.LoadAsync();
    }
    

    在上述的代码中,首先创建一个临时文件,然后将其他数据源转换的csv内容写入到文件中,获取数据库连接,再然后创建MySqlBulkLoader类的实例,将相关参数进行复制后,还需要配置字段列表,最后执行LoadAsync命令。

    下面是mssql的批量插入的核心代码:

    public async Task InsertAsync(DataTable table)
    {
        if (table == null)
        {
            throw new ArgumentNullException();
        }
    
        if (string.IsNullOrEmpty(table.TableName))
        {
            throw new ArgumentNullException("DataTable的TableName属性不能为空");
        }
        var conn = (SqlConnection)_context.Database.GetDbConnection();
        await conn.OpenAsync();
        using (var bulk = new SqlBulkCopy(conn))
        {
            bulk.DestinationTableName = table.TableName;
            foreach (DataColumn column in table.Columns)
            {
                bulk.ColumnMappings.Add(column.ColumnName, column.ColumnName);
            }
            await bulk.WriteToServerAsync(table);
        }
    }
    

    以上方法相对简单,在此不做更多解释。

    至此,mysql和mssql批量的导入的方案已经介绍完毕,但可能就会有人说了,这跟EF好像也没什么关系呀。
    其实如果你有仔细看的话,或许能发现,我在代码中使用了一个名为_context字段,此字段其实就是EF的DbContext的实例。但文章内容到此时也没有完全的和EF结合,下面就来介绍下如何更优雅的将此功能集成到EF中。

    在.net core中,接入EF的时候其实已经指定了使用的数据库类型,实例代码如下:

     services.AddDbContext<MyDbContext>(opt => opt.UseMySql("server=10.0.0.146;Database=demo;Uid=root;Pwd=123456;Port=3306;AllowLoadLocalInfile=true"))
    

    既然以及指定了数据库类型,那么在调用批量插入的时候,应该就不需要让调用者判断是使用mysql的方法,还是mssql的方法。具体怎么设计呢?且耐心往下看。

    首先分别定义接口ISqlBulk,IMysqlBulk,ISqlServerBulk代码如下:

    namespace FL.DbBulk
    {
        public interface ISqlBulk
        {
            /// <summary>
            /// 批量导入数据
            /// </summary>
            /// <param name="table">数据源</param>
            void Insert(DataTable table);
            /// <summary>
            /// 批量导入数据
            /// </summary>
            /// <param name="table">数据源</param>
            Task InsertAsync(DataTable table);
    
            void Insert<T>(IEnumerable<T> enumerable) where T : class;
            Task InsertAsync<T>(IEnumerable<T> enumerable) where T : class;
        }
    }
    

    IMysqlBulk,ISqlServerBulk接口继承ISqlBulk,代码如下:

    namespace FL.DbBulk
    {
        public interface IMysqlBulk : ISqlBulk
        {
            Task InsertAsync<T>(string csvPath, string tableName = "") where T : class;
        }
    }
    
    namespace FL.DbBulk
    {
        public interface ISqlServerBulk:ISqlBulk
        {
            
        }
    }
    

    然后创建ISqlBulk实现类:

    namespace FL.DbBulk
    {
        public class SqlBulk : ISqlBulk
        {
            private ISqlBulk _bulk;
            public SqlBulk(DbContext context, IServiceProvider provider)
            {
                if (context.Database.IsMySql())
                {
                    _bulk = provider.GetService<IMysqlBulk>();
                }
                else if (context.Database.IsSqlServer())
                {
                    _bulk = provider.GetService<ISqlServerBulk>();
                }
            }
    
            public void Insert(DataTable table)
            {
                _bulk.Insert(table);
            }
    
            public async Task InsertAsync(DataTable table)
            {
                await _bulk.InsertAsync(table);
            }
    
            public void Insert<T>(IEnumerable<T> enumerable) where T : class
            {
                _bulk.Insert(enumerable);
            }
    
            public async Task InsertAsync<T>(IEnumerable<T> enumerable) where T : class
            {
                await _bulk.InsertAsync(enumerable);
            }
    
        }
    }
    

    在SqlBulk的构造函数中,通过context.Database的扩展方法判断数据库的类型,然后再获取相应的接口的实例。再然后就是实现IMysqlBulk和ISqlServerBulk的实现类。上文已经把核心代码贴出,再此为了篇幅,就不贴完整代码了。

    再然后,就是提供一个注入services的方法,代码如下:

    namespace Microsoft.Extensions.DependencyInjection
    {
        public static class ServiceCollectionExtension
        {
            public static IServiceCollection AddBatchDB<T>(this IServiceCollection services) where  T:DbContext
            {
                services.TryAddScoped<IMysqlBulk, MysqlBulk>();
                services.TryAddScoped<ISqlServerBulk, SqlServerBulk>();
                services.TryAddScoped<ISqlBulk, SqlBulk>();
                services.AddScoped<DbContext, T>();
                return services;
            }
        }
    }
    

    有了以上代码,我们就可以通过在Startup中很方便的启用批量插入的功能了。

    最后,贴出两种插入方式对比的测试数据:

    数据量 EF默认耗时(s) ISqlBulk耗时(s)
    10 0.028 0.030
    1w 3.929 1.581
    10w 31.280 15.408

    以上测试数据均是使用同一个mysql数据库,不同配置以及网络环境下,测试的数据会有差异,有兴趣的可以自己试试。

    至此,本人内容已完毕。


    最后,贴出git地址,如果思路或代码可以帮到你,欢迎点赞,点star
    https://github.com/fuluteam/FL.DbBulk.git

    福禄ICH·架构组 福尔斯
  • 相关阅读:
    中译英26
    listen 59
    Speaking 1
    listen 58
    listen 57
    中译英25
    listen 56
    2018.2.27 RF module distance test part I
    中译英24
    第二章、PyQt5应用构建详细过程介绍
  • 原文地址:https://www.cnblogs.com/fulu/p/13370335.html
Copyright © 2011-2022 走看看