引言
本文主要从线程的基础用法,CLR线程池当中工作者线程与I/O线程的开发,并行操作PLINQ等多个方面介绍多线程的开发。
其中委托的BeginInvoke方法以及回调函数最为常用。
而 I/O线程可能容易遭到大家的忽略,其实在开发多线程系统,更应该多留意I/O线程的操作。特别是在ASP.NET开发当中,可能更多人只会留意在客户端使用Ajax或者在服务器端使用UpdatePanel。其实合理使用I/O线程在通讯项目或文件下载时,能尽可能地减少IIS的压力。
并行编程是Framework4.0中极力推广的异步操作方式,更值得更深入地学习。
希望本篇文章能对各位的学习研究有所帮助,当中有所错漏的地方敬请点评。
目录
一、线程的定义
1. 1 进程、应用程序域与线程的关系
进程(Process)是Windows系统中的一个基本概念,它包含着一个运行程序所需要的资源。进程之间是相对独立的,一个进程无法访问另一个进程的数据(除非利用分布式计算方式),一个进程运行的失败也不会影响其他进程的运行,Windows系统就是利用进程把工作划分为多个独立的区域的。进程可以理解为一个程序的基本边界。
应用程序域(AppDomain)是一个程序运行的逻辑区域,它可以视为一个轻量级的进程,.NET的程序集正是在应用程序域中运行的,一个进程可以包含有多个应用程序域,一个应用程序域也可以包含多个程序集。在一个应用程序域中包含了一个或多个上下文context,使用上下文CLR就能够把某些特殊对象的状态放置在不同容器当中。
线程(Thread)是进程中的基本执行单元,在进程入口执行的第一个线程被视为这个进程的主线程。在.NET应用程序中,都是以Main()方法作为入口的,当调用此方法时系统就会自动创建一个主线程。线程主要是由CPU寄存器、调用栈和线程本地存储器(Thread Local Storage,TLS)组成的。CPU寄存器主要记录当前所执行线程的状态,调用栈主要用于维护线程所调用到的内存与数据,TLS主要用于存放线程的状态信息。
进程、应用程序域、线程的关系如下图,一个进程内可以包括多个应用程序域,也有包括多个线程,线程也可以穿梭于多个应用程序域当中。但在同一个时刻,线程只会处于一个应用程序域内。
由于本文是以介绍多线程技术为主题,对进程、应用程序域的介绍就到此为止。关于进程、线程、应用程序域的技术,在“C#综合揭秘——细说进程、应用程序域与上下文”会有详细介绍。
1.2 多线程
在单CPU系统的一个单位时间(time slice)内,CPU只能运行单个线程,运行顺序取决于线程的优先级别。如果在单位时间内线程未能完成执行,系统就会把线程的状态信息保存到线程的本地存储器(TLS) 中,以便下次执行时恢复执行。而多线程只是系统带来的一个假像,它在多个单位时间内进行多个线程的切换。因为切换频密而且单位时间非常短暂,所以多线程可被视作同时运行。
适当使用多线程能提高系统的性能,比如:在系统请求大容量的数据时使用多线程,把数据输出工作交给异步线程,使主线程保持其稳定性去处理其他问题。但需要注意一点,因为CPU需要花费不少的时间在线程的切换上,所以过多地使用多线程反而会导致性能的下降。
二、线程的基础知识
2.1 System.Threading.Thread类
System.Threading.Thread是用于控制线程的基础类,通过Thread可以控制当前应用程序域中线程的创建、挂起、停止、销毁。
它包括以下常用公共属性:
属性名称 | 说明 |
---|---|
CurrentContext | 获取线程正在其中执行的当前上下文。 |
CurrentThread | 获取当前正在运行的线程。 |
ExecutionContext | 获取一个 ExecutionContext 对象,该对象包含有关当前线程的各种上下文的信息。 |
IsAlive | 获取一个值,该值指示当前线程的执行状态。 |
IsBackground | 获取或设置一个值,该值指示某个线程是否为后台线程。 |
IsThreadPoolThread | 获取一个值,该值指示线程是否属于托管线程池。 |
ManagedThreadId | 获取当前托管线程的唯一标识符。 |
Name | 获取或设置线程的名称。 |
Priority | 获取或设置一个值,该值指示线程的调度优先级。 |
ThreadState | 获取一个值,该值包含当前线程的状态。 |
2.1.1 线程的标识符
ManagedThreadId是确认线程的唯一标识符,程序在大部分情况下都是通过Thread.ManagedThreadId来辨别线程的。而Name是一个可变值,在默认时候,Name为一个空值 Null,开发人员可以通过程序设置线程的名称,但这只是一个辅助功能。
2.1.2 线程的优先级别
.NET为线程设置了Priority属性来定义线程执行的优先级别,里面包含5个选项,其中Normal是默认值。除非系统有特殊要求,否则不应该随便设置线程的优先级别。
成员名称 | 说明 |
---|---|
Lowest | 可以将 Thread 安排在具有任何其他优先级的线程之后。 |
BelowNormal | 可以将 Thread 安排在具有 Normal 优先级的线程之后,在具有 Lowest 优先级的线程之前。 |
Normal | 默认选择。可以将 Thread 安排在具有 AboveNormal 优先级的线程之后,在具有BelowNormal 优先级的线程之前。 |
AboveNormal | 可以将 Thread 安排在具有 Highest 优先级的线程之后,在具有 Normal 优先级的线程之前。 |
Highest | 可以将 Thread 安排在具有任何其他优先级的线程之前。 |
2.1.3 线程的状态
通过ThreadState可以检测线程是处于Unstarted、Sleeping、Running 等等状态,它比 IsAlive 属性能提供更多的特定信息。
前面说过,一个应用程序域中可能包括多个上下文,而通过CurrentContext可以获取线程当前的上下文。
CurrentThread是最常用的一个属性,它是用于获取当前运行的线程。
2.1.4 System.Threading.Thread的方法
Thread 中包括了多个方法来控制线程的创建、挂起、停止、销毁,以后来的例子中会经常使用。
方法名称 | 说明 |
---|---|
Abort() | 终止本线程。 |
GetDomain() | 返回当前线程正在其中运行的当前域。 |
GetDomainId() | 返回当前线程正在其中运行的当前域Id。 |
Interrupt() | 中断处于 WaitSleepJoin 线程状态的线程。 |
Join() | 已重载。 阻塞调用线程,直到某个线程终止时为止。 |
Resume() | 继续运行已挂起的线程。 |
Start() | 执行本线程。 |
Suspend() | 挂起当前线程,如果当前线程已属于挂起状态则此不起作用 |
Sleep() | 把正在运行的线程挂起一段时间。 |
2.1.5 开发实例
以下这个例子,就是通过Thread显示当前线程信息
运行结果
2.2 System.Threading 命名空间
在System.Threading命名空间内提供多个方法来构建多线程应用程序,其中ThreadPool与Thread是多线程开发中最常用到的,在.NET中专门设定了一个CLR线程池专门用于管理线程的运行,这个CLR线程池正是通过ThreadPool类来管理。而Thread是管理线程的最直接方式,下面几节将详细介绍有关内容。
类 | 说明 |
AutoResetEvent | 通知正在等待的线程已发生事件。无法继承此类。 |
ExecutionContext | 管理当前线程的执行上下文。无法继承此类。 |
Interlocked | 为多个线程共享的变量提供原子操作。 |
Monitor | 提供同步对对象的访问的机制。 |
Mutex | 一个同步基元,也可用于进程间同步。 |
Thread | 创建并控制线程,设置其优先级并获取其状态。 |
ThreadAbortException | 在对 Abort 方法进行调用时引发的异常。无法继承此类。 |
ThreadPool | 提供一个线程池,该线程池可用于发送工作项、处理异步 I/O、代表其他线程等待以及处理计时器。 |
Timeout | 包含用于指定无限长的时间的常数。无法继承此类。 |
Timer | 提供以指定的时间间隔执行方法的机制。无法继承此类。 |
WaitHandle | 封装等待对共享资源的独占访问的操作系统特定的对象。 |
在System.Threading中的包含了下表中的多个常用委托,其中ThreadStart、ParameterizedThreadStart是最常用到的委托。
由ThreadStart生成的线程是最直接的方式,但由ThreadStart所生成并不受线程池管理。
而ParameterizedThreadStart是为异步触发带参数的方法而设的,在下一节将为大家逐一细说。
委托 | 说明 |
---|---|
ContextCallback | 表示要在新上下文中调用的方法。 |
ParameterizedThreadStart | 表示在 Thread 上执行的方法。 |
ThreadExceptionEventHandler | 表示将要处理 Application 的 ThreadException 事件的方法。 |
ThreadStart | 表示在 Thread 上执行的方法。 |
TimerCallback | 表示处理来自 Timer 的调用的方法。 |
WaitCallback | 表示线程池线程要执行的回调方法。 |
WaitOrTimerCallback | 表示当 WaitHandle 超时或终止时要调用的方法。 |
2.3 线程的管理方式
通过ThreadStart来创建一个新线程是最直接的方法,但这样创建出来的线程比较难管理,如果创建过多的线程反而会让系统的性能下载。有见及此,.NET为线程管理专门设置了一个CLR线程池,使用CLR线程池系统可以更合理地管理线程的使用。所有请求的服务都能运行于线程池中,当运行结束时线程便会回归到线程池。通过设置,能控制线程池的最大线程数量,在请求超出线程最大值时,线程池能按照操作的优先级别来执行,让部分操作处于等待状态,待有线程回归时再执行操作。
基础知识就为大家介绍到这里,下面将详细介绍多线程的开发。
三、以ThreadStart方式实现多线程
3.1 使用ThreadStart委托
这里先以一个例子体现一下多线程带来的好处,首先在Message类中建立一个方法ShowMessage(),里面显示了当前运行线程的Id,并使用Thread.Sleep(int ) 方法模拟部分工作。在main()中通过ThreadStart委托绑定Message对象的ShowMessage()方法,然后通过Thread.Start()执行异步方法。
1 public class Message
2 {
3 public void ShowMessage()
4 {
5 string message = string.Format("Async threadId is :{0}",
6 Thread.CurrentThread.ManagedThreadId);
7 Console.WriteLine(message);
8
9 for (int n = 0; n < 10; n++)
10 {
11 Thread.Sleep(300);
12 Console.WriteLine("The number is:" + n.ToString());
13 }
14 }
15 }
16
17 class Program
18 {
19 static void Main(string[] args)
20 {
21 Console.WriteLine("Main threadId is:"+
22 Thread.CurrentThread.ManagedThreadId);
23 Message message=new Message();
24 Thread thread = new Thread(new ThreadStart(message.ShowMessage));
25 thread.Start();
26 Console.WriteLine("Do something ..........!");
27 Console.WriteLine("Main thread working is complete!");
28
29 }
30 }
请注意运行结果,在调用Thread.Start()方法后,系统以异步方式运行Message.ShowMessage(),而主线程的操作是继续执行的,在Message.ShowMessage()完成前,主线程已完成所有的操作。
3.2 使用ParameterizedThreadStart委托
ParameterizedThreadStart委托与ThreadStart委托非常相似,但ParameterizedThreadStart委托是面向带参数方法的。注意ParameterizedThreadStart 对应方法的参数为object,此参数可以为一个值对象,也可以为一个自定义对象。
1 public class Person
2 {
3 public string Name
4 {
5 get;
6 set;
7 }
8 public int Age
9 {
10 get;
11 set;
12 }
13 }
14
15 public class Message
16 {
17 public void ShowMessage(object person)
18 {
19 if (person != null)
20 {
21 Person _person = (Person)person;
22 string message = string.Format(" {0}'s age is {1}! Async threadId is:{2}",
23 _person.Name,_person.Age,Thread.CurrentThread.ManagedThreadId);
24 Console.WriteLine(message);
25 }
26 for (int n = 0; n < 10; n++)
27 {
28 Thread.Sleep(300);
29 Console.WriteLine("The number is:" + n.ToString());
30 }
31 }
32 }
33
34 class Program
35 {
36 static void Main(string[] args)
37 {
38 Console.WriteLine("Main threadId is:"+Thread.CurrentThread.ManagedThreadId);
39
40 Message message=new Message();
41 //绑定带参数的异步方法
42 Thread thread = new Thread(new ParameterizedThreadStart(message.ShowMessage));
43 Person person = new Person();
44 person.Name = "Jack";
45 person.Age = 21;
46 thread.Start(person); //启动异步线程
47
48 Console.WriteLine("Do something ..........!");
49 Console.WriteLine("Main thread working is complete!");
50
51 }
52 }
运行结果:
3.3 前台线程与后台线程
注意以上两个例子都没有使用Console.ReadKey(),但系统依然会等待异步线程完成后才会结束。这是因为使用Thread.Start()启动的线程默认为前台线程,而系统必须等待所有前台线程运行结束后,应用程序域才会自动卸载。
在第二节曾经介绍过线程Thread有一个属性IsBackground,通过把此属性设置为true,就可以把线程设置为后台线程!这时应用程序域将在主线程完成时就被卸载,而不会等待异步线程的运行。
3.4 挂起线程
为了等待其他后台线程完成后再结束主线程,就可以使用Thread.Sleep()方法。
1 public class Message
2 {
3 public void ShowMessage()
4 {
5 string message = string.Format(" Async threadId is:{0}",
6 Thread.CurrentThread.ManagedThreadId);
7 Console.WriteLine(message);
8 for (int n = 0; n < 10; n++)
9 {
10 Thread.Sleep(300);
11 Console.WriteLine("The number is:" + n.ToString());
12 }
13 }
14 }
15
16 class Program
17 {
18 static void Main(string[] args)
19 {
20 Console.WriteLine("Main threadId is:"+
21 Thread.CurrentThread.ManagedThreadId);
22
23 Message message=new Message();
24 Thread thread = new Thread(new ThreadStart(message.ShowMessage));
25 thread.IsBackground = true;
26 thread.Start();
27
28 Console.WriteLine("Do something ..........!");
29 Console.WriteLine("Main thread working is complete!");
30 Console.WriteLine("Main thread sleep!");
31 Thread.Sleep(5000);
32 }
33 }
运行结果如下,此时应用程序域将在主线程运行5秒后自动结束
但系统无法预知异步线程需要运行的时间,所以用通过Thread.Sleep(int)阻塞主线程并不是一个好的解决方法。有见及此,.NET专门为等待异步线程完成开发了另一个方法thread.Join()。把上面例子中的最后一行Thread.Sleep(5000)修改为 thread.Join() 就能保证主线程在异步线程thread运行结束后才会终止。
3.5 Suspend 与 Resume (慎用)
Thread.Suspend()与 Thread.Resume()是在Framework1.0 就已经存在的老方法了,它们分别可以挂起、恢复线程。但在Framework2.0中就已经明确排斥这两个方法。这是因为一旦某个线程占用了已有的资源,再使用Suspend()使线程长期处于挂起状态,当在其他线程调用这些资源的时候就会引起死锁!所以在没有必要的情况下应该避免使用这两个方法。
3.6 终止线程
若想终止正在运行的线程,可以使用Abort()方法。在使用Abort()的时候,将引发一个特殊异常 ThreadAbortException 。
若想在线程终止前恢复线程的执行,可以在捕获异常后 ,在catch(ThreadAbortException ex){...} 中调用Thread.ResetAbort()取消终止。
而使用Thread.Join()可以保证应用程序域等待异步线程结束后才终止运行。
1 static void Main(string[] args)
2 {
3 Console.WriteLine("Main threadId is:" +
4 Thread.CurrentThread.ManagedThreadId);
5
6 Thread thread = new Thread(new ThreadStart(AsyncThread));
7 thread.IsBackground = true;
8 thread.Start();
9 thread.Join();
10
11 }
12
13 //以异步方式调用
14 static void AsyncThread()
15 {
16 try
17 {
18 string message = string.Format(" Async threadId is:{0}",
19 Thread.CurrentThread.ManagedThreadId);
20 Console.WriteLine(message);
21
22 for (int n = 0; n < 10; n++)
23 {
24 //当n等于4时,终止线程
25 if (n >= 4)
26 {
27 Thread.CurrentThread.Abort(n);
28 }
29 Thread.Sleep(300);
30 Console.WriteLine("The number is:" + n.ToString());
31 }
32 }
33 catch (ThreadAbortException ex)
34 {
35 //输出终止线程时n的值
36 if (ex.ExceptionState != null)
37 Console.WriteLine(string.Format("Thread abort when the number is: {0}!",
38 ex.ExceptionState.ToString()));
39
40 //取消终止,继续执行线程
41 Thread.ResetAbort();
42 Console.WriteLine("Thread ResetAbort!");
43 }
44
45 //线程结束
46 Console.WriteLine("Thread Close!");
47 }
运行结果如下
四、CLR线程池的工作者线程
4.1 关于CLR线程池
使用ThreadStart与ParameterizedThreadStart建立新线程非常简单,但通过此方法建立的线程难于管理,若建立过多的线程反而会影响系统的性能。
有见及此,.NET引入CLR线程池这个概念。CLR线程池并不会在CLR初始化的时候立刻建立线程,而是在应用程序要创建线程来执行任务时,线程池才初始化一个线程。线程的初始化与其他的线程一样。在完成任务以后,该线程不会自行销毁,而是以挂起的状态返回到线程池。直到应用程序再次向线程池发出请求时,线程池里挂起的线程就会再度激活执行任务。这样既节省了建立线程所造成的性能损耗,也可以让多个任务反复重用同一线程,从而在应用程序生存期内节约大量开销。
注意:通过CLR线程池所建立的线程总是默认为后台线程,优先级数为ThreadPriority.Normal。
4.2 工作者线程与I/O线程
CLR线程池分为工作者线程(workerThreads)与I/O线程 (completionPortThreads) 两种,工作者线程是主要用作管理CLR内部对象的运作,I/O(Input/Output) 线程顾名思义是用于与外部系统交换信息,IO线程的细节将在下一节详细说明。
通过ThreadPool.GetMax(out int workerThreads,out int completionPortThreads )和 ThreadPool.SetMax( int workerThreads, int completionPortThreads)两个方法可以分别读取和设置CLR线程池中工作者线程与I/O线程的最大线程数。在Framework2.0中最大线程默认为25*CPU数,在Framewok3.0、4.0中最大线程数默认为250*CPU数,在近年 I3,I5,I7 CPU出现后,线程池的最大值一般默认为1000、2000。
若想测试线程池中有多少的线程正在投入使用,可以通过ThreadPool.GetAvailableThreads( out int workerThreads,out int completionPortThreads ) 方法。
使用CLR线程池的工作者线程一般有两种方式,一是直接通过 ThreadPool.QueueUserWorkItem() 方法,二是通过委托,下面将逐一细说。
4.3 通过QueueUserWorkItem启动工作者线程
ThreadPool线程池中包含有两个静态方法可以直接启动工作者线程:
一为 ThreadPool.QueueUserWorkItem(WaitCallback)
二为 ThreadPool.QueueUserWorkItem(WaitCallback,Object)
先把WaitCallback委托指向一个带有Object参数的无返回值方法,再使用 ThreadPool.QueueUserWorkItem(WaitCallback) 就可以异步启动此方法,此时异步方法的参数被视为null 。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //把CLR线程池的最大值设置为1000
6 ThreadPool.SetMaxThreads(1000, 1000);
7 //显示主线程启动时线程池信息
8 ThreadMessage("Start");
9 //启动工作者线程
10 ThreadPool.QueueUserWorkItem(new WaitCallback(AsyncCallback));
11 Console.ReadKey();
12 }
13
14 static void AsyncCallback(object state)
15 {
16 Thread.Sleep(200);
17 ThreadMessage("AsyncCallback");
18 Console.WriteLine("Async thread do work!");
19 }
20
21 //显示线程现状
22 static void ThreadMessage(string data)
23 {
24 string message = string.Format("{0} CurrentThreadId is {1}",
25 data, Thread.CurrentThread.ManagedThreadId);
26 Console.WriteLine(message);
27 }
28 }
运行结果
使用 ThreadPool.QueueUserWorkItem(WaitCallback,Object) 方法可以把object对象作为参数传送到回调函数中。
下面例子中就是把一个string对象作为参数发送到回调函数当中。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //把线程池的最大值设置为1000
6 ThreadPool.SetMaxThreads(1000, 1000);
7
8 ThreadMessage("Start");
9 ThreadPool.QueueUserWorkItem(new WaitCallback(AsyncCallback),"Hello Elva");
10 Console.ReadKey();
11 }
12
13 static void AsyncCallback(object state)
14 {
15 Thread.Sleep(200);
16 ThreadMessage("AsyncCallback");
17
18 string data = (string)state;
19 Console.WriteLine("Async thread do work! "+data);
20 }
21
22 //显示线程现状
23 static void ThreadMessage(string data)
24 {
25 string message = string.Format("{0} CurrentThreadId is {1}",
26 data, Thread.CurrentThread.ManagedThreadId);
27 Console.WriteLine(message);
28 }
29 }
运行结果
通过ThreadPool.QueueUserWorkItem启动工作者线程虽然是方便,但WaitCallback委托指向的必须是一个带有Object参数的无返回值方法,这无疑是一种限制。若方法需要有返回值,或者带有多个参数,这将多费周折。有见及此,.NET提供了另一种方式去建立工作者线程,那就是委托。
4.4 委托类
使用CLR线程池中的工作者线程,最灵活最常用的方式就是使用委托的异步方法,在此先简单介绍一下委托类。
当定义委托后,.NET就会自动创建一个代表该委托的类,下面可以用反射方式显示委托类的方法成员(对反射有兴趣的朋友可以先参考一下“.NET基础篇——反射的奥妙”)
1 class Program
2 {
3 delegate void MyDelegate();
4
5 static void Main(string[] args)
6 {
7 MyDelegate delegate1 = new MyDelegate(AsyncThread);
8 //显示委托类的几个方法成员
9 var methods=delegate1.GetType().GetMethods();
10 if (methods != null)
11 foreach (MethodInfo info in methods)
12 Console.WriteLine(info.Name);
13 Console.ReadKey();
14 }
15 }
委托类包括以下几个重要方法
1 public class MyDelegate:MulticastDelegate
2 {
3 public MyDelegate(object target, int methodPtr);
4 //调用委托方法
5 public virtual void Invoke();
6 //异步委托
7 public virtual IAsyncResult BeginInvoke(AsyncCallback callback,object state);
8 public virtual void EndInvoke(IAsyncResult result);
9 }
当调用Invoke()方法时,对应此委托的所有方法都会被执行。而BeginInvoke与EndInvoke则支持委托方法的异步调用,由BeginInvoke启动的线程都属于CLR线程池中的工作者线程,在下面将详细说明。
4.5 利用BeginInvoke与EndInvoke完成异步委托方法
首先建立一个委托对象,通过IAsyncResult BeginInvoke(string name,AsyncCallback callback,object state) 异步调用委托方法,BeginInvoke 方法除最后的两个参数外,其它参数都是与方法参数相对应的。通过 BeginInvoke 方法将返回一个实现了 System.IAsyncResult 接口的对象,之后就可以利用EndInvoke(IAsyncResult ) 方法就可以结束异步操作,获取委托的运行结果。
1 class Program
2 {
3 delegate string MyDelegate(string name);
4
5 static void Main(string[] args)
6 {
7 ThreadMessage("Main Thread");
8
9 //建立委托
10 MyDelegate myDelegate = new MyDelegate(Hello);
11 //异步调用委托,获取计算结果
12 IAsyncResult result=myDelegate.BeginInvoke("Leslie", null, null);
13 //完成主线程其他工作
14 .............
15 //等待异步方法完成,调用EndInvoke(IAsyncResult)获取运行结果
16 string data=myDelegate.EndInvoke(result);
17 Console.WriteLine(data);
18
19 Console.ReadKey();
20 }
21
22 static string Hello(string name)
23 {
24 ThreadMessage("Async Thread");
25 Thread.Sleep(2000); //虚拟异步工作
26 return "Hello " + name;
27 }
28
29 //显示当前线程
30 static void ThreadMessage(string data)
31 {
32 string message = string.Format("{0} ThreadId is:{1}",
33 data,Thread.CurrentThread.ManagedThreadId);
34 Console.WriteLine(message);
35 }
36 }
运行结果
4.6 善用IAsyncResult
在以上例子中可以看见,如果在使用myDelegate.BeginInvoke后立即调用myDelegate.EndInvoke,那在异步线程未完成工作以前主线程将处于阻塞状态,等到异步线程结束获取计算结果后,主线程才能继续工作,这明显无法展示出多线程的优势。此时可以好好利用IAsyncResult 提高主线程的工作性能,IAsyncResult有以下成员:
1 public interface IAsyncResult
2 {
3 object AsyncState {get;} //获取用户定义的对象,它限定或包含关于异步操作的信息。
4 WailHandle AsyncWaitHandle {get;} //获取用于等待异步操作完成的 WaitHandle。
5 bool CompletedSynchronously {get;} //获取异步操作是否同步完成的指示。
6 bool IsCompleted {get;} //获取异步操作是否已完成的指示。
7 }
通过轮询方式,使用IsCompleted属性判断异步操作是否完成,这样在异步操作未完成前就可以让主线程执行另外的工作。
1 class Program
2 {
3 delegate string MyDelegate(string name);
4
5 static void Main(string[] args)
6 {
7 ThreadMessage("Main Thread");
8
9 //建立委托
10 MyDelegate myDelegate = new MyDelegate(Hello);
11 //异步调用委托,获取计算结果
12 IAsyncResult result=myDelegate.BeginInvoke("Leslie", null, null);
13 //在异步线程未完成前执行其他工作
14 while (!result.IsCompleted)
15 {
16 Thread.Sleep(200); //虚拟操作
17 Console.WriteLine("Main thead do work!");
18 }
19 string data=myDelegate.EndInvoke(result);
20 Console.WriteLine(data);
21
22 Console.ReadKey();
23 }
24
25 static string Hello(string name)
26 {
27 ThreadMessage("Async Thread");
28 Thread.Sleep(2000);
29 return "Hello " + name;
30 }
31
32 static void ThreadMessage(string data)
33 {
34 string message = string.Format("{0} ThreadId is:{1}",
35 data,Thread.CurrentThread.ManagedThreadId);
36 Console.WriteLine(message);
37 }
38 }
运行结果:
除此以外,也可以使用WailHandle完成同样的工作,WaitHandle里面包含有一个方法WaitOne(int timeout),它可以判断委托是否完成工作,在工作未完成前主线程可以继续其他工作。运行下面代码可得到与使用 IAsyncResult.IsCompleted 同样的结果,而且更简单方便 。
1 namespace Test
2 {
3 class Program
4 {
5 delegate string MyDelegate(string name);
6
7 static void Main(string[] args)
8 {
9 ThreadMessage("Main Thread");
10
11 //建立委托
12 MyDelegate myDelegate = new MyDelegate(Hello);
13
14 //异步调用委托,获取计算结果
15 IAsyncResult result=myDelegate.BeginInvoke("Leslie", null, null);
16
17 while (!result.AsyncWaitHandle.WaitOne(200))
18 {
19 Console.WriteLine("Main thead do work!");
20 }
21 string data=myDelegate.EndInvoke(result);
22 Console.WriteLine(data);
23
24 Console.ReadKey();
25 }
26
27 static string Hello(string name)
28 {
29 ThreadMessage("Async Thread");
30 Thread.Sleep(2000);
31 return "Hello " + name;
32 }
33
34 static void ThreadMessage(string data)
35 {
36 string message = string.Format("{0} ThreadId is:{1}",
37 data,Thread.CurrentThread.ManagedThreadId);
38 Console.WriteLine(message);
39 }
40 }
当要监视多个运行对象的时候,使用IAsyncResult.WaitHandle.WaitOne可就派不上用场了。
幸好.NET为WaitHandle准备了另外两个静态方法:WaitAny(waitHandle[], int)与WaitAll (waitHandle[] , int)。
其中WaitAll在等待所有waitHandle完成后再返回一个bool值。
而WaitAny是等待其中一个waitHandle完成后就返回一个int,这个int是代表已完成waitHandle在waitHandle[]中的数组索引。
下面就是使用WaitAll的例子,运行结果与使用 IAsyncResult.IsCompleted 相同。
1 class Program
2 {
3 delegate string MyDelegate(string name);
4
5 static void Main(string[] args)
6 {
7 ThreadMessage("Main Thread");
8
9 //建立委托
10 MyDelegate myDelegate = new MyDelegate(Hello);
11
12 //异步调用委托,获取计算结果
13 IAsyncResult result=myDelegate.BeginInvoke("Leslie", null, null);
14
15 //此处可加入多个检测对象
16 WaitHandle[] waitHandleList = new WaitHandle[] { result.AsyncWaitHandle,........ };
17 while (!WaitHandle.WaitAll(waitHandleList,200))
18 {
19 Console.WriteLine("Main thead do work!");
20 }
21 string data=myDelegate.EndInvoke(result);
22 Console.WriteLine(data);
23
24 Console.ReadKey();
25 }
26
27 static string Hello(string name)
28 {
29 ThreadMessage("Async Thread");
30 Thread.Sleep(2000);
31 return "Hello " + name;
32 }
33
34 static void ThreadMessage(string data)
35 {
36 string message = string.Format("{0} ThreadId is:{1}",
37 data,Thread.CurrentThread.ManagedThreadId);
38 Console.WriteLine(message);
39 }
40 }
4.7 回调函数
使用轮询方式来检测异步方法的状态非常麻烦,而且效率不高,有见及此,.NET为 IAsyncResult BeginInvoke(AsyncCallback , object)准备了一个回调函数。使用 AsyncCallback 就可以绑定一个方法作为回调函数,回调函数必须是带参数 IAsyncResult 且无返回值的方法: void AsycnCallbackMethod(IAsyncResult result) 。在BeginInvoke方法完成后,系统就会调用AsyncCallback所绑定的回调函数,最后回调函数中调用 XXX EndInvoke(IAsyncResult result) 就可以结束异步方法,它的返回值类型与委托的返回值一致。
1 class Program
2 {
3 delegate string MyDelegate(string name);
4
5 static void Main(string[] args)
6 {
7 ThreadMessage("Main Thread");
8
9 //建立委托
10 MyDelegate myDelegate = new MyDelegate(Hello);
11 //异步调用委托,获取计算结果
12 myDelegate.BeginInvoke("Leslie", new AsyncCallback(Completed), null);
13 //在启动异步线程后,主线程可以继续工作而不需要等待
14 for (int n = 0; n < 6; n++)
15 Console.WriteLine(" Main thread do work!");
16 Console.WriteLine("");
17
18 Console.ReadKey();
19 }
20
21 static string Hello(string name)
22 {
23 ThreadMessage("Async Thread");
24 Thread.Sleep(2000); \模拟异步操作
25 return " Hello " + name;
26 }
27
28 static void Completed(IAsyncResult result)
29 {
30 ThreadMessage("Async Completed");
31
32 //获取委托对象,调用EndInvoke方法获取运行结果
33 AsyncResult _result = (AsyncResult)result;
34 MyDelegate myDelegate = (MyDelegate)_result.AsyncDelegate;
35 string data = myDelegate.EndInvoke(_result);
36 Console.WriteLine(data);
37 }
38
39 static void ThreadMessage(string data)
40 {
41 string message = string.Format("{0} ThreadId is:{1}",
42 data, Thread.CurrentThread.ManagedThreadId);
43 Console.WriteLine(message);
44 }
45 }
可以看到,主线在调用BeginInvoke方法可以继续执行其他命令,而无需再等待了,这无疑比使用轮询方式判断异步方法是否完成更有优势。
在异步方法执行完成后将会调用AsyncCallback所绑定的回调函数,注意一点,回调函数依然是在异步线程中执行,这样就不会影响主线程的运行,这也使用回调函数最值得青昧的地方。
在回调函数中有一个既定的参数IAsyncResult,把IAsyncResult强制转换为AsyncResult后,就可以通过 AsyncResult.AsyncDelegate 获取原委托,再使用EndInvoke方法获取计算结果。
运行结果如下:
如果想为回调函数传送一些外部信息,就可以利用BeginInvoke(AsyncCallback,object)的最后一个参数object,它允许外部向回调函数输入任何类型的参数。只需要在回调函数中利用 AsyncResult.AsyncState 就可以获取object对象。
1 class Program
2 {
3 public class Person
4 {
5 public string Name;
6 public int Age;
7 }
8
9 delegate string MyDelegate(string name);
10
11 static void Main(string[] args)
12 {
13 ThreadMessage("Main Thread");
14
15 //建立委托
16 MyDelegate myDelegate = new MyDelegate(Hello);
17
18 //建立Person对象
19 Person person = new Person();
20 person.Name = "Elva";
21 person.Age = 27;
22
23 //异步调用委托,输入参数对象person, 获取计算结果
24 myDelegate.BeginInvoke("Leslie", new AsyncCallback(Completed), person);
25
26 //在启动异步线程后,主线程可以继续工作而不需要等待
27 for (int n = 0; n < 6; n++)
28 Console.WriteLine(" Main thread do work!");
29 Console.WriteLine("");
30
31 Console.ReadKey();
32 }
33
34 static string Hello(string name)
35 {
36 ThreadMessage("Async Thread");
37 Thread.Sleep(2000);
38 return " Hello " + name;
39 }
40
41 static void Completed(IAsyncResult result)
42 {
43 ThreadMessage("Async Completed");
44
45 //获取委托对象,调用EndInvoke方法获取运行结果
46 AsyncResult _result = (AsyncResult)result;
47 MyDelegate myDelegate = (MyDelegate)_result.AsyncDelegate;
48 string data = myDelegate.EndInvoke(_result);
49 //获取Person对象
50 Person person = (Person)result.AsyncState;
51 string message = person.Name + "'s age is " + person.Age.ToString();
52
53 Console.WriteLine(data+" "+message);
54 }
55
56 static void ThreadMessage(string data)
57 {
58 string message = string.Format("{0} ThreadId is:{1}",
59 data, Thread.CurrentThread.ManagedThreadId);
60 Console.WriteLine(message);
61 }
62 }
运行结果:
五、CLR线程池的I/O线程
在前一节所介绍的线程都属于CLR线程池的工作者线程,这一节开始为大家介绍一下CLR线程池的I/O线程
I/O 线程是.NET专为访问外部资源所设置的一种线程,因为访问外部资源常常要受到外界因素的影响,为了防止让主线程受影响而长期处于阻塞状态,.NET为多个I/O操作都建立起了异步方法,例如:FileStream、TCP/IP、WebRequest、WebService等等,而且每个异步方法的使用方式都非常类似,都是以BeginXXX为开始,以EndXXX结束,下面为大家一一解说。
5.1 异步读写 FileStream
需要在 FileStream 异步调用 I/O线程,必须使用以下构造函数建立 FileStream 对象,并把useAsync设置为 true。
FileStream stream = new FileStream ( string path, FileMode mode, FileAccess access, FileShare share, int bufferSize,bool useAsync ) ;
其中 path 是文件的相对路径或绝对路径; mode 确定如何打开或创建文件; access 确定访问文件的方式; share 确定文件如何进程共享; bufferSize 是代表缓冲区大小,一般默认最小值为8,在启动异步读取或写入时,文件大小一般大于缓冲大小; userAsync代表是否启动异步I/O线程。
5.1.1 异步写入
FileStream中包含BeginWrite、EndWrite 方法可以启动I/O线程进行异步写入。
public override IAsyncResult BeginWrite ( byte[] array, int offset, int numBytes, AsyncCallback userCallback, Object stateObject )
public override void EndWrite (IAsyncResult asyncResult )
BeginWrite 返回值为IAsyncResult, 使用方式与委托的BeginInvoke方法相似,最好就是使用回调函数,避免线程阻塞。在最后两个参数中,参数AsyncCallback用于绑定回调函数; 参数Object用于传递外部数据。要注意一点:AsyncCallback所绑定的回调函数必须是带单个 IAsyncResult 参数的无返回值方法。
在例子中,把FileStream作为外部数据传递到回调函数当中,然后在回调函数中利用IAsyncResult.AsyncState获取FileStream对象,最后通过FileStream.EndWrite(IAsyncResult)结束写入。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //把线程池的最大值设置为1000
6 ThreadPool.SetMaxThreads(1000, 1000);
7 ThreadPoolMessage("Start");
8
9 //新立文件File.sour
10 FileStream stream = new FileStream("File.sour", FileMode.OpenOrCreate,
11 FileAccess.ReadWrite,FileShare.ReadWrite,1024,true);
12 byte[] bytes = new byte[16384];
13 string message = "An operating-system ThreadId has no fixed relationship........";
14 bytes = Encoding.Unicode.GetBytes(message);
15
16 //启动异步写入
17 stream.BeginWrite(bytes, 0, (int)bytes.Length,new AsyncCallback(Callback),stream);
18 stream.Flush();
19
20 Console.ReadKey();
21 }
22
23 static void Callback(IAsyncResult result)
24 {
25 //显示线程池现状
26 Thread.Sleep(200);
27 ThreadPoolMessage("AsyncCallback");
28 //结束异步写入
29 FileStream stream = (FileStream)result.AsyncState;
30 stream.EndWrite(result);
31 stream.Close();
32 }
33
34 //显示线程池现状
35 static void ThreadPoolMessage(string data)
36 {
37 int a, b;
38 ThreadPool.GetAvailableThreads(out a, out b);
39 string message = string.Format("{0} CurrentThreadId is {1} "+
40 "WorkerThreads is:{2} CompletionPortThreads is :{3}",
41 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
42 Console.WriteLine(message);
43 }
44 }
由输出结果可以看到,在使用FileStream.BeginWrite方法后,系统将自动启动CLR线程池中I/O线程。
5.1.2 异步读取
FileStream 中包含 BeginRead 与 EndRead 可以异步调用I/O线程进行读取。
public override IAsyncResult BeginRead ( byte[] array,int offset,int numBytes, AsyncCallback userCallback,Object stateObject)
public override int EndRead(IAsyncResult asyncResult)
其使用方式与BeginWrite和EndWrite相似,AsyncCallback用于绑定回调函数; Object用于传递外部数据。在回调函数只需要使用IAsyncResut.AsyncState就可获取外部数据。EndWrite 方法会返回从流读取到的字节数量。
首先定义 FileData 类,里面包含FileStream对象,byte[] 数组和长度。然后把FileData对象作为外部数据传到回调函数,在回调函数中,把IAsyncResult.AsyncState强制转换为FileData,然后通过FileStream.EndRead(IAsyncResult)结束读取。最后比较一下长度,若读取到的长度与输入的数据长度不一至,则抛出异常。
1 class Program
2 {
3 public class FileData
4 {
5 public FileStream Stream;
6 public int Length;
7 public byte[] ByteData;
8 }
9
10 static void Main(string[] args)
11 {
12 //把线程池的最大值设置为1000
13 ThreadPool.SetMaxThreads(1000, 1000);
14 ThreadPoolMessage("Start");
15 ReadFile();
16
17 Console.ReadKey();
18 }
19
20 static void ReadFile()
21 {
22 byte[] byteData=new byte[80961024];
23 FileStream stream = new FileStream("File1.sour", FileMode.OpenOrCreate,
24 FileAccess.ReadWrite, FileShare.ReadWrite, 1024, true);
25
26 //把FileStream对象,byte[]对象,长度等有关数据绑定到FileData对象中,以附带属性方式送到回调函数
27 FileData fileData = new FileData();
28 fileData.Stream = stream;
29 fileData.Length = (int)stream.Length;
30 fileData.ByteData = byteData;
31
32 //启动异步读取
33 stream.BeginRead(byteData, 0, fileData.Length, new AsyncCallback(Completed), fileData);
34 }
35
36 static void Completed(IAsyncResult result)
37 {
38 ThreadPoolMessage("Completed");
39
40 //把AsyncResult.AsyncState转换为FileData对象,以FileStream.EndRead完成异步读取
41 FileData fileData = (FileData)result.AsyncState;
42 int length=fileData.Stream.EndRead(result);
43 fileData.Stream.Close();
44
45 //如果读取到的长度与输入长度不一致,则抛出异常
46 if (length != fileData.Length)
47 throw new Exception("Stream is not complete!");
48
49 string data=Encoding.ASCII.GetString(fileData.ByteData, 0, fileData.Length);
50 Console.WriteLine(data.Substring(2,22));
51 }
52
53 //显示线程池现状
54 static void ThreadPoolMessage(string data)
55 {
56 int a, b;
57 ThreadPool.GetAvailableThreads(out a, out b);
58 string message = string.Format("{0} CurrentThreadId is {1} "+
59 "WorkerThreads is:{2} CompletionPortThreads is :{3}",
60 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
61 Console.WriteLine(message);
62 }
63
64 }
由输出结果可以看到,在使用FileStream.BeginRead方法后,系统将自动启动CLR线程池中I/O线程。
5.2 异步操作TCP/IP套接字
在介绍 TCP/IP 套接字前先简单介绍一下 NetworkStream 类,它是用于网络访问的基础数据流。 NetworkStream 提供了好几个方法控制套接字数据的发送与接收, 其中BeginRead、EndRead、BeginWrite、EndWrite 能够实现异步操作,而且异步线程是来自于CLR线程池的I/O线程。
public override int ReadByte ()
public override int Read (byte[] buffer,int offset, int size)
public override void WriteByte (byte value)
public override void Write (byte[] buffer,int offset, int size)
public override IAsyncResult BeginRead (byte [] buffer, int offset, int size, AsyncCallback callback, Object state )
public override int EndRead(IAsyncResult result)
public override IAsyncResult BeginWrite (byte [] buffer, int offset, int size, AsyncCallback callback, Object state )
public override void EndWrite(IAsyncResult result)
若要创建 NetworkStream,必须提供已连接的 Socket。而在.NET中使用TCP/IP套接字不需要直接与Socket打交道,因为.NET把Socket的大部分操作都放在System.Net.TcpListener和System.Net.Sockets.TcpClient里面,这两个类大大地简化了Socket的操作。一般套接字对象Socket包含一个Accept()方法,此方法能产生阻塞来等待客户端的请求,而在TcpListener类里也包含了一个相似的方法 public TcpClient AcceptTcpClient()用于等待客户端的请求。此方法将会返回一个TcpClient 对象,通过 TcpClient 的 public NetworkStream GetStream()方法就能获取NetworkStream对象,控制套接字数据的发送与接收。
下面以一个例子说明异步调用TCP/IP套接字收发数据的过程。
首先在服务器端建立默认地址127.0.0.1用于收发信息,使用此地址与端口500新建TcpListener对象,调用TcpListener.Start 侦听传入的连接请求,再使用一个死循环来监听信息。
在ChatClient类包括有接收信息与发送信息两个功能:当接收到客户端请求时,它会利用 NetworkStream.BeginRead 读取客户端信息,并在回调函数ReceiveAsyncCallback中输出信息内容,若接收到的信息的大小小于1时,它将会抛出一个异常。当信息成功接收后,再使用 NetworkStream.BeginWrite 方法回馈信息到客户端
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //设置CLR线程池最大线程数
6 ThreadPool.SetMaxThreads(1000, 1000);
7
8 //默认地址为127.0.0.1
9 IPAddress ipAddress = IPAddress.Parse("127.0.0.1");
10 TcpListener tcpListener = new TcpListener(ipAddress, 500);
11 tcpListener.Start();
12
13 //以一个死循环来实现监听
14 while (true)
15 { //调用一个ChatClient对象来实现监听
16 ChatClient chatClient = new ChatClient(tcpListener.AcceptTcpClient());
17 }
18 }
19 }
20
21 public class ChatClient
22 {
23 static TcpClient tcpClient;
24 static byte[] byteMessage;
25 static string clientEndPoint;
26
27 public ChatClient(TcpClient tcpClient1)
28 {
29 tcpClient = tcpClient1;
30 byteMessage = new byte[tcpClient.ReceiveBufferSize];
31
32 //显示客户端信息
33 clientEndPoint = tcpClient.Client.RemoteEndPoint.ToString();
34 Console.WriteLine("Client's endpoint is " + clientEndPoint);
35
36 //使用NetworkStream.BeginRead异步读取信息
37 NetworkStream networkStream = tcpClient.GetStream();
38 networkStream.BeginRead(byteMessage, 0, tcpClient.ReceiveBufferSize,
39 new AsyncCallback(ReceiveAsyncCallback), null);
40 }
41
42 public void ReceiveAsyncCallback(IAsyncResult iAsyncResult)
43 {
44 //显示CLR线程池状态
45 Thread.Sleep(100);
46 ThreadPoolMessage(" Message is receiving");
47
48 //使用NetworkStream.EndRead结束异步读取
49 NetworkStream networkStreamRead = tcpClient.GetStream();
50 int length=networkStreamRead.EndRead(iAsyncResult);
51
52 //如果接收到的数据长度少于1则抛出异常
53 if (length < 1)
54 {
55 tcpClient.GetStream().Close();
56 throw new Exception("Disconnection!");
57 }
58
59 //显示接收信息
60 string message = Encoding.UTF8.GetString(byteMessage, 0, length);
61 Console.WriteLine("Message:" + message);
62
63 //使用NetworkStream.BeginWrite异步发送信息
64 byte[] sendMessage = Encoding.UTF8.GetBytes("Message is received!");
65 NetworkStream networkStreamWrite=tcpClient.GetStream();
66 networkStreamWrite.BeginWrite(sendMessage, 0, sendMessage.Length,
67 new AsyncCallback(SendAsyncCallback), null);
68 }
69
70 //把信息转换成二进制数据,然后发送到客户端
71 public void SendAsyncCallback(IAsyncResult iAsyncResult)
72 {
73 //显示CLR线程池状态
74 Thread.Sleep(100);
75 ThreadPoolMessage(" Message is sending");
76
77 //使用NetworkStream.EndWrite结束异步发送
78 tcpClient.GetStream().EndWrite(iAsyncResult);
79
80 //重新监听
81 tcpClient.GetStream().BeginRead(byteMessage, 0, tcpClient.ReceiveBufferSize,
82 new AsyncCallback(ReceiveAsyncCallback), null);
83 }
84
85 //显示线程池现状
86 static void ThreadPoolMessage(string data)
87 {
88 int a, b;
89 ThreadPool.GetAvailableThreads(out a, out b);
90 string message = string.Format("{0} CurrentThreadId is {1} " +
91 "WorkerThreads is:{2} CompletionPortThreads is :{3} ",
92 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
93
94 Console.WriteLine(message);
95 }
96 }
而在客户端只是使用简单的开发方式,利用TcpClient连接到服务器端,然后调用NetworkStream.Write方法发送信息,最后调用NetworkStream.Read方法读取回馈信息
1 static void Main(string[] args)
2 {
3 //连接服务端
4 TcpClient tcpClient = new TcpClient("127.0.0.1", 500);
5
6 //发送信息
7 NetworkStream networkStream = tcpClient.GetStream();
8 byte[] sendMessage = Encoding.UTF8.GetBytes("Client request connection!");
9 networkStream.Write(sendMessage, 0, sendMessage.Length);
10 networkStream.Flush();
11
12 //接收信息
13 byte[] receiveMessage=new byte[1024];
14 int count=networkStream.Read(receiveMessage, 0,1024);
15 Console.WriteLine(Encoding.UTF8.GetString(receiveMessage));
16 Console.ReadKey();
17 }
注意观察运行结果,服务器端的异步操作线程都是来自于CLR线程池的I/O线程
5.3 异步WebRequest
System.Net.WebRequest 是 .NET 为实现访问 Internet 的 “请求/响应模型” 而开发的一个 abstract 基类, 它主要有三个子类:FtpWebRequest、HttpWebRequest、FileWebRequest。当使用WebRequest.Create(string uri)创建对象时,应用程序就可以根据请求协议判断实现类来进行操作。FileWebRequest、FtpWebRequest、HttpWebRequest 各有其作用:FileWebRequest 使用 “file://路径” 的URI方式实现对本地资源和内部文件的请求/响应、FtpWebRequest 使用FTP文件传输协议实现文件请求/响应、HttpWebRequest 用于处理HTTP的页面请求/响应。由于使用方法相类似,下面就以常用的HttpWebRequest为例子介绍一下异步WebRequest的使用方法。
在使用ASP.NET开发网站的时候,往往会忽略了HttpWebRequest的使用,因为开发都假设客户端是使用浏览器等工具去阅读页面的。但如果你对REST开发方式有所了解,那对 HttpWebRequest 就应该非常熟悉。它可以在路径参数、头文件、页面主体、Cookie 等多处地方加入请求条件,然后对回复数据进行适当处理。HttpWebRequest 包含有以下几个常用方法用于处理请求/响应:
public override Stream GetRequestStream ()
public override WebResponse GetResponse ()
public override IAsyncResult BeginGetRequestStream ( AsyncCallback callback, Object state )
public override Stream EndGetRequestStream ( IAsyncResult asyncResult )
public override IAsyncResult BeginGetResponse ( AsyncCallback callback, Object state )
public override WebResponse EndGetResponse ( IAsyncResult asyncResult )
其中BeginGetRequestStream、EndGetRequestStream 用于异步向HttpWebRequest对象写入请求信息; BeginGetResponse、EndGetResponse 用于异步发送页面请求并获取返回信息。使用异步方式操作Internet的“请求/响应”,避免主线程长期处于等待状态,而操作期间异步线程是来自CLR线程池的I/O线程。
下面以简单的例子介绍一下异步请求的用法。
首先为Person类加上可序列化特性,在服务器端建立Hanlder.ashx,通过Request.InputStream 获取到请求数据并把数据转化为String对象,此实例中数据是以 “Id:1” 的形式实现传送的。然后根据Id查找对应的Person对象,并把Person对象写入Response.OutStream 中返还到客户端。
在客户端先把 HttpWebRequird.Method 设置为 "post",使用异步方式通过BeginGetRequireStream获取请求数据流,然后写入请求数据 “Id:1”。再使用异步方法BeginGetResponse 获取回复数据,最后把数据反序列化为Person对象显示出来。
注意:HttpWebRequire.Method默认为get,在写入请求前必须把HttpWebRequire.Method设置为post,否则在使用BeginGetRequireStream 获取请求数据流的时候,系统就会发出 “无法发送具有此谓词类型的内容正文" 的异常。
Model
1 namespace Model
2 {
3 [Serializable]
4 public class Person
5 {
6 public int ID
7 {
8 get;
9 set;
10 }
11 public string Name
12 {
13 get;
14 set;
15 }
16 public int Age
17 {
18 get;
19 set;
20 }
21 }
22 }
服务器端
1 public class Handler : IHttpHandler {
2
3 public void ProcessRequest(HttpContext context)
4 {
5 //把信息转换为String,找出输入条件Id
6 byte[] bytes=new byte[1024];
7 int length=context.Request.InputStream.Read(bytes,0,1024);
8 string condition = Encoding.Default.GetString(bytes);
9 int id = int.Parse(condition.Split(new string[] { ":" },
10 StringSplitOptions.RemoveEmptyEntries)[1]);
11
12 //根据Id查找对应Person对象
13 var person = GetPersonList().Where(x => x.ID == id).First();
14
15 //所Person格式化为二进制数据写入OutputStream
16 BinaryFormatter formatter = new BinaryFormatter();
17 formatter.Serialize(context.Response.OutputStream, person);
18 }
19
20 //模拟源数据
21 private IList<Person> GetPersonList()
22 {
23 var personList = new List<Person>();
24
25 var person1 = new Person();
26 person1.ID = 1;
27 person1.Name = "Leslie";
28 person1.Age = 30;
29 personList.Add(person1);
30 ...........
31 return personList;
32 }
33
34 public bool IsReusable
35 {
36 get { return true;}
37 }
38 }
客户端
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 ThreadPool.SetMaxThreads(1000, 1000);
6 Request();
7 Console.ReadKey();
8 }
9
10 static void Request()
11 {
12 ThreadPoolMessage("Start");
13 //使用WebRequest.Create方法建立HttpWebRequest对象
14 HttpWebRequest webRequest = (HttpWebRequest)WebRequest.Create(
15 "http://localhost:5700/Handler.ashx");
16 webRequest.Method = "post";
17
18 //对写入数据的RequestStream对象进行异步请求
19 IAsyncResult result=webRequest.BeginGetRequestStream(
20 new AsyncCallback(EndGetRequestStream),webRequest);
21 }
22
23 static void EndGetRequestStream(IAsyncResult result)
24 {
25 ThreadPoolMessage("RequestStream Complete");
26 //获取RequestStream
27 HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState;
28 Stream stream=webRequest.EndGetRequestStream(result);
29
30 //写入请求条件
31 byte[] condition = Encoding.Default.GetBytes("Id:1");
32 stream.Write(condition, 0, condition.Length);
33
34 //异步接收回传信息
35 IAsyncResult responseResult = webRequest.BeginGetResponse(
36 new AsyncCallback(EndGetResponse), webRequest);
37 }
38
39 static void EndGetResponse(IAsyncResult result)
40 {
41 //显出线程池现状
42 ThreadPoolMessage("GetResponse Complete");
43
44 //结束异步请求,获取结果
45 HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState;
46 WebResponse webResponse = webRequest.EndGetResponse(result);
47
48 //把输出结果转化为Person对象
49 Stream stream = webResponse.GetResponseStream();
50 BinaryFormatter formatter = new BinaryFormatter();
51 var person=(Person)formatter.Deserialize(stream);
52 Console.WriteLine(string.Format("Person Id:{0} Name:{1} Age:{2}",
53 person.ID, person.Name, person.Age));
54 }
55
56 //显示线程池现状
57 static void ThreadPoolMessage(string data)
58 {
59 int a, b;
60 ThreadPool.GetAvailableThreads(out a, out b);
61 string message = string.Format("{0} CurrentThreadId is {1} " +
62 "WorkerThreads is:{2} CompletionPortThreads is :{3} ",
63 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
64
65 Console.WriteLine(message);
66 }
67 }
从运行结果可以看到,BeginGetRequireStream、BeginGetResponse方法是使用CLR线程池的I/O线程。
5.4 异步调用WebService
相比TCP/IP套接字,在使用WebService的时候,服务器端需要更复杂的操作处理,使用时间往往会更长。为了避免客户端长期处于等待状态,在配置服务引用时选择 “生成异步操作”,系统可以自动建立异步调用的方式。
以.NET 2.0以前,系统都是使用ASMX来设计WebService,而近年来WCF可说是火热登场,下面就以WCF为例子简单介绍一下异步调用WebService的例子。
由于系统可以自动生成异步方法,使用起来非常简单,首先在服务器端建立服务ExampleService,里面包含方法Method。客户端引用此服务时,选择 “生成异步操作”。然后使用 BeginMethod 启动异步方法, 在回调函数中调用EndMethod结束异步调用。
服务端
1 [ServiceContract]
2 public interface IExampleService
3 {
4 [OperationContract]
5 string Method(string name);
6 }
7
8 public class ExampleService : IExampleService
9 {
10 public string Method(string name)
11 {
12 return "Hello " + name;
13 }
14 }
15
16 class Program
17 {
18 static void Main(string[] args)
19 {
20 ServiceHost host = new ServiceHost(typeof(ExampleService));
21 host.Open();
22 Console.ReadKey();
23 host.Close();
24 }
25 }
26
27 <configuration>
28 <system.serviceModel>
29 <services>
30 <service name="Example.ExampleService">
31 <endpoint address="" binding="wsHttpBinding" contract="Example.IExampleService">
32 <identity>
33 <dns value="localhost" />
34 </identity>
35 </endpoint>
36 <endpoint address="mex" binding="mexHttpBinding" contract="IMetadataExchange" />
37 <host>
38 <baseAddresses>
39 <add baseAddress="http://localhost:7200/Example/ExampleService/" />
40 </baseAddresses>
41 </host>
42 </service>
43 </services>
44 </system.serviceModel>
45 </configuration>
客户端
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //设置最大线程数
6 ThreadPool.SetMaxThreads(1000, 1000);
7 ThreadPoolMessage("Start");
8
9 //建立服务对象,异步调用服务方法
10 ExampleServiceReference.ExampleServiceClient exampleService = new
11 ExampleServiceReference.ExampleServiceClient();
12 exampleService.BeginMethod("Leslie",new AsyncCallback(AsyncCallbackMethod),
13 exampleService);
14 Console.ReadKey();
15 }
16
17 static void AsyncCallbackMethod(IAsyncResult result)
18 {
19 Thread.Sleep(1000);
20 ThreadPoolMessage("Complete");
21 ExampleServiceReference.ExampleServiceClient example =
22 (ExampleServiceReference.ExampleServiceClient)result.AsyncState;
23 string data=example.EndMethod(result);
24 Console.WriteLine(data);
25 }
26
27 //显示线程池现状
28 static void ThreadPoolMessage(string data)
29 {
30 int a, b;
31 ThreadPool.GetAvailableThreads(out a, out b);
32 string message = string.Format("{0} CurrentThreadId is {1} " +
33 "WorkerThreads is:{2} CompletionPortThreads is :{3} ",
34 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
35
36 Console.WriteLine(message);
37 }
38 }
39
40 <configuration>
41 <system.serviceModel>
42 <bindings>
43 <wsHttpBinding>
44 <binding name="WSHttpBinding_IExampleService" closeTimeout="00:01:00"
45 openTimeout="00:01:00" receiveTimeout="00:10:00" sendTimeout="00:01:00"
46 bypassProxyOnLocal="false" transactionFlow="false"
47 hostNameComparisonMode="StrongWildcard" maxBufferPoolSize="524288"
48 maxReceivedMessageSize="65536" messageEncoding="Text" textEncoding="utf-8"
49 useDefaultWebProxy="true" allowCookies="false">
50 <readerQuotas maxDepth="32" maxStringContentLength="8192" maxArrayLength="16384"
51 maxBytesPerRead="4096" maxNameTableCharCount="16384" />
52 <reliableSession ordered="true" inactivityTimeout="00:10:00" enabled="false" />
53 <security mode="Message">
54 <transport clientCredentialType="Windows" proxyCredentialType="None"
55 realm="" />
56 <message clientCredentialType="Windows" negotiateServiceCredential="true"
57 algorithmSuite="Default" />
58 </security>
59 </binding>
60 </wsHttpBinding>
61 </bindings>
62 <client>
63 <endpoint address="http://localhost:7200/Example/ExampleService/"
64 binding="wsHttpBinding" bindingConfiguration="WSHttpBinding_IExampleService"
65 contract="ExampleServiceReference.IExampleService"
66 name="WSHttpBinding_IExampleService">
67 <identity>
68 <dns value="localhost" />
69 </identity>
70 </endpoint>
71 </client>
72 </system.serviceModel>
73 </configuration>
注意观察运行结果,异步调用服务时,回调函数都是运行于CLR线程池的I/O线程当中。
六、异步 SqlCommand
从ADO.NET 2.0开始,SqlCommand就新增了几个异步方法执行SQL命令。相对于同步执行方式,它使主线程不需要等待数据库的返回结果,在使用复杂性查询或批量插入时将有效提高主线程的效率。使用异步SqlCommand的时候,请注意把ConnectionString 的 Asynchronous Processing 设置为 true 。
注意:SqlCommand异步操作的特别之处在于线程并不依赖于CLR线程池,而是由Windows内部提供,这比使用异步委托更有效率。但如果需要使用回调函数的时候,回调函数的线程依然是来自于CLR线程池的工作者线程。
SqlCommand有以下几个方法支持异步操作:
public IAsyncResult BeginExecuteNonQuery (......)
public int EndExecuteNonQuery(IAsyncResult)
public IAsyncResult BeginExecuteReader(......)
public SqlDataReader EndExecuteReader(IAsyncResult)
public IAsyncResult BeginExecuteXmlReader (......)
public XmlReader EndExecuteXmlReader(IAsyncResult)
由于使用方式相似,此处就以 BeginExecuteNonQuery 为例子,介绍一下异步SqlCommand的使用。首先建立connectionString,注意把Asynchronous Processing设置为true来启动异步命令,然后把SqlCommand.CommandText设置为 WAITFOR DELAY "0:0:3" 来虚拟数据库操作。再通过BeginExecuteNonQuery启动异步操作,利用轮询方式监测操作情况。最后在操作完成后使用EndExecuteNonQuery完成异步操作。
1 class Program
2 {
3 //把Asynchronous Processing设置为true
4 static string connectionString = "Data Source=LESLIE-PC;Initial Catalog=Business;“+
5 "Integrated Security=True;Asynchronous Processing=true";
6
7 static void Main(string[] args)
8 {
9 //把CLR线程池最大线程数设置为1000
10 ThreadPool.SetMaxThreads(1000, 1000);
11 ThreadPoolMessage("Start");
12
13 //使用WAITFOR DELAY命令来虚拟操作
14 SqlConnection connection = new SqlConnection(connectionString);
15 SqlCommand command = new SqlCommand("WAITFOR DELAY '0:0:3';", connection);
16 connection.Open();
17
18 //启动异步SqlCommand操作,利用轮询方式监测操作
19 IAsyncResult result = command.BeginExecuteNonQuery();
20 ThreadPoolMessage("BeginRead");
21 while (!result.AsyncWaitHandle.WaitOne(500))
22 Console.WriteLine("Main thread do work........");
23
24 //结束异步SqlCommand
25 int count= command.EndExecuteNonQuery(result);
26 ThreadPoolMessage(" Completed");
27 Console.ReadKey();
28 }
29
30 //显示线程池现状
31 static void ThreadPoolMessage(string data)
32 {
33 int a, b;
34 ThreadPool.GetAvailableThreads(out a, out b);
35 string message = string.Format("{0} CurrentThreadId is {1} "+
36 "WorkerThreads is:{2} CompletionPortThreads is :{3} ",
37 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
38 Console.WriteLine(message);
39 }
40 }
注意运行结果,SqlCommand的异步执行线程并不属于CLR线程池。
如果觉得使用轮询方式过于麻烦,可以使用回调函数,但要注意当调用回调函数时,线程是来自于CLR线程池的工作者线程。
1 class Program
2 {
3 //把Asynchronous Processing设置为true
4 static string connectionString = "Data Source=LESLIE-PC;Initial Catalog=Business;”+
5 “Integrated Security=True;Asynchronous Processing=true";
6 static void Main(string[] args)
7 {
8 //把CLR线程池最大线程数设置为1000
9 ThreadPool.SetMaxThreads(1000, 1000);
10 ThreadPoolMessage("Start");
11
12 //使用WAITFOR DELAY命令来虚拟操作
13 SqlConnection connection = new SqlConnection(connectionString);
14 SqlCommand command = new SqlCommand("WAITFOR DELAY '0:0:3';", connection);
15 connection.Open();
16
17 //启动异步SqlCommand操作,并把SqlCommand对象传递到回调函数
18 IAsyncResult result = command.BeginExecuteNonQuery(
19 new AsyncCallback(AsyncCallbackMethod),command);
20 Console.ReadKey();
21 }
22
23 static void AsyncCallbackMethod(IAsyncResult result)
24 {
25 Thread.Sleep(200);
26 ThreadPoolMessage("AsyncCallback");
27 SqlCommand command = (SqlCommand)result.AsyncState;
28 int count=command.EndExecuteNonQuery(result);
29 command.Connection.Close();
30 }
31
32 //显示线程池现状
33 static void ThreadPoolMessage(string data)
34 {
35 int a, b;
36 ThreadPool.GetAvailableThreads(out a, out b);
37 string message = string.Format("{0} CurrentThreadId is {1} "+
38 "WorkerThreads is:{2} CompletionPortThreads is :{3} ",
39 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
40
41 Console.WriteLine(message);
42 }
43 }
运行结果:
七、并行编程与PLINQ
要使用多线程开发,必须非常熟悉Thread的使用,而且在开发过程中可能会面对很多未知的问题。为了简化开发,.NET 4.0 特别提供一个并行编程库System.Threading.Tasks,它可以简化并行开发,你无需直接跟线程或线程池打交道,就可以简单建立多线程应用程序。此外,.NET还提供了新的一组扩展方法PLINQ,它具有自动分析查询功能,如果并行查询能提高系统效率,则同时运行,如果查询未能从并行查询中受益,则按原顺序查询。下面将详细介绍并行操作的方式。
7.1 泛型委托
使用并行编程可以同时操作多个委托,在介绍并行编程前先简单介绍一下两个泛型委托System.Func<>与System.Action<>。
Func<>是一个能接受多个参数和一个返回值的泛型委托,它能接受0个到16个输入参数, 其中 T1,T2,T3,T4......T16 代表自定的输入类型,TResult为自定义的返回值。
public delegate TResult Func<TResult>()
public delegate TResult Func<T1,TResult>(T1 arg1)
public delegate TResult Func<T1,T2, TResult>(T1 arg1,T2 arg2)
public delegate TResult Func<T1,T2, T3, TResult>(T1 arg1,T2 arg2,T3 arg3)
public delegate TResult Func<T1,T2, T3, ,T4, TResult>(T1 arg1,T2 arg2,T3 arg3,T4 arg4)
..............
public delegate TResult Func<T1,T2, T3, ,T4, ...... ,T16,TResult>(T1 arg1,T2 arg2,T3 arg3,T4 arg4,...... ,T16 arg16)
Action<>与Func<>十分相似,不同在于Action<>的返回值为void,Action能接受0~16个参数
public delegate void Action<T1>()
public delegate void Action<T1,T2>(T1 arg1,T2 arg2)
public delegate void Action<T1,T2, T3>(T1 arg1,T2 arg2, T3 arg3)
.............
public delegate void Action<T1,T2, T3, ,T4, ...... ,T16>(T1 arg1,T2 arg2,T3 arg3,T4 arg4,...... ,T16 arg16)
7.2 任务并行库(TPL)
System.Threading.Tasks中的类被统称为任务并行库(Task Parallel Library,TPL),TPL使用CLR线程池把工作分配到CPU,并能自动处理工作分区、线程调度、取消支持、状态管理以及其他低级别的细节操作,极大地简化了多线程的开发。
TPL包括常用的数据并行与任务并行两种执行方式:
7.2.1 数据并行
数据并行的核心类就是System.Threading.Tasks.Parallel,它包含两个静态方法 Parallel.For 与 Parallel.ForEach, 使用方式与for、foreach相仿。通过这两个方法可以并行处理System.Func<>、System.Action<>委托。
以下一个例子就是利用 public static ParallelLoopResult For( int from, int max, Action<int>) 方法对List<Person>进行并行查询。
假设使用单线程方式查询3个Person对象,需要用时大约6秒,在使用并行方式,只需使用2秒就能完成查询,而且能够避开Thread的繁琐处理。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //设置最大线程数
6 ThreadPool.SetMaxThreads(1000, 1000);
7 //并行查询
8 Parallel.For(0, 3,n =>
9 {
10 Thread.Sleep(2000); //模拟查询
11 ThreadPoolMessage(GetPersonList()[n]);
12 });
13 Console.ReadKey();
14 }
15
16 //模拟源数据
17 static IList<Person> GetPersonList()
18 {
19 var personList = new List<Person>();
20
21 var person1 = new Person();
22 person1.ID = 1;
23 person1.Name = "Leslie";
24 person1.Age = 30;
25 personList.Add(person1);
26 ...........
27 return personList;
28 }
29
30 //显示线程池现状
31 static void ThreadPoolMessage(Person person)
32 {
33 int a, b;
34 ThreadPool.GetAvailableThreads(out a, out b);
35 string message = string.Format("Person ID:{0} Name:{1} Age:{2} " +
36 " CurrentThreadId is {3} WorkerThreads is:{4}" +
37 " CompletionPortThreads is :{5} ",
38 person.ID, person.Name, person.Age,
39 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
40
41 Console.WriteLine(message);
42 }
43 }
观察运行结果,对象并非按照原排列顺序进行查询,而是使用并行方式查询。
若想停止操作,可以利用ParallelLoopState参数,下面以ForEach作为例子。
public static ParallelLoopResult ForEach<TSource>( IEnumerable<TSource> source, Action<TSource, ParallelLoopState> action)
其中source为数据集,在Action<TSource,ParallelLoopState>委托的ParallelLoopState参数当中包含有Break()和 Stop()两个方法都可以使迭代停止。Break的使用跟传统for里面的使用方式相似,但因为处于并行处理当中,使用Break并不能保证所有运行能立即停止,在当前迭代之前的迭代会继续执行。若想立即停止操作,可以使用Stop方法,它能保证立即终止所有的操作,无论它们是处于当前迭代的之前还是之后。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //设置最大线程数
6 ThreadPool.SetMaxThreads(1000, 1000);
7
8 //并行查询
9 Parallel.ForEach(GetPersonList(), (person, state) =>
10 {
11 if (person.ID == 2)
12 state.Stop();
13 ThreadPoolMessage(person);
14 });
15 Console.ReadKey();
16 }
17
18 //模拟源数据
19 static IList<Person> GetPersonList()
20 {
21 var personList = new List<Person>();
22
23 var person1 = new Person();
24 person1.ID = 1;
25 person1.Name = "Leslie";
26 person1.Age = 30;
27 personList.Add(person1);
28 ..........
29 return personList;
30 }
31
32 //显示线程池现状
33 static void ThreadPoolMessage(Person person)
34 {
35 int a, b;
36 ThreadPool.GetAvailableThreads(out a, out b);
37 string message = string.Format("Person ID:{0} Name:{1} Age:{2} " +
38 " CurrentThreadId is {3} WorkerThreads is:{4}" +
39 " CompletionPortThreads is :{5} ",
40 person.ID, person.Name, person.Age,
41 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
42
43 Console.WriteLine(message);
44 }
45 }
观察运行结果,当Person的ID等于2时,运行将会停止。
当要在多个线程中调用本地变量,可以使用以下方法:
public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<Of TSource>, Func<Of TLocal>, Func<Of TSource,ParallelLoopState,TLocal,TLocal>, Action<Of TLocal>)
其中第一个参数为数据集;
第二个参数是一个Func委托,用于在每个线程执行前进行初始化;
第 三个参数是委托Func<Of T1,T2,T3,TResult>,它能对数据集的每个成员进行迭代,当中T1是数据集的成员,T2是一个ParallelLoopState对 象,它可以控制迭代的状态,T3是线程中的本地变量;
第四个参数是一个Action委托,用于对每个线程的最终状态进行最终操作。
在以下例子中,使用ForEach计算多个Order的总体价格。在ForEach方法中,首先把参数初始化为0f,然后用把同一个Order的多个OrderItem价格进行累加,计算出Order的价格,最后把多个Order的价格进行累加,计算出多个Order的总体价格。
1 public class Order
2 {
3 public int ID;
4 public float Price;
5 }
6
7 public class OrderItem
8 {
9 public int ID;
10 public string Goods;
11 public int OrderID;
12 public float Price;
13 public int Count;
14 }
15
16 class Program
17 {
18 static void Main(string[] args)
19 {
20 //设置最大线程数
21 ThreadPool.SetMaxThreads(1000, 1000);
22 float totalPrice = 0f;
23 //并行查询
24 var parallelResult = Parallel.ForEach(GetOrderList(),
25 () => 0f, //把参数初始值设为0
26 (order, state, orderPrice) =>
27 {
28 //计算单个Order的价格
29 orderPrice = GetOrderItem().Where(item => item.OrderID == order.ID)
30 .Sum(item => item.Price * item.Count);
31 order.Price = orderPrice;
32 ThreadPoolMessage(order);
33
34 return orderPrice;
35 },
36 (finallyPrice) =>
37 {
38 totalPrice += finallyPrice;//计算多个Order的总体价格
39 }
40 );
41
42 while (!parallelResult.IsCompleted)
43 Console.WriteLine("Doing Work!");
44
45 Console.WriteLine("Total Price is:" + totalPrice);
46 Console.ReadKey();
47 }
48 //虚拟数据
49 static IList<Order> GetOrderList()
50 {
51 IList<Order> orderList = new List<Order>();
52 Order order1 = new Order();
53 order1.ID = 1;
54 orderList.Add(order1);
55 ............
56 return orderList;
57 }
58 //虚拟数据
59 static IList<OrderItem> GetOrderItem()
60 {
61 IList<OrderItem> itemList = new List<OrderItem>();
62
63 OrderItem orderItem1 = new OrderItem();
64 orderItem1.ID = 1;
65 orderItem1.Goods = "iPhone 4S";
66 orderItem1.Price = 6700;
67 orderItem1.Count = 2;
68 orderItem1.OrderID = 1;
69 itemList.Add(orderItem1);
70 ...........
71 return itemList;
72 }
73
74 //显示线程池现状
75 static void ThreadPoolMessage(Order order)
76 {
77 int a, b;
78 ThreadPool.GetAvailableThreads(out a, out b);
79 string message = string.Format("OrderID:{0} OrderPrice:{1} " +
80 " CurrentThreadId is {2} WorkerThreads is:{3}" +
81 " CompletionPortThreads is:{4} ",
82 order.ID, order.Price,
83 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
84
85 Console.WriteLine(message);
86 }
87 }
运行结果
7.2.2 任务并行
在TPL当中还可以使用Parallel.Invoke方法触发多个异步任务,其中 actions 中可以包含多个方法或者委托,parallelOptions用于配置Parallel类的操作。
public static void Invoke(Action[] actions )
public static void Invoke(ParallelOptions parallelOptions, Action[] actions )
下面例子中利用了Parallet.Invoke并行查询多个Person,actions当中可以绑定方法、lambda表达式或者委托,注意绑定方法时必须是返回值为void的无参数方法。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //设置最大线程数
6 ThreadPool.SetMaxThreads(1000, 1000);
7
8 //任务并行
9 Parallel.Invoke(option,
10 PersonMessage,
11 ()=>ThreadPoolMessage(GetPersonList()[1]),
12 delegate(){
13 ThreadPoolMessage(GetPersonList()[2]);
14 });
15 Console.ReadKey();
16 }
17
18 static void PersonMessage()
19 {
20 ThreadPoolMessage(GetPersonList()[0]);
21 }
22
23 //显示线程池现状
24 static void ThreadPoolMessage(Person person)
25 {
26 int a, b;
27 ThreadPool.GetAvailableThreads(out a, out b);
28 string message = string.Format("Person ID:{0} Name:{1} Age:{2} " +
29 " CurrentThreadId is {3} WorkerThreads is:{4}" +
30 " CompletionPortThreads is :{5} ",
31 person.ID, person.Name, person.Age,
32 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
33
34 Console.WriteLine(message);
35 }
36
37 //模拟源数据
38 static IList<Person> GetPersonList()
39 {
40 var personList = new List<Person>();
41
42 var person1 = new Person();
43 person1.ID = 1;
44 person1.Name = "Leslie";
45 person1.Age = 30;
46 personList.Add(person1);
47 ..........
48 return personList;
49 }
50 }
运行结果
7.3 Task简介
以Thread创建的线程被默认为前台线程,当然你可以把线程IsBackground属性设置为true,但TPL为此提供了一个更简单的类Task。
Task存在于System.Threading.Tasks命名空间当中,它可以作为异步委托的简单替代品。
通过Task的Factory属性将返回TaskFactory类,以TaskFactory.StartNew(Action)方法可以创建一个新线程,所创建的线程默认为后台线程。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 ThreadPool.SetMaxThreads(1000, 1000);
6 Task.Factory.StartNew(() => ThreadPoolMessage());
7 Console.ReadKey();
8 }
9
10 //显示线程池现状
11 static void ThreadPoolMessage()
12 {
13 int a, b;
14 ThreadPool.GetAvailableThreads(out a, out b);
15 string message = string.Format("CurrentThreadId is:{0} " +
16 "CurrentThread IsBackground:{1} " +
17 "WorkerThreads is:{2} CompletionPortThreads is:{3} ",
18 Thread.CurrentThread.ManagedThreadId,
19 Thread.CurrentThread.IsBackground.ToString(),
20 a.ToString(), b.ToString());
21 Console.WriteLine(message);
22 }
23 }
运行结果
若要取消处理,可以利用CancellationTakenSource对象,在TaskFactory中包含有方法
public Task StartNew( Action action, CancellationToken cancellationToken )
在方法中加入CancellationTakenSource对象的CancellationToken属性,可以控制任务的运行,调用CancellationTakenSource.Cancel时任务就会自动停止。下面以图片下载为例子介绍一下TaskFactory的使用。
服务器端页面
1 <html xmlns="http://www.w3.org/1999/xhtml">
2 <head runat="server">
3 <title></title>
4 <script type="text/C#" runat="server">
5 private static List<string> url=new List<string>();
6
7 protected void Page_Load(object sender, EventArgs e)
8 {
9 if (!Page.IsPostBack)
10 {
11 url.Clear();
12 Application["Url"] = null;
13 }
14 }
15
16 protected void CheckBox_CheckedChanged(object sender, EventArgs e)
17 {
18 CheckBox checkBox = (CheckBox)sender;
19 if (checkBox.Checked)
20 url.Add(checkBox.Text);
21 else
22 url.Remove(checkBox.Text);
23 Application["Url"]= url;
24 }
25 </script>
26 </head>
27 <body>
28 <form id="form1" runat="server" >
29 <div align="left">
30 <div align="center" style="float: left;">
31 <asp:Image ID="Image1" runat="server" ImageUrl="~/Images/A.jpg" /><br />
32 <asp:CheckBox ID="CheckBox1" runat="server" AutoPostBack="True"
33 oncheckedchanged="CheckBox_CheckedChanged" Text="A.jpg" />
34 </div>
35 <div align="center" style="float: left">
36 <asp:Image ID="Image2" runat="server" ImageUrl="~/Images/B.jpg" /><br />
37 <asp:CheckBox ID="CheckBox2" runat="server" AutoPostBack="True"
38 oncheckedchanged="CheckBox_CheckedChanged" Text="B.jpg" />
39 </div>
40 <div align="center" style="float: left">
41 <asp:Image ID="Image3" runat="server" ImageUrl="~/Images/C.jpg" /><br />
42 <asp:CheckBox ID="CheckBox3" runat="server" AutoPostBack="True"
43 oncheckedchanged="CheckBox_CheckedChanged" Text="C.jpg" />
44 </div>
45 <div align="center" style="float: left">
46 <asp:Image ID="Image4" runat="server" ImageUrl="~/Images/D.jpg" /><br />
47 <asp:CheckBox ID="CheckBox4" runat="server" AutoPostBack="True"
48 oncheckedchanged="CheckBox_CheckedChanged" Text="D.jpg" />
49 </div>
50 <div align="center" style="float: left">
51 <asp:Image ID="Image5" runat="server" ImageUrl="~/Images/E.jpg" /><br />
52 <asp:CheckBox ID="CheckBox5" runat="server" AutoPostBack="True"
53 oncheckedchanged="CheckBox_CheckedChanged" Text="E.jpg" />
54 </div>
55 </div>
56 </form>
57 </body>
58 </html>
首先在服务器页面中显示多个*.jpg图片,每个图片都有对应的CheckBox检测其选择情况。
所选择图片的路径会记录在Application["Url"]当中传递到Handler.ashx当中。
Handler.ashx 处理图片的下载,它从 Application["Url"] 当中获取所选择图片的路径,并把图片转化成byte[]二进制数据。
再把图片的数量,每副图片的二进制数据的长度记录在OutputStream的头部。
最后把图片的二进制数据记入 OutputStream 一并输出。
1 public class Handler : IHttpHandler
2 {
3 public void ProcessRequest(HttpContext context)
4 {
5 //获取图片名,把图片数量写OutputStream
6 List<String> urlList = (List<string>)context.Application["Url"];
7 context.Response.OutputStream.Write(BitConverter.GetBytes(urlList.Count), 0, 4);
8
9 //把图片转换成二进制数据
10 List<string> imageList = GetImages(urlList);
11
12 //把每副图片长度写入OutputStream
13 foreach (string image in imageList)
14 {
15 byte[] imageByte=Convert.FromBase64String(image);
16 context.Response.OutputStream.Write(BitConverter.GetBytes(imageByte.Length),0,4);
17 }
18
19 //把图片写入OutputStream
20 foreach (string image in imageList)
21 {
22 byte[] imageByte = Convert.FromBase64String(image);
23 context.Response.OutputStream.Write(imageByte,0,imageByte.Length);
24 }
25 }
26
27 //获取多个图片的二进制数据
28 private List<string> GetImages(List<string> urlList)
29 {
30 List<string> imageList = new List<string>();
31 foreach (string url in urlList)
32 imageList.Add(GetImage(url));
33 return imageList;
34 }
35
36 //获取单副图片的二进制数据
37 private string GetImage(string url)
38 {
39 string path = "E:/My Projects/Example/WebSite/Images/"+url;
40 FileStream stream = new FileStream(path, FileMode.Open, FileAccess.Read);
41 byte[] imgBytes = new byte[10240];
42 int imgLength = stream.Read(imgBytes, 0, 10240);
43 return Convert.ToBase64String(imgBytes,0,imgLength);
44 }
45
46 public bool IsReusable
47 {
48 get{ return false;}
49 }
50 }
客户端
建立一个WinForm窗口,里面加入一个WebBrowser连接到服务器端的Default.aspx页面。
当按下Download按键时,系统就会利用TaskFactory.StartNew的方法建立异步线程,使用WebRequest方法向Handler.ashx发送请求。
接收到回传流时,就会根据头文件的内容判断图片的数量与每副图片的长度,把二进制数据转化为*.jpg文件保存。
系统利用TaskFactory.StartNew(action,cancellationToken) 方式异步调用GetImages方法进行图片下载。
当用户按下Cancel按钮时,异步任务就会停止。值得注意的是,在图片下载时调用了CancellationToken.ThrowIfCancellationRequested方法,目的在检查并行任务的运行情况,在并行任务被停止时释放出OperationCanceledException异常,确保用户按下Cancel按钮时,停止所有并行任务。
1 public partial class Form1 : Form
2 {
3 private CancellationTokenSource tokenSource = new CancellationTokenSource();
4
5 public Form1()
6 {
7 InitializeComponent();
8 ThreadPool.SetMaxThreads(1000, 1000);
9 }
10
11 private void downloadToolStripMenuItem_Click(object sender, EventArgs e)
12 {
13 Task.Factory.StartNew(GetImages,tokenSource.Token);
14 }
15
16 private void cancelToolStripMenuItem_Click(object sender, EventArgs e)
17 {
18 tokenSource.Cancel();
19 }
20
21 private void GetImages()
22 {
23 //发送请求,获取输出流
24 WebRequest webRequest = HttpWebRequest.Create("Http://localhost:5800/Handler.ashx");
25 Stream responseStream=webRequest.GetResponse().GetResponseStream();
26
27 byte[] responseByte = new byte[81960];
28 IAsyncResult result=responseStream.BeginRead(responseByte,0,81960,null,null);
29 int responseLength = responseStream.EndRead(result);
30
31 //获取图片数量
32 int imageCount = BitConverter.ToInt32(responseByte, 0);
33
34 //获取每副图片的长度
35 int[] lengths = new int[imageCount];
36 for (int n = 0; n < imageCount; n++)
37 {
38 int length = BitConverter.ToInt32(responseByte, (n + 1) * 4);
39 lengths[n] = length;
40 }
41 try
42 {
43 //保存图片
44 for (int n = 0; n < imageCount; n++)
45 {
46 string path = string.Format("E:/My Projects/Example/Test/Images/pic{0}.jpg", n);
47 FileStream file = new FileStream(path, FileMode.Create, FileAccess.ReadWrite);
48
49 //计算字节偏移量
50 int offset = (imageCount + 1) * 4;
51 for (int a = 0; a < n; a++)
52 offset += lengths[a];
53
54 file.Write(responseByte, offset, lengths[n]);
55 file.Flush();
56
57 //模拟操作
58 Thread.Sleep(1000);
59
60 //检测CancellationToken变化
61 tokenSource.Token.ThrowIfCancellationRequested();
62 }
63 }
64 catch (OperationCanceledException ex)
65 {
66 MessageBox.Show("Download cancel!");
67 }
68 }
69 }
7.4 并行查询(PLINQ)
并行 LINQ (PLINQ) 是 LINQ 模式的并行实现,主要区别在于 PLINQ 尝试充分利用系统中的所有处理器。 它利用所有处理器的方法,把数据源分成片段,然后在多个处理器上对单独工作线程上的每个片段并行执行查询, 在许多情况下,并行执行意味着查询运行速度显著提高。但这并不说明所有PLINQ都会使用并行方式,当系统测试要并行查询会对系统性能造成损害时,那将自动化地使用同步执行。
在System.Linq.ParallelEnumerable类中,包含了并行查询的大部分方法。
方法成员 |
说明 |
AsParallel |
PLINQ 的入口点。 指定如果可能,应并行化查询的其余部分。 |
AsSequential(Of TSource) |
指定查询的其余部分应像非并行 LINQ 查询一样按顺序运行。 |
AsOrdered |
指定 PLINQ 应保留查询的其余部分的源序列排序,直到例如通过使用 orderby(在 Visual Basic 中为 Order By)子句更改排序为止。 |
AsUnordered(Of TSource) |
指定查询的其余部分的 PLINQ 不需要保留源序列的排序。 |
WithCancellation(Of TSource) |
指定 PLINQ 应定期监视请求取消时提供的取消标记和取消执行的状态。 |
WithDegreeOfParallelism(Of TSource) |
指定 PLINQ 应当用来并行化查询的处理器的最大数目。 |
WithMergeOptions(Of TSource) |
提供有关 PLINQ 应当如何(如果可能)将并行结果合并回到使用线程上的一个序列的提示。 |
WithExecutionMode(Of TSource) |
指定 PLINQ 应当如何并行化查询(即使默认行为是按顺序运行查询)。 |
ForAll(Of TSource) |
多线程枚举方法,与循环访问查询结果不同,它允许在不首先合并回到使用者线程的情况下并行处理结果。 |
Aggregate 重载 |
对于 PLINQ 唯一的重载,它启用对线程本地分区的中间聚合以及一个用于合并所有分区结果的最终聚合函数。 |
7.4.1 AsParallel
通常想要实现并行查询,只需向数据源添加 AsParallel 查询操作即可。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 var personList=GetPersonList().AsParallel()
6 .Where(x=>x.Age>30);
7 Console.ReadKey();
8 }
9
10 //模拟源数据
11 static IList<Person> GetPersonList()
12 {
13 var personList = new List<Person>();
14
15 var person1 = new Person();
16 person1.ID = 1;
17 person1.Name = "Leslie";
18 person1.Age = 30;
19 personList.Add(person1);
20 ...........
21 return personList;
22 }
23 }
7.4.2 AsOrdered
若要使查询结果必须保留源序列排序方式,可以使用AsOrdered方法。
AsOrdered依然使用并行方式,只是在查询过程加入额外信息,在并行结束后把查询结果再次进行排列。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 var personList=GetPersonList().AsParallel().AsOrdered()
6 .Where(x=>x.Age<30);
7 Console.ReadKey();
8 }
9
10 static IList<Person> GetPersonList()
11 {......}
12 }
7.4.3 WithDegreeOfParallelism
默认情况下,PLINQ 使用主机上的所有处理器,这些处理器的数量最多可达 64 个。
通过使用 WithDegreeOfParallelism(Of TSource) 方法,可以指示 PLINQ 使用不多于指定数量的处理器。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 var personList=GetPersonList().AsParallel().WithDegreeOfParallelism(2)
6 .Where(x=>x.Age<30);
7 Console.ReadKey();
8 }
9
10 static IList<Person> GetPersonList()
11 {.........}
12 }
7.4.4 ForAll
如果要对并行查询结果进行操作,一般会在for或foreach中执行,执行枚举操作时会使用同步方式。
有见及此,PLINQ中包含了ForAll方法,它可以使用并行方式对数据集进行操作。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 ThreadPool.SetMaxThreads(1000, 1000);
6 GetPersonList().AsParallel().ForAll(person =>{
7 ThreadPoolMessage(person);
8 });
9 Console.ReadKey();
10 }
11
12 static IList<Person> GetPersonList()
13 {.......}
14
15 //显示线程池现状
16 static void ThreadPoolMessage(Person person)
17 {
18 int a, b;
19 ThreadPool.GetAvailableThreads(out a, out b);
20 string message = string.Format("Person ID:{0} Name:{1} Age:{2} " +
21 " CurrentThreadId is {3} WorkerThreads is:{4}" +
22 " CompletionPortThreads is :{5} ",
23 person.ID, person.Name, person.Age,
24 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
25 Console.WriteLine(message);
26 }
27 }
运行结果
7.4.5 WithCancellation
如果需要停止查询,可以使用 WithCancellation(Of TSource) 运算符并提供 CancellationToken 实例作为参数。
与第三节Task的例子相似,如果标记上的 IsCancellationRequested 属性设置为 true,则 PLINQ 将会注意到它,并停止所有线程上的处理,然后引发 OperationCanceledException。这可以保证并行查询能够立即停止。
1 class Program
2 {
3 static CancellationTokenSource tokenSource = new CancellationTokenSource();
4
5 static void Main(string[] args)
6 {
7 Task.Factory.StartNew(Cancel);
8 try
9 {
10 GetPersonList().AsParallel().WithCancellation(tokenSource.Token)
11 .ForAll(person =>
12 {
13 ThreadPoolMessage(person);
14 });
15 }
16 catch (OperationCanceledException ex)
17 { }
18 Console.ReadKey();
19 }
20
21 //在10~50毫秒内发出停止信号
22 static void Cancel()
23 {
24 Random random = new Random();
25 Thread.Sleep(random.Next(10,50));
26 tokenSource.Cancel();
27 }
28
29 static IList<Person> GetPersonList()
30 {......}
31
32 //显示线程池现状
33 static void ThreadPoolMessage(Person person)
34 {
35 int a, b;
36 ThreadPool.GetAvailableThreads(out a, out b);
37 string message = string.Format("Person ID:{0} Name:{1} Age:{2} " +
38 " CurrentThreadId is {3} WorkerThreads is:{4}" +
39 " CompletionPortThreads is :{5} ",
40 person.ID, person.Name, person.Age,
41 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
42 Console.WriteLine(message);
43 }
44 }
45
八、定时器与锁
8.1定时器
若要长期定时进行一些工作,比如像邮箱更新,实时收听信息等等,可以利用定时器Timer进行操作。
在System.Threading命名空间中存在Timer类与对应的TimerCallback委托,它可以在后台线程中执行一些长期的定时操作,使主线程不受干扰。
Timer类中最常用的构造函数为 public Timer( timerCallback , object , int , int )
timerCallback委托可以绑定执行方法,执行方法必须返回void,它可以是无参数方法,也可以带一个object参数的方法。
第二个参数是为 timerCallback 委托输入的参数对象。
第三个参数是开始执行前等待的时间。
第四个参数是每次执行之间的等待时间。
开发实例
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 ThreadPool.SetMaxThreads(1000, 1000);
6
7 TimerCallback callback = new TimerCallback(ThreadPoolMessage);
8 Timer t = new Timer(callback,"Hello Jack! ", 0, 1000);
9 Console.ReadKey();
10 }
11
12 //显示线程池现状
13 static void ThreadPoolMessage(object data)
14 {
15 int a, b;
16 ThreadPool.GetAvailableThreads(out a, out b);
17 string message = string.Format("{0} CurrentThreadId is:{1} " +
18 " CurrentThread IsBackground:{2} " +
19 " WorkerThreads is:{3} CompletionPortThreads is:{4} ",
20 data + "Time now is " + DateTime.Now.ToLongTimeString(),
21 Thread.CurrentThread.ManagedThreadId,
22 Thread.CurrentThread.IsBackground.ToString(),
23 a.ToString(), b.ToString());
24 Console.WriteLine(message);
25 }
26 }
注意观察运行结果,每次调用Timer绑定的方法时不一定是使用同一线程,但线程都会是来自工作者线程的后台线程。
8.2 锁
在使用多线程开发时,存在一定的共用数据,为了避免多线程同时操作同一数据,.NET提供了lock、Monitor、Interlocked等多个锁定数据的方式。
8.2.1 lock
lock的使用比较简单,如果需要锁定某个对象时,可以直接使用lock(this)的方式。
1 private void Method()
2 {
3 lock(this)
4 {
5 //在此进行的操作能保证在同一时间内只有一个线程对此对象操作
6 }
7 }
如果操作只锁定某段代码,可以事先建立一个object对象,并对此对象进行操作锁定,这也是.net提倡的锁定用法。
1 class Control
2 {
3 private object obj=new object();
4
5 public void Method()
6 {
7 lock(obj)
8 {.......}
9 }
10 }
8.2.2 Montior
Montior存在于System.Thread命名空间内,相比lock,Montior使用更灵活。
它存在 Enter, Exit 两个方法,它可以对对象进行锁定与解锁,比lock使用更灵活。
1 class Control
2 {
3 private object obj=new object();
4
5 public void Method()
6 {
7 Monitor.Enter(obj);
8 try
9 {......}
10 catch(Excetion ex)
11 {......}
12 finally
13 {
14 Monitor.Exit(obj);
15 }
16 }
17 }
18
使用try的方式,能确保程序不会因死锁而释放出异常!
而且在finally中释放obj对象能够确保无论是否出现死锁状态,系统都会释放obj对象。
而且Monitor中还存在Wait方法可以让线程等待一段时间,然后在完成时使用Pulse、PulseAll等方法通知等待线程。
8.2.3 Interlocked
Interlocked存在于System.Thread命名空间内,它的操作比Monitor使用更简单。
它存在CompareExchange、Decrement、Exchange、Increment等常用方法让参数在安全的情况进行数据交换。
Increment、Decrement 可以使参数安全地加1或减1并返回递增后的新值。
1 class Example
2 {
3 private int a=1;
4
5 public void AddOne()
6 {
7 int newA=Interlocked.Increment(ref a);
8 }
9 }
Exchange可以安全地变量赋值。
1 public void SetData()
2 {
3 Interlocked.Exchange(ref a,100);
4 }
CompareExchange使用特别方便,它相当于if的用法,当a等于1时,则把100赋值给a。
1 public void CompareAndExchange()
2 {
3 Interlocked.CompareExchange(ref a,100,1);
4 }