zoukankan      html  css  js  c++  java
  • 主线程创建一个 ConcurrentQueue,然后用三个子线程往里面写数据;主线程再建一个 timer,定时检查 ConcurrentQueue 中的元素个数是否达到某个阈值,达到则存入数据库,存进多少条就从 ConcurrentQueue 中移除多少条

    代码 :

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using System.Threading;
    using System.Data;
    using System.Data.SqlClient;
    using System.Data.SqlTypes;
    using System.Collections.Concurrent;
    
    namespace MultiThreadInserDataIntoList
    {
        /// <summary>
        /// One request/reply record. Instances are queued by the producer tasks and
        /// later copied, one per row, into the table-valued parameter built by
        /// <c>Program.GetDataTableParam</c>.
        /// </summary>
        public class CommandDetail
        {
            // Written to the "Session_IP" column of the table-valued parameter.
            public string sessionIP { get; set; }
            // Written to the "Session_ID" column.
            public string sessionID { get; set; }
            // Written to the "CommandName" column.
            public string commandName { get; set; }
            // Written to the "Request_Recv_Time" column.
            public DateTime cmd_recv_time { get; set; }
            // Written to the "Request_reply_Time" column.
            public DateTime cmd_reply_time { get; set; }
            // Written to the "Request_Content" column.
            public string cmd_content { get; set; }
            // Written to the "Reply_Content" column.
            public string reply_content { get; set; }
            // Written to the "Err_Reason" column; the demo producer leaves it unset (null).
            public string err_reason { get; set; }
        }
    
        /// <summary>
        /// Demo program: three producer tasks enqueue <see cref="CommandDetail"/> records
        /// into a shared <see cref="ConcurrentQueue{T}"/>, while a one-second timer
        /// flushes the queue to SQL Server through a table-valued parameter once at
        /// least <see cref="FlushThreshold"/> records are waiting.
        /// </summary>
        class Program
        {
            // Minimum number of queued records before a flush is attempted.
            private const int FlushThreshold = 100;

            // Number of records each producer task enqueues.
            private const int RecordsPerProducer = 300;

            // Number of producer tasks started by Main.
            private const int ProducerCount = 3;

            private const string ConnectionString =
                "Data Source=DESKTOP-DQPC13K;Initial Catalog=SmalllSmartBilling;Integrated Security=True;";

            // Held in a static field so the timer stays referenced while the process runs.
            static System.Threading.Timer _timer;
            static ConcurrentQueue<CommandDetail> CommandDetailList = new ConcurrentQueue<CommandDetail>();

            // 0 = idle, 1 = a flush is running. Timer callbacks run on thread-pool threads
            // and may overlap when a flush takes longer than the timer period.
            private static int _flushInProgress;

            /// <summary>
            /// Starts the producer tasks and the flush timer, then blocks until the
            /// user presses Enter.
            /// </summary>
            static void Main(string[] args)
            {
                for (int i = 0; i < ProducerCount; i++)
                {
                    Task.Factory.StartNew(() => doStuff(CommandDetailList));
                }

                // First tick fires immediately (dueTime 0), then every 1000 ms.
                _timer = new System.Threading.Timer(new TimerCallback(JobCallBack), null, 0, 1000);
                Console.ReadLine();
            }

            /// <summary>
            /// Producer: enqueues <see cref="RecordsPerProducer"/> sample records,
            /// sleeping 100 ms before each one.
            /// </summary>
            /// <param name="o1">The target queue; must be a ConcurrentQueue of CommandDetail.</param>
            static void doStuff(object o1)
            {
                ConcurrentQueue<CommandDetail> typed = (ConcurrentQueue<CommandDetail>)o1;

                for (int i = 0; i < RecordsPerProducer; i++)
                {
                    Thread.Sleep(100);

                    // A fresh instance per record: queue entries must not alias one shared
                    // mutable object, and each row gets its own timestamp.
                    DateTime now = DateTime.Now;
                    typed.Enqueue(new CommandDetail
                    {
                        cmd_recv_time = now,
                        cmd_reply_time = now,
                        reply_content = "111",
                        cmd_content = "111",
                        commandName = "GETNAP",
                        sessionID = "111",
                        sessionIP = "111"
                    });
                }
            }

            /// <summary>
            /// Timer callback. Runs at most one flush at a time; a tick that arrives
            /// while a flush is still running is skipped.
            /// </summary>
            /// <param name="state">Unused timer state (always null here).</param>
            static void JobCallBack(object state)
            {
                // Without this guard two overlapping ticks could both insert the same
                // snapshot and then both dequeue, duplicating and losing rows.
                if (Interlocked.CompareExchange(ref _flushInProgress, 1, 0) != 0)
                {
                    return;
                }

                try
                {
                    FlushQueue();
                }
                finally
                {
                    Interlocked.Exchange(ref _flushInProgress, 0);
                }
            }

            // Sends a snapshot of the queue to the database and, on success, removes
            // exactly the snapshotted records from the front of the queue.
            private static void FlushQueue()
            {
                if (CommandDetailList.Count < FlushThreshold)
                {
                    Console.WriteLine("remain {0} rows", CommandDetailList.Count);
                    return;
                }

                // Moment-in-time copy. This method is the only consumer, so these records
                // are still the first batch.Length items of the queue after the insert,
                // even though producers keep appending at the tail.
                CommandDetail[] batch = CommandDetailList.ToArray();

                try
                {
                    using (SqlConnection conn = new SqlConnection(ConnectionString))
                    using (SqlCommand comm = conn.CreateCommand())
                    {
                        conn.Open();

                        comm.CommandText = "sp_InsertRequestReply";
                        comm.CommandType = System.Data.CommandType.StoredProcedure;
                        comm.CommandTimeout = 300;

                        comm.Parameters.Add(new SqlParameter()
                        {
                            ParameterName = "@TVP",
                            SqlDbType = SqlDbType.Structured,
                            Value = GetDataTableParam(batch)
                        });

                        // NOTE(review): the two output parameters are never inspected after
                        // the call; if the procedure reports failures through them instead
                        // of raising an error, that should be checked here - confirm.
                        comm.Parameters.Add(new SqlParameter()
                        {
                            ParameterName = "@result",
                            SqlDbType = SqlDbType.Int,
                            Direction = ParameterDirection.Output,
                            Size = 4
                        });

                        comm.Parameters.Add(new SqlParameter()
                        {
                            ParameterName = "@errormsg",
                            SqlDbType = SqlDbType.VarChar,
                            Direction = ParameterDirection.Output,
                            Size = 1000
                        });

                        int iEffect = comm.ExecuteNonQuery();
                        Console.WriteLine("this time total insert {0} rows", iEffect);
                    }
                }
                catch (Exception uep)
                {
                    // An exception escaping a timer callback would terminate the process,
                    // so report it and leave the records queued for the next tick.
                    Console.Error.WriteLine("insert failed, {0} rows kept in queue: {1}", batch.Length, uep);
                    return;
                }

                // Remove by the number of records sent, not by the ExecuteNonQuery result:
                // that value is -1 when the procedure uses SET NOCOUNT ON and otherwise
                // need not equal the number of records sent.
                Remove(CommandDetailList, batch.Length);
            }

            /// <summary>
            /// Dequeues up to <paramref name="count"/> items from the front of
            /// <paramref name="q"/>. Stops early if the queue runs empty; a zero or
            /// negative count removes nothing.
            /// </summary>
            private static void Remove(ConcurrentQueue<CommandDetail> q, int count)
            {
                CommandDetail ignored;
                for (int i = 0; i < count; i++)
                {
                    if (!q.TryDequeue(out ignored))
                    {
                        break;
                    }
                }
            }

            /// <summary>
            /// Builds the DataTable passed as the "@TVP" structured parameter, one row
            /// per record. Column names and order are defined here and are expected to
            /// match the table type used by the stored procedure.
            /// </summary>
            /// <param name="People">Records to convert; must not be null.</param>
            /// <returns>A table with one row per input record.</returns>
            /// <exception cref="ArgumentNullException"><paramref name="People"/> is null.</exception>
            static DataTable GetDataTableParam(IEnumerable<CommandDetail> People)
            {
                if (People == null)
                {
                    throw new ArgumentNullException("People");
                }

                DataTable peopleTable = new DataTable();
                peopleTable.Columns.Add("Session_IP", typeof(string));
                peopleTable.Columns.Add("Session_ID", typeof(string));
                peopleTable.Columns.Add("CommandName", typeof(string));

                peopleTable.Columns.Add("Request_Recv_Time", typeof(DateTime));
                peopleTable.Columns.Add("Request_reply_Time", typeof(DateTime));

                peopleTable.Columns.Add("Request_Content", typeof(string));
                peopleTable.Columns.Add("Reply_Content", typeof(string));
                peopleTable.Columns.Add("Err_Reason", typeof(string));

                foreach (CommandDetail p in People)
                {
                    DataRow row = peopleTable.NewRow();
                    row["Session_IP"] = p.sessionIP;
                    row["Session_ID"] = p.sessionID;
                    row["CommandName"] = p.commandName;

                    row["Request_Recv_Time"] = p.cmd_recv_time;
                    row["Request_reply_Time"] = p.cmd_reply_time;
                    row["Request_Content"] = p.cmd_content;
                    row["Reply_Content"] = p.reply_content;
                    row["Err_Reason"] = p.err_reason;

                    peopleTable.Rows.Add(row);
                }

                return peopleTable;
            }
        }
    }

    注:一开始用的是 List<>,但 List<> 不是线程安全的;后来在 Stack Overflow 提问后,改用 ConcurrentQueue。

    ConcurrentQueue 是线程安全的先进先出队列:多个线程可以在队尾入队(Enqueue),同时另一个线程在队首出队(TryDequeue),无需额外加锁,足以保证 thread safe

    原帖网址 : http://stackoverflow.com/questions/43905150/issues-when-use-a-timer-to-save-listclass-into-db-and-the-list-is-writing-by

  • 相关阅读:
    正则表达式
    Java 基础类型
    2014 新的开始
    (9)Microsoft office Word 2013版本操作入门_文本框_word排版
    (8)Microsoft office Word 2013版本操作入门_制作传单海报
    (7)Microsoft office Word 2013版本操作入门_常用技巧
    (6)Microsoft office Word 2013版本操作入门_文件封面,页首,页尾
    (5)Microsoft office Word 2013版本操作入门_标尺
    (4)Microsoft office Word 2013版本操作入门_插入图片及图片的排版
    (8)Jquery1.8.3快速入门_可见性选择器
  • 原文地址:https://www.cnblogs.com/lthxk-yl/p/6841041.html
Copyright © 2011-2022 走看看