zoukankan      html  css  js  c++  java
  • 汇率多线程 草稿

    1.html
    <%@ Page Language="C#" AutoEventWireup="true" CodeBehind="WebForm2.aspx.cs" Inherits="WebApplication20.WebForm2" %>
    
    <!DOCTYPE html>
    
    <html xmlns="http://www.w3.org/1999/xhtml">
    <head runat="server">
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
        <title></title>
    </head>
    <body>
        <form id="form1" runat="server">
            
        <div>
            <input name="_method" value="getFxRate"  type="hidden"/>
        <input type="date" name="start" />--<input type="date" name="end" />
        </div>
            <input type="submit" />
        </form>
    </body>
    </html>
    2.cs
    using Contract.Domain;
    using ETLAPP;
    using Framework;
    using HraWeb.Common;
    using NHibernate;
    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Data;
    using System.IO;
    using System.Linq;
    using System.Net;
    using System.Text;
    using System.Text.RegularExpressions;
    using System.Threading;
    using System.Web;
    using System.Web.UI;
    using System.Web.UI.WebControls;
    using System.Xml;
    using ThreadTemplate;
    using Contract.IService;
    namespace WebApplication20
    {
        public enum SearchRange
        {
            th = 0,
            td = 1
        }
        class ThreadParameters
        {
            public string Url
            {
                get;
                set;
            }
            public string pairId
            {
                get;
                set;
            }
            public string Status
            {
                get;
                set;
            }
            public int PageCount
            {
                get;
                set;
            }
            public int pageIndex
            {
                get;
                set;
            }
            public string start { get; set; }
    
            public string end { get; set; }
    
            public string webCode { get; set; }
        }
    
        public partial class WebForm2 : BasePage
        {
            public static ArrayList contentList = new ArrayList();
            private static System.Data.DataTable table = null;
            private object InitThread(object para, SubThread th)
            {
                List<string> trList = new List<string>();
                var obj = (ThreadParameters)para;
                obj.pageIndex = 1;
                int pageCount = 0;
                //已经运行了一页的数据
                getData(para, null);
                pageCount = obj.PageCount;
                obj.pageIndex = pageCount;
                getData(obj, th);
                return "";
    
            }
            public static Dictionary<string, BasCurrencyPair> dic = new Dictionary<string, BasCurrencyPair>();
            private object getData(object para, SubThread th)
            {
    
    
                ABC: ThreadParameters obj = (ThreadParameters)para;
                var url = $"http://srh.bankofchina.com/search/whpj/search.jsp?erectDate={obj.start}&nothing={obj.start}&pjname=" + obj.webCode + "&page=" + obj.pageIndex; ;
                WebClient wc = new WebClient();
                List<string> trList = new List<string>();
                try
                {
                    using (Stream stream = wc.OpenRead(url))
                    {
                        using (StreamReader sr = new StreamReader(stream, Encoding.UTF8))
                        {
    
                            string content = sr.ReadToEnd();
                            string pagePatern = @"var m_nRecordCount = (.*);";
                            var pageMatch = Regex.Match(content, pagePatern);
                            int rows = int.Parse(pageMatch.Groups[1].Value);
                            int pagesize = 20;
                            int pages = (rows / pagesize) + (rows % pagesize == 0 ? 0 : 1);
                            if (obj.pageIndex == 1)
                            {
                                obj.PageCount = pages;
                            }
                            //提取div内容开始
                            string divPatern = @"(?<=<div (.*)?class=""BOC_main publish""[^>]*?>)([sS]*?)(?=</div>)";
                            MatchCollection divMatches = Regex.Matches(content, divPatern);
                            string divContent = string.Empty;
                            foreach (Match match2 in divMatches)
                            {
                                divContent = match2.Groups[0].Value;
                                break;
                            }
                            //提取div内容结束
    
                            //提取表格内容开始
                            string tablePatern = @"(?<=<table (.*)?[^>]*?>)([sS]*?)(?=</table>)";
                            MatchCollection tableMatches = Regex.Matches(divContent, tablePatern);
                            string tableContent = string.Empty;
                            foreach (Match match1 in tableMatches)
                            {
                                tableContent = match1.Groups[0].Value;
                                break;
                            }
                            string trPatern = @"(?<=<tr(.*)?[^>]*?>)([sS]*?)(?=</tr>)";
                            MatchCollection trMatchCollection = Regex.Matches(tableContent, trPatern);
                            Match match = trMatchCollection[trMatchCollection.Count - 2];
                            string tr1 = string.Empty;
                            tr1 = match.Groups[0].Value;
                            trList.Add(tr1);
                            //提取行结束
    
                        }
    
                        //获取表头列元素,或者内容行的单元格元素 trlist[0]是表头 SearchR,ange告诉程序要查表头 还是 内容行
                        System.Collections.ArrayList tdsList = new System.Collections.ArrayList();
                        ArrayList list = new ArrayList();
                        System.Threading.Monitor.Enter(table);
                        DataRow row = table.NewRow();
                        System.Threading.Monitor.Exit(table);
    
                        List<string> tr = null;
                        if (trList.Count >= 1)
                        {
                            tr = GET_TH_OR_TD_LIST(SearchRange.td, trList[trList.Count - 1]);
                        }
    
    
    
                        //row.FxRate = decimal.Parse(tr[6]);
                        if (tr.Count > 0)
                        {
                            var date = DateTime.Now.Date;
                            row["CURRENCY_PAIR"] = Int64.Parse(obj.pairId);
                            row["FX_RATE"] = decimal.Parse(tr[6]);
                            DateTime.TryParse(tr[7], out date);
                            row["CAPTURE_DATE"] = date.Date;
                        }
    
    
                        if ((obj.PageCount > 1 && obj.pageIndex > 1) || (obj.PageCount == 1))
                        {
                            if (th != null)
                            {
                                th.ReturnObj = row;
    
                            }
                        }
                        obj.Status = "完成";
                    }
                }
                //应对无法连接到远程服务器,远程服务器没有响应或应答
                catch (System.Net.WebException ex)
                {
    
                    goto ABC;
                }
    
                return "子完成";
            }
    
    
            private Contract.IService.IDaoService _dao;
            public Contract.IService.IDaoService Dao
            {
                get
                {
                    if (_dao == null)
                    {
                        lock (new object())
                        {
                            if (_dao == null)
                            {
                                _dao = (Contract.IService.IDaoService)ctx["DaoService"];
                            }
                        }
                    }
                    return _dao;
                }
            }
            private ISession _session;
            public ISession session
            {
                get
                {
                    if (_session == null)
                    {
                        lock (new object())
                        {
                            if (_session == null)
                                _session = Dao.GetSession();
                        }
                    }
                    return _session;
                }
            }
    
            private List<string> GET_TH_OR_TD_LIST(SearchRange range, string row)
            {
                string tmp = "";
                tmp = range.ToString();
                string tdPatern = $@"(?<=(<{tmp}[^>]*?>))(?<tdCell>[sS]*?)(?=</{tmp}>)";
                MatchCollection CurrenttdMatchCollection = Regex.Matches(row, tdPatern);
                string td = string.Empty;
                List<string> tdlList = new List<string>();
                List<string> contentList = new List<string>();
                foreach (Match match in CurrenttdMatchCollection)
                {
    
                    td = match.Groups["tdCell"].Value;
                    contentList.Add(td);
    
                }
                return contentList;
    
            }
            public object RateThreadCompeleted(object currentThread)
            {
                SubThread CallThread = currentThread as SubThread;
    
                SubThread t11 = new SubThread(Guid.NewGuid().ToString(), (a, b) =>
                {
                    Monitor.Enter(table);
                    if (table.Rows.Count < 100)
                        table.Rows.Add((DataRow)(CallThread.ReturnObj));
                    if (table.Rows.Count == 100)
                    {
                        Holworth.Utility.HraUtility.DataTableWriteToServer(table, "BAS_FX_RATE_BAK3", "ID", true);
                        table.Clear();
                    }
                    Monitor.Exit(table);
                    return null;
    
                });
                t11.Start();
    
    
                return null;
            }
            protected void Page_Load(object sender, EventArgs e)
            {
                if (table == null)
                {
                    table = new System.Data.DataTable();
                    System.Threading.Monitor.Enter(table);
                    //没想到添加列的顺序对BUlkCopy有影响
                    table.Columns.Add(new System.Data.DataColumn() { ColumnName = "ID", DataType = typeof(string) });
                    table.Columns.Add(new System.Data.DataColumn() { ColumnName = "CURRENCY_PAIR", DataType = typeof(Int64) });
                    table.Columns.Add(new System.Data.DataColumn() { ColumnName = "FX_RATE", DataType = typeof(decimal) });
                    table.Columns.Add(new System.Data.DataColumn() { ColumnName = "CAPTURE_DATE", DataType = typeof(DateTime) });
                    System.Threading.Monitor.Exit(table);
                }
                string log = "";
                if (Request["_method"] == "getFxRate")
                {
                    info = new Framework.QueryInfo() { CustomSQL = "select * from BASE_CURRENCY_WEB" };
                    var Dao = (Contract.IService.IDaoService)ctx["DaoService"];
                    dic = Dao.FindList<BasCurrencyPair>(new QueryInfo() { QueryObject = "BasCurrencyPair" }).ToDictionary(x => x.Id);
                    var ds = Dao.ExcuteDataSet(info);
                    var start = DateTime.Parse(Request["start"]);
                    var end = DateTime.Parse(Request["end"]);
                    if (string.IsNullOrEmpty(Request["start"]) || string.IsNullOrEmpty(Request["end"]))
                    {
                        throw new Exception(":输个日期,我的哥!");
                    }
                    SubThread thread = new SubThread("1");
                    //多线程的最大并发数
                    int maxPoolThread = 200;
                    int totalThreadNum = 0;
                    //当前正在运行的线程
                    var runingHt = new Dictionary<int, SubThread>();
                    //处于等待队列的未运行的线程
                    var unRunHt = new Dictionary<int, SubThread>();
                    int k = 0;
                    string url = "";
    
                    //var code = row[""].ToString();
    
                    for (DateTime t1 = start; t1 <= end; t1 = t1.AddDays(1))
                    {
                        foreach (DataRow row in ds.Tables[0].Rows)
                        {
                            var pairId = row["CURRENCY_PAIR_ID"].ToString();
                            url = string.Format("http://srh.bankofchina.com/search/whpj/search.jsp?erectDate={0}&nothing={1}&pjname={2}", t1.ToString("yyyy-MM-dd"), t1.ToString("yyyy-MM-dd"), row["WEB_CODE"]);
                            var param = new ThreadParameters() { pairId = pairId, Url = url };
                            k++;
                            param.pageIndex = 1;
                            param.start = t1.ToString("yyyy-MM-dd");
                            param.webCode = row["WEB_CODE"].ToString();
                            SubThread th = new SubThread(k.ToString());
    
                            th.ThreadAction = (currentThread,inputParameter) =>
                            {
                                param.pageIndex = 1;
                                InitThread(param, th);
                                return "";
                            };
                            th.ThreadCompeleted += RateThreadCompeleted;
                            if (k <= maxPoolThread)
                            {
                                runingHt.Add(k, th);
                                th.Start();
                            }
                            else
                            {
                                unRunHt.Add(k, th);
                            }
    
    
                        }
                    }
                    while (true)
                    {
                        Thread.Sleep(500);
                        //初始化完成队列,用于存取已经执行完的线程的id
                        var stepFinishList = new List<int>();
    
                        //将完成的线程放入完成队列
                        foreach (int tid in runingHt.Keys)
                        {
                            var t = runingHt[tid];
                            if (t.IsStopped)
                            {
                                t.Dispose();
                                if (t.ReturnObj == null)
                                {
                                    log += "
    " + "无数据" + "
    ";
                                }
    
                                //回调处理
    
                                stepFinishList.Add(tid);
                            }
    
                        }
                        System.Threading.Thread.Sleep(1000);
                        //1.遍历完成队列,从当前运行的线程队列中移除该线程
                        //2.对完成的线程执行回调,将数据持久化到数据库
                        //3.如果等待队列中还有数据,获取等待队列中的第一个,并执行该线程,将该线程从等待队列移除,加入到运行队列
    
                        foreach (int tid in stepFinishList)
                        {
    
                            runingHt.Remove(tid);
                            if (unRunHt.Count > 0)
                            {
                                SubThread unRunThread = unRunHt.First().Value;
                                var unRunTid = unRunHt.First().Key;
                                unRunThread.Start();
                                runingHt.Add(unRunTid, unRunThread);
                                unRunHt.Remove(unRunTid);
                            }
                        }
                        //所有线程都完成后,跳出循环
                        if (runingHt.Count == 0 && unRunHt.Count == 0)
                        {
                            break;
                        }
    
                    }
                    //Dao.SaveOrUpdateAll(contentList);
                    Holworth.Utility.HraUtility.DataTableWriteToServer(table, "BAS_FX_RATE_BAK3", "ID", true);
    
    
                }
    
            }
    
        }
    
    
    }
    3.subthread.cs
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    
    namespace ThreadTemplate
    {
        public class SubThread : IDisposable
        {
            //线程锁
            public readonly object _locker = new object();
            /// <summary>
            /// 当前线程类的一个字典集合,可用于传递数据,属性辅助
            /// </summary>
            public IDictionary<object, object> DataDictionary { get; set; }
            /// <summary>
            /// 可用作theadstart的传入参数
            /// </summary>
            public dynamic InputParameter { get; set; }
            /// <summary>
            /// 第一个object对象是subthread本身,第二个是输入参数,第三个是执行委托函数的返回值
            /// </summary>
            public Func<object, object, object> ThreadAction { get; set; }
            //线程完成后的回调
            /// <summary>
            /// 第一个参数是subthread本身,第二个参数是委托的返回值
            /// </summary>
            public Func<object, object> ThreadCompeleted { get; set; }
            private Thread thdSubThread = null;
            //互斥信号量 
            //互斥锁(Mutex)
            //互斥锁是一个互斥的同步对象,意味着同一时间有且仅有一个线程可以获取它。
            //互斥锁可适用于一个共享资源每次只能被一个线程访问的情况
            public Mutex mUnique = new Mutex();
            private bool blnIsStopped;
            private bool blnSuspended;
            private bool blnStarted;
            private string threadId;
            public string ThreadId { get { return threadId; } set { threadId = value; } }
            private object _returnObj;
            public bool IsStopped
            {
                get { return blnIsStopped; }
            }
            public bool IsSuspended
            {
                get { return blnSuspended; }
            }
            public object ReturnObj
            {
                get { return _returnObj; }
                set { _returnObj = value; }
    
            }
    
            //阻塞,等待信号再解除阻塞
            public bool Wait()
            {
                lock (_locker)
                {
                    //调用该方法进行阻塞,当有脉冲信号通知线程可以停止了,才结束阻塞
                    while (!blnIsStopped) { Monitor.Wait(_locker); }
                }
      GC.SuppressFinalize(this);
    return true; } /// <summary> /// 子线程构造函数 /// </summary> /// <param name="key">必须给,为了自己控制得到线程</param> /// <param name="threadStart">第一个参数subthread,第二个输入参数,第三个返回值</param> /// <param name="threadParamter">可以在 构造函数给,也可以通过赋值属性给</param> public SubThread(string key, Func<object, object, object> threadStart = null, dynamic threadParamter = null) { threadId = key; blnIsStopped = true; blnSuspended = false; blnStarted = false; if (threadStart != null) { ThreadAction = threadStart; } } /// <summary> /// Start sub-thread 线程启动 /// </summary> public void Start() { if (!blnStarted) { thdSubThread = new Thread(ExecuteAction); blnIsStopped = false; blnStarted = true; thdSubThread.Start(); } } /// <summary> /// Thread entry function 线程执行方法,从网站中用正则表达式,抓取需要的数据 /// </summary> /// private void ExecuteAction() { //线程动作 ThreadAction?.Invoke(this, InputParameter); //线程完成回调,这里直接运行 ThreadCompeleted?.Invoke(this); //通知主线程,任务完成内部阻塞,通过脉冲通知解除锁定 this.Stop(); } /// <summary> /// Suspend sub-thread /// </summary> public void Suspend() { if (blnStarted && !blnSuspended) { blnSuspended = true; mUnique.WaitOne(); } } /// <summary> /// Resume sub-thread /// </summary> public void Resume() { if (blnStarted && blnSuspended) { blnSuspended = false; mUnique.ReleaseMutex(); } } /// <summary> /// Stop sub-thread /// </summary> public void Stop() { if (blnStarted) { if (blnSuspended) Resume(); blnStarted = false; blnIsStopped = true; //thdSubThread.Join(); } lock (_locker) { blnStarted = false; blnIsStopped = true; //运行完成,使用脉冲通知锁定代码可以放行了 Monitor.PulseAll(_locker); } } #region IDisposable Members /// <summary> /// Class resources dispose here /// </summary> public void Dispose() { // TODO: Add clsSubThread.Dispose implementation Stop();//Stop thread first GC.SuppressFinalize(this); } #endregion } } 4.bulkcopy public static void DataTableWriteToServer(DataTable dt, string targetTable, string IdField = "ID", bool NeedId = false) { var Dao = GetDao(); QueryInfo info = new QueryInfo(); DataTable table = null; Int64 increId = 0; #region 获取序列的当前值 lock (new object()) { if (NeedId) { QueryInfo searchInfo = new QueryInfo(); searchInfo.CustomSQL = "select HIBERNATE_SEQUENCE.NEXTVAL from dual"; table = Dao.ExecuteDataSet(searchInfo).Tables[0]; increId = Convert.ToInt64(table.Rows[0][0].ToString()); #endregion QueryInfo tmpInfo = new QueryInfo(); tmpInfo.CustomSQL = "drop sequence HIBERNATE_SEQUENCE"; Dao.ExecuteNonQuery(tmpInfo); tmpInfo = new QueryInfo(); tmpInfo.CustomSQL = "CREATE SEQUENCE HIBERNATE_SEQUENCE START WITH " + (increId + 1 + dt.Rows.Count).ToString(); Dao.ExecuteNonQuery(tmpInfo); for (int i = 0; i < dt.Rows.Count; i++) { var row = dt.Rows[i]; increId++; row[IdField] = increId.ToString(); } } string connOrcleString = System.Configuration.ConfigurationManager.ConnectionStrings["ConnectionString"].ConnectionString; OracleConnection conn = new OracleConnection(connOrcleString); OracleBulkCopy bulkCopy = new OracleBulkCopy(connOrcleString, OracleBulkCopyOptions.UseInternalTransaction); bulkCopy.BulkCopyTimeout = 260 * 1000; bulkCopy.DestinationTableName = targetTable; //服务器上目标表的名称 bulkCopy.BatchSize = 5000; //每一批次中的行数 try { conn.Open(); if (dt != null && dt.Rows.Count != 0) bulkCopy.WriteToServer(dt); //将提供的数据源中的所有行复制到目标表中 } catch (Exception ex) { throw ex; } finally { if (bulkCopy != null) bulkCopy.Close(); } } } 5.信号通知 private void ComputeData() { try { QueryInfo info1 = new QueryInfo(); info1.CustomSQL = "delete from SysComputerStatus where CreateUid=:uid"; info1.Parameters.Add("uid", CurrentUser.UserId); Dao.ExecuteUpdate(info1); SubThread sub = new SubThread("RskBookComputeResultManage3_RiskValueComputeData"); sub.InputParameter = new { terminationStep = int.Parse(Request["terminationStep"]), riskBookId = int.Parse(Request["riskBookId"]??"4513"), computeDate = DateTime.Parse(Request["computeDate"]) }; sub.ThreadAction = (a,b) => { RiskValueComputeService2.SaveCompute2(sub.InputParameter.computeDate, sub.InputParameter.riskBookId, sub.InputParameter.terminationStep); return null; }; sub.Start(); //阻塞知道任务完成 if(sub.Wait()) { //if (sub.IsStopped) { Response.Write(Newtonsoft.Json.JsonConvert.SerializeObject(new {msg="计算成功" })); //break; } } } catch (Exception ex) { var errorObj = new { ErrorCode = "-999", error = ex.Message }; Response.Write(Newtonsoft.Json.JsonConvert.SerializeObject(errorObj)); throw; } finally { Response.End(); } }
  • 相关阅读:
    Codeforces Round #578 (Div. 2)
    Educational Codeforces Round 70
    Codeforces Round #576 (Div. 1)
    The 2019 ICPC China Nanchang National Invitational and International Silk-Road Programming Contest
    Educational Codeforces Round 69
    Codeforces Global Round 4
    Codeforces Round #574 (Div. 2)
    Educational Codeforces Round 68
    Codeforces Round #573 (Div. 1)
    The Preliminary Contest for ICPC China Nanchang National Invitational
  • 原文地址:https://www.cnblogs.com/kexb/p/6045178.html
Copyright © 2011-2022 走看看