zoukankan      html  css  js  c++  java
  • C#多线程编程(6)--线程安全2 互锁构造Interlocked

    在线程安全1中,我介绍了线程同步的意义和一种实现线程同步的方法:volatile。volatile关键字属于原子操作的一种,若对一个关键字使用volatile,很多时候会显得很“浪费”,因为只有在并发访问的情况下才需要“易变”读写,单线程访问时并不需要。在命名空间System.Threading命名空间中提供了InterLock类,该类中提供了一些原子方法。本文来介绍如何使用这些方法。

    •   互锁

      在抢占式系统中,一个线程在执行到任何阶段都有可能被其他线程“打断”,原子操作能够保证该操作不被其他线程打断,其他线程的“打断”只可能发生在该操作的之前或之后。InterLock类中的方法就是”原子“式的,最常用的方法有:

    public static class InterLock{
        // return (++location)
        public static int Increment(ref int location);
        // return(--location)
        public static int Decrement(ref int location);
        // return(location += value)
        //注意,也可能是负数,从而实现减法运算。
        public static int Add(ref int location, int value)
        // int old = location; location = value; return old;
        public static int Exchange(ref int location, int value);
        //old = location1;
        //if(location1 = comparand) location1 = value;
        //return old;
        public static int CompareExchange(ref int location1,
               int value, int comparand);
        ...
    }

      《CLR via C#》的作者在书中说他喜欢Interlocked中的方法,因为它不但很快,而且不会阻塞线程。为了验证InterLock真的很快,我们对变量进行一百万次的写,与Volatile.Write()来进行对比,看看是否真的快。

     static void Main(string[] args){
         TestInterLock();
         Console.ReadLine();
     }
     private static volatile int v = 0;
     private static int n = 0;
     static void TestInterLock(){
         Stopwatch sw = Stopwatch.StartNew();
         for (int i = 0; i < 1000000; i++)
             v++;
         Console.WriteLine("volatile write 1000000 times takes:{0}", sw.ElapsedMilliseconds);
         sw = Stopwatch.StartNew();
         for (int i = 0; i < 1000000; i++)
             Interlocked.Increment(ref n);
         Console.WriteLine("InterLock write 1000000 times takes:{0}", sw.ElapsedMilliseconds);
         n = 0;
         sw = Stopwatch.StartNew();
         for (int i = 0; i < 1000000; i++)
             n++;
         Console.WriteLine("n++ write 1000000 times takes:{0}", sw.ElapsedMilliseconds);
     }

      运行结果如下:

    volatile write 1000000 times takes:2

    InterLock write 1000000 times takes:8

    n++ write 1000000 times takes:2

      我运行了好几次,结果会有些出入,但是大部分的结果都是volatile的写入速度和原生的n++的速度是一样的。InterLock也确实如Jeffrey Richter所说,很快,只是没有volatile关键字修饰的变量的读写快。这是在非并发情况下,下面来看一下并发情况下是否还是很快。

    static void TestInterLock1(){
        Stopwatch sw = Stopwatch.StartNew();
        Task[] t2 = new[] { new Task(() =>    {
            for (int i = 0; i < 1000000; i++)
                Interlocked.Increment(ref n);
        }), new Task(() => { 
            for (int i = 0; i < 1000000; i++)
                Interlocked.Increment(ref n);}) };
        Task[] t1 = new[] { new Task(() =>    {
            for (int i = 0; i < 1000000; i++)
                v++;
        }), new Task(() => { 
            for (int i = 0; i < 1000000; i++)
                v++;}) };
        Task.WhenAll(t1)
            .ContinueWith(t => Console.WriteLine("volatile write 2000000 times takes:{0}", sw.ElapsedMilliseconds));
        Task.WhenAll(t2)
            .ContinueWith(t => Console.WriteLine("InterLock write 2000000 times takes:{0}", sw.ElapsedMilliseconds));
        t2[0].Start();
        t1[0].Start();
        t1[1].Start();
        t2[1].Start();
    }

      先看运行结果,

    volatile write 2000000 times takes:94
    InterLock write 2000000 times takes:101

      多次运行,运行时间会有不同,但是在并发情况下,volatile的写入和InterLock的写入速度几乎相同。上述代码写的如此丑陋,而不是直接写Task.Run(),是为了保证初始化部分都运行完成后,再Start(),且两个任务的先后顺序进行了打乱,最大限度减少误差。可以看到并发情况下,volatile和InterLock几乎一样,且在InterLock中的方法要比Volatile的功能要全,但是在串行时,Volatile的性能要比InterLock要好。结论是,若只对变量读写,没有替换或者其他复杂操作时,可以使用volatile关键字,但是一些复杂操作,需要原子操作时,就得使用InterLock中的方法了,如果使用volatile关键字修饰的变量来进行交换的话,很难保证原子性,只有引入锁才能保证线程同步。且InterLock中提供了几个重载方法,能够接受object类型,还有泛型版本。

    可以利用InterLock来实现一个简单的自旋锁,代码如下:

    public struct SimpleSpinLock{
        private int m_Lock;
    
        public void Enter(){
            while (true){
                if (Interlocked.Exchange(ref m_Lock, 1) == 0) return;
                //此处可以添加黑科技
            }
        }
    
        public void Leave(){
            Volatile.Write(ref m_Lock, 0);
        }
    }
    
    //下面是如何使用SimpleSpinLock的例子
    public class Simple{
        private SimpleSpinLock m_lock = new SimpleSpinLock();
    
        public void AccessResource(){
            m_lock.Enter();
            //执行某些程序,只有一个线程可以进入这里
            m_lock.Leave();
        }
    }

      这个简单的自旋锁在一个线程调用Enter()时,其他线程在调用m_lock.Enter()方法时,if (Interlocked.Exchange(ref m_lock, 1) == 0)会失败,因为该方法会将m_lock和1交换,并返回旧值,在已有线程调用m_lock.Enter()时,m_lock的旧值是1,因此该方法会在whle(true)处自旋,不断尝试获得锁。该锁的问题是,该线程没有被阻塞(挂起),而是一直在占用CPU资源,其他需要CPU资源的线程无法运行(可以在while内,我加注释的地方,加入”黑科技“来尝试解决此问题。其思路是在线程自旋的过程中,立刻交出CPU资源,可通过调用Thread.Sleep(0)或者Thread.Yield()来实现。尝试获得锁的线程交出时间片,这样当前获得锁的线程能够有更多的资源来运行程序,从而运行结束并交出锁,具体细节在这里不展开)。因此,自旋锁只适合那些运行非常快的方法。

    • Interlocked Anything 模式

      Interlocked中全部是原子性操作,那是否提供了一个方法,该方法可以接受一个委托,保证该委托在运行时是原子的。答案是没有,但是可以利用Interlocked.CompareExchange来自己实现一个。我们先来看一下利用CompareExchange来实现原子性的Maximum方法。

    public static int Maximum(ref int target, int value){
        int currentValue = target, startValue, desireValue;
        do{
            startValue = currentValue;
            //可以在此处添加任何想要保证“原子性”的操作,此处是求最大值。
            desireValue = Math.Max(startValue, value);
            //注意,此处有可能被其他线程抢占,也有可能target的值被修改,因此if()语句会出问题,要使用Interlocked的方法。
            //if (startValue = target) target = desireValue;
            currentValue = Interlocked.CompareExchange(ref target, desireValue, startValue);
        } 
        //若在此时target已被修改,则重新计算最大值
        while (startValue != currentValue);
        return currentValue;
    }

      此方法的思路是:从CPU将target的值读到寄存器中,到计算最大值结束,期间的任何时间target都有可能被其他线程修改。因此保证原子性就被转换成保证计算最大值时,target的值没有变过,如果变过,就重新计算。因此,在最开始的时候,startValue=currentValue,currentValue是开始计算时target的值。然后求得最大值,并保存到desireValue中。注意,此时target有可能被修改,因此调用CompareExchange方法,该方法会将target与startValue比较,如果此时两值相等,那相当于我之前说的,target从开始到最后没有改变,那么这个最大值是准确的,并将target的旧值付给currentValue,最后如果startValue==currentValue,则计算完成,否则继续循环。

      《CLR via C#》的作者Jeff很喜欢上面的方法,他在实际开发中,都是使用上面的方法,并对其进行了包装,使之能够支持Interlocked Anything。其原理就是在desireValue=Math.Max()处替换成其他方法,只要在返回结果时保证旧值和最开始读取的值一致就可以。我们来看一下他的封装:

    delegate int Morpher<TResult, in TArgument>(int startValue, TArgument argument, out TResult result);
    
    static TResult Morph<TResult, TArgumen>(ref int target, TArgumen argumen, Morpher<TResult, TArgumen> morpher){
        TResult mophorResult;
        int currentValue = target, desireValue, startValue;
        do{
            startValue = currentValue;
            desireValue = morpher(startValue, argumen, out mophorResult);
            currentValue = Interlocked.CompareExchange(ref target, desireValue, startValue);
        } 
        while (currentValue != startValue);
        return mophorResult;
    }

    说实话,我并不能非常好了理解这个封装,并不是不能理解做法,而是不能确定此方法到底能不能实现效果,那我们来测试一下。测试的基本思路是对一个变量执行1000次的result+=10。分别是不带线程同步的和利用Morph方法对result+=10的方法进行互锁,保证其原子性。我省去了Morpher和Morph的声明部分。之所以要在DelayAdd和DelayAdd1方法中调用ThreadSleep(20),是为了模拟当在运行较长的方法时,Morph方法是否还能够保证该方法运行的原子性。

    static void Main(string[] args){
        Test(new TestAction(Add), "Add");
        Test(new TestAction(MorphAdd), "MorphAdd");
        Console.ReadLine();
    }
    //DelayAdd1的变种,使之能够符合Morpher的签名
    static int DelayAdd(int startValue, int argument, out int result){
        Thread.Sleep(20);
        result = startValue + argument;
        return result;
    }
    
    static void DelayAdd1(int argument, ref int result){
        Thread.Sleep(20);
        result += argument;
    }
    //测试的不具有线程同步的方法。
    static void Add(ref int result){
        DelayAdd1(10, ref result);
    }
    //具有线程同步的方法。
    static void MorphAdd(ref int result){
        Morph(ref result, 10, new Morpher<int, int>(DelayAdd));
    }
    //要测试的委托签名
    delegate void TestAction(ref int result);
    //公共测试方法
    static void Test(TestAction action, string actionName){
        int result = 0;
        var tList = new Task[1000];
        for (int i = 0; i < 1000; i++)
            tList[i] = Task.Run(() =>
            {
                action(ref result);
            });
        Task.WhenAll(tList).GetAwaiter().OnCompleted(
            () => Console.WriteLine("{0}, Result is {1}", actionName, result));
    }

    运行,得到的结果是:

    Add, Result is 8440
    MorphAdd, Result is 10000

    运行1000次result += 10,普通的Add不能够得到正确结果,但是MorphAdd可以。这是因为在1000次的Add中,某几个Add是同时调用的,result+=10在同一时间调用了多次,因此有156次的Add因为并行而被吞噬了。值得注意的是,MorphAdd方法因为需要线程同步,因此执行时间要慢很多。但是这些付出是值得的,因为这保证了结果的正确。

      上述例子证明了Morph方法能够保证委托的原子性,且该方法既不会阻塞线程也不会长时间的自旋,推荐大家在实际中使用该方法。

      本文中,我先介绍了Interlocked类中较常用的方法,以及Interlocked.Increment()方法与volatile关键字的对比,结论是虽然将变量设置为所有读写都是“易变的”看起来很浪费,但是该关键字能够保证在单线程时几乎没有性能损失,大部分情况下和原生的读写是一样的速度,且volatile比Interlocked类中提供的写要快2-4倍,但是在并发状态下其性能和volatile关键字是没有差别的。之后我介绍了用Interlocked类中的方法来实现简单的自旋锁,该锁的优点是在非竟态情况下非常快,但是在竟态情况下,未获得锁的线程会一直处于自旋状态,白白浪费CPU。最后介绍了《CLR via C#》书中提到的Interlocked Anything的方法(文中其他的知识点大多也是提取自《CLR via C#》),并测试了该方法确实可以保证委托的原子性,且不会阻塞线程,没有锁,不会造成死锁。至此线程同步中互锁构造就讲完了,后面我会给大家介绍内核构造的信号量和其他锁。

  • 相关阅读:
    稳扎稳打Silverlight(47) 4.0UI之操作剪切板, 隐式样式, CompositeTransform, 拖放外部文件到程序中
    返璞归真 asp.net mvc (9) asp.net mvc 3.0 新特性之 View(Razor)
    返璞归真 asp.net mvc (6) asp.net mvc 2.0 新特性
    稳扎稳打Silverlight(48) 4.0其它之打印, 动态绑定, 增强的导航系统, 杂七杂八
    精进不休 .NET 4.0 (9) ADO.NET Entity Framework 4.1 之 Code First
    稳扎稳打Silverlight(42) 4.0控件之Viewbox, RichTextBox
    稳扎稳打Silverlight(53) 4.0通信之对WCF NetTcpBinding的支持, 在Socket通信中通过HTTP检索策略文件, HTTP请求中的ClientHttp和BrowserHttp
    稳扎稳打 Silverlight 4.0 系列文章索引
    稳扎稳打Silverlight(54) 4.0通信之对UDP协议的支持: 通过 UdpAnySourceMulticastClient 实现 ASM(Any Source Multicast),即“任意源多播”
    返璞归真 asp.net mvc (8) asp.net mvc 3.0 新特性之 Model
  • 原文地址:https://www.cnblogs.com/jazzpop/p/8547880.html
Copyright © 2011-2022 走看看