前段时间,在工作中遇到这个需求,ADO.NET需要大批量插入几万条甚至几十万的数据。因为业务特殊,多张表的相互关联,通常做法是先往主表里面插入一条数据,然后获取主表的主键ID,再往其他关联的表里面插入ID的关联数据。刚开始做的时候,想到用事务,把几万条SQL拼装起来,在一个事务里面去执行,结果很壮烈,执行性能非常糟糕。几千条业务数据执行了几分钟。用代码分析工具Dottrace一查,发现单单操作数据库的时间占了99.9%。
(Dottrace,代码性能分析工具,它分dottrace Performance和dottrace Memory两个工具,dottrace Performance用来分析代码性能,比如函数执行时间,调用次数,消耗时间比率等,dottrace Memory一般用来分析内存占用情况。大家如果有兴趣的同学可以去下载玩下,对代码优化工作很有帮助的。)
言归正传,去网上搜了很多资料,原来ADO.NET2.0有一个新的特性:SqlBulkCopy,效率还是很高的。然后结合自己的业务需求,修改了下代码。现在跟大家一起来学习下。(因为也是参考前辈写的资料,所以下面的示例都大同小异)
建立测试数据库(BulkTestDB)、主表(BulkTestMain)、从表(BulkTestDetail)
--Create DataBase create database BulkTestDB; go use BulkTestDB; go --Create Table Create table BulkTestMain( Id int primary key, GuidId long,--辅助的唯一标识 Batch long,--导入的批次标识 Name nvarchar(32) go Create table BulkTestDetail( Id int primary key, PId int, Lesson nvarchar(32) go
数据库建立完毕,开始编写后台代码

public void TestMain() { using (SqlConnection connection = new SqlConnection("你的链接字符串")) { connection.Open(); SqlTransaction transaction = connection.BeginTransaction("Transaction1"); DataTable dtTestMain= GetTableSchema("BulkTestMain");//构建BulkTestMain表结构 DataTable dtTestDetail = GetTableSchema("BulkTestDetail");//构建BulkTestDetail表结构 Guid Batch = Guid.NewGuid();//插入的批次,为后面查询dtTestMainTmp 做条件 for (int i = 0; i < 1000000; i++)//测试100w条数据 { DataRow dr= dtTestMain.NewRow(); Guid newGuid = Guid.NewGuid(); dr["_GuidId"] = newGuid; dr["_Batch"] = Batch; dr["_UserName"] = "测试" + i.ToString(); dtTestMain.Rows.Add(dr); for(int j = 0;j<10;j++)//给从表每次插入10条数据 { DataRow dr1 = dtTestDetail.NewRow(); dr1["_GuidId"]= newGuid; dr1["_Lesson"]="课程"+j.ToString(); dtTestDetail.Rows.Add(dr1); } //这样做的目的,让主表与从表可以临时通过GuidId关联起来 } BulkToDB(dtTestMain, "BulkTestMain", connection, transaction);//先让BulkTestMain插入了大量的数据,注意这些数据是临时的,在SqlTransaction提交之前,查询时要用with(nolock) DataSet dtTestMainTmp = GetNewImportData(Batch.ToString());//好吧,我们来查询下,刚才大量插入的10w条数据,这里只需要查询标识的2列字段 Dictionary<string, long> dicGuidToID = new Dictionary<string, long>(); foreach (DataRow dr in dtTestMainTmp.Tables[0].Rows) { dicGuidToID.Add(dr[1].ToString(), Convert.ToInt64(dr[0])); }//dicGuidToID:guid字段与插入的主表ID字段关联起来成字典,用字典是为了访问起来效率(为什么获取字典key的值效率很高,有兴趣的可以去研究“散列表”的概念) foreach (DataRow dr in dtTestDetail.Rows)//现在给dtTestDetail的PId字段赋值(PId字段与主表Id外键关联) { dr["_PId"] = dicGuidToID[dr["_GuidId"].ToString()].ToString(); } dtTestDetail.Columns.Remove("_GuidId");//移除dtTestDetail的GuidId字段,使它与数据库列匹配 BulkToDB(dtTestDetail,"BulkTestDetail",connection, transaction);//给从表插入数据 transaction.Commit(); connection.Close(); } } /// <summary> /// 根据批次Batch获取导进来的临时数据 /// </summary> /// <returns></returns> public static DataSet GetNewImportData(string batch) { StringBuilder strSql = new StringBuilder(); strSql.Append("SELECT [Id],[GuidId]") .Append(" FROM ContactInfo WITH (NOLOCK) WHERE Batch=@batch"); SqlParameter[] parameters = { new SqlParameter("@Batch", SqlDbType.BigInt){Value = batch} }; DataSet ds = SqlHelper.ExecuteDataset(strSql.ToString(), parameters); return ds; } public static void BulkToDB(DataTable dtSource, string TableName,SqlConnection connection, SqlTransaction transaction) { using (SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.KeepIdentity, transaction)) { sqlBulkCopy.DestinationTableName = TableName;//要插入数据的表的名称 sqlBulkCopy.BatchSize = dtSource.Rows.Count;//数据的行数 List<SqlBulkCopyColumnMapping> mpList = getMapping(TableName);//获取表映射关系 foreach (SqlBulkCopyColumnMapping mp in mpList) { sqlBulkCopy.ColumnMappings.Add(mp); } if (dtSource != null && dtSource.Rows.Count != 0) { sqlBulkCopy.WriteToServer(dtSource);//插入数据 } } } public static List<SqlBulkCopyColumnMapping> getMapping(string TableName) { List<SqlBulkCopyColumnMapping> mpList = new List<SqlBulkCopyColumnMapping>(); switch(TableName) { case "BulkTestMain":{ mpList.Add(new SqlBulkCopyColumnMapping("_Id", "Id")); mpList.Add(new SqlBulkCopyColumnMapping("_GuidId", "GuidId")); mpList.Add(new SqlBulkCopyColumnMapping("_Batch","Batch")); mpList.Add(new SqlBulkCopyColumnMapping("_UserName", "UserName")); }break; case "BulkTestDetail":{ mpList.Add(new SqlBulkCopyColumnMapping("_Id", "Id")); mpList.Add(new SqlBulkCopyColumnMapping("_PId", "PId")); mpList.Add(new SqlBulkCopyColumnMapping("_Lesson", "Lesson")); }break; } return mpList; } private static DataTable GetTableSchema(string TableName) { DataTable dataTable = new DataTable(); switch(TableName) { case "BulkTestMain" :{ dataTable.Columns.AddRange(new DataColumn[] { new DataColumn("_Id",typeof(Int32)), new DataColumn("_GuidId",typeof(Int64)), new DataColumn("_Batch",typeof(Int64)), new DataColumn("_UserName",typeof(String)) });}break; case "BulkTestDetail":{ dataTable.Columns.AddRange(new DataColumn[] { new DataColumn("_Id",typeof(Int32)), new DataColumn("_PId",typeof(Int32)), new DataColumn("_GuidId",typeof(Int64)), new DataColumn("_Lesson",typeof(String))}); }break; } return dataTable; }
总算把代码copy完了,任务完成。
多表关联批量插入数据库的方法,欢迎大家批评指正。