zoukankan      html  css  js  c++  java
  • 实践基于Task的异步模式

    Await

    返回该系列目录《基于Task的异步模式--全面介绍》


         在API级别,实现没有阻塞的等待的方法是提供callback(回调函数)。对于Tasks来说,这是通过像ContinueWith的方法实现的。基于语言的异步支持通过允许在正常控制流内部等待异步操作隐藏callbacks,具有和编译器生成的代码相同的API级别的支持。

        在.Net 4.5,C#直接异步地支持等待的Task和Task<TResult>,在C#中使用"await"关键字。如果等待一个Task,那么await表达式是void类型。如果等待一个Task<TResult>,那么await表达式是TResult类型。await表达式必须出现在返回类型是void,Task,Task<TResult>的异步方法体内。

       在幕后,await功能通过在该Task上的一个continuation(后续操作)安装了一个callback(回调函数)。此回调将恢复暂停的异步方法。当该异步方法恢复时,如果等待的异步操作成功地完成,并且它是一个Task<TResult>,那它就返回一个TResult。如果等待的Task或者Task<TResult>以Canceled状态结束,那么会引发OperationCanceledException。如果等待的Task或者Task<TResult>以Faulted状态结束,那么会引发造成它失败的异常。对于一个Task来说,可能由于多个异常造成失败,在这种情况下,这些异常只有一个会传递。然而,该Task的Exception属性会返回一个包含所有错误的AggregateException。

    如果一个同步上下文和执行在挂起状态的异步方法相关联(如SynchronizationContext.Current是非空的),那么异步方法的恢复会通过使用该下上文的Post方法发生在相同的同步上下文上。否则,它会依赖当前在挂起点的System.Threading.Tasks.TaskScheduler是什么(针对.Net线程池,一般这个默认是TaskScheduler.Default)。它取决于该TaskScheduler是否允许恢复在等待异步操作完成的地方或者强制恢复被调度的地方执行。默认的调度一般会允许后续操作在等待的操作完成的线程上执行。

    当调用的时候,异步方法同步地执行方法体,直到遇到在还没有完成的等待的实例上的第一个await表达式,此时控制权返回给调用者。如果该异步方法没有返回void,那么就会返回Task或Task<TResult>表示正在进行的计算。在一个非void的异步方法中,如果遇到了return语句或者到达了方法体的末尾,那么该task会以RanToCompletion的最终状态完成。如果一个未处理的异常造成控制权离开了异步方法体,该任务会以Faulted状态结束(如果异常是OperationCanceledException,任务会以Canceled状态结束)。结果或异常最终以这种方式发布。

    Yield 和 ConfigureAwait

    一些成员对异步方法的执行提供了更多的控制。Task类提供了一个Yield方法,可以使用它把一个屈服点(yield point)引入异步方法。

    public class Task : …

    {

    public static YieldAwaitable Yield();

    }

    这个等价于异步地推送(post)或调度回到当前的上下文。

    Task.Run(async delegate

    {

    for(int i=0; i<1000000; i++)

    {

    await Task.Yield(); // fork the continuation into a separate work item

    ...

    }
    });

    Task类也提供了ConfigureAwait方法更多的控制在异步方法中如何发生挂起和恢复。正如之前提到的,默认异步方法挂起时,当前上下文被捕获,用捕获的上下文在恢复时调用异步方法的后续操作。在许多案例中,这是你想要的确切的行为。然而,在某些情况下你不关心你在哪里结束,结果你可以通过避免这些返回到原始上下文的posts来实现更好的性能。为了开启这个,可以使用ConfigureAwait通知await操作不捕获和恢复上下文,而是更倾向于,无论异步操作在哪儿等待完成,都继续执行:

    await someTask.ConfigureAwait(continueOnCapturedContext:false);

    Cancellation(撤销)

    支持可撤销的TAP方法都至少公开了接受一个CancellationToken的重载,该类型在是在.Net 4的System.Threading中引入的。

    CancellationToken是通过CancellationTokenSource创建的。当CancellationTokenSource的Cancel方法调用时,它的Token属性会返回接收到信号的CancellationToken。比如,思考一下,下载一个单独的web页面,然后想要取消该操作。我们创建一个CancellationTokenSource,再把它的token传给TAP方法,以后会可能调用它的Cancel方法:

    var cts = new CancellationTokenSource();

    string result = await DownloadStringAsync(url, cts.Token);

    // at some point later, potentially on another thread

    cts.Cancel();

    为了取消多个异步的调用,可以将相同的token传给多有的调用:

    var cts = new CancellationTokenSource();

    IList<string> results = await Task.WhenAll(

    from url in urls select DownloadStringAsync(url, cts.Token));

    cts.Cancel();

    类似地,相同的token也可以有选择性地传给异步操作的子集:

    var cts = new CancellationTokenSource();

    byte [] data = await DownloadDataAsync(url, cts.Token);

    await SaveToDiskAsync(outputPath, data, CancellationToken.None);

    cts.Cancel();

    来自任何线程的Cancellation请求都可以被初始化。

    为了表明cancellation请求永远不会发出,CancellationToken.None可以传给任何接受一个CancellationToken的方法。被调用者会发现cancellationToken的CanBeCanceled属性会返回false,因此它起到了优化。

    相同的CancellationToken可以分发给任何数量的异步和同步操作。这是CancellationToken 方法强项之一:cancellation可能使同步方法的调用请求的,并且相同的cancellation请求可能激增到任何数量的监听器。另一个好处是异步API的开发者可以完全地控制cancellation是否可能被请求以及cancellation何时生效,还有该API的消费者可以选择性地决定多个异步调用的cancellation请求中哪一个会被传播。

    Progress

    一些异步方法通过把progress(进度)接口传给该异步方法来公开进度。比如,思考一下异步下载一个文本字符串的函数,然后会引发包含至今已完成下载的百分比的进度的更新。下面的WPF应用的一个例子用到了这样一个方法:

    private async void btnDownload_Click(object sender, RoutedEventArgs e)

    {

    btnDownload.IsEnabled = false;

    try

    {

    txtResult.Text = await DownloadStringAsync(txtUrl.Text,

    new Progress<int>(p => pbDownloadProgress.Value = p));

    }

    finally { btnDownload.IsEnabled = true; }

    }

    使用内置的基于Task的连接器

    System.Threading.Tasks命名空间包含了几种处理和组合tasks的主要方法。

    Task.Run

    Task类公开了几种Run方法,它们可以轻易地作为线程池的Task或Task<TResult>进行卸载工作,如:

    public async void button1_Click(object sender, EventArgs e)

    {

    textBox1.Text = await Task.Run(() =>

    {

    // … 这里处理一些计算受限的任务

    return answer;

    });

    }

    其中一些Run方法是自从.Net 4 就存在的TaskFactory.StartNew方法的简写。然而,其他的重载(如Run<TResult>(Func<Task<TResult>>))可以在卸载工作中使用await关键字,如:

    public async void button1_Click(object sender, EventArgs e)

    {

    pictureBox1.Image = await Task.Run(() =>

    {

    using(Bitmap bmp1 = await DownloadFirstImageAsync())

    using(Bitmap bmp2 = await DownloadSecondImageAsync())

    return Mashup(bmp1, bmp2);

    });

    }

    Task.FromResult

    对于已经是可使用的或只需要从返回task的方法返回的数据提升到Task<TResult>的场景,可以使用Task.FromResult方法:

    public Task<int>GetValueAsync(string key)

    {

    int cachedValue;

    return TryGetCachedValue(out cachedValue) ?

    Task.FromResult(cachedValue) :

    GetValueAsyncInternal();

    }

    private async Task<int> GetValueAsyncInternal(string key)

    {

    }

    Task.WhenAll

    WhenAll方法用于异步等待多个代表Tasks的异步操作。为了适应一系列的非泛型的tasks或者一系列不均匀的泛型tasks(例如,异步等待多个返回void的操作,或者异步等待多个返回值类型的方法,每个值可以是不同的类型)以及一系列均匀 的泛型tasks(如,异步等待多个返回TResult的方法)。

    思考给多个顾客发送邮件的需求。我们可以重叠所有的邮件发送(在发送下一封邮件之前不需要等待前一封邮件已经完成发送),然后我们需要知道发送何时完成和是否有错误发生。

    IEnumerable<Task> asyncOps = from addr in addrs select SendMailAsync(addr);

    await Task.WhenAll(asyncOps);

    上面的代码没有显示地处理可能发生的异常,反而选择让异常在WhenAll产生的task上的await传播出来。为了处理这些异常,开发者可以使用像下面这样的代码:

    IEnumerable<Task> asyncOps = from addr in addrs select SendMailAsync(addr);

    try

    {

    await Task.WhenAll(asyncOps);

    }

    catch(Exception exc)

    {

    ...

    }

    思考另一个从web上异步地下载多个文件的例子。在这种情况下,所有的异步操作有同类的结果类型,并且这些结果的访问很简单:

    string [] pages = await Task.WhenAll(

    from url in urls select DownloadStringAsync(url));

    作为之前返回void的案例,这里可以使用相同的异步处理技巧:

    Task [] asyncOps =

    (from url in urls select DownloadStringAsync(url)).ToArray();

    try

    {

    string [] pages = await Task.WhenAll(asyncOps);

    ...

    }

    catch(Exception exc)

    {

    foreach(Task<string> faulted in asyncOps.Where(t => t.IsFaulted))

    {

    // work with faulted and faulted.Exception

    }

    }

    Task.WhenAny

    WhenAny API异步地等待表示多个异步的操作,并且只是异步地等待它们中的一个完成。WhenAny主要有四种用法:

    1. 冗余。多次执行一个操作,并且选择最先完成的那个(例如,联系多个会产生唯一结果的股票报价的Web服务并选择完成最快的那个)。
    2. 交叉。启动多个操作,并需要它们都要完成,但是当它们完成时再处理。
    3. 调节。当其它操作完成时允许多余的操作开始。这是交叉情况的扩展。
    4. 提早应急。t1代表的操作可以组合进伴有另一个task t2的WhenAny,然后我们可以等待WhenAny task。t2可以代表一个超时,撤销或某些其他的信号来使WhenAny在t1完成之前完成。

    冗余

    思考一下,我们是否买一只股票的决定的情况。我们有多个我们信赖的股票推荐Web服务,但是基于每日负荷,每一种服务可能终将在不同的时间变得相当缓慢。我们可以发挥WhenAny的优势来知晓任意一个操作何时完成:

    var recommendations = new List<Task<bool>>()

    {

    GetBuyRecommendation1Async(symbol),

    GetBuyRecommendation2Async(symbol),

    GetBuyRecommendation3Async(symbol)

    };

    Task<bool> recommendation = await Task.WhenAny(recommendations);

    if (await recommendation) BuyStock(symbol);

    不像WhenAll在所有的tasks成功完成的情况下返回它们为包装的结果集,WhenAny返回完成的Task:如果一个task失败了,知道哪一个task失败很重要;如果一个task成功了,知道返回的值和哪一个task相关很重要。因为这个原因,我们需要访问返回的task的Result属性,或者进一步等待它直到它完成。

    自从有了WhenAll,我们需要能够适应异常情况。由于已经接收到了返回的已完成的task,为了让错误传播,我们可以等待返回的task,然后适当地try/catch,例如:

    Task<bool> [] recommendations = …;

    while(recommendations.Count > 0)

    {

    Task<bool> recommendation = await Task.WhenAny(recommendations);

    try

    {

    if (await recommendation) BuyStock(symbol);

    break;

    }

    catch(WebException exc)

    {

    recommendations.Remove(recommendation);

    }

    }

    除此之外,即使第一个task成功完成了,随后 的task也可能失败。此时,我们有多个处理这些异常的选择。一种用例可能要求直到所有启动的tasks已经完成才做进一步的向前进展,在这种情况我们可以使用WhenAll。另一种用例要求所有的异常必须要记录。对于这个,当tasks已经异步完成时,我们可以利用后续操作直接接收一个通知:

    foreach(Task recommendation in recommendations)

    {

    var ignored = recommendation.ContinueWith(

    t => { if (t.IsFaulted) Log(t.Exception); });

    }

    或者

    foreach(Task recommendation in recommendations)

    {

    var ignored = recommendation.ContinueWith(

    t => Log(t.Exception), TaskContinuationOptions.OnlyOnFaulted);

    }

    或者

    private static async void LogCompletionIfFailed(IEnumerable<Task> tasks)

    {

    foreach(var task in tasks)

    {

    try { await task; }

    catch(Exception exc) { Log(exc); }

    }
    }

    LogCompletionIfFailed(recommendations);

    最后,开发者可能实际想要取消所有的保留的操作。

    var cts = new CancellationTokenSource();

    var recommendations = new List<Task<bool>>()

    {

    GetBuyRecommendation1Async(symbol, cts.Token),

    GetBuyRecommendation2Async(symbol, cts.Token),

    GetBuyRecommendation3Async(symbol, cts.Token)

    };

     

    Task<bool> recommendation = await Task.WhenAny(recommendations);

    cts.Cancel();

    if (await recommendation) BuyStock(symbol);

    交叉

    思考这样一个情况,从Web下载图片,并对每张图片做一些处理,例如把它加到UI控件上去。我们需要按顺序处理(在UI控件的例子中,在UI线程上),但我们想要尽可能地并发下载。并且我们不想所有图片都下载完毕后再加载到UI上,而是当它们完成时就加载它们:

    List<Task<Bitmap>> imageTasks =

    (from imageUrl in urls select GetBitmapAsync(imageUrl)).ToList();

    while(imageTasks.Count > 0)

    {

    try

    {

    Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);

    imageTasks.Remove(imageTask);

     

    Bitmap image = await imageTask;

    panel.AddImage(image);

    }

    catch{}

    }

    那相同的交叉可以应用到涉及下载以及在下载的图片上的线程池的进行计算密集的处理的场景上,例如:

    List<Task<Bitmap>> imageTasks =

    (from imageUrl in urls select GetBitmapAsync(imageUrl)

    .ContinueWith(t => ConvertImage(t.Result)).ToList();

    while(imageTasks.Count > 0)

    {

    try

    {

    Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);

    imageTasks.Remove(imageTask);

     

    Bitmap image = await imageTask;

    panel.AddImage(image);

    }

    catch{}

    }

    调节

    思考和交叉例子相同的案例,除了用户正在下载很多图片,这些下载需要显示调节,例如,只有15个下载可能同时发生。为实现这个,异步操作的子集可能会被调用。当操作完成的时候,其他的操作会取而代之被调用。

    const int CONCURRENCY_LEVEL = 15;

    Uri [] urls = …;

    int nextIndex = 0;

    var imageTasks = new List<Task<Bitmap>>();

    while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)

    {

    imageTasks.Add(GetBitmapAsync(urls[nextIndex]));

    nextIndex++;

    }

     

    while(imageTasks.Count > 0)

    {

    try

    {

    Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);

    imageTasks.Remove(imageTask);

     

    Bitmap image = await imageTask;

    panel.AddImage(image);

    }

    catch(Exception exc) { Log(exc); }

     

    if (nextIndex < urls.Length)

    {

    imageTasks.Add(GetBitmapAsync(urls[nextIndex]));

    nextIndex++;

    }

    }

    提早应急

    思考当异步等待一个操作完成的同时,又要响应一个用户的撤销请求(例如,点击UI上的一个取消按钮)。

    private CancellationTokenSource m_cts;

     

    public void btnCancel_Click(object sender, EventArgs e)

    {

    if (m_cts != null) m_cts.Cancel();

    }

     

    public async void btnRun_Click(object sender, EventArgs e)

    {

    m_cts = new CancellationTokenSource();

    btnRun.Enabled = false;

    try

    {

    Task<Bitmap> imageDownload = GetBitmapAsync(txtUrl.Text);

    await UntilCompletionOrCancellation(imageDownload, m_cts.Token);

    if (imageDownload.IsCompleted)

    {

    Bitmap image = await imageDownload;

    panel.AddImage(image);

    }

    else imageDownload.ContinueWith(t => Log(t));

    }

    finally { btnRun.Enabled = true; }

    }

     

    private static async Task UntilCompletionOrCancellation(

    Task asyncOp, CancellationToken ct)

    {

    var tcs = new TaskCompletionSource<bool>();

    using(ct.Register(() => tcs.TrySetResult(true)))

    await Task.WhenAny(asyncOp, tcs.Task);

    return asyncOp;

    }

    我们一决定拯救而不取消隐含的异步操作,这个实现就会再次开启UI。当我们决定拯救的时候,另一个选择会取消挂起的操作,然而直到操作最终完成才会重建UI,这可能是因为早期由于取消请求结束造成的:

    private CancellationTokenSource m_cts;

     

    public async void btnRun_Click(object sender, EventArgs e)

    {

    m_cts = new CancellationTokenSource();

     

    btnRun.Enabled = false;

    try

    {

    Task<Bitmap> imageDownload = GetBitmapAsync(txtUrl.Text, m_cts.Token);

    await UntilCompletionOrCancellation(imageDownload, m_cts.Token);

    Bitmap image = await imageDownload;

    panel.AddImage(image);

    }

    catch(OperationCanceledException) {}

    finally { btnRun.Enabled = true; }

    }

    使用 WhenAny 为早期提供紧急援助的另一个例子涉及到Task.WhenAny 与 Task.Delay 的一起使用。

    Task.Delay

    之前演示过,可以通过调用Task.Delay把中断引入一个异步方法的执行当中。这对于各种各样的功能是有用的,包括构建轮询,对于一个预定时间内用户输入处理的延迟等诸如此类。在联合Task.WhenAny对await实现超时也是有用的。

    如果一个task是一个更大的异步操作(如,一个ASP.Net Web服务)的一部分,花费太长时间完成了,那么整体的操作就会变差,尤其是如果该操作曾经没有完成的话。为此,能够在一个异步操作上超时等待很重要。同步的Task.Wait, WaitAll, 和WaitAny方法接收超时值,但相应的 ContinueWhenAll/Any和上述的 WhenAll/WhenAny APIs不这样做。相反,Task.Delay和Task.WhenAny可以联合使用来实现超时。

    思考一个UI应用,该应用想下载一张图片,并且当图片在下载的过程中时,UI不可用。然而,如果下载需要花很长时间,那么该UI应该可用并且下载的操作应该放弃。

    public async void btnDownload_Click(object sender, EventArgs e)

    {

    btnDownload.Enabled = false;

    try

    {

    Task<Bitmap> download = GetBitmapAsync(url);

    if (download == await Task.WhenAny(download, Task.Delay(3000)))

    {

    Bitmap bmp = await download;

    pictureBox.Image = bmp;

    status.Text = "Downloaded";

    }

    else

    {

    pictureBox.Image = null;

    status.Text = "Timed out";

    var ignored = download.ContinueWith(

    t => Trace("Task finally completed"));

    }

    }

    finally { btnDownload.Enabled = true; }

    }

    既然WhenAll返回一个task,那么一样可以用到多个下载上:

    public async void btnDownload_Click(object sender, RoutedEventArgs e)

    {

    btnDownload.Enabled = false;

    try

    {

    Task<Bitmap[]> downloads =

    Task.WhenAll(from url in urls select GetBitmapAsync(url));

    if (downloads == await Task.WhenAny(downloads, Task.Delay(3000)))

    {

    foreach(var bmp in downloads) panel.AddImage(bmp);

    status.Text = "Downloaded";

    }

    else

    {

    status.Text = "Timed out";

    downloads.ContinueWith(t => Log(t));

    }

    }

    finally { btnDownload.Enabled = true; }

    }

    构建基于Task的连接器

    由于一个task完全有能力表示一个异步操作,并提供同步和异步连接,检索该操作等的能力,因而构建有用的组合创建更大模式的tasks的"连接器"库成为可能。该文章先前提到过,.Net Framework包括了多个内置的连接器,然而,也可能和期待开发者创建他们自己的。这里我们提供几个可能会用到的连接器方法和类型的例子。

    RetryOnFault(错误重试)

    在很多场合,如果一个之前的尝试操作失败了,很渴望重试一下该操作。对于同步代码来说,我们可以构建一个帮助方法来完成这个:

    public static T RetryOnFault<T>(

    Func<T> function, int maxTries)

    {

    for(int i=0; i<maxTries; i++)

    {

    try { return function(); }

    catch { if (i == maxTries-1) throw; }

    }

    return default(T);

    }

    我们可以构建一个几乎完全一样的帮助方法,但是针对使用TAP实现的异步操作的,因而返回tasks:

    public static async Task<T> RetryOnFault<T>(

    Func<Task<T>> function, int maxTries)

    {

    for(int i=0; i<maxTries; i++)

    {

    try { return await function().ConfigureAwait(false); }

    catch { if (i == maxTries-1) throw; }

    }

    return default(T);

    }

    有了自己的函数,我们现在可以利用此连接器将重试编码到应用逻辑中,如:

    // Download the URL, trying up to three times in case of failure

    string pageContents = await RetryOnFault(

    () => DownloadStringAsync(url), 3);

    RetryOnFault函数可以进一步扩展,例如,为了决定何时重试更好,可以接受在重试操作之间的另一个Func<Task>:

    public static async Task<T> RetryOnFault<T>(

    Func<Task<T>> function, int maxTries, Func<Task> retryWhen)

    {

    for(int i=0; i<maxTries; i++)

    {

    try { return await function(); }

    catch { if (i == maxTries-1) throw; }

    await retryWhen().ConfigureAwait(false);

    }

    return default(T);

    }

    此后可以使用像下面的代码在重试之前再等待一秒:

    // Download the URL, trying up to three times in case of failure,

    // and delaying for a second between retries

    string pageContents = await RetryOnFault(

    () => DownloadStringAsync(url), 3, () => Task.Delay(1000));

    NeedOnlyOne(只需要一个)

    有时发挥冗余的优势可以提高操作的延迟和成功的机会。思考一下,有多个提供股票报价的Web服务,但在当天的不同时间,每一个服务可能提供不同级别的数量和响应时间。为处理这些情况,我们可以向所有Web服务发送请求,只要获得了任何响应就取消其他请求。我们可以实现一个函数来简化这个启动多个操作,等待任意一个,然后取消其余请求的通用模式:

    public static async Task<T> NeedOnlyOne(

    params Func<CancellationToken,Task<T>> [] functions)

    {

    var cts = new CancellationTokenSource();

    var tasks = (from function in functions

    select function(cts.Token)).ToArray();

    var completed = await Task.WhenAny(tasks).ConfigureAwait(false);

    cts.Cancel();

    foreach(var task in tasks)

    {

    var ignored = task.ContinueWith(

    t => Log(t), TaskContinuationOptions.OnlyOnFaulted);

    }

    return completed;

    }

    然后可以使用该函数来实现我们的例子:

    double currentPrice = await NeedOnlyOne(

    ct => GetCurrentPriceFromServer1Async("msft", ct),

    ct => GetCurrentPriceFromServer2Async("msft", ct),

    ct => GetCurrentPriceFromServer3Async("msft", ct));

    Interleaved(交错)

    当使用非常大的tasks集合时,使用Task.WhenAnyl来支持一个该交错的场景会有一个潜在的性能问题。WhenAny的每次调用会导致每一个task注册一个后续操作,对于N个tasks的调用会在交错的操作的生命周期内产生O(N2)数量级的后续操作。为了解决这个问题,一种方法是使用专注于目标的组合:

    static IEnumerable<Task<T>> Interleaved<T>(IEnumerable<Task<T>> tasks)

    {

    var inputTasks = tasks.ToList();

    var sources = (from _ in Enumerable.Range(0, inputTasks.Count)

    select new TaskCompletionSource<T>()).ToList();

    int nextTaskIndex = -1;

    foreach (var inputTask in inputTasks)

    {

    inputTask.ContinueWith(completed =>

    {

    var source = sources[Interlocked.Increment(ref nextTaskIndex)];

    if (completed.IsFaulted)

    source.TrySetException(completed.Exception.InnerExceptions);

    else if (completed.IsCanceled)

    source.TrySetCanceled();

    else

    source.TrySetResult(completed.Result);

    }, CancellationToken.None,

    TaskContinuationOptions.ExecuteSynchronously,

    TaskScheduler.Default);

    }

    return from source in sources

    select source.Task;

    }

    当tasks完成之后,可以使用这个可以处理tasks的结果,如:

    IEnumerable<Task<int>> tasks = ...;

    foreach(var task in tasks)

    {

    int result = await task;

    }

    WhenAllOrFirstException

    在确定的分散/集中场合,可能会想要等待所有的tasks,除非它们中有错误,在这种情况下,只要一出现异常,你就想要停止等待。我们也可以使用连接器方法来完成,例如:

    public static Task<T[]> WhenAllOrFirstException<T>(IEnumerable<Task<T>> tasks)
    {

        var inputs = tasks.ToList();

        var ce = new CountdownEvent(inputs.Count);

        var tcs = new TaskCompletionSource<T[]>();


        Action<Task> onCompleted = (Task completed) =>
        {
            if (completed.IsFaulted)
                tcs.TrySetException(completed.Exception.InnerExceptions);
            if (ce.Signal() && !tcs.Task.IsCompleted)
                tcs.TrySetResult(inputs.Select(t => t.Result).ToArray());
        };

        foreach (var t in inputs) t.ContinueWith(onCompleted);

        return tcs.Task;
    }

    构建基于Task的数据结构

    除了构建自定义的基于task的连接器的能力之外,在Task和代表异步操作结果以及连接必要的同步的Task<TResult>中使用数据结构可以使它成为非常强大的类型,在此类型上构建的自定义的数据结构可以用在异步情景中。

    AsyncCache(异步缓存)

    Task一个重要的方面是它可以提供给多个消费者,所有的消费者可以等待它,用它注册后续操作,获得结果(Task<TResult>的场合)或异常等等。这使得Task和Task<TResult>完美地集成到了异步缓存基础设施中。这儿是一个小而有力的构建在Task<TResult>之上的异步缓存:

    public class AsyncCache<TKey, TValue>

    {

    private readonly Func<TKey, Task<TValue>> _valueFactory;

    private readonly ConcurrentDictionary<TKey, Lazy<Task<TValue>>> _map;

     

    public AsyncCache(Func<TKey, Task<TValue>> valueFactory)

    {

    if (valueFactory == null) throw new ArgumentNullException("loader");

    _valueFactory = valueFactory;

    _map = new ConcurrentDictionary<TKey, Lazy<Task<TValue>>>();

    }

     

    public Task<TValue> this[TKey key]

    {

    get

    {

    if (key == null) throw new ArgumentNullException("key");

    return _map.GetOrAdd(key, toAdd =>

    new Lazy<Task<TValue>>(() => _valueFactory(toAdd))).Value;

    }

    }

    }

     

    AsyncCache<TKey,TValue>类的构造函数接受一个TKey作为参数,返回Task<TValue>的方法委托。之前从这个cache访问到的任何值都存储在内部的字典中,同时AsyncCache确保每个key只生成一个task,即使并发访问cache。

    使用这个,我们可以构建一个下载web页面的cache,如:

    private AsyncCache<string,string> m_webPages =

    new AsyncCache<string,string>(DownloadStringAsync);

    现在,无论何时我们需要web页面的内容,都可以在异步方法中使用这个,并且AsyncCache会确保我们尽可能同时下载多个页面,缓存该结果。

    private async void btnDownload_Click(object sender, RoutedEventArgs e)

    {

    btnDownload.IsEnabled = false;

    try

    {

    txtContents.Text = await m_webPages["http://www.microsoft.com"];

    }

    finally { btnDownload.IsEnabled = true; }

    }

    AsyncProducerConsumerCollection

    Tasks也用来构建数据结构来协调多个异步活动。思考一个典型的平行设计模式:生产者/消费者,生产者生成消费者消费的数据,并且生产者和消费者可能并发运行(例如,消费者处理生产者之前生产的item1时,生产者在生产item2).对于生产者/消费者,我们始终需要一些数据结构存储生产者创建的任务,为的是可以告知消费者新数据且当它可用时发现它。

    这里有一个简单的构建于tasks之上的数据结构的例子,它使异步方法用作生产者和消费者:

    public class AsyncProducerConsumerCollection<T>

    {

    private readonly Queue<T> m_collection = new Queue<T>();

    private readonly Queue<TaskCompletionSource<T>> m_waiting =

    new Queue<TaskCompletionSource<T>>();

     

    public void Add(T item)

    {

    TaskCompletionSource<T> tcs = null;

    lock (m_collection)

    {

    if (m_waiting.Count > 0) tcs = m_waiting.Dequeue();

    else m_collection.Enqueue(item);

    }

    if (tcs != null) tcs.TrySetResult(item);

    }

     

    public Task<T> Take()

    {

    lock (m_collection)

    {

    if (m_collection.Count > 0)

    {

    return Task.FromResult(m_collection.Dequeue());

    }

    else

    {

    var tcs = new TaskCompletionSource<T>();

    m_waiting.Enqueue(tcs);

    return tcs.Task;

    }

    }

    }

    }

    现在我们可以像下面一样写代码了:

    private static AsyncProducerConsumerCollection<int> m_data = …;

    private static async Task ConsumerAsync()

    {

    while(true)

    {

    int nextItem = await m_data.Take();

    ProcessNextItem(nextItem);

    }

    }

    private static void Produce(int data)

    {

    m_data.Add(data);
    }

     

    返回该系列目录《基于Task的异步模式--全面介绍》


  • 相关阅读:
    redis发布订阅
    redis学习笔记(面试题)
    redis安全 (error) NOAUTH Authentication required
    HDU3001 Travelling —— 状压DP(三进制)
    POJ3616 Milking Time —— DP
    POJ3186 Treats for the Cows —— DP
    HDU1074 Doing Homework —— 状压DP
    POJ1661 Help Jimmy —— DP
    HDU1260 Tickets —— DP
    HDU1176 免费馅饼 —— DP
  • 原文地址:https://www.cnblogs.com/farb/p/4884070.html
Copyright © 2011-2022 走看看