主线程:
/// <summary>
/// Event handler (by its name and signature, presumably wired to a form's Load event):
/// queues <c>ThreadSP2</c> on the thread pool and blocks the calling thread until that
/// work item signals completion, then continues with the remaining main-thread code.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="e">Event arguments (unused).</param>
private void GetPolicy_Load(object sender, EventArgs e)
{
    ////ThreadPool.QueueUserWorkItem(new WaitCallback(ThreadSP));
    try
    {
        MessageBox.Show("开始");

        // Exactly one event per queued work item. Previously two events were allocated but
        // only the first was ever handed to a worker, so waiting on the second never returned.
        ManualResetEvent[] _ManualEvents = new ManualResetEvent[1];
        _ManualEvents[0] = new ManualResetEvent(false);
        ThreadPool.QueueUserWorkItem(new WaitCallback(ThreadSP2), _ManualEvents[0]);

        // Wait handle by handle (rather than WaitHandle.WaitAll) so this also works when the
        // caller is an STA thread, where WaitAll on multiple handles is not supported.
        foreach (ManualResetEvent v in _ManualEvents)
        {
            v.WaitOne();
        }

        // Every worker has signalled, so nobody will touch the handles again; release them.
        foreach (ManualResetEvent v in _ManualEvents)
        {
            v.Close();
        }

        // Main-thread code that must run only after the worker has finished.
        MessageBox.Show("结束了");
    }
    catch (Exception ex)
    {
        // Still best-effort (the handler does not rethrow), but the failure is now recorded
        // instead of being silently discarded.
        System.Diagnostics.Trace.TraceError(ex.ToString());
    }
}
ThreadSP2方法:
/// <summary>
/// Thread-pool callback: performs the background work and then signals the
/// <see cref="ManualResetEvent"/> passed in as state so the waiting thread can continue.
/// </summary>
/// <param name="obj">The <see cref="ManualResetEvent"/> to set when the work is finished.</param>
/// <exception cref="InvalidCastException">
/// Thrown when <paramref name="obj"/> is not a <see cref="ManualResetEvent"/>.
/// </exception>
private void ThreadSP2(object obj)
{
    // Resolve the completion handle first so it can be signalled no matter how the work ends.
    ManualResetEvent e = (ManualResetEvent)obj;
    try
    {
        //// Processing logic (disabled):
        //string sql = "select id,start,end from tb";
        //DataTable dt = SqlHelper.ExecuteDataset("<connection string>", System.Data.CommandType.Text, sql, null).Tables[0];
        //List<string> list;
        //for (int i = 0; i < dt.Rows.Count; i++)
        //{
        //    list = new List<string>();
        //    list.Add(dt.Rows[i]["start"] + ";" + dt.Rows[i]["end"] + ";" + dt.Rows[i]["id"]);
        //    GJShopping2 s1 = new GJShopping2();
        //    ThreadPool.QueueUserWorkItem(new WaitCallback(s1.GetShopping), list);
        //    Thread.Sleep(1500);
        //}
        MessageBox.Show("ThreadSP2 方法执行了");
    }
    finally
    {
        // Always signal, even if the work above throws; otherwise the thread blocked in
        // WaitOne() on this event would never be released.
        e.Set();
    }
}
========线程队列控制并发
// Worker threads for the collection tasks, keyed by the argument list that is handed to each
// thread when it is started.
private Dictionary<List<string>, Thread> _ThreadDictionary = new Dictionary<List<string>, Thread>();

// Maximum number of worker threads allowed to collect data concurrently.
private int _MaxThreadCount = 20;

// NOTE: the statements below belong inside a method body; `sql` is defined outside this excerpt.
DataTable dt = SqlHelper.ExecuteDataset("连接字符串", System.Data.CommandType.Text, sql, null).Tables[0];
for (int i = 0; i < dt.Rows.Count; i++)
{
    // One "fromcity;tocity;policysettingid" entry per row, wrapped in its own list because
    // hd.GetShopping takes a List<string> as its thread argument.
    List<string> list = new List<string>();
    list.Add(dt.Rows[i]["fromcity"] + ";" + dt.Rows[i]["tocity"] + ";" + dt.Rows[i]["policysettingid"]);

    SPHandler hd = new SPHandler();
    // The second constructor argument (maxStackSize) was 5, which requests the smallest
    // possible stack for each worker; use the default stack size instead.
    _ThreadDictionary.Add(list, new Thread(new ParameterizedThreadStart(hd.GetShopping)));
    //ThreadPool.QueueUserWorkItem(new WaitCallback(hd.GetShopping), list);
    //Thread.Sleep(1000 * 2 * 1);
}

