zoukankan      html  css  js  c++  java
  • TWX 比较好的多线程使用实例

    using Newtonsoft.Json;
    using System;
    using System.Collections.Generic;
    using System.Configuration;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using TRS.Export.BLL;
    using TRS.Export.Business;
    using TRS.Export.Common;
    using TRS.Export.Constant;
    using TRS.Export.Entity;
    using TRS.Export.FrameEntity.Enums;
    using TRS.Export.FrameProvider;
    using TRS.Export.Param.Bases;
    using TRS.Export.Param.Enums;
    using TRS.Export.Scheduler.Interfaces;

    namespace TRS.Export.Scheduler.Schedulers.Business
    {
    public class BusinessTaobaoEventScheduler : IScheduler
    {
    /// <summary>Data-access object for TBD_EventLog rows; the worker loops call Select on it and the Execute* helpers call Update.</summary>
    private readonly TBD_EventLogBLL m_objEventLogBLL = new TBD_EventLogBLL();

    /// <summary>Used by Execute(TBD_EventLog) to process an event in-process (TABAO_EVENT) when no HTTP endpoint is configured.</summary>
    private readonly TaobaoEventBusiness m_objEventBusiness = new TaobaoEventBusiness();

    /// <summary>Used by ExecuteSuccess to call BackupTable after a row has been flagged as successful.</summary>
    private readonly BackupTableBusiness m_objBackupBusiness = new BackupTableBusiness();

    /// <summary>AppSettings["HttpResolve"]; when non-empty, Execute(TBD_EventLog) passes it to HttpHelper.Execute instead of processing locally.</summary>
    private readonly string m_HttpResolve = ConfigurationManager.AppSettings["HttpResolve"];

    /// <summary>AppSettings["PCSReceiveCodes"]; split on '|' by Execute(TBD_EventLog) and searched for in the event content.</summary>
    private readonly string m_PCSReceiveCodes = ConfigurationManager.AppSettings["PCSReceiveCodes"];

    /// <summary>Passed to m_objEventLogBLL.Select on every poll (presumably the maximum number of rows per batch - confirm against the BLL).</summary>
    private const int top = 1000;

    /// <summary>Sort direction given to OrderByHelper, which orders the polled rows by ActionTime.</summary>
    private OrderByEnum orderByEnum = OrderByEnum.Asc;

    /// <summary>Signature of the worker loops, so Execute() can start them through BeginInvoke.</summary>
    delegate void AsyncExecute(object obj);

    /// <summary>Value passed to Thread.Sleep (milliseconds) by ExecuteByEventType after each batch; the constructor sets it to 500.</summary>
    public int SleepInterval { get; set; }

    /// <summary>Scheduler arguments; Execute() reads only Args[0].</summary>
    public string[] Args { get; set; }

    /// <summary>
    /// Creates the scheduler with <see cref="SleepInterval"/> initialised to 500.
    /// </summary>
    public BusinessTaobaoEventScheduler()
    {
        const int defaultSleepInterval = 500;

        this.SleepInterval = defaultSleepInterval;
    }
    //Business.BusinessTaobaoEventScheduler|LOGISTICS_DISPATCHED#后缀$0,1,2,3,4/5,6,7,8,9
    /// <summary>
    /// Entry point of the scheduler. Parses <c>Args[0]</c> and starts the worker loops.
    /// </summary>
    /// <remarks>
    /// <c>Args[0]</c> is split on '|':
    /// <list type="bullet">
    /// <item>Several entries: one <see cref="ExecuteByEventType"/> worker is started per entry.</item>
    /// <item>One entry without '#': <see cref="ExecuteByEventType"/> runs on the calling thread
    /// (that method loops forever, so this call does not return).</item>
    /// <item>One entry containing "后缀" (e.g. <c>LOGISTICS_DISPATCHED#后缀$0,1,2,3,4/5,6,7,8,9</c>):
    /// the text after the marker is split on '/' and one <see cref="ExecuteBySuffix"/> worker is started per group.</item>
    /// <item>One entry containing "的地": same as above, using <see cref="ExecuteByDestinationName"/>.</item>
    /// </list>
    /// After starting workers the method blocks in <c>Console.ReadKey()</c>.
    /// Does nothing when <c>Args</c> is null/empty or <c>Args[0]</c> is null or empty.
    /// </remarks>
    public void Execute()
    {
        if (Args == null || Args.Length == 0 || string.IsNullOrEmpty(Args[0]))
        {
            return;
        }

        string[] eventTypes = Args[0].Split('|');

        if (eventTypes.Length > 1)
        {
            // One worker per event type.
            foreach (string eventType in eventTypes)
            {
                StartWorker(ExecuteByEventType, eventType, string.Format("报文解析后台Job-{0}", eventType));
            }
        }
        else
        {
            string spec = eventTypes[0];

            // The markers are literal configuration tokens, so compare ordinally rather than by culture.
            if (spec.IndexOf("#", StringComparison.Ordinal) < 0)
            {
                // Single plain event type: run the loop on the calling thread.
                ExecuteByEventType(spec);
            }
            else if (spec.IndexOf("后缀", StringComparison.Ordinal) > -1)
            {
                // Partition by event type + LogID suffix group.
                StartPartitionedWorkers(spec, "后缀", ExecuteBySuffix);
            }
            else if (spec.IndexOf("的地", StringComparison.Ordinal) > -1)
            {
                // Partition by event type + destination-name group.
                StartPartitionedWorkers(spec, "的地", ExecuteByDestinationName);
            }
        }

        Console.ReadKey();
    }

    /// <summary>
    /// Splits the part of <paramref name="spec"/> that follows <paramref name="marker"/> on '/'
    /// and starts one worker per group, passing it <c>{ eventType, group }</c>.
    /// </summary>
    private void StartPartitionedWorkers(string spec, string marker, AsyncExecute worker)
    {
        // The event type is everything before the first '#'.
        string eventType = spec.Split('#')[0];

        // +1 skips the single separator character that follows the marker ('$' in the documented example).
        int groupsStart = spec.IndexOf(marker, StringComparison.Ordinal) + marker.Length + 1;
        string[] groups = spec.Substring(groupsStart).Split('/');

        foreach (string group in groups)
        {
            // Same "Job-{eventType}-{group}" name the worker loops build for themselves.
            string threadName = string.Format("报文解析后台Job-{0}-{1}", eventType, group);

            StartWorker(worker, new string[] { eventType, group }, threadName);
        }
    }

    /// <summary>
    /// Starts <paramref name="worker"/> with <paramref name="state"/>: through <c>BeginInvoke</c> when
    /// <c>StaticConstant.ASYNC</c> is set, otherwise on a new thread named <paramref name="threadName"/>.
    /// </summary>
    private void StartWorker(AsyncExecute worker, object state, string threadName)
    {
        if (StaticConstant.ASYNC)
        {
            worker.BeginInvoke(state, AsyncCallback, worker);
        }
        else
        {
            Thread thread = new Thread(new ParameterizedThreadStart(worker.Invoke));
            thread.Name = threadName;
            thread.Start(state);
        }
    }

    /// <summary>
    /// Completion callback supplied to <c>BeginInvoke</c>: takes the <c>AsyncExecute</c> delegate
    /// stored in <c>result.AsyncState</c> and calls <c>EndInvoke</c> on it.
    /// </summary>
    /// <param name="result">The async result handed over by the delegate invocation.</param>
    public void AsyncCallback(IAsyncResult result)
    {
        ((AsyncExecute)result.AsyncState).EndInvoke(result);
    }

    /// <summary>
    /// Endless worker loop for one event type: selects pending TBD_EventLog rows
    /// (<c>SuccessFlag == 0</c>, <c>ActionTimes &lt; 5</c>, matching <c>EventType</c>) ordered by
    /// <c>ActionTime</c>, and passes each row to <see cref="Execute(TBD_EventLog)"/>.
    /// </summary>
    /// <param name="obj">The event type; compared with <c>EventType</c> via <c>obj.ToString()</c>.</param>
    /// <remarks>
    /// Sleeps 60 s when no rows are found, 10 ms after each row and <see cref="SleepInterval"/> ms
    /// after each batch. Exceptions are mailed through <c>EmailBusiness</c>, then the loop waits
    /// <see cref="SleepInterval"/> ms and continues. The method never returns.
    /// </remarks>
    public void ExecuteByEventType(object obj)
    {
        const int idleWaitSeconds = 60;
        const int perItemWaitMilliseconds = 10;

        // The name depends only on obj, so build it once instead of on every pass.
        string threadName = string.Format("报文解析后台Job-{0}", obj);

        while (true)
        {
            try
            {
                // Called on every pass with this worker's name (presumably a liveness heartbeat - confirm).
                new ThreadMonitorBusiness().CallbackThread(threadName);

                WhereHelper<TBD_EventLog> where = new WhereHelper<TBD_EventLog>();
                where.ApeendAnd(a => a.SuccessFlag == 0 && a.ActionTimes < 5);
                where.ApeendAnd(a => a.EventType == obj.ToString());

                OrderByHelper<TBD_EventLog> orderBy = new OrderByHelper<TBD_EventLog>(a => a.ActionTime, orderByEnum);

                List<TBD_EventLog> lstEventLog = m_objEventLogBLL.Select(where, top, null, orderBy);
                if (lstEventLog == null || lstEventLog.Count == 0)
                {
                    Console.WriteLine("当前线程:[{0}],没有数据,等待{1}秒后继续...{2}", threadName, idleWaitSeconds, DateTime.Now);
                    Thread.Sleep(idleWaitSeconds * 1000);

                    continue;
                }

                foreach (var item in lstEventLog)
                {
                    Execute(item);

                    Console.WriteLine("当前线程:[{0}],等待{1}秒后继续...{2}", threadName, perItemWaitMilliseconds / 1000.00, DateTime.Now);
                    Thread.Sleep(perItemWaitMilliseconds);
                }

                // Divide by 1000.00 so sub-second intervals are not truncated to 0 by integer division.
                Console.WriteLine("当前线程:[{0}],等待{1}秒后继续...{2}", threadName, SleepInterval / 1000.00, DateTime.Now);

                Thread.Sleep(SleepInterval);
            }
            catch (Exception ex)
            {
                new EmailBusiness().SendExceptionEmail(ex);

                // Back off before retrying so a persistent failure does not spin and send mail continuously.
                Thread.Sleep(SleepInterval);
            }
        }
    }

    /// <summary>
    /// Endless worker loop for one event type restricted to a set of destination names: selects
    /// pending TBD_EventLog rows (<c>SuccessFlag == 0</c>, <c>ActionTimes &lt; 5</c>, matching
    /// <c>EventType</c>, <c>DestinationName</c> in the given list) ordered by <c>ActionTime</c>,
    /// and passes each row to <see cref="Execute(TBD_EventLog)"/>.
    /// </summary>
    /// <param name="obj">
    /// A <c>string[]</c> of at least two elements: <c>[0]</c> is the event type, <c>[1]</c> a
    /// comma-separated list of destination names. Any other value makes the method return immediately.
    /// </param>
    /// <remarks>
    /// Sleeps 60 s when no rows are found, 10 ms after each row and <see cref="SleepInterval"/> ms
    /// after each batch. Exceptions are mailed through <c>EmailBusiness</c>, then the loop waits
    /// <see cref="SleepInterval"/> ms and continues.
    /// </remarks>
    public void ExecuteByDestinationName(object obj)
    {
        const int idleWaitSeconds = 60;
        const int perItemWaitMilliseconds = 10;

        string[] array = obj as string[];
        if (array == null || array.Length < 2)
        {
            return;
        }

        // Invariant for the lifetime of the worker, so build it once.
        string threadName = string.Format("报文解析后台Job-{0}-{1}", array[0], array[1]);

        while (true)
        {
            try
            {
                // Called on every pass with this worker's name (presumably a liveness heartbeat - confirm).
                new ThreadMonitorBusiness().CallbackThread(threadName);

                WhereHelper<TBD_EventLog> where = new WhereHelper<TBD_EventLog>();
                where.ApeendAnd(a => a.SuccessFlag == 0 && a.ActionTimes < 5);
                where.ApeendAnd(a => a.EventType == array[0]);
                where.ApeendAnd(a => a.DestinationName.In(array[1].Split(',')));

                OrderByHelper<TBD_EventLog> orderBy = new OrderByHelper<TBD_EventLog>(a => a.ActionTime, orderByEnum);

                List<TBD_EventLog> lstEventLog = m_objEventLogBLL.Select(where, top, null, orderBy);
                if (lstEventLog == null || lstEventLog.Count == 0)
                {
                    Console.WriteLine("当前线程:[{0}],没有数据,等待{1}秒后继续...{2}", threadName, idleWaitSeconds, DateTime.Now);
                    Thread.Sleep(idleWaitSeconds * 1000);

                    continue;
                }

                foreach (var item in lstEventLog)
                {
                    Execute(item);

                    Console.WriteLine("当前线程:[{0}],等待{1}秒后继续...{2}", threadName, perItemWaitMilliseconds / 1000.00, DateTime.Now);
                    Thread.Sleep(perItemWaitMilliseconds);
                }

                // Divide by 1000.00 so sub-second intervals are not truncated to 0 by integer division.
                Console.WriteLine("当前线程:[{0}],等待{1}秒后继续...{2}", threadName, SleepInterval / 1000.00, DateTime.Now);

                // Actually perform the wait announced above, as ExecuteByEventType does.
                Thread.Sleep(SleepInterval);
            }
            catch (Exception ex)
            {
                new EmailBusiness().SendExceptionEmail(ex);

                // Back off before retrying so a persistent failure does not spin and send mail continuously.
                Thread.Sleep(SleepInterval);
            }
        }
    }

    /// <summary>
    /// Endless worker loop for one event type restricted to a group of LogID suffixes: selects
    /// pending TBD_EventLog rows (<c>ActionTime &lt; now</c>, <c>SuccessFlag == 0</c>,
    /// <c>ActionTimes &lt; 5</c>, matching <c>EventType</c>, <c>LogID.Right(...)</c> filter)
    /// ordered by <c>ActionTime</c>, and passes each row to <see cref="Execute(TBD_EventLog)"/>.
    /// </summary>
    /// <param name="obj">
    /// A <c>string[]</c> of at least two elements: <c>[0]</c> is the event type, <c>[1]</c> a
    /// comma-separated list of suffixes. Any other value makes the method return immediately.
    /// </param>
    /// <remarks>
    /// Sleeps 60 s when no rows are found, 10 ms after each row and <see cref="SleepInterval"/> ms
    /// after each batch. Exceptions are mailed through <c>EmailBusiness</c>, then the loop waits
    /// <see cref="SleepInterval"/> ms and continues.
    /// </remarks>
    public void ExecuteBySuffix(object obj)
    {
        const int idleWaitSeconds = 60;
        const int perItemWaitMilliseconds = 10;

        string[] array = obj as string[];
        if (array == null || array.Length < 2)
        {
            return;
        }

        // Invariant for the lifetime of the worker, so build it once.
        string threadName = string.Format("报文解析后台Job-{0}-{1}", array[0], array[1]);

        while (true)
        {
            try
            {
                // Called on every pass with this worker's name (presumably a liveness heartbeat - confirm).
                new ThreadMonitorBusiness().CallbackThread(threadName);

                WhereHelper<TBD_EventLog> where = new WhereHelper<TBD_EventLog>();
                where.ApeendAnd(a => a.ActionTime < DateTime.Now);
                where.ApeendAnd(a => a.SuccessFlag == 0 && a.ActionTimes < 5);
                where.ApeendAnd(a => a.EventType == array[0]);

                // Small safeguard against concurrent processing: each worker only takes rows whose
                // LogID matches its own suffix group (presumably the last digit of LogID - confirm
                // against the Right() helper).
                where.ApeendAnd(a => a.LogID.Right(array[1].Split(',')));

                OrderByHelper<TBD_EventLog> orderBy = new OrderByHelper<TBD_EventLog>(a => a.ActionTime, orderByEnum);

                List<TBD_EventLog> lstEventLog = m_objEventLogBLL.Select(where, top, null, orderBy);
                if (lstEventLog == null || lstEventLog.Count == 0)
                {
                    Console.WriteLine("当前线程:[{0}],没有数据,等待{1}秒后继续...{2}", threadName, idleWaitSeconds, DateTime.Now);
                    Thread.Sleep(idleWaitSeconds * 1000);

                    continue;
                }

                foreach (var item in lstEventLog)
                {
                    Execute(item);

                    Console.WriteLine("当前线程:[{0}],等待{1}秒后继续...{2}", threadName, perItemWaitMilliseconds / 1000.00, DateTime.Now);
                    Thread.Sleep(perItemWaitMilliseconds);
                }

                // Divide by 1000.00 so sub-second intervals are not truncated to 0 by integer division.
                Console.WriteLine("当前线程:[{0}],等待{1}秒后继续...{2}", threadName, SleepInterval / 1000.00, DateTime.Now);

                // Actually perform the wait announced above, as ExecuteByEventType does.
                Thread.Sleep(SleepInterval);
            }
            catch (Exception ex)
            {
                new EmailBusiness().SendExceptionEmail(ex);

                // Back off before retrying so a persistent failure does not spin and send mail continuously.
                Thread.Sleep(SleepInterval);
            }
        }
    }

    /// <summary>
    /// Processes a single event-log row and records the outcome.
    /// </summary>
    /// <param name="eventLog">The row to process; <c>LogID</c> and <c>ActionTimes</c> must have values.</param>
    /// <remarks>
    /// When <c>m_HttpResolve</c> is configured the row is handed to that endpoint as a small JSON
    /// body and the reply is deserialised into a <c>ResponseParam</c>; otherwise it is processed
    /// in-process through <c>TaobaoEventBusiness.TABAO_EVENT</c>.
    /// A successful response marks the row via <see cref="ExecuteSuccess"/>. A missing or failed
    /// response goes to <see cref="ExecuteFailure"/>, except that a failed response with
    /// <c>result == "2"</c> on a row with more than 3 attempts is also marked successful.
    /// </remarks>
    public void Execute(TBD_EventLog eventLog)
    {
        System.Diagnostics.Stopwatch watch = new System.Diagnostics.Stopwatch();
        watch.Start();

        // NOTE(review): the assignment inside the loop is commented out, so pcs is always false
        // and the PCS short-circuit below is currently disabled. Kept as-is so it can be re-enabled.
        bool pcs = false;
        if (!m_PCSReceiveCodes.IsNullOrEmpty())
        {
            string[] codes = m_PCSReceiveCodes.Split('|');
            foreach (var code in codes)
            {
                if (eventLog.EventContent.IndexOf(code) > -1)
                {
                    //pcs = true;
                }
            }
        }

        if (pcs)
        {
            string eventContent = eventLog.EventContent;
            string destinationName = StringHelper.GetValueByCutStr(ref eventContent, "<consoWarehouseCode>", "</consoWarehouseCode>", false, false);

            ExecuteSuccess(eventLog.LogID.Value, 1, destinationName);

            Console.WriteLine("当前订单为PCS:[{0}],等待{1}秒后继续...{2}", eventLog.TradeOrderId, 000 / 1000.00, DateTime.Now);

            return;
        }

        ResponseParam response = null;

        if (!m_HttpResolve.IsNullOrEmpty())
        {
            // Quotes inside the JSON body must be escaped for the literal to compile.
            string requestBody = "{\"logId\":" + eventLog.LogID + ",\"appKey\":\"trTaoBaoV02\"}";

            string result = new HttpHelper().Execute(m_HttpResolve, requestBody);
            if (!result.IsNullOrEmpty())
            {
                response = JsonConvert.DeserializeObject<ResponseParam>(result);
            }
        }
        else
        {
            response = m_objEventBusiness.TABAO_EVENT(eventLog);
        }

        watch.Stop();

        double elapsedSeconds = watch.ElapsedMilliseconds / 1000.00;

        // response stays null when the HTTP call returns nothing; never dereference it unchecked.
        string message = response == null ? "" : response.msg;

        if (response != null && response.success)
        {
            ExecuteSuccess(eventLog.LogID.Value);

            Console.WriteLine("交易订单:{0}分析成功!耗时:{1} {2} {3}", eventLog.TradeOrderId, elapsedSeconds, message, DateTime.Now);

            return;
        }

        // NOTE(review): the meaning of result "2" is not visible here; such rows are closed as
        // successful once they have been attempted more than 3 times.
        if (response != null && response.result == "2" && eventLog.ActionTimes.Value > 3)
        {
            ExecuteSuccess(eventLog.LogID.Value);
        }
        else
        {
            ExecuteFailure(eventLog.LogID.Value, eventLog.ActionTimes.Value, message);
        }

        Console.WriteLine("交易订单:{0}分析失败!耗时:{1} {2} {3}", eventLog.TradeOrderId, elapsedSeconds, message, DateTime.Now);
    }

    /// <summary>
    /// Records a failed attempt for the row identified by <paramref name="logID"/>: stores the
    /// incremented attempt count, the error text and the current time, then updates the row.
    /// </summary>
    /// <param name="logID">Identifier of the TBD_EventLog row.</param>
    /// <param name="actionTimes">Attempt count before this failure; <c>actionTimes + 1</c> is written.</param>
    /// <param name="exception">Error text written to the row's <c>Exception</c> field.</param>
    /// <remarks>
    /// When <paramref name="actionTimes"/> is 4 or more, <c>SuccessFlag</c> is additionally set to -1.
    /// </remarks>
    public void ExecuteFailure(long logID, int actionTimes, string exception)
    {
        TBD_EventLog failedAttempt = new TBD_EventLog
        {
            LogID = logID,
            ActionTimes = actionTimes + 1,
            Exception = exception,
            ActionTime = DateTime.Now
        };

        bool attemptsExhausted = actionTimes >= 4;
        if (attemptsExhausted)
        {
            failedAttempt.SuccessFlag = -1;
        }

        m_objEventLogBLL.Update(failedAttempt);
    }

    /// <summary>
    /// Marks the row identified by <paramref name="logID"/> as successful (<c>SuccessFlag = 1</c>,
    /// current <c>ActionTime</c>, given <c>IT</c>), updates it, and then calls
    /// <c>BackupTableBusiness.BackupTable</c> with the same update object.
    /// </summary>
    /// <param name="logID">Identifier of the TBD_EventLog row.</param>
    /// <param name="it">Value written to the row's <c>IT</c> field; defaults to 0.</param>
    /// <param name="destinationName">When not null/empty, written to <c>DestinationName</c>.</param>
    /// <remarks>
    /// If the backup reports failure, a second update stores the backup message (or an empty
    /// string when it is null) in <c>Exception</c> together with the current <c>UpdateTime</c>.
    /// </remarks>
    public void ExecuteSuccess(long logID, int it = 0, string destinationName = null)
    {
        TBD_EventLog succeeded = new TBD_EventLog
        {
            LogID = logID,
            SuccessFlag = 1,
            ActionTime = DateTime.Now,
            IT = it
        };

        if (!destinationName.IsNullOrEmpty())
        {
            succeeded.DestinationName = destinationName;
        }

        m_objEventLogBLL.Update(succeeded);

        ResponseParam backupResult = m_objBackupBusiness.BackupTable(succeeded);
        if (backupResult.success)
        {
            return;
        }

        // Backup failed: record why on the row without touching its success state.
        TBD_EventLog backupFailure = new TBD_EventLog
        {
            LogID = logID,
            UpdateTime = DateTime.Now,
            Exception = backupResult.msg ?? ""
        };

        m_objEventLogBLL.Update(backupFailure);
    }
    }
    }

  • 相关阅读:
    获取指定日期相关DATENAME和DATEPART数据
    MySQL 5.7 新备份工具mysqlpump 使用说明
    Kubernetes之Pod控制器
    Python3出现"No module named 'MySQLdb'"问题-以及使用PyMySQL连接数据库
    分布式监控系统Zabbix3.4-针对MongoDB性能监控操作笔记
    基于MongodbDB的用户认证-运维笔记
    基于Nginx+Keepalived的LB服务监控(邮件报警)
    Linux系统用户密码规则
    通过容器提交镜像(docker commit)以及推送镜像(docker push)笔记
    Centos6.9下RocketMQ3.4.6高可用集群部署记录(双主双从+Nameserver+Console)
  • 原文地址:https://www.cnblogs.com/chengjun/p/9007559.html
Copyright © 2011-2022 走看看