  • 汇率多线程 草稿

    <%@ Page Language="C#" AutoEventWireup="true" CodeBehind="WebForm2.aspx.cs" Inherits="WebApplication20.WebForm2" %>
    <!DOCTYPE html>
    <html xmlns="http://www.w3.org/1999/xhtml">
    <head runat="server">
        <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
    </head>
    <body>
        <%-- Posts _method=getFxRate together with a start/end date back to this page;
             Page_Load in the code-behind reads these three request values. --%>
        <form id="form1" runat="server">
            <input name="_method" value="getFxRate" type="hidden"/>
            <input type="date" name="start" />--<input type="date" name="end" />
            <input type="submit" />
        </form>
    </body>
    </html>
    using Contract.Domain;
    using ETLAPP;
    using Framework;
    using HraWeb.Common;
    using NHibernate;
    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Data;
    using System.IO;
    using System.Linq;
    using System.Net;
    using System.Text;
    using System.Text.RegularExpressions;
    using System.Threading;
    using System.Web;
    using System.Web.UI;
    using System.Web.UI.WebControls;
    using System.Xml;
    using ThreadTemplate;
    using Contract.IService;
    namespace WebApplication20
        /// <summary>
        /// Selects which kind of table cell to extract from a row.
        /// The member names are deliberately lower-case: GET_TH_OR_TD_LIST calls
        /// <c>ToString()</c> on the value and uses the result as the HTML tag name.
        /// </summary>
        public enum SearchRange
        {
            /// <summary>Header cells (&lt;th&gt;).</summary>
            th = 0,

            /// <summary>Data cells (&lt;td&gt;).</summary>
            td = 1
        }
        /// <summary>
        /// Mutable state shared between the page and one worker thread: which currency,
        /// day and result page to download, plus the values the download writes back.
        /// </summary>
        class ThreadParameters
        {
            /// <summary>Search URL assembled by the page when the worker is created.</summary>
            public string Url { get; set; }

            /// <summary>Currency pair id as text; parsed with Int64.Parse when a row is filled.</summary>
            public string pairId { get; set; }

            /// <summary>Progress marker written by getData.</summary>
            public string Status { get; set; }

            /// <summary>Number of result pages; written by getData when page 1 is requested.</summary>
            public int PageCount { get; set; }

            /// <summary>Result page to request.</summary>
            public int pageIndex { get; set; }

            /// <summary>Query date, formatted yyyy-MM-dd by the page.</summary>
            public string start { get; set; }

            /// <summary>End date slot; nothing in the visible code assigns or reads it.</summary>
            public string end { get; set; }

            /// <summary>Currency code passed to the site in the pjname query parameter.</summary>
            public string webCode { get; set; }
        }
        public partial class WebForm2 : BasePage
            public static ArrayList contentList = new ArrayList();
            private static System.Data.DataTable table = null;
            /// <summary>
            /// Runs the two-pass download for one currency/day: page 1 is fetched to learn the
            /// page count, then the last page is fetched and its row is published on the worker.
            /// </summary>
            /// <param name="para">A <see cref="ThreadParameters"/> instance; its pageIndex and PageCount are overwritten.</param>
            /// <param name="th">Worker that receives the extracted row through ReturnObj.</param>
            /// <returns>Always an empty string.</returns>
            private object InitThread(object para, SubThread th)
            {
                var obj = (ThreadParameters)para;

                // Pass 1: no worker is handed over, so getData cannot publish a row here;
                // the call is only made for the PageCount it writes back.
                obj.pageIndex = 1;
                getData(obj, null);

                // No pages reported: asking the site for page 0 could not yield a row.
                if (obj.PageCount < 1)
                {
                    return "";
                }

                // Pass 2: the last page, this time with the worker attached.
                obj.pageIndex = obj.PageCount;
                getData(obj, th);
                return "";
            }
            public static Dictionary<string, BasCurrencyPair> dic = new Dictionary<string, BasCurrencyPair>();
            /// <summary>
            /// Downloads one page of the Bank of China rate search (date = start, currency = webCode,
            /// page = pageIndex), records the page count when page 1 is requested, and turns the
            /// second-to-last table row of the page into a row of the shared result table.
            /// </summary>
            /// <param name="para">A <see cref="ThreadParameters"/> instance; PageCount (page 1 only) and Status are written.</param>
            /// <param name="th">Worker whose ReturnObj receives the row; pass null to only learn the page count.</param>
            /// <returns>"子完成" once a page was processed; "子失败" when every download attempt raised a WebException.</returns>
            private object getData(object para, SubThread th)
            {
                // The original jumped back to the top through a goto on every WebException, with no limit.
                // A bounded budget keeps an unreachable site from spinning the worker forever.
                const int maxAttempts = 5;

                ThreadParameters obj = (ThreadParameters)para;
                var url = $"http://srh.bankofchina.com/search/whpj/search.jsp?erectDate={obj.start}&nothing={obj.start}&pjname=" + obj.webCode + "&page=" + obj.pageIndex;

                for (int attempt = 1; attempt <= maxAttempts; attempt++)
                {
                    string content;
                    try
                    {
                        // WebClient is disposable; the original created one per attempt and never released it.
                        using (WebClient wc = new WebClient())
                        using (Stream stream = wc.OpenRead(url))
                        using (StreamReader sr = new StreamReader(stream, Encoding.UTF8))
                        {
                            content = sr.ReadToEnd();
                        }
                    }
                    catch (System.Net.WebException)
                    {
                        continue;
                    }

                    ExtractRateRow(content, obj, th);
                    obj.Status = "完成";
                    return "子完成";
                }

                obj.Status = "失败";
                return "子失败";
            }

            /// <summary>
            /// Parses a downloaded result page: updates the page count for page 1 and, when the page
            /// holds a usable row, fills a new row of the shared table and hands it to the worker.
            /// </summary>
            private void ExtractRateRow(string content, ThreadParameters obj, SubThread th)
            {
                // The page carries its total record count in a script variable; the page size is taken as 20.
                var pageMatch = Regex.Match(content, @"var m_nRecordCount = (.*);");
                int rows;
                if (!pageMatch.Success || !int.TryParse(pageMatch.Groups[1].Value, out rows))
                {
                    rows = 0;
                }
                const int pagesize = 20;
                int pages = (rows / pagesize) + (rows % pagesize == 0 ? 0 : 1);
                if (obj.pageIndex == 1)
                {
                    obj.PageCount = pages;
                }

                // Narrow down step by step: result container div -> its table -> the table rows.
                // The character class must be [\s\S] (any character, including line breaks);
                // the pasted [sS] only matched the letters s and S and therefore never matched markup.
                string divContent = LastMatchValue(content, @"(?<=<div (.*)?class=""BOC_main publish""[^>]*?>)([\s\S]*?)(?=</div>)");
                string tableContent = LastMatchValue(divContent, @"(?<=<table (.*)?[^>]*?>)([\s\S]*?)(?=</table>)");
                MatchCollection trMatches = Regex.Matches(tableContent, @"(?<=<tr(.*)?[^>]*?>)([\s\S]*?)(?=</tr>)");
                if (trMatches.Count < 2)
                {
                    // The row of interest is the second-to-last one; without it there is nothing to read.
                    return;
                }

                // Split the chosen row into its data cells (td, not th).
                string rowHtml = trMatches[trMatches.Count - 2].Groups[0].Value;
                List<string> cells = GET_TH_OR_TD_LIST(SearchRange.td, rowHtml);

                // Cell 6 is stored as FX_RATE and cell 7 as CAPTURE_DATE, so both must exist.
                if (cells == null || cells.Count < 8)
                {
                    return;
                }
                decimal rate;
                if (!decimal.TryParse(cells[6], System.Globalization.NumberStyles.Number, System.Globalization.CultureInfo.InvariantCulture, out rate))
                {
                    return;
                }

                // TryParse resets its out argument on failure, so the fallback to today has to be explicit.
                DateTime captured;
                DateTime captureDate = DateTime.TryParse(cells[7], out captured) ? captured.Date : DateTime.Now.Date;

                DataRow row;
                // The table is shared by all workers and DataTable writes are not synchronized by the framework.
                lock (table)
                {
                    row = table.NewRow();
                    row["CURRENCY_PAIR"] = Int64.Parse(obj.pairId);
                    row["FX_RATE"] = rate;
                    row["CAPTURE_DATE"] = captureDate;
                }

                // Publish only for the last-page request of a multi-page result, or for a single-page result.
                if (th != null && ((obj.PageCount > 1 && obj.pageIndex > 1) || obj.PageCount == 1))
                {
                    th.ReturnObj = row;
                }
            }

            /// <summary>Returns the text of the last match of <paramref name="pattern"/> in <paramref name="input"/>, or an empty string when nothing matches.</summary>
            private static string LastMatchValue(string input, string pattern)
            {
                MatchCollection matches = Regex.Matches(input, pattern);
                return matches.Count == 0 ? string.Empty : matches[matches.Count - 1].Groups[0].Value;
            }
            private Contract.IService.IDaoService _dao;

            // Guards the lazy creation of _dao. The original locked on a freshly created object,
            // which is a different object for every caller and therefore excluded nobody.
            private readonly object _daoLock = new object();

            /// <summary>
            /// Lazily resolved data-access service, looked up under the key "DaoService" in ctx
            /// on first use and cached for the lifetime of this page instance.
            /// </summary>
            public Contract.IService.IDaoService Dao
            {
                get
                {
                    if (_dao == null)
                    {
                        lock (_daoLock)
                        {
                            // Re-check inside the lock: another thread may have finished the lookup meanwhile.
                            if (_dao == null)
                            {
                                _dao = (Contract.IService.IDaoService)ctx["DaoService"];
                            }
                        }
                    }
                    return _dao;
                }
            }
            private ISession _session;

            // Guards the lazy creation of _session. The original locked on a freshly created object,
            // which is a different object for every caller and therefore excluded nobody.
            private readonly object _sessionLock = new object();

            /// <summary>
            /// Lazily obtained NHibernate session, requested from <c>Dao.GetSession()</c> on first use
            /// and cached for the lifetime of this page instance.
            /// </summary>
            public ISession session
            {
                get
                {
                    if (_session == null)
                    {
                        lock (_sessionLock)
                        {
                            // Re-check inside the lock: another thread may have created the session meanwhile.
                            if (_session == null)
                            {
                                _session = Dao.GetSession();
                            }
                        }
                    }
                    return _session;
                }
            }
            /// <summary>
            /// Extracts the inner text of every header or data cell found in one table row.
            /// </summary>
            /// <param name="range">Cell kind to look for; its name ("th" or "td") is used as the tag name.</param>
            /// <param name="row">HTML of a single table row.</param>
            /// <returns>Cell contents in document order; empty when <paramref name="row"/> is null/empty or holds no such cells.</returns>
            private List<string> GET_TH_OR_TD_LIST(SearchRange range, string row)
            {
                List<string> cells = new List<string>();
                if (string.IsNullOrEmpty(row))
                {
                    return cells;
                }

                string tag = range.ToString();
                // [\s\S] means any character including line breaks; the pasted [sS] only matched
                // the letters s and S, so no real cell content could ever match.
                string cellPattern = $@"(?<=(<{tag}[^>]*?>))(?<tdCell>[\s\S]*?)(?=</{tag}>)";

                foreach (Match match in Regex.Matches(row, cellPattern))
                {
                    // The original read each cell into a local and never stored it,
                    // so callers always received an empty list.
                    cells.Add(match.Groups["tdCell"].Value);
                }
                return cells;
            }
            /// <summary>
            /// Completion callback; Page_Load attaches it to each worker's ThreadCompeleted delegate.
            /// NOTE(review): this listing appears to have lost lines (braces, the body of the first
            /// if, the end of the lambda and of the constructor call) - confirm against the real file.
            /// </summary>
            /// <param name="currentThread">Expected to be the SubThread that just finished.</param>
            /// <returns>Always null in the visible code.</returns>
            public object RateThreadCompeleted(object currentThread)
                // The finished worker, or null when currentThread is not a SubThread.
                SubThread CallThread = currentThread as SubThread;
                // A second SubThread is built around a lambda; no Start() call for it is visible here.
                SubThread t11 = new SubThread(Guid.NewGuid().ToString(), (a, b) =>
                    // NOTE(review): the statement guarded by this condition is missing from the listing.
                    if (table.Rows.Count < 100)
                    // When the shared table holds exactly 100 rows it is bulk-written to BAS_FX_RATE_BAK3.
                    if (table.Rows.Count == 100)
                        Holworth.Utility.HraUtility.DataTableWriteToServer(table, "BAS_FX_RATE_BAK3", "ID", true);
                    return null;
                return null;
            /// <summary>
            /// Page entry point. Creates the shared result table when it is still null and, when the
            /// posted _method value equals getFxRate, builds one SubThread per day and per currency row
            /// between the posted start and end dates, polls the workers, and finally bulk-writes the
            /// table to BAS_FX_RATE_BAK3.
            /// NOTE(review): this listing appears to have lost lines (braces, else branches, the k
            /// increment, the calls that start threads, removals from the two dictionaries and the
            /// loop exit) - confirm against the real file before changing any logic here.
            /// </summary>
            protected void Page_Load(object sender, EventArgs e)
                // The table lives in a static field, so it is only built the first time it is found null.
                if (table == null)
                    table = new System.Data.DataTable();
                    table.Columns.Add(new System.Data.DataColumn() { ColumnName = "ID", DataType = typeof(string) });
                    table.Columns.Add(new System.Data.DataColumn() { ColumnName = "CURRENCY_PAIR", DataType = typeof(Int64) });
                    table.Columns.Add(new System.Data.DataColumn() { ColumnName = "FX_RATE", DataType = typeof(decimal) });
                    table.Columns.Add(new System.Data.DataColumn() { ColumnName = "CAPTURE_DATE", DataType = typeof(DateTime) });
                string log = "";
                // Only a request carrying _method=getFxRate starts the download.
                if (Request["_method"] == "getFxRate")
                    info = new Framework.QueryInfo() { CustomSQL = "select * from BASE_CURRENCY_WEB" };
                    var Dao = (Contract.IService.IDaoService)ctx["DaoService"];
                    // Cache every BasCurrencyPair in the static dictionary, keyed by its Id.
                    dic = Dao.FindList<BasCurrencyPair>(new QueryInfo() { QueryObject = "BasCurrencyPair" }).ToDictionary(x => x.Id);
                    var ds = Dao.ExcuteDataSet(info);
                    // NOTE(review): both dates are parsed before the empty check below, so a missing
                    // value fails inside DateTime.Parse and never reaches the custom exception.
                    var start = DateTime.Parse(Request["start"]);
                    var end = DateTime.Parse(Request["end"]);
                    if (string.IsNullOrEmpty(Request["start"]) || string.IsNullOrEmpty(Request["end"]))
                        throw new Exception(":输个日期,我的哥!");
                    SubThread thread = new SubThread("1");
                    int maxPoolThread = 200;
                    int totalThreadNum = 0;
                    // runingHt holds the workers being polled, unRunHt the ones still waiting for a slot.
                    var runingHt = new Dictionary<int, SubThread>();
                    var unRunHt = new Dictionary<int, SubThread>();
                    int k = 0;
                    string url = "";
                    //var code = row[""].ToString();
                    // One worker per calendar day and per row of the BASE_CURRENCY_WEB result.
                    for (DateTime t1 = start; t1 <= end; t1 = t1.AddDays(1))
                        foreach (DataRow row in ds.Tables[0].Rows)
                            var pairId = row["CURRENCY_PAIR_ID"].ToString();
                            // This URL is stored in param.Url; getData builds its own URL from start, webCode and pageIndex.
                            url = string.Format("http://srh.bankofchina.com/search/whpj/search.jsp?erectDate={0}&nothing={1}&pjname={2}", t1.ToString("yyyy-MM-dd"), t1.ToString("yyyy-MM-dd"), row["WEB_CODE"]);
                            var param = new ThreadParameters() { pairId = pairId, Url = url };
                            param.pageIndex = 1;
                            param.start = t1.ToString("yyyy-MM-dd");
                            param.webCode = row["WEB_CODE"].ToString();
                            SubThread th = new SubThread(k.ToString());
                            // Worker body: reset the page index and run the two-pass fetch in InitThread.
                            th.ThreadAction = (currentThread,inputParameter) =>
                                param.pageIndex = 1;
                                InitThread(param, th);
                                return "";
                            th.ThreadCompeleted += RateThreadCompeleted;
                            // NOTE(review): presumably the second Add below sat in an else branch that the listing lost.
                            if (k <= maxPoolThread)
                                runingHt.Add(k, th);
                                unRunHt.Add(k, th);
                    // Polling loop: stopped workers without a ReturnObj are noted in the log text, then
                    // waiting workers are moved into the running set; after the loop the table is bulk-written.
                    while (true)
                        var stepFinishList = new List<int>();
                        foreach (int tid in runingHt.Keys)
                            var t = runingHt[tid];
                            if (t.IsStopped)
                                if (t.ReturnObj == null)
                                    log += "
    " + "无数据" + "
                        foreach (int tid in stepFinishList)
                            if (unRunHt.Count > 0)
                                SubThread unRunThread = unRunHt.First().Value;
                                var unRunTid = unRunHt.First().Key;
                                runingHt.Add(unRunTid, unRunThread);
                        if (runingHt.Count == 0 && unRunHt.Count == 0)
                    Holworth.Utility.HraUtility.DataTableWriteToServer(table, "BAS_FX_RATE_BAK3", "ID", true);
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    namespace ThreadTemplate
    {
        /// <summary>
        /// Small wrapper around a dedicated <see cref="Thread"/>: runs <see cref="ThreadAction"/>, then
        /// <see cref="ThreadCompeleted"/>, and lets other threads block in <see cref="Wait"/> until it is done.
        /// </summary>
        public class SubThread : IDisposable
        {
            public readonly object _locker = new object();

            /// <summary>
            /// Free-form dictionary attached to this worker; can be used to pass auxiliary data.
            /// </summary>
            public IDictionary<object, object> DataDictionary { get; set; }

            /// <summary>
            /// Value handed to <see cref="ThreadAction"/> as its second argument.
            /// </summary>
            public dynamic InputParameter { get; set; }

            /// <summary>
            /// Work delegate. First argument is this SubThread, second is <see cref="InputParameter"/>;
            /// the returned value is not used by this class.
            /// </summary>
            public Func<object, object, object> ThreadAction { get; set; }

            /// <summary>
            /// Completion callback, invoked with this SubThread after the work delegate returned;
            /// the returned value is not used by this class.
            /// </summary>
            public Func<object, object> ThreadCompeleted { get; set; }

            /// <summary>
            /// Exception raised by the work delegate or the completion callback during the last run,
            /// or null when the run finished normally.
            /// </summary>
            public Exception Error { get; private set; }

            private Thread thdSubThread = null;
            public Mutex mUnique = new Mutex();

            // These flags are written by the worker and read by other threads (IsStopped is polled),
            // so they are volatile to keep readers from working on a stale value.
            private volatile bool blnIsStopped;
            private volatile bool blnSuspended;
            private volatile bool blnStarted;

            private string threadId;
            public string ThreadId { get { return threadId; } set { threadId = value; } }
            private object _returnObj;

            /// <summary>True before the first start and again after the worker finished.</summary>
            public bool IsStopped
            {
                get { return blnIsStopped; }
            }

            public bool IsSuspended
            {
                get { return blnSuspended; }
            }

            /// <summary>Result slot; this class never assigns it, the work delegate is expected to.</summary>
            public object ReturnObj
            {
                get { return _returnObj; }
                set { _returnObj = value; }
            }

            /// <summary>
            /// Blocks the caller until the worker is stopped. Returns immediately when the worker
            /// was never started, because a new instance counts as stopped.
            /// </summary>
            /// <returns>Always true.</returns>
            public bool Wait()
            {
                lock (_locker)
                {
                    // Loop guards against wake-ups that arrive before the flag is set.
                    while (!blnIsStopped) { Monitor.Wait(_locker); }
                }
                return true;
            }

            /// <summary>
            /// Creates a worker that is not yet running.
            /// </summary>
            /// <param name="key">Identifier used by the caller to keep track of the worker.</param>
            /// <param name="threadStart">Optional work delegate; can also be set through <see cref="ThreadAction"/>.</param>
            /// <param name="threadParamter">Optional input value; can also be set through <see cref="InputParameter"/>.</param>
            public SubThread(string key, Func<object, object, object> threadStart = null, dynamic threadParamter = null)
            {
                threadId = key;
                blnIsStopped = true;
                blnSuspended = false;
                blnStarted = false;
                if (threadStart != null)
                {
                    ThreadAction = threadStart;
                }
                // The original accepted this argument and silently dropped it.
                if ((object)threadParamter != null)
                {
                    InputParameter = threadParamter;
                }
            }

            /// <summary>
            /// Starts the worker thread; does nothing while a run is already in progress.
            /// </summary>
            public void Start()
            {
                if (!blnStarted)
                {
                    Error = null;
                    thdSubThread = new Thread(ExecuteAction);
                    blnIsStopped = false;
                    blnStarted = true;
                    thdSubThread.Start();
                }
            }

            /// <summary>
            /// Thread entry point: runs the work delegate, then the completion callback, then signals waiters.
            /// </summary>
            private void ExecuteAction()
            {
                try
                {
                    // Cast keeps the call statically bound; the delegate still receives the same object.
                    ThreadAction?.Invoke(this, (object)InputParameter);
                    ThreadCompeleted?.Invoke(this);
                }
                catch (Exception ex)
                {
                    // An exception escaping a raw thread tears down the whole process and, before that,
                    // left every caller of Wait() blocked. Keep it for the owner to inspect instead.
                    Error = ex;
                }
                finally
                {
                    Stop();
                }
            }

            /// <summary>
            /// Marks the worker as suspended and acquires <see cref="mUnique"/> on the calling thread.
            /// NOTE(review): nothing in this class makes the worker itself wait on the mutex, so this
            /// presumably relies on the work delegate cooperating - confirm with the callers.
            /// </summary>
            public void Suspend()
            {
                if (blnStarted && !blnSuspended)
                {
                    blnSuspended = true;
                    mUnique.WaitOne();
                }
            }

            /// <summary>
            /// Clears the suspended mark and releases <see cref="mUnique"/>. A Mutex can only be
            /// released by the thread that acquired it, so call this from the thread that called Suspend.
            /// </summary>
            public void Resume()
            {
                if (blnStarted && blnSuspended)
                {
                    blnSuspended = false;
                    mUnique.ReleaseMutex();
                }
            }

            /// <summary>
            /// Marks the worker as stopped and wakes every thread blocked in <see cref="Wait"/>.
            /// </summary>
            public void Stop()
            {
                try
                {
                    if (blnStarted && blnSuspended)
                    {
                        Resume();
                    }
                }
                finally
                {
                    // Waiters are released even when Resume fails.
                    lock (_locker)
                    {
                        blnStarted = false;
                        blnIsStopped = true;
                        Monitor.PulseAll(_locker);
                    }
                }
            }

            #region IDisposable Members

            /// <summary>
            /// Stops the worker and releases the mutex handle.
            /// </summary>
            public void Dispose()
            {
                Stop();
                if (mUnique != null)
                {
                    mUnique.Dispose();
                }
                GC.SuppressFinalize(this);
            }

            #endregion
        }
    }
    4.bulkcopy
    // Shared by all callers so that reserving ids and writing rows is serialized.
    // The original locked on a freshly created object, which never excluded anyone.
    private static readonly object s_bulkWriteLock = new object();

    /// <summary>
    /// Bulk-copies <paramref name="dt"/> into an Oracle table, optionally assigning ids from
    /// HIBERNATE_SEQUENCE first. Does nothing when the table is null or empty.
    /// </summary>
    /// <param name="dt">Rows to write.</param>
    /// <param name="targetTable">Destination table name on the server.</param>
    /// <param name="IdField">Column of <paramref name="dt"/> that receives the generated ids.</param>
    /// <param name="NeedId">True to generate ids for every row before writing.</param>
    public static void DataTableWriteToServer(DataTable dt, string targetTable, string IdField = "ID", bool NeedId = false)
    {
        // The original dereferenced dt while generating ids and only checked it for null afterwards.
        if (dt == null || dt.Rows.Count == 0)
        {
            return;
        }

        var Dao = GetDao();
        lock (s_bulkWriteLock)
        {
            if (NeedId)
            {
                // Read the current sequence value.
                QueryInfo searchInfo = new QueryInfo();
                searchInfo.CustomSQL = "select HIBERNATE_SEQUENCE.NEXTVAL from dual";
                DataTable table = Dao.ExecuteDataSet(searchInfo).Tables[0];
                Int64 increId = Convert.ToInt64(table.Rows[0][0].ToString());

                // Reserve one id per row by recreating the sequence past the reserved range.
                // NOTE(review): dropping and recreating a sequence is DDL and is not protected against
                // other sessions using the sequence at the same moment - confirm this is acceptable.
                QueryInfo tmpInfo = new QueryInfo();
                tmpInfo.CustomSQL = "drop sequence HIBERNATE_SEQUENCE";
                Dao.ExecuteNonQuery(tmpInfo);
                tmpInfo = new QueryInfo();
                tmpInfo.CustomSQL = "CREATE SEQUENCE HIBERNATE_SEQUENCE START WITH " + (increId + 1 + dt.Rows.Count).ToString();
                Dao.ExecuteNonQuery(tmpInfo);

                for (int i = 0; i < dt.Rows.Count; i++)
                {
                    increId++;
                    dt.Rows[i][IdField] = increId.ToString();
                }
            }

            string connOrcleString = System.Configuration.ConfigurationManager.ConnectionStrings["ConnectionString"].ConnectionString;

            // The bulk copy opens its own connection from the connection string. The original also
            // opened a separate OracleConnection that was never used and never closed; it is gone.
            OracleBulkCopy bulkCopy = new OracleBulkCopy(connOrcleString, OracleBulkCopyOptions.UseInternalTransaction);
            try
            {
                // NOTE(review): the timeout is documented in seconds; 260 * 1000 looks like milliseconds - confirm.
                bulkCopy.BulkCopyTimeout = 260 * 1000;
                bulkCopy.DestinationTableName = targetTable; // destination table on the server
                bulkCopy.BatchSize = 5000;                   // rows per batch
                bulkCopy.WriteToServer(dt);                  // copy every row of dt
            }
            finally
            {
                // No catch block: the original caught and re-threw with "throw ex;", which only erased the stack trace.
                bulkCopy.Close();
            }
        }
    }
    5.信号通知
    /// <summary>
    /// Clears the current user's SysComputerStatus rows, runs the risk computation on a SubThread,
    /// waits for it, and writes a JSON success or error object to the response.
    /// </summary>
    private void ComputeData()
    {
        try
        {
            QueryInfo info1 = new QueryInfo();
            info1.CustomSQL = "delete from SysComputerStatus where CreateUid=:uid";
            info1.Parameters.Add("uid", CurrentUser.UserId);
            Dao.ExecuteUpdate(info1);

            using (SubThread sub = new SubThread("RskBookComputeResultManage3_RiskValueComputeData"))
            {
                sub.InputParameter = new
                {
                    terminationStep = int.Parse(Request["terminationStep"]),
                    riskBookId = int.Parse(Request["riskBookId"] ?? "4513"),
                    computeDate = DateTime.Parse(Request["computeDate"])
                };
                sub.ThreadAction = (a, b) =>
                {
                    RiskValueComputeService2.SaveCompute2(sub.InputParameter.computeDate, sub.InputParameter.riskBookId, sub.InputParameter.terminationStep);
                    return null;
                };
                sub.Start();

                // Block until the task has completed.
                if (sub.Wait())
                {
                    // A failed computation must reach the catch block below instead of being reported as success.
                    if (sub.Error != null)
                    {
                        throw new InvalidOperationException(sub.Error.Message, sub.Error);
                    }
                    Response.Write(Newtonsoft.Json.JsonConvert.SerializeObject(new { msg = "计算成功" }));
                }
            }
        }
        catch (Exception ex)
        {
            var errorObj = new { ErrorCode = "-999", error = ex.Message };
            Response.Write(Newtonsoft.Json.JsonConvert.SerializeObject(errorObj));
            throw;
        }
        finally
        {
            Response.End();
        }
    }