#region Thread queue processing
if (_ThreadDictionary.Count > 0)
{
    // Guard against a non-positive limit, which would otherwise prevent any thread from starting.
    int maxThreadSize = Math.Max(1, _MaxThreadCount);

    // Only threads that have not been started yet still need scheduling.
    int totalThreadCount = _ThreadDictionary.Count(k => k.Value.ThreadState == ThreadState.Unstarted);

    while (totalThreadCount > 0)
    {
        // Number of threads currently running.
        int worksThreadSize = _ThreadDictionary.Count(k => k.Value.IsAlive);
        if (worksThreadSize < maxThreadSize)
        {
            foreach (KeyValuePair<List<string>, Thread> item in _ThreadDictionary.Where(k => k.Value.ThreadState == ThreadState.Unstarted))
            {
                item.Value.Start(item.Key);
                worksThreadSize++;
                totalThreadCount--;

                // Concurrency limit reached: leave the foreach and re-evaluate in the while loop.
                if (worksThreadSize >= maxThreadSize)
                {
                    break;
                }
            }
        }
        else
        {
            // All slots are busy; yield briefly instead of spinning at full CPU.
            Thread.Sleep(100);
        }
    }

    // Wait for every worker to finish. Join blocks without burning CPU and returns
    // immediately for threads that have already ended.
    foreach (Thread worker in _ThreadDictionary.Values)
    {
        worker.Join();
    }
}
#endregion
==== GetShopping(带参数方法,处理数据)
/// <summary>
/// Thread entry point. For every "startCity;endCity;id" entry in the supplied list, builds one
/// <c>AirFareFlightShopRequest</c> per day of the search window, passes it to <c>SaveToDB</c>,
/// and afterwards reports the ids to <c>BllPolicysetting.UpdateState</c>.
/// </summary>
/// <param name="obj">
/// Expected to be a <c>List&lt;string&gt;</c> whose items are semicolon-separated
/// "startCity;endCity;id" triples. Any other value is ignored.
/// </param>
public void GetShopping(object obj)
{
    List<string> listtable = obj as List<string>;
    if (listtable == null)
    {
        // Nothing usable was passed in; an unhandled exception on a worker thread would
        // otherwise take the whole process down.
        return;
    }

    // Search window: currently T+3 .. T+33 (marked as temporary/test in the original code).
    // An earlier window computation was immediately overwritten by these values and has
    // been removed as dead code.
    DateTime dtstart = DateTime.Now.AddDays(3);
    DateTime dtend = dtstart.AddDays(30);

    List<string> listids = new List<string>();      // ids with at least one successful SaveToDB call
    List<string> listidstemp = new List<string>();  // ids with at least one failed SaveToDB call

    foreach (string entry in listtable)
    {
        string[] array = (entry ?? string.Empty).Split(';');
        if (array.Length < 3)
        {
            // Malformed entry: start city, end city and id are all required.
            continue;
        }

        // One request per day across the search window (both ends inclusive).
        for (DateTime dt = dtstart; dt <= dtend; dt = dt.AddDays(1))
        {
            EWin.International.PolicyForm.SPService.AirFareFlightShopRequest req = new EWin.International.PolicyForm.SPService.AirFareFlightShopRequest();
            req.UName = SP_UID;
            req.UPassword = SP_PWD;
            req.StartCity = array[0];
            req.EndCity = array[1];
            req.SearchDateDt = dt;

            if (SaveToDB(req))
            {
                listids.Add(array[2]);
            }
            else
            {
                listidstemp.Add(array[2]);
            }
        }
    }

    // NOTE(review): both calls below pass state "N" and differ only in the third argument
    // (failure count vs. 0); an id with mixed results is updated by both. Confirm this is intended.
    listidstemp = listidstemp.Distinct().ToList();
    if (listidstemp.Count > 0)
    {
        BllPolicysetting.UpdateState(listidstemp, "N", listidstemp.Count, Config.CONSQL_WRITE_ETWIN_INTERNATIONAL_DB);
    }

    listids = listids.Distinct().ToList();
    if (listids.Count > 0)
    {
        BllPolicysetting.UpdateState(listids, "N", 0, Config.CONSQL_WRITE_ETWIN_INTERNATIONAL_DB);
    }
}