设计模式——.net并行编程,清华大学出版的中译本。
- 相关资源地址主页面: http://parallelpatterns.codeplex.com/
- 代码下载: http://parallelpatterns.codeplex.com/releases/view/50473
- 书籍在线地址: https://msdn.microsoft.com/en-us/library/ff963553.aspx
- 使用并行编程的一些示例: https://code.msdn.microsoft.com/ParExtSamples
- Task的介绍页面: https://msdn.microsoft.com/en-us/library/system.threading.tasks.task(v=vs.110).aspx
这本书介绍了一些多线程编程的模式,也就是会使用多线程的场景,以及可以使用.net中的什么技术实现--当然主要是TPL(Task parallel Library)和PLINQ(parallel LINQ)。TPL是.NET Framework 4中加的功能,目的是封装以前的thread和同步,线程池等概念,大家只要使用task和System.Threading.Tasks.Parallel,提供并行任务,并行和同步的细节交给库处理。 PLINQ是LINQ to Objects的并行版本。下面是这些结构的一个概览。
1. 模式分类
- 数据并行data parallelism。对不同的数据执行相同的计算——比如for循环中的计算,这是属于数据并行。 包括的模式有并行循环(parallel loops)和并行聚合(parallel aggregation)。并行循环强调并行循环之间没有数据依赖,不用控制并行顺序;并行聚合类似于map-reduce,任务有并发的部分,但数据也有需要控制同步的地方,.net提供的库很好的封装了同步,使用起来就像不需要同步一样简单。
- 任务并行task parallelism。强调并行执行的任务不同,一般任务的输入数据也不同。 包括的模式有并行任务parallel tasks、future模式、动态任务并行(dynamic task parallelism)、流水线pipelines。
2. 并行循环
当需要对集合中的每个元素执行相同的独立操作时,可以使用并行循环模式,注意循环需要相互独立。
a. 一般情况。例如我们可能有这么一个for循环
int n = ... for (int i = 0; i < n; i++) { // do some task }
对应的并行版本:
int n = ... Parallel.For(0, n, i => { // ... });
Foreach也有相应的并行版本
IEnumerable<MyObject> myEnumerable = ... foreach (var obj in myEnumerable) { // ... } IEnumerable<MyObject> myEnumerable = ... Parallel.ForEach(myEnumerable, obj => { // ... });
PLINQ多线程的例子
IEnumerable<MyObject> source = ... // LINQ var query1 = from i in source select Normalize(i); // PLINQ var query2 = from i in source.AsParallel() select Normalize(i);
b. 控制循环过程。可以在任务执行过程中控制其它并行任务,可以使用break,stop和cancel。一般stop和cancel使用比较常见。
i. 中断循环break。类似于for循环的break。注意break之后,依然会执行比break 的task的索引值小的任务,而且会保证所有索引值小的任务都会执行。如果索引值大的任务在break前开始执行,也会执行完毕。这种方式适用于任务顺序有依赖的情况,需要保证中断前的任务执行完毕。
int n = ... for (int i = 0; i < n; i++) { // ... if (/* stopping condition is true */) break; }采用多线程版本时可以这样退出循环:
int n = ... Parallel.For(0, n, (i, loopState) => { // ... if (/* stopping condition is true */) { loopState.Break(); return; } });签名:
Parallel.For(int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body);检查task状态是不是中断退出的方法:
int n = ... var result = new double[n]; var loopResult = Parallel.For(0, n, (i, loopState) => { if (/* break condition is true */) { loopState.Break(); return; } result[i] = DoWork(i); }); if (!loopResult.IsCompleted && loopResult.LowestBreakIteration.HasValue) { Console.WriteLine("Loop encountered a break at {0}", loopResult.LowestBreakIteration.Value); }ii. 中断停止stop。类似于break,不同的是stop以后不会再执行索引值小的任务,也就是正在执行的执行完毕,其它的就不再执行。任务之间完全没有依赖,只要有一个任务stop,那么不再调度剩下的任务。
var n = ... var loopResult = Parallel.For(0, n, (i, loopState) => { if (/* stopping condition is true */) { loopState.Stop(); return; } result[i] = DoWork(i); }); if (!loopResult.IsCompleted && !loopResult.LowestBreakIteration.HasValue) { Console.WriteLine(“Loop was stopped”); }iii. 外部循环取消。对于一些执行时间比较长的任务,可以使用取消操作。任务执行期间检查是否取消的标识。
void DoLoop(CancellationTokenSource cts) { int n = ... CancellationToken token = cts.Token; var options = new ParallelOptions { CancellationToken = token }; try { Parallel.For(0, n, options, (i) => { // ... // ... optionally check to see if cancellation happened if (token.IsCancellationRequested) { // ... optionally exit this iteration early return; } }); } catch (OperationCanceledException ex) { // ... handle the loop cancellation } }函数签名:
Parallel.For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int> body);问题:如果使用cancel,是否还会调度执行剩下的任务?
Cancel以后,如果任务还没有开始执行的会直接取消,已经开始的由任务自己决定是否需要取消。
c. 异常处理。如果有一个任务中抛出了异常,后面不会再调度新的任务,已经调度的会执行完成。最后所有任务可能的异常会打包放在一个异常AggregationException里面抛出来。
d. 分批执行小循环体。有的循环体执行时间较少,如果每次循环都调度一个任务,显然得不偿失。可以讲循环的执行过程分区,比如每100个循环调度一次。下面的例子根据cpu的核数自动分配每批任务的数量。
int n = ... double[] result = new double[n]; Parallel.ForEach(Partitioner.Create(0, n), (range) => { for (int i = range.Item1; i < range.Item2; i++) { // very small, equally sized blocks of work result[i] = (double)(i * i); } });
函数签名:
Parallel.ForEach<TSource>( Partitioner<TSource> source, Action<TSource> body);
下面的设置每个任务执行50000个循环。
double[] result = new double[1000000]; Parallel.ForEach(Partitioner.Create(0, 1000000, 50000), (range) => { for (int i = range.Item1; i < range.Item2; i++) { // small, equally sized blocks of work result[i] = (double)(i * i); } });
这里System.Collections.Concurrent.Partitioner将区间切割成IEnumerable<Tuple<int,int>>的形式。
e. 控制并行度。一般TPL会自动根据CPU内核数控制同时执行的任务数,你也可以通过ParallelOption的MaxDegreeOfParallelism来控制最大的并行任务数。
var n = ... var options = new ParallelOptions() { MaxDegreeOfParallelism = 2}; Parallel.For(0, n, options, i => { // ... });
函数签名:
Parallel.For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int> body);
PLINQ使用示例:
IEnumerable<T> myCollection = // ... myCollection.AsParallel() .WithDegreeOfParallelism(8) .ForAll(obj => /* ... */);
f. 在循环体中使用局部任务状态。
int numberOfSteps = 10000000; double[] result = new double[numberOfSteps]; Parallel.ForEach( Partitioner.Create(0, numberOfSteps), new ParallelOptions(), () => { return new Random(MakeRandomSeed()); }, (range, loopState, random) => { for (int i = range.Item1; i < range.Item2; i++) result[i] = random.NextDouble(); return random; }, _ => {});
函数签名:
ForEach<TSource, TLocal>( OrderablePartitioner<TSource> source, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
3. 并行任务
如果有多个一步任务可以同时执行,可以使用并行任务模式。例如
Parallel.Invoke(DoLeft, DoRight);
等价于下面的方法:
Task t1 = Task.Factory.StartNew(DoLeft); Task t2 = Task.Factory.StartNew(DoRight); Task.WaitAll(t1, t2);
a. 处理异常(https://msdn.microsoft.com/en-us/library/dd997415(v=vs.110).aspx)。使用Wait和WaitAll可以观察任务抛出来的异常,WaitAny不能。收到的异常会包装到AggregateException里面,可以使用Handle方法来处理里面的异常。
try { Task t = Task.Factory.StartNew( ... ); // ... t.Wait(); } catch (AggregateException ae) { ae.Handle(e => { if (e is MyException) { // ... handle exception ... return true; } else { return false; } }); }
由于异常有可能嵌套其它的异常,形成一个多级的树结构,可以使用Flatten压平树结构,然后调用handle,可以保证聚合的所有异常都可以被处理。
try { Task t1 = Task.Factory.StartNew(() => { Task t2 = Task.Factory.StartNew(() => { // ... throw new MyException(); }); // ... t2.Wait(); }); // ... t1.Wait(); } catch (AggregateException ae) { ae.Flatten().Handle(e => { if (e is MyException) { // ... handle exception ... return true; } else { return false; } }); }
b. 等待第一个任务完成。可以使用WaitAny等待第一个任务完成。注意WaitAny不会观察到异常,后面加了WaitAll来处理异常。
var taskIndex = -1; Task[] tasks = new Task[] { Task.Factory.StartNew(DoLeft), Task.Factory.StartNew(DoRight), Task.Factory.StartNew(DoCenter) }; Task[] allTasks = tasks; // Print completion notices one by one as tasks finish. while (tasks.Length > 0) { taskIndex = Task.WaitAny(tasks); Console.WriteLine("Finished task {0}.", taskIndex + 1); tasks = tasks.Where((t) => t != tasks[taskIndex]).ToArray(); } // Observe any exceptions that might have occurred. try { Task.WaitAll(allTasks); } catch (AggregateException ae) { ... }
下面的例子等待第一个完成的任务,然后取消其它任务,处理取消操作异常,其它的异常会重新抛出。
public static void SpeculativeInvoke( params Action<CancellationToken>[] actions) { var cts = new CancellationTokenSource(); var token = cts.Token; var tasks = (from a in actions select Task.Factory.StartNew(() => a(token), token)) .ToArray(); // Wait for fastest task to complete. Task.WaitAny(tasks); // Cancel all of the slower tasks. cts.Cancel(); // Wait for cancellation to finish and observe exceptions. try { Task.WaitAll(tasks); } catch (AggregateException ae) { // Filter out the exception caused by cancellation itself. ae.Flatten().Handle(e => e is OperationCanceledException); } finally { if (cts != null) cts.Dispose(); } }
c. 新手易犯的错误。
i. 闭包捕获的变量问题。考虑下面这段代码。
for (int i = 0; i < 4; i++) { // WARNING: BUGGY CODE, i has unexpected value Task.Factory.StartNew(() => Console.WriteLine(i)); }你可能希望输出数字1,2,3,4,只是打乱了顺序。实际上你很可能看到4,4,4,4. 因为几个Task其实访问了同一个变量i。可以使用下面的方法来避免这个问题。
for (int i = 0; i < 4; i++) { var tmp = i; Task.Factory.StartNew(() => Console.WriteLine(tmp)); }ii. 错误的时间清理任务所需资源的问题。考虑下面这段代码。
Task<string> t; using (var file = new StringReader("text")) { t = Task<string>.Factory.StartNew(() => file.ReadLine()); } // WARNING: BUGGY CODE, file has been disposed Console.WriteLine(t.Result);很可能任务执行的时候file已经dispose了。
d. 任务的生命周期。
e. 任务调度机制。一个Task Scheduler的例子:"How to: Create a Task Scheduler That Limits the Degree of Concurrency."
4. 并行合并计算
类似于map-reduce,先并行计算出中间结果,然后合并得到最终结果。下面的例子先并发计算部分和,然后合并总和(注意同步)。
double[] sequence = ... object lockObject = new object(); double sum = 0.0d; Parallel.ForEach( // The values to be aggregated sequence, // The local initial partial result () => 0.0d, // The loop body (x, loopState, partialResult) => { return Normalize(x) + partialResult; }, // The final step of each local context (localPartialSum) => { // Enforce serial access to single, shared result lock (lockObject) { sum += localPartialSum; } }); return sum;
5. future模式
在一个任务结束后执行其他的任务。下面的例子使用ContinueWith和ContinueWhenAll实现任务的延续执行。
TextBox myTextBox = ...; var futureB = Task.Factory.StartNew<int>(() => F1(a)); var futureD = Task.Factory.StartNew<int>(() => F3(F2(a))); var futureF = Task.Factory.ContinueWhenAll<int, int>( new[] { futureB, futureD }, (tasks) => F4(futureB.Result, futureD.Result)); futureF.ContinueWith((t) => myTextBox.Dispatcher.Invoke( (Action)(() => { myTextBox.Text = t.Result.ToString(); })) );
6. 动态任务并行
动态添加任务。父子任务关系。下面是并行处理快速排序的例子。
static void ParallelQuickSort(int[] array, int from, int to, int depthRemaining) { if (to - from <= Threshold) { InsertionSort(array, from, to); } else { int pivot = from + (to - from) / 2; pivot = Partition(array, from, to, pivot); if (depthRemaining > 0) { Parallel.Invoke( () => ParallelQuickSort(array, from, pivot, depthRemaining - 1), () => ParallelQuickSort(array, pivot + 1, to, depthRemaining - 1)); } else { ParallelQuickSort(array, from, pivot, 0); ParallelQuickSort(array, pivot + 1, to, 0); } } }
任务链与父子任务。创建任务时使用TaskCreationOptions.AttachedToParent可以建立父子任务关系,在子任务完成前,父任务结束运行时,会进入WaitingForChildrenToComplete的状态。
static void ParallelWalk2<T>(Tree<T> tree, Action<T> action) { if (tree == null) return; var t1 = Task.Factory.StartNew( () => action(tree.Data), TaskCreationOptions.AttachedToParent); var t2 = Task.Factory.StartNew( () => ParallelWalk2(tree.Left, action), TaskCreationOptions.AttachedToParent); var t3 = Task.Factory.StartNew( () => ParallelWalk2(tree.Right, action), TaskCreationOptions.AttachedToParent); Task.WaitAll(t1, t2, t3); }
7. 流水线
利用并发队列实现并发任务顺次处理。下面是处理语句的例子,先读取字符,校正,输出。注意这里使用了容器BlockingCollection,实现了生产者消费者的同步。
int seed = ... int BufferSize = ... var buffer1 = new BlockingCollection<string>(BufferSize); var buffer2 = new BlockingCollection<string>(BufferSize); var buffer3 = new BlockingCollection<string>(BufferSize); var f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None); var stage1 = f.StartNew(() => ReadStrings(buffer1, ...)); var stage2 = f.StartNew(() => CorrectCase(buffer1, buffer2)); var stage3 = f.StartNew(() => CreateSentences(buffer2, buffer3)); var stage4 = f.StartNew(() => WriteSentences(buffer3)); Task.WaitAll(stage1, stage2, stage3, stage4);
读取字符
static void ReadStrings(BlockingCollection<string> output, int seed) { try { foreach (var phrase in PhraseSource(seed)) { Stage1AdditionalWork(); output.Add(phrase); } } finally { output.CompleteAdding(); } }
几种并行模式的比较
Await/async模式
- 相关资源合集: https://msdn.microsoft.com/library/hh191443.aspx
- 常见问题: http://blogs.msdn.com/b/pfxteam/archive/2012/04/12/10293335.aspx
- 解释由来的文章:Asynchronous Programming: Easier Asynchronous Programming with the New Visual Studio Async CTP
- 解释实现机制:Asynchronous Programming: Pause and Play with Await
- 解释** Asynchronous Programming: Understanding the Costs of Async and Await
- 解释: Await, SynchronizationContext, and Console Apps
- MSDN Magazine: http://blogs.msdn.com/b/msdnmagazine/