IO操作的MDA(Direct memory access)模式:直接访问内存,是一种不经过CPU而直接进行内存数据存储的数据交换模式,几乎可以不损耗CPU的资源;
CLR所提供的异步编程模型就是充分利用硬件的DMA功能来释放CPU的压力;使用线程池进行管理,异步将工作移交给线程池中的某个工作线程来完成,直到异步完成,异步才会通过回调的方式通知线程池,让CLR响应异步完毕;
它是并发编程的一种形式,采用 future 模式或回调(callback)机制,以避免产生不必要的线程。一个 future(或 promise)类型代表一些即将完成的操作。新版 future 类型有Task 和Task<TResult>。 老式异步编程 API 中,采用回调或事件(event) ,而不是future。
异步编程的核心理念是异步操作:启动了的操作将会在一段时间后完成。这个操作正在执行时,不会阻塞原来的线程。启动了这个操作的线程,可以继续执行其他任务。当操作完成时,会通知它的 future,或者调用回调函数,以便让程序知道操作已经结束。
异步编程模式------利用委托和线程池实现的模式
APM 异步编程模型,Asynchronous Programming Model C#1.0
EAP 基于事件的异步编程模式,Event-based Asynchronous Pattern C#2.0
TAP 基于任务的异步编程模式,Task-based Asynchronous Pattern C#4.0
Asyncawait简化异步编程;任务并行库,Task Parallel Library C#5
APM
使用IAsyncResult设计模式的异步操作是通过名为 BeginXXX 和 EndXXX 的两个方法来实现,这两个方法分别指开始和结束异步操作。该模式允许用更少的CPU资源(线程)去做更多的操作,.NET Framework很多类也实现了该模式,同时我们也可以自定义类来实现该模式(也就是在自定义的类中实现返回类型为IAsyncResult接口的BeginXXX方法和接受IAsyncResult兼容类型作为唯一参数的EndXXX方法),另外委托类型也定义了BeginInvoke和EndInvoke方法。例如,FileStream类提供BeginRead和EndRead方法来从文件异步读取字节。这两个方法实现了 Read 方法的异步版本。
调用 BeginXXX 后,应用程序可以继续在调用线程上执行指令,同时异步操作在另一个线程上执行(如果有返回值还应调用 EndXXX结束异步操作,并向该方法传递BeginXXX 方法返回的IAsyncResult对象,获取操作的返回值)。
CompletedSynchronously属性值侧重与提示信息,而非操作
访问异步操作的结果,APM提供了四种方式:
1.在调用BeginXXX方法的线程上调用EndXXX方法来得到异步操作的结果;但是这种方式会阻塞调用线程,在知道操作完成之后调用线程才能继续运行。
2.循环查询IAsyncResult的IsComplete属性,操作完成后再调用EndXXX方法来获得操作返回的结果。
3.IAsyncResult的AsyncWaitHandle属性实现更加灵活的等待逻辑,调用该属性WaitOne()方法来使一个线程阻塞并等待操作完成;再调用EndXXX方法来获得操作的结果。WaitHandle.WaitOne()可以指定最长的等待时间,如超时返回false;
4. 在调用BeginXXX方法时提供AsyncCallback委托的实例作为参数,在异步操作完成后委托会自动调用(AsyncCallback对象)指定的方法。(首选方式)AsyncCallback委托仅能够调用符合特定模式的方法(只有一个参数IAsyncResult,且没有返回值);
using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Runtime.Remoting.Messaging; namespace AsyncCallbackDelegate { public delegate int BinaryOp(int x, int y); class Program { private static bool isDone = false; static void Main(string[] args) { Console.WriteLine("***** AsyncCallbackDelegate Example *****"); Console.WriteLine("Main() invoked on thread {0}.", Thread.CurrentThread.ManagedThreadId); BinaryOp b = new BinaryOp(Add); IAsyncResult iftAR = b.BeginInvoke(10, 10, new AsyncCallback(AddComplete), "Main() thanks you for adding these numbers.");//传入数据 // Assume other work is performed here... while (!isDone) { Thread.Sleep(1000); Console.WriteLine("Working...."); } Console.ReadLine(); } #region Target for AsyncCallback delegate // Don't forget to add a 'using' directive for // System.Runtime.Remoting.Messaging! static void AddComplete(IAsyncResult itfAR) { Console.WriteLine("AddComplete() invoked on thread {0}.", Thread.CurrentThread.ManagedThreadId); Console.WriteLine("Your addition is complete"); // Now get the result. //AsyncCallback委托的目标无法调用其他方法中创建的委托 //IAsyncResult itfAR 实际上是System.Runtime.Remoting.Messaging命名空间AsyncResult类的一个实例 AsyncResult ar = (AsyncResult)itfAR; //AsyncDelegate静态属性返回原始异步委托引用 BinaryOp b = (BinaryOp)ar.AsyncDelegate; Console.WriteLine("10 + 10 is {0}.", b.EndInvoke(itfAR)); // Retrieve the informational object and cast it to string. //AsyncState属性获取 BeginInvoke第四个参数传入的值 string msg = (string)itfAR.AsyncState; Console.WriteLine(msg); isDone = true; } #endregion #region Target for BinaryOp delegate static int Add(int x, int y) { Console.WriteLine("Add() invoked on thread {0}.", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(5000); return x + y; } #endregion } }
异常捕获
在同步执行的方法里面通常处理异常的方式是将可能抛出异常的代码放到try...catch...finally里面,之所以能够捕获到,是因为发生异常的代码与调用的代码位于同一个线程。当调用一个异步方法发生异常时,CLR会捕获并且在EndXXX方法时再次将异常抛出抛出,所以异步调用中的异常在EndXXX方法出捕获就行了。
class ApmExceptionHandling { public static void Go() { WebRequest webRequest = WebRequest.Create("http://0.0.0.0/"); webRequest.BeginGetResponse(ProcessWebResponse, webRequest); Console.ReadLine(); } private static void ProcessWebResponse(IAsyncResult result) { WebRequest webRequest = (WebRequest)result.AsyncState; WebResponse webResponse = null; try { webResponse = webRequest.EndGetResponse(result); Console.WriteLine("Content length: " + webResponse.ContentLength); } catch (WebException we) { Console.WriteLine(we.GetType() + ": " + we.Message); } finally { if (webResponse != null) webResponse.Close(); } } }
APM WinForm UI线程回调
由于AsyncCallback委托回调是从ThreadPool中的线程执行的,因此对于Winform,如果回调需要操作UI控件,就需要返回到UI线程去,常用的两个方法:
1. Control类实现了ISynchronizeInvoke接口,提供了Invoke和BeginInvoke方法来支持其它线程更新GUI界面控件的机制(将回调方法投递到创建该控件的线程中执行)。
Control类的 Invoke,BeginInvoke 内部实现如下:
a) Invoke(同步调用)先判断控件创建线程与当前线程是否相同,相同则直接调用委托方法;否则使用Win32API的PostMessage异步执行,但是Invoke内部会调用IAsyncResult.AsyncWaitHandle等待执行完成。
b) BeginInvoke(异步调用)使用Win32API的PostMessage 异步执行,并且返回 IAsyncResult 对象。
使用方式:回调方法中对控件检测InvokeRequired值,if true,在该回调中封送一次委托,调用控件的Invoke/ BeginInvoke方法;
2.GUI(WinForm/WPF)应用程序引入了一个线程处理模型:创建窗口的线程是唯一能对那个窗口进行更新的线程;在GUI线程中,经常需要生成异步操作,使GUI线程不阻塞并停止响应用户输入。然而,异步操作完成时,由于是用一个线程池线程完成的,而线程池线程不能更新UI控件。为解决这些问题,FCL定义一个System.Threading.SynchronizationContext(线程同步上下文)的基类,其派生对象负责将一个应用程序模型连接到它的线程处理模型。
GUI线程都有一个和它关联的SynchronizationContext派生对象,使用其静态Current属性获取:SynchronizationContext sc = SynchronizationContext.Current; 将此对象传给其他线程,当一个线程池线程需要让GUI线程更新UI时,调用该对象的sc.Post方法,向Post传递一个匹配SendOrPostCallback委托签名的回调方法(一般是更新UI的操作方法,由GUI线程去执行),以及一个要传给回调方法的实参。
SynchronizationContext 的Post方法和Send方法的区别:(分别对应于异步/同步调用)
Post方法将回调方法送人GUI线程的队列,允许程序池线程立即返回,不进行阻塞;Post方法内部调用了BeginInvoke方法;
Send方法也将回调方法送人GUI线程的队列,但随后就会阻塞线程池线程,直到GUI线程完成对回调方法的调用。阻塞线程池线程极有可能造成线程池创建一个新的线程,避免调用该方法;Send方法内部调用了Invoke方法;
对winform来说是 System.Windows.Forms.WindowsFormsSynchronizationContext是其子类.
Winform窗口出现后,UI线程 SynchronizationContext.Current会被绑定赋值,只有UI线程的Current不为null。
Public class SendOrPostUI { public static void Go() { System.Windows.Forms.Application.Run(new MyWindowsForm()); } private static AsyncCallback SyncContextCallback(AsyncCallback callback) { // Capture the calling thread's SynchronizationContext-derived object SynchronizationContext sc = SynchronizationContext.Current; // If there is no SC, just return what was passed in if (sc == null) return callback; // Return a delegate that, when invoked, posts to the captured SC a method that // calls the original AsyncCallback passing it the IAsyncResult argument return asyncResult => sc.Post(result => callback((IAsyncResult)result), asyncResult); } private sealed class MyWindowsForm : System.Windows.Forms.Form { public MyWindowsForm() { Text = "Click in the window to start a Web request"; Width = 400; Height = 100; } protected override void OnMouseClick(System.Windows.Forms.MouseEventArgs e) { // The GUI thread initiates the asynchronous Web request Text = "Web request initiated"; var webRequest = WebRequest.Create("http://Wintellect.com/"); webRequest.BeginGetResponse(SyncContextCallback(ProcessWebResponse), webRequest); base.OnMouseClick(e); } private void ProcessWebResponse(IAsyncResult result) { // If we get here, this must be the GUI thread, it's OK to update the UI var webRequest = (WebRequest)result.AsyncState; using (var webResponse = webRequest.EndGetResponse(result)) { Text = "Content length: " + webResponse.ContentLength; } } } }
比较两种方法其实差不太多,一个是回调内再次包装,一个是包装原来的回调。但是SynchronizationContext业务层与UI分离来讲的话是比较好;
EAP
EAP是为了更便于处理UI的更新推出的模式,主要优点:它同Visual Studio UI设计器进行了很好的集成,可将大多数实现了EAP的类拖放到设计平面(design surface)上,双击控件对应的XXXCompleted事件名,会自动生成事件的回调方法,并将方法同事件自身联系起来。EAP保证事件在应用程序的GUI线程上引发,允许事件回调方法中的代码更新UI控件;
EAP另一重要功能:支持EAP的类自动将应用程序模型映射到它的线程处理模型;EAP类在内部使用SynchronizationContext类。有的EAP类提供了取消、进度报告功能。
FCL中只有17个类型实现了EAP模式,一般有一个 XXXAsync方法和一个对应的XXXCompleted事件,以及这些方法的同步版本:
System.Object的派生类型:
System.Activies.WorkflowInvoke
System.Deployment.Application.ApplicationDeployment
System.Deployment.Application.InPlaceHosingManager
System.Net.Mail.SmtpClient
System.Net.PeerToPeer.PeerNameResolver
System.Net.PeerToPeer.Collaboration.ContactManager
System.Net.PeerToPeer.Collaboration.Peer
System.Net.PeerToPeer.Collaboration.PeerContact
System.Net.PeerToPeer.Collaboration.PeerNearMe
System.ServiceModel.Activities.WorkflowControlClient
System.ServiceModel.Discovery.AnnoucementClient
System.ServiceModel.Discovery.DiscoveryClient
System.ComponentModel.Component的派生类型:
System.ComponentModel.BackgroundWorker
System.Net.NetworkInformation.Ping
System.Windows.Forms.PictureBox(继承于Control类,Control类派生于Component类)
private sealed class MyForm : System.Windows.Forms.Form { protected override void OnClick(EventArgs e) { // The System.Net.WebClient class supports the Event-based Asynchronous Pattern WebClient wc = new WebClient(); // When a string completes downloading, the WebClient object raises the // DownloadStringCompleted event which will invoke our ProcessString method wc.DownloadStringCompleted += ProcessString; // Start the asynchronous operation (this is like calling a BeginXxx method) wc.DownloadStringAsync(new Uri("http://Wintellect.com")); base.OnClick(e); } // This method is guaranteed to be called via the GUI thread private void ProcessString(Object sender, DownloadStringCompletedEventArgs e) { // If an error occurred, display it; else display the downloaded string System.Windows.Forms.MessageBox.Show((e.Error != null) ? e.Error.Message : e.Result); } }
BackgroundWorker:只有该类型用于可用于执行异步的计算限制的工作;提供三个事件:
DoWork:向这个事件登记的方法应该包含计算限制的代码。这个事件由一个线程池线程调用RunWorkerAsync(两个重载方法,带参的方法是向DoWork登记的方法的DoWorkEventArgs参数对象的Argument属性传值,只能在登记的方法中(如e.Argument)获取,Result属性必须设置成计算限制的操作希望返回的值)时引发;
ProgressChanged:向这个事件登记的方法应该包含使用进度信息来更新UI的代码。这个事件总是在GUI线程上引发。DoWork登记的方法必须定期调用BackgroundWorker的ReportProgress方法来引发ProgressChanged事件;
RunWorkerCompleted:向这个事件登记的方法应该包含使用计算限制操作的结果对UI进行更新的代码。这个事件总是在GUI线程上引发。Result获取表示异步操作的结果;
公共属性:CancellationPending(标识是否已请求取消后台操作)、IsBusy(标识是否正在运行异步操作)、WorkReportsProgress(获取/设置是否报告进度更新)、WorkerSupportsCancellation(获取/设置是否支持异步取消)
公共方法:CancelAsync(请求取消挂起的后台操作)、ReportProgress、RunWorkerAsync
异常
异常不会抛出。在XXXCompleted事件处理方法中,必须查询AsyncCompletedEventArgs的Exception属性,看它是不是null。如果不是null,就必须使用if语句判断Exception派生对象的类型,而不是使用catch块。
TAP
.NET4.0 中引入了新的异步编程模型“基于任务的异步编程模型(TAP)”,并且推荐我们在开发新的多线程应用程序中首选TAP,在.NET4.5中更是对TPL库进行了大量的优化与改进(async和await)。那现在我先介绍下TAP具有哪些优势:
- 任务调度器(TaskScheduler)依赖于底层的线程池引擎,可自定义一个TaskScheduler更改调度算法,同时不更改代码或编程模型。通过局部队列的任务内联化(task inlining)和工作窃取(work-stealing)机制而发起了大量任务,Task可以为我们提升程序性能。
- 可以使用PreferFairness标志,获取与ThreadPool.QueueUserWorkItem或者一个委托的BeginInvoke相同的线程池行为。
3. 轻松实现任务等待、任务取消、延续任务、异常处理(System.AggregateException)、GUI线程操作。
4. 在任务启动后,可以随时以任务延续的形式注册回调。
5. 充分利用现有的线程,避免创建不必要的额外线程。
6. 结合C#5.0引入async和await关键字轻松实现“异步方法”。
APM转换为TAP:
使用TaskFactory的FromAsync方法,传递四个实参:BeginXxx方法、EndXxx方法、Object状态、可选的TaskCreationOptions值,返回对一个Task对象的引用;
private static void ConvertingApmToTask() { // Instead of this: WebRequest webRequest = WebRequest.Create("http://Wintellect.com/"); webRequest.BeginGetResponse(result => { WebResponse webResponse = null; try { webResponse = webRequest.EndGetResponse(result); Console.WriteLine("Content length: " + webResponse.ContentLength); } catch (WebException we) { Console.WriteLine("Failed: " + we.GetBaseException().Message); } finally { if (webResponse != null) webResponse.Close(); } }, null); Console.ReadLine(); // for testing purposes // Make a Task from an async operation that FromAsync starts webRequest = WebRequest.Create("http://Wintellect.com/"); var t1 = Task.Factory.FromAsync<WebResponse>(webRequest.BeginGetResponse, webRequest.EndGetResponse, null, TaskCreationOptions.None); var t2 = t1.ContinueWith(task => { WebResponse webResponse = null; try { webResponse = task.Result; Console.WriteLine("Content length: " + webResponse.ContentLength); } catch (AggregateException ae) { if (ae.GetBaseException() is WebException) Console.WriteLine("Failed: " + ae.GetBaseException().Message); else throw; } finally { if (webResponse != null) webResponse.Close(); } }); try {t2.Wait(); // for testing purposes only} catch (AggregateException) { } }
EAP转换成TAP:
使用System.Threading.Tasks.TaskCompletionSource类进行包装;
当构造一个TaskCompletionSource对象,也会生成一个Task,可通过其Task属性获取;当一个异步操作完成时,它使用TaskCompletionSource对象来设置它因为什么而完成,取消,未处理的异常或者它的结果。调用某个SetXxx方法,可以设置底层Task对象的状态。
private sealed class MyFormTask : System.Windows.Forms.Form { protected override void OnClick(EventArgs e) { // The System.Net.WebClient class supports the Event-based Asynchronous Pattern WebClient wc = new WebClient(); // Create the TaskCompletionSource and its underlying Task object var tcs = new TaskCompletionSource<String>(); // When a string completes downloading, the WebClient object raises the // DownloadStringCompleted event which will invoke our ProcessString method wc.DownloadStringCompleted += (sender, ea) => { // This code always executes on the GUI thread; set the Task’s state if (ea.Cancelled) tcs.SetCanceled(); else if (ea.Error != null) tcs.SetException(ea.Error); else tcs.SetResult(ea.Result); }; // Have the Task continue with this Task that shows the result in a message box // NOTE: The TaskContinuationOptions.ExecuteSynchronously flag is required to have this code // run on the GUI thread; without the flag, the code runs on a thread pool thread tcs.Task.ContinueWith(t => { try { System.Windows.Forms.MessageBox.Show(t.Result);} catch (AggregateException ae) { System.Windows.Forms.MessageBox.Show(ae.GetBaseException().Message); } }, TaskContinuationOptions.ExecuteSynchronously); // Start the asynchronous operation (this is like calling a BeginXxx method) wc.DownloadStringAsync(new Uri("http://Wintellect.com")); base.OnClick(e); } }
实现了TAP的类:存在XxxTaskAsync的方法, 支持异步操作的取消和进度的报告的功能;
取消:可以通过协作式取消模式,向异步方法传入CancellationToken 参数,通过调用其ThrowIfCancellationRequested方法来定时检查操作是否已经取消;
进度报告:可以通过IProgress<T>接口来实现进度报告的功能;
更新GUI: TaskScheduler.FromCurrentSynchronizationContext()获取同步上下文任务调度器,将关联该对象的所有任务都调度给GUI线程,使任务代码能成功更新UI;
private sealed class MyForm : System.Windows.Forms.Form { public MyForm() { Text = "Synchronization Context Task Scheduler Demo"; Visible = true; Width = 400; Height = 100; } private static Int32 Sum(CancellationToken ct, Int32 n) { Int32 sum = 0; for (; n > 0; n--) { // The following line throws OperationCanceledException when Cancel // is called on the CancellationTokenSource referred to by the token ct.ThrowIfCancellationRequested(); //Thread.Sleep(0); // Simulate taking a long time checked { sum += n; } } return sum; } private readonly TaskScheduler m_syncContextTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(); private CancellationTokenSource m_cts; protected override void OnMouseClick(System.Windows.Forms.MouseEventArgs e) { if (m_cts != null) { // An operation is in flight, cancel it m_cts.Cancel(); m_cts = null; } else { // An operation is not in flight, start it Text = "Operation running"; m_cts = new CancellationTokenSource(); // This task uses the default task scheduler and executes on a thread pool thread var t = new Task<Int32>(() => Sum(m_cts.Token, 20000), m_cts.Token); t.Start(); // These tasks use the synchronization context task scheduler and execute on the GUI thread t.ContinueWith(task => Text = "Result: " + task.Result, CancellationToken.None, TaskContinuationOptions.OnlyOnRanToCompletion, m_syncContextTaskScheduler); t.ContinueWith(task => Text = "Operation canceled", CancellationToken.None, TaskContinuationOptions.OnlyOnCanceled, m_syncContextTaskScheduler); t.ContinueWith(task => Text = "Operation faulted", CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted, m_syncContextTaskScheduler); } base.OnMouseClick(e); } }
异常处理
在任务抛出的未处理异常都封装在System.AggregateException对象中。这个对象会存储在方法返回的Task或Task<TResult>对象中,需要通过访问Wait()、Result、Exception成员才能观察到异常。(所以,在访问Result之前,应先观察IsCanceled和IsFaulted属性)
假如一直不访问Task的Wait()、Result、Exception成员,那么你将永远注意不到这些异常的发生。为了帮助你检测到这些未处理的异常,可以向TaskScheduler对象的UnobservedTaskException事件注册回调函数。每当一个Task被垃圾回收时,如果存在一个没有注意到的异常,CLR的终结器线程会引发这个事件。
可在事件回调函数中调用UnobservedTaskExceptionEventArgs对象的SetObserved() 方法来指出已经处理好了异常,从而阻止CLR终止线程。然而并不推荐这么做,宁愿终止进程也不要带着已经损坏的状态继续运行。
Async /Await
在.NET Framework 4.0中添加.NET Framework 4.5中新的异步操作库(async/await),该包由三个库组成:Microsoft.Bcl、Microsoft.Bcl.Async和Microsoft.Bcl.Build。
Install-Package Microsoft.Bcl.Async
注:asp.net 框架必须要升级.net framework框架才能使用 async/await
如果异常信息是“Message : Could not load file or assembly 'System.Core, Version=2.0.5.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e, Retargetable=Yes' or one of its dependencies. The given assembly name or codebase was invalid. (Exception from HRESULT: 0x80131047)”,
那需要你去微软官网下载.net4.0的KB2468871补丁来安装。
C# 5引入了异步函数(asynchrnous function)的概念。通常是指用async修饰符声明的,可
包含await表达式的方法或匿名函数;
async关键字创建了一个状态机,类似于yield return语句;await关键字只能用于有用async修饰符声明的方法。async修饰符只能用于返回Task/Task<TResult>或void的方法。await只能用来调用返回Task/Task<TResult>的方法;await会解除线程的阻塞,完成调用的任务;等待任务完成后,获取结果,然后执行await关键字后面的代码;编译器会把await的表达式后的代码使用 Task.ContinueWith 包装了起来,回调时默认使用当前线程的同步上下文任务调度器;如果不使用相同的同步上下文,必须调用Task实例的ConfigureAwait(false)方法;
await msg.Content.ReadAsStringAsync().ConfigureAwait(false);
异步方法的声明语法与其他方法完全一样,只是要包含async上下文关键字。async可以出
现在返回类型之前的任何位置。async修饰符在生成的代码中没有作用,也可省略不写,它明确表达了你的预期,告诉编译器可以主动寻找await表达式,也可以寻找应该转换成异步调用和await表达式的块调用。
调用者和异步方法之间是通过返回值来通信的。异步函数的返回类型只能为:
Void 、Task、Task<TResult>;Task和Task<TResult>类型都表示一个可能还未完成的操作。 Task<TResult>继承自Task。二者的区别是,Task<TResult>表示一个返回值为T类型的操作,而Task则不需要产生返回值。在某种意义上,你可以认为Task就是Task<void>类型;
之所以将异步方法设计为可以返回void,是为了和事件处理程序兼容。
异步方法签名的约束:所有参数都不能使用out或ref修饰符。