zoukankan      html  css  js  c++  java
  • [Oracle] Bulk Insert Data

    命名空间:Oracle.DataAccess.Client

    组件:Oracle.DataAccess.dll(2.112.1.0)

    ODP.NET 版本:ODP.NET for .NET Framework 2.0 或 ODP.NET for .NET Framework 4

    工具:Microsoft Visual Studio Ultimate 2013 + Oracle SQL Developer 1.5.5 + Oracle Database 11g Enterprise Edition 11.2.0.1.0(32位) + TNS for 32-bit Windows 11.2.0.1.0

    方式一:ArrayBind

    当插入一条数据时,SQL 语句如下:

    INSERT INTO table_name VALUES (:col1, :col2, :col3, :col4, :col5)
      1 public void InsertDataRow(Dictionary<string, object> dataRow)
      2 {
      3 	StringBuilder sbCmdText = new StringBuilder();
      4 	sbCmdText.AppendFormat("INSERT INTO {0}(", m_TableName);
      5 	sbCmdText.Append(string.Join(",", dataRow.Keys.ToArray()));
      6 	sbCmdText.Append(") VALUES (");
      7 	sbCmdText.Append(":" + string.Join(",:", dataRow.Keys.ToArray()));
      8 	sbCmdText.Append(")");
      9 
     10 	using (OracleConnection conn = new OracleConnection())
     11 	{
     12 		using (OracleCommand cmd = conn.CreateCommand())
     13 		{
     14 			cmd.CommandType = CommandType.Text;
     15 			cmd.CommandText = sbCmdText.ToString();
     16 			OracleParameter parameter = null;
     17 			OracleDbType dbType = OracleDbType.Object;
     18 			foreach (string colName in dataRow.Keys)
     19 			{
     20 				dbType = GetOracleDbType(dataRow[colName]);
     21 				parameter = new OracleParameter(colName, dbType);
     22 				parameter.Direction = ParameterDirection.Input;
     23 				parameter.OracleDbTypeEx = dbType;
     24 				parameter.Value = dataRow[colName];
     25 				cmd.Parameters.Add(parameter);
     26 			}
     27 			conn.Open();
     28 			int result = cmd.ExecuteNonQuery();
     29 		}
     30 	}
     31 }

    此时,每一个 OracleParameter 的 Value 值都赋予单个字段的 一个具体值,这种也是最为传统的插入数据的方法。

    Oracle V6 中 OCI 编程接口加入了数组接口特性。

    当采用 ArrayBind 时,OraleParameter 的 Value 值则是赋予单个字段的 一个数组,即多条数据的该字段组合成的一个数组。此时 Oracle 仅需要执行一次 SQL 语句,即可在内存中批量解析并导入数据,减少程序与数据库之间来回的操作,其优点就是数据导入的总体时间明显减少,尤其是进程占用 CPU 的时间。

    如果数据源是 DataTable 类型,首先把 DataTable 数据源,转换成 object[][] 类型,然后绑定 OracleParameter 的 Value 值为对应字段的一个 Object[] 数组即可;参考代码如下:

      1 /// <summary>
      2 /// 批量插入大数据量
      3 /// </summary>
      4 /// <param name="columnData">列名-列数据字典</param>
      5 /// <param name="dataCount">数据量</param>
      6 /// <returns>插入数据量</returns>
      7 public int InsertBigData(Dictionary<string, object> columnData, int dataCount)
      8 {
      9 	int result = 0;
     10 	if (columnData == null || columnData.Count < 1)
     11 	{
     12 		return result;
     13 	}
     14 	string[] colHeaders = columnData.Keys.ToArray();
     15 	StringBuilder sbCmdText = new StringBuilder();
     16 	if (columnData.Count > 0)
     17 	{
     18 		// 拼接INSERT的SQL语句
     19 		sbCmdText.AppendFormat("INSERT INTO {0}(", m_TableName);
     20 		sbCmdText.Append(string.Join(",", colHeaders));
     21 		sbCmdText.Append(") VALUES (");
     22 		sbCmdText.Append(m_ParameterPrefix + string.Join("," + m_ParameterPrefix, colHeaders));
     23 		sbCmdText.Append(")");
     24 		OracleConnection connection = null;
     25 		try
     26 		{
     27 			connection = new OracleConnection(GetConnectionString());
     28 			using (OracleCommand command = connection.CreateCommand())
     29 			{
     30 				command.ArrayBindCount = dataCount;
     31 				command.BindByName = true;
     32 				command.CommandType = CommandType.Text;
     33 				command.CommandText = sbCmdText.ToString();
     34 				command.CommandTimeout = 1800;
     35 				OracleParameter parameter;
     36 				OracleDbType dbType = OracleDbType.Object;
     37 				foreach (string colName in colHeaders)
     38 				{
     39 					dbType = GetOracleDbType(columnData[colName]);
     40 					parameter = new OracleParameter(colName, dbType);
     41 					parameter.Direction = ParameterDirection.Input;
     42 					parameter.OracleDbTypeEx = dbType;
     43 					parameter.Value = columnData[colName];
     44 					command.Parameters.Add(parameter);
     45 				}
     46 				connection.Open();
     47 				OracleTransaction trans = connection.BeginTransaction();
     48 				try
     49 				{
     50 					command.Transaction = trans;
     51 					result = command.ExecuteNonQuery();
     52 					trans.Commit();
     53 				}
     54 				catch (Exception ex)
     55 				{
     56 					trans.Rollback();
     57 					throw ex;
     58 				}
     59 			}
     60 		}
     61 		finally
     62 		{
     63 			if (connection != null)
     64 			{
     65 				connection.Close();
     66 				connection.Dispose();
     67 			}
     68 			GC.Collect();
     69 			GC.WaitForFullGCComplete();
     70 		}
     71 	}
     72 	return result;
     73 }
      1 /// <summary>
      2 /// 根据数据类型获取OracleDbType
      3 /// </summary>
      4 /// <param name="value">数据</param>
      5 /// <returns>数据的Oracle类型</returns>
      6 private static OracleDbType GetOracleDbType(object value)
      7 {
      8 	OracleDbType dataType = OracleDbType.Object;
      9 	if (value is string[])
     10 	{
     11 		dataType = OracleDbType.Varchar2;
     12 	}
     13 	else if (value is DateTime[])
     14 	{
     15 		dataType = OracleDbType.TimeStamp;
     16 	}
     17 	else if (value is int[] || value is short[])
     18 	{
     19 		dataType = OracleDbType.Int32;
     20 	}
     21 	else if (value is long[])
     22 	{
     23 		dataType = OracleDbType.Int64;
     24 	}
     25 	else if (value is decimal[] || value is double[] || value is float[])
     26 	{
     27 		dataType = OracleDbType.Decimal;
     28 	}
     29 	else if (value is Guid[])
     30 	{
     31 		dataType = OracleDbType.Varchar2;
     32 	}
     33 	else if (value is bool[] || value is Boolean[])
     34 	{
     35 		dataType = OracleDbType.Byte;
     36 	}
     37 	else if (value is byte[])
     38 	{
     39 		dataType = OracleDbType.Blob;
     40 	}
     41 	else if (value is char[])
     42 	{
     43 		dataType = OracleDbType.Char;
     44 	}
     45 	return dataType;
     46 }
    GetOracleDbType

    说明:如果采用分次(每次1万数据)执行 InsertBigData 方法,速度反而比一次性执行 InsertBigData 方法慢,详见下面测试结果;

    测试结果:

    无索引,数据类型:4列NVARCHAR2,2列NUMBER

    30+万(7.36M):一次性导入用时 15:623,每次10000导入用时

    60+万(14.6M):一次性导入用时 28:207,每次10000导入用时 1:2:300

    100+万(24.9M):一次性导入报如下异常

    image

    此时实际上从资源监视器上可以得知仍有可用内存,但是仍旧报 OutOfMemoryException,所以猜测应该是一个 bug;

    如果每次10000导入用时 2:9:252

    如果每次50000导入用时 58:101

    附加 InsertBigData 方法使用示例:

      1 // 每10000数据导入一次
      2 Dictionary<string, object> columnsData = new Dictionary<string, object>();
      3 int dataCount = m_SourceDataTable.Rows.Count;
      4 int times = dataCount / 10000 + (dataCount % 10000 == 0 ? 0 : 1);
      5 for (int i = 0; i < times; i++)
      6 {
      7 	int startIndex = i * 10000;
      8 	int endIndex = (i + 1) * 10000;
      9 	endIndex = endIndex > dataCount ? dataCount : endIndex;
     10 	int currDataCount = endIndex - startIndex;
     11 	columnsData.Add("COL1", new string[currDataCount]);
     12 	columnsData.Add("COL2", new string[currDataCount]);
     13 	columnsData.Add("COL3", new decimal[currDataCount]);
     14 	columnsData.Add("COL4", new string[currDataCount]);
     15 	columnsData.Add("COL5", new decimal[currDataCount]);
     16 	columnsData.Add("COL6", new string[currDataCount]);
     17 	for (int rowIndex = startIndex; rowIndex < endIndex; rowIndex++)
     18 	{
     19 		int dicRowIndex = rowIndex - startIndex;// 列数据行索引
     20 		foreach (string colName in columnsData.Keys)
     21 		{
     22 			object cell = m_SourceDataTable.Rows[rowIndex][colName];
     23 			string cellStr = (cell + "").TrimEnd(new char[] { '', ' ' });
     24 			if (colName == "COL3" || colName == "COL5")
     25 			{
     26 				decimal value = 0;
     27 				decimal.TryParse(cellStr, out value);
     28 				((decimal[])columnsData[colName])[dicRowIndex] = value;
     29 			}
     30 			else
     31 			{
     32 				((string[])columnsData[colName])[dicRowIndex] = cellStr;
     33 			}
     34 		}
     35 	}
     36 	m_DAL.InsertBigData(columnsData, currDataCount);
     37 
     38 	columnsData.Clear();
     39 	GC.Collect();
     40 	GC.WaitForFullGCComplete();
     41 }
    View Code

    方式二:OracleBulkCopy

    说明:

    1. OracleBulkCopy 采用 direct path 方式导入;

    2. 不支持 transaction,无法 Rollback;

    3. 如果该表存在触发器时,无法使用 OracleBulkCopy(报异常信息 Oracle Error: ORA-26086),除非先禁用该表的所有触发器;

    4. 过程中会自动启用 NOT NULL、UNIQUE 和 PRIMARY KEY 三种约束,其中 NOT NULL 约束在列数组绑定时验证,任何违反 NOT NULL 约束条件的行数据都会舍弃;UNIQUE 约束是在导入完成后重建索引时验证,但是在 bulk copy 时,允许违反索引约束,并在完成后将索引设置成禁用(UNUSABLE)状态;而且,如果索引一开始状态就是禁用(UNUSABLE)状态时,OracleBulkCopy 是会报错的。

    参考代码如下:

      1 /// <summary>
      2 /// 批量插入数据
      3 /// 该方法需要禁用该表所有触发器,并且插入的数据如果为空,是不会采用默认值
      4 /// </summary>
      5 /// <param name="table">数据表</param>
      6 /// <param name="targetTableName">数据库目标表名</param>
      7 /// <returns></returns>
      8 public bool InsertBulkData(DataTable table, string targetTableName)
      9 {
     10 	bool result = false;
     11 	string connStr = GetConnectionString();
     12 	using (OracleConnection connection = new OracleConnection(connStr))
     13 	{
     14 		using (OracleBulkCopy bulkCopy = new OracleBulkCopy(connStr, OracleBulkCopyOptions.Default))
     15 		{
     16 			if (table != null && table.Rows.Count > 0)
     17 			{
     18 				bulkCopy.DestinationTableName = targetTableName;
     19 				for (int i = 0; i < table.Columns.Count; i++)
     20 				{
     21 					string col = table.Columns[i].ColumnName;
     22 					bulkCopy.ColumnMappings.Add(col, col);
     23 				}
     24 				connection.Open();
     25 				bulkCopy.WriteToServer(table);
     26 				result = true;
     27 			}
     28 			bulkCopy.Close();
     29 			bulkCopy.Dispose();
     30 		}
     31 	}
     32 
     33 	return result;
     34 }

    测试结果:

    数据类型:4列NVARCHAR2,2列NUMBER

    30+万(7.36M):用时 14:590

    60+万(14.6M):用时 28:28

    1048576(24.9M):用时 52:971

    附加,禁用表的所有外键SQL:

    ALTER TABLE table_name DISABLE ALL TRIGGERS

    总结

    1、在30+万和60+万数据时,ArrayBind一次性导入和OracleBulkCopy时间相差不是很大,但是ArrayBind方式一般都需要转换数据形式,占用了一些时间,而 OracleBulkCopy 则只需要简单处理一下 DataTable 数据源即可导入;

    2、当数据量达到100+万时,ArrayBind 很容易出现内存不足异常,此时只能采用分批次执行导入,根据测试结果可知,次数越少,速度越快;而采用 OracleBulkCopy 方式则很少出现内存不足现象,由此可见 OracleBulkCopy 占用内存比 ArrayBind 方式少;

    3、采用 OracleBulkCopy 导入时,先要禁用该表所有触发器,如果该表存在自增 ID 触发器,就比较麻烦了,得先禁用改变的自增 ID 的触发器,然后手动自增设置要导入的数据,最后才可以导入;同时,这种导入方式不可并发,一个时刻只能有一个用户在导入(因为自增ID交由程序处理),此时还需要锁表,防止其他人同时批量导入数据;

    参考资料:

    1、ArrayBind http://www.oracle.com/technetwork/issue-archive/2009/09-sep/o59odpnet-085168.html

    2、ArrayBind http://www.soaspx.com/dotnet/csharp/csharp_20130911_10501.html

    3、Oracle数据导入方法 http://dbanotes.net/Oracle/All_About_Oracle_Data_Loading.htm

    4、介绍OracleBulkCopy类 https://docs.oracle.com/cd/E11882_01/win.112/e23174/OracleBulkCopyClass.htm#ODPNT7446

    5、http://dba.stackexchange.com/questions/7287/what-specifically-does-oraclebulkcopy-do-and-how-can-i-optimize-its-performance

  • 相关阅读:
    smb 共享文件夹
    php nginx 配置
    mysql 存储过程
    ubuntu 源码下载分析
    rust 小米R3G官方rom(openwrt) openssl
    rust 配置
    mac 制作树莓派3b启动盘
    小米R2D
    golang 配置
    golang pprof操作流程
  • 原文地址:https://www.cnblogs.com/memento/p/5303342.html
Copyright © 2011-2022 走看看