zoukankan      html  css  js  c++  java
  • C#线程同步--限量使用

    问题抽象:当某一资源同一时刻允许一定数量的线程使用的时候,需要有个机制来阻塞多余的线程,直到资源再次变得可用。
    线程同步方案:Semaphore、SemaphoreSlim、CountdownEvent
    方案特性:限量供应;除所有者外,其他人无条件等待;先到先得,没有先后顺序

    1、Semaphore类
          用于控制线程的访问数量,默认的构造函数为initialCount和maximumCount,表示默认设置的信号量个数和最大信号量个数。当你WaitOne的时候,信号量自减,当Release的时候,信号量自增,然而当信号量为0的时候,后续的线程就不能拿到WaitOne了,所以必须等待先前的线程通过Release来释放。

    using System;
    using System.Threading;
    
    namespace ConsoleApp1
    {
        class Program
        {
            static void Main(string[] args)
            {
                Thread t1 = new Thread(Run1);
                t1.Start();
                Thread t2 = new Thread(Run2);
                t2.Start();
                Thread t3 = new Thread(Run3);
                t3.Start();
                Console.ReadKey();
            }
    
            //初始可以授予2个线程信号,因为第3个要等待前面的Release才能得到信号
            static Semaphore sem = new Semaphore(2, 10);
    
            static void Run1()
            {
                sem.WaitOne();
                Console.WriteLine("大家好,我是Run1;" + DateTime.Now.ToString("mm:ss"));
    
                //两秒后
                Thread.Sleep(2000);
                sem.Release();
            }
    
            static void Run2()
            {
                sem.WaitOne();
                Console.WriteLine("大家好,我是Run2;" + DateTime.Now.ToString("mm:ss"));
    
                //两秒后
                Thread.Sleep(2000);
                sem.Release();
            }
    
            static void Run3()
            {
                sem.WaitOne();
                Console.WriteLine("大家好,我是Run3;" + DateTime.Now.ToString("mm:ss"));
    
                //两秒后
                Thread.Sleep(2000);
                sem.Release();
            }
        }
    }
    Program

    在以上的方法中Release()方法相当于自增一个信号量,Release(5)自增5个信号量。但是,Release()到构造函数的第二个参数maximumCount的值就不能再自增了。

    Semaphore可用于进程级交互。

    using System;
    using System.Diagnostics;
    using System.Threading;
    
    namespace ConsoleApp1
    {
        class Program
        {
            static void Main(string[] args)
            {
    
                Thread t1 = new Thread(Run1);
                t1.Start();
    
                Thread t2 = new Thread(Run2);
                t2.Start();
    
                Console.Read();
            }
    
            //初始可以授予2个线程信号,因为第3个要等待前面的Release才能得到信号
            static Semaphore sem = new Semaphore(3, 10, "命名Semaphore");
    
            static void Run1()
            {
                sem.WaitOne();
    
                Console.WriteLine("进程:" + Process.GetCurrentProcess().Id + "  我是Run1" + DateTime.Now.TimeOfDay);
            }
    
            static void Run2()
            {
                sem.WaitOne();
    
                Console.WriteLine("进程:" + Process.GetCurrentProcess().Id + "  我是Run2" + DateTime.Now.TimeOfDay);
            }
        }
    }
    Program

    直接运行两次bin目录的exe文件,就能发现最多只能输出3个。

    Semaphore可以限制可同时访问某一资源或资源池的线程数。
            Semaphore类在内部维护一个计数器,当一个线程调用Semaphore对象的Wait系列方法时,此计数器减一,只要计数器还是一个正数,线程就不会阻塞。当计数器减到0时,再调用Semaphore对象Wait系列方法的线程将被阻塞,直到有线程调用Semaphore对象的Release()方法增加计数器值时,才有可能解除阻塞状态。
     
    示例说明:
    图书馆都配备有若干台公用计算机供读者查询信息,当某日读者比较多时,必须排队等候。UseLibraryComputer实例用多线程模拟了多人使用多台计算机的过程
    using System;
    using System.Threading;
    
    namespace ConsoleApp1
    {
        class Program
        {
            //图书馆拥有的公用计算机  
            private const int ComputerNum = 3;
            private static Computer[] LibraryComputers;
            //同步信号量  
            public static Semaphore sp = new Semaphore(ComputerNum, ComputerNum);
    
            static void Main(string[] args)
            {
                //图书馆拥有ComputerNum台电脑  
                LibraryComputers = new Computer[ComputerNum];
                for (int i = 0; i < ComputerNum; i++)
                    LibraryComputers[i] = new Computer("Computer" + (i + 1).ToString());
                int peopleNum = 0;
                Random ran = new Random();
                Thread user;
                System.Console.WriteLine("敲任意键模拟一批批的人排队使用{0}台计算机,ESC键结束模拟……", ComputerNum);
                //每次创建若干个线程,模拟人排队使用计算机  
                while (System.Console.ReadKey().Key != ConsoleKey.Escape)
                {
                    peopleNum = ran.Next(0, 10);
                    System.Console.WriteLine("
    有{0}人在等待使用计算机。", peopleNum);
    
                    for (int i = 1; i <= peopleNum; i++)
                    {
                        user = new Thread(UseComputer);
                        user.Start("User" + i.ToString());
                    }
                }
            }
    
            //线程函数  
            static void UseComputer(Object UserName)
            {
                sp.WaitOne();//等待计算机可用  
    
                //查找可用的计算机  
                Computer cp = null;
                for (int i = 0; i < ComputerNum; i++)
                    if (LibraryComputers[i].IsOccupied == false)
                    {
                        cp = LibraryComputers[i];
                        break;
                    }
                //使用计算机工作  
                cp.Use(UserName.ToString());
    
                //不再使用计算机,让出来给其他人使用  
                sp.Release();
            }
        }
    
        class Computer
        {
            public readonly string ComputerName = "";
            public Computer(string Name)
            {
                ComputerName = Name;
            }
            //是否被占用  
            public bool IsOccupied = false;
            //人在使用计算机  
            public void Use(String userName)
            {
                System.Console.WriteLine("{0}开始使用计算机{1}", userName, ComputerName);
                IsOccupied = true;
                Thread.Sleep(new Random().Next(1, 2000)); //随机休眠,以模拟人使用计算机  
                System.Console.WriteLine("{0}结束使用计算机{1}", userName, ComputerName);
                IsOccupied = false;
            }
        }
    }
    Program
    2、SemaphoreSlim类
         在.net 4.0之前,framework中有一个重量级的Semaphore,可以跨进程同步,SemaphoreSlim轻量级不行,msdn对它的解释为:限制可同时访问某一资源或资源池的线程数。
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace ConsoleApp1
    {
        class Program
        {
            static SemaphoreSlim slim = new SemaphoreSlim(Environment.ProcessorCount, 12);
    
            static void Main(string[] args)
            {
                for (int i = 0; i < 12; i++)
                {
                    Task.Factory.StartNew((obj) =>
                    {
                        Run(obj);
                    }, i);
                }
                Console.Read();
            }
    
            static void Run(object obj)
            {
                slim.Wait();
                Console.WriteLine("当前时间:{0}任务 {1}已经进入。", DateTime.Now, obj);
                //这里busy3s中
                Thread.Sleep(3000);
                slim.Release();
            }
        }
    }
    Program

    同样,防止死锁的情况,我们需要知道”超时和取消标记“的解决方案,像SemaphoreSlim这种定死的”线程请求范围“,其实是降低了扩展性,使用需谨慎,在觉得有必要的时候使用它

    注:Semaphore类是SemaphoreSlim类的老版本,该版本使用纯粹的内核时间(kernel-time)方式。
        SemaphoreSlim类不使用Windows内核信号量,而且也不支持进程间同步。所以在跨程序同步的场景下可以使用Semaphore
     
    3、CountdownEvent类
         这种采用信号状态的同步基元非常适合在动态的fork,join的场景,它采用“信号计数”的方式,就比如这样,一个麻将桌只能容纳4个人打麻将,如果后来的人也想搓一把碰碰运气,那么他必须等待直到麻将桌上的人走掉一位。好,这就是简单的信号计数机制,从技术角度上来说它是定义了最多能够进入关键代码的线程数。
         但是CountdownEvent更牛X之处在于我们可以动态的改变“信号计数”的大小,比如一会儿能够容纳8个线程,一下又4个,一下又10个,这样做有什么好处呢?比如一个任务需要加载1w条数据,那么可能出现这种情况。
    例如:
    加载User表:         根据user表的数据量,我们需要开5个task。
    加载Product表:    产品表数据相对比较多,计算之后需要开8个task。
    加载order表:       由于我的网站订单丰富,计算之后需要开12个task。
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace ConsoleApp1
    {
        class Program
        {
            //默认的容纳大小为“硬件线程“数
            static CountdownEvent cde = new CountdownEvent(Environment.ProcessorCount);
    
            static void LoadUser(object obj)
            {
                try
                {
                    Console.WriteLine("ThreadId={0};当前任务:{1}正在加载User部分数据!", Thread.CurrentThread.ManagedThreadId, obj);
                }
                finally
                {
                    cde.Signal();
                }
            }
    
            static void LoadProduct(object obj)
            {
                try
                {
                    Console.WriteLine("ThreadId={0};当前任务:{1}正在加载Product部分数据!", Thread.CurrentThread.ManagedThreadId, obj);
                }
                finally
                {
                    cde.Signal();
                }
            }
    
            static void LoadOrder(object obj)
            {
                try
                {
                    Console.WriteLine("ThreadId={0};当前任务:{1}正在加载Order部分数据!", Thread.CurrentThread.ManagedThreadId, obj);
                }
                finally
                {
                    cde.Signal();
                }
            }
    
            static void Main(string[] args)
            {
                //加载User表需要5个任务
                var userTaskCount = 5;
                //重置信号
                cde.Reset(userTaskCount);
                for (int i = 0; i < userTaskCount; i++)
                {
                    Task.Factory.StartNew((obj) =>
                    {
                        LoadUser(obj);
                    }, i);
                }
                //等待所有任务执行完毕
                cde.Wait();
                Console.WriteLine("
    User表数据全部加载完毕!
    ");
    
                //加载product需要8个任务
                var productTaskCount = 8;
                //重置信号
                cde.Reset(productTaskCount);
                for (int i = 0; i < productTaskCount; i++)
                {
                    Task.Factory.StartNew((obj) =>
                    {
                        LoadProduct(obj);
                    }, i);
                }
                cde.Wait();
                Console.WriteLine("
    Product表数据全部加载完毕!
    ");
    
                //加载order需要12个任务
                var orderTaskCount = 12;
                //重置信号
                cde.Reset(orderTaskCount);
                for (int i = 0; i < orderTaskCount; i++)
                {
                    Task.Factory.StartNew((obj) =>
                    {
                        LoadOrder(obj);
                    }, i);
                }
                cde.Wait();
                Console.WriteLine("
    Order表数据全部加载完毕!
    ");
    
                Console.WriteLine("
    (*^__^*) 嘻嘻,恭喜你,数据全部加载完毕
    ");
                Console.Read();
            }
        }
    }
    Program

    我们看到有两个主要方法:Wait和Signal。每调用一次Signal相当于麻将桌上走了一个人,直到所有人都搓过麻将wait才给放行,这里同样要注意也就是“超时“问题的存在性,尤其是在并行计算中,轻量级别给我们提供了”取消标记“的机制,这是在重量级别中不存在的

    注:如果调用Signal()没有到达指定的次数,那么Wait()将一直等待,请确保使用每个线程完成后都要调用Signal方法。

  • 相关阅读:
    ASP.net实现WEB站点的后台定时任务[转]
    個人所得稅計算
    當VS2005 遇上 LINQ[转]
    NBearV3中文教程总目录
    C#开源框架
    excel 不能使用对象链接和嵌入的错误
    PetShop 学习
    ADHelper类与扩展应用
    (javascript,treeview)treeview通过checkbox来进行全选单选
    (javascript)动态添加的控件如何设置其属性
  • 原文地址:https://www.cnblogs.com/scmail81/p/9514611.html
Copyright © 2011-2022 走看看