zoukankan      html  css  js  c++  java
  • 8天玩转并行开发——第四天 同步机制(上)

       在并行计算中,不可避免的会碰到多个任务共享变量,实例,集合。虽然task自带了两个方法:task.ContinueWith()和Task.Factory

    .ContinueWhenAll()来实现任务串行化,但是这些简单的方法远远不能满足我们实际的开发需要,从.net 4.0开始,类库给我们提供了很多

    的类来帮助我们简化并行计算中复杂的数据同步问题。

    大体上分为二种:

    ①   并发集合类:           这个在先前的文章中也用到了,他们的出现不再让我们过多的关注同步细节。

    ②  轻量级同步机制:      相对于老版本中那些所谓的重量级同步机制而言,新的机制更加节省cpu的额外开销。

    关于并发集合类没什么好讲的,如果大家熟悉非线程安全的集合,那么这些并发的集合对你来说小菜一碟,这一篇和下一篇我们仔细来玩玩这

    些轻量级的同步机制。

    一:Barrier(屏障同步)

    1:基本概念

        msdn对它的解释是:使多个任务能够采用并行方式依据某种算法在多个阶段中协同工作。乍一看有点不懂,没关系,我们采取提干法。

    ”多个任务“,”多个阶段”,“协同”,仔细想想知道了,下一阶段的执行必须等待上一个阶段中多task全部执行完,那么我们实际中有这样

    的需求吗?当然有的,比如我们数据库中有100w条数据需要导入excel,为了在数据库中加速load,我们需要开多个任务去跑,比如这

    里的4个task,要想load产品表,必须等4个task都跑完用户表才行,那么你有什么办法可以让task为了你两肋插刀呢?它就是Barrier。

    好,我们知道barrier叫做屏障,就像下图中的“红色线”,如果我们的屏障设为4个task就认为已经满了的话,那么执行中先到的task必须等待

    后到的task,通知方式也就是barrier.SignalAndWait(),屏障中线程设置操作为new Barrier(4,(i)=>{})。

    啰嗦了半天,还是上下代码说话:

    复制代码
     1 using System.Collections.Concurrent;
    2 using System.Threading.Tasks;
    3 using System;
    4 using System.Diagnostics;
    5 using System.Collections.Generic;
    6 using System.Linq;
    7 using System.Threading;
    8
    9 class Program
    10 {
    11 //四个task执行
    12 static Task[] tasks = new Task[4];
    13
    14 static Barrier barrier = null;
    15
    16 static void Main(string[] args)
    17 {
    18 barrier = new Barrier(tasks.Length, (i) =>
    19 {
    20 Console.WriteLine("**********************************************************");
    21 Console.WriteLine(" 屏障中当前阶段编号:{0} ", i.CurrentPhaseNumber);
    22 Console.WriteLine("**********************************************************");
    23 });
    24
    25 for (int j = 0; j < tasks.Length; j++)
    26 {
    27 tasks[j] = Task.Factory.StartNew((obj) =>
    28 {
    29 var single = Convert.ToInt32(obj);
    30
    31 LoadUser(single);
    32 barrier.SignalAndWait();
    33
    34 LoadProduct(single);
    35 barrier.SignalAndWait();
    36
    37 LoadOrder(single);
    38 barrier.SignalAndWait();
    39 }, j);
    40 }
    41
    42 Task.WaitAll(tasks);
    43
    44 Console.WriteLine("指定数据库中所有数据已经加载完毕!");
    45
    46 Console.Read();
    47 }
    48
    49 static void LoadUser(int num)
    50 {
    51 Console.WriteLine("当前任务:{0}正在加载User部分数据!", num);
    52 }
    53
    54 static void LoadProduct(int num)
    55 {
    56 Console.WriteLine("当前任务:{0}正在加载Product部分数据!", num);
    57 }
    58
    59 static void LoadOrder(int num)
    60 {
    61 Console.WriteLine("当前任务:{0}正在加载Order部分数据!", num);
    62 }
    63 }
    复制代码


    2:死锁问题

        先前的例子我们也知道,屏障必须等待4个task通过SignalAndWait()来告知自己已经到达,当4个task全部达到后,我们可以通过

    barrier.ParticipantsRemaining来获取task到达状态,那么如果有一个task久久不能到达那会是怎样的情景呢?好,我举个例子。

    复制代码
     1 using System.Collections.Concurrent;
    2 using System.Threading.Tasks;
    3 using System;
    4 using System.Diagnostics;
    5 using System.Collections.Generic;
    6 using System.Linq;
    7 using System.Threading;
    8
    9 class Program
    10 {
    11 //四个task执行
    12 static Task[] tasks = new Task[4];
    13
    14 static Barrier barrier = null;
    15
    16 static void Main(string[] args)
    17 {
    18 barrier = new Barrier(tasks.Length, (i) =>
    19 {
    20 Console.WriteLine("**********************************************************");
    21 Console.WriteLine(" 屏障中当前阶段编号:{0} ", i.CurrentPhaseNumber);
    22 Console.WriteLine("**********************************************************");
    23 });
    24
    25 for (int j = 0; j < tasks.Length; j++)
    26 {
    27 tasks[j] = Task.Factory.StartNew((obj) =>
    28 {
    29 var single = Convert.ToInt32(obj);
    30
    31 LoadUser(single);
    32 barrier.SignalAndWait();
    33
    34 LoadProduct(single);
    35 barrier.SignalAndWait();
    36
    37 LoadOrder(single);
    38 barrier.SignalAndWait();
    39
    40 }, j);
    41 }
    42
    43 Task.WaitAll(tasks);
    44
    45 barrier.Dispose();
    46
    47 Console.WriteLine("指定数据库中所有数据已经加载完毕!");
    48
    49 Console.Read();
    50 }
    51
    52 static void LoadUser(int num)
    53 {
    54 Console.WriteLine(" 当前任务:{0}正在加载User部分数据!", num);
    55
    56 if (num == 0)
    57 {
    58 //num=0:表示0号任务
    59 //barrier.ParticipantsRemaining == 0:表示所有task到达屏障才会退出
    60 // SpinWait.SpinUntil: 自旋锁,相当于死循环
    61 SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0);
    62 }
    63 }
    64
    65 static void LoadProduct(int num)
    66 {
    67 Console.WriteLine("当前任务:{0}正在加载Product部分数据!", num);
    68 }
    69
    70 static void LoadOrder(int num)
    71 {
    72 Console.WriteLine("当前任务:{0}正在加载Order部分数据!", num);
    73 }
    74 }
    复制代码

    我们发现程序在加载User表的时候卡住了,出现了类似死循环,这句SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0)中

    的ParticipantsRemaining==0 永远也不能成立,导致task0永远都不能退出,然而barrier还在一直等待task0调用SignalAndWait来结束屏障。

    结果就是造成了相互等待的尴尬局面,我们下个断点看看情况。

    3:超时机制

        当我们coding的时候遇到了这种问题还是很纠结的,所以我们必须引入一种“超时机制”,如果在指定的时候内所有的参与者(task)都

    没有到达屏障的话,我们就需要取消这些参与者的后续执行,幸好SignalAndWait给我们提供了超时的重载,为了能够取消后续执行,我们

    还要采用CancellationToken机制。

    复制代码
      1 using System.Collections.Concurrent;
    2 using System.Threading.Tasks;
    3 using System;
    4 using System.Diagnostics;
    5 using System.Collections.Generic;
    6 using System.Linq;
    7 using System.Threading;
    8
    9 class Program
    10 {
    11 //四个task执行
    12 static Task[] tasks = new Task[4];
    13
    14 static Barrier barrier = null;
    15
    16 static void Main(string[] args)
    17 {
    18 CancellationTokenSource cts = new CancellationTokenSource();
    19
    20 CancellationToken ct = cts.Token;
    21
    22 barrier = new Barrier(tasks.Length, (i) =>
    23 {
    24 Console.WriteLine("**********************************************************");
    25 Console.WriteLine(" 屏障中当前阶段编号:{0} ", i.CurrentPhaseNumber);
    26 Console.WriteLine("**********************************************************");
    27 });
    28
    29 for (int j = 0; j < tasks.Length; j++)
    30 {
    31 tasks[j] = Task.Factory.StartNew((obj) =>
    32 {
    33 var single = Convert.ToInt32(obj);
    34
    35 LoadUser(single);
    36
    37 if (!barrier.SignalAndWait(2000))
    38 {
    39 //抛出异常,取消后面加载的执行
    40 throw new OperationCanceledException(string.Format("我是当前任务{0},我抛出异常了!", single), ct);
    41 }
    42
    43 LoadProduct(single);
    44 barrier.SignalAndWait();
    45
    46 LoadOrder(single);
    47 barrier.SignalAndWait();
    48
    49 }, j, ct);
    50 }
    51
    52 //等待所有tasks 4s
    53 Task.WaitAll(tasks, 4000);
    54
    55 try
    56 {
    57 for (int i = 0; i < tasks.Length; i++)
    58 {
    59 if (tasks[i].Status == TaskStatus.Faulted)
    60 {
    61 //获取task中的异常
    62 foreach (var single in tasks[i].Exception.InnerExceptions)
    63 {
    64 Console.WriteLine(single.Message);
    65 }
    66 }
    67 }
    68
    69 barrier.Dispose();
    70 }
    71 catch (AggregateException e)
    72 {
    73 Console.WriteLine("我是总异常:{0}", e.Message);
    74 }
    75
    76 Console.Read();
    77 }
    78
    79 static void LoadUser(int num)
    80 {
    81 Console.WriteLine(" 当前任务:{0}正在加载User部分数据!", num);
    82
    83 if (num == 0)
    84 {
    85 //自旋转5s
    86 if (!SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0, 5000))
    87 return;
    88 }
    89
    90 Console.WriteLine("当前任务:{0}正在加载User数据完毕!", num);
    91 }
    92
    93 static void LoadProduct(int num)
    94 {
    95 Console.WriteLine("当前任务:{0}正在加载Product部分数据!", num);
    96 }
    97
    98 static void LoadOrder(int num)
    99 {
    100 Console.WriteLine("当前任务:{0}正在加载Order部分数据!", num);
    101 }
    102 }
    复制代码


    二:spinLock(自旋锁)

        我们初识多线程或者多任务时,第一个想到的同步方法就是使用lock或者Monitor,然而在4.0 之后给我们提供了另一把武器spinLock,

    如果你的任务持有锁的时间非常短,具体短到什么时候msdn也没有给我们具体的答案,但是有一点值得确定的时,如果持有锁的时候比较

    短,那么它比那些重量级别的Monitor具有更小的性能开销,它的用法跟Monitor很相似,下面举个例子,Add2方法采用自旋锁。

    复制代码
     1 using System.Collections.Concurrent;
    2 using System.Threading.Tasks;
    3 using System;
    4 using System.Diagnostics;
    5 using System.Collections.Generic;
    6 using System.Linq;
    7 using System.Threading;
    8
    9 class Program
    10 {
    11 static SpinLock slock = new SpinLock(false);
    12
    13 static int sum1 = 0;
    14
    15 static int sum2 = 0;
    16
    17 static void Main(string[] args)
    18 {
    19 Task[] tasks = new Task[100];
    20
    21 for (int i = 1; i <= 100; i++)
    22 {
    23 tasks[i - 1] = Task.Factory.StartNew((num) =>
    24 {
    25 Add1((int)num);
    26
    27 Add2((int)num);
    28
    29 }, i);
    30 }
    31
    32 Task.WaitAll(tasks);
    33
    34 Console.WriteLine("Add1数字总和:{0}", sum1);
    35
    36 Console.WriteLine("Add2数字总和:{0}", sum2);
    37
    38 Console.Read();
    39 }
    40
    41 //无锁
    42 static void Add1(int num)
    43 {
    44 Thread.Sleep(100);
    45
    46 sum1 += num;
    47 }
    48
    49 //自旋锁
    50 static void Add2(int num)
    51 {
    52 bool lockTaken = false;
    53
    54 Thread.Sleep(100);
    55
    56 try
    57 {
    58 slock.Enter(ref lockTaken);
    59 sum2 += num;
    60 }
    61 finally
    62 {
    63 if (lockTaken)
    64 slock.Exit(false);
    65 }
    66 }
    67 }
    复制代码

     
    分类: 并行开发
  • 相关阅读:
    STL函数对象和Lambda表达式
    STL算法之排序算法
    STL 算法
    STL源码剖析---根据最新版本的g++4.9.0(支持C++11)的修订(1)空间配置器
    STL Iterator的里里外外(一)?
    STL对比解说——关联容器
    Select单进程非阻塞TCP echo服务器
    TCP建立(3次握手)与终止(4次挥手)
    TIME_WAIT状态
    C标准I/O库
  • 原文地址:https://www.cnblogs.com/Jeely/p/10999336.html
Copyright © 2011-2022 走看看