一、为什么值和预期不一样?
我们先来看下下面的这段 Java 程序,开启十个线程,每个线程进行 number++ 操作 1000 次,最终输出的值大小应该为 10000:
public void addNumber(){
number++;
}
for (int i = 1; i <=10 ; i++) {
new Thread(()->{
for (int j = 1; j <=1000; j++) {
data.addNumber();
}
},String.valueOf(i)).start();
}
当我们打印出最终的 number 的值的时候发现,每一次的值都小于预期的 10000 。这是因为 number++ 并非是一步操作,当执行它时会分为三条指令:① 获取到原始值;② 对原始值进行加一操作得到新值;③ 将新值写回内存。在并发较高的情况下,当两个线程同时获取到旧的值之后,就会产生写入的值相同的状况,造成总和总比预期值小的后果。
为了解决这个问题,我们可以给 addNumber()
方法加上 synchronized
修饰解决。synchronized
属于悲观锁,一次只允许一个线程进行 number++
的操作,虽然这样能够解决并发问题,但是在此处的效率并不高。由此我们可以使用一种乐观锁,乐观锁的含义是假设没有发生冲突,那么就正好可以进行某项操作,如果要是发生冲突了,那就重试直到成功,其最常见的就是CAS
。对 addNumber()
方法进行以下修改便可以避免并发值重复的问题。
AtomicInteger atomicInteger =new AtomicInteger();
public void addNumber(){
atomicInteger.getAndIncrement();
}
这里边使用到了java.util.concurrent.atomic
包下的AtomicInteger
来解决原子性问题。
二、CAS是什么?
在上面我们使用到了 Java 的java.util.concurrent.atomic
包,这个包便借用了 CAS 来实现了区别于synchronized
同步锁的一种乐观锁。
那么 CAS 是什么呢?CAS其实就是Compare And Swap 的简写,它是一条 CPU 并发原语。原语的执行必须是连续的,在执行过程中不允许中断,也就是说CAS是一条原子指令,不会造成所谓的数据不一致的问题。我们跟踪atomicInteger.getAndIncrement();
这条语句可以得到以下的内容:
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
由此可以发现它的实现借助了一个叫做Unsafe
的类。Unsafe
类是 Java 中用于直接操作内存数据的一个类(类似于C语言中的指针操作),其中包含很多的本地方法(native)。
以getAndAddInt(Object var1, long var2, int var4)
方法为例,展示其实如何实现的保证原子性。方法的实现如下:
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
该方法中包含三个参数,分别代表的意思是:
① 当前对象;
② 该变量值在内存中的偏移地址;
③ 需要增加的值大小。
在这段代码中,有一个叫做compareAndSwapInt
的方法,public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
,该方法共包含四个参数,返回值为布尔型,使用当前对象的当前值与 var5 进行比较,如果相同则更新值返回 true,失败返回 false,参数含义如下。
Object var1:代表当前对象;
long var2:代表内存偏移量,相当于对象值的引用地址;
int var4:代表期望值,使用期望值和当前对象中的值进行比较;
int var5:代表要交换的值。
该方法中使用了自旋锁以保证其原子性。假设主内存值为 v 等于10,此时有 T1、T2两个线程进入到该方法,根据 Java 内存模型(JMM)我们可以知道,线程 T1 和线程 T2 都会将主内存的值10拷贝到自己的工作内存。
1、当线程 T1 和线程 T2 都通过getIntVolatile(var1, var2)
赋值给了变量 var5 之后,线程 T1 被挂起;
2、线程 T2 调用方法compareAndSwapInt
,因为当中的期望值 var5 和当前主内存值相同,比较成功,更新当前内存的值为 11,返回 true,退出循环;
3、线程 T1 被唤醒,在执行compareAndSwapInt
方法的时候,由于当前内存的值以及为11,和 工作内存 var5 的值10不同了,所以比较不成功,返回 false,继续执行循环;
4、线程 T1 重新从主内存获取当前的最新值11赋值给 var5;
5、线程 T1 继续进行比较,若此时没有其他线程对主内存的进行修改,比较更新成功 ,退出循环;否则继续执行步骤4。
流程图如下所示:
虽然CAS没有加锁保证了一致性,并发性有所提高 ,但是也产生了一系列的问题,比如循环时间长开销大、只能保证一个共享变量的原子操作、会产生ABA问题。
三、ABA 问题是什么?
使用 CAS 会产生 ABA 问题,这是因为 CAS 算法是在某一时刻取出内存值然后在当前的时刻进行比较,中间存在一个时间差,在这个时间差里就可能会产生 ABA 问题。
ABA 问题的过程是当有两个线程 T1 和 T2 从内存中获取到值A,线程 T2 通过某些操作把内存 值修改为B,然后又经过某些操作将值修改为回值A,T2退出。线程 T1 进行操作的时候 ,使用预期值同内存中的值比较,此时均为A,修改成功退出。但是此时的A以及不是原先的A了,这就是 ABA 问题,如下图。
四、如何解决 ABA 问题?
解决这个问题可以使用添加版本号的方式。我们可以使用 Java 中的提供的类AtomicStampedReference
进行操作,其中的compareAndSet
方法如下:
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
这个方法包含了四个参数,expectedReference
代表的是期望被修改的值,newReference
代表的是新的值,expectedStamp
代表期望被修改的版本号,newStamp
代表新的版本号。只有当预期值和均当前内存值相同时才会修改成功。ABA 问题的完整示例以及解决代码如下:
public class ABADemo {
private static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference<>(100, 1);
public static void main(String[] args) {
new Thread(() -> {
int stamp = stampedReference.getStamp();
System.out.println("当前线程名称:" + Thread.currentThread().getName() + ",版本号为" + stamp + ",值是" + stampedReference.getReference());
//暂停1秒钟t1线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
stampedReference.compareAndSet(100, 101, stampedReference.getStamp(), stampedReference.getStamp() + 1);
System.out.println("当前线程名称:" + Thread.currentThread().getName() + ",版本号为" + stampedReference.getStamp() + ",值是" + stampedReference.getReference());
stampedReference.compareAndSet(101, 100, stampedReference.getStamp(), stampedReference.getStamp() + 1);
System.out.println("当前线程名称:" + Thread.currentThread().getName() + ",版本号为" + stampedReference.getStamp() + ",值是" + stampedReference.getReference());
System.out.println("线程t1已完成1次ABA操作~~~~~");
}, "t1").start();
new Thread(() -> {
int stamp = stampedReference.getStamp();
System.out.println("当前线程名称:" + Thread.currentThread().getName() + ",版本号为" + stamp + ",值是" + stampedReference.getReference());
//线程2暂停3秒,保证线程1完成1次ABA
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean result = stampedReference.compareAndSet(100, 6666, stamp, stamp + 1);
System.out.println("当前线程名称:" + Thread.currentThread().getName() + ",修改成功否:" + result + ",最新版本号" +
stampedReference.getStamp() + ",最新的值:" + stampedReference.getReference());
}, "t2").start();
}
}