zoukankan      html  css  js  c++  java
  • C#使用Parallel处理数据同步写入Datatable并使用BulkInsert批量导入数据库

    项目需要为几十万张照片逐一计算特征值(调用C++编写的DLL)。

    业务流程:选择照片文件夹,分别访问照片-->调用DLL接口传递照片路径-->接收处理返回值-->写入数据库。

    前期使用for循环逐张处理,几十万张照片需要将近10个小时,速度太慢。后来改用Parallel进行并行计算(调用DLL处理照片),把结果统一写入DataTable,再使用BulkInsert将DataTable批量写入数据库。目前测试8万张照片的并行计算耗时约30分钟,速度提高约30%-40%。

    代码示例如下:

    private static SqlConnection sqlconn;
    private static ConcurrentDictionary<string, int> currInts = new ConcurrentDictionary<string, int>();

    /// <summary>
    /// Lets the user pick a photo folder, loads the rows of <c>ViewWeiZhuanHuan</c> (optionally
    /// restricted to <c>txt_mastKey BETWEEN txt_start AND txt_end</c>), converts the three photos of
    /// each row in parallel via <c>ZwHelper.zwzhAsync</c>, collects the results in a
    /// <see cref="DataTable"/> and bulk-inserts them into the table named in <c>txt_TableName</c>.
    /// </summary>
    /// <remarks>
    /// The button is re-enabled and the shared connection closed whenever a run ends, including
    /// when there is nothing to convert or an unexpected exception occurs.
    /// NOTE(review): <c>ksno</c>, <c>filexe</c> and the real <c>fileDirPath</c> come from code
    /// that is elided in this excerpt (presumably derived from the row using rowkey/splitkey).
    /// </remarks>
    private void Button1_Click(object sender, EventArgs e)
    {
        string dirPath;
        using (var folderBrowser = new FolderBrowserDialog())
        {
            if (folderBrowser.ShowDialog() != DialogResult.OK) return;
            dirPath = folderBrowser.SelectedPath;
            if (!Directory.Exists(dirPath))
            {
                MessageBox.Show(@"所选路径不存在或无权访问", @"错误", MessageBoxButtons.OK, MessageBoxIcon.Error);
                return;
            }
        }

        // This lambda is effectively "async void": any exception escaping it would crash the
        // application, so the whole run is wrapped in try/catch/finally.
        BeginInvoke(new Action(async () =>
        {
            button1.Enabled = false;
            var sw = new Stopwatch();
            sw.Start();
            try
            {
                // Check that the database server is reachable.
                Log.WriteLine(@"尝试连接数据库服务器");

                // The builder escapes user-entered values (e.g. a password containing ';').
                var connectionString = new SqlConnectionStringBuilder
                {
                    DataSource = $"{txt_serverIP.Text},{txt_ServerPort.Text}",
                    UserID = txt_User.Text,
                    Password = txt_Pwd.Text,
                    InitialCatalog = txt_DB.Text,
                    PersistSecurityInfo = false,
                    Pooling = true,
                    MinPoolSize = 30,
                    MaxPoolSize = 200
                }.ConnectionString;
                sqlconn = new SqlConnection(connectionString);
                try
                {
                    sqlconn.Open();
                }
                catch (Exception exception)
                {
                    Log.WriteLine($@"连接数据库服务器【失败】-->{exception.Message}");
                    return; // finally re-enables the button
                }

                Log.WriteLine($@"连接数据库服务器【成功】{Environment.NewLine}获取未转换图片数据。。。");
                var ds = new DataSet();
                int.TryParse(txt_start.Text, out var start);
                int.TryParse(txt_end.Text, out var end);
                using (var sqlcmd = new SqlCommand("SELECT * FROM ViewWeiZhuanHuan", sqlconn))
                {
                    if (start != 0 && end != 0)
                    {
                        // The bounds are bound as parameters; a column name cannot be, so it is
                        // bracket-quoted instead of being concatenated raw.
                        sqlcmd.CommandText =
                            $"SELECT * FROM ViewWeiZhuanHuan WHERE {QuoteSqlIdentifier(txt_mastKey.Text)} BETWEEN @start AND @end";
                        sqlcmd.Parameters.Add("@start", SqlDbType.Int).Value = start;
                        sqlcmd.Parameters.Add("@end", SqlDbType.Int).Value = end;
                    }

                    using (var da = new SqlDataAdapter(sqlcmd))
                    {
                        da.Fill(ds);
                    }
                }

                if (ds.Tables.Count == 0 || ds.Tables[0].Rows.Count == 0)
                {
                    Log.WriteLine("所有图片都已经转换完毕。");
                    return; // finally closes the connection and re-enables the button
                }

                var sourceRows = ds.Tables[0].Rows;
                var total = sourceRows.Count;
                Log.WriteLine($"{total}个图片需要转换。");

                // Inputs for the (elided) file-name rule that produces fileDirPath / ksno.
                var rowkey = comboBox1.SelectedValue.ToString();
                var splitkey = txt_split.Text.Trim();
                // Read on the UI thread: WinForms controls must not be accessed from the worker below.
                var tableName = txt_TableName.Text.Trim();

                var dt = CreateResultTable();
                // DataTable is not safe for concurrent writes, so NewRow/Rows.Add are serialized.
                var dtGate = new object();

                // Reset the progress counter for this run (TryAdd kept a previous run's value).
                currInts["currInts"] = 1;

                await Task.Run(() =>
                {
                    // At most 8 rows are processed concurrently.
                    Parallel.For(0, total, new ParallelOptions { MaxDegreeOfParallelism = 8 }, rowIndex =>
                    {
                        // Runs on the UI thread, so these read-then-update steps never interleave.
                        BeginInvoke(new Action(() =>
                        {
                            currInts.TryGetValue("currInts", out var currValue);
                            lb_process.Text = $@"{currValue}/{total}"; // show progress
                            currInts.TryUpdate("currInts", currValue + 1, currValue);
                        }));

                        var fileDirPath = ""; // built from the selected file-name format and the entered rule (elided)

                        for (var finger = 1; finger <= 3; finger++)
                        {
                            // Verbatim string: "\{" is not a valid escape in a regular interpolated string.
                            var zwzp = $@"{dirPath}\{fileDirPath}\{ksno}_fp{finger}.jpg";
                            try
                            {
                                // Calls into the C++ DLL; blocking is acceptable on this worker thread.
                                var model = ZwHelper.zwzhAsync(zwzp).GetAwaiter().GetResult();
                                if (model != null)
                                {
                                    var zd1 = Convert.ToInt32(filexe);
                                    var zd2 = Convert.ToInt32(sourceRows[rowIndex]["zd1"]);
                                    lock (dtGate)
                                    {
                                        var dr = dt.NewRow();
                                        dr["zd1"] = zd1;
                                        dr["zd2"] = zd2;
                                        dr["zd3"] = model.zhtz;
                                        dr["zd4"] = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
                                        dr["zd5"] = "";
                                        dr["zd6"] = "";
                                        dt.Rows.Add(dr);
                                    }
                                }
                                else
                                {
                                    Log.WriteLine($@"{ksno}转换失败");
                                    Log.Log.Error($"{ksno}转换失败。");
                                }
                            }
                            catch (Exception exception)
                            {
                                Log.Log.Error($"学号{ksno},图片路径{zwzp}转换失败。{exception}");
                            }
                        }
                    });

                    sw.Stop();
                    Log.WriteLine($"转换耗时:{sw.ElapsedMilliseconds}毫秒");
                    Log.WriteLine($@"开始写入数据库,数量{dt.Rows.Count}");

                    if (dt.Rows.Count == 0)
                    {
                        Log.WriteLine(@"没有要写入的数据。");
                        return;
                    }

                    sw.Restart();
                    var sucess = SqlHelper.BulkInsert(sqlconn, tableName, dt, out var err);
                    if (!sucess)
                    {
                        Log.Log.Error($"写入数据库失败==》{err}");
                    }
                    sw.Stop();
                    Log.WriteLine($"写入数据库是否成功=>{sucess},耗时{sw.ElapsedMilliseconds}毫秒");
                });
            }
            catch (Exception exception)
            {
                Log.WriteLine($@"转换过程出错-->{exception.Message}");
                Log.Log.Error($"转换过程出错。{exception}");
            }
            finally
            {
                if (sqlconn != null && sqlconn.State != ConnectionState.Closed)
                {
                    sqlconn.Close();
                }
                button1.Enabled = true;
            }
        }));
    }

    /// <summary>
    /// Creates the empty result table (columns zd1..zd6) that collects conversion results
    /// before they are bulk-inserted.
    /// </summary>
    private static DataTable CreateResultTable()
    {
        var table = new DataTable();
        table.Columns.Add("zd1", typeof(int));
        table.Columns.Add("zd2", typeof(int));
        table.Columns.Add("zd3", typeof(string));
        table.Columns.Add("zd4", typeof(string));
        table.Columns.Add("zd5", typeof(string));
        table.Columns.Add("zd6", typeof(string));
        return table;
    }

    /// <summary>
    /// Quotes a user-supplied column name as a SQL Server bracketed identifier so it cannot inject SQL.
    /// Surrounding whitespace is trimmed. An already bracket-quoted name is unwrapped first so it is
    /// not quoted twice. Any ']' in the name is doubled.
    /// </summary>
    private static string QuoteSqlIdentifier(string name)
    {
        var trimmed = (name ?? "").Trim();
        if (trimmed.Length >= 2 && trimmed[0] == '[' && trimmed[trimmed.Length - 1] == ']')
        {
            trimmed = trimmed.Substring(1, trimmed.Length - 2).Replace("]]", "]");
        }
        return "[" + trimmed.Replace("]", "]]") + "]";
    }
    

      SQL批量写入函数

            /// <summary>
            /// Bulk insert: copies every row of <paramref name="dataTable"/> into
            /// <paramref name="tableName"/> with <see cref="SqlBulkCopy"/> inside a single transaction.
            /// Either all rows are committed or the transaction is rolled back.
            /// </summary>
            /// <param name="conn">Connection to use. It is opened here if closed, and it is closed again before the method returns.</param>
            /// <param name="tableName">Name of the destination database table.</param>
            /// <param name="dataTable">The DataTable whose rows are written.</param>
            /// <param name="err">Error details when the method returns false; empty on success.</param>
            /// <returns>true when all rows were committed; false otherwise (see <paramref name="err"/>).</returns>
            public static bool BulkInsert(SqlConnection conn, string tableName, DataTable dataTable, out string err)
            {
                err = "";
                if (dataTable == null || dataTable.Rows.Count == 0)
                {
                    err = "要写入的数据为空";
                    return false;
                }
                if (conn == null)
                {
                    err = "数据库连接对象为空";
                    return false;
                }

                SqlTransaction tran = null;
                try
                {
                    // The connection must be open before a transaction can be started on it.
                    if (conn.State == ConnectionState.Closed)
                    {
                        conn.Open();
                    }
                    tran = conn.BeginTransaction();
                    using (var bulkCopy = new SqlBulkCopy(conn, SqlBulkCopyOptions.KeepNulls, tran))
                    {
                        bulkCopy.BatchSize = 1000;
                        bulkCopy.DestinationTableName = tableName;
                        bulkCopy.WriteToServer(dataTable);
                    }
                    tran.Commit();
                    return true;
                }
                catch (Exception e)
                {
                    err = e.ToString();
                    if (tran != null)
                    {
                        // Rollback can itself fail (e.g. broken connection or already-completed
                        // transaction); keep the original error and append the rollback failure.
                        try
                        {
                            tran.Rollback();
                        }
                        catch (Exception rollbackError)
                        {
                            err += Environment.NewLine + rollbackError;
                        }
                    }
                    return false;
                }
                finally
                {
                    tran?.Dispose();
                    if (conn.State == ConnectionState.Open)
                    {
                        conn.Close();
                    }
                }
            }
    

      

  • 相关阅读:
    vsftp
    数据类型
    第三篇:表相关操作
    第二篇:库相关操作
    第一篇: 初识数据库
    五 python并发编程之IO模型
    四 python并发编程之协程
    Python GIL
    三 python并发编程之多线程-重点
    三 python并发编程之多线程-理论
  • 原文地址:https://www.cnblogs.com/wdw984/p/10968808.html
Copyright © 2011-2022 走看看