1.html
<%@ Page Language="C#" AutoEventWireup="true" CodeBehind="WebForm2.aspx.cs" Inherits="WebApplication20.WebForm2" %>

<!DOCTYPE html>
<html xmlns="http://www.w3.org/1999/xhtml">
<head runat="server">
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
    <title></title>
</head>
<body>
    <form id="form1" runat="server">
        <div>
            <input name="_method" value="getFxRate" type="hidden"/>
            <input type="date" name="start" />--<input type="date" name="end" />
        </div>
        <input type="submit" />
    </form>
</body>
</html>

2.cs
using Contract.Domain;
using ETLAPP;
using Framework;
using HraWeb.Common;
using NHibernate;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Web;
using System.Web.UI;
using System.Web.UI.WebControls;
using System.Xml;
using ThreadTemplate;
using Contract.IService;

namespace WebApplication20
{
    /// <summary>
    /// Which HTML cell tag to extract from a table row. The member name is used
    /// verbatim as the tag name, so the members must stay lower-case.
    /// </summary>
    public enum SearchRange
    {
        th = 0,
        td = 1
    }

    /// <summary>
    /// Mutable state handed to one worker thread: what to query and what was found.
    /// </summary>
    class ThreadParameters
    {
        public string Url { get; set; }
        public string pairId { get; set; }
        public string Status { get; set; }
        public int PageCount { get; set; }
        public int pageIndex { get; set; }
        public string start { get; set; }
        // NOTE(review): never assigned by the code in this file; getData sends
        // `start` for both date query parameters.
        public string end { get; set; }
        public string webCode { get; set; }
    }

    /// <summary>
    /// Page that, when posted with _method=getFxRate, downloads one rate row per
    /// currency pair per day in [start, end] and bulk-inserts them into BAS_FX_RATE_BAK3.
    /// </summary>
    public partial class WebForm2 : BasePage
    {
        // Number of buffered rows that triggers a bulk write.
        private const int FlushThreshold = 100;
        // Page size used to convert the record count reported by the site into a page count.
        private const int PageSize = 20;
        // Upper bound on download attempts per page (the original retried without limit).
        private const int MaxFetchAttempts = 5;
        // Base delay between download attempts; multiplied by the attempt number.
        private const int RetryDelayMs = 200;

        public static ArrayList contentList = new ArrayList();

        // Shared row buffer; also used as the lock object guarding itself.
        // Built once by the static initializer so concurrent requests cannot race on creation.
        private static readonly System.Data.DataTable table = CreateRateTable();

        public static Dictionary<string, BasCurrencyPair> dic = new Dictionary<string, BasCurrencyPair>();

        private readonly object _daoLock = new object();
        private readonly object _sessionLock = new object();

        /// <summary>
        /// Builds the empty buffer table. Column order matters for the bulk copy
        /// (per the original author's note), so it must stay ID, CURRENCY_PAIR,
        /// FX_RATE, CAPTURE_DATE.
        /// </summary>
        private static System.Data.DataTable CreateRateTable()
        {
            var buffer = new System.Data.DataTable();
            buffer.Columns.Add(new System.Data.DataColumn() { ColumnName = "ID", DataType = typeof(string) });
            buffer.Columns.Add(new System.Data.DataColumn() { ColumnName = "CURRENCY_PAIR", DataType = typeof(Int64) });
            buffer.Columns.Add(new System.Data.DataColumn() { ColumnName = "FX_RATE", DataType = typeof(decimal) });
            buffer.Columns.Add(new System.Data.DataColumn() { ColumnName = "CAPTURE_DATE", DataType = typeof(DateTime) });
            return buffer;
        }

        /// <summary>
        /// Worker entry point: fetches page 1 to learn the page count, then fetches
        /// the last page and lets getData publish the extracted row on <paramref name="th"/>.
        /// </summary>
        /// <param name="para">A <see cref="ThreadParameters"/> instance.</param>
        /// <param name="th">The worker whose ReturnObj receives the row.</param>
        /// <returns>Always an empty string.</returns>
        private object InitThread(object para, SubThread th)
        {
            var obj = (ThreadParameters)para;

