zoukankan      html  css  js  c++  java
  • .Net4.0并行库介绍——通过TaskFactory.FromAsync简化APM

    异步执行 I/O 密集型操作是生产高响应和可伸缩应用程序及组件的关键。可让您使用极少量的线程来执行大量的工作,而无需阻止任何线程。然而异步编程却有些麻烦,许多程序员不愿意去做它。

    网上有不少通过lambda 表达式和AsyncEnumerator 等来实现简化异步编程的方法,这些方法也确实行之有效,但在.net 4.0中,我们又多了一种选择——通过TaskFactory.FromAsync简化APM。

    TaskFactory.FromAsync这个方法非常简单,通过它可以把一个异步的任务转换为一个Task,首先我们来看一个简单的例子吧:

        static IEnumerable<Task> CopyStreamAsync(Stream input, Stream output)
        {
            var buffer = new byte[0x2000];
            while (true)
            {
                var readTask = Task<int>.Factory.FromAsync(input.BeginRead, input.EndRead, buffer, 0, buffer.Length, null);
                yield return readTask;

                if (readTask.Result == 0)
                    break;

                yield return Task.Factory.FromAsync(output.BeginWrite, output.EndWrite, buffer, 0, buffer.Length, null);

            }
        }

    这个例子通过TaskFactory.FromAsync把一系列异步操作转换为了一个任务列。虽然这些都是异步操作,但在函数中却和同步操作一样直观,十分简单而清晰。

    转换为了的任务列后,我们就需要来执行这一系列任务,最简单的方法如下:

        foreach (var task in CopyStreamAsync(input, output))
        {
            task.Wait();
        }

    这种方式虽然直接有效,但它却是一种阻塞式的操作,没有达到异步的目的,我们一般可以通过如下方式把这个任务列转换为一个任务,从而实现异步执行。

        public static Task Iterate(this TaskFactory factory, IEnumerable<Task> asyncIterator)
        {
            var scheduler = factory.Scheduler ?? TaskScheduler.Current;

            // Get an enumerator from the enumerable
            var enumerator = asyncIterator.GetEnumerator();
            if (enumerator == null) throw new InvalidOperationException();

            // Create the task to be returned to the caller. And ensure
            // that when everything is done, the enumerator is cleaned up.
            var trs = new TaskCompletionSource<object>(factory.CreationOptions);
            trs.Task.ContinueWith(_ => enumerator.Dispose(), scheduler);

            // This will be called every time more work can be done.
            Action<Task> recursiveBody = null;
            recursiveBody = antecedent =>
            {
                try
                {
                    // If the previous task completed with any exceptions, bail
                    if (antecedent != null && antecedent.IsFaulted)
                        trs.TrySetException(antecedent.Exception);

                    else if (trs.Task.IsCanceled) trs.TrySetCanceled();

                    else if (enumerator.MoveNext())
                        enumerator.Current.ContinueWith(recursiveBody, scheduler);

                    // Otherwise, we're done!
                    else trs.TrySetResult(null);
                }
                // If MoveNext throws an exception, propagate that to the user
                catch (Exception exc) { trs.TrySetException(exc); }
            };

            // Get things started by launching the first task
            factory.StartNew(_ => recursiveBody(null), scheduler);

            // Return the representative task to the user
            return trs.Task;
        }

    这个函数我是摘录自ParallelProgrammingSamples中的,里面还有好几种其它的调用形式,可以根据需要选择合适的方法。

  • 相关阅读:
    Python实现二叉树的遍历
    Selenium模拟浏览器初识
    sklearn中的随机森林
    scrapy爬虫事件以及数据保存为txt,json,mysql
    mac安装mysql及终端操作mysql与pycharm的数据库可视化
    爬虫框架scrapy的基本内容
    多进程实例——爬取百度贴吧
    Python多进程并发操作进程池Pool
    【转】数据库设计三大范式
    【转】栈和堆的区别
  • 原文地址:https://www.cnblogs.com/TianFang/p/1597953.html
Copyright © 2011-2022 走看看