(一)概述
所有需要等待的操作,例如,因为文件、数据库或网络访问都需要一定的时间,此时就可以启动一个新的线程,同时完成其他任务。
线程是程序中独立的指令流。
(二)Paraller类
Paraller类是对线程的一个很好的抽象,该类位于System.Threading.Tasks名称空间中,提供了数据和任务并行性。
Paraller.For()和Paraller.ForEach()方法在每次迭代中调用相同的代码,二Parallel.Invoke()方法允许同时调用不同的方法。Paraller.Invoke用于任务并行性,而Parallel.ForEach用于数据并行性。
1、Parallel.For()方法循环
ParallelLoopResult result = Parallel.For(0, 10, i => { Console.WriteLine("当前迭代顺序:" + i); Thread.Sleep(10);//线程等待 });
在For()方法中,前两个参数定义了循环的开头和结束,第三个参数是一个Action<int>委托,参数是循环迭代的次数。
Parallel类只等待它创建的任务,而不等待其他后台活动。
Parallel.For()方法可以提前停止:
var result = Parallel.For(10, 40, async (int i, ParallelLoopState pls) => { Console.WriteLine("迭代序号:{0}, 任务: {1}, 线程: {2}", i, Task.CurrentId, Thread.CurrentThread.ManagedThreadId); await Task.Delay(10); if (i > 15) { pls.Break(); } }); Console.WriteLine("循环完成状态:" + result.IsCompleted); Console.WriteLine("Break索引:" + result.LowestBreakIteration);
需要注意的是,Break()方法仅是告知循环在合适的时候退出当前迭代之外的迭代。
Parallel.For()还可以对线程进行初始化和退出时制定方法:
Parallel.For<string>(10,25,()=> { Console.WriteLine("初始线程{0},任务{1}",Thread.CurrentThread.ManagedThreadId,Task.CurrentId); return string.Format("线程Id"+ Thread.CurrentThread.ManagedThreadId); }, (i,pls,str1)=> { Console.WriteLine("迭代顺序:【{0}】,线程初始化返回值:【{1}】,线程Id:【{2}】,任务Id:【{3}】",i,str1, Thread.CurrentThread.ManagedThreadId, Task.CurrentId); Thread.Sleep(10); return string.Format("迭代顺序:"+i); }, (str1)=> { Console.WriteLine("线程主体返回值:{0}",str1); });
除了循环开头与结束的指定,第三个是对迭代调用的每个线程进行处理,第四个是迭代的方法主体,第四个是迭代完成时对线程的处理。
2、使用Paralle.ForEach()方法循环
string[] data = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k" }; Parallel.ForEach(data, (s, pls, l) => { Console.WriteLine(s + " " + l);//s是当前循环的项的值,pls是ParallelLoopState类型,l是当前迭代的顺序 });
3、通过Parallel.Invoke()方法调用多个方法
Parallel.Invoke()方法运行传递一个Action委托数组,在其中可以指定需要并行运行的方法。
1 static void Main(string[] args) 2 { 3 Parallel.Invoke(Say1, Say2, Say3, Say4, Say5); 4 Console.WriteLine("---------"); 5 Say1(); 6 Say2(); 7 Say3(); 8 Say4(); 9 Say5(); 10 11 Console.ReadKey(); 12 } 13 static void Say1() 14 { 15 Thread.Sleep(100); 16 Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff") + "1"); 17 } 18 static void Say2() 19 { 20 Thread.Sleep(100); 21 Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff") + "2"); 22 } 23 static void Say3() 24 { 25 Thread.Sleep(100); 26 Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff") + "3"); 27 } 28 static void Say4() 29 { 30 Thread.Sleep(100); 31 Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff") + "4"); 32 } 33 static void Say5() 34 { 35 Thread.Sleep(100); 36 Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff") + "5"); 37 }
(三)任务
为了更好的控制并行动作,可以使用System.Threading.Tasks名称空间中的Task类。
1、启动任务
(1)使用线程池的任务
1 private static readonly object locker = new object(); 2 static void Main(string[] args) 3 { 4 var tf = new TaskFactory(); 5 Task t1 = tf.StartNew(TaskMethod, "使用TaskFactory"); 6 7 Task t2 = Task.Factory.StartNew(TaskMethod, "使用Task.Factory"); 8 9 var t3 = new Task(TaskMethod, "使用Task构造函数并启动"); 10 t3.Start(); 11 12 Task t4 = Task.Run(() => { TaskMethod("运行"); }); 13 14 Console.ReadKey(); 15 } 16 static void TaskMethod(object title) 17 { 18 lock (locker) 19 { 20 Console.WriteLine(title); 21 Console.WriteLine("任务Id:{0},线程Id:{1}", Task.CurrentId == null ? "no Task" : Task.CurrentId.ToString(), Thread.CurrentThread.ManagedThreadId); 22 Console.WriteLine("是否为线程池线程:{0}", Thread.CurrentThread.IsThreadPoolThread); 23 Console.WriteLine("是否为后台线程:{0}",Thread.CurrentThread.IsBackground); 24 Console.WriteLine(); 25 } 26 }
(2)同步任务
任务不一定要使用线程池中的线程,也可以使用其他线程。
TaskMethod("主线程调用"); var t1 = new Task(TaskMethod,"同步运行"); t1.RunSynchronously();
(3)使用单独线程的任务
如果任务的代码应该长时间运行,就应该使用TaskCreationOptions.LongRunning告诉任务调度器创建一个新线程,而不是使用线程池中的线程。
var t1 = new Task(TaskMethod, "长时间运行任务", TaskCreationOptions.LongRunning); t1.Start();
2、Future——任务的结果
任务结束时,它可以把一些有用的状态信息写到共享对象中,这个共享对象必须是线程安全的。另一个选项是使用返回某个结果的任务,如Future它是Task类的一个泛型版本,使用这个类时,可以定义任务返回的结果的类型。
var t1 = new Task<Tuple<int, int>>(TaskWithResult, Tuple.Create<int, int>(10, 5)); t1.Start(); Console.WriteLine(t1.Result); t1.Wait(); Console.WriteLine("任务结果:{0} {1}",t1.Result.Item1, t1.Result.Item2);
3、连续的任务
通过任务,可以指定在任务完成后,应开始运行另一个特定任务。
Task t1 = new Task(DoOnFirst); t1.Start(); Task t2 = t1.ContinueWith(DoOnSecond); Task t3 = t1.ContinueWith(DoOnSecond); Task t4 = t2.ContinueWith(DoOnSecond); Task t5 = t3.ContinueWith(DoOnSecond, TaskContinuationOptions.OnlyOnFaulted);//第二个参数指t3在失败的情况下运行t5
4、任务层次结构
任务也可以构成一个层次结构。一个任务启动一个新任务时,就启动了一个父/子层次结构。
1 static void Main(string[] args) 2 { 3 var parent = new Task(ParentTask); 4 parent.Start(); 5 Thread.Sleep(2000); 6 Console.WriteLine(parent.Status); 7 Thread.Sleep(4000); 8 Console.WriteLine(parent.Status); 9 Console.ReadKey(); 10 } 11 static void ParentTask() 12 { 13 Console.WriteLine("任务Id:"+Task.CurrentId); 14 var child = new Task(ChildTask); 15 child.Start(); 16 Thread.Sleep(1000); 17 Console.WriteLine("父级子任务已开始运行"); 18 } 19 static void ChildTask() 20 { 21 Console.WriteLine("子任务开始"); 22 Thread.Sleep(5000); 23 Console.WriteLine("子任务结束"); 24 }
(四)取消架构
.NET4.5包含一个取消架构,允许以标准方式取消长时间运行的任务。取消架构基于协作行为,它不是强制的。长时间运行的任务会检查它是否被取消,并返回控制权。支持取消的方法接受一个CancellationToken参考。
1、Parallel.For()方法的取消
1 var cts = new CancellationTokenSource(); 2 cts.Token.Register(() => Console.WriteLine("*** token canceled")); 3 4 5 //在500毫秒以后发送取消指令 6 cts.CancelAfter(500); 7 try 8 { 9 var result = Parallel.For(0, 100, new ParallelOptions() { CancellationToken = cts.Token, }, x => 10 { 11 Console.WriteLine("{0}次循环开始", x) 12 int sum = 0; 13 for (int i = 0; i < 100; i++) 14 { 15 Thread.Sleep(2); 16 sum += i; 17 } 18 Console.WriteLine("{0}次循环结束", x); 19 }); 20 } 21 catch (OperationCanceledException ex) 22 { 23 Console.WriteLine(ex.Message); 24 }
使用.NET4.5中的一个新方法CancelAfter,在500毫秒后取消标记。在For()循环的实现代码内部,Parallel类验证CanceledToken的结果,并取消操作。一旦取消操作,For()方法就抛出一个OperationCanceledException类型的异常。
2、任务的取消
同样的取消模式也可以用于任务。
1 var cts = new CancellationTokenSource(); 2 cts.Token.Register(() => Console.WriteLine("*** token canceled")); 3 4 //在500毫秒以后发送取消指令 5 cts.CancelAfter(500); 6 7 Task t1 = Task.Run(()=> { 8 Console.WriteLine("任务进行中..."); 9 for (int i = 0; i < 20; i++) 10 { 11 Thread.Sleep(100); 12 CancellationToken token = cts.Token; 13 if (token.IsCancellationRequested) 14 { 15 Console.WriteLine("已发送取消请求,取消请求来自当前任务"); 16 token.ThrowIfCancellationRequested(); 17 break; 18 } 19 Console.WriteLine("循环中..."); 20 } 21 Console.WriteLine("任务结束没有取消"); 22 }); 23 try 24 { 25 t1.Wait(); 26 } 27 catch (AggregateException ex) 28 { 29 Console.WriteLine("异常:{0}, {1}",ex.GetType().Name,ex.Message); 30 foreach (var innerException in ex.InnerExceptions) 31 { 32 Console.WriteLine("异常:{0}, {1}", ex.InnerException.GetType().Name, ex.InnerException.Message); 33 } 34 }
(五)线程池
int nWorkerThreads; int nCompletionPortThreads; ThreadPool.GetMaxThreads(out nWorkerThreads, out nCompletionPortThreads); Console.WriteLine("线程池中辅助线程的最大数目:{0}, 线程池中异步 I/O 线程的最大数目:{1}",nWorkerThreads,nCompletionPortThreads); for (int i = 0; i < 5; i++) { ThreadPool.QueueUserWorkItem(JobForAThread); } Thread.Sleep(3000);
需要注意的是:线程池中的所有线程都是后台线程,不可设置线程优先级、名称,随着前台线程的结束而结束,只适用于短时间任务。
(六)Thread类
该类允许创建前台线程,以及设置线程的优先级。
1、给线程传递数据
static void Main(string[] args) { var t2 = new Thread(ThreadMainWithParameter); t2.Start("参数字符串"); Console.ReadKey(); } static void ThreadMainWithParameter(object message) { Console.WriteLine("运行主线程,接受参数:" + message.ToString()); }
如果使用了ParameterizedThreadStart委托,线程的入口点必须有一个object类型的参数,且返回类型为void。
2、后台线程
只要有一个前台线程在运行,应用程序的进程就在运行。如果多个前台线程在运行,而Main()方法结束了,应用程序的进程就仍然是激活的,直到所有前台线程完成其任务为止。
默认情况下,用Thread类创建的线程是前台线程,线程池中的线程是后台线程。Thread类创建线程时,可以设置IsBackground属性来确定创建前台还是后台线程。
static void Main(string[] args) { var t1 = new Thread(ThreadMain) { Name = "MyNewThread", IsBackground = false }; t1.Start(); Console.WriteLine("主线程现在结束"); Console.ReadKey(); } private static void ThreadMain() { Console.WriteLine(Thread.CurrentThread.Name+"线程开始运行"); Thread.Sleep(3000); Console.WriteLine(Thread.CurrentThread.Name+"线程结束"); }
在通过Thread类创建线程的时候,设置IsBackground属性为false,也就是创建一个前台线程。这种情况下在主线程结束时,t1不会结束。但如果将IsBackground设置为true,则会随着主线程的结束而结束。
3、线程的优先级
给线程指定优先级,就可以影响系统对线程的调度顺序。在Thread类中,可以设置Priority属性,以影响线程的基本优先级。
4、控制线程
读取Tread的ThreadState属性,可以获取线程的状态。
Thread的Start()方法创建线程,线程状态为UnStarted;
线程调度器选择运行后,线程转台改变为Running;
Thread.Sleep()方法会使线程处于WaitSleepJoin;
Thread类的Abort()方法,触发ThreadAbortException类型的异常,线程状态会变为AbortedRequested,如果没有重置终止则变为Aborted;
Thread.ResetAbort()方法可以让线程在触发ThreadAbortException异常后继续运行;
Thread类的Join()会停止当前线程,等待加入的线程完成为止,此时线程状态为WaitSleepJoin。
(七)线程问题
1、争用条件
如果两个或多个线程访问相同的对象,并且对共享状态的访问没有同步,就会出现争用条件。要避免该问题,可以锁定共享对象。这可以通过lock语句锁定在线程中共享的state变量。
private static readonly object locker = new object(); public void ChnageI(int i) { lock (locker) { if (i == 0) { i++; Console.WriteLine(i == 1); } i = 0; } }
2、死锁
由于两个线程都在等待对方,就出现了死锁,线程将无线等待下去。为了避免这个问题,可以在应用程序的体系架构中,从一开始就设计好锁定的顺序,也可以为锁定定义超时时间。
(八)同步
共享数据必须使用同步技术,确保一次只有一个线程访问和改变共享状态。可以使用lock语句、Interlocked类和Monitor类进行进程内部的同步。Mutex类、Event类、SemaphoreSlim类和ReaderWriterLockSlim类提供了多个进程之间的线程同步。
1、lock语句和线程安全
lock语句是设置锁定和解除锁定的一种简单方式。
在没有使用lock语句的情况下,多个线程操作共享数据,最后得到的结果没有一个会正确。
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 for (int j = 0; j < 5; j++) 6 { 7 int numTasks = 20; 8 var state = new SharedState(); 9 var tasks = new Task[numTasks]; 10 for (int i = 0; i < numTasks; i++) 11 { 12 tasks[i] = Task.Run(() => { new Job(state).DoTheJob(); }); 13 } 14 15 for (int i = 0; i < numTasks; i++) 16 { 17 tasks[i].Wait(); 18 } 19 Console.WriteLine("最后结果:{0}", state.State); 20 } 21 Console.ReadKey(); 22 } 23 } 24 25 public class SharedState 26 { 27 public int State { get; set; } 28 } 29 30 public class Job 31 { 32 SharedState sharedState; 33 public Job(SharedState sharedState) 34 { 35 this.sharedState = sharedState; 36 } 37 public void DoTheJob() 38 { 39 for (int i = 0; i < 50000; i++) 40 { 41 sharedState.State += 1; 42 } 43 } 44 }
使用lock语句,修改DoTheJob()方法,现在才能获得正确的结果。
private readonly object syncRoot = new object(); public void DoTheJob() { for (int i = 0; i < 50000; i++) { lock (syncRoot) { sharedState.State += 1; } } }
2、Interlocked类
Interlocked类用于使变量的简单语句原子化。
public int State { get { lock (this) { return ++state; } } } public int State { get { return Interlocked.Increment(ref state); } }
使用Interlocked类可以更快。
3、Monitor类
lock语句由C#编译器解析为使用Monitor类。
lock (syncRoot) { //代码 } //C#编译器将lock语句解析为 Monitor.Enter(syncRoot); try { //代码 } finally { Monitor.Exit(syncRoot); }
Monitor类相对于lock语句的优点是:可以通过调用TryEnter()方法添加一个等待被锁定的超时值。
bool lockTaken = false; Monitor.TryEnter(syncRoot, 1000, ref lockTaken); if (lockTaken) { //获取锁后操作 try { //代码 } finally { Monitor.Exit(syncRoot); } } else { //没有获取锁的操作 }
4、SpinLock结构
相对于Monitor垃圾回收导致过高的系统开销,使用SpinLock结构就能有效降低系统开销。SpinLock的使用方式与Monitor非常相似,但因为SpinLock是结构所以在把变量赋值为另一个变量会创建一个副本。
5、WaitHandle基类
WaitHandle是一个抽象基类,用于等待一个信号的设置。可以等待不同的信号,因为WaitHandle是一个基类,可以从中派生一些类。
6、Mutex类
Mutex(mutual exclusion,互斥)是.NET Framework中提供跨多个线程同步访问的一个类。
在Mutex类的构造函数中,可以指定互斥是否最初由主调线程拥有,定义互斥的名称,获得互斥是否存在的信息。
bool createdNew; var mutex = new Mutex(false, "MyMutex", out createdNew);
系统可以识别有名称的互斥,可以使用它来禁止应用程序启动两次。
bool createdNew; var mutex = new Mutex(false, "MyMutex", out createdNew); if (!createdNew) { Console.WriteLine("每次只能启动一个应用程序"); Environment.Exit(0); } Console.WriteLine("运行中...");
7、Semaphore类
信号量是一种计数的互斥锁。如果需要限制可以访问可用资源的线程数,信号量就很有用。
.NET4.5为信号量功能提供了两个类Semaphore和SemaphoreSlim。Semaphore类可以命名,使用系统范围内的资源,允许在不同进程之间同步。SemaphoreSlim类是对较短等待时间进行了优化的轻型版本。
1 static void Main(string[] args) 2 { 3 int taskCount = 6; 4 int semaphoreCount = 3; 5 var semaphore = new SemaphoreSlim(semaphoreCount, semaphoreCount); 6 var tasks = new Task[taskCount]; 7 8 9 for (int i = 0; i < taskCount; i++) 10 { 11 tasks[i] = Task.Run(() => 12 { 13 TaskMain(semaphore); 14 }); 15 } 16 17 Task.WaitAll(tasks); 18 19 Console.WriteLine("所有任务已结束"); 20 Console.ReadKey(); 21 } 22 23 24 private static void TaskMain(SemaphoreSlim semaphore) 25 { 26 bool isCompleted = false; 27 while (!isCompleted) 28 { 29 if (semaphore.Wait(600)) 30 { 31 try 32 { 33 Console.WriteLine("任务{0}锁定了信号", Task.CurrentId); 34 Thread.Sleep(2000); 35 } 36 finally 37 { 38 Console.WriteLine("任务{0}释放了信号", Task.CurrentId); 39 semaphore.Release(); 40 isCompleted = true; 41 } 42 } 43 else 44 { 45 Console.WriteLine("任务{0}超时,等待再次执行", Task.CurrentId); 46 } 47 } 48 }
8、Events类
与互斥和信号量对象一样,事件也是一个系统范围内的资源同步方法。为了从托管代码中使用系统事件,.NET Framework在System.Threading名称空间中提供了ManualResetEvent、AutoResetEvent、ManualResetEventSlim和CountdownEvent类。
C#中event关键字与这里的event类没有任何关系。
9、Barrier类
对于同步,Barrier类非常适用于其中工作有多个任务分支且以后又需要合并工作的情况。
10、ReaderWriterLockSlim类
为了使锁定机制允许锁定多个读取器访问某个资源,可以使用ReaderWriterLockSlim类。
(九)Timer类
.NET Framework提供了几个Timer类,用于在某个时间间隔后调用某个方法。System.Threading.Timer、System.Timers.Timer、System.WIndows.Forms.Timer、System.Web.UI.Timer和System.Windows.Threading.Timer。
(十)数据流
Parallel类、Task类和Parallel LINQ为数据并行性提供了很多帮助。但是,这些类不能直接支持数据流的处理,以及并行转换数据。这种情况下,使用System.Threading.Tasks.Dataflow名称空间中的相关类来处理。