zoukankan      html  css  js  c++  java
  • Java多线程系列---“JUC原子类”01之 原子类的实现(CAS算法)

    转自:https://blog.csdn.net/ls5718/article/details/52563959  & https://blog.csdn.net/mmoren/article/details/79185862(含部分修改)

    在JDK 5之前Java语言是靠synchronized关键字保证同步的,这会导致有锁

    锁机制存在以下问题

    (1)在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题。

    (2)一个线程持有锁会导致其它所有需要此锁的线程挂起。

    (3)如果一个优先级高的线程等待一个优先级低的线程释放锁会导致优先级倒置,引起性能风险。

    volatile是不错的机制,但是volatile不能保证原子性。因此对于同步最终还是要回到锁机制上来。

    独占锁是一种悲观锁,synchronized就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。而另一个更加有效的锁就是乐观锁。所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。乐观锁用到的机制就是CAS,Compare and Swap。

    一、什么是CAS

    CAS,compare and swap的缩写,中文翻译成比较并交换。

    在Java发展初期,java语言是不能够利用硬件提供的这些便利来提升系统的性能的。而随着java不断的发展,Java本地方法(JNI)的出现,使得java程序越过JVM直接调用本地方法提供了一种便捷的方式,因而java在并发的手段上也多了起来。而在Doug Lea提供的cucurenct包中,CAS理论是它实现整个java包的基石。

    CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。 如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值 。否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该 位置的值。(在 CAS 的一些特殊情况下将仅返回 CAS 是否成功,而不提取当前 值。)CAS 有效地说明了“我认为位置 V 应该包含值 A;如果包含该值,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。”

    通常将 CAS 用于同步的方式是从地址 V 读取值 A,执行多步计算来获得新 值 B,然后使用 CAS 将 V 的值从 A 改为 B。如果 V 处的值尚未同时更改,则 CAS 操作成功。

    类似于 CAS 的指令允许算法执行读-修改-写操作,而无需害怕其他线程同时 修改变量,因为如果其他线程修改变量,那么 CAS 会检测它(并失败),算法 可以对该操作重新计算。

    二、CAS的目的

    利用CPU的CAS指令,同时借助JNI来完成Java的非阻塞算法。其它原子操作都是利用类似的特性完成的。整个J.U.C都是建立在CAS之上的,因此对于synchronized阻塞算法,J.U.C在性能上有了很大的提升。

    三、CAS存在的问题

    CAS虽然很高效的解决原子操作,但是CAS仍然存在三大问题。ABA问题,循环时间长开销大和只能保证一个共享变量的原子操作

    1.  ABA问题。因为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。

    从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式(最终调用的是Unsafe中的方法)将该引用和该标志的值设置为给定的更新值。

    关于ABA问题参考文档: http://blog.hesey.net/2011/09/resolve-aba-by-atomicstampedreference.html

    2. 循环时间长开销大。自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。

    3. 只能保证一个共享变量的原子操作。当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。

    四. CPU对CAS的支持

    或许我们可能会有这样的疑问,假设存在多个线程执行CAS操作并且CAS的步骤很多,有没有可能在判断V和E相同后,正要赋值时,切换了线程,更改了值。造成了数据不一致呢?答案是否定的,因为CAS是一种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题。

    五. 鲜为人知的unsafe类

    Unsafe类存在于sun.misc包中,其内部方法操作可以像C的指针一样直接操作内存,单从名称看来就可以知道该类是非安全的,毕竟Unsafe拥有着类似于C的指针操作,因此总是不应该首先使用Unsafe类,Java官方也不建议直接使用的Unsafe类,但我们还是很有必要了解该类,因为Java中CAS操作的执行依赖于Unsafe类的方法,注意Unsafe类中的所有方法都是native修饰的,也就是说Unsafe类中的方法都直接调用操作系统底层资源执行相应任务。
    1. unsafe类里的CAS操作

    CAS是一些CPU直接支持的指令,也就是我们前面分析的无锁(乐观-->无锁,悲观-->有锁)操作,在Java中无锁操作CAS基于以下3个方法实现,在稍后讲解Atomic系列内部方法是基于下述方法的实现的。

    //第一个参数o为给定对象,offset为对象内存的偏移量,通过这个偏移量迅速定位字段并设置或获取该字段的值,
    //expected表示期望值,x表示要设置的值,下面3个方法都通过CAS原子指令执行操作。
    public final native boolean compareAndSwapObject(Object o, long offset,Object expected, Object x);                                                                                                  
     
    public final native boolean compareAndSwapInt(Object o, long offset,int expected,int x);
     
    public final native boolean compareAndSwapLong(Object o, long offset,long expected,long x);

    2. 挂起与恢复

    将一个线程进行挂起是通过park方法实现的,调用 park后,线程将一直阻塞直到超时或者中断等条件出现。unpark可以终止一个挂起的线程,使其恢复正常。Java对线程的挂起操作被封装在 LockSupport类中,LockSupport类中有各种版本park方法,其底层实现最终还是使用Unsafe.park()方法和Unsafe.unpark()方法

    六、concurrent包的实现

    由于java的CAS同时具有 volatile 读和volatile写的内存语义,因此Java线程之间的通信现在有了下面四种方式:

    1. A线程写volatile变量,随后B线程读这个volatile变量。
    2. A线程写volatile变量,随后B线程用CAS更新这个volatile变量。
    3. A线程用CAS更新一个volatile变量,随后B线程用CAS更新这个volatile变量。
    4. A线程用CAS更新一个volatile变量,随后B线程读这个volatile变量。

    Java的CAS会使用现代处理器上提供的高效机器级别原子指令,这些原子指令以原子方式对内存执行读-改-写操作,这是在多处理器中实现同步的关键(从本质上来说,能够支持原子性读-改-写指令的计算机器,是顺序计算图灵机的异步等价机器,因此任何现代的多处理器都会去支持某种能对内存执行原子性读-改-写操作的原子指令)。同时,volatile变量的读/写和CAS可以实现线程之间的通信。把这些特性整合在一起,就形成了整个concurrent包得以实现的基石。如果我们仔细分析concurrent包的源代码实现,会发现一个通用化的实现模式:

    1. 首先,声明共享变量为volatile;
    2. 然后,使用CAS的原子条件更新来实现线程之间的同步;
    3. 同时,配合以volatile的读/写和CAS所具有的volatile读和写的内存语义来实现线程之间的通信。

    通过前面的分析我们已基本理解了无锁CAS的原理并对Java中的指针类Unsafe类有了比较全面的认识,下面进一步分析CAS在Java中的应用,即并发包中的原子操作类(Atomic系列),从JDK 1.5开始提供了java.util.concurrent.atomic包,在该包中提供了许多基于CAS实现的原子操作类,用法方便,性能高效。
    1. 原子更新基本类型

    原子更新基本类型主要包括3个类:

    • AtomicBoolean:原子更新布尔类型
    • AtomicInteger:原子更新整型
    • AtomicLong:原子更新长整型

    这3个类的实现原理和使用方式几乎是一样的,这里我们以AtomicInteger为例进行分析,AtomicInteger主要是针对int类型的数据执行原子操作,它提供了原子自增方法、原子自减方法以及原子赋值方法等,鉴于AtomicInteger的源码不多,我们直接看源码

    public class AtomicInteger extends Number implements java.io.Serializable {
        private static final long serialVersionUID = 6214790243416807050L;
     
        // 获取指针类Unsafe
        private static final Unsafe unsafe = Unsafe.getUnsafe();
     
        //下述变量value在AtomicInteger实例对象内的内存偏移量
        private static final long valueOffset;
     
        static {
            try {
               //通过unsafe类的objectFieldOffset()方法,获取value变量在对象内存中的偏移
               //通过该偏移量valueOffset,unsafe类的内部方法可以获取到变量value对其进行取值或赋值操作
                valueOffset = unsafe.objectFieldOffset
                    (AtomicInteger.class.getDeclaredField("value"));
            } catch (Exception ex) { throw new Error(ex); }
        }
       //当前AtomicInteger封装的int变量value
        private volatile int value;//注意,它的声明带有volatile,这是必需的,以保证内存可见性。
     
        public AtomicInteger(int initialValue) {
            value = initialValue;
        }
        public AtomicInteger() {
        }
       //获取当前最新值,
        public final int get() {
            return value;
        }
        //设置当前值,具备volatile效果,方法用final修饰是为了更进一步的保证线程安全。
        public final void set(int newValue) {
            value = newValue;
        }
        //最终会设置成newValue,使用该方法后可能导致其他线程在之后的一小段时间内可以获取到旧值,有点类似于延迟加载
        public final void lazySet(int newValue) {
            unsafe.putOrderedInt(this, valueOffset, newValue);
        }
       //设置新值并获取旧值,底层调用的是CAS操作即unsafe.compareAndSwapInt()方法
        public final int getAndSet(int newValue) {
            return unsafe.getAndSetInt(this, valueOffset, newValue);
        }
       //如果当前值为expect,则设置为update(当前值指的是value变量)
        public final boolean compareAndSet(int expect, int update) {
            return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
        }
        //当前值加1返回旧值,底层CAS操作
        public final int getAndIncrement() {
            return unsafe.getAndAddInt(this, valueOffset, 1);
        }
        //当前值减1,返回旧值,底层CAS操作
        public final int getAndDecrement() {
            return unsafe.getAndAddInt(this, valueOffset, -1);
        }
       //当前值增加delta,返回旧值,底层CAS操作
        public final int getAndAdd(int delta) {
            return unsafe.getAndAddInt(this, valueOffset, delta);
        }
        //当前值加1,返回新值,底层CAS操作
        public final int incrementAndGet() {
            return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
        }
        //当前值减1,返回新值,底层CAS操作
        public final int decrementAndGet() {
            return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
        }
       //当前值增加delta,返回新值,底层CAS操作
        public final int addAndGet(int delta) {
            return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
        }
       //省略一些不常用的方法....
    }

    这些方法的实现都依赖另一个public方法:

    public final boolean compareAndSet(int expect, int update)

    这是一个非常重要的方法,比较并设置,我们以后将简称为CAS。该方法以原子方式实现了如下功能:如果当前值等于expect,则更新为update,否则不更新,如果更新成功,返回true,否则返回false。

    通过上述的分析,可以发现AtomicInteger原子类的内部几乎是基于前面分析过Unsafe类中的CAS相关操作的方法实现的,这也同时证明AtomicInteger是基于无锁实现的,这里重点分析自增操作实现过程,其他方法自增实现原理一样。

    我们发现AtomicInteger类中所有自增或自减的方法都间接调用Unsafe类中的getAndAddInt()方法实现了CAS操作,从而保证了线程安全,关于getAndAddInt其实前面已分析过,它是Unsafe类中1.8新增的方法,源码如下

    //Unsafe类中的getAndAddInt方法
    public final int getAndAddInt(Object o, long offset, int delta) {
            int v;
            do {
                v = getIntVolatile(o, offset);
            } while (!compareAndSwapInt(o, offset, v, v + delta));
            return v;
        }

    可看出getAndAddInt通过一个while循环不断的重试更新要设置的值,直到成功为止,调用的是Unsafe类中的compareAndSwapInt方法,是一个CAS操作方法。这里需要注意的是,上述源码分析是基于JDK1.8的,如果是1.8之前的方法,AtomicInteger源码实现有所不同,是基于for死循环的,如下

    //JDK 1.7的源码,由for的死循环实现,并且直接在AtomicInteger实现该方法,
    //JDK1.8后,该方法实现已移动到Unsafe类中,直接调用getAndAddInt方法即可
    public final int incrementAndGet() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

    1.8之前说明:代码主体是个死循环,先获取当前值current,计算期望的值next,然后调用CAS方法进行更新,如果当前值没有变,则更新并返回新值,否则继续循环直到更新成功为止。

    synchronized vs 原子更新

    与synchronized锁相比,这种原子更新方式代表一种不同的思维方式。

    synchronized是悲观的,它假定更新很可能冲突,所以先获取锁,得到锁后才更新。原子变量的更新逻辑是乐观的,它假定冲突比较少,但使用CAS更新,也就是进行冲突检测,如果确实冲突了,那也没关系,继续尝试就好了。

    synchronized代表一种阻塞式算法,得不到锁的时候,进入锁等待队列,等待其他线程唤醒,有上下文切换开销。原子变量的更新逻辑是非阻塞式的,更新冲突的时候,它就重试,不会阻塞,不会有上下文切换开销。

    对于大部分比较简单的操作,无论是在低并发还是高并发情况下,这种乐观非阻塞方式的性能都要远高于悲观阻塞式方式。

    2. CAS中的ABA问题及其解决方案

    假设这样一种场景,当第一个线程执行CAS(V,E,U)操作,在获取到当前变量V,准备修改为新值U前,另外两个线程已连续修改了两次变量V的值,使得该值又恢复为旧值,这样的话,我们就无法正确判断这个变量是否已被修改过,如下图

     

    这就是典型的CAS的ABA问题,一般情况这种情况发现的概率比较小,可能发生了也不会造成什么问题,比如说我们对某个做加减法,不关心数字的过程,那么发生ABA问题也没啥关系。但是在某些情况下还是需要防止的,那么该如何解决呢?在Java中解决ABA问题,我们可以使用以下两个原子类:
    (1)AtomicStampedReference类

    https://blog.csdn.net/zqz_zqz/article/details/68062568

    AtomicStampedReference原子类是一个带有时间戳的对象引用,在每次修改后,AtomicStampedReference不仅会设置新值而且还会记录更改的时间。当AtomicStampedReference设置对象值时,对象值以及时间戳都必须满足期望值才能写入成功,这也就解决了反复读写时,无法预知值是否已被修改的窘境。底层实现为: 通过Pair私有内部类存储数据和时间戳, 并构造volatile修饰的私有实例,接着看AtomicStampedReference类的compareAndSet()方法的实现,同时对当前数据和当前时间进行比较,只有两者都相等是才会执行casPair()方法,单从该方法的名称就可知是一个CAS方法,最终调用的还是Unsafe类中的compareAndSwapObject方法

    到这我们就很清晰AtomicStampedReference的内部实现思想了,

    通过一个键值对Pair存储数据和时间戳,在更新时对数据和时间戳进行比较,

    只有两者都符合预期才会调用Unsafe的compareAndSwapObject方法执行数值和时间戳替换,也就避免了ABA的问题。

    比如:

    public class Counter {
        public final static AtomicStampedReference<String> ATOMIC_REFERENCE = new AtomicStampedReference<String>("abc", 0);
    
        public static void main(String args[]) {
            for (int i = 0; i < 100; i++) {
                final int num = i;
                final int stamp = ATOMIC_REFERENCE.getStamp();// 初始化是0
                new Thread() {
                    public void run() {
                        if (ATOMIC_REFERENCE.compareAndSet("abc", "abc2", stamp, stamp + 1)) {//如果第一个参数不为abc,则永远不会打印updated语句
                            System.out.println("I am thread:" + num + "," + "I have been updated!");
                        }
                    }
                }.start();
            }
            new Thread() {
                public void run() {
                    int stamp = ATOMIC_REFERENCE.getStamp();
                    while (!ATOMIC_REFERENCE.compareAndSet("abc2", "abc", stamp, stamp + 1))//如果第一个参数部位abc2,则永远不会打印下面的语句
                        ;
                    System.out.println("Already back to original value ");
                }
            }.start();
        }
    }
    I am thread:0,I have been updated!
    Already back to original value 

    (2)AtomicMarkableReference类

    AtomicMarkableReference与AtomicStampedReference不同的是,

    AtomicMarkableReference维护的是一个boolean值的标识,也就是说至于true和false两种切换状态,

    /**AtomicMarkableReference 解决aba问题,注意,它并不能解决aba的问题 ,它是通过一个boolean来标记是否更改,本质就是只有true和false两种版本来回切换,只能降低aba问题发生的几率,并不能阻止aba问题的发生,看下面的例子**/  
        public final static AtomicMarkableReference<String> ATOMIC_MARKABLE_REFERENCE = new AtomicMarkableReference<String>("abc" , false);  
          
        public static void main(String[] args){  
            //假设以下操作由不同线程执行  
            System.out.println("mark:"+ATOMIC_MARKABLE_REFERENCE.isMarked()); //线程1 获取到mark状态为false,原始值为“abc”  
            boolean thread1 = ATOMIC_MARKABLE_REFERENCE.isMarked();  
            System.out.println("mark:"+ATOMIC_MARKABLE_REFERENCE.isMarked()); //线程3获取到mark状态为false ,原始值为“abc”  
            boolean thread2 = ATOMIC_MARKABLE_REFERENCE.isMarked();  
            System.out.println("change result:"+ATOMIC_MARKABLE_REFERENCE.compareAndSet("abc", "abc2", thread1, !thread1)); //线程1 更改abc为abc2  
              
            System.out.println("mark:"+ATOMIC_MARKABLE_REFERENCE.isMarked()); //线程2获取到mark状态为true ,原始值为“abc2”  
            boolean thread3 = ATOMIC_MARKABLE_REFERENCE.isMarked();  
            System.out.println("change result:"+ATOMIC_MARKABLE_REFERENCE.compareAndSet("abc2", "abc", thread3, !thread3));//线程2 更改abc2为abc  
              
            System.out.println("change result:"+ATOMIC_MARKABLE_REFERENCE.compareAndSet("abc", "abc2", thread2, !thread2));//线程3更改abc为abc2;  
            //按照上面的执行顺序,3次修改都修改成功了,线程1与线程2拿到的mark状态都是false,他俩要做的操作都是将“abc”更改为“abc2”, 只是线程2在线程1和线程3中间做了一次更改,最后线程2做操作的时候并没有感知到线程1与线程3的更改;  
        }  
    这个例子来自:https://blog.csdn.net/zqz_zqz/article/details/68062568 
  • 相关阅读:
    [CentOS_7.4]Linux编译安装mono环境
    [CentOS_7.4]Linux安装与网络配置
    div框选中状态,倒三角样式
    photoswipe图片滑动插件使用
    微信应用号(小程序)开发教程二
    微信应用号(小程序)开发教程一
    开发者必去的10大国内网站推荐
    愤怒的小鸟 高清完整版下载
    魔兽 高清完整版下载
    分歧者3 高清完整版下载
  • 原文地址:https://www.cnblogs.com/Hermioner/p/9894941.html
Copyright © 2011-2022 走看看