zoukankan      html  css  js  c++  java
  • 多线程实战(二)线程同步

     当多个线程使用共享对象的时候,依次等待,这种我们通常叫着线程同步;下面介绍几种线程同步的方法:

    原子操作

    Mutex

    SemaphoreSlim

    AutoResetEvent

    ManuualResetEventSlim

    CountDownEvent

    Barrier

    ReaderWriterLockSlim

    原子操作

    所谓原子操作是指不会被线程调度机制打断的操作;用于执行轻量级、仅执行一次的操作,比如修改计数器,某些条件下的增加值或设置位等。这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch(切换到另一个线程)。

    internal class Program
        {
            private static void Main(string[] args)
            {
                Console.WriteLine("Incorrect counter");
    
                var c = new Counter();
    
                var t1 = new Thread(() => TestCounter(c));
                var t2 = new Thread(() => TestCounter(c));
                var t3 = new Thread(() => TestCounter(c));
                t1.Start();
                t2.Start();
                t3.Start();
                t1.Join();
                t2.Join();
                t3.Join();
    
                Console.WriteLine("Total count: {0}", c.Count);
                Console.WriteLine("--------------------------");
    
                Console.WriteLine("Correct counter");
    
                var c1 = new CounterNoLock();
    
                t1 = new Thread(() => TestCounter(c1));
                t2 = new Thread(() => TestCounter(c1));
                t3 = new Thread(() => TestCounter(c1));
                t1.Start();
                t2.Start();
                t3.Start();
                t1.Join();
                t2.Join();
                t3.Join();
    
                Console.WriteLine("Total count: {0}", c1.Count);
                Console.ReadKey();
            }
    
            static void TestCounter(CounterBase c)
            {
                for (int i = 0; i < 100000; i++)
                {
                    c.Increment();
                    c.Decrement();
                }
            }
    
            class Counter : CounterBase
            {
                private int _count;
    
                public int Count { get { return _count; } }
    
                public override void Increment()
                {
                    _count++;
                }
    
                public override void Decrement()
                {
                    _count--;
                }
            }
    
            class CounterNoLock : CounterBase
            {
                private int _count;
    
                public int Count { get { return _count; } }
    
                public override void Increment()
                {
                    Interlocked.Increment(ref _count);
                }
    
                public override void Decrement()
                {
                    Interlocked.Decrement(ref _count);
                }
            }
    
            abstract class CounterBase
            {
                public abstract void Increment();
    
                public abstract void Decrement();
            }
        }

     

    Mutex

     同步基元,可以用于进程同步;当然也可以用于线程同步;

    线程使用Mutex.WaitOne()方法等待C# Mutex对象被释放,如果它等待的C# Mutex对象被释放了,它就自动拥有这个对象,直到它调用Mutex.ReleaseMutex()方法释放这个对象,而在此期间,其他想要获取这个C#Mutex对象的线程都只有等待。

     1.线程同步

     static void Main(string[] args)
            {
                for (int i = 0; i < 10; i++)
                {
                    Thread t = new Thread(() => method());
                    t.Start();
                }
                Console.ReadKey();
            }
    
    
            static int count = 0;
            static Mutex mutex = new Mutex();
            static void method()
            {
                Thread.Sleep(1000);
                mutex.WaitOne();
                Console.WriteLine("当前数字:{0}", ++count);
                mutex.ReleaseMutex();
            }

     2.进程同步

    static void Main(string[] args)
            {
                const string MutexName = "CSharpThreadingCookbook";
    
                using (var m = new Mutex(false, MutexName))
                {
                    if (!m.WaitOne(TimeSpan.FromSeconds(5), false))
                    {
                        Console.WriteLine("Second instance is running!");
                    }
                    else
                    {
                        Console.WriteLine("Running!");
                        Console.ReadLine();
                        m.ReleaseMutex();
                    }
                }
                Console.ReadKey();
            }

    运行两个exe程序,得到执行结果

    SemaphoreSlim

    SemaphoreSlim 类表示一个轻量、快速的信号量,可在等待时间预计很短的情况下用于在单个进程内等待。SemaphoreSlim 尽可能多地依赖公共语言运行时 (CLR) 提供的同步基元。 但是,它还提供延迟初始化、基于内核的等待句柄,作为在多个信号量上进行等待的必要支持。 SemaphoreSlim 也支持使用取消标记,但不支持命名信号量或使用用于同步的等待句柄。

    对可同时访问资源或资源池的线程数加以限制的 Semaphore 的轻量替代。

     Wait() 阻止当前线程,直至它可进入SemaphoreSlim 为止。

     Release() 退出 SemaphoreSlim 一次。

     static void Main(string[] args)
            {
                for (int i = 0; i <= 6; i++)
                {
                    string threadName = "线程" + i;
                    var t = new Thread(() => AccessDatabase(threadName));
                    t.Start();
                }
                Console.ReadKey();
            }
    
            // 同时授予4个信号量的初始请求数。
            static SemaphoreSlim _semaphore = new SemaphoreSlim(4);
            static void AccessDatabase(string name)
            {
                Console.WriteLine("{0}等待进入", name);
                _semaphore.Wait();
                Console.WriteLine("{0}进入", name);
                Thread.Sleep(2000);
                Console.WriteLine("{0}完成", name);
                _semaphore.Release();
    
            }

    AutoResetEvent

    允许线程通过发信号互相通信。通常,此通信涉及线程需要独占访问的资源。

    线程通过调用 AutoResetEvent 上的 WaitOne 来等待信号。如果 AutoResetEvent 处于非终止状态,则该线程阻塞,并等待当前控制资源的线程通过调用 Set 发出资源可用的信号。

    调用 Set 向 AutoResetEvent 发信号以释放等待线程。AutoResetEvent 将保持终止状态,直到一个正在等待的线程被释放,然后自动返回非终止状态。如果没有任何线程在等待,则状态将无限期地保持为终止状态。

    可以通过将一个布尔值传递给构造函数来控制 AutoResetEvent 的初始状态,如果初始状态为终止状态,则为 true;否则为 false

    通俗的来讲只有等myResetEven.Set()成功运行后,myResetEven.WaitOne()才能够获得运行机会;Set是发信号,WaitOne是等待信号,只有发了信号,等待的才会执行。如果不发的话,WaitOne后面的程序就永远不会执行。

     const int numIterations = 10;
            static int number;
            static AutoResetEvent myResetEvent = new AutoResetEvent(false);
            static AutoResetEvent ChangeEvent = new AutoResetEvent(false);
            static void Main(string[] args)
            {
                Thread payMoneyThread = new Thread(new ThreadStart(PayMoneyProc));
                payMoneyThread.Name = "付钱线程";
                Thread getBookThread = new Thread(new ThreadStart(GetBookProc));
                getBookThread.Name = "取书线程";
                payMoneyThread.Start();
                getBookThread.Start();
    
                for (int i = 1; i <= numIterations; i++)
                {
                    Console.WriteLine("买书线程:数量{0}", i);
                    number = i;
                    myResetEvent.Set();
                    Thread.Sleep(1000);
                    ChangeEvent.Set();
                    Thread.Sleep(1000);
                }
                Console.ReadKey();
            }
    
            static void PayMoneyProc()
            {
                while (true)
                {
                    myResetEvent.WaitOne();
                    Console.WriteLine("{0}:数量{1}", Thread.CurrentThread.Name, number);
                }
            }
            static void GetBookProc()
            {
                while (true)
                {
                    ChangeEvent.WaitOne();             
                    Console.WriteLine("{0}:数量{1}", Thread.CurrentThread.Name, number);
                    Console.WriteLine("------------------------------------------");
                }
            }

    ManuualResetEventSlim

    ManualResetEvent 类的简化版;通知一个或多个正在等待的线程已发生事件。
    Set() 将事件状态设置为有信号,从而允许一个或多个等待该事件的线程继续。
    Reset()将事件状态设置为非终止状态,从而导致线程受阻。
    Wait()阻止当前线程,直到设置了当前ManualResetEventSlim 为止。
    static void Main(string[] args)
            {
                var t1 = new Thread(() => Method("线程1", 5));
                var t2 = new Thread(() => Method("线程2", 6));
                var t3 = new Thread(() => Method("线程3", 12));
                t1.Start();
                t2.Start();
                t3.Start();
                Thread.Sleep(TimeSpan.FromSeconds(6));
                Console.WriteLine("第一次开始");
                _mainEvent.Set();
                Thread.Sleep(TimeSpan.FromSeconds(2));
                _mainEvent.Reset();
                Console.WriteLine("第一次关闭");
                Thread.Sleep(TimeSpan.FromSeconds(10));
                Console.WriteLine("第二次开始");
                _mainEvent.Set();
                Thread.Sleep(TimeSpan.FromSeconds(2));
                Console.WriteLine("第二次关闭");
                _mainEvent.Reset();
                Console.ReadKey();
            }
            static ManualResetEventSlim _mainEvent = new ManualResetEventSlim(false);
            static void Method(string threadName, int seconds)
            {
                Console.WriteLine("{0}暂停", threadName);
                Thread.Sleep(TimeSpan.FromSeconds(seconds));
                Console.WriteLine("{0}等待打开", threadName);
                _mainEvent.Wait();
                Console.WriteLine("{0}进入", threadName);
            }

     

    CountDownEvent

    示在计数变为零时处于有信号状态的同步基元.通过CountdownEvent可以在主线程中线程池中的任务运行,主线程要等待线程池中的任务完成之后才能继续。CountdownEvent Class在使用上十分的简单,只要在CountdownEvent的构造函数中传入信号量的数量。在每个线程启动的地方主线程调用AddCount方法增加信号量计数,线程池中跑的线程调用Signal。然后在主线程中调用Signal和Wait方法,就可以实现主 线程等待X次Signal方法调用之后继续。

    static void Main()
            {
                CountdownEvent cde = new CountdownEvent(3); // 创建SemaphoreSlim 初始化信号量最多计数为3次 
                Console.WriteLine(" InitialCount={0}, CurrentCount={1}, IsSet={2}", cde.InitialCount, cde.CurrentCount, cde.IsSet);
    
                Task t1 = Task.Factory.StartNew(() =>
                {
                    while (true)
                    {
                        Thread.Sleep(1000);
                        if (!cde.IsSet)
                        {
                            cde.Signal();
                            Console.WriteLine(" InitialCount={0}, CurrentCount={1}, IsSet={2}", cde.InitialCount, cde.CurrentCount, cde.IsSet);
                        }
                    }
                });
                cde.Wait();
                /*将 CurrentCount 重置为 InitialCount 的值。*/
                Console.WriteLine("将 CurrentCount 重置为 InitialCount 的值。");
                cde.Reset();
    
                cde.Wait();
                /*将 CurrentCount 重置为 5*/
                Console.WriteLine("将 CurrentCount 重置为 5");
                cde.Reset(5);
                cde.AddCount(2);
    
                cde.Wait();
                /*等待任务完成*/
                Task.WaitAll(t1);
                Console.WriteLine("任务执行完成");
                /*释放*/
                cde.Dispose();
                Console.ReadLine();
            }

      static void Main()
            {
                CountdownEvent cde = new CountdownEvent(3); // 创建SemaphoreSlim 初始化信号量最多计数为3次 
                Console.WriteLine(" InitialCount={0}, CurrentCount={1}, IsSet={2}", cde.InitialCount, cde.CurrentCount, cde.IsSet);
                /*创建任务执行计数*/
                Task t1 = Task.Factory.StartNew(() =>
                {
                    for (int index = 0; index <= 5; index++)
                    {
                        /*重置计数器*/
                        cde.Reset();
                        /*创建任务执行计数*/
                        while (true)
                        {
                            Thread.Sleep(1000);
                            if (!cde.IsSet)
                            {
                                cde.Signal();
                                Console.WriteLine("第{0}轮计数  CurrentCount={1}", index, cde.CurrentCount);
                            }
                            else
                            {
                                Console.WriteLine("第{0}轮计数完成", index);
                                break;
                            }
                        }
                        /*等待计数完成*/
                        cde.Wait();
                    }
                });
                t1.Wait();
                /*释放*/
                cde.Dispose();
                Console.ReadLine();
            }

    Barrier

    使多个任务能够采用并行方式依据某种算法在多个阶段中协同工作。

    当在需要一组任务并行地运行一连串的阶段,但是每一个阶段都要等待其他任务完成前一阶段之后才能开始时,您可以通过使用Barrier类的实例来同步这一类协同工作

       private static Task[] _CookTasks { get; set; }
            private static Barrier _barrier { get; set; }
            /*获取当前计算机处理器数*/
            private static int _particpants = Environment.ProcessorCount;
            /*  
             *  代码中 展示煮饭的步骤   1.打水  2.淘米 3.放入锅中 4.盖上锅盖 5.生火煮饭 
             */
            static void Main(string[] args)
            {
                Console.WriteLine("定义{0}个人煮饭3次", _particpants);
                _CookTasks = new Task[_particpants];
                _barrier = new Barrier(_particpants, (barrier) =>
                {
                    Console.WriteLine("当前阶段:{0}", barrier.CurrentPhaseNumber);
                });
                Stopwatch swTask1 = new Stopwatch();
                swTask1.Start();
                /*定义N个人*/
                for (int cook_person = 0; cook_person < _particpants; cook_person++)
                {
                    _CookTasks[cook_person] = Task.Factory.StartNew((num) =>
                    {
                        int index = Convert.ToInt32(num);
                        /*每个人煮3次饭*/
                        for (int cook_count = 0; cook_count < 3; cook_count++)
                        {
                            CookStepTask1(index, cook_count);
                            CookStepTask2(index, cook_count);
                            CookStepTask3(index, cook_count);
                            CookStepTask4(index, cook_count);
                            CookStepTask5(index, cook_count);
                        }
                    }, cook_person);
                }
    
                /*ContinueWhenAll 提供一组任务完成后 延续方法*/
                var finalTask = Task.Factory.ContinueWhenAll(_CookTasks, (tasks) =>
                {
                    /*等待任务完成*/
                    Task.WaitAll(_CookTasks);
                    swTask1.Stop();
                    Console.WriteLine("采用并发 {1}个人煮3次饭耗时:{0}", swTask1.ElapsedMilliseconds, _particpants);
                    /*释放资源*/
                    _barrier.Dispose();
                });
                Thread.Sleep(4000);
                Stopwatch swTask = new Stopwatch();
                swTask.Start();
    
                /*定义N个人*/
                for (int cook_person = 0; cook_person < _particpants; cook_person++)
                {
                    /*每个人煮3次饭*/
                    for (int cook_count = 0; cook_count < 3; cook_count++)
                    {
                        CookStep1(cook_person, cook_count); 
                        CookStep2(cook_person, cook_count); 
                        CookStep3(cook_person, cook_count); 
                        CookStep4(cook_person, cook_count); 
                        CookStep5(cook_person, cook_count);
                    }
                }
                swTask.Stop();
                Console.WriteLine("不采用并发 {1}个人煮3次饭耗时:{0}", swTask.ElapsedMilliseconds, _particpants);
                Thread.Sleep(2000);
    
    
                Console.ReadLine();
            }
            /*1.打水*/
            private static void CookStepTask1(int pesron_index, int index)
            {
                Console.WriteLine("{0} 第{1}次 打水... 耗时2分钟", pesron_index, index);
                Thread.Sleep(200);
                /*存在线程暂停 所以需要将 _barrier.SignalAndWait();放在方法中 */
                _barrier.SignalAndWait();
            }
    
            /*2.淘米*/
            private static void CookStepTask2(int pesron_index, int index)
            {
                Console.WriteLine("{0} 第{1}次 淘米... 耗时3分钟", pesron_index, index);
                Thread.Sleep(300);
                /*存在线程暂停 所以需要将 _barrier.SignalAndWait();放在方法中 */
                _barrier.SignalAndWait();
            }
    
            /*3.放入锅中*/
            private static void CookStepTask3(int pesron_index, int index)
            {
                Console.WriteLine("{0} 第{1}次 放入锅中... 耗时1分钟", pesron_index, index);
                Thread.Sleep(100);
                /*存在线程暂停 所以需要将 _barrier.SignalAndWait();放在方法中 */
                _barrier.SignalAndWait();
            }
    
            /*4.盖上锅盖*/
            private static void CookStepTask4(int pesron_index, int index)
            {
                Console.WriteLine("{0} 第{1}次 盖上锅盖... 耗时1分钟", pesron_index, index);
                Thread.Sleep(100);
                /*存在线程暂停 所以需要将 _barrier.SignalAndWait();放在方法中 */
                _barrier.SignalAndWait();
            }
            /*5.生火煮饭*/
            private static void CookStepTask5(int pesron_index, int index)
            {
                Console.WriteLine("{0} 第{1}次  生火煮饭... 耗时30分钟", pesron_index, index);
                Thread.Sleep(500);
                /*存在线程暂停 所以需要将 _barrier.SignalAndWait();放在方法中 */
                _barrier.SignalAndWait();
            }
    
            /*1.打水*/
            private static void CookStep1(int pesron_index, int index)
            {
                Console.WriteLine("{0} 第{1}次 打水... 耗时2分钟", pesron_index, index);
                Thread.Sleep(200);
            }
    
            /*2.淘米*/
            private static void CookStep2(int pesron_index, int index)
            {
                Console.WriteLine("{0} 第{1}次 淘米... 耗时3分钟", pesron_index, index);
                Thread.Sleep(300);
            }
    
            /*3.放入锅中*/
            private static void CookStep3(int pesron_index, int index)
            {
                Console.WriteLine("{0} 第{1}次 放入锅中... 耗时1分钟", pesron_index, index);
                Thread.Sleep(100);
            }
    
            /*4.盖上锅盖*/
            private static void CookStep4(int pesron_index, int index)
            {
                Console.WriteLine("{0} 第{1}次 盖上锅盖... 耗时1分钟", pesron_index, index);
                Thread.Sleep(100);
            }
            /*5.生火煮饭*/
            private static void CookStep5(int pesron_index, int index)
            {
                Console.WriteLine("{0} 第{1}次  生火煮饭... 耗时30分钟", pesron_index, index);
                Thread.Sleep(500);
            }

    ReaderWriterLockSlim

    读写锁的概念很简单,允许多个线程同时获取读锁,但同一时间只允许一个线程获得写锁,因此也称作共享-独占锁。

     http://www.cnblogs.com/xchit/p/4548392.html

    class Program
        {
            static void Main(string[] args)
            {
                new Thread(() => Read("读线程1")) { IsBackground = true }.Start();
                new Thread(() => Read("读线程2")) { IsBackground = true }.Start();
                new Thread(() => Read("读线程3")) { IsBackground = true }.Start();
    
                new Thread(() => Write("写线程1")){ IsBackground = true }.Start();
                new Thread(() => Write("写线程2")){ IsBackground = true }.Start();
    
                Thread.Sleep(TimeSpan.FromSeconds(30));
    
                Console.ReadKey();
            }
    
            static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim();
            static Dictionary<int, int> _items = new Dictionary<int, int>();
    
            static void Read(string ThreadName)
            {
                //Console.WriteLine(ThreadName);
                while (true)
                {
                    try
                    {
                        _rw.EnterReadLock();
                        foreach (var key in _items.Keys)
                        {
                            Thread.Sleep(TimeSpan.FromSeconds(0.1));
                            Console.WriteLine("{0}读{1}", ThreadName,key.ToString());
                        }
                    }
                    finally
                    {
                        _rw.ExitReadLock();
                    }
                }
            }
    
            static void Write(string threadName)
            {
                while (true)
                {
                    try
                    {
                        int newKey = new Random().Next(250);
                        _rw.EnterUpgradeableReadLock();
                        if (!_items.ContainsKey(newKey))
                        {
                            try
                            {
                                _rw.EnterWriteLock();
                                _items[newKey] = 1;
                                Console.WriteLine("{0}写{1}", threadName,newKey);
                            }
                            finally
                            {
                                _rw.ExitWriteLock();
                            }
                        }
                        Thread.Sleep(TimeSpan.FromSeconds(0.1));
                    }
                    finally
                    {
                        _rw.ExitUpgradeableReadLock();
                    }
                }
            }
        }
  • 相关阅读:
    redis:高可用分析
    mysql:explain分析sql
    python中注意事项(更新)
    jupyter使用小技巧(更新)
    Jupyter中的快捷键
    Excel制作甘特图
    Vim编辑器常用命令
    Mysql主从
    常用MySql命令
    进程与线程的一个简单解释
  • 原文地址:https://www.cnblogs.com/xchit/p/4780876.html
Copyright © 2011-2022 走看看