zoukankan      html  css  js  c++  java
  • AtomicInteger的CAS算法浅析

      之前浅析过自旋锁(自旋锁浅析),我们知道它的实现原理就是CAS算法。CAS(Compare and Swap)即比较并交换,作为著名的无锁算法,它也是乐观锁的实现方式之一。JDK并发包里也有许多代码中有CAS的身影闪烁其中,鉴于CAS算法在并发领域的重要性和普适性,还是再结合AtomicInteger这个原子类来浅析一下吧。浅析之前,先借用之前自旋锁测试代码直接看AtomicInteger的自增测试结果,可以拿它跟自旋锁做个比较:

        @Test
        public void testAtomicInteger()
        {
            // 10个线程使用AtomicInteger自增
            AtomicInteger ai = new AtomicInteger();
            for (int i = 0; i < 10; i++)
            {
                new Thread(new Runnable()
                {
                    @Override
                    public void run()
                    {
                        // 自增1万次
                        for (int j = 0; j < 10000; j++)
                        {
                            count = ai.incrementAndGet();
                        }
    
                        // 一个线程执行完了就减1,10个线程执行完了就变成0,执行主线程
                        latch.countDown();
                    }
                }).start();
            }
    
            // 主线程等待
            try
            {
                latch.await();
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
    
            TestCase.assertEquals(count, 100000);
        }

      运行结果:

    count值:100000, 耗时:10毫秒.

      是不是比自旋锁要简单?必须的,因为AtomicInteger本身已经实现了CAS算法,人家天然就用于并发自增的。之前也说到过,CAS的原理很简单,它包含三个值:当前内存值(V)、预期原来的值(A)以及期待更新的值(B)。如果内存位置V的值与预期原值A相匹配,那么处理器会自动将该位置值更新为新值B,返回true。否则处理器不做任何操作,返回false。举上面的例子,我们有10个线程分别去做自增操作,很明显count是共享变量,它将被这10个线程追杀加1。假如线程1将count追加到100时,正准备更新到101这一刻,线程2插一脚抢先一步把count追加到101,那么线程1该怎么办呢?它将获取最新的count值再去自增。具体怎么实现的,我们接下来看。为了直观点,我们可以换种方式实现上面的例子:

    package com.wulinfeng.test.testpilling;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class AtomicIntegerTest {
    
        // 共享变量
        private static int count;
    
        // 10个线程就先初始化10
        private static CountDownLatch latch = new CountDownLatch(10);
    
        public static void main(String[] args) {
            // 10个线程使用AtomicInteger自增
            AtomicInteger ai = new AtomicInteger();
            for (int i = 0; i < 10; i++) {
                final int threadNum = i;
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        // 自增1万次
                        for (int j = 0; j < 10000; j++) {
                            count = ai.incrementAndGet();
                            System.out.println("线程" + threadNum + ": " + count);
                        }
    
                        // 一个线程执行完了就减1,10个线程执行完了就变成0,执行主线程
                        latch.countDown();
                    }
                }).start();
            }
    
            // 主线程等待
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

      截取运行结果后面几行:

    线程1: 99993
    线程1: 99994
    线程1: 99995
    线程1: 99996
    线程1: 99997
    线程1: 99998
    线程1: 99999
    线程1: 100000

      这后面的日志打印的都是线程1在追加,把控制台日志往上拉就可以看到其他线程也一直在追加的。那么这10个线程如何在CAS的咒法护身下没有互相冲突的呢?看AtomicInteger的源码便知:

    package java.util.concurrent.atomic;
    import java.util.function.IntUnaryOperator;
    import java.util.function.IntBinaryOperator;
    import sun.misc.Unsafe;
    
    /**
     * An {@code int} value that may be updated atomically.  See the
     * {@link java.util.concurrent.atomic} package specification for
     * description of the properties of atomic variables. An
     * {@code AtomicInteger} is used in applications such as atomically
     * incremented counters, and cannot be used as a replacement for an
     * {@link java.lang.Integer}. However, this class does extend
     * {@code Number} to allow uniform access by tools and utilities that
     * deal with numerically-based classes.
     *
     * @since 1.5
     * @author Doug Lea
    */
    public class AtomicInteger extends Number implements java.io.Serializable {
        private static final long serialVersionUID = 6214790243416807050L;
    
        // setup to use Unsafe.compareAndSwapInt for updates
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        private static final long valueOffset;
    
        static {
            try {
                valueOffset = unsafe.objectFieldOffset
                    (AtomicInteger.class.getDeclaredField("value"));
            } catch (Exception ex) { throw new Error(ex); }
        }
    
        private volatile int value;
    
        /**
         * Creates a new AtomicInteger with the given initial value.
         *
         * @param initialValue the initial value
         */
        public AtomicInteger(int initialValue) {
            value = initialValue;
        }
    
        /**
         * Creates a new AtomicInteger with initial value {@code 0}.
         */
        public AtomicInteger() {
        }
    
        /**
         * Gets the current value.
         *
         * @return the current value
         */
        public final int get() {
            return value;
        }
    
        /**
         * Sets to the given value.
         *
         * @param newValue the new value
         */
        public final void set(int newValue) {
            value = newValue;
        }
    
        /**
         * Eventually sets to the given value.
         *
         * @param newValue the new value
         * @since 1.6
         */
        public final void lazySet(int newValue) {
            unsafe.putOrderedInt(this, valueOffset, newValue);
        }
    
        /**
         * Atomically sets to the given value and returns the old value.
         *
         * @param newValue the new value
         * @return the previous value
         */
        public final int getAndSet(int newValue) {
            return unsafe.getAndSetInt(this, valueOffset, newValue);
        }
    
        /**
         * Atomically sets the value to the given updated value
         * if the current value {@code ==} the expected value.
         *
         * @param expect the expected value
         * @param update the new value
         * @return {@code true} if successful. False return indicates that
         * the actual value was not equal to the expected value.
         */
        public final boolean compareAndSet(int expect, int update) {
            return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
        }
    
        /**
         * Atomically sets the value to the given updated value
         * if the current value {@code ==} the expected value.
         *
         * <p><a href="package-summary.html#weakCompareAndSet">May fail
         * spuriously and does not provide ordering guarantees</a>, so is
         * only rarely an appropriate alternative to {@code compareAndSet}.
         *
         * @param expect the expected value
         * @param update the new value
         * @return {@code true} if successful
         */
        public final boolean weakCompareAndSet(int expect, int update) {
            return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
        }
    
        /**
         * Atomically increments by one the current value.
         *
         * @return the previous value
         */
        public final int getAndIncrement() {
            return unsafe.getAndAddInt(this, valueOffset, 1);
        }
    
        /**
         * Atomically decrements by one the current value.
         *
         * @return the previous value
         */
        public final int getAndDecrement() {
            return unsafe.getAndAddInt(this, valueOffset, -1);
        }
    
        /**
         * Atomically adds the given value to the current value.
         *
         * @param delta the value to add
         * @return the previous value
         */
        public final int getAndAdd(int delta) {
            return unsafe.getAndAddInt(this, valueOffset, delta);
        }
    
        /**
         * Atomically increments by one the current value.
         *
         * @return the updated value
         */
        public final int incrementAndGet() {
            return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
        }
    
        /**
         * Atomically decrements by one the current value.
         *
         * @return the updated value
         */
        public final int decrementAndGet() {
            return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
        }
    
        /**
         * Atomically adds the given value to the current value.
         *
         * @param delta the value to add
         * @return the updated value
         */
        public final int addAndGet(int delta) {
            return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
        }
    
        /**
         * Atomically updates the current value with the results of
         * applying the given function, returning the previous value. The
         * function should be side-effect-free, since it may be re-applied
         * when attempted updates fail due to contention among threads.
         *
         * @param updateFunction a side-effect-free function
         * @return the previous value
         * @since 1.8
         */
        public final int getAndUpdate(IntUnaryOperator updateFunction) {
            int prev, next;
            do {
                prev = get();
                next = updateFunction.applyAsInt(prev);
            } while (!compareAndSet(prev, next));
            return prev;
        }
    
        /**
         * Atomically updates the current value with the results of
         * applying the given function, returning the updated value. The
         * function should be side-effect-free, since it may be re-applied
         * when attempted updates fail due to contention among threads.
         *
         * @param updateFunction a side-effect-free function
         * @return the updated value
         * @since 1.8
         */
        public final int updateAndGet(IntUnaryOperator updateFunction) {
            int prev, next;
            do {
                prev = get();
                next = updateFunction.applyAsInt(prev);
            } while (!compareAndSet(prev, next));
            return next;
        }
    
        /**
         * Atomically updates the current value with the results of
         * applying the given function to the current and given values,
         * returning the previous value. The function should be
         * side-effect-free, since it may be re-applied when attempted
         * updates fail due to contention among threads.  The function
         * is applied with the current value as its first argument,
         * and the given update as the second argument.
         *
         * @param x the update value
         * @param accumulatorFunction a side-effect-free function of two arguments
         * @return the previous value
         * @since 1.8
         */
        public final int getAndAccumulate(int x,
                                          IntBinaryOperator accumulatorFunction) {
            int prev, next;
            do {
                prev = get();
                next = accumulatorFunction.applyAsInt(prev, x);
            } while (!compareAndSet(prev, next));
            return prev;
        }
    
        /**
         * Atomically updates the current value with the results of
         * applying the given function to the current and given values,
         * returning the updated value. The function should be
         * side-effect-free, since it may be re-applied when attempted
         * updates fail due to contention among threads.  The function
         * is applied with the current value as its first argument,
         * and the given update as the second argument.
         *
         * @param x the update value
         * @param accumulatorFunction a side-effect-free function of two arguments
         * @return the updated value
         * @since 1.8
         */
        public final int accumulateAndGet(int x,
                                          IntBinaryOperator accumulatorFunction) {
            int prev, next;
            do {
                prev = get();
                next = accumulatorFunction.applyAsInt(prev, x);
            } while (!compareAndSet(prev, next));
            return next;
        }
    
        /**
         * Returns the String representation of the current value.
         * @return the String representation of the current value
         */
        public String toString() {
            return Integer.toString(get());
        }
    
        /**
         * Returns the value of this {@code AtomicInteger} as an {@code int}.
         */
        public int intValue() {
            return get();
        }
    
        /**
         * Returns the value of this {@code AtomicInteger} as a {@code long}
         * after a widening primitive conversion.
         * @jls 5.1.2 Widening Primitive Conversions
         */
        public long longValue() {
            return (long)get();
        }
    
        /**
         * Returns the value of this {@code AtomicInteger} as a {@code float}
         * after a widening primitive conversion.
         * @jls 5.1.2 Widening Primitive Conversions
         */
        public float floatValue() {
            return (float)get();
        }
    
        /**
         * Returns the value of this {@code AtomicInteger} as a {@code double}
         * after a widening primitive conversion.
         * @jls 5.1.2 Widening Primitive Conversions
         */
        public double doubleValue() {
            return (double)get();
        }
    
    }

      标黄了3个成员变量和两个方法。方法比较简单,一个是返回自增前的值,一个是返回自增后的值。

      第一个成员变量是unsafe,这个必不可少,没了它CAS就是浮云。CAS算法用到了Unsafe对象的compareAndSwapInt方法,而它是一个本地方法,所以实现源码到此为止,没法再跟进去了。其实CAS算法的精华也就在于此,所以很遗憾。但至少我们知道Unsafe总共有3个CAS方法:

        public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
    
        public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
    
        public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

      第二个成员变量是valueOffset,它是共享变量value在AtomicInteger对象上的内存偏移量。它作为compareAndSwapInt的第二个参数,用于修改共享变量value的值。

      最后就是value了,上面已经介绍了,它就是例子中的count,是共享变量,也就是多线程并发中被追杀的共享资源。它使用volatile修饰,解决了可见性和有序性问题,再由unsafe的CAS保证了原子性,3大问题都解决了,多线程并发问题也就解决了。

      回过头再看那标黄的两个方法,实现都是unsafe的getAndAddInt,点进去瞧瞧,发现它不是本地方法:

        public final int getAndAddInt(Object var1, long var2, int var4) {
            int var5;
            do {
                var5 = this.getIntVolatile(var1, var2);
            } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    
            return var5;
        }

      代码看起来是不是有点眼熟?没错,还是自旋锁的套路,只不过这里用到的是Unsafe的CAS算法,而我们的自旋锁用到的是多套了一层马甲的AtmoicXXX的CAS算法,所以说到底,我们用的还是Unsafe的CAS。通过循环,先获取当前值var5(怎么获取当前值的?getIntVolatile就不用看了,还是本地方法),再计算更新值var5+var4,然后通过compareAndSwapInt方法设置value变量。如果compareAndSwapInt方法返回失败,表示value变量的值被别的线程更改了,所以需要循环获取value变量的最新值,再次通过compareAndSwapInt方法设置value变量,直至设置成功,跳出循环,返回更新前的值。

      从上面看到,CAS底层实现依赖于Unsafe包,我们只要明白CAS的原理即可:预期值与当前值一致,那么执行更新,否则死循环尝试更新,直到成功。

      

  • 相关阅读:
    Linux查看物理CPU个数、核数、逻辑CPU个数
    shell脚本中格式化日期
    MySQL中常用字符串函数
    Xtrabackup 使用方法
    LinuxShell算术运算
    mysql高可用方案MHA介绍
    CentOS安装scp命令
    源码编译安装MySQL
    mysql编译参数详解(./configure)
    SparkStreaming 结合Kafka 时丢数据
  • 原文地址:https://www.cnblogs.com/wuxun1997/p/10974472.html
Copyright © 2011-2022 走看看