zoukankan      html  css  js  c++  java
  • C#多线程编程(6)--线程安全2 互锁构造Interlocked

    在线程安全1中,我介绍了线程同步的意义和一种实现线程同步的方法:volatile。volatile关键字属于原子操作的一种,若对一个关键字使用volatile,很多时候会显得很“浪费”,因为只有在并发访问的情况下才需要“易变”读写,单线程访问时并不需要。在命名空间System.Threading命名空间中提供了InterLock类,该类中提供了一些原子方法。本文来介绍如何使用这些方法。

    •   互锁

      在抢占式系统中,一个线程在执行到任何阶段都有可能被其他线程“打断”,原子操作能够保证该操作不被其他线程打断,其他线程的“打断”只可能发生在该操作的之前或之后。InterLock类中的方法就是”原子“式的,最常用的方法有:

    public static class InterLock{
        // return (++location)
        public static int Increment(ref int location);
        // return(--location)
        public static int Decrement(ref int location);
        // return(location += value)
        //注意,也可能是负数,从而实现减法运算。
        public static int Add(ref int location, int value)
        // int old = location; location = value; return old;
        public static int Exchange(ref int location, int value);
        //old = location1;
        //if(location1 = comparand) location1 = value;
        //return old;
        public static int CompareExchange(ref int location1,
               int value, int comparand);
        ...
    }

      《CLR via C#》的作者在书中说他喜欢Interlocked中的方法,因为它不但很快,而且不会阻塞线程。为了验证InterLock真的很快,我们对变量进行一百万次的写,与Volatile.Write()来进行对比,看看是否真的快。

     static void Main(string[] args){
         TestInterLock();
         Console.ReadLine();
     }
     private static volatile int v = 0;
     private static int n = 0;
     static void TestInterLock(){
         Stopwatch sw = Stopwatch.StartNew();
         for (int i = 0; i < 1000000; i++)
             v++;
         Console.WriteLine("volatile write 1000000 times takes:{0}", sw.ElapsedMilliseconds);
         sw = Stopwatch.StartNew();
         for (int i = 0; i < 1000000; i++)
             Interlocked.Increment(ref n);
         Console.WriteLine("InterLock write 1000000 times takes:{0}", sw.ElapsedMilliseconds);
         n = 0;
         sw = Stopwatch.StartNew();
         for (int i = 0; i < 1000000; i++)
             n++;
         Console.WriteLine("n++ write 1000000 times takes:{0}", sw.ElapsedMilliseconds);
     }

      运行结果如下:

    volatile write 1000000 times takes:2

    InterLock write 1000000 times takes:8

    n++ write 1000000 times takes:2

      我运行了好几次,结果会有些出入,但是大部分的结果都是volatile的写入速度和原生的n++的速度是一样的。InterLock也确实如Jeffrey Richter所说,很快,只是没有volatile关键字修饰的变量的读写快。这是在非并发情况下,下面来看一下并发情况下是否还是很快。

    static void TestInterLock1(){
        Stopwatch sw = Stopwatch.StartNew();
        Task[] t2 = new[] { new Task(() =>    {
            for (int i = 0; i < 1000000; i++)
                Interlocked.Increment(ref n);
        }), new Task(() => { 
            for (int i = 0; i < 1000000; i++)
                Interlocked.Increment(ref n);}) };
        Task[] t1 = new[] { new Task(() =>    {
            for (int i = 0; i < 1000000; i++)
                v++;
        }), new Task(() => { 
            for (int i = 0; i < 1000000; i++)
                v++;}) };
        Task.WhenAll(t1)
            .ContinueWith(t => Console.WriteLine("volatile write 2000000 times takes:{0}", sw.ElapsedMilliseconds));
        Task.WhenAll(t2)
            .ContinueWith(t => Console.WriteLine("InterLock write 2000000 times takes:{0}", sw.ElapsedMilliseconds));
        t2[0].Start();
        t1[0].Start();
        t1[1].Start();
        t2[1].Start();
    }

      先看运行结果,

    volatile write 2000000 times takes:94
    InterLock write 2000000 times takes:101

      多次运行,运行时间会有不同,但是在并发情况下,volatile的写入和InterLock的写入速度几乎相同。上述代码写的如此丑陋,而不是直接写Task.Run(),是为了保证初始化部分都运行完成后,再Start(),且两个任务的先后顺序进行了打乱,最大限度减少误差。可以看到并发情况下,volatile和InterLock几乎一样,且在InterLock中的方法要比Volatile的功能要全,但是在串行时,Volatile的性能要比InterLock要好。结论是,若只对变量读写,没有替换或者其他复杂操作时,可以使用volatile关键字,但是一些复杂操作,需要原子操作时,就得使用InterLock中的方法了,如果使用volatile关键字修饰的变量来进行交换的话,很难保证原子性,只有引入锁才能保证线程同步。且InterLock中提供了几个重载方法,能够接受object类型,还有泛型版本。

    可以利用InterLock来实现一个简单的自旋锁,代码如下:

    public struct SimpleSpinLock{
        private int m_Lock;
    
        public void Enter(){
            while (true){
                if (Interlocked.Exchange(ref m_Lock, 1) == 0) return;
                //此处可以添加黑科技
            }
        }
    
        public void Leave(){
            Volatile.Write(ref m_Lock, 0);
        }
    }
    
    //下面是如何使用SimpleSpinLock的例子
    public class Simple{
        private SimpleSpinLock m_lock = new SimpleSpinLock();
    
        public void AccessResource(){
            m_lock.Enter();
            //执行某些程序,只有一个线程可以进入这里
            m_lock.Leave();
        }
    }

      这个简单的自旋锁在一个线程调用Enter()时,其他线程在调用m_lock.Enter()方法时,if (Interlocked.Exchange(ref m_lock, 1) == 0)会失败,因为该方法会将m_lock和1交换,并返回旧值,在已有线程调用m_lock.Enter()时,m_lock的旧值是1,因此该方法会在whle(true)处自旋,不断尝试获得锁。该锁的问题是,该线程没有被阻塞(挂起),而是一直在占用CPU资源,其他需要CPU资源的线程无法运行(可以在while内,我加注释的地方,加入”黑科技“来尝试解决此问题。其思路是在线程自旋的过程中,立刻交出CPU资源,可通过调用Thread.Sleep(0)或者Thread.Yield()来实现。尝试获得锁的线程交出时间片,这样当前获得锁的线程能够有更多的资源来运行程序,从而运行结束并交出锁,具体细节在这里不展开)。因此,自旋锁只适合那些运行非常快的方法。

    • Interlocked Anything 模式

      Interlocked中全部是原子性操作,那是否提供了一个方法,该方法可以接受一个委托,保证该委托在运行时是原子的。答案是没有,但是可以利用Interlocked.CompareExchange来自己实现一个。我们先来看一下利用CompareExchange来实现原子性的Maximum方法。

    public static int Maximum(ref int target, int value){
        int currentValue = target, startValue, desireValue;
        do{
            startValue = currentValue;
            //可以在此处添加任何想要保证“原子性”的操作,此处是求最大值。
            desireValue = Math.Max(startValue, value);
            //注意,此处有可能被其他线程抢占,也有可能target的值被修改,因此if()语句会出问题,要使用Interlocked的方法。
            //if (startValue = target) target = desireValue;
            currentValue = Interlocked.CompareExchange(ref target, desireValue, startValue);
        } 
        //若在此时target已被修改,则重新计算最大值
        while (startValue != currentValue);
        return currentValue;
    }

      此方法的思路是:从CPU将target的值读到寄存器中,到计算最大值结束,期间的任何时间target都有可能被其他线程修改。因此保证原子性就被转换成保证计算最大值时,target的值没有变过,如果变过,就重新计算。因此,在最开始的时候,startValue=currentValue,currentValue是开始计算时target的值。然后求得最大值,并保存到desireValue中。注意,此时target有可能被修改,因此调用CompareExchange方法,该方法会将target与startValue比较,如果此时两值相等,那相当于我之前说的,target从开始到最后没有改变,那么这个最大值是准确的,并将target的旧值付给currentValue,最后如果startValue==currentValue,则计算完成,否则继续循环。

      《CLR via C#》的作者Jeff很喜欢上面的方法,他在实际开发中,都是使用上面的方法,并对其进行了包装,使之能够支持Interlocked Anything。其原理就是在desireValue=Math.Max()处替换成其他方法,只要在返回结果时保证旧值和最开始读取的值一致就可以。我们来看一下他的封装:

    delegate int Morpher<TResult, in TArgument>(int startValue, TArgument argument, out TResult result);
    
    static TResult Morph<TResult, TArgumen>(ref int target, TArgumen argumen, Morpher<TResult, TArgumen> morpher){
        TResult mophorResult;
        int currentValue = target, desireValue, startValue;
        do{
            startValue = currentValue;
            desireValue = morpher(startValue, argumen, out mophorResult);
            currentValue = Interlocked.CompareExchange(ref target, desireValue, startValue);
        } 
        while (currentValue != startValue);
        return mophorResult;
    }

    说实话,我并不能非常好了理解这个封装,并不是不能理解做法,而是不能确定此方法到底能不能实现效果,那我们来测试一下。测试的基本思路是对一个变量执行1000次的result+=10。分别是不带线程同步的和利用Morph方法对result+=10的方法进行互锁,保证其原子性。我省去了Morpher和Morph的声明部分。之所以要在DelayAdd和DelayAdd1方法中调用ThreadSleep(20),是为了模拟当在运行较长的方法时,Morph方法是否还能够保证该方法运行的原子性。

    static void Main(string[] args){
        Test(new TestAction(Add), "Add");
        Test(new TestAction(MorphAdd), "MorphAdd");
        Console.ReadLine();
    }
    //DelayAdd1的变种,使之能够符合Morpher的签名
    static int DelayAdd(int startValue, int argument, out int result){
        Thread.Sleep(20);
        result = startValue + argument;
        return result;
    }
    
    static void DelayAdd1(int argument, ref int result){
        Thread.Sleep(20);
        result += argument;
    }
    //测试的不具有线程同步的方法。
    static void Add(ref int result){
        DelayAdd1(10, ref result);
    }
    //具有线程同步的方法。
    static void MorphAdd(ref int result){
        Morph(ref result, 10, new Morpher<int, int>(DelayAdd));
    }
    //要测试的委托签名
    delegate void TestAction(ref int result);
    //公共测试方法
    static void Test(TestAction action, string actionName){
        int result = 0;
        var tList = new Task[1000];
        for (int i = 0; i < 1000; i++)
            tList[i] = Task.Run(() =>
            {
                action(ref result);
            });
        Task.WhenAll(tList).GetAwaiter().OnCompleted(
            () => Console.WriteLine("{0}, Result is {1}", actionName, result));
    }

    运行,得到的结果是:

    Add, Result is 8440
    MorphAdd, Result is 10000

    运行1000次result += 10,普通的Add不能够得到正确结果,但是MorphAdd可以。这是因为在1000次的Add中,某几个Add是同时调用的,result+=10在同一时间调用了多次,因此有156次的Add因为并行而被吞噬了。值得注意的是,MorphAdd方法因为需要线程同步,因此执行时间要慢很多。但是这些付出是值得的,因为这保证了结果的正确。

      上述例子证明了Morph方法能够保证委托的原子性,且该方法既不会阻塞线程也不会长时间的自旋,推荐大家在实际中使用该方法。

      本文中,我先介绍了Interlocked类中较常用的方法,以及Interlocked.Increment()方法与volatile关键字的对比,结论是虽然将变量设置为所有读写都是“易变的”看起来很浪费,但是该关键字能够保证在单线程时几乎没有性能损失,大部分情况下和原生的读写是一样的速度,且volatile比Interlocked类中提供的写要快2-4倍,但是在并发状态下其性能和volatile关键字是没有差别的。之后我介绍了用Interlocked类中的方法来实现简单的自旋锁,该锁的优点是在非竟态情况下非常快,但是在竟态情况下,未获得锁的线程会一直处于自旋状态,白白浪费CPU。最后介绍了《CLR via C#》书中提到的Interlocked Anything的方法(文中其他的知识点大多也是提取自《CLR via C#》),并测试了该方法确实可以保证委托的原子性,且不会阻塞线程,没有锁,不会造成死锁。至此线程同步中互锁构造就讲完了,后面我会给大家介绍内核构造的信号量和其他锁。

  • 相关阅读:
    构建之法阅读笔记04
    团队项目
    构建之法阅读笔记03
    第6周学习进度
    求最大子数组03
    四则运算4(完结)
    第5周学习进度
    敏捷开发概述
    第4周学习进度
    构建之法阅读笔记01
  • 原文地址:https://www.cnblogs.com/jazzpop/p/8547880.html
Copyright © 2011-2022 走看看