zoukankan      html  css  js  c++  java
  • C# 多线程与高并发处理并且具备暂停、继续、停止功能

    复制代码
    --近期有一个需要运用多线程的项目,会有并发概率,所以写了一份代码,可能有写地方还不完善,后续有需求在改
    1 /// <summary> 2 /// 并发对象 3 /// </summary> 4 public class MeterAsyncQueue 5 { 6 public MeterAsyncQueue() 7 { 8 MeterInfoTask = new MeterInfo(); 9 } 10 11 public MeterInfo MeterInfoTask { get; set; } 12 } 13 public class MeterInfo 14 { 15 public MeterInfo() 16 { 17 18 } 19 public int Id { get; set; } 20 21 }
    复制代码
    复制代码
      1     /// <summary>
      2     /// 线程通用类
      3     /// </summary>
      4     public class TaskCommand
      5     {
      6         CancellationTokenSource tokenSource = new CancellationTokenSource();
      7         ManualResetEvent resetEvent = new ManualResetEvent(true);
      8         Thread thread = null;
      9         /// <summary>
     10         /// 开始任务
     11         /// </summary>
     12         public void StartData()
     13         {
     14             tokenSource = new CancellationTokenSource();
     15             resetEvent = new ManualResetEvent(true);
     16 
     17             List<int> Ids = new List<int>();
     18             for (int i = 0; i < 10000; i++)
     19             {
     20                 Ids.Add(i);
     21             }
     22             thread = new Thread(new ThreadStart(() => StartTask(Ids)));
     23             thread.Start();
     24         }
     25         /// <summary>
     26         /// 暂停任务
     27         /// </summary>
     28         public void OutData()
     29         {
     30             //task暂停
     31             resetEvent.Reset();
     32         }
     33         /// <summary>
     34         /// 继续任务
     35         /// </summary>
     36         public void ContinueData()
     37         {
     38             //task继续
     39             resetEvent.Set();
     40         }
     41         /// <summary>
     42         /// 取消任务
     43         /// </summary>
     44         public void Cancel()
     45         {
     46             //释放对象
     47             resetEvent.Dispose();
     48             foreach (var CurrentTask in ParallelTasks)
     49             {
     50                 if (CurrentTask != null)
     51                 {
     52                     if (CurrentTask.Status == TaskStatus.Running) { }
     53                     {
     54                         //终止task线程
     55                         tokenSource.Cancel();
     56                     }
     57                 }
     58             }
     59             thread.Abort();
     60         }
     61         /// <summary>
     62         /// 执行数据
     63         /// </summary>
     64         /// <param name="Index"></param>
     65         public void Execute(int Index)
     66         {
     67             //阻止当前线程
     68             resetEvent.WaitOne();
     69 
     70             Console.WriteLine("当前第" + Index + "个线程");
     71 
     72             Thread.Sleep(1000);
     73 
     74         }
     75         //队列对象
     76         private Queue<MeterAsyncQueue> AsyncQueues { get; set; }
     77 
     78         /// <summary>
     79         /// 并发任务数
     80         /// </summary>
     81         private int ParallelTaskCount { get; set; }
     82 
     83 
     84         /// <summary>
     85         /// 并行任务集合
     86         /// </summary>
     87         private List<Task> ParallelTasks { get; set; }
     88         //控制线程并行数量
     89         public void StartTask(List<int> Ids)
     90         {
     91             IsInitTask = true;
     92             ParallelTasks = new List<Task>();
     93             AsyncQueues = new Queue<MeterAsyncQueue>();
     94             //获取并发数
     95             ParallelTaskCount = 5;
     96 
     97             //初始化异步队列
     98             InitAsyncQueue(Ids);
     99             //开始执行队列任务
    100             HandlingTask();
    101 
    102             Task.WaitAll(new Task[] { Task.WhenAll(ParallelTasks.ToArray()) });
    103         }
    104         /// <summary>
    105         /// 初始化异步队列
    106         /// </summary>
    107         private void InitAsyncQueue(List<int> Ids)
    108         {
    109             foreach (var item in Ids)
    110             {
    111                 MeterInfo info = new MeterInfo();
    112                 info.Id = item;
    113                 AsyncQueues.Enqueue(new MeterAsyncQueue()
    114                 {
    115                     MeterInfoTask = info
    116                 });
    117             }
    118         }
    119         /// <summary>
    120         /// 是否首次执行任务
    121         /// </summary>
    122         private bool IsInitTask { get; set; }
    123         //124         private readonly object _objLock = new object();
    125 
    126         /// <summary>
    127         /// 开始执行队列任务
    128         /// </summary>
    129         private void HandlingTask()
    130         {
    131             lock (_objLock)
    132             {
    133                 if (AsyncQueues.Count <= 0)
    134                 {
    135                     return;
    136                 }
    137 
    138                 var loopCount = GetAvailableTaskCount();
    139                 //并发处理队列
    140                 for (int i = 0; i < loopCount; i++)
    141                 {
    142                     HandlingQueue();
    143                 }
    144                 IsInitTask = false;
    145             }
    146         }
    147         /// <summary>
    148         /// 获取队列锁
    149         /// </summary>
    150         private readonly object _queueLock = new object();
    151 
    152         /// <summary>
    153         /// 处理队列
    154         /// </summary>
    155         private void HandlingQueue()
    156         {
    157             CancellationToken token = tokenSource.Token;
    158             lock (_queueLock)
    159             {
    160                 if (AsyncQueues.Count > 0)
    161                 {
    162                     var asyncQueue = AsyncQueues.Dequeue();
    163 
    164                     if (asyncQueue == null) return;
    165                     var task = Task.Factory.StartNew(() =>
    166                     {
    167                         if (token.IsCancellationRequested)
    168                         {
    169                             return;
    170                         }
    171                         //阻止当前线程
    172                         resetEvent.WaitOne();
    173                         //执行任务
    174                         Execute(asyncQueue.MeterInfoTask.Id);
    175 
    176                     }, token).ContinueWith(t =>
    177                     {
    178                         HandlingTask();
    179                     }, TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously);
    180                     ParallelTasks.Add(task);
    181                 }
    182             }
    183         }
    184         /// <summary>
    185         /// 获取当前有效并行的任务数
    186         /// </summary>
    187         /// <returns></returns>
    188         [MethodImpl(MethodImplOptions.Synchronized)]
    189         private int GetAvailableTaskCount()
    190         {
    191             if (IsInitTask)
    192                 return ParallelTaskCount;
    193             return 1;
    194         }
    195     }
    复制代码
  • 相关阅读:
    定义函数
    变量与常量
    字符串与格式化
    字符串与编码
    字符编码
    元组-tuple
    列表-list
    分支和循环
    润乾配置连接kingbase(金仓)数据库
    润乾报表在proxool应用下的数据源配置
  • 原文地址:https://www.cnblogs.com/mengcheng9300/p/11613232.html
Copyright © 2011-2022 走看看