zoukankan      html  css  js  c++  java
  • SQL Server 批量插入数据的两种方法

     

          在SQL Server 中插入一条数据使用Insert语句,但是如果想要批量插入一堆数据的话,循环使用Insert不仅效率低,而且会导致SQL一系统性能问题。下面介绍SQL Server支持的两种批量数据插入方法:Bulk和表值参数(Table-Valued Parameters)。

    运行下面的脚本,建立测试数据库和表值参数。

    [c-sharp] view plaincopy
     
     
     
    1. --Create DataBase  
    2. create database BulkTestDB;  
    3. go  
    4. use BulkTestDB;  
    5. go  
    6. --Create Table  
    7. Create table BulkTestTable(  
    8. Id int primary key,  
    9. UserName nvarchar(32),  
    10. Pwd varchar(16))  
    11. go  
    12. --Create Table Valued  
    13. CREATE TYPE BulkUdt AS TABLE  
    14.   (Id int,  
    15.    UserName nvarchar(32),  
    16.    Pwd varchar(16))  

    下面我们使用最简单的Insert语句来插入100万条数据,代码如下:

    [c-sharp] view plaincopy
     
     
     
    1. Stopwatch sw = new Stopwatch();  
    2.   
    3. SqlConnection sqlConn = new SqlConnection(  
    4.     ConfigurationManager.ConnectionStrings["ConnStr"].ConnectionString);//连接数据库  
    5.   
    6. SqlCommand sqlComm = new SqlCommand();  
    7. sqlComm.CommandText = string.Format("insert into BulkTestTable(Id,UserName,Pwd)values(@p0,@p1,@p2)");//参数化SQL  
    8. sqlComm.Parameters.Add("@p0", SqlDbType.Int);  
    9. sqlComm.Parameters.Add("@p1", SqlDbType.NVarChar);  
    10. sqlComm.Parameters.Add("@p2", SqlDbType.VarChar);  
    11. sqlComm.CommandType = CommandType.Text;  
    12. sqlComm.Connection = sqlConn;  
    13. sqlConn.Open();  
    14. try  
    15. {  
    16.     //循环插入100万条数据,每次插入10万条,插入10次。  
    17.     for (int multiply = 0; multiply < 10; multiply++)  
    18.     {  
    19.         for (int count = multiply * 100000; count < (multiply + 1) * 100000; count++)  
    20.         {  
    21.   
    22.             sqlComm.Parameters["@p0"].Value = count;  
    23.             sqlComm.Parameters["@p1"].Value = string.Format("User-{0}", count * multiply);  
    24.             sqlComm.Parameters["@p2"].Value = string.Format("Pwd-{0}", count * multiply);  
    25.             sw.Start();  
    26.             sqlComm.ExecuteNonQuery();  
    27.             sw.Stop();  
    28.         }  
    29.         //每插入10万条数据后,显示此次插入所用时间  
    30.         Console.WriteLine(string.Format("Elapsed Time is {0} Milliseconds", sw.ElapsedMilliseconds));  
    31.     }  
    32. }  
    33. catch (Exception ex)  
    34. {  
    35.     throw ex;  
    36. }  
    37. finally  
    38. {  
    39.     sqlConn.Close();  
    40. }  
    41.   
    42. Console.ReadLine();  

    耗时图如下:

    使用Insert语句插入10万数据的耗时图

    由于运行过慢,才插入10万条就耗时72390 milliseconds,所以我就手动强行停止了。

    下面看一下使用Bulk插入的情况:

    bulk方法主要思想是通过在客户端把数据都缓存在Table中,然后利用SqlBulkCopy一次性把Table中的数据插入到数据库

    代码如下:

    [c-sharp] view plaincopy
     
     
     
    1. public static void BulkToDB(DataTable dt)  
    2. {  
    3.     SqlConnection sqlConn = new SqlConnection(  
    4.         ConfigurationManager.ConnectionStrings["ConnStr"].ConnectionString);  
    5.     SqlBulkCopy bulkCopy = new SqlBulkCopy(sqlConn);  
    6.     bulkCopy.DestinationTableName = "BulkTestTable";  
    7.     bulkCopy.BatchSize = dt.Rows.Count;  
    8.   
    9.     try  
    10.     {  
    11.         sqlConn.Open();  
    12.     if (dt != null && dt.Rows.Count != 0)  
    13.         bulkCopy.WriteToServer(dt);  
    14.     }  
    15.     catch (Exception ex)  
    16.     {  
    17.         throw ex;  
    18.     }  
    19.     finally  
    20.     {  
    21.         sqlConn.Close();  
    22.         if (bulkCopy != null)  
    23.             bulkCopy.Close();  
    24.     }  
    25. }  
    26.   
    27. public static DataTable GetTableSchema()  
    28. {  
    29.     DataTable dt = new DataTable();  
    30.     dt.Columns.AddRange(new DataColumn[]{  
    31.         new DataColumn("Id",typeof(int)),  
    32.         new DataColumn("UserName",typeof(string)),  
    33.     new DataColumn("Pwd",typeof(string))});  
    34.   
    35.     return dt;  
    36. }  
    37.   
    38. static void Main(string[] args)  
    39. {  
    40.     Stopwatch sw = new Stopwatch();  
    41.     for (int multiply = 0; multiply < 10; multiply++)  
    42.     {  
    43.         DataTable dt = Bulk.GetTableSchema();  
    44.         for (int count = multiply * 100000; count < (multiply + 1) * 100000; count++)  
    45.         {  
    46.             DataRow r = dt.NewRow();  
    47.             r[0] = count;  
    48.             r[1] = string.Format("User-{0}", count * multiply);  
    49.             r[2] = string.Format("Pwd-{0}", count * multiply);  
    50.             dt.Rows.Add(r);  
    51.         }  
    52.         sw.Start();  
    53.         Bulk.BulkToDB(dt);  
    54.         sw.Stop();  
    55.         Console.WriteLine(string.Format("Elapsed Time is {0} Milliseconds", sw.ElapsedMilliseconds));  
    56.     }  
    57.   
    58.     Console.ReadLine();  
    59. }  

    耗时图如下:

    使用Bulk插入100万数据的耗时图

    可见,使用Bulk后,效率和性能明显上升。使用Insert插入10万数据耗时72390,而现在使用Bulk插入100万数据才耗时17583。

    最后再看看使用表值参数的效率,会另你大为惊讶的。

    表值参数是SQL Server 2008新特性,简称TVPs。对于表值参数不熟悉的朋友,可以参考最新的book online,我也会另外写一篇关于表值参数的博客,不过此次不对表值参数的概念做过多的介绍。言归正传,看代码:

    [c-sharp] view plaincopy
     
     
     
    1. public static void TableValuedToDB(DataTable dt)  
    2. {  
    3.     SqlConnection sqlConn = new SqlConnection(  
    4.       ConfigurationManager.ConnectionStrings["ConnStr"].ConnectionString);  
    5.     const string TSqlStatement =  
    6.      "insert into BulkTestTable (Id,UserName,Pwd)" +  
    7.      " SELECT nc.Id, nc.UserName,nc.Pwd" +  
    8.      " FROM @NewBulkTestTvp AS nc";  
    9.     SqlCommand cmd = new SqlCommand(TSqlStatement, sqlConn);  
    10.     SqlParameter catParam = cmd.Parameters.AddWithValue("@NewBulkTestTvp", dt);  
    11.     catParam.SqlDbType = SqlDbType.Structured;  
    12.     //表值参数的名字叫BulkUdt,在上面的建立测试环境的SQL中有。  
    13.     catParam.TypeName = "dbo.BulkUdt";  
    14.     try  
    15.     {  
    16.       sqlConn.Open();  
    17.       if (dt != null && dt.Rows.Count != 0)  
    18.       {  
    19.           cmd.ExecuteNonQuery();  
    20.       }  
    21.     }  
    22.     catch (Exception ex)  
    23.     {  
    24.       throw ex;  
    25.     }  
    26.     finally  
    27.     {  
    28.       sqlConn.Close();  
    29.     }  
    30. }  
    31.   
    32. public static DataTable GetTableSchema()  
    33. {  
    34.     DataTable dt = new DataTable();  
    35.     dt.Columns.AddRange(new DataColumn[]{  
    36.       new DataColumn("Id",typeof(int)),  
    37.       new DataColumn("UserName",typeof(string)),  
    38.       new DataColumn("Pwd",typeof(string))});  
    39.   
    40.     return dt;  
    41. }  
    42.   
    43. static void Main(string[] args)  
    44. {  
    45.     Stopwatch sw = new Stopwatch();  
    46.     for (int multiply = 0; multiply < 10; multiply++)  
    47.     {  
    48.         DataTable dt = TableValued.GetTableSchema();  
    49.         for (int count = multiply * 100000; count < (multiply + 1) * 100000; count++)  
    50.         {          
    51.             DataRow r = dt.NewRow();  
    52.             r[0] = count;  
    53.             r[1] = string.Format("User-{0}", count * multiply);  
    54.             r[2] = string.Format("Pwd-{0}", count * multiply);  
    55.             dt.Rows.Add(r);  
    56.         }  
    57.         sw.Start();  
    58.         TableValued.TableValuedToDB(dt);  
    59.         sw.Stop();  
    60.         Console.WriteLine(string.Format("Elapsed Time is {0} Milliseconds", sw.ElapsedMilliseconds));  
    61.     }  
    62.   
    63.     Console.ReadLine();  
    64. }  

    耗时图如下:

    使用表值参数插入100万数据的耗时图

    比Bulk还快5秒。

    SQLBulkCopy,用于数据库之间大批量的数据传递。通常用于新,旧数据库之间数据的更新。即使表结构完全不同,也可以通过字段间的对应关系,顺利的将数据导过来。

    首先,SQLBulkCopy需要2个连接。分别连接到不同的旧表所在的数据库,新表所在的数据库。

    其次,我们要从旧数据库中,把导出的字段读取出来。用什么读呢?可以用Datatable,也可以用SqlDataReader。因为SqlDataReader不占用内存,对大批量的数据复制,不需要事先导入到系统。所以就用SqlDataReader了。

    读出后,设定对应关系,设定目标表名,写入。就这么简单。速度非常快!

    初始化Connection对象

    SqlConnection ConnectionNew=new SqlConnection("连接信息");

    SqlConnection ConnectionOld=new SqlConnection("连接信息");

                try
                {

    //1.在旧表中,用SqlDataReader读取出信息
                    SqlCommand cmd = new SqlCommand(SQL, ConnectionOld);
                    sdr = cmd.ExecuteReader();

                   //2.初始化SqlBulkCopy对象,用新的连接作为参数。

                    SqlBulkCopy bulkCopy = new SqlBulkCopy(ConnectionNew);

                  //3.写对应关系。如旧表的People列的数据,对应新表Human列,那么就写bulkCopy.ColumnMappings.Add("People","Human")

                  //如果两张表的结构一样,那么对应关系就不用写了。

                 //我是用哈希表存储对应关系的,哈希表作为参数到传入方法中,key的值用来存储旧表的字段名,VALUE的值用来存储新表的值
                    foreach (string str in HTDuiYing.Keys)
                    {
                        bulkCopy.ColumnMappings.Add(str, HTDuiYing[str].ToString());
                    }

                   //4.设置目标表名
                    bulkCopy.DestinationTableName = TableNmae;

                    //额外,可不写:设置一次性处理的行数。这个行数处理完后,会激发SqlRowsCopied()方法。默认为1
                    bulkCopy.NotifyAfter = 10;

                   //额外,可不写:设置激发的SqlRowsCopied()方法,这里为bulkCopy_SqlRowsCopied
                    bulkCopy.SqlRowsCopied += new SqlRowsCopiedEventHandler(bulkCopy_SqlRowsCopied);

                   //OK,开始传数据!
                    bulkCopy.WriteToServer(sdr);
                }

              //激发的方法写在外头

             private void bulkCopy_SqlRowsCopied(object sender, SqlRowsCopiedEventArgs e)
            {
                执行的内容。
                这里有2个元素值得拿来用
                e.RowsCopied,返回数值类型,表示当前已经复制的行数
                e.Abort,用于赋值true or false,用于停止赋值的操作 

            }

     由于不同批次在不同事务中执行,因此,如果在批量复制操作期间发生错误,则当前批次中的所有行都将被回滚,但以前批次中的行将保留在数据库中。

    比如:批量复制100条数据到数据库汇总,batchsize设置为10.则没10条数据复制作为一个事务,整个100条数据的复制操作被分割为10个独立的事务。如果复制到第56条数据时(第6个事务),出错了。则前50条数据提交到数据库中,只回滚出错的事务。

      如果由于发生错误而需要回滚整个批量复制操作,或者批量复制应作为更大的可回滚进程的一部分执行,则可以将 SQLTransaction 对象提供给 SqlBulkCopy 构造函数.

      示例:

     using (SqlConnection destinationConnection = new SqlConnection(connectionString))

                {
                    destinationConnection.Open();
                    using (SqlTransaction transaction = destinationConnection.BeginTransaction())
                    {
                        using (SqlBulkCopy bulkCopy = new SqlBulkCopy( destinationConnection, SqlBulkCopyOptions.Default, transaction))
                        {
                            bulkCopy.BatchSize = 10;
                            bulkCopy.DestinationTableName = "dbo.BulkCopyDemoMatchingColumns";

                            try
                            {
                                bulkCopy.WriteToServer(reader);
                                transaction.Commit();
                            }
                            catch (Exception ex)
                            {
                                Console.WriteLine(ex.Message);
                                transaction.Rollback();
                            }
                            finally
                            {
                                reader.Close();
                            }
                        }
                    }
                }

  • 相关阅读:
    Atitit.eclise的ide特性-------abt 编译
    Atitit.eclise的ide特性-------abt 编译
    Atitit python3.0 3.3 3.5 3.6 新特性 Python2.7新特性1Python 3_x 新特性1python3.4新特性1python3.5新特性1值得关注的新特性1Python3.6新特性2  Python2.7新特性Python 2.7的新特性
    Atitit python3.0 3.3 3.5 3.6 新特性 Python2.7新特性1Python 3_x 新特性1python3.4新特性1python3.5新特性1值得关注的新特性1Pyth
    Atitit intellij idea的使用总结attilax
    Atitit.eclipse 4.3 4.4  4.5 4.6新特性
    Atitit.eclipse 4.3 4.4  4.5 4.6新特性
    Atitit.eclipse 4.3 4.4  4.5 4.6新特性
    Atitit 软件国际化原理与概论
    Atitit 软件国际化原理与概论
  • 原文地址:https://www.cnblogs.com/zyy1688/p/10002661.html
Copyright © 2011-2022 走看看