代码 :
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace MultiThreadInserDataIntoList
{
    /// <summary>
    /// One request/reply record. Each property is copied into the matching
    /// column of the table-valued parameter built by
    /// <c>Program.GetDataTableParam</c>.
    /// </summary>
    public class CommandDetail
    {
        /// <summary>Stored in the "Session_IP" column.</summary>
        public string sessionIP { get; set; }

        /// <summary>Stored in the "Session_ID" column.</summary>
        public string sessionID { get; set; }

        /// <summary>Stored in the "CommandName" column.</summary>
        public string commandName { get; set; }

        /// <summary>Stored in the "Request_Recv_Time" column.</summary>
        public DateTime cmd_recv_time { get; set; }

        /// <summary>Stored in the "Request_reply_Time" column.</summary>
        public DateTime cmd_reply_time { get; set; }

        /// <summary>Stored in the "Request_Content" column.</summary>
        public string cmd_content { get; set; }

        /// <summary>Stored in the "Reply_Content" column.</summary>
        public string reply_content { get; set; }

        /// <summary>Stored in the "Err_Reason" column.</summary>
        public string err_reason { get; set; }
    }

    /// <summary>
    /// Demo: three producer tasks enqueue records into a shared concurrent queue
    /// while a timer periodically flushes the queued records to SQL Server
    /// through a stored procedure that takes a table-valued parameter.
    /// </summary>
    class Program
    {
        private const string ConnectionString =
            "Data Source=DESKTOP-DQPC13K;Initial Catalog=SmalllSmartBilling;Integrated Security=True;";

        /// <summary>Minimum number of queued records before a flush is attempted.</summary>
        private const int FlushThreshold = 100;

        /// <summary>Number of producer tasks started by <see cref="Main"/>.</summary>
        private const int ProducerCount = 3;

        /// <summary>Number of records each producer enqueues.</summary>
        private const int ItemsPerProducer = 300;

        /// <summary>Pause between two enqueues of one producer, in milliseconds.</summary>
        private const int ProducerDelayMs = 100;

        /// <summary>Period of the flush timer, in milliseconds.</summary>
        private const int FlushPeriodMs = 1000;

        // Held in a static field so the timer is not garbage-collected while Main waits.
        static System.Threading.Timer _timer;

        static readonly ConcurrentQueue<CommandDetail> CommandDetailList = new ConcurrentQueue<CommandDetail>();

        /// <summary>
        /// Starts the producers and the flush timer, then blocks until a line is
        /// read from standard input.
        /// </summary>
        static void Main(string[] args)
        {
            for (int i = 0; i < ProducerCount; i++)
            {
                Task.Factory.StartNew(() => doStuff(CommandDetailList));
            }

            _timer = new System.Threading.Timer(new TimerCallback(JobCallBack), null, 0, FlushPeriodMs);
            Console.ReadLine();

            // Stop scheduling further callbacks before the process exits.
            _timer.Dispose();
        }

        /// <summary>
        /// Producer body: enqueues a sample record <see cref="ItemsPerProducer"/>
        /// times, sleeping <see cref="ProducerDelayMs"/> ms before each enqueue.
        /// </summary>
        /// <param name="o1">The target queue; must be a ConcurrentQueue of CommandDetail.</param>
        static void doStuff(object o1)
        {
            ConcurrentQueue<CommandDetail> typed = (ConcurrentQueue<CommandDetail>)o1;

            // Note: the same sample instance is enqueued on every iteration.
            CommandDetail cst = new CommandDetail
            {
                cmd_recv_time = DateTime.Now,
                cmd_reply_time = DateTime.Now,
                reply_content = "111",
                cmd_content = "111",
                commandName = "GETNAP",
                sessionID = "111",
                sessionIP = "111"
            };

            for (int i = 0; i < ItemsPerProducer; i++)
            {
                Thread.Sleep(ProducerDelayMs);
                typed.Enqueue(cst);
            }
        }

        /// <summary>
        /// Timer callback: once at least <see cref="FlushThreshold"/> records are
        /// queued, removes them from the queue and inserts exactly those records
        /// into the database.
        /// </summary>
        /// <remarks>
        /// The records are dequeued BEFORE the insert. TryDequeue is atomic, so
        /// overlapping timer callbacks each get a disjoint batch, and the rows
        /// removed from the queue are exactly the rows sent; the value returned
        /// by ExecuteNonQuery (which may be -1) is only reported, never used to
        /// decide how many records to remove.
        /// </remarks>
        /// <param name="state">Unused timer state.</param>
        static void JobCallBack(object state)
        {
            int pending = CommandDetailList.Count;
            if (pending < FlushThreshold)
            {
                Console.WriteLine("remain {0} rows", pending);
                return;
            }

            // Cap at the count observed above so a fast producer cannot keep this loop alive.
            List<CommandDetail> batch = DequeueBatch(CommandDetailList, pending);
            if (batch.Count == 0)
            {
                // A concurrent callback drained the queue first.
                return;
            }

            try
            {
                int iEffect = InsertBatch(batch);
                Console.WriteLine("this time total insert {0} rows", iEffect);
            }
            catch (Exception ex)
            {
                // Put the batch back so it is retried on a later tick; the records
                // go to the tail, so their order relative to newer records changes.
                foreach (CommandDetail item in batch)
                {
                    CommandDetailList.Enqueue(item);
                }

                Console.Error.WriteLine("insert of {0} rows failed, rows re-queued: {1}", batch.Count, ex);
            }
        }

        /// <summary>
        /// Removes up to <paramref name="maxCount"/> records from the head of
        /// <paramref name="q"/> and returns them in dequeue order.
        /// </summary>
        /// <param name="q">Queue to take records from.</param>
        /// <param name="maxCount">Upper bound on the batch size; values &lt;= 0 yield an empty batch.</param>
        /// <returns>The dequeued records; fewer than <paramref name="maxCount"/> if the queue ran empty.</returns>
        private static List<CommandDetail> DequeueBatch(ConcurrentQueue<CommandDetail> q, int maxCount)
        {
            List<CommandDetail> batch = new List<CommandDetail>(Math.Max(maxCount, 0));
            CommandDetail item;
            while (batch.Count < maxCount && q.TryDequeue(out item))
            {
                batch.Add(item);
            }

            return batch;
        }

        /// <summary>
        /// Sends <paramref name="batch"/> to the "sp_InsertRequestReply" stored
        /// procedure as the @TVP table-valued parameter.
        /// </summary>
        /// <param name="batch">Records to insert.</param>
        /// <returns>The value returned by ExecuteNonQuery.</returns>
        private static int InsertBatch(IEnumerable<CommandDetail> batch)
        {
            using (DataTable table = GetDataTableParam(batch))
            using (SqlConnection conn = new SqlConnection(ConnectionString))
            using (SqlCommand comm = conn.CreateCommand())
            {
                comm.CommandText = "sp_InsertRequestReply";
                comm.CommandType = System.Data.CommandType.StoredProcedure;
                comm.CommandTimeout = 300;
                comm.Parameters.Add(new SqlParameter()
                {
                    ParameterName = "@TVP",
                    SqlDbType = SqlDbType.Structured,
                    Value = table
                });

                // Output parameters are supplied but their values are not read here.
                // NOTE(review): presumably the procedure reports status through them - confirm.
                comm.Parameters.Add(new SqlParameter()
                {
                    ParameterName = "@result",
                    SqlDbType = SqlDbType.Int,
                    Direction = ParameterDirection.Output,
                    Size = 4
                });
                comm.Parameters.Add(new SqlParameter()
                {
                    ParameterName = "@errormsg",
                    SqlDbType = SqlDbType.VarChar,
                    Direction = ParameterDirection.Output,
                    Size = 1000
                });

                conn.Open();
                return comm.ExecuteNonQuery();
            }
        }

        /// <summary>
        /// Builds the DataTable passed as the table-valued parameter: one row per
        /// record, with the eight columns defined below.
        /// </summary>
        /// <param name="People">Records to convert.</param>
        /// <returns>A new DataTable; the caller is responsible for disposing it.</returns>
        static DataTable GetDataTableParam(IEnumerable<CommandDetail> People)
        {
            // Define the table columns (NOTE(review): must match the TVP type on the server - confirm).
            DataTable peopleTable = new DataTable();
            peopleTable.Columns.Add("Session_IP", typeof(string));
            peopleTable.Columns.Add("Session_ID", typeof(string));
            peopleTable.Columns.Add("CommandName", typeof(string));
            peopleTable.Columns.Add("Request_Recv_Time", typeof(DateTime));
            peopleTable.Columns.Add("Request_reply_Time", typeof(DateTime));
            peopleTable.Columns.Add("Request_Content", typeof(string));
            peopleTable.Columns.Add("Reply_Content", typeof(string));
            peopleTable.Columns.Add("Err_Reason", typeof(string));

            foreach (CommandDetail p in People)
            {
                // Add one row per record.
                DataRow row = peopleTable.NewRow();
                row["Session_IP"] = p.sessionIP;
                row["Session_ID"] = p.sessionID;
                row["CommandName"] = p.commandName;
                row["Request_Recv_Time"] = p.cmd_recv_time;
                row["Request_reply_Time"] = p.cmd_reply_time;
                row["Request_Content"] = p.cmd_content;
                row["Reply_Content"] = p.reply_content;
                row["Err_Reason"] = p.err_reason;
                peopleTable.Rows.Add(row);
            }

            return peopleTable;
        }
    }
}
注:一开始用的是 List<>,但发现它不能保证线程安全;后来在 Stack Overflow 上提问,改用 ConcurrentQueue<T>。队列的特征是在队首删除(出队)的同时,可以在队尾插入(入队),
而 ConcurrentQueue<T> 的 Enqueue / TryDequeue 本身是线程安全的,足以保证多线程并发读写时的 thread safe。
原帖网址 : http://stackoverflow.com/questions/43905150/issues-when-use-a-timer-to-save-listclass-into-db-and-the-list-is-writing-by