            // First request only establishes obj.PageCount.
            obj.pageIndex = 1;
            getData(obj, null);

            // No records (or the download failed): do not request page 0.
            if (obj.PageCount < 1)
            {
                return "";
            }

            obj.pageIndex = obj.PageCount;
            getData(obj, th);
            return "";
        }

        /// <summary>
        /// Downloads one result page, records the page count when on page 1 and,
        /// when <paramref name="th"/> is given and the page is the last one, stores
        /// the extracted row in th.ReturnObj. ReturnObj is left untouched when the
        /// page could not be downloaded or parsed.
        /// </summary>
        /// <param name="para">A <see cref="ThreadParameters"/> instance.</param>
        /// <param name="th">Worker to receive the row, or null to only read the page count.</param>
        /// <returns>Always "子完成".</returns>
        private object getData(object para, SubThread th)
        {
            ThreadParameters obj = (ThreadParameters)para;
            string url = $"http://srh.bankofchina.com/search/whpj/search.jsp?erectDate={obj.start}&nothing={obj.start}&pjname={obj.webCode}&page={obj.pageIndex}";

            string content = DownloadPage(url);
            if (content == null)
            {
                return "子完成";
            }

            Match countMatch = Regex.Match(content, @"var m_nRecordCount = (.*);");
            int rows;
            if (!countMatch.Success || !int.TryParse(countMatch.Groups[1].Value, out rows))
            {
                return "子完成";
            }

            int pages = (rows / PageSize) + (rows % PageSize == 0 ? 0 : 1);
            if (obj.pageIndex == 1)
            {
                obj.PageCount = pages;
            }

            if (th != null)
            {
                bool isLastPage = (obj.PageCount > 1 && obj.pageIndex > 1) || (obj.PageCount == 1);
                DataRow row = isLastPage ? BuildRateRow(obj, content) : null;
                if (row != null)
                {
                    th.ReturnObj = row;
                }
            }

            obj.Status = "完成";
            return "子完成";
        }

        /// <summary>
        /// Downloads <paramref name="url"/> as UTF-8 text, retrying on WebException
        /// (server unreachable or not responding) up to MaxFetchAttempts times.
        /// </summary>
        /// <returns>The page text, or null when every attempt failed.</returns>
        private static string DownloadPage(string url)
        {
            for (int attempt = 1; attempt <= MaxFetchAttempts; attempt++)
            {
                try
                {
                    using (WebClient wc = new WebClient())
                    using (Stream stream = wc.OpenRead(url))
                    using (StreamReader sr = new StreamReader(stream, Encoding.UTF8))
                    {
                        return sr.ReadToEnd();
                    }
                }
                catch (System.Net.WebException)
                {
                    if (attempt < MaxFetchAttempts)
                    {
                        Thread.Sleep(RetryDelayMs * attempt);
                    }
                }
            }
            return null;
        }

        /// <summary>
        /// Extracts the second-to-last table row inside the "BOC_main publish" div
        /// and converts it into a buffer-table row.
        /// </summary>
        /// <returns>A detached row, or null when the expected markup or values are missing.</returns>
        private DataRow BuildRateRow(ThreadParameters obj, string content)
        {
            // Narrow the document step by step: div -> table -> rows.
            string divContent = Regex.Match(content, @"(?<=<div (.*)?class=""BOC_main publish""[^>]*?>)([\s\S]*?)(?=</div>)").Value;
            string tableContent = Regex.Match(divContent, @"(?<=<table (.*)?[^>]*?>)([\s\S]*?)(?=</table>)").Value;
            MatchCollection trMatches = Regex.Matches(tableContent, @"(?<=<tr(.*)?[^>]*?>)([\s\S]*?)(?=</tr>)");
            if (trMatches.Count < 2)
            {
                return null;
            }

            List<string> cells = GET_TH_OR_TD_LIST(SearchRange.td, trMatches[trMatches.Count - 2].Value);
            if (cells.Count <= 7)
            {
                return null;
            }

            decimal rate;
            if (!decimal.TryParse(cells[6], out rate))
            {
                return null;
            }

            // A failed TryParse resets its out argument, so apply the fallback afterwards.
            DateTime captured;
            if (!DateTime.TryParse(cells[7], out captured))
            {
                captured = DateTime.Now;
            }

            DataRow row;
            lock (table)
            {
                row = table.NewRow();
            }
            row["CURRENCY_PAIR"] = Int64.Parse(obj.pairId);
            row["FX_RATE"] = rate;
            row["CAPTURE_DATE"] = captured.Date;
            return row;
        }

