zoukankan      html  css  js  c++  java
  • 多线程通用处理队列类(一)

    最近实现了一个支持多FTP,同时FTP要多线程上传的小项目,通过对线程池的动态管理,很好的解决了同一时刻允许最大线程和空闲线程时任务自动入队的的问题,贴出来给大家使用:

      1     /// <summary>
      2     /// 表示一个线程(同步)安全的通用泛型处理队列
      3     /// </summary>
      4     /// <typeparam name="T">ProcessQueue中包含的数据类型</typeparam>
      5     public class ProcessQueue<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
      6     {
      7         #region Instance Variables
      8 
      9         private object syncRoot = new object();
     10         private Queue<T> queue;
     11         private List<WorkerThread> threads = new List<WorkerThread>();
     12         private Action<T> operation;
     13         bool isDisposed;
     14         bool isRunning;
     15 
     16         private int _maxThreadCount;
     17 
     18         public int MaxThreadCount
     19         {
     20             get { return _maxThreadCount == 0 ? 5 : _maxThreadCount; }
     21             set { value = _maxThreadCount; }
     22         }
     23 
     24         #endregion
     25 
     26         #region Constructors
     27         /// <summary>
     28         /// 初始一个新的ProcessQueue 并指定具有特定容量的工作线程
     29         /// </summary>
     30         public ProcessQueue(Action<T> action) : this(action, 5) { }
     31         /// <summary>
     32         /// 初始一个新的ProcessQueue
     33         /// </summary>
     34         public ProcessQueue(int capacity, Action<T> action) : this(capacity, action, 5) { }
     35         /// <summary>
     36         /// 初始一个新的ProcessQueue
     37         /// </summary>
     38         public ProcessQueue(IEnumerable<T> collection, Action<T> action) : this(collection, action, 5) { }
     39 
     40         /// <summary>
     41         /// 初始一个新的ProcessQueue
     42         /// </summary>
     43         public ProcessQueue(Action<T> action, int threadCount)
     44         {
     45             queue = new Queue<T>();
     46             operation = action;
     47 
     48             SetThreadCount(MaxThreadCount);
     49         }
     50 
     51         /// <summary>
     52         /// 初始一个新的ProcessQueue
     53         /// </summary>
     54         /// <param name="capacity">初始容量</param>
     55         public ProcessQueue(int capacity, Action<T> action, int threadCount)
     56         {
     57             queue = new Queue<T>(capacity);
     58             operation = action;
     59 
     60             SetThreadCount(MaxThreadCount);
     61         }
     62 
     63         /// <summary>
     64         /// 初始一个新的ProcessQueue
     65         /// </summary>
     66         /// <param name="collection">将数据复制到ProcessQueue.</param>
     67         public ProcessQueue(IEnumerable<T> collection, Action<T> action, int threadCount)
     68         {
     69             queue = new Queue<T>(collection);
     70             operation = action;
     71 
     72             SetThreadCount(MaxThreadCount);
     73         }
     74         #endregion
     75 
     76         #region Processing Control
     77 
     78         /// <summary>
     79         /// 停止 (挂起) 
     80         /// </summary>
     81         public void Stop()
     82         {
     83             lock (syncRoot)
     84             {
     85                 foreach (WorkerThread thread in threads)
     86                 {
     87                     thread.Pause();
     88                 }
     89 
     90                 isRunning = false;
     91             }
     92         }
     93 
     94         /// <summary>
     95         /// 开始运行
     96         /// </summary>
     97         public void Start()
     98         {
     99             lock (syncRoot)
    100             {
    101                 //清空队列集合重新创建新的线程
    102                 RegenerateIfDisposed();
    103                 //如果新进的项目少于当前线程集合总的线程数,则创建当前新进的项目数
    104                 //如果新进 的项目多余当前线程集合的线程数,则创建同样多的数程集合的线程数
    105                 for (int i = 0; i < Math.Min(threads.Count, queue.Count); i++)
    106                 {
    107                     //设置信号让其运行
    108                     threads[i].Signal();
    109                 }
    110                 isRunning = true;
    111             }
    112         }
    113 
    114         /// <summary>
    115         /// 获取此ProcessQueue使用的工作线程数。 使用SetThreadCount更改此值。
    116         /// </summary>
    117         public int ThreadCount { get { return threads.Count; } }
    118 
    119         /// <summary>
    120         /// 设置此ProcessQueue使用的工作线程数,并根据需要分配或释放线程。
    121         /// </summary>
    122         /// <param name="threadCount">线程数</param>
    123         public void SetThreadCount(int threadCount)
    124         {
    125             //至少要有一个线程
    126             if (threadCount < 1) throw new ArgumentOutOfRangeException("threadCount", "The ProcessQueue class requires at least one worker thread.");
    127             //同步线程
    128             lock (syncRoot)
    129             {
    130                 // 等待队列
    131                 int pending = queue.Count;
    132                 // 创建一个指定最大工作线程数的线程集合,每个线程用来处理排队的项目
    133                 for (int i = threads.Count; i < threadCount; i++) 
    134                 {
    135                     //注意:在实例化工作线程WorkerThread 时,已经创建了一个ThreadProc 无限循环方法,改方法检测 signalEvent, abortEvent 信号
    136                     //在收到 abortEvent 时终止并退出,收到 Signal时 循环调用 ProcessItems() 用来处理队列里排队的项目
    137                     WorkerThread thread = new ProcessQueue<T>.WorkerThread(this);
    138                     //添加到列表
    139                     threads.Add(thread);
    140                     //线程启动
    141                     thread.Start();
    142                     //如果队列总有待排项目时
    143                     if (pending > 1)
    144                     {
    145                         //设置信号,让当前工作线程运行(不等待)
    146                         thread.Signal();
    147                     }
    148                     //待排队数减一
    149                     pending--;
    150                 }
    151 
    152                 //如果其它线程调用了SetThreadCount,或者多次调用了 SetThreadCount,从而导致当前实际的线程集合有可能远远大于最大线程数
    153                 //在这种情况下,需要移除多余的线程,从而保证当前threadCount有效
    154                 //移除的线程数 = 当前创建的工作线程集合总数 - 设置的最大线程数
    155                 int toRemove = threads.Count - threadCount;
    156                 if (toRemove > 0)
    157                 {
    158                     //IsSignaled 如果当前实例收到信号,则为 true;否则为 false
    159                     //从线程集合里取出正在等待的线程
    160                     foreach (WorkerThread thread in threads.Where(t => !t.IsSignaled).ToList())
    161                     {
    162                         //设置信号使得该线程终止
    163                         thread.Abort();
    164                         //从集合中移除改项
    165                         threads.Remove(thread);
    166                         //移除数减一
    167                         toRemove--;
    168                     }
    169                     //如果待移除的线程正在运行中
    170                     //则强制移除该线程直到移除完为止
    171                     while (toRemove > 0)
    172                     {
    173                         WorkerThread thread = threads[threads.Count - 1];
    174                         thread.Abort();
    175                         threads.Remove(thread);
    176                         toRemove--;
    177                     }
    178                 }
    179             }
    180         }
    181 
    182         /// <summary>
    183         /// 处理队列项
    184         /// </summary>
    185         /// <param name="item"></param>
    186         private void ProcessItem(T item)
    187         {
    188             operation(item);
    189         }
    190 
    191         /// <summary>
    192         /// 释放时重置线程
    193         /// </summary>
    194         private void RegenerateIfDisposed()
    195         {
    196             if (isDisposed)
    197             {
    198                 int threadCount = threads.Count;
    199 
    200                 threads.Clear();
    201 
    202                 SetThreadCount(threadCount);
    203             }
    204 
    205             isDisposed = false;
    206         }
    207         #endregion
    208 
    209         /// <summary>
    210         /// 从ProcessQueue清除所有未处理的项目
    211         /// </summary>
    212         public void Clear()
    213         {
    214             lock (syncRoot)
    215             {
    216                 queue.Clear();
    217             }
    218         }
    219 
    220         /// <summary>
    221         /// 尝试从ProcessQueue中检索获取下一个项目(如果存在)。 如果不存在,则将值设置为其默认值。
    222         /// </summary>
    223         /// <param name="value">如果不存在,则该变量将被设置为默认值.</param>
    224         /// <returns>如果ProcessQueue包含一个项,则为真,如果没有则为False</returns>
    225         public bool TryDequeue(out T value)
    226         {
    227             lock (syncRoot)
    228             {
    229                 if (queue.Count > 0)
    230                 {
    231                     value = queue.Dequeue();
    232 
    233                     return true;
    234                 }
    235                 else
    236                 {
    237                     value = default(T);
    238 
    239                     return false;
    240                 }
    241             }
    242         }
    243 
    244         /// <summary>
    245         /// 确定队列是否包含指定项
    246         /// </summary>
    247         /// <param name="item">当前项</param>
    248         /// <returns>存在则 True, 不存在则 False</returns>
    249         public bool Contains(T item)
    250         {
    251             lock (syncRoot)
    252             {
    253                 return queue.Contains(item);
    254             }
    255         }
    256 
    257         /// <summary>
    258         /// 将ProcessQueue的内容复制到外部数组,而不影响ProcessQueue的内容
    259         /// </summary>
    260         /// <param name="array">The array to copy the items into</param>
    261         /// <param name="arrayIndex">The starting index in the array</param>
    262         public void CopyTo(T[] array, int arrayIndex)
    263         {
    264             lock (syncRoot)
    265             {
    266                 queue.CopyTo(array, arrayIndex);
    267             }
    268         }
    269 
    270         /// <summary>
    271         /// 从ProcessQueue中检索下一个未处理的项目并将其删除
    272         /// </summary>
    273         /// <returns>The next unprocessed item in the ProcessQueue</returns>
    274         public T Dequeue()
    275         {
    276             lock (syncRoot)
    277             {
    278                 return queue.Dequeue();
    279             }
    280         }
    281 
    282         /// <summary>
    283         /// 将一个项目添加到处理队列的末尾
    284         /// </summary>
    285         /// <param name="item">添加项</param>
    286         public void Enqueue(T item)
    287         {
    288             lock (syncRoot)
    289             {
    290                 //新进队列项
    291                 queue.Enqueue(item);
    292                 //当前处理队列正在运行时
    293                 if (isRunning)
    294                 {
    295                     //清空队列集合重新创建新的线程
    296                     RegenerateIfDisposed();
    297                     //取出一个等待的线程
    298                     WorkerThread firstThread = threads.Where(t => !t.IsSignaled).FirstOrDefault();
    299                     //存在则运行它
    300                     if (firstThread != null) firstThread.Signal();
    301                 }
    302             }
    303         }
    304 
    305         /// <summary>
    306         /// 从ProcessQueue中检索下一个未处理的项目,而不删除它
    307         /// </summary>
    308         /// <returns>The next unprocessed item in the ProcessQueue</returns>
    309         public T Peek()
    310         {
    311             lock (syncRoot)
    312             {
    313                 return queue.Peek();
    314             }
    315         }
    316 
    317         /// <summary>
    318         /// 返回一个包含ProcessQueue中所有未处理项目的数组
    319         /// </summary>
    320         /// <returns></returns>
    321         public T[] ToArray()
    322         {
    323             lock (syncRoot)
    324             {
    325                 return queue.ToArray();
    326             }
    327         }
    328 
    329         /// <summary>
    330         /// 将ProcessQueue的容量设置为其包含的项目的实际数量,除非该数量超过当前容量的90%。
    331         /// </summary>
    332         public void TrimExcess()
    333         {
    334             lock (syncRoot)
    335             {
    336                 queue.TrimExcess();
    337             }
    338         }
    339 
    340         #region IEnumerable<T> Members
    341 
    342         public IEnumerator<T> GetEnumerator()
    343         {
    344             return queue.GetEnumerator();
    345         }
    346 
    347         #endregion
    348 
    349         #region IEnumerable Members
    350 
    351         IEnumerator IEnumerable.GetEnumerator()
    352         {
    353             return queue.GetEnumerator();
    354         }
    355 
    356         #endregion
    357 
    358         #region ICollection Members
    359 
    360         void ICollection.CopyTo(Array array, int index)
    361         {
    362             lock (syncRoot)
    363             {
    364                 ((ICollection)queue).CopyTo(array, index);
    365             }
    366         }
    367 
    368         public int Count
    369         {
    370             get
    371             {
    372                 lock (syncRoot)
    373                 {
    374                     return queue.Count;
    375                 }
    376             }
    377         }
    378 
    379         bool ICollection.IsSynchronized
    380         {
    381             get { return true; }
    382         }
    383 
    384         object ICollection.SyncRoot
    385         {
    386             get { return syncRoot; }
    387         }
    388 
    389         #endregion
    390 
    391         #region IDisposable Members
    392 
    393         public void Dispose()
    394         {
    395             Dispose(true);
    396         }
    397 
    398         private void Dispose(bool disposing)
    399         {
    400             if (disposing)
    401             {
    402                 foreach (WorkerThread thread in threads) thread.Abort();
    403             }
    404 
    405             isDisposed = true;
    406         }
    407 
    408         #endregion
    409 
    410         /// <summary>
    411         /// 封装.NET Thread对象并管理与控制其行为相关联的WaitHandles
    412         /// </summary>
    413         private class WorkerThread
    414         {
    415             private ManualResetEvent abortEvent;
    416             private ManualResetEvent signalEvent;
    417             private ProcessQueue<T> queue;
    418 
    419             private Thread thread;
    420 
    421             public WorkerThread(ProcessQueue<T> queue)
    422             {
    423                 abortEvent = new ManualResetEvent(false);
    424                 signalEvent = new ManualResetEvent(false);
    425                 this.queue = queue;
    426 
    427                 thread = new Thread(ThreadProc);
    428                 thread.Name = "ProcessQueue Worker ID " + thread.ManagedThreadId;
    429             }
    430 
    431             /// <summary>
    432             /// 运行当前线程
    433             /// </summary>
    434             public void Start()
    435             {
    436                 thread.Start();
    437             }
    438 
    439             /// <summary>
    440             /// 终止当前线程
    441             /// </summary>
    442             public void Abort()
    443             {
    444                 abortEvent.Set();
    445 
    446                 thread.Join();
    447             }
    448 
    449             /// <summary>
    450             /// 清除信号WaitHandle,导致线程完成当前的迭代后暂停
    451             /// </summary>
    452             public void Pause()
    453             {
    454                 signalEvent.Reset();
    455             }
    456 
    457             /// <summary>
    458             /// 设置信号WaitHandle,等待线程使其恢复运行(如果暂停)
    459             /// </summary>
    460             public void Signal()
    461             {
    462                 signalEvent.Set();
    463             }
    464 
    465             public bool IsSignaled
    466             {
    467                 get { return signalEvent.WaitOne(0); }
    468             }
    469 
    470             /// <summary>
    471             /// ThreadProc 总线程方法由一个无限循环组成,在发出中止事件时退出
    472             /// </summary>
    473             private void ThreadProc()
    474             {
    475                 WaitHandle[] handles = new WaitHandle[] { signalEvent, abortEvent };
    476 
    477                 while (true)
    478                 {
    479                     //等待指定数组中的任一元素收到信号
    480                     switch (WaitHandle.WaitAny(handles))
    481                     {
    482                         case 0: // signal
    483                             {
    484                                 ProcessItems();
    485                             }
    486                             break;
    487                         case 1: // abort
    488                             {
    489                                 return;
    490                             }
    491                     }
    492                 }
    493             }
    494 
    495             /// <summary>
    496             /// 处理项目
    497             /// </summary>
    498             private void ProcessItems()
    499             {
    500                 T item;
    501                 //从队列中取出一项,这是一个同步的过程
    502                 while (queue.TryDequeue(out item))
    503                 {
    504                     //处理队列项
    505                     queue.ProcessItem(item);
    506                     //如果当前实例收到信号,则为 true;否则为 false。
    507                     //等待当前队列完成 在 调用 signalEvent.Set() 或者 abortEvent.Set() 时 
    508                     if (!signalEvent.WaitOne(0) || abortEvent.WaitOne(0)) return;
    509                 }
    510                 //线程状态设置为非终止状态
    511                 signalEvent.Reset();
    512             }
    513         }
    514     }

    使用示例:

    public void UpLoadFile(TaskFile task)
    {
          OnProcessLog("Thread:" + Thread.CurrentThread.ManagedThreadId);
          OnProcessLog("Uploading: " + FTPName + "://" + IP + "/" + task.DIR);
          OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = 0, uploadStatus = UploadStatus.Uploading });
          for (int i = 1; i <= 100; i++)
          {
              OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = i, uploadStatus = UploadStatus.Uploading });
              Thread.Sleep(GenerateRandomInteger(100,200));
          }
          OnProcessLog("File Uploaded Successfully: " + FTPName + "://" + IP + "/" + task.DIR);
          OnCompleted(task, new CompetedEventArgs() { CompetedPrecent = 0, uploadStatus = UploadStatus.Completed });
    }
    
    //模拟
    var processQueue = new ProcessQueue<TaskFile>(UpLoadFile);
    processQueue.Start();
  • 相关阅读:
    webStorm Ctrl+s 自动格式化 然后 保存 用宏命令
    vuez init webStorm vscode
    Node + Express + MySQL 接口开发完整案例
    sysUpload.vue上传组件 的时候 看进度的时候 不要mock 注释掉 // if (process.env.NODE_ENV !== 'production') require('@/mock')
    目录下 shift 右键菜单 打开cmd 或者在 地址栏输入cmd 回车进入cmd
    xls表格 拼接字段 拼json =CONCAT("{ code:'",A2,"',","codeName: '",B2,"',","flag: '",C2,"'},")
    js 数组过滤 filter
    libs/tools.js stringToDate dateToString 日期字符串转换函数
    vue 组件内 directives指令的调用方式 <base-table v-auto-height:tableHeight="{vm:this, diffHeight:ahTable.diffHeight}"
    油猴 tamperMonkey 在百度首页 添加自己的自定义链接
  • 原文地址:https://www.cnblogs.com/mschen/p/7771453.html
Copyright © 2011-2022 走看看