zoukankan      html  css  js  c++  java
  • Effective C# 学习笔记(三十六) 理解PLINQ对IO上瓶颈的处理方式

    IO操作一般会成为程序性能的瓶颈,我们可以用并行的方式来提高该方面的操作效率。

    首先看一个例子,下面是一个从地址列表中下载数据的IO操作循环代码片段。

     

    //顺序执行IO操作

    foreach (var url in urls)

    {

    var result = new WebClient().DownloadData(url);

    UseResult(result);

    }

     

    使用Parallel.ForEach()

    您可以使用Parallel.ForEach()方法来并行处理上面的IO操作。该方法会动态分配创建线程,在IO操作阻塞线程运行时,它会创建更多线程来处理IO操作,而当过多的线程处于活跃状态时,它又会自动地减少线程,来减少更多的线程间调度的工作。

     

    //Parallel.ForEach()方法并行操作IO

    Parallel.ForEach(urls, url =>

    {

    var result = new WebClient().DownloadData(url);

    UseResult(result);

    });

     

    使用PLINQAsParallel()方法

    您还可以使用PLINQAsParallel()方法来并行处理IO操作。这种操作创建固定个数的线程来处理并发,您可以使用WithDegreeOfParallelism() 来控制线程的个数。

     

    //使用PLINQ并行处理IO

    var results = from url in urls.AsParallel()

            select new WebClient().DownloadData(url);

    results.ForAll(result => UseResult(result));

     

    其实以上的两种并行处理方式并不是真正意义上的并行操作,其只对运行的结果的处理并没有做到真正的异步并行处理。继续往下看

     

    下面的代码通过ParallellIO类的RunAsync方法来抽象异步IO处理的起始、结束回调的方法,以实现对Url的资源的异步操作。

     

    /// <summary>

    /// 下载资源类

    /// </summary>

    public class DownloadResource

    {

        /// <summary>

        /// 下载地址

        /// </summary>

        public string URL { get; set; }

     

        /// <summary>

        /// 下载内容

        /// </summary>

        public byte[] Content { get; set; }

    }

     

              static class ParallelIO

        {

            /// <summary>

            /// 并行运行

            /// </summary>

            /// <typeparam name="T">枚举集合数据项类型</typeparam>

            /// <typeparam name="TResult">任务线程的返回值</typeparam>

            /// <param name="taskParms">枚举结合对象</param>

            /// <param name="taskStarter">起始处理方法</param>

            /// <param name="taskFinisher">终止处理方法</param>

            public static void RunAsync<T, TResult>(

                this IEnumerable<T> taskParms,

                Func<T, Task<TResult>> taskStarter,

                Action<Task<TResult>> taskFinisher)

            {

                taskParms.Select(parm => taskStarter(parm)).

                    AsParallel().

                    ForAll(t => t.ContinueWith(t2 => taskFinisher(t2)));

            }

            //

            /// <summary>

            /// 处理完成时调用的handler

            /// </summary>

            /// <param name="downLoadResourceTask">下载资源的线程</param>

            public static void finishDownload(Task<DownloadResource> downLoadResourceTask)

            {

                byte[] downloadedContent = downLoadResourceTask.Result.Content;

                Console.WriteLine("Read {0} bytes from {1}",

                    downloadedContent.Length.ToString(),

                    downLoadResourceTask.Result.URL);

            }

     

            /// <summary>

            /// 处理开始时调用的hander

            /// </summary>

            /// <param name="url">下载地址</param>

            /// <returns>处理下载的任务对象</returns>

            public static Task<DownloadResource> startDownload(string url)

            {

                var tcs = new TaskCompletionSource<DownloadResource>();

                var wc = new WebClient();

                wc.DownloadDataCompleted += (sender, e) =>

                {

                    if (e.UserState == tcs)

                    {

                        if (e.Cancelled)

                            tcs.TrySetCanceled();

                        else if (e.Error != null)

                            tcs.TrySetException(e.Error);

                        else

                            tcs.TrySetResult(new DownloadResource()

                            {

                                Content = e.Result,

                                URL = url

                            });

                    }

                };

                wc.DownloadDataAsync(new Uri(url), tcs); //webClient使用了EAP异步处理下载

                return tcs.Task;

            }

        }

     

    Console.WriteLine("---------------ParallelIO Begin--------------");

    string[] urls = { "http://www.google.com", "http://www.bing.com" ,"http://www.baidu.com"};

    ParallelIO.RunAsync<string, DownloadResource>(urls, ParallelIO.startDownload, ParallelIO.finishDownload);

    Console.WriteLine("---------------ParallelIO End--------------");

     

     

    WebClient 使用了Event-based Asynchronous Pattern (EAP)(基于事件的异步模式)。也就是说你需要先注册一个handler到一个事件上,然后在一个异步操作完成后,这个事件会激活调用你的方法。 当下载事件开始时,startDownLoad()将任务的完成Task对象保存在TaskCompletionSource中,下载也就是开始了。而startDownload()方法将在task完成时返回Task对象。当下载完成时,触发DownloadDataCompleted事件,而事件handler会为TaskCompletionSoure设置完成状态。标识嵌入的任务已完成。

     

    除了以上的EAP处理机制外,.NET 库还提供了 Asynchronous Programming Model (APM) pattern来处理其他的异步操作。在这种模式下,你先调用 Begin操作名()方法,进行起始操作,该方法返回 IAsyncResult类型的对象。当操作完成时,你调用End操作名()方法,传入IAsyncResult对象,处理完成时的逻辑。你可以使用Task<TResult> .Factory.FromAsync()方法来实现以上模式的调用。

  • 相关阅读:
    C语言寒假大作战01
    C语言I作业12—学期总结
    C语言I博客作业11
    C语言I博客作业10
    浅谈js模块加载方式(初级)
    浅谈.net的后台校验
    api接口访问限制
    系统操作日志表单形式构建
    RedisUtil(未完,持续更新中....)
    定时处理组件---Quartz.net
  • 原文地址:https://www.cnblogs.com/haokaibo/p/2115063.html
Copyright © 2011-2022 走看看