之前浅析过自旋锁(自旋锁浅析),我们知道它的实现原理就是CAS算法。CAS(Compare and Swap)即比较并交换,作为著名的无锁算法,它也是乐观锁的实现方式之一。JDK并发包里也有许多代码中有CAS的身影闪烁其中,鉴于CAS算法在并发领域的重要性和普适性,还是再结合AtomicInteger这个原子类来浅析一下吧。浅析之前,先借用之前自旋锁测试代码直接看AtomicInteger的自增测试结果,可以拿它跟自旋锁做个比较:
@Test public void testAtomicInteger() { // 10个线程使用AtomicInteger自增 AtomicInteger ai = new AtomicInteger(); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { // 自增1万次 for (int j = 0; j < 10000; j++) { count = ai.incrementAndGet(); } // 一个线程执行完了就减1,10个线程执行完了就变成0,执行主线程 latch.countDown(); } }).start(); } // 主线程等待 try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } TestCase.assertEquals(count, 100000); }
运行结果:
count值:100000, 耗时:10毫秒.
是不是比自旋锁要简单?必须的,因为AtomicInteger本身已经实现了CAS算法,人家天然就用于并发自增的。之前也说到过,CAS的原理很简单,它包含三个值:当前内存值(V)、预期原来的值(A)以及期待更新的值(B)。如果内存位置V的值与预期原值A相匹配,那么处理器会自动将该位置值更新为新值B,返回true。否则处理器不做任何操作,返回false。举上面的例子,我们有10个线程分别去做自增操作,很明显count是共享变量,它将被这10个线程追杀加1。假如线程1将count追加到100时,正准备更新到101这一刻,线程2插一脚抢先一步把count追加到101,那么线程1该怎么办呢?它将获取最新的count值再去自增。具体怎么实现的,我们接下来看。为了直观点,我们可以换种方式实现上面的例子:
package com.wulinfeng.test.testpilling; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; public class AtomicIntegerTest { // 共享变量 private static int count; // 10个线程就先初始化10 private static CountDownLatch latch = new CountDownLatch(10); public static void main(String[] args) { // 10个线程使用AtomicInteger自增 AtomicInteger ai = new AtomicInteger(); for (int i = 0; i < 10; i++) { final int threadNum = i; new Thread(new Runnable() { @Override public void run() { // 自增1万次 for (int j = 0; j < 10000; j++) { count = ai.incrementAndGet(); System.out.println("线程" + threadNum + ": " + count); } // 一个线程执行完了就减1,10个线程执行完了就变成0,执行主线程 latch.countDown(); } }).start(); } // 主线程等待 try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } }
截取运行结果后面几行:
线程1: 99993 线程1: 99994 线程1: 99995 线程1: 99996 线程1: 99997 线程1: 99998 线程1: 99999 线程1: 100000
这后面的日志打印的都是线程1在追加,把控制台日志往上拉就可以看到其他线程也一直在追加的。那么这10个线程如何在CAS的咒法护身下没有互相冲突的呢?看AtomicInteger的源码便知:
package java.util.concurrent.atomic; import java.util.function.IntUnaryOperator; import java.util.function.IntBinaryOperator; import sun.misc.Unsafe; /** * An {@code int} value that may be updated atomically. See the * {@link java.util.concurrent.atomic} package specification for * description of the properties of atomic variables. An * {@code AtomicInteger} is used in applications such as atomically * incremented counters, and cannot be used as a replacement for an * {@link java.lang.Integer}. However, this class does extend * {@code Number} to allow uniform access by tools and utilities that * deal with numerically-based classes. * * @since 1.5 * @author Doug Lea */ public class AtomicInteger extends Number implements java.io.Serializable { private static final long serialVersionUID = 6214790243416807050L; // setup to use Unsafe.compareAndSwapInt for updates private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value; /** * Creates a new AtomicInteger with the given initial value. * * @param initialValue the initial value */ public AtomicInteger(int initialValue) { value = initialValue; } /** * Creates a new AtomicInteger with initial value {@code 0}. */ public AtomicInteger() { } /** * Gets the current value. * * @return the current value */ public final int get() { return value; } /** * Sets to the given value. * * @param newValue the new value */ public final void set(int newValue) { value = newValue; } /** * Eventually sets to the given value. * * @param newValue the new value * @since 1.6 */ public final void lazySet(int newValue) { unsafe.putOrderedInt(this, valueOffset, newValue); } /** * Atomically sets to the given value and returns the old value. * * @param newValue the new value * @return the previous value */ public final int getAndSet(int newValue) { return unsafe.getAndSetInt(this, valueOffset, newValue); } /** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that * the actual value was not equal to the expected value. */ public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } /** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * * <p><a href="package-summary.html#weakCompareAndSet">May fail * spuriously and does not provide ordering guarantees</a>, so is * only rarely an appropriate alternative to {@code compareAndSet}. * * @param expect the expected value * @param update the new value * @return {@code true} if successful */ public final boolean weakCompareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } /** * Atomically increments by one the current value. * * @return the previous value */ public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } /** * Atomically decrements by one the current value. * * @return the previous value */ public final int getAndDecrement() { return unsafe.getAndAddInt(this, valueOffset, -1); } /** * Atomically adds the given value to the current value. * * @param delta the value to add * @return the previous value */ public final int getAndAdd(int delta) { return unsafe.getAndAddInt(this, valueOffset, delta); } /** * Atomically increments by one the current value. * * @return the updated value */ public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; } /** * Atomically decrements by one the current value. * * @return the updated value */ public final int decrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, -1) - 1; } /** * Atomically adds the given value to the current value. * * @param delta the value to add * @return the updated value */ public final int addAndGet(int delta) { return unsafe.getAndAddInt(this, valueOffset, delta) + delta; } /** * Atomically updates the current value with the results of * applying the given function, returning the previous value. The * function should be side-effect-free, since it may be re-applied * when attempted updates fail due to contention among threads. * * @param updateFunction a side-effect-free function * @return the previous value * @since 1.8 */ public final int getAndUpdate(IntUnaryOperator updateFunction) { int prev, next; do { prev = get(); next = updateFunction.applyAsInt(prev); } while (!compareAndSet(prev, next)); return prev; } /** * Atomically updates the current value with the results of * applying the given function, returning the updated value. The * function should be side-effect-free, since it may be re-applied * when attempted updates fail due to contention among threads. * * @param updateFunction a side-effect-free function * @return the updated value * @since 1.8 */ public final int updateAndGet(IntUnaryOperator updateFunction) { int prev, next; do { prev = get(); next = updateFunction.applyAsInt(prev); } while (!compareAndSet(prev, next)); return next; } /** * Atomically updates the current value with the results of * applying the given function to the current and given values, * returning the previous value. The function should be * side-effect-free, since it may be re-applied when attempted * updates fail due to contention among threads. The function * is applied with the current value as its first argument, * and the given update as the second argument. * * @param x the update value * @param accumulatorFunction a side-effect-free function of two arguments * @return the previous value * @since 1.8 */ public final int getAndAccumulate(int x, IntBinaryOperator accumulatorFunction) { int prev, next; do { prev = get(); next = accumulatorFunction.applyAsInt(prev, x); } while (!compareAndSet(prev, next)); return prev; } /** * Atomically updates the current value with the results of * applying the given function to the current and given values, * returning the updated value. The function should be * side-effect-free, since it may be re-applied when attempted * updates fail due to contention among threads. The function * is applied with the current value as its first argument, * and the given update as the second argument. * * @param x the update value * @param accumulatorFunction a side-effect-free function of two arguments * @return the updated value * @since 1.8 */ public final int accumulateAndGet(int x, IntBinaryOperator accumulatorFunction) { int prev, next; do { prev = get(); next = accumulatorFunction.applyAsInt(prev, x); } while (!compareAndSet(prev, next)); return next; } /** * Returns the String representation of the current value. * @return the String representation of the current value */ public String toString() { return Integer.toString(get()); } /** * Returns the value of this {@code AtomicInteger} as an {@code int}. */ public int intValue() { return get(); } /** * Returns the value of this {@code AtomicInteger} as a {@code long} * after a widening primitive conversion. * @jls 5.1.2 Widening Primitive Conversions */ public long longValue() { return (long)get(); } /** * Returns the value of this {@code AtomicInteger} as a {@code float} * after a widening primitive conversion. * @jls 5.1.2 Widening Primitive Conversions */ public float floatValue() { return (float)get(); } /** * Returns the value of this {@code AtomicInteger} as a {@code double} * after a widening primitive conversion. * @jls 5.1.2 Widening Primitive Conversions */ public double doubleValue() { return (double)get(); } }
标黄了3个成员变量和两个方法。方法比较简单,一个是返回自增前的值,一个是返回自增后的值。
第一个成员变量是unsafe,这个必不可少,没了它CAS就是浮云。CAS算法用到了Unsafe对象的compareAndSwapInt方法,而它是一个本地方法,所以实现源码到此为止,没法再跟进去了。其实CAS算法的精华也就在于此,所以很遗憾。但至少我们知道Unsafe总共有3个CAS方法:
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5); public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
第二个成员变量是valueOffset,它是共享变量value在AtomicInteger对象上的内存偏移量。它作为compareAndSwapInt的第二个参数,用于修改共享变量value的值。
最后就是value了,上面已经介绍了,它就是例子中的count,是共享变量,也就是多线程并发中被追杀的共享资源。它使用volatile修饰,解决了可见性和有序性问题,再由unsafe的CAS保证了原子性,3大问题都解决了,多线程并发问题也就解决了。
回过头再看那标黄的两个方法,实现都是unsafe的getAndAddInt,点进去瞧瞧,发现它不是本地方法:
public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
代码看起来是不是有点眼熟?没错,还是自旋锁的套路,只不过这里用到的是Unsafe的CAS算法,而我们的自旋锁用到的是多套了一层马甲的AtmoicXXX的CAS算法,所以说到底,我们用的还是Unsafe的CAS。通过循环,先获取当前值var5(怎么获取当前值的?getIntVolatile就不用看了,还是本地方法),再计算更新值var5+var4,然后通过compareAndSwapInt方法设置value变量。如果compareAndSwapInt方法返回失败,表示value变量的值被别的线程更改了,所以需要循环获取value变量的最新值,再次通过compareAndSwapInt方法设置value变量,直至设置成功,跳出循环,返回更新前的值。
从上面看到,CAS底层实现依赖于Unsafe包,我们只要明白CAS的原理即可:预期值与当前值一致,那么执行更新,否则死循环尝试更新,直到成功。