        private Contract.IService.IDaoService _dao;

        /// <summary>Lazily resolved "DaoService" object from ctx.</summary>
        public Contract.IService.IDaoService Dao
        {
            get
            {
                if (_dao == null)
                {
                    lock (_daoLock)
                    {
                        if (_dao == null)
                        {
                            _dao = (Contract.IService.IDaoService)ctx["DaoService"];
                        }
                    }
                }
                return _dao;
            }
        }

        private ISession _session;

        /// <summary>Lazily obtained session from <see cref="Dao"/>.</summary>
        public ISession session
        {
            get
            {
                if (_session == null)
                {
                    lock (_sessionLock)
                    {
                        if (_session == null)
                        {
                            _session = Dao.GetSession();
                        }
                    }
                }
                return _session;
            }
        }

        /// <summary>
        /// Returns the inner text of every &lt;th&gt; or &lt;td&gt; element in
        /// <paramref name="row"/>, in document order.
        /// </summary>
        /// <param name="range">Selects header cells (th) or data cells (td).</param>
        /// <param name="row">Inner HTML of one table row.</param>
        private List<string> GET_TH_OR_TD_LIST(SearchRange range, string row)
        {
            string tag = range.ToString();
            string cellPattern = $@"(?<=(<{tag}[^>]*?>))(?<tdCell>[\s\S]*?)(?=</{tag}>)";

            List<string> cells = new List<string>();
            foreach (Match match in Regex.Matches(row, cellPattern))
            {
                cells.Add(match.Groups["tdCell"].Value);
            }
            return cells;
        }

        /// <summary>
        /// Completion callback for a worker: appends its row to the shared buffer
        /// and bulk-writes the buffer once it reaches FlushThreshold rows. Runs
        /// synchronously so the row is buffered before the worker is reported stopped.
        /// </summary>
        /// <param name="currentThread">The finished <see cref="SubThread"/>.</param>
        /// <returns>Always null.</returns>
        public object RateThreadCompeleted(object currentThread)
        {
            SubThread callThread = currentThread as SubThread;
            DataRow row = callThread == null ? null : callThread.ReturnObj as DataRow;
            if (row == null)
            {
                // The worker produced no data; nothing to buffer.
                return null;
            }

            lock (table)
            {
                table.Rows.Add(row);
                if (table.Rows.Count >= FlushThreshold)
                {
                    Holworth.Utility.HraUtility.DataTableWriteToServer(table, "BAS_FX_RATE_BAK3", "ID", true);
                    table.Clear();
                }
            }
            return null;
        }

