zoukankan      html  css  js  c++  java
  • 关于高并发下多线程数据处理

    一、Lock:

    C#中关键字lock(VB.NET中SyncLock,等同于try+finally的Monitor.Enter……Monitor.Exit)。原理是“每次线程进入后锁住当前所有的内存区块等相关区域,由该线程自行处理完毕全部的线程后自动释放”,接着其余线程抢先进入。

    优点:最为大众所知的一种多线程处理方法,最为普遍的解决方案。

    缺点:无法完全适应高并发场合下处理需求——原因:每次让大量线程在外边排队等候(因为一次只能一个线程处理lock区块),而且外边排队的线程总是要在Windows操作系统、临界区等上下文环境切换。(想一想:假设有一个厕所,每次一个人进去大便,因为有人你进不了,也不知道人家啥时候出来……所以尴尬的你只能到休息区去暂时休息打盹、看报、聊天……,等一阵然后再去看看厕所是否处于可用状态……)。这里的“厕所”的门就是“临界区”,“临界区”里边就是受保护的代码,“厕所”和“休息区”是两个不同的上下文地区,整个公司就是一个Windows操作系统。所以这种lock方式的上下文切换本身就是尝试性地试探是否可以进入,不可以则上下文切换休眠。当然消耗资源也是大的——除非你可以“估计”到每个线程处理的时间差不多等于线程休眠从上下文切换的时间,而且外部排队线程不是很多的情况下。

    二、取代lock的“原子锁”:

    本质上是CPU级别的锁,最小粒度的。用于保证一次性数据原子量的完成。在NET中有InterLock类可以提供最简单的诸如Increment,Decrement等原子性操作方法。如果要实现共享代码,完全可以用以下方法尝试实现:

    private volatile int _isFree = 0;
    
    public void GeneralLockCode(Action act = null)
    {
         try
         {
            if(InterLock.Exchange(ref _isFree, 1) == 0)  
            {
                  if(act != null)
                   act();
            }
         }
         finally
         {
               InterLock.Exchange(ref _isFree, 0);
         }
    }

    “_isFree”是一个类级别的变量,如果是0表示尚无线程使用act代码(表示允许线程进来)。每次进入此代码之后,有一个Exchange函数,这个函数是内存原子操作函数,它的作用是:1)把“1”赋值给_isFree。2)返回_isFree以前状态的值(在这两步中不允许其它线程干扰对_isFree进行变化)。

    假设有2个线程(A和B)。A抢到了执行了Exchange方法,得到结果true;此时_isFree立马变成了1。此时B线程再次执行Exchange,_isFree重新被赋值为1,但是返回了线程A时候处理的状态1,因为1不会等于0,所以act方法不会执行——直到A执行了finally方法为止。

    上面的代码加以改进,就完全可以变成一个通用的,比lock更好一点的“排它锁”:

    private volatile int _isFree = 0;
    
    public void GeneralLock(Action act)
    {
          try
          {
                while(InterLock.Exchange(ref _isFree,1) == 1);
                act();
          }
          finally
          {
               InterLock.Exchange(ref _isFree,0);
          }
    }

    该锁一目了然——也只允许一个线程进入,其余线程外面等待。但是该方法的好处在于——如果act方法足够小(耗时小,时间短),那么当某个线程抢先进入的时候,其余的线程不是像Lock块一样上下文切换,而是直接在临界区“候着”自旋,直到act被执行完毕,锁标记释放为止。之所以要“act耗时足够小”,是因为这里如果高并发的话,没有抢到机会执行act的线程不得不一直处于“自旋”状态,空耗CPU资源。

    我们可以通过尝试在while里边加Thread.Sleep,Sleep的毫秒可以随着失败的轮询次数不断试探性增长,或者干脆使用SpinWait或者Thread的SpinUntil代替while,以便于这个CAS的锁更好地适用于一般高并发场合下的应用。

    【方案1】

    private volatile int _isFree = 0;
    
    public void GeneralLock(Action act)
    {
         SpinWait wait = new SpinWait();
    
          try
          {
                 
                while(InterLock.Exchange(ref _isFree,1) == 1)                    
                      {wait.SpinOnce();}
       act();
          }
          finally
          {
               InterLock.Exchange(ref _isFree,0);
          }
    }

    【方案2】

    private volatile int _isFree = 0;
    
    public void GeneralLock(Action act)
    {
          try
          {
               Thread.SpinUntil (() => InterLock.Exchange(ref _isFree,1) == 0);  //Until,直到isFree=0,即等到可用状态。
               act();
          }
          finally
          {
               InterLock.Exchange(ref _isFree,0);
          }
    }

    三、CAS法则:

    InternLock中有一个Compare And Swap原子操作,其函数是(以NET为主):

    InternLock.CompareExchange(ref 原来变量,替代变量,比较值),函数结果返回“原来变量”前一次的值(被“替代变量”取代前的数值)。

    根据这个函数,我们完全可以在把每个线程中拷贝自己的一份“原有变量”的值作为“比较值”,“替代变量”在“比较值”基础上进行某些操作,然后使用这个CAS方法保证每次操作的时候都是原子性的,该函数做3件事情:

    1)判断“原来变量”是否等于“比较值”。

    2)如果1)步返回true,则用“替代变量”替换“原来变量”。

    3)返回“原有变量”前一次的值。

    再次声明,这个方法也是原子性的。意味着在这个函数的时候无论如何不会被其它线程干扰打断,修改变量。那么我们轻而易举可以得到结论——如果有A和B线程,同时修改变量a(假设a共用,初始值为1),那么A线程抢到进去(A线程对a做了拷贝,比较值为1,替代变量为2)。那么当A做CAS的时候,原来变量=比较值,然后把“替代变量”替换“原来变量”,之后返回之前的值1,1=A线程的比较值1,所以成功了。

    假设A在执行CAS过程中(或者之前某些步骤),B线程也进来了,(B的比较值也为1,替代变量为2),此时A率先做了CAS,“原来变量”变成了2,此时B做CAS发现原来变量不等于比较值,因此不会进行替代。当然返回的结果也也是被A线程替代后的原来变量的值2,自然也不等于B线程的比较值。所以B线程不得不再次去抢——直到满足条件为止。

    类似这样的方法,相对于之前讲的“原子锁”而言不存在空转(因为他们每次都尝试生产一个自己的方案,然后分别去抢;不像“原子锁”中的while处于Sleep空转或者忙转情况),但是他们也是死循环尝试探测是否可以抢到原子锁的,所以仍然不排除CPU资源被大量占用的情况。所以也完全可以尝试先判断一次是否抢到,抢到直接退出循环;否则继续抢或者进行优化的“自旋”,下面给出伪代码结构:

    volatile _globalVar = 初始化值;
    
    public void GeneralCAS()
    {
          声明 _copyVar和_repVar;
    do
               {
                  _copyVar = _globalVar;
                  _repVar = 在_globalVar基础上改变或者新的值;
               }while(CAS(ref _globalVar,_repVar,_copyVar)!=_copyVar);
    }

     最后给出一个多线程累加的例子:

    public struct AtomicNumber
    {
          private volatile int _number = 0;
          
          public int GetProcessedNumber (int stepToIncrease = 1)
          {
              int copyNum = _number,newNum = copyNum + stepToIncrease;
              for(;;copyNumber = _number,newNum = copyNum+stepToIncrease)
              {
                     if(Interlock.CompareExchange(ref _number, newNum, copyNum) == copyNum)
                     {
                           return newNum;
                     }
              }
          }
    }
  • 相关阅读:
    初识Java,关于一个简单的ATM机的java程序设计
    字符串和字符串对象的区别
    集中常见得字符串处理方式
    得到类模板的3种方式
    反射的条件
    封装一个标签加文本框
    建立及中常见的布局管理器
    随机输入3个正整数,程序出来从小到大排列
    java 基础
    IO
  • 原文地址:https://www.cnblogs.com/ServiceboyNew/p/7127534.html
Copyright © 2011-2022 走看看