GitHub上项目地址:https://github.com/shendashan/BulkCopy
最近在工作中遇到一些性能问题,在大批量的数据写入和修改数据库时太慢了,甚至会出现操作超时。
所以去网上找了下资料,找到了一些解决方案SqlBulkCopy和SqlDataAdapter(SqlDataAdapter实测了下,批量修改数据的时候速度不快,可能是我使用的姿势不对。哪位大神知道正确使用姿势,望留言指点)。下面主要介绍下SqlBulkCopy:
SqlBulkCopy
SqlBulkCopy用于做批量写入的操作,通过调用WriteToServer方法来实现批量写入的功能(WriteToServer方法的实现原理没有深入研究)。从测试结果来看,执行效率完全可以满足公司的需求。
壹.先来说下批量写入的具体实现:
1.在调用WriteToServer方法之前,需要准备一个DataTable实例,DataTable的实例中存放的是需要批量写入的数据集。
如图,先创建一个生成DataTable的方法:
第一个参数,需要批量写入的集合,这里使用的泛型集合,以方便共用。
第二个参数,SqlConnection对象,用于连接数据库。
第三个参数,被写入目标表的名字,因为是一个共用的方法,不然无法知道要向哪张表写数据。
方法内部实现:
创建一条查询的sql语句,如图:
上面图中的sql语句,使用来获取目标表的字段。通过SqlCommand的ExecuteReader方法来获取SqlDataReader的对象。然后在通过获取到的SqlDataReader对象来获取表的列名和对应的类型。
拿到列名和类型后,使用DataColumn的有参构造函数创建DataColumn的对象(如:column),将对象(column)添加到DataTable对象的Columns属性中。如图:
到这里,要写入目标表的所以列都已经添加到DataTable中了(别忘记关闭DataReader,不然运行的时候回报错哦O(∩_∩)O),下面就可以去处理要批量写入到表的集合数据了。
在处理集合数据之前,先利用反射技术获取到实体的所有属性。首先通过typeof()获取到Type对象,然后通过对象的GetProperties()来获取所有属性的数组,如图:
获取到了实体的所有属性,接下来就可以处理要写入的数据集合了。
循环数据集合,对数据进行逐条处理。在循环的内部通过循环实体属性的数组对象来获取对应的数据。在根据属性获取数据值的时候,需要进一步处理。因为实体属性的数组中很有可能包含导航属性(导航属性:ef中codefirst建表时,建立表之间关联关系的属性),如果不处理掉导航属性的话,执行语句的时候会出问题。将处理之后的属性和获取到的值添加到DataRow中。如下图:
写到这里,DataTable的对象就已经准备好了,要写人的数据也已经添加到DataTable的对象中了。
接下来就使用SqlBulkCopy来批量将数据写入到数据库中。这一步很简单,只要创建一个SqlBulkCopy的对象,告诉SqlBulkCopy被写入表的名字,然后调用WriteToServer()将准备好的DataTable对象放到方法中就OK了。如下图:
批量写入数据的方法到这里就完全结束了。最后别忘了将SqlConnection对象关闭。
具体代码如下:
a)批量插入数据代码片段
/// <summary> /// 批量插入数据 /// </summary> /// <typeparam name="TModel"></typeparam> /// <param name="modelList">数据集合</param> /// <param name="connectionString">数据库连接字符串</param> /// <param name="tableName">表名</param> public static void BulkInert<TModel>(IList<TModel> modelList, string connectionString, string tableName) { try { using (SqlConnection sqlConnect = new SqlConnection(connectionString)) { DataTable dt = ToSqlBulkCopyDataTable(modelList, sqlConnect, tableName); SqlBulkCopy sqlBulk = null; sqlBulk = new SqlBulkCopy(sqlConnect); using (sqlBulk) { sqlBulk.DestinationTableName = tableName; if (sqlConnect.State != ConnectionState.Open) { sqlConnect.Open(); } sqlBulk.WriteToServer(dt); } } } catch (Exception ex) { throw ex; } }
b)生成DataTable对象代码片段
/// <summary> /// 生成DataTable /// </summary> /// <typeparam name="TModel"></typeparam> /// <param name="modelList">数据集合</param> /// <param name="conn">SqlConnection对象</param> /// <param name="tableName">表名</param> /// <returns></returns> private static DataTable ToSqlBulkCopyDataTable<TModel>(IList<TModel> modelList, SqlConnection conn, string tableName) { DataTable dt = new DataTable(); #region 获取所有表字段 string sql = string.Format("select top 0 * from {0}", tableName); if (conn.State != ConnectionState.Open) { conn.Open(); } SqlCommand command = new SqlCommand(); command.CommandText = sql; command.CommandType = CommandType.Text; command.Connection = conn; SqlDataReader reader = command.ExecuteReader(); try { for (int i = 0; i < reader.FieldCount; i++) { var re_name = reader.GetName(i); var re_type = reader.GetFieldType(i); DataColumn column = new DataColumn(re_name, re_type); dt.Columns.Add(column); } } catch (Exception ex) { throw ex; } finally { if (reader != null) { reader.Close(); } } #endregion //获取实体 Type mType = typeof(TModel); var mType_Properts = mType.GetProperties(); foreach (var model in modelList) { DataRow dr = dt.NewRow(); foreach (var proper in mType_Properts) { string fullName = proper.PropertyType.FullName; bool isValueType = proper.PropertyType.IsValueType; bool isClass = proper.PropertyType.IsClass; bool isEnum = proper.PropertyType.IsEnum; if ((isValueType || isEnum) && !isClass) { object value = proper.GetValue(model); if (proper.PropertyType.IsEnum) { if (value != null) { value = (int)value; } } dr[proper.Name] = value ?? DBNull.Value; } else if (fullName == "System.String") { object value = proper.GetValue(model); dr[proper.Name] = value ?? DBNull.Value; } } dt.Rows.Add(dr); } return dt; }
到这里批量写入就结束了O(∩_∩)O
贰.接下来要说的就是批量修改了
为了能更快的更新数据,在批量修改的实现中还需要用到SqlBulkCopy。这里实现批量修改的操作很简单,只要在批量写入的操作之后再执行批量修改的操作就行了(方法比较笨^_^)。
下面开始正式介绍批量修改的实现:
1.首先要准备一个临时表,用于保存要修改的数据(注意:临时表的名字不可重复,不然会报错,这里要做处理,保证表名的唯一性)。
2.使用SqlBulkCopy来向临时表 中写入数据,以确保数据能快速写入到临时表中。
3.同样,使用反射技术来获取到修改的实体属性,然后根据获取到的属性来组建update语句,用update语句将临时表和被修改的目标表关联起来进线批量修改操作。
批量修改的操作就这么简单的几个步骤。需要注意的地方就是在组建update语句的时候对实体属性的处理,一定要将不属于表的导航属性去除掉。不然在执行修改的时候会报出错误“XXX列不存在”的提示。下面之间贴出代码:
a)批量修改代码片段
/// <summary> /// 批量修改数据 /// </summary> /// <param name="modelList"></param> /// <param name="connectionString">数据库连接字符串</param> /// <param name="tableName">表名</param> /// <param name="primaryKey">主键</param> /// <returns></returns> public static int BulkUpdate<TModel>(IList<TModel> modelList, string connectionString, string tableName, string primaryKey) { try { Debug.WriteLine("进入BulkCopy"); //临时表名使用日期加随机数 Random ran = new Random(); int ranNumber = ran.Next(1, 10000); string dateStr = DateTime.Now.ToString("yyyyMMddHHmmss"); string tempName = "#" + tableName + dateStr + ranNumber; var model = typeof(TModel); var propers = model.GetProperties(); StringBuilder updateStrBuild = new StringBuilder(); foreach (var item in propers) { string fullName = item.PropertyType.FullName; bool isValueType = item.PropertyType.IsValueType; bool isClass = item.PropertyType.IsClass; bool isEnum = item.PropertyType.IsEnum; if ((isValueType || isEnum) && !isClass) { updateStrBuild.Append(" t2." + item.Name + " = t1." + item.Name + ","); } else if (fullName == "System.String") { updateStrBuild.Append(" t2." + item.Name + " = t1." + item.Name + ","); } } string updaSql = updateStrBuild.ToString(); updaSql = updaSql.TrimEnd(','); Debug.WriteLine("修改语句" + updaSql); string updateSql = string.Format("update t2 SET {2} FROM {0} AS t1,{1} AS t2 WHERE t1.{3} = t2.{3}", tempName, tableName, updaSql, primaryKey); Debug.WriteLine(updateSql); StringBuilder strB = new StringBuilder(); foreach (var item in propers) { string fullName = item.PropertyType.FullName; bool isValueType = item.PropertyType.IsValueType; bool isClass = item.PropertyType.IsClass; bool isEnum = item.PropertyType.IsEnum; if ((isValueType || isEnum) && !isClass) { strB.Append(" " + item.Name + ", "); } else if (fullName == "System.String") { strB.Append(" " + item.Name + ", "); } } strB.Append(" " + primaryKey + " as Ids "); string strSql = strB.ToString(); using (SqlConnection conn = new SqlConnection(connectionString)) { string sql = "SELECT top 0 " + strSql + " into " + tempName + " from " + tableName; if (conn.State != ConnectionState.Open) { conn.Open(); } SqlCommand command = new SqlCommand(); command.CommandText = sql; command.CommandType = CommandType.Text; command.Connection = conn; command.ExecuteNonQuery(); DataTable dt = ToSqlBulkCopyDataTable2(modelList, conn, tableName, strSql, primaryKey); SqlBulkCopy sqlBulk = null; sqlBulk = new SqlBulkCopy(conn); using (sqlBulk) { sqlBulk.DestinationTableName = tempName; if (conn.State != ConnectionState.Open) { conn.Open(); } sqlBulk.WriteToServer(dt); } command.CommandText = updateSql; int count = command.ExecuteNonQuery(); return count; } } catch (Exception ex) { throw ex; } }
b)批量修改生成DataTable对象的代码片段
/// <summary> /// 批量修改准备DataTable /// </summary> /// <typeparam name="TModel"></typeparam> /// <param name="modelList"></param> /// <param name="conn"></param> /// <param name="tableName"></param> /// <param name="strSql"></param> /// <param name="key"></param> /// <returns></returns> private static DataTable ToSqlBulkCopyDataTable2<TModel>(IList<TModel> modelList, SqlConnection conn, string tableName, string strSql, string key) { Debug.WriteLine("进入ToSqlBulkCopyDataTable2"); DataTable dt = new DataTable(); //获取实体 Type mType = typeof(TModel); var mType_Properts = mType.GetProperties(); #region string sql = string.Format("select top 1 " + strSql + " from {0}", tableName); Debug.WriteLine("查询语句:" + sql); if (conn.State != ConnectionState.Open) { conn.Open(); } SqlCommand command = new SqlCommand(); command.CommandText = sql; command.CommandType = CommandType.Text; command.Connection = conn; var reader = command.ExecuteReader(); try { while (reader.Read()) { for (int i = 0; i < reader.FieldCount; i++) { var re_name = reader.GetName(i); var re_type = reader.GetFieldType(i); DataColumn column = new DataColumn(re_name, re_type); dt.Columns.Add(column); } } } catch (Exception ex) { throw ex; } finally { if (reader != null) { reader.Close(); } } #endregion foreach (var model in modelList) { DataRow dr = dt.NewRow(); foreach (var proper in mType_Properts) { string fullName = proper.PropertyType.FullName; bool isValueType = proper.PropertyType.IsValueType; bool isClass = proper.PropertyType.IsClass; bool isEnum = proper.PropertyType.IsEnum; if ((isValueType || isEnum) && !isClass) { object value = proper.GetValue(model); if (proper.PropertyType.IsEnum) { if (value != null) { value = (int)value; } } dr[proper.Name] = value ?? DBNull.Value; if (key.Equals(proper.Name)) { dr["Ids"] = value; } } else if (fullName == "System.String") { object value = proper.GetValue(model); dr[proper.Name] = value ?? DBNull.Value; if (key.Equals(proper.Name)) { dr["Ids"] = value; } } } dt.Rows.Add(dr); } return dt; }
对数据的批量修改和批量写入完全结束了O(∩_∩)O