zoukankan      html  css  js  c++  java
  • 并发-AtomicInteger源码分析—基于CAS的乐观锁实现

    AtomicInteger源码分析—基于CAS的乐观锁实现

     

    参考:

    http://www.importnew.com/22078.html

    https://www.cnblogs.com/mantu/p/5796450.html

    http://hustpawpaw.blog.163.com/blog/static/184228324201210811243127/

    1. 悲观锁与乐观锁

    我们都知道,cpu是时分复用的,也就是把cpu的时间片,分配给不同的thread/process轮流执行,时间片与时间片之间,需要进行cpu切换,也就是会发生进程的切换。切换涉及到清空寄存器,缓存数据。然后重新加载新的thread所需数据。当一个线程被挂起时,加入到阻塞队列,在一定的时间或条件下,在通过notify(),notifyAll()唤醒回来。在某个资源不可用的时候,就将cpu让出,把当前等待线程切换为阻塞状态。等到资源(比如一个共享数据)可用了,那么就将线程唤醒,让他进入runnable状态等待cpu调度。这就是典型的悲观锁的实现。独占锁是一种悲观锁,synchronized就是一种独占锁,它假设最坏的情况,并且只有在确保其它线程不会造成干扰的情况下执行,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。

    但是,由于在进程挂起和恢复执行过程中存在着很大的开销。当一个线程正在等待锁时,它不能做任何事,所以悲观锁有很大的缺点。举个例子,如果一个线程需要某个资源,但是这个资源的占用时间很短,当线程第一次抢占这个资源时,可能这个资源被占用,如果此时挂起这个线程,可能立刻就发现资源可用,然后又需要花费很长的时间重新抢占锁,时间代价就会非常的高。

    所以就有了乐观锁的概念,他的核心思路就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。在上面的例子中,某个线程可以不让出cpu,而是一直while循环,如果失败就重试,直到成功为止。所以,当数据争用不严重时,乐观锁效果更好。比如CAS就是一种乐观锁思想的应用。

    2.   java中CAS的实现

    CAS就是Compare and Swap的意思,比较并操作。很多的cpu直接支持CAS指令。CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。

    JDK1.5中引入了底层的支持,在int、long和对象的引用等类型上都公开了CAS的操作,并且JVM把它们编译为底层硬件提供的最有效的方法,在运行CAS的平台上,运行时把它们编译为相应的机器指令。在Java.util.concurrent.atomic包下面的所有的原子变量类型中,比如AtomicInteger,都使用了这些底层的JVM支持为数字类型的引用类型提供一种高效的CAS操作。

    在CAS操作中,会出现ABA问题。就是如果V的值先由A变成B,再由B变成A,那么仍然认为是发生了变化,并需要重新执行算法中的步骤。有简单的解决方案:不是更新某个引用的值,而是更新两个值,包括一个引用和一个版本号,即使这个值由A变为B,然后为变为A,版本号也是不同的。AtomicStampedReference和AtomicMarkableReference支持在两个变量上执行原子的条件更新。AtomicStampedReference更新一个“对象-引用”二元组,通过在引用上加上“版本号”,从而避免ABA问题,AtomicMarkableReference将更新一个“对象引用-布尔值”的二元组。

    3.  AtomicInteger的实现。

    AtomicInteger 是一个支持原子操作的 Integer 类,就是保证对AtomicInteger类型变量的增加和减少操作是原子性的,不会出现多个线程下的数据不一致问题。如果不使用 AtomicInteger,要实现一个按顺序获取的 ID,就必须在每次获取时进行加锁操作,以避免出现并发时获取到同样的 ID 的现象。

    接下来通过源代码来看AtomicInteger具体是如何实现的原子操作。

    首先看incrementAndGet() 方法,下面是具体的代码。

    1
    2
    3
    4
    5
    6
    7
    8
    public final int incrementAndGet() { 
            for (;;) { 
                int current = get(); 
                int next = current + 1
                if (compareAndSet(current, next)) 
                    return next; 
            
        }

    通过源码,可以知道,这个方法的做法为先获取到当前的 value 属性值,然后将 value 加 1,赋值给一个局部的 next 变量,然而,这两步都是非线程安全的,但是内部有一个死循环,不断去做compareAndSet操作,直到成功为止,也就是修改的根本在compareAndSet方法里面,compareAndSet()方法的代码如下:

    1
    2
    3
    public final boolean compareAndSet(int expect, int update) { 
            return unsafe.compareAndSwapInt(this, valueOffset, expect, update); 
        }

    compareAndSet()方法调用的compareAndSwapInt()方法的声明如下,是一个native方法。

    publicfinal native boolean compareAndSwapInt(Object var1, long var2, int var4, intvar5);

    compareAndSet 传入的为执行方法时获取到的 value 属性值,next 为加 1 后的值, compareAndSet所做的为调用 Sun 的 UnSafe 的 compareAndSwapInt 方法来完成,此方法为 native 方法,compareAndSwapInt 基于的是CPU 的 CAS指令来实现的。所以基于 CAS 的操作可认为是无阻塞的,一个线程的失败或挂起不会引起其它线程也失败或挂起。并且由于 CAS 操作是 CPU 原语,所以性能比较好。

    类似的,还有decrementAndGet()方法。它和incrementAndGet()的区别是将 value 减 1,赋值给next 变量。

    AtomicInteger中还有getAndIncrement() 和getAndDecrement() 方法,他们的实现原理和上面的两个方法完全相同,区别是返回值不同,前两个方法返回的是改变之后的值,即next。而这两个方法返回的是改变之前的值,即current。还有很多的其他方法,就不列举了。

    下面以具体的例子分析下AtomicInteger的实现:

      计数器(Counter),采用Java里比较方便的锁机制synchronized关键字,初步的代码如下:

    复制代码
     1 public class Counter {
     2     private int value;  
     3       
     4     public synchronized int getValue() {  
     5         return value;  
     6     }  
     7   
     8     public synchronized int increment() {  
     9         return ++value;  
    10     }  
    11   
    12     public synchronized int decrement() {  
    13         return --value;  
    14     }  
    15 }
    复制代码

      synchronized关键字是基于阻塞的锁机制,也就是说当一个线程拥有锁的时候,访问同一资源的其它线程需要等待,直到该线程释放锁,这也就我们前面所说的悲观锁。这里会有些问题:首先,如果被阻塞的线程优先级很高很重要怎么办?其次,如果获得锁的线程一直不释放锁怎么办?(这种情况是非常糟糕的)。还有一种情况,如果有大量的线程来竞争资源,那CPU将会花费大量的时间和资源来处理这些竞争(事实上CPU的主要工作并非这些),同时,还有可能出现一些例如死锁之类的情况,最后,其实锁机制是一种比较粗糙,粒度比较大的机制,相对于像计数器这样的需求有点儿过于笨重,因此,对于这种需求我们期待一种更合适、更高效的线程安全机制。

    package TestAtomicInteger;
     
    import java.util.concurrent.atomic.AtomicInteger;
     
    class MyThread implements Runnable {
    //    static  int i = 0;
         static AtomicInteger ai=new AtomicInteger(0);
          
     
        public void run() {
            for (int m = 0; m < 1000000; m++) {
                ai.getAndIncrement();
            }
        }
    };
     
    public class TestAtomicInteger {
        public static void main(String[] args) throws InterruptedException {
            MyThread mt = new MyThread();
     
            Thread t1 = new Thread(mt);
            Thread t2 = new Thread(mt);
            t1.start();
            t2.start();
            Thread.sleep(500);
            System.out.println(MyThread.ai.get());
        }
    }

      可以发现结果都是2000000,也就是说AtomicInteger是线程安全的。

      下面我们就以模拟CAS机制来实现Counter的例子:

       CAS类:

    复制代码
     1 public class SimpleCAS {
     2     private volatile int value;
     3     public synchronized int getValue(){
     4         return value;  
     5     } 
     6     public synchronized boolean comperaAndSwap(int expectedValue,int newValue){
     7         int oldValue = value;
     8         if(oldValue == expectedValue){
     9             value = newValue;
    10             return true;
    11         }else{
    12             return false;
    13         }
    14     }
    15 }
    复制代码

      CASCounter类:

    复制代码
     1 public class CASCounter {
     2     private SimpleCAS cas;  
     3     public int getValue(){
     4         return cas.getValue();
     5     }
     6     public int increment(){
     7         int olevalue = cas.getValue();
     8         for (; ;) {
     9             if(cas.comperaAndSwap(olevalue, olevalue+1)){
    10                 return cas.getValue();
    11             }
    12         }
    13          
    14     }
    15 }
    复制代码

      上面的模拟不是CSA的真正实现,其实我们在语言层面是没有做任何同步的操作的,大家也可以看到源码没有任何锁加在上面,可它为什么是线程安全的呢?这就是Atomic包下这些类的奥秘:语言层面不做处理,我们将其交给硬件—CPU和内存,利用CPU的多处理能力,实现硬件层面的阻塞,再加上volatile变量的特性即可实现基于原子操作的线程安全。所以说,CAS并不是无阻塞,只是阻塞并非在语言、线程方面,而是在硬件层面,所以无疑这样的操作会更快更高效!

      总结一下,AtomicInteger基于冲突检测的乐观并发策略。 可以想象,这种乐观在线程数目非常多的情况下,失败的概率会指数型增加。

    AtomicInteger等对象出现的目的主要是为了解决在多线程环境下变量计数的问题,例如常用的i++,i--操作,它们不是线程安全的,AtomicInteger引入后,就不必在进行i++和i--操作时,进行加锁操作,在我们日常工作中,有很多业务场景需要在多线程环境下进行变量的计数:订单数统计、访问量统计、累计相应时长统计等。

    demo 源码:https://github.com/mantuliu/javaAdvance

        下面我们先分析一下AtomicInteger的源代码。通过源码分析我们知道,AtomicInteger的核心就是一个CAS算法(CompareAndSwap),比较并交换算法,此算法是由unsafe的底层代码实现,它是一个原子的操作,原理就是:如果内存中的实际值与update值相同,则将实际值更新为expect值,反之则返回失败,由上层系统循环获取实际值后,再次调用此CAS算法:

    复制代码
    /*
     * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     *
     */
    
    /*
     *
     *
     *
     *
     *
     * Written by Doug Lea with assistance from members of JCP JSR-166
     * Expert Group and released to the public domain, as explained at
     * http://creativecommons.org/publicdomain/zero/1.0/
     */
    
    package java.util.concurrent.atomic;
    import sun.misc.Unsafe;
    
    /**
     * An {@code int} value that may be updated atomically.  See the
     * {@link java.util.concurrent.atomic} package specification for
     * description of the properties of atomic variables. An
     * {@code AtomicInteger} is used in applications such as atomically
     * incremented counters, and cannot be used as a replacement for an
     * {@link java.lang.Integer}. However, this class does extend
     * {@code Number} to allow uniform access by tools and utilities that
     * deal with numerically-based classes.
     *
     * @since 1.5
     * @author Doug Lea
    */
    public class AtomicInteger extends Number implements java.io.Serializable {
        private static final long serialVersionUID = 6214790243416807050L;
    
        // setup to use Unsafe.compareAndSwapInt for updates
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        private static final long valueOffset;//value值的偏移地址
    
        static {
          try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
          } catch (Exception ex) { throw new Error(ex); }
        }
    
        private volatile int value;//实际的值
    
        /**
         * Creates a new AtomicInteger with the given initial value.
         *
         * @param initialValue the initial value
         */
        public AtomicInteger(int initialValue) {
            value = initialValue;//初始化
        }
    
        /**
         * Creates a new AtomicInteger with initial value {@code 0}.
         */
        public AtomicInteger() {
        }
    
        /**
         * Gets the current value.
         *
         * @return the current value
         */
        public final int get() {
            return value;//返回value值
        }
    
        /**
         * Sets to the given value.
         *
         * @param newValue the new value
         */
        public final void set(int newValue) {
            value = newValue;//设置新值,因为没有判断oldvalue,所以此操作是非线程安全的
        }
    
        /**
         * Eventually sets to the given value.
         *
         * @param newValue the new value
         * @since 1.6
         */
        public final void lazySet(int newValue) {
            unsafe.putOrderedInt(this, valueOffset, newValue);//与set操作效果一样,只是采用的是unsafe对象中通过偏移地址来设置值的方式
        }
    
        /**
         * Atomically sets to the given value and returns the old value.
         *
         * @param newValue the new value
         * @return the previous value
         */
        public final int getAndSet(int newValue) {//原子操作,设置新值,返回老值
            for (;;) {
                int current = get();
                if (compareAndSet(current, newValue))//通过CAS算法,比较current的值和实际值是否一致,如果一致则设置为newValue
                    return current;
            }
        }
    
        /**
         * Atomically sets the value to the given updated value
         * if the current value {@code ==} the expected value.
         *
         * @param expect the expected value
         * @param update the new value
         * @return true if successful. False return indicates that
         * the actual value was not equal to the expected value.
         */
        public final boolean compareAndSet(int expect, int update) {
            return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
        }
    
        /**
         * Atomically sets the value to the given updated value
         * if the current value {@code ==} the expected value.
         *
         * <p>May <a href="package-summary.html#Spurious">fail spuriously</a>
         * and does not provide ordering guarantees, so is only rarely an
         * appropriate alternative to {@code compareAndSet}.
         *
         * @param expect the expected value
         * @param update the new value
         * @return true if successful.
         */
        public final boolean weakCompareAndSet(int expect, int update) {
            return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
        }
    
        /**
         * Atomically increments by one the current value.
         *
         * @return the previous value
         */
        public final int getAndIncrement() {//i++操作
            for (;;) {
                int current = get();//获取当前值
                int next = current + 1;//当前值+1
                if (compareAndSet(current, next))//比较current值和实际的值是否一致,如不一致,则继续循环
                    return current;
            }
        }
    
        /**
         * Atomically decrements by one the current value.
         *
         * @return the previous value
         */
        public final int getAndDecrement() {
            for (;;) {
                int current = get();
                int next = current - 1;
                if (compareAndSet(current, next))
                    return current;
            }
        }
    
        /**
         * Atomically adds the given value to the current value.
         *
         * @param delta the value to add
         * @return the previous value
         */
        public final int getAndAdd(int delta) {//例如:当我们统计接口的响应时间时,可以利用此方法
            for (;;) {
                int current = get();
                int next = current + delta;
                if (compareAndSet(current, next))
                    return current;
            }
        }
    
        /**
         * Atomically increments by one the current value.
         *
         * @return the updated value
         */
        public final int incrementAndGet() {
            for (;;) {
                int current = get();
                int next = current + 1;
                if (compareAndSet(current, next))
                    return next;
            }
        }
    
        /**
         * Atomically decrements by one the current value.
         *
         * @return the updated value
         */
        public final int decrementAndGet() {
            for (;;) {
                int current = get();
                int next = current - 1;
                if (compareAndSet(current, next))
                    return next;
            }
        }
    
        /**
         * Atomically adds the given value to the current value.
         *
         * @param delta the value to add
         * @return the updated value
         */
        public final int addAndGet(int delta) {
            for (;;) {
                int current = get();
                int next = current + delta;
                if (compareAndSet(current, next))
                    return next;
            }
        }
    
        /**
         * Returns the String representation of the current value.
         * @return the String representation of the current value.
         */
        public String toString() {
            return Integer.toString(get());
        }
    
    
        public int intValue() {
            return get();
        }
    
        public long longValue() {
            return (long)get();
        }
    
        public float floatValue() {
            return (float)get();
        }
    
        public double doubleValue() {
            return (double)get();
        }
    
    }
    复制代码

          下面,我们为四种情况(同步关键字、ReentrantLock公平锁和非公平锁、AtomicInteger)做一下性能对比分析,当我们看到上面的代码分析后,我们判断AtomicInteger应该比加锁的方式快,但是实验的结果表明,AtomicInteger只比ReentrantLock加公平锁的情况快几十倍,比其它两种方式略慢一些。

         四个demo都用100个线程来循环模拟下单60秒钟:

    demo Lesson8SyncIntPerform:在使用同步关键字加锁的情况下100个线程循环下单数为:677337556

    demo Lesson8SyncIntPerform:在使用同步关键字加锁的情况下100个线程循环下单数为:755994691

    demo Lesson8AtomicIntPerform:在使用AtomicInteger的情况下100个线程循环下单数为:562359607

    demo Lesson8AtomicIntPerform:在使用AtomicInteger的情况下100个线程循环下单数为:575367967

    demo Lesson8LockIntPerform:在使用ReentrantLock加非公平锁的情况下100个线程循环下单数为:857239882

    demo Lesson8LockIntPerform:在使用ReentrantLock加非公平锁的情况下100个线程循环下单数为:860364303

    demo Lesson8LockFairIntPerform:在使用ReentrantLock加公平锁的情况下100个线程循环下单数为:19153640

    demo Lesson8LockFairIntPerform:在使用ReentrantLock加公平锁的情况下100个线程循环下单数为:19076567

       上面的实验结果表明,在jdk1.6及后续的版本中(本实验的jdk版本是1.7,操作系统为windows操作系统),已经对于synchronized关键字的性能优化了很多,已经和ReentrantLock的性能差不多,加锁的效果比不加锁时使用AtomicInteger性能效果还要略好一些,但是公平锁的性能明显降低,其它三种情况下的性能是公平锁性能的几十倍以上,这和公平锁每次试图占有锁时,都必须先要进等待队列,按照FIFO的顺序去获取锁,因此在我们的实验情景下,使用公平锁的线程进行了频繁切换,而频繁切换线程,性能必然会下降的厉害,这也告诫了我们在实际的开发过程中,在需要使用公平锁的情景下,务必要考虑线程的切换频率。

    AtomicStampedReference解决ABA问题

    在运用CAS做Lock-Free操作中有一个经典的ABA问题:

    线程1准备用CAS将变量的值由A替换为B,在此之前,线程2将变量的值由A替换为C,又由C替换为A,然后线程1执行CAS时发现变量的值仍然为A,所以CAS成功。但实际上这时的现场已经和最初不同了,尽管CAS成功,但可能存在潜藏的问题,例如下面的例子:

    AtomicStampedReference解决ABA问题 - 木瓜仙人 - 木瓜仙人

    现有一个用单向链表实现的堆栈,栈顶为A,这时线程T1已经知道A.next为B,然后希望用CAS将栈顶替换为B:

    head.compareAndSet(A,B);

    在T1执行上面这条指令之前,线程T2介入,将A、B出栈,再pushD、C、A,此时堆栈结构如下图,而对象B此时处于游离状态:

    AtomicStampedReference解决ABA问题 - 木瓜仙人 - 木瓜仙人

    此时轮到线程T1执行CAS操作,检测发现栈顶仍为A,所以CAS成功,栈顶变为B,但实际上B.next为null,所以此时的情况变为:

    AtomicStampedReference解决ABA问题 - 木瓜仙人 - 木瓜仙人

    其中堆栈中只有B一个元素,C和D组成的链表不再存在于堆栈中,平白无故就把C、D丢掉了。

    以上就是由于ABA问题带来的隐患,各种乐观锁的实现中通常都会用版本戳version来对记录或对象标记,避免并发操作带来的问题,在Java中,AtomicStampedReference<E>也实现了这个作用,它通过包装[E,Integer]的元组来对对象标记版本戳stamp,从而避免ABA问题,例如下面的代码分别用AtomicInteger和AtomicStampedReference来对初始值为100的原子整型变量进行更新,AtomicInteger会成功执行CAS操作,而加上版本戳的AtomicStampedReference对于ABA问题会执行CAS失败:

     

    public class Test {

        private static AtomicInteger atomicInt = new AtomicInteger(100);

        private static AtomicStampedReference atomicStampedRef = new AtomicStampedReference(100, 0);

     

        public static void main(String[] args) throws InterruptedException {

           Thread intT1 = new Thread(new Runnable() {

               @Override

               public void run() {

                  atomicInt.compareAndSet(100, 101);

                  atomicInt.compareAndSet(101, 100);

               }

           });

     

           Thread intT2 = new Thread(new Runnable() {

               @Override

               public void run() {

                  try {

                      TimeUnit.SECONDS.sleep(1);

                  } catch (InterruptedException e) {

                  }

                  boolean c3 = atomicInt.compareAndSet(100, 101);

                  System.out.println(c3); // true

               }

           });

     

           intT1.start();

           intT2.start();

           intT1.join();

           intT2.join();

     

           Thread refT1 = new Thread(new Runnable() {

               @Override

               public void run()

                  try {

                      TimeUnit.SECONDS.sleep(1);

                  } catch (InterruptedException e) {

                  }

                  atomicStampedRef.compareAndSet(100, 101, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);

                  atomicStampedRef.compareAndSet(101, 100, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);

               }

           });

     

           Thread refT2 = new Thread(new Runnable() {

               @Override

               public void run() {

                  int stamp = atomicStampedRef.getStamp();

                  try {

                      TimeUnit.SECONDS.sleep(2);

                  } catch (InterruptedException e) {

                  }

                  boolean c3 = atomicStampedRef.compareAndSet(100, 101, stamp, stamp + 1);

                  System.out.println(c3); // false

               }

           });

     

           refT1.start();

           refT2.start();

        }

    }

  • 相关阅读:
    [模板] 主席树
    [模板] 替罪羊树
    [模板] Treap
    [LUOGU] P4342 [IOI1998]Polygon
    [JOYOI] 1051 选课
    poj 1845 数论(唯一分解定理+分治法求等比数列前n项的和mod m的值)
    poj 2418 bst统计字符串
    hdu 3791 二叉排序树
    hdu 3999 二叉排序树
    toj 3711 水题
  • 原文地址:https://www.cnblogs.com/xuwc/p/8684421.html
Copyright © 2011-2022 走看看