zoukankan      html  css  js  c++  java
  • 怎么设置task的最大线程数

      1 //-------------------------------------------------------------------------- 
      2 //  
      3 //  Copyright (c) Microsoft Corporation.  All rights reserved.  
      4 //  
      5 //  File: LimitedConcurrencyTaskScheduler.cs 
      6 // 
      7 //-------------------------------------------------------------------------- 
      8  
      9 using System.Collections.Generic; 
     10 using System.Linq; 
     11  
     12 namespace System.Threading.Tasks.Schedulers 
     13 { 
     14     /// <summary> 
     15     /// Provides a task scheduler that ensures a maximum concurrency level while 
     16     /// running on top of the ThreadPool. 
     17     /// </summary> 
     18     public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler 
     19     { 
     20         /// <summary>Whether the current thread is processing work items.</summary> 
     21         [ThreadStatic] 
     22         private static bool _currentThreadIsProcessingItems; 
     23         /// <summary>The list of tasks to be executed.</summary> 
     24         private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks) 
     25         /// <summary>The maximum concurrency level allowed by this scheduler.</summary> 
     26         private readonly int _maxDegreeOfParallelism; 
     27         /// <summary>Whether the scheduler is currently processing work items.</summary> 
     28         private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks) 
     29  
     30         /// <summary> 
     31         /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the 
     32         /// specified degree of parallelism. 
     33         /// </summary> 
     34         /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param> 
     35         public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism) 
     36         { 
     37             if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism"); 
     38             _maxDegreeOfParallelism = maxDegreeOfParallelism; 
     39         } 
     40  
     41         /// <summary>Queues a task to the scheduler.</summary> 
     42         /// <param name="task">The task to be queued.</param> 
     43         protected sealed override void QueueTask(Task task) 
     44         { 
     45             // Add the task to the list of tasks to be processed.  If there aren't enough 
     46             // delegates currently queued or running to process tasks, schedule another. 
     47             lock (_tasks) 
     48             { 
     49                 _tasks.AddLast(task); 
     50                 if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism) 
     51                 { 
     52                     ++_delegatesQueuedOrRunning; 
     53                     NotifyThreadPoolOfPendingWork(); 
     54                 } 
     55             } 
     56         } 
     57  
     58         /// <summary> 
     59         /// Informs the ThreadPool that there's work to be executed for this scheduler. 
     60         /// </summary> 
     61         private void NotifyThreadPoolOfPendingWork() 
     62         { 
     63             ThreadPool.UnsafeQueueUserWorkItem(_ => 
     64             { 
     65                 // Note that the current thread is now processing work items. 
     66                 // This is necessary to enable inlining of tasks into this thread. 
     67                 _currentThreadIsProcessingItems = true; 
     68                 try 
     69                 { 
     70                     // Process all available items in the queue. 
     71                     while (true) 
     72                     { 
     73                         Task item; 
     74                         lock (_tasks) 
     75                         { 
     76                             // When there are no more items to be processed, 
     77                             // note that we're done processing, and get out. 
     78                             if (_tasks.Count == 0) 
     79                             { 
     80                                 --_delegatesQueuedOrRunning; 
     81                                 break; 
     82                             } 
     83  
     84                             // Get the next item from the queue 
     85                             item = _tasks.First.Value; 
     86                             _tasks.RemoveFirst(); 
     87                         } 
     88  
     89                         // Execute the task we pulled out of the queue 
     90                         base.TryExecuteTask(item); 
     91                     } 
     92                 } 
     93                 // We're done processing items on the current thread 
     94                 finally { _currentThreadIsProcessingItems = false; } 
     95             }, null); 
     96         } 
     97  
     98         /// <summary>Attempts to execute the specified task on the current thread.</summary> 
     99         /// <param name="task">The task to be executed.</param> 
    100         /// <param name="taskWasPreviouslyQueued"></param> 
    101         /// <returns>Whether the task could be executed on the current thread.</returns> 
    102         protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) 
    103         { 
    104             // If this thread isn't already processing a task, we don't support inlining 
    105             if (!_currentThreadIsProcessingItems) return false; 
    106  
    107             // If the task was previously queued, remove it from the queue 
    108             if (taskWasPreviouslyQueued) TryDequeue(task); 
    109  
    110             // Try to run the task. 
    111             return base.TryExecuteTask(task); 
    112         } 
    113  
    114         /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary> 
    115         /// <param name="task">The task to be removed.</param> 
    116         /// <returns>Whether the task could be found and removed.</returns> 
    117         protected sealed override bool TryDequeue(Task task) 
    118         { 
    119             lock (_tasks) return _tasks.Remove(task); 
    120         } 
    121  
    122         /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary> 
    123         public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } } 
    124  
    125         /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary> 
    126         /// <returns>An enumerable of the tasks currently scheduled.</returns> 
    127         protected sealed override IEnumerable<Task> GetScheduledTasks() 
    128         { 
    129             bool lockTaken = false; 
    130             try 
    131             { 
    132                 Monitor.TryEnter(_tasks, ref lockTaken); 
    133                 if (lockTaken) return _tasks.ToArray(); 
    134                 else throw new NotSupportedException(); 
    135             } 
    136             finally 
    137             { 
    138                 if (lockTaken) Monitor.Exit(_tasks); 
    139             } 
    140         } 
    141     } 
    142 }

    测试:

    输出结果:

    参考:

    http://msdn.microsoft.com/en-us/library/ee789351(v=vs.110).aspx

    https://social.msdn.microsoft.com/Forums/zh-CN/b02ba3b4-539b-46b7-af6b-a5ca3a61a309/task?forum=visualcshartzhchs

    https://code.msdn.microsoft.com/Samples-for-Parallel-b4b76364/view/SourceCode#content

  • 相关阅读:
    Python之socket
    Python之创建low版的线程池
    Python之面向对象及相关
    Python之面向对象(进阶篇)
    Python之面向对象(初级篇)
    Python之线程与进程
    python中执行父类的构造方法
    python之反射
    记一次有趣的米筐经历~
    算法第四章作业
  • 原文地址:https://www.cnblogs.com/yy3b2007com/p/4134369.html
Copyright © 2011-2022 走看看