        /// <summary>
        /// Handles the getFxRate post: creates one worker per (day, currency pair),
        /// runs at most 200 at a time, waits for all of them and flushes the remaining rows.
        /// </summary>
        protected void Page_Load(object sender, EventArgs e)
        {
            string log = "";
            if (Request["_method"] != "getFxRate")
            {
                return;
            }

            // Validate before parsing so an empty field yields this message, not a parse error.
            if (string.IsNullOrEmpty(Request["start"]) || string.IsNullOrEmpty(Request["end"]))
            {
                throw new Exception(":输个日期,我的哥!");
            }
            var start = DateTime.Parse(Request["start"]);
            var end = DateTime.Parse(Request["end"]);

            info = new Framework.QueryInfo() { CustomSQL = "select * from BASE_CURRENCY_WEB" };
            dic = Dao.FindList<BasCurrencyPair>(new QueryInfo() { QueryObject = "BasCurrencyPair" }).ToDictionary(x => x.Id);
            var ds = Dao.ExcuteDataSet(info);

            // Maximum number of workers running concurrently.
            int maxPoolThread = 200;
            // Workers currently running.
            var runingHt = new Dictionary<int, SubThread>();
            // Workers waiting for a free slot.
            var unRunHt = new Dictionary<int, SubThread>();
            int k = 0;

            for (DateTime t1 = start; t1 <= end; t1 = t1.AddDays(1))
            {
                string day = t1.ToString("yyyy-MM-dd");
                foreach (DataRow row in ds.Tables[0].Rows)
                {
                    var pairId = row["CURRENCY_PAIR_ID"].ToString();
                    string url = string.Format("http://srh.bankofchina.com/search/whpj/search.jsp?erectDate={0}&nothing={1}&pjname={2}", day, day, row["WEB_CODE"]);
                    var param = new ThreadParameters() { pairId = pairId, Url = url };
                    k++;
                    param.pageIndex = 1;
                    param.start = day;
                    param.webCode = row["WEB_CODE"].ToString();

                    SubThread th = new SubThread(k.ToString());
                    th.ThreadAction = (currentThread, inputParameter) =>
                    {
                        param.pageIndex = 1;
                        InitThread(param, th);
                        return "";
                    };
                    th.ThreadCompeleted += RateThreadCompeleted;

                    if (k <= maxPoolThread)
                    {
                        runingHt.Add(k, th);
                        th.Start();
                    }
                    else
                    {
                        unRunHt.Add(k, th);
                    }
                }
            }

            while (true)
            {
                Thread.Sleep(500);

                // Ids of the workers that finished since the previous pass.
                var stepFinishList = new List<int>();
                foreach (int tid in runingHt.Keys)
                {
                    var t = runingHt[tid];
                    if (t.IsStopped)
                    {
                        t.Dispose();
                        if (t.ReturnObj == null)
                        {
                            log += " " + "无数据" + " ";
                        }
                        stepFinishList.Add(tid);
                    }
                }

                System.Threading.Thread.Sleep(1000);

                // Free each finished slot and hand it to the next waiting worker, if any.
                foreach (int tid in stepFinishList)
                {
                    runingHt.Remove(tid);
                    if (unRunHt.Count > 0)
                    {
                        var next = unRunHt.First();
                        next.Value.Start();
                        runingHt.Add(next.Key, next.Value);
                        unRunHt.Remove(next.Key);
                    }
                }

                // Leave once nothing is running or waiting.
                if (runingHt.Count == 0 && unRunHt.Count == 0)
                {
                    break;
                }
            }

            // Write whatever is left, then empty the static buffer so the next
            // request does not write these rows a second time.
            lock (table)
            {
                Holworth.Utility.HraUtility.DataTableWriteToServer(table, "BAS_FX_RATE_BAK3", "ID", true);
                table.Clear();
            }
        }
    }
}

