C#并行库(TaskParallelLibrary)用法小结
.NET 4.5并行库(TaskParallelLibrary)
也许C和C++的程序员刚刚开始写C#还习惯于new Thread来新建一个线程,但新建线程需要内存和CPU上下文切换的开销,200,000个周期,销毁线程也需要100,000个周期;所以还需要实现一个线程池Threadpool。自从有了并行库(TaskParallelLibrary),这些都不需要了。使用Task.Factory.StartNew(() => DoSomething(item));可以创建一个线程并自动由线程池管理。写法非常简单,但其实里面误区很多:
1. Task.Factory.StartNew(() => DoSomeWork())不是阻塞的
下面的代码会先输出ddd,因为Task.Factory.Startnew不阻塞:
1
2
|
var task = Task.Factory.StartNew(() => Console.WriteLine( "eee" )); Console.WriteLine( "ddd" ); |
如果你想阻塞,应该加上wait,改为这样:
1
2
|
var task = Task.Factory.StartNew(() => Console.WriteLine( "eee" )).Wait(); Console.WriteLine( "ddd" ); |
同样,Task.Factory.StartNew(() => DoSomeWork()).ContinueWith…也是是异步的,想让它阻塞,应该加上wait,这样写:
1
2
|
var task = Task.Factory.StartNew(() => return "" ).ContinueWith( s => { Console.WriteLine(s.Result); }).Wait(); Console.WriteLine( "ddd" ); |
2. Task.Factory.StartNew(() => DoSomeWork()).ContinueWith…没有运行在新的线程里
1
2
3
4
5
|
var task = Task.Factory.StartNew(() => return "" ).ContinueWith( s => { DoSomething2(s.Result); }).Wait(); Console.WriteLine( "ddd" ); |
注意上面的DoSomething2()是运行在主线程,而不是在新的线程里
3. Parallel.ForEach为何导致内存溢出
如果对一个10000个item的collection使用Parallel.ForEach,可以想象会发生什么。TPL默认是Parallel.ForEach使用场景是对CPU敏感的,TPL会持续创建线程,直到你的CPU利用率达100%;问题是你的使用场景如果不是CPU敏感的,例如是I/O敏感的,TPL想尽可能的利用你的CPU,所以检测你的CPU利用率,如果还不是100%就会一直创建线程....直到内存耗尽。所以,使用要注意使用场景十分CPU敏感的,另外可以加一个参数来限制TPL线程的创建:
1
2
3
4
5
6
|
Parallel.ForEach(items, new ParallelOptions { MaxDegreeOfParallelism = 4 }, item => DoSomething(item)); |
ParallelOptions.MaxDegreeOfParallelism参数含义:
If your task is CPU-bound then you should see a pattern like this on a quad-core system:
ParallelOptions.MaximumDegreeOfParallelism = 1
: use one full CPU or 25% CPU utilizationParallelOptions.MaximumDegreeOfParallelism = 2
: use two CPUs or 50% CPU utilizationParallelOptions.MaximumDegreeOfParallelism = 4
: use all CPUs or 100% CPU utilization
4. 如何等待Parallel.ForEach运行都结束
1
2
|
Parallel.ForEach<Item>(items, item => DoSomething(item)); Console.WriteLine( "ddd" ); |
是阻塞的,所以以上代码会在最后输出ddd。
如果是等多个Task,可以这样写:
1
2
3
4
|
var task1 = Task.Factory.StartNew(() => DoSomeWork()); var task2 = Task.Factory.StartNew(() => DoSomeWork()); var task3 = Task.Factory.StartNew(() => DoSomeWork()); Task.WaitAll(task1, task2, task3); |
或者这样写:
1
2
3
4
5
6
7
|
Task.Factory.ContinueWhenAll( new [] { task1, task2, task3 }, tasks => { foreach (Task< string > task in tasks) { Console.WriteLine(task.Result); } }); |
5. Task.Factory.StartNew和Parallel.ForEach可以嵌套使用吗
都可以嵌套使用,例如:
1
2
3
|
var task1 = Task.Factory.StartNew( () => Parallel.ForEach<Item>(items, item => DoSomething(item))); var task2 = Task.Factory.StartNew( () => Parallel.ForEach<Item>(items2, item => DoSomething2(item))); Task.WaitAll(task1, task2); |
6. Thread.Sleep还需要吗
以前,我们轮询的时候常常喜欢这样的写法:
1
2
3
4
5
6
|
while ( true ) { doSomework(); Thread.Sleep(1000); } |
这是一种代码的坏味道,Stackoverflow的讨论在这儿,解决方法是用WaitEvent替代,当然在C#中还是推荐用BlockingCollection替代。
6. TPL中闭包的陷阱
例如在下面的代码中 counter++存在线程不安全的问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
int counter = 0; Task.Factory.StartNew( () => Parallel.ForEach(items, new ParallelOptions { MaxDegreeOfParallelism = 4 }, item => { DoSomething(item); counter++; }); ); |
应该改为:
1
|
Interlocked.Increment( ref successCount); |
7. Lock锁带来的性能问题
性能问题首先要诊断,例如用条件编译打印出线程id和运行时序,可以知道所有线程的运行先后次序和等待情况。还可以借助工具来调试多线程问题。这里要说的锁的问题。如果你的程序用Parallel.ForEach貌似是并发的,但如果有用到Lock,那可能你所有的线程都在等待,性能将是一塌糊涂的。所以最好的方法是避免锁,保证Parallel.ForEach里面每一个对象不会用到竞争的资源/例如修改同一个对象。退而求其次的是用锁,但要非常小心。例如,lock(this),lock(typeof(mytype)),lock(“mylock”),如果lock的是public访问的,或者锁名字一样,将会造成问题。还有的人干脆来个大括号,一整段全都锁住。死锁有时候很难调试发现诊断,
下面的代码有死锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
// thread 1 lock ( typeof ( int )) { Thread.Sleep(1000); lock ( typeof ( float )) { Console.WriteLine( "Thread 1 got both locks" ); } } // thread 2 lock ( typeof ( float )) { Thread.Sleep(1000); lock ( typeof ( int )) { Console.WriteLine( "Thread 2 got both locks" ); } } |
8. TaskFactory.Startnew和异步async/await的不同
1
2
3
|
var Data = await Task.WhenAll(WebService1.Call(), WebService2.Call(), WebService3.Call()); |
关于TaskFactory.Startnew和异步async/await的不同,下面两文章已经讲的非常清楚了:
- http://msdn.microsoft.com/en-us/library/dd997423.aspx
- http://stackoverflow.com/questions/10285159/difference-between-the-tpl-async-await-thread-handling
在下面的情况下,推荐使用Task.Factory.FromAsync()因为异步I/O比同步的CPU等待等有效,特别是对于获取I/O的高伸缩性。
1
2
3
4
5
6
7
8
9
10
11
12
|
NetworkStream stream; byte [] data; int bytesRead; //using FromAsync Task< int > readChunk = Task< int >.Factory.FromAsync ( stream.BeginRead, stream.EndRead, data, bytesRead, data.Length - bytesRead, null ); //using StartNew with blocking version Task< int > readChunk2 = Task< int >.Factory.StartNew(() => stream.Read(data, bytesRead, data.Length - bytesRead)); |