//--------------------------------------------------------------------------
//
//  Copyright (c) Microsoft Corporation.  All rights reserved.
//
//  File: LimitedConcurrencyTaskScheduler.cs
//
//--------------------------------------------------------------------------

using System.Collections.Generic;
using System.Linq;

namespace System.Threading.Tasks.Schedulers
{
    /// <summary>
    /// Provides a task scheduler that ensures a maximum concurrency level while
    /// running on top of the ThreadPool.
    /// </summary>
    /// <remarks>
    /// Queued tasks are kept in a FIFO list. Up to <c>maxDegreeOfParallelism</c>
    /// ThreadPool work items ("delegates") drain that list concurrently; each one
    /// keeps pulling tasks until the list is empty and then retires.
    /// </remarks>
    public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
    {
        /// <summary>Whether the current thread is processing work items.</summary>
        /// <remarks>
        /// NOTE(review): this flag is per thread but shared by every scheduler instance,
        /// so it does not record which instance the thread is working for.
        /// </remarks>
        [ThreadStatic]
        private static bool _currentThreadIsProcessingItems;

        /// <summary>The list of tasks to be executed.</summary>
        private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)

        /// <summary>The maximum concurrency level allowed by this scheduler.</summary>
        private readonly int _maxDegreeOfParallelism;

        /// <summary>
        /// The number of ThreadPool work items that have been queued or are currently
        /// running on behalf of this scheduler.
        /// </summary>
        private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)

        /// <summary>
        /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
        /// specified degree of parallelism.
        /// </summary>
        /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="maxDegreeOfParallelism"/> is less than 1.
        /// </exception>
        public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
        {
            if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
            _maxDegreeOfParallelism = maxDegreeOfParallelism;
        }

        /// <summary>Queues a task to the scheduler.</summary>
        /// <param name="task">The task to be queued.</param>
        protected sealed override void QueueTask(Task task)
        {
            // Add the task to the list of tasks to be processed.  If there aren't enough
            // delegates currently queued or running to process tasks, schedule another.
            lock (_tasks)
            {
                _tasks.AddLast(task);
                if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
                {
                    // Count the delegate before it is queued so that concurrent callers
                    // never schedule more than the allowed number of workers.
                    ++_delegatesQueuedOrRunning;
                    NotifyThreadPoolOfPendingWork();
                }
            }
        }

        /// <summary>
        /// Informs the ThreadPool that there's work to be executed for this scheduler.
        /// </summary>
        private void NotifyThreadPoolOfPendingWork()
        {
            ThreadPool.UnsafeQueueUserWorkItem(_ =>
            {
                // Note that the current thread is now processing work items.
                // This is necessary to enable inlining of tasks into this thread.
                _currentThreadIsProcessingItems = true;
                try
                {
                    // Process all available items in the queue.
                    while (true)
                    {
                        Task item;
                        lock (_tasks)
                        {
                            // When there are no more items to be processed,
                            // note that we're done processing, and get out.
                            if (_tasks.Count == 0)
                            {
                                --_delegatesQueuedOrRunning;
                                break;
                            }

                            // Get the next item from the queue
                            item = _tasks.First.Value;
                            _tasks.RemoveFirst();
                        }

                        // Execute the task we pulled out of the queue.  This happens outside
                        // the lock so other workers can keep dequeuing while it runs.
                        base.TryExecuteTask(item);
                    }
                }
                // We're done processing items on the current thread
                finally { _currentThreadIsProcessingItems = false; }
            }, null);
        }

        /// <summary>Attempts to execute the specified task on the current thread.</summary>
        /// <param name="task">The task to be executed.</param>
        /// <param name="taskWasPreviouslyQueued">
        /// Whether the task has already been handed to <see cref="QueueTask"/>; if so it is
        /// removed from the pending list before it is run inline.
        /// </param>
        /// <returns>Whether the task could be executed on the current thread.</returns>
        protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // If this thread isn't already processing a task, we don't support inlining
            if (!_currentThreadIsProcessingItems) return false;

            // If the task was previously queued, it may only be inlined when we can still
            // remove it from the queue.  A failed removal means a worker has already taken
            // the task, so leave it to that worker instead of racing to execute it here.
            if (taskWasPreviouslyQueued && !TryDequeue(task)) return false;

            // Try to run the task.
            return base.TryExecuteTask(task);
        }

        /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
        /// <param name="task">The task to be removed.</param>
        /// <returns>Whether the task could be found and removed.</returns>
        protected sealed override bool TryDequeue(Task task)
        {
            lock (_tasks) return _tasks.Remove(task);
        }

        /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
        public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }

        /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
        /// <returns>An enumerable of the tasks currently scheduled.</returns>
        /// <exception cref="NotSupportedException">
        /// The task list lock could not be acquired immediately.
        /// </exception>
        protected sealed override IEnumerable<Task> GetScheduledTasks()
        {
            bool lockTaken = false;
            try
            {
                // Never block here: take the lock only if it is free right now.
                Monitor.TryEnter(_tasks, ref lockTaken);
                if (lockTaken) return _tasks.ToArray(); // snapshot copy, safe to enumerate after the lock is released
                else throw new NotSupportedException();
            }
            finally
            {
                if (lockTaken) Monitor.Exit(_tasks);
            }
        }
    }
}
测试:
输出结果:
参考:
http://msdn.microsoft.com/en-us/library/ee789351(v=vs.110).aspx
https://code.msdn.microsoft.com/Samples-for-Parallel-b4b76364/view/SourceCode#content