3.subthread.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ThreadTemplate
{
    /// <summary>
    /// Wraps a dedicated <see cref="Thread"/> that runs ThreadAction, then
    /// ThreadCompeleted, then marks itself stopped and wakes callers blocked in <see cref="Wait"/>.
    /// </summary>
    public class SubThread : IDisposable
    {
        // Lock object used by Wait/Stop for the completion signal.
        public readonly object _locker = new object();

        /// <summary>
        /// Free-form dictionary attached to this instance for passing data around.
        /// </summary>
        public IDictionary<object, object> DataDictionary { get; set; }

        /// <summary>
        /// Value passed to ThreadAction as its second argument.
        /// </summary>
        public dynamic InputParameter { get; set; }

        /// <summary>
        /// Work to run on the thread. First argument is this SubThread, second is
        /// InputParameter; the return value is not used by this class.
        /// </summary>
        public Func<object, object, object> ThreadAction { get; set; }

        /// <summary>
        /// Callback run on the same thread after ThreadAction returns. Its argument
        /// is this SubThread.
        /// </summary>
        public Func<object, object> ThreadCompeleted { get; set; }

        private Thread thdSubThread = null;

        // Mutex acquired by Suspend and released by Resume.
        // NOTE(review): both run on the caller's thread, so this does not pause the
        // worker thread itself — confirm the intended semantics.
        public Mutex mUnique = new Mutex();

        // Read by other threads without taking _locker (e.g. polling IsStopped), hence volatile.
        private volatile bool blnIsStopped;
        private volatile bool blnSuspended;
        private volatile bool blnStarted;
        private bool disposed;
        private string threadId;

        public string ThreadId { get { return threadId; } set { threadId = value; } }

        private object _returnObj;

        public bool IsStopped { get { return blnIsStopped; } }

        public bool IsSuspended { get { return blnSuspended; } }

        public object ReturnObj { get { return _returnObj; } set { _returnObj = value; } }

        /// <summary>
        /// Blocks the caller until the instance is in the stopped state. Returns
        /// immediately if called before <see cref="Start"/>, because a new instance
        /// starts out stopped.
        /// </summary>
        /// <returns>Always true.</returns>
        public bool Wait()
        {
            lock (_locker)
            {
                // Loop guards against wake-ups that arrive before the flag is set.
                while (!blnIsStopped)
                {
                    Monitor.Wait(_locker);
                }
            }
            return true;
        }

        /// <summary>
        /// Creates a worker in the stopped, not-started state.
        /// </summary>
        /// <param name="key">Caller-chosen identifier, exposed as ThreadId.</param>
        /// <param name="threadStart">Optional ThreadAction.</param>
        /// <param name="threadParamter">Optional InputParameter; may also be set through the property.</param>
        public SubThread(string key, Func<object, object, object> threadStart = null, dynamic threadParamter = null)
        {
            threadId = key;
            blnIsStopped = true;
            blnSuspended = false;
            blnStarted = false;
            if (threadStart != null)
            {
                ThreadAction = threadStart;
            }
            if (threadParamter != null)
            {
                InputParameter = threadParamter;
            }
        }

        /// <summary>
        /// Starts the worker thread. Does nothing if it is already started.
        /// </summary>
        public void Start()
        {
            if (!blnStarted)
            {
                thdSubThread = new Thread(ExecuteAction);
                blnIsStopped = false;
                blnStarted = true;
                thdSubThread.Start();
            }
        }

        /// <summary>
        /// Thread entry point: runs the action, then the completion callback, and
        /// always signals completion, even when either of them throws.
        /// </summary>
        private void ExecuteAction()
        {
            try
            {
                ThreadAction?.Invoke(this, InputParameter);
                ThreadCompeleted?.Invoke(this);
            }
            finally
            {
                // Release anyone blocked in Wait() or polling IsStopped.
                this.Stop();
            }
        }

        /// <summary>
        /// Marks the worker suspended and acquires mUnique on the calling thread.
        /// </summary>
        public void Suspend()
        {
            if (blnStarted && !blnSuspended)
            {
                blnSuspended = true;
                mUnique.WaitOne();
            }
        }

        /// <summary>
        /// Clears the suspended mark and releases mUnique.
        /// </summary>
        public void Resume()
        {
            if (blnStarted && blnSuspended)
            {
                blnSuspended = false;
                mUnique.ReleaseMutex();
            }
        }

        /// <summary>
        /// Marks the worker stopped and wakes every caller blocked in <see cref="Wait"/>.
        /// Does not abort or join the underlying thread.
        /// </summary>
        public void Stop()
        {
            if (blnStarted && blnSuspended)
            {
                Resume();
            }

            lock (_locker)
            {
                blnStarted = false;
                blnIsStopped = true;
                Monitor.PulseAll(_locker);
            }
        }

        #region IDisposable Members

        /// <summary>
        /// Stops the worker and releases the mutex handle. Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            Stop();
            if (!disposed)
            {
                disposed = true;
                mUnique.Close();
            }
            GC.SuppressFinalize(this);
        }

        #endregion
    }
}

4.bulkcopy
// Serialises sequence re-seeding and bulk writes across callers.
private static readonly object BulkCopySyncRoot = new object();

