zoukankan      html  css  js  c++  java
  • 并行开发 —— 第五篇 同步机制(下)

       

         承接上一篇,我们继续说下.net4.0中的同步机制,是的,当出现了并行计算的时候,轻量级别的同步机制应运而生,在信号量这一块

    出现了一系列的轻量级,今天继续介绍下面的3个信号量 CountdownEvent,SemaphoreSlim,ManualResetEventSlim。

    一:CountdownEvent

         这种采用信号状态的同步基元非常适合在动态的fork,join的场景,它采用“信号计数”的方式,就比如这样,一个麻将桌只能容纳4个

    人打麻将,如果后来的人也想搓一把碰碰运气,那么他必须等待直到麻将桌上的人走掉一位。好,这就是简单的信号计数机制,从技术角

    度上来说它是定义了最多能够进入关键代码的线程数。

         但是CountdownEvent更牛X之处在于我们可以动态的改变“信号计数”的大小,比如一会儿能够容纳8个线程,一下又4个,一下又10个,

    这样做有什么好处呢?还是承接上一篇文章所说的,比如一个任务需要加载1w条数据,那么可能出现这种情况。

    加载User表:         根据user表的数据量,我们需要开5个task。

    加载Product表:    产品表数据相对比较多,计算之后需要开8个task。

    加载order表:       由于我的网站订单丰富,计算之后需要开12个task。

    先前的文章也说了,我们需要协调task在多阶段加载数据的同步问题,那么如何应对这里的5,8,12,幸好,CountdownEvent给我们提供了

    可以动态修改的解决方案。

      1 using System.Collections.Concurrent;
    2 using System.Threading.Tasks;
    3 using System;
    4 using System.Diagnostics;
    5 using System.Collections.Generic;
    6 using System.Linq;
    7 using System.Threading;
    8
    9 class Program
    10 {
    11 //默认的容纳大小为“硬件线程“数
    12 static CountdownEvent cde = new CountdownEvent(Environment.ProcessorCount);
    13
    14 static void Main(string[] args)
    15 {
    16 //加载User表需要5个任务
    17 var userTaskCount = 5;
    18
    19 //重置信号
    20 cde.Reset(userTaskCount);
    21
    22 for (int i = 0; i < userTaskCount; i++)
    23 {
    24 Task.Factory.StartNew((obj) =>
    25 {
    26 LoadUser(obj);
    27 }, i);
    28 }
    29
    30 //等待所有任务执行完毕
    31 cde.Wait();
    32
    33 Console.WriteLine("\nUser表数据全部加载完毕!\n");
    34
    35 //加载product需要8个任务
    36 var productTaskCount = 8;
    37
    38 //重置信号
    39 cde.Reset(productTaskCount);
    40
    41 for (int i = 0; i < productTaskCount; i++)
    42 {
    43 Task.Factory.StartNew((obj) =>
    44 {
    45 LoadProduct(obj);
    46 }, i);
    47 }
    48
    49 cde.Wait();
    50
    51 Console.WriteLine("\nProduct表数据全部加载完毕!\n");
    52
    53 //加载order需要12个任务
    54 var orderTaskCount = 12;
    55
    56 //重置信号
    57 cde.Reset(orderTaskCount);
    58
    59 for (int i = 0; i < orderTaskCount; i++)
    60 {
    61 Task.Factory.StartNew((obj) =>
    62 {
    63 LoadOrder(obj);
    64 }, i);
    65 }
    66
    67 cde.Wait();
    68
    69 Console.WriteLine("\nOrder表数据全部加载完毕!\n");
    70
    71 Console.WriteLine("\n(*^__^*) 嘻嘻,恭喜你,数据全部加载完毕\n");
    72
    73 Console.Read();
    74 }
    75
    76 static void LoadUser(object obj)
    77 {
    78 try
    79 {
    80 Console.WriteLine("当前任务:{0}正在加载User部分数据!", obj);
    81 }
    82 finally
    83 {
    84 cde.Signal();
    85 }
    86 }
    87
    88 static void LoadProduct(object obj)
    89 {
    90 try
    91 {
    92 Console.WriteLine("当前任务:{0}正在加载Product部分数据!", obj);
    93 }
    94 finally
    95 {
    96 cde.Signal();
    97 }
    98 }
    99
    100 static void LoadOrder(object obj)
    101 {
    102 try
    103 {
    104 Console.WriteLine("当前任务:{0}正在加载Order部分数据!", obj);
    105 }
    106 finally
    107 {
    108 cde.Signal();
    109 }
    110 }
    111 }


    我们看到有两个主要方法:Wait和Signal。每调用一次Signal相当于麻将桌上走了一个人,直到所有人都搓过麻将wait才给放行,这里同样要

    注意也就是“超时“问题的存在性,尤其是在并行计算中,轻量级别给我们提供了”取消标记“的机制,这是在重量级别中不存在的,比如下面的

    重载public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken),具体使用可以看前一篇文章的介绍。

    二:SemaphoreSlim

         在.net 4.0之前,framework中有一个重量级的Semaphore,人家可以跨进程同步,咋轻量级不行,msdn对它的解释为:限制可同时访问

    某一资源或资源池的线程数。关于它的重量级demo,我的上一个系列有演示,你也可以理解为CountdownEvent是 SemaphoreSlim的功能加

    强版,好了,举一个轻量级使用的例子。

     1 using System.Collections.Concurrent;
    2 using System.Threading.Tasks;
    3 using System;
    4 using System.Diagnostics;
    5 using System.Collections.Generic;
    6 using System.Linq;
    7 using System.Threading;
    8
    9 class Program
    10 {
    11 static SemaphoreSlim slim = new SemaphoreSlim(Environment.ProcessorCount, 12);
    12
    13 static void Main(string[] args)
    14 {
    15 for (int i = 0; i < 12; i++)
    16 {
    17 Task.Factory.StartNew((obj) =>
    18 {
    19 Run(obj);
    20 }, i);
    21 }
    22
    23 Console.Read();
    24 }
    25
    26 static void Run(object obj)
    27 {
    28 slim.Wait();
    29
    30 Console.WriteLine("当前时间:{0}任务 {1}已经进入。", DateTime.Now, obj);
    31
    32 //这里busy3s中
    33 Thread.Sleep(3000);
    34
    35 slim.Release();
    36 }
    37 }


    同样,防止死锁的情况,我们需要知道”超时和取消标记“的解决方案,像SemaphoreSlim这种定死的”线程请求范围“,其实是降低了扩展性,

    所以说,试水有风险使用需谨慎,在觉得有必要的时候使用它。

    三: ManualResetEventSlim

         相信它的重量级别大家都知道是ManualReset,而这个轻量级别采用的是"自旋等待“+”内核等待“,也就是说先采用”自旋等待的方式“等待,

    直到另一个任务调用set方法来释放它。如果迟迟等不到释放,那么任务就会进入基于内核的等待,所以说如果我们知道等待的时间比较短,采

    用轻量级的版本会具有更好的性能,原理大概就这样,下面举个小例子。

     1 using System.Collections.Concurrent;
    2 using System.Threading.Tasks;
    3 using System;
    4 using System.Diagnostics;
    5 using System.Collections.Generic;
    6 using System.Linq;
    7 using System.Threading;
    8
    9 class Program
    10 {
    11 //2047:自旋的次数
    12 static ManualResetEventSlim mrs = new ManualResetEventSlim(false, 2047);
    13
    14 static void Main(string[] args)
    15 {
    16
    17 for (int i = 0; i < 12; i++)
    18 {
    19 Task.Factory.StartNew((obj) =>
    20 {
    21 Run(obj);
    22 }, i);
    23 }
    24
    25 Console.WriteLine("当前时间:{0}我是主线程{1},你们这些任务都等2s执行吧:\n",
    26 DateTime.Now,
    27 Thread.CurrentThread.ManagedThreadId);
    28 Thread.Sleep(2000);
    29
    30 mrs.Set();
    31
    32 Console.Read();
    33 }
    34
    35 static void Run(object obj)
    36 {
    37 mrs.Wait();
    38
    39 Console.WriteLine("当前时间:{0}任务 {1}已经进入。", DateTime.Now, obj);
    40 }
    41 }

  • 相关阅读:
    python 的基础 学习 第六天 基础数据类型的操作方法 字典
    python 的基础 学习 第五天 基础数据类型的操作方法
    python 的基础 学习 第四天 基础数据类型
    ASP.NET MVC 入门8、ModelState与数据验证
    ASP.NET MVC 入门7、Hellper与数据的提交与绑定
    ASP.NET MVC 入门6、TempData
    ASP.NET MVC 入门5、View与ViewData
    ASP.NET MVC 入门4、Controller与Action
    ASP.NET MVC 入门3、Routing
    ASP.NET MVC 入门2、项目的目录结构与核心的DLL
  • 原文地址:https://www.cnblogs.com/ShaYeBlog/p/2682093.html
Copyright © 2011-2022 走看看