随着CPU多核的普及,编程时充分利用这个特性越显重要。上篇首先用传统的嵌套循环进行数组填充,然后用.NET 4.0中的System.Threading.Tasks提供的Parallel Class来并行地进行填充,最后对比他们的性能。本文将深入分析Parallel Class并借机回答上篇9楼提出的问题,而System.Threading.Tasks分析,这个将推迟到.NET(C#) Internals: 以一个数组填充的例子初步了解.NET 4.0中的并行(三)中介绍。内容如下:
- 1、Parallel Class
- 1.1、For方法
- 1.2、ForEach方法
- 1.3、Invoke方法
- 2、并发控制疑问?
- 2.1、使用Lock锁
- 2.2、使用PLINQ——用AsParallel
- 2.3、使用PLINQ——用ParallelEnumerable
- 2.4、使用Interlocked操作
- 2.5、使用Parallel.For的有Thread-Local变量重载函数
- 性能比较
1、Parallel Class
01 |
using System.Threading.Tasks; |
02 |
class Test |
03 |
{ |
04 |
static int N = 1000; |
05 |
06 |
static void TestMethod() |
07 |
{ |
08 |
// Using a named method. |
09 |
Parallel.For(0, N, Method2); |
10 |
11 |
// Using an anonymous method. |
12 |
Parallel.For(0, N, delegate ( int i) |
13 |
{ |
14 |
// Do Work. |
15 |
}); |
16 |
17 |
// Using a lambda expression. |
18 |
Parallel.For(0, N, i => |
19 |
{ |
20 |
// Do Work. |
21 |
}); |
22 |
} |
23 |
24 |
static void Method2( int i) |
25 |
{ |
26 |
// Do work. |
27 |
} |
28 |
} |
- For方法
- ForEach方法
- Invoke方法
- For(Int32 fromInclusive, Int32 toExclusive, Action<Int32> body),该方法对区间(fromInclusive,toExclusive)之间的迭代调用body表示的委托。body委托有一个迭代数次的int32参数,如果fromInclusive>=toExclusive,则不会执行任何迭代。
- For(Int32 fromInclusive, Int32 toExclusive, Action<Int32, ParallelLoopState>),该方法对区间(fromInclusive, toExclusive)之间的迭代调用body表示的委托。body委托有两个参数——表示迭代数次的int32参数、一个可用于过早地跳出循环的ParallelLoopState实例。如果fromInclusive>=toExclusive,则不会执行任何迭代。
如果在当前迭代之前的迭代不必要执行,应该调用Stop而不是Break。调用Stop通知For循环放弃剩下的迭代,不管它是否在当前迭代之前或之后,因为所以要求的工作已经完成。然而,Break并不能保证这个。 - For(Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action<Int32> body),跟第一个方法类似,但它的区间是[fromInclusive, toExclusive)。
- For(Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action<Int32, ParallelLoopState> body),跟第二个方法类似,单的区间是[fromInclusive, toExclusive)。
- For<TLocal>(Int32 fromInclusive, Int32 toExclusive, Func<TLocal> localInit, Func<Int32, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally),它的迭代区间是[fromInclusive, toExclusive)。另外body有两个local状态变量用于同一线程的迭代之间共享。localInit委托将在每个线程参与循环执行时调用,并返回这些线程初始的local状态。这些初始状态被传递给body,当它在每个线程上第一次调用时。然后,接下来body调用返回一个可能的修改状态值且传递给下一次body调用。最终,最后一次在每个线程上的body调用返回的一个状态值传递给localFinally委托。每个线程执行在自己的loacl 状态上执行最后一个动作时,localFinally委托将被调用。这个委托可能在多个线程上并发执行,因此,你必须同步访问任何共享变量。
- For<TLocal>(Int32, Int32, ParallelOptions, Func<TLocal>, Func<Int32, ParallelLoopState, TLocal, TLocal>, Action<TLocal>),跟上面的方法类似。
下面代码演示了For(Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action<Int32> body)方法(来自MSDN):
01 |
using System; |
02 |
using System.Collections.Generic; |
03 |
using System.Linq; |
04 |
using System.Text; |
05 |
using System.Threading; |
06 |
using System.Threading.Tasks; |
07 |
08 |
namespace ConsoleApplication2 |
09 |
{ |
10 |
class Program |
11 |
{ |
12 |
// Demonstrated features: |
13 |
// CancellationTokenSource |
14 |
// Parallel.For() |
15 |
// ParallelOptions |
16 |
// ParallelLoopResult |
17 |
// Expected results: |
18 |
// An iteration for each argument value (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) is executed. |
19 |
// The order of execution of the iterations is undefined. |
20 |
// The iteration when i=2 cancels the loop. |
21 |
// Some iterations may bail out or not start at all; because they are temporally executed in unpredictable order, |
22 |
// it is impossible to say which will start/complete and which won't. |
23 |
// At the end, an OperationCancelledException is surfaced. |
24 |
// Documentation: |
26 |
27 |
static void Main( string [] args) |
28 |
{ |
29 |
CancellationTokenSource cancellationSource = new CancellationTokenSource(); |
30 |
ParallelOptions options = new ParallelOptions(); |
31 |
options.CancellationToken = cancellationSource.Token; |
32 |
try |
33 |
{ |
34 |
ParallelLoopResult loopResult = Parallel.For( |
35 |
0, |
36 |
10, |
37 |
options, |
38 |
(i, loopState) => |
39 |
{ |
40 |
Console.WriteLine( "Start Thread={0}, i={1}" , Thread.CurrentThread.ManagedThreadId, i); |
41 |
42 |
// Simulate a cancellation of the loop when i=2 |
43 |
if (i == 2) |
44 |
{ |
45 |
cancellationSource.Cancel(); |
46 |
} |
47 |
48 |
// Simulates a long execution |
49 |
for ( int j = 0; j < 10; j++) |
50 |
{ |
51 |
Thread.Sleep(1 * 200); |
52 |
53 |
// check to see whether or not to continue |
54 |
if (loopState.ShouldExitCurrentIteration) return ; |
55 |
} |
56 |
57 |
Console.WriteLine( "Finish Thread={0}, i={1}" , Thread.CurrentThread.ManagedThreadId, i); |
58 |
} |
59 |
); |
60 |
if (loopResult.IsCompleted) |
61 |
{ |
62 |
Console.WriteLine( "All iterations completed successfully. THIS WAS NOT EXPECTED." ); |
63 |
} |
64 |
} |
65 |
// No exception is expected in this example, but if one is still thrown from a task, |
66 |
// it will be wrapped in AggregateException and propagated to the main thread. |
67 |
catch (AggregateException e) |
68 |
{ |
69 |
Console.WriteLine( "Parallel.For has thrown an AggregateException. THIS WAS NOT EXPECTED.\n{0}" , e); |
70 |
} |
71 |
// Catching the cancellation exception |
72 |
catch (OperationCanceledException e) |
73 |
{ |
74 |
Console.WriteLine( "An iteration has triggered a cancellation. THIS WAS EXPECTED.\n{0}" , e.ToString()); |
75 |
} |
76 |
} |
77 |
} |
78 |
} |
- Invoke(params Action[] actions):actions是一个要执行的动作数组,这些动作可能并行地执行,但并不保证执行的顺序及一定并行执行。这个方法直到提供的所有操作完成时才返回,不管是否正常地完成或异常终止。
- Invoke(ParallelOptions parallelOptions, params Action[] actions):跟上面的方法类似,只是增加了一个parallelOptions参数,可以用户调用者取消整个操作。
01 |
using System; |
02 |
using System.Collections.Generic; |
03 |
using System.Linq; |
04 |
using System.Text; |
05 |
using System.Threading; |
06 |
using System.Threading.Tasks; |
07 |
08 |
namespace ConsoleApplication2 |
09 |
{ |
10 |
class Program |
11 |
{ |
12 |
static void Main() |
13 |
{ |
14 |
try |
15 |
{ |
16 |
Parallel.Invoke( |
17 |
BasicAction, // Param #0 - static method |
18 |
() => // Param #1 - lambda expression |
19 |
{ |
20 |
Console.WriteLine( "Method=beta, Thread={0}" , Thread.CurrentThread.ManagedThreadId); |
21 |
}, |
22 |
delegate () // Param #2 - in-line delegate |
23 |
{ |
24 |
Console.WriteLine( "Method=gamma, Thread={0}" , Thread.CurrentThread.ManagedThreadId); |
25 |
} |
26 |
); |
27 |
} |
28 |
// No exception is expected in this example, but if one is still thrown from a task, |
29 |
// it will be wrapped in AggregateException and propagated to the main thread. |
30 |
catch (AggregateException e) |
31 |
{ |
32 |
Console.WriteLine( "An action has thrown an exception. THIS WAS UNEXPECTED.\n{0}" , e.InnerException.ToString()); |
33 |
} |
34 |
} |
35 |
36 |
static void BasicAction() |
37 |
{ |
38 |
Console.WriteLine( "Method=alpha, Thread={0}" , Thread.CurrentThread.ManagedThreadId); |
39 |
} |
40 |
} |
41 |
} |
01 |
using System; |
02 |
using System.Collections.Generic; |
03 |
using System.Linq; |
04 |
using System.Text; |
05 |
using System.Threading; |
06 |
using System.Threading.Tasks; |
07 |
08 |
namespace ConsoleApplication2 |
09 |
{ |
10 |
class Program |
11 |
{ |
12 |
static void Main( string [] args) |
13 |
{ |
14 |
int loops=0; |
15 |
while (loops <= 100) |
16 |
{ |
17 |
long sum = 0; |
18 |
Parallel.For(1, 1001, delegate ( long i) |
19 |
{ |
20 |
sum += i; |
21 |
}); |
22 |
System.Console.WriteLine(sum); |
23 |
loops++; |
24 |
} |
25 |
} |
26 |
} |
27 |
} |
01 |
using System; |
02 |
using System.Collections.Generic; |
03 |
using System.Linq; |
04 |
using System.Text; |
05 |
using System.Threading; |
06 |
using System.Threading.Tasks; |
07 |
08 |
namespace ConsoleApplication2 |
09 |
{ |
10 |
class Program |
11 |
{ |
12 |
static void Main( string [] args) |
13 |
{ |
14 |
int loops = 0; |
15 |
object moniter = new object (); |
16 |
while (loops <= 100) |
17 |
{ |
18 |
long sum = 0; |
19 |
Parallel.For(1, 1001, delegate ( long i) |
20 |
{ |
21 |
lock (moniter) { sum += i; } |
22 |
}); |
23 |
System.Console.WriteLine(sum); |
24 |
loops++; |
25 |
} |
26 |
} |
27 |
} |
28 |
} |
01 |
using System; |
02 |
using System.Collections.Generic; |
03 |
using System.Linq; |
04 |
using System.Text; |
05 |
using System.Threading; |
06 |
using System.Threading.Tasks; |
07 |
08 |
namespace ConsoleApplication2 |
09 |
{ |
10 |
class Program |
11 |
{ |
12 |
static void Main( string [] args) |
13 |
{ |
14 |
int loops = 0; |
15 |
while (loops <= 100) |
16 |
{ |
17 |
long sum = 0; |
18 |
sum = Enumerable.Range(0, 1001).AsParallel().Sum(); |
19 |
System.Console.WriteLine(sum); |
20 |
loops++; |
21 |
} |
22 |
} |
23 |
} |
24 |
} |
01 |
using System; |
02 |
using System.Collections.Generic; |
03 |
using System.Linq; |
04 |
using System.Text; |
05 |
using System.Threading; |
06 |
using System.Threading.Tasks; |
07 |
08 |
namespace ConsoleApplication2 |
09 |
{ |
10 |
class Program |
11 |
{ |
12 |
static void Main( string [] args) |
13 |
{ |
14 |
int loops = 0; |
15 |
while (loops <= 100) |
16 |
{ |
17 |
long sum = 0; |
18 |
sum = ParallelEnumerable.Range(0, 1001).Sum(); |
19 |
System.Console.WriteLine(sum); |
20 |
loops++; |
21 |
} |
22 |
} |
23 |
} |
24 |
} |
01 |
using System; |
02 |
using System.Collections.Generic; |
03 |
using System.Linq; |
04 |
using System.Text; |
05 |
using System.Threading; |
06 |
using System.Threading.Tasks; |
07 |
08 |
namespace ConsoleApplication2 |
09 |
{ |
10 |
class Program |
11 |
{ |
12 |
static void Main( string [] args) |
13 |
{ |
14 |
int loops = 0; |
15 |
while (loops <= 100) |
16 |
{ |
17 |
long sum = 0; |
18 |
Parallel.For(1, 1001, delegate ( long i) |
19 |
{ |
20 |
Interlocked.Add( ref sum, i); |
21 |
}); |
22 |
System.Console.WriteLine(sum); |
23 |
loops++; |
24 |
} |
25 |
26 |
} |
27 |
} |
28 |
} |
01 |
using System; |
02 |
using System.Collections.Generic; |
03 |
using System.Linq; |
04 |
using System.Text; |
05 |
using System.Threading; |
06 |
using System.Threading.Tasks; |
07 |
08 |
namespace ConsoleApplication2 |
09 |
{ |
10 |
class Program |
11 |
{ |
12 |
static void Main( string [] args) |
13 |
{ |
14 |
int loops = 0; |
15 |
while (loops <= 100) |
16 |
{ |
17 |
int sum = 0; |
18 |
Parallel.For(0, 1001, () => 0, (i, state,subtotal) => |
19 |
{ |
20 |
subtotal += i; |
21 |
return subtotal; |
22 |
}, |
23 |
partial => Interlocked.Add( ref sum, partial)); |
24 |
25 |
System.Console.WriteLine(sum); |
26 |
loops++; |
27 |
} |
28 |
29 |
} |
30 |
} |
31 |
} |