/// <summary>
/// Bulk-copies <paramref name="dt"/> into <paramref name="targetTable"/>. Does nothing
/// when the table is null or has no rows.
/// </summary>
/// <param name="dt">Rows to write.</param>
/// <param name="targetTable">Destination table name on the server.</param>
/// <param name="IdField">Column that receives generated ids when <paramref name="NeedId"/> is true.</param>
/// <param name="NeedId">
/// When true, reads HIBERNATE_SEQUENCE.NEXTVAL, recreates the sequence past the range
/// needed for these rows, and numbers the rows consecutively from that value.
/// </param>
public static void DataTableWriteToServer(DataTable dt, string targetTable, string IdField = "ID", bool NeedId = false)
{
    if (dt == null || dt.Rows.Count == 0)
    {
        return;
    }

    var Dao = GetDao();
    lock (BulkCopySyncRoot)
    {
        if (NeedId)
        {
            // Read the current sequence value.
            QueryInfo searchInfo = new QueryInfo();
            searchInfo.CustomSQL = "select HIBERNATE_SEQUENCE.NEXTVAL from dual";
            DataTable sequenceResult = Dao.ExecuteDataSet(searchInfo).Tables[0];
            Int64 increId = Convert.ToInt64(sequenceResult.Rows[0][0].ToString());

            // Recreate the sequence so it starts after the ids reserved below.
            QueryInfo tmpInfo = new QueryInfo();
            tmpInfo.CustomSQL = "drop sequence HIBERNATE_SEQUENCE";
            Dao.ExecuteNonQuery(tmpInfo);
            tmpInfo = new QueryInfo();
            tmpInfo.CustomSQL = "CREATE SEQUENCE HIBERNATE_SEQUENCE START WITH " + (increId + 1 + dt.Rows.Count).ToString();
            Dao.ExecuteNonQuery(tmpInfo);

            for (int i = 0; i < dt.Rows.Count; i++)
            {
                increId++;
                dt.Rows[i][IdField] = increId.ToString();
            }
        }

        string connOrcleString = System.Configuration.ConfigurationManager.ConnectionStrings["ConnectionString"].ConnectionString;
        using (OracleConnection conn = new OracleConnection(connOrcleString))
        {
            OracleBulkCopy bulkCopy = new OracleBulkCopy(connOrcleString, OracleBulkCopyOptions.UseInternalTransaction);
            try
            {
                bulkCopy.BulkCopyTimeout = 260 * 1000;
                bulkCopy.DestinationTableName = targetTable; // destination table on the server
                bulkCopy.BatchSize = 5000;                   // rows per batch
                conn.Open();
                bulkCopy.WriteToServer(dt);                  // copy every row to the destination
            }
            finally
            {
                bulkCopy.Close();
            }
        }
    }
}

5.信号通知
/// <summary>
/// Clears the current user's SysComputerStatus rows, runs SaveCompute2 on a
/// SubThread, blocks until it finishes and writes a JSON result to the response.
/// </summary>
private void ComputeData()
{
    try
    {
        QueryInfo info1 = new QueryInfo();
        info1.CustomSQL = "delete from SysComputerStatus where CreateUid=:uid";
        info1.Parameters.Add("uid", CurrentUser.UserId);
        Dao.ExecuteUpdate(info1);

        using (SubThread sub = new SubThread("RskBookComputeResultManage3_RiskValueComputeData"))
        {
            sub.InputParameter = new
            {
                terminationStep = int.Parse(Request["terminationStep"]),
                riskBookId = int.Parse(Request["riskBookId"] ?? "4513"),
                computeDate = DateTime.Parse(Request["computeDate"])
            };
            sub.ThreadAction = (a, b) =>
            {
                RiskValueComputeService2.SaveCompute2(sub.InputParameter.computeDate, sub.InputParameter.riskBookId, sub.InputParameter.terminationStep);
                return null;
            };
            sub.Start();

            // Block until the task completes.
            if (sub.Wait())
            {
                Response.Write(Newtonsoft.Json.JsonConvert.SerializeObject(new { msg = "计算成功" }));
            }
        }
    }
    catch (Exception ex)
    {
        var errorObj = new { ErrorCode = "-999", error = ex.Message };
        Response.Write(Newtonsoft.Json.JsonConvert.SerializeObject(errorObj));
        throw;
    }
    finally
    {
        Response.End();
    }
}