zoukankan      html  css  js  c++  java
  • [转载]Parallel的使用

    对Parallel.Invoke进行控制

      Parallel.Invoke提供了一个重载版本,它可以接受一个ParallelOptions对象作为参数,对Parallel.Invoke的执行进行控制。通过这个对象,我们可以控制并行的最大线程数,各个任务是否取消执行等等。例如,在一个智能化的家中,系统会判断主人是否离开房间,如果主人离开了房间,则自动关闭屋子里的各种电器。利用Parallel.Invoke我们可以实现如下:



    public static void
    PInvokeCancel()
    {
    // 创建取消对象
    CancellationTokenSource cts = new
    CancellationTokenSource();
    // 利用取消对象,创建ParallelOptions
    ParallelOptions
    pOption
    = new ParallelOptions() { CancellationToken = cts.Token
    };
    //
    设置最大线程数

    pOption.MaxDegreeOfParallelism = 2;

    //
    创建一个守护监视进程

    Task.Factory.StartNew(() =>
    {
    Console.WriteLine(
    "Cancellation in 5
    sec.
    ");
    Thread.Sleep(
    5000);
    // 取消,结束任务的执行
    cts.Cancel();
    Console.WriteLine(
    "Canceled
    requested
    ");
    });

    try
    {
    //
    以ParallelOptions作为参数,
    // 调用Parallel.Invoke
    Parallel.Invoke(pOption, () =>
    ShutdownLights(pOption.CancellationToken),
    ()
    =>
    ShutdownComputer(pOption.CancellationToken));

    //输出执行结果
    Console.WriteLine("Lights and computer
    are tuned off.
    ");
    }
    catch (Exception
    e)
    {
    Console.WriteLine(e.Message);
    }
    }

    private static void
    ShutdownLights(CancellationToken token)
    {
    while (!token.IsCancellationRequested)
    {
    Console.WriteLine(
    "Light is on.
    "
    );
    Thread.Sleep(
    1000);
    }

    }
    private static void
    ShutdownComputer(CancellationToken token)
    {
    while (!token.IsCancellationRequested)
    {
    Console.WriteLine(
    "Computer is
    on.
    "
    );
    Thread.Sleep(
    1000);
    }
    }




        
    除了这种方式之外,ParallelOptions更多地应用在取消任务队列中还未来得及执行的任务。当我们限制了最大并发线程数的时候,如果需要通过Parallel.Invoke执行的任务较多,则有可能部分任务在队列中排队而得不到及时的执行,如果到了一定的条件这些任务还没有执行,我们可能取消这些任务。一个恰当的现实生活中的例子就是火车站买票。火车站买票的人很多,但是售票的窗口有限,当到了下班时间后,窗口就不再售票了,也就是剩下的售票任务需要取消掉。我们可以用下面的代码来模拟这样一个场景:



    public static void
    PInvokeCancel()

    {

    // 创建取消对象

    CancellationTokenSource cts
    = new
    CancellationTokenSource();

    //
    利用取消对象,创建ParallelOptions


    ParallelOptions pOption
    = new ParallelOptions()
    { CancellationToken
    = cts.Token };

    //
    设置最大线程数,也就相当于20个售票窗口


    pOption.MaxDegreeOfParallelism
    = 20;

    //
    创建一个守护监视进程

    // 当到下班时间后就取消剩下的售票活动

    Task.Factory.StartNew(()
    =>

    {

    Console.WriteLine(
    "Cancellation in 5
    sec.
    ");

    Thread.Sleep(
    5000);

    //
    取消,结束任务的执行


    cts.Cancel();

    Console.WriteLine(
    "Canceled
    requested
    ");

    });

    try

    {

    // 创建售票活动

    Action[]
    CustomerServices
    = CreateCustomerService(1000);

    //
    以ParallelOptions作为参数,

    //
    调用Parallel.Invoke


    Parallel.Invoke(pOption,
    CustomerServices);

    }

    catch (Exception
    e)

    {

    // 当任务取消后,抛出一个异常

    Console.WriteLine(e.Message);

    }

    }

    //
    创建售票的活动


    static Action[]
    CreateCustomerService(
    int n)

    {

    Action[] result
    = new
    Action[n];

    for (int i = 0; i < n; i++)

    {

    result[i]
    = () =>

    {

    Console.WriteLine(
    "Customer Service
    {0}
    ",
    Task.CurrentId);

    // 模拟售票需要的时间

    Thread.Sleep(
    2000);

    };

    }

    return
    result;

    }



        并行任务之间的同步


      有时候我们在处理并行任务的时候,各个任务之间需要同步,也就是同时执行的并行任务,需要在共同到达某一个状态的后再一共继续执行。我们可以举一个现实生活中的例子。陈良乔,贾玮和单春晖是好朋友,他们相约到电影院看《建国大业》。他们三个住在不同的地方,为了能一起买票进电影院,他们约好先在电影院门口的KFC会合,然后再一起进电影院。这其中就涉及到一个同步的问题:他们需要先在KFC会合。他们是从家里分别到KFC的,但是需要在KFC进行同步,等到三个人都到齐后在完成后后继的动作,进电影院看电影。


      为了完成并行任务之间的同步,.NET
    Framework中提供了一个类Barrier。顾名思义,Barrier就像一个关卡或者是剪票口一样,通过Barrier类,我们可以管理并行任务的执行,完成他们之间的同步。Barrier类的使用非常简单,我们只需要在主线程中声明一个Barrier对象,同时指明需要同步的任务数。然后,在需要进行同步的地方调用Barrier类的SignalAndWait函数就可以了。
    当一个并行任务到达SignalAndWait后,它会暂停执行,等待所有并行任务都到达同步点之后再继续往下执行。下面我们以一个实际的例子,来看看如何利用Barrier类完成看电影的同步问题。



    using System;

    using
    System.Collections.Generic;

    using System.Linq;

    using
    System.Text;

    using System.Threading;

    using
    System.Threading.Tasks;

    namespace
    ParallelBarrier

    {

    class
    Program

    {

    // 用于同步的Barrier对象

    static Barrier
    sync;

    static void Main(string[] args)

    {

    //
    创建Barrier对象,这里我们需要同步

    // 任务有三个

    sync
    =
    new
    Barrier(
    3);

    //
    开始执行并行任务


    var steps
    = new Action[] { ()
    =>
    gotothecinema(
    "陈良乔", TimeSpan.FromSeconds(5) ),

    ()
    =>
    gotothecinema(
    "贾玮", TimeSpan.FromSeconds(2) ),

    ()
    =>
    gotothecinema(
    "单春晖", TimeSpan.FromSeconds(4)
    )};

    Parallel.Invoke(steps);

    Console.ReadKey();

    }

    // 任务

    static void
    gotothecinema(string strName, TimeSpan timeToKFC
    )

    {

    Console.WriteLine(
    "[{0}]
    从家里出发。
    ",
    strName);

    // 从家里到KFC

    Thread.Sleep(timeToKFC);

    Console.WriteLine(
    "[{0}]
    到达KFC。
    ",
    strName);

    // 等待其他人到达

    sync.SignalAndWait();

    //
    同步后,进行后继动作


    Console.WriteLine(
    "[{0}]
    买票进电影院。
    ",
    strName);

    }

    }

    }

    IT168 专稿】书接上回。在前一篇“Visual Studio
    2010对并行计算的支持
    ”文章中,我们介绍了如何利用Parallel.For和Parallel.ForEach函数来并行化for循环和foreach循环。实际上,Parallel.For和Parallel.ForEach函数主要是针对“并行数据”的并行化操作,所谓并行数据,就是整个数据集中数据单元是相互独立的,可以同时进行处理。在实际开发中,我们遇到的可以并行处理的不仅包括“并行数据”,还包括可以同时进行的“并行逻辑”。所谓“并行逻辑”,就是相互独立,可以同时执行的多个任务。比如,程序员陈良乔每天早上要做两件事情:烧水洗脸和锻炼身体。这两件事情就是相互独立可以并行的,也就是说他在烧水的时候可以同时锻炼身体。在以前的单核时代,CPU在同一时间只能完成一件事情,那么陈良乔只能先烧水后锻炼,或者是先锻炼后烧水,这导致他上班总是迟到。进入多核时代,CPU可以在同一时间完成多件事情了,借助.Net
    Framework
    4.0中的Parallel类,我们可以方便地处理“并行逻辑”。现在,程序员陈良乔可以一边锻炼一边烧水,再也没有迟到过了。他逢人便说:“Parallel真是个好东西!自从用了它,我腰也不酸了,背也不疼了,编程更有劲儿了。。。”

      使用Parallel.Invoke处理并行逻辑


      跟Parallel.For函数相似,Parallel.Invoke也是Parallel类的一个静态函数,它可以接受一个Action[]类型的对象作为参数,这个对象,就是我们要执行的任务。系统会根据代码运行的硬件环境,主要是CPU运算核心的个数,自动地进行线程的创建和分配。这有些类似于我们所熟悉的多线程开发,通过为每个线程指定一个线程函数而让多个任务同时进行,只是Parallel.Invoke函数简化了线程的创建和分配等繁琐的动作,我们只需要提供核心的线程函数就可以了。下面我们来看一个实际的例子。在上文中,我们介绍了程序员陈良乔起床的例子,在以前的单核时代,他起床大约是这个样子的:



            // 串行式起床
            private static void
    GetUp()
            {
                Start(
    "GetUp");
               
    // 先烧水
                boil();
               
    // 后锻炼
                exercise();
                End(
    "GetUp");
            }
           

           
    // 锻炼
            private static void
    exercise()
            {
                Console.WriteLine(
    "Exercise");
                Thread.Sleep(
    2000);
                Console.WriteLine(
    "Finish
    Exercise
    ");
            }

           
    // 烧水
            private static void
    boil()
            {
                Console.WriteLine(
    "Boil");
                Thread.Sleep(
    3000);
                Console.WriteLine(
    "Finish
    Boil
    ");
            }

    在单核时代,CPU在同一时间只能做一件事情,所以他只能先烧水,后锻炼,这样显然会耽误时间。一天,他又因为这事而迟到了,老板骂道,“你是猪啊,你不会用Parallel.Invoke一边烧水一边锻炼啊?”于是,有了下面的并行式起床:
           
    // 并行式起床
            private static void
    ParallelGetUp()
            {
                Start(
    "ParallelGetUp");
               
    //
    在烧水的同时,锻炼身体

                var steps = new Action[] { ()
    =>
    boil(), ()
    => exercise()
    };
                Parallel.Invoke(steps);
                End(
    "ParallelGetUp");
            }



        通过Parallel.Invoke函数,我们将一些相互独立的任务同时执行,实现了“并行逻辑”,也大大地提高了应用程序的性能和效率。从下面的截图中,我们可以明显地看出两种方式的差别。串行方式所耗费的时间,是两个步骤的时间总和,而并行方式所耗费的时间,大约是单个任务的耗时最长的哪一个。



      图1 串行和并行的执行情况



      对Parallel.Invoke进行控制


      Parallel.Invoke提供了一个重载版本,它可以接受一个ParallelOptions对象作为参数,对Parallel.Invoke的执行进行控制。通过这个对象,我们可以控制并行的最大线程数,各个任务是否取消执行等等。例如,在一个智能化的家中,系统会判断主人是否离开房间,如果主人离开了房间,则自动关闭屋子里的各种电器。利用Parallel.Invoke我们可以实现如下:



    public static void
    PInvokeCancel()
    {
    // 创建取消对象
    CancellationTokenSource cts = new
    CancellationTokenSource();
    // 利用取消对象,创建ParallelOptions
    ParallelOptions
    pOption
    = new ParallelOptions() { CancellationToken = cts.Token
    };
    //
    设置最大线程数

    pOption.MaxDegreeOfParallelism = 2;

    //
    创建一个守护监视进程

    Task.Factory.StartNew(() =>
    {
    Console.WriteLine(
    "Cancellation in 5
    sec.
    ");
    Thread.Sleep(
    5000);
    // 取消,结束任务的执行
    cts.Cancel();
    Console.WriteLine(
    "Canceled
    requested
    ");
    });

    try
    {
    //
    以ParallelOptions作为参数,
    // 调用Parallel.Invoke
    Parallel.Invoke(pOption, () =>
    ShutdownLights(pOption.CancellationToken),
    ()
    =>
    ShutdownComputer(pOption.CancellationToken));

    //输出执行结果
    Console.WriteLine("Lights and computer
    are tuned off.
    ");
    }
    catch (Exception
    e)
    {
    Console.WriteLine(e.Message);
    }
    }

    private static void
    ShutdownLights(CancellationToken token)
    {
    while (!token.IsCancellationRequested)
    {
    Console.WriteLine(
    "Light is on.
    "
    );
    Thread.Sleep(
    1000);
    }

    }
    private static void
    ShutdownComputer(CancellationToken token)
    {
    while (!token.IsCancellationRequested)
    {
    Console.WriteLine(
    "Computer is
    on.
    "
    );
    Thread.Sleep(
    1000);
    }
    }




        
    除了这种方式之外,ParallelOptions更多地应用在取消任务队列中还未来得及执行的任务。当我们限制了最大并发线程数的时候,如果需要通过Parallel.Invoke执行的任务较多,则有可能部分任务在队列中排队而得不到及时的执行,如果到了一定的条件这些任务还没有执行,我们可能取消这些任务。一个恰当的现实生活中的例子就是火车站买票。火车站买票的人很多,但是售票的窗口有限,当到了下班时间后,窗口就不再售票了,也就是剩下的售票任务需要取消掉。我们可以用下面的代码来模拟这样一个场景:



    public static void
    PInvokeCancel()

      {

      
    // 创建取消对象

      CancellationTokenSource cts
    = new
    CancellationTokenSource();

      
    //
    利用取消对象,创建ParallelOptions


      ParallelOptions pOption
    = new ParallelOptions()
    { CancellationToken
    = cts.Token };

      
    //
    设置最大线程数,也就相当于20个售票窗口


      pOption.MaxDegreeOfParallelism
    = 20;

      
    //
    创建一个守护监视进程

      
    // 当到下班时间后就取消剩下的售票活动

      Task.Factory.StartNew(()
    =>

      {

      Console.WriteLine(
    "Cancellation in 5
    sec.
    ");

      Thread.Sleep(
    5000);

      
    //
    取消,结束任务的执行


      cts.Cancel();

      Console.WriteLine(
    "Canceled
    requested
    ");

      });

      
    try

      {

      
    // 创建售票活动

      Action[]
    CustomerServices
    = CreateCustomerService(1000);

      
    //
    以ParallelOptions作为参数,

      
    //
    调用Parallel.Invoke


      Parallel.Invoke(pOption,
    CustomerServices);

      }

      
    catch (Exception
    e)

      {

      
    // 当任务取消后,抛出一个异常

      Console.WriteLine(e.Message);

      }

      }

      
    //
    创建售票的活动


      
    static Action[]
    CreateCustomerService(
    int n)

      {

      Action[] result
    = new
    Action[n];

      
    for (int i = 0; i < n; i++)

      {

      result[i]
    = () =>

      {

      Console.WriteLine(
    "Customer Service
    {0}
    ",
    Task.CurrentId);

      
    // 模拟售票需要的时间

      Thread.Sleep(
    2000);

      };

      }

      
    return
    result;

      }



        并行任务之间的同步


      有时候我们在处理并行任务的时候,各个任务之间需要同步,也就是同时执行的并行任务,需要在共同到达某一个状态的后再一共继续执行。我们可以举一个现实生活中的例子。陈良乔,贾玮和单春晖是好朋友,他们相约到电影院看《建国大业》。他们三个住在不同的地方,为了能一起买票进电影院,他们约好先在电影院门口的KFC会合,然后再一起进电影院。这其中就涉及到一个同步的问题:他们需要先在KFC会合。他们是从家里分别到KFC的,但是需要在KFC进行同步,等到三个人都到齐后在完成后后继的动作,进电影院看电影。


      为了完成并行任务之间的同步,.NET
    Framework中提供了一个类Barrier。顾名思义,Barrier就像一个关卡或者是剪票口一样,通过Barrier类,我们可以管理并行任务的执行,完成他们之间的同步。Barrier类的使用非常简单,我们只需要在主线程中声明一个Barrier对象,同时指明需要同步的任务数。然后,在需要进行同步的地方调用Barrier类的SignalAndWait函数就可以了。
    当一个并行任务到达SignalAndWait后,它会暂停执行,等待所有并行任务都到达同步点之后再继续往下执行。下面我们以一个实际的例子,来看看如何利用Barrier类完成看电影的同步问题。



    using System;

      using
    System.Collections.Generic;

      using System.Linq;

      using
    System.Text;

      using System.Threading;

      using
    System.Threading.Tasks;

      namespace
    ParallelBarrier

      {

      
    class
    Program

      {

      
    // 用于同步的Barrier对象

      
    static Barrier
    sync;

      
    static void Main(string[] args)

      {

      
    //
    创建Barrier对象,这里我们需要同步

      
    // 任务有三个

      sync
    =
    new
    Barrier(
    3);

      
    //
    开始执行并行任务


      var steps
    = new Action[] { ()
    =>
    gotothecinema(
    "陈良乔", TimeSpan.FromSeconds(5) ),

      ()
    =>
    gotothecinema(
    "贾玮", TimeSpan.FromSeconds(2) ),

      ()
    =>
    gotothecinema(
    "单春晖", TimeSpan.FromSeconds(4)
    )};

      Parallel.Invoke(steps);

      Console.ReadKey();

      }

      
    // 任务

      
    static void
    gotothecinema(string strName, TimeSpan timeToKFC
    )

      {

      Console.WriteLine(
    "[{0}]
    从家里出发。
    ",
    strName);

      
    // 从家里到KFC

      Thread.Sleep(timeToKFC);

      Console.WriteLine(
    "[{0}]
    到达KFC。
    ",
    strName);

      
    // 等待其他人到达

      sync.SignalAndWait();

      
    //
    同步后,进行后继动作


      Console.WriteLine(
    "[{0}]
    买票进电影院。
    ",
    strName);

      }

      }

      }




      在这段代码中,我们首先创建了Barrier对象,因为在这里需要同步的任务有三个,所以创建Barrier对象时是的参数是3。然后就是使用Parallel.Invoke执行并行任务。我们在并行任务gotothecinema中设置了一个同步点,在这里我们调用Barrier对象的SignalAndWait函数,它表示当前任务已经到达同步点并同时等待其他任务到达同步点。当所有任务都到达同步点之后,再继续往下执行。运行上面的程序,我们可以获得这样的输出:



      图2 使用Barrier进行同步


      更复杂的任务之间的同步


      我们在使用Barrier进行并行任务之间的同步时,有这样一个缺陷,我们需要预先知道所有需要同步的并行任务的数目,如果这个数目是随机的,就无法使用Barrier进行任务之间的同步了。并行任务数目不定这种情况很常见。我们还是来看上文中看电影的例子,每场进电影院看电影的观众数目是不固定的,那么退场的观众也是不固定的,甚至还有中途退场的。当所有观众都退场后,我们需要打扫电影院的卫生。这里需要的同步的就是所有观众都退场。针对这种数目不定的多个并行任务,.NET
    Framework提供了CountdownEvent这个类来进行任务之间的同步。


      就像它的名字一样,CountdownEvent基于这样一个简单的规则:当有新的需要同步的任务产生时,就调用AddCount增加它的计数,当有任务到达同步点是,就调用Signal函数减小它的计数,当CountdownEvent的计数为零时,就表示所有需要同步的任务已经完成,可以开始下一步任务了。下面我们利用CountdownEvent来模拟一下观众进场立场的情景。



      1 using
    System;
      2
      3   using System.Collections.Generic;
      4
      5   using
    System.Linq;
      6
      7   using System.Text;
      8
      9   using
    System.Threading;
    10
    11   using System.Threading.Tasks;
    12
    13   namespace
    CountdownEventDemo
    14
    15   {
    16

    17
      //
    观众类,用来表示一位观众

    18
    19   class
    Customer
    20
    21   {
    22

    23
      public
    Customer(
    int nID)
    24

    25
      {
    26
    27   m_nID
    =
    nID;
    28
    29   }
    30

    31
      // 观众的ID
    32
    33   public int m_nID;
    34

    35
      }
    36
    37   class
    Program
    38
    39   {
    40

    41
      static void Main(string[]
    args)
    42
    43   {
    44

    45
      //
    创建CountdownEvent同步对象

    46
    47   using (var
    countdown
    = new CountdownEvent(1))
    48
    49   {
    50

    51
      //
    产生一个随机数,表示观众的数目

    52
    53   Random
    countRandom
    = new Random(DateTime.Now.Millisecond);
    54
    55   int nCount
    =
    countRandom.Next(
    10);
    56
    57   // 构造每一位观众看电影的任务
    58
    59   Action[] seeafilm = new Action[ nCount
    ];
    60
    61   for (int i = 0; i < nCount; i++)
    62
    63   {
    64

    65
      //
    构造Customer对象,表示观众

    66
    67   Customer
    currentCustomer
    = new Customer( i+1 );
    68

    69
      seeafilm[i] = () =>
    70
    71   {
    72

    73
      // 观众进场
    74
    75   countdown.AddCount();
    76
    77   Console.WriteLine("观众 {0}
    进场。
    ",
    currentCustomer.m_nID);
    78
    79   // 模拟看电影的时间
    80
    81   Thread.Sleep(countRandom.Next(3000,6000));
    82

    83
      // 观众退场
    84
    85   countdown.Signal();
    86
    87   Console.WriteLine("观众 {0}
    退场。
    ",
    currentCustomer.m_nID);
    88
    89   };
    90

    91
      }
    92
    93   //并行执行任务
    94
    95   Parallel.Invoke( seeafilm );
    96
    97   //
    在此同步,最后CountdownEvent的计数变为零

    98
    99   countdown.Signal();
    100
    101   countdown.Wait();
    102
    103   }
    104

    105
      Console.WriteLine("所有观众退场,开始打扫卫生。");
    106
    107   Console.ReadKey();
    108
    109   }
    110

    111

        在这段代码中,我们使用CountdownEvent进行随机个数任务之间的同步。最后,我们可以得到这样的输出。





      图3 使用CountdownEvent进行同步


      通过Parallel.Invoke函数,我们可以轻松地将相互独立的任务并行执行,同时通过Barrier和CountdownEvent类进行任务之间的同步。这种并行计算的开发方式,比以前那种基于线程的并行计算开发方式简便很多,解放了程序员的脑袋,让他们可以把更多的脑力放到业务逻辑问题的解决之上。


      使用Parallel类,多快好省地开发并行计算应用程序。

  • 相关阅读:
    正则表达式替换所有符合条件的字符
    关于jquery ajax不执行success回调函数
    关于jquery绑定事件执行两次
    同步选中所有checkbox
    Jquery动态改变my97datepicker的日期形式
    关于button在td中时,zclip复制不能的问题
    关于各种高度的获取方法
    慎用--skip-grant-tables命令
    Mysql中判断是否存在
    前端html
  • 原文地址:https://www.cnblogs.com/fx2008/p/2409609.html
Copyright © 2011-2022 走看看