zoukankan      html  css  js  c++  java
  • C# & SQL Server大数据量插入方式对比

    以下内容大部分来自:

    http://blog.csdn.net/tjvictor/article/details/4360030

    部分内容出自互联网,实验结果为亲测。

          最近自己开发一个向数据库中插入大量历史数据的函数库,需要解决一个大数据量插入的效率问题。不用分析,我知道如果采取逐条数据插入的方式,那么效率肯定很低,光是那么多循环就知道很慢了。于是乎,我找到了上篇博客,知道了BulkCopy和TVPs方式。为了更好的了解其效率,我自己动手亲测了一下效果,测试的数据库位于本机。

    (1)方式1:循环插入

            public static void NormalInerst(String connString)
            {
                Console.WriteLine("使用NNormalInerst方式:");
                Stopwatch sw = new Stopwatch();
                SqlConnection sqlConn = new SqlConnection(connString);
                SqlCommand sqlCmd = new SqlCommand();
                sqlCmd.CommandText = String.Format("insert into BulkTestTable(Id,UserName,Pwd)values(@p0,@p1,@p2)");
                sqlCmd.Parameters.Add("@p0", SqlDbType.Int);
                sqlCmd.Parameters.Add("@p1", SqlDbType.NVarChar);
                sqlCmd.Parameters.Add("@p2", SqlDbType.VarChar);
                sqlCmd.CommandType = CommandType.Text;
                sqlCmd.Connection = sqlConn;
                sqlConn.Open();
                try
                {
                    for (int i = 0, j = 0; i < 10; ++i )
                    {
                        for (j = i * 10000; j < (i + 1) * 10000; ++j )
                        {
                            sqlCmd.Parameters["@p0"].Value = j;
                            sqlCmd.Parameters["@p1"].Value = String.Format("User-{0}", i * j);
                            sqlCmd.Parameters["@p2"].Value = String.Format("Pwd-{0}", i * j);
                            sw.Start();
                            sqlCmd.ExecuteNonQuery();
                            sw.Stop();
                        }
                        
                        Console.WriteLine("第{0}次插入{1}条数据耗时:{2}", (i + 1), dataScale, sw.ElapsedMilliseconds);
                        sw.Reset();
                    }
                }
                catch (System.Exception ex)
                {
                    throw ex;
                }
                finally
                {
                    sqlConn.Close();
                }
            }
    View Code

    该方式的效率极低,运行时间很长,我这里就不给出结果了,有兴趣可以自己粘贴试一下。PS:其中的数据规模应该是dataScale而不是10000,不过总是还是慢。

    (2)方式2:使用BulkCopy

            public static void BulkInerst(String connString)
            {
                Console.WriteLine("使用BulkInerst方式:");
                Stopwatch sw = new Stopwatch();
                
                String strDel = "delete from BulkTestTable";
                float millTime = 0;
                for (int multiply = 0; multiply < 10; multiply++)
                {
                    DataTable dt = GetTableSchema();
                    for (int count = multiply * dataScale; count < (multiply + 1) * dataScale; count++)
                    {
                        DataRow r = dt.NewRow();
                        r[0] = count;
                        r[1] = string.Format("User-{0}", count * multiply);
                        r[2] = string.Format("Pwd-{0}", count * multiply);
                        dt.Rows.Add(r);
                    }
    
                    SqlConnection sqlConn = new SqlConnection(connString);
                    SqlBulkCopy bulkCopy = new SqlBulkCopy(sqlConn);
                    bulkCopy.DestinationTableName = "BulkTestTable";
                    bulkCopy.BatchSize = dt.Rows.Count;
    
                    sw.Reset();
                    sw.Start();
                    try
                    {
                        sqlConn.Open();
                        if (dt != null && dt.Rows.Count != 0)
                            bulkCopy.WriteToServer(dt);
                    }
                    catch (Exception ex)
                    {
                        throw ex;
                    }
                    finally
                    {
                        sqlConn.Close();
                        if (bulkCopy != null)
                            bulkCopy.Close();
                    } 
                    sw.Stop();
    
                    Console.WriteLine("第{0}次插入{1}条数据耗时:{2}", (multiply + 1), dataScale, sw.ElapsedMilliseconds);
                    millTime += sw.ElapsedMilliseconds;
                }
                Console.WriteLine("总耗时:{0}毫秒,平均耗时:{1}毫秒", millTime, millTime / 10);
                SqlConnection sqlConn2 = new SqlConnection(connString);
                SqlCommand sqlCmd = new SqlCommand(strDel, sqlConn2);
                try
                {
                    sqlConn2.Open();
                    sqlCmd.ExecuteNonQuery();
                }
                catch (Exception ex)
                {
                    throw ex;
                }
                finally
                {
                    sqlConn2.Close();
                }
                Console.WriteLine("Done!"); 
            }
    View Code

    (3)方式3:使用TVPs

            public static void TVPsInerst(String connString)
            {
                Console.WriteLine("使用TVPsInerst方式:");
                Stopwatch sw = new Stopwatch();
                SqlConnection sqlConn = new SqlConnection(connString);
                String strSQL = "insert into BulkTestTable (Id,UserName,Pwd)" +
                    " SELECT nc.Id, nc.UserName,nc.Pwd" +
                    " FROM @NewBulkTestTvp AS nc";
                String strDel = "delete from BulkTestTable";
                float millTime = 0;
    
                for (int multiply = 0; multiply < 10; multiply++)
                {
                    DataTable dt = GetTableSchema();
                    for (int count = multiply * dataScale; count < (multiply + 1) * dataScale; count++)
                    {
                        DataRow r = dt.NewRow();
                        r[0] = count;
                        r[1] = string.Format("User-{0}", count * multiply);
                        r[2] = string.Format("Pwd-{0}", count * multiply);
                        dt.Rows.Add(r);
                    }
    
                    sw.Reset();
                    sw.Start();
                    SqlCommand cmd = new SqlCommand(strSQL, sqlConn);
                    SqlParameter catParam = cmd.Parameters.AddWithValue("@NewBulkTestTvp", dt);
                    catParam.SqlDbType = SqlDbType.Structured;
                    catParam.TypeName = "dbo.BulkUDT";
                    try
                    {
                        sqlConn.Open();
                        if (dt != null && dt.Rows.Count != 0)
                        {
                            cmd.ExecuteNonQuery();
                        }
                    }
                    catch (Exception ex)
                    {
                        throw ex;
                    }
                    finally
                    {
                        sqlConn.Close();
                    }
                    sw.Stop();
    
                    Console.WriteLine("第{0}次插入{1}条数据耗时:{2}", (multiply + 1), dataScale, sw.ElapsedMilliseconds);
                    millTime += sw.ElapsedMilliseconds;
                }
                Console.WriteLine("总耗时:{0}毫秒,平均耗时:{1}毫秒", millTime, millTime / 10);
                SqlCommand sqlCmd = new SqlCommand(strDel, sqlConn);
                try
                {
                    sqlConn.Open();
                    sqlCmd.ExecuteNonQuery();
                }
                catch (Exception ex)
                {
                    throw ex;
                }
                finally
                {
                    sqlConn.Close();
                }
                Console.WriteLine("Done!");
            }
    View Code

    这里TVPs方式需要利用Visual Studio 2008采用的自定义数据表类型,这是一个比较新的东西。这里补充几个类型和函数,主要是为了检测数据库中是否存在数据表和数据表类型,如果不存在则进行创建。补充代码如下:

            public enum CheckType
            {
                isTable = 0,
                isType
            }
    
            protected static int dataScale = 100000;
    
            public static bool CheckExistsObject(String connString, String objectName, CheckType type)
            {
                String strSQL = "select COUNT(1) from sys.sysobjects where name='" + objectName + "'";
                switch (type)
                {
                    case CheckType.isTable:
                        strSQL = "select COUNT(1) from sys.sysobjects where name='" + objectName + "'";
                        break;
                    case CheckType.isType:
                        strSQL = "select COUNT(1) from sys.types where name='" + objectName + "'";
                        break;
                    default:
                        break;
                }
                using (SqlConnection conn = new SqlConnection(connString)) 
                {
                    conn.Open();
                    SqlCommand cmd = new SqlCommand(strSQL, conn);
                    int result = Convert.ToInt32(cmd.ExecuteScalar());
                    if (0 == result)
                    {
                        return false;
                    }
                }
    
                return true;
            }
    
            public static bool CreateObject(String connString, String objectName, CheckType type)
            {
                String strSQL = "";
                switch (type)
                {
                    case CheckType.isTable:
                        strSQL = "Create table " + objectName + " (Id int primary key, UserName nvarchar(32), Pwd varchar(16))";
                        break;
                    case CheckType.isType:
                        strSQL = "CREATE TYPE " + objectName + " AS TABLE (Id int, UserName nvarchar(32), Pwd varchar(16))";
                        break;
                    default:
                        break;
                }
                using (SqlConnection conn = new SqlConnection(connString))
                {
                    conn.Open();
                    SqlCommand cmd = new SqlCommand(strSQL, conn);
                    cmd.ExecuteNonQuery();
                }
    
                return true;
            }
            public static DataTable GetTableSchema()
            {
                DataTable dt = new DataTable();
                dt.Columns.AddRange(new DataColumn[]{  
                        new DataColumn("Id",typeof(int)),  
                        new DataColumn("UserName",typeof(string)),  
                        new DataColumn("Pwd",typeof(string))});
    
                return dt;
            }
    View Code

    调用的方式就很好说了,参见如下测试代码:

            public static void Main(string[] args)
            {
                String conString = "Persist Security Info=False;User ID=sa;Password=scbj123!@#;Initial Catalog=testGR;Server=KLH-PC";
                String strType = "BulkUDT";
                String strTable = "BulkTestTable";
                if (!CheckExistsObject(conString, strType, CheckType.isType))
                {
                    Console.WriteLine("类型{0}不存在", strType);
                    if (CreateObject(conString, strType, CheckType.isType))
                    {
                        Console.WriteLine("类型{0}创建成功!", strType);
                    }
                }
    
                if (!CheckExistsObject(conString, strTable, CheckType.isTable))
                {
                    Console.WriteLine("表格{0}不存在", strTable);
                    if (CreateObject(conString, strTable, CheckType.isTable))
                    {
                        Console.WriteLine("表格{0}创建成功!", strTable);
                    }
                }
                Console.WriteLine("==================================================");
    
                //NormalInerst(conString);
                BulkInerst(conString);
                TVPsInerst(conString);
                
                Console.ReadKey();
            }
    View Code

    -------------------------------------------------------------------------------------------------

    直接看效果对比:

    <1>第一次运行

    <2>第二次和第三次运行

    这里考虑到了SQL Server自身缓存的原因,所以进行了多次测试,不过数据量没有变。可以从上述结果中看出:TVPs方式不愧是新出的啊,一代更比一代强!

  • 相关阅读:
    phalcon: 当删除循环删除一组数据,需要判断影响的行affectedRows
    java:经典消费生成者
    java:同步和死锁
    java: Thread 和 runnable线程类
    git: windows git ssh keys生成
    ​'JAVAC' 不是内部或外部命令解决方法,记得要重启cmd
    java jar包解析:打包文件,引入文件
    php 对多维数组排序array_multisort
    appium入门基础
    中文Appium API 文档
  • 原文地址:https://www.cnblogs.com/kuliuheng/p/3791041.html
Copyright © 2011-2022 走看看