zoukankan      html  css  js  c++  java
  • Java-CAS 与原子类

    CAS(Compare and Swap),即比较并替换,实现并发算法时常用到的一种技术。

    CAS 的思想很简单:三个参数,一个当前内存值 V、旧的预期值 A、即将更新的值 B,当且仅当预期值 A 和内存值 V 相同时,将内存值修改为 B 并返回 true,否则什么都不做,并返回 false。

    和 CAS 相关的一个概念是原子操作。原子操作是不可被中断的一个或一系列操作。而 CAS 则是 Java 中保证原子操作的一种方式。

    从 Java1.5 开始,JDK 的并发包里就提供了一些类来支持原子操作,都是以 Atomic 开头。

    volatile 不能保证类似 i++ 这样操作的原子性,CAS 能够保证。

    一、原子类使用

    以 AtomicInteger 为例,常用 API:

    • public final int get():获取当前的值
    • public final int getAndSet(int newValue):获取当前的值,并设置新的值
    • public final int getAndIncrement():获取当前的值,并自增
    • public final int getAndDecrement():获取当前的值,并自减
    • public final int getAndAdd(int delta):获取当前的值,并加上预期的值

    相比 Integer 的优势,多线程中让变量自增:

    private volatile int count = 0;
    // 若要线程安全执行执行 count++,需要加锁
    public synchronized void increment() {
        count++;
    }
    public int getCount() {
        return count;
    }

    使用 AtomicInteger 后:

    private AtomicInteger count = new AtomicInteger();
    public void increment() {
        count.incrementAndGet();
    }
    // 使用 AtomicInteger 后,不需要加锁,也可以实现线程安全
    public int getCount() {
        return count.get();
    }

    二、CAS 问题

    CAS 方式为乐观锁,synchronized 为悲观锁。因此使用 CAS 解决并发问题通常情况下性能更优。

    但使用 CAS 方式也会有几个问题:

    1.循环时间长开销大

    自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。

    2.只能保证一个共享变量的原子操作

    对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就需要用锁。

    从 Java 1.5 开始,JDK 提供了 AtomicReference 类来保证引用对象之间的原子性。

    3.ABA 问题

    如果一个值原来是 A,变成了 B,又变成了 A,那么使用 CAS 进行检查时则会发现它的值没有发生变化,但是实际上却变化了。

    解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么 A->B->A 就会变成 1A->2B->3A。

    从 Java 1.5 开始,JDK 提供了 AtomicStampedReference、AtomicMarkableReference 类来解决 ABA 问题。

    三、CAS 实现

    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }
    
    public final int getAndAddInt(Object o, long offset, int delta) {
        int v;
        do {
            v = getIntVolatile(o, offset);
        } while (!compareAndSwapInt(o, offset, v, v + delta));
        return v;
    }
    
    public native int getIntVolatile(Object o, long offset);
    
    public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);

    最后调用的是 Unsafe 类的方法,主要是 compareAndSwapInt 方法,即 CAS。

    查看 Unsafe 的实现:https://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/fea2c7f50ce8/src/share/vm/prims/unsafe.cpp

    UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
      UnsafeWrapper("Unsafe_CompareAndSwapInt");
      oop p = JNIHandles::resolve(obj);
      jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
      return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
    UNSAFE_END

    其中核心方法为 Atomic::cmpxchg(x, addr, e),其中参数 x 是即将更新的值,参数 e 是原内存的值,参数 addr 为内存地址。

    查看 Atomic 的实现:https://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/fea2c7f50ce8/src/share/vm/runtime/atomic.cpp

    #include "runtime/atomic.inline.hpp"
    
    jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte* dest, jbyte compare_value) {
      assert(sizeof(jbyte) == 1, "assumption.");
      uintptr_t dest_addr = (uintptr_t)dest;
      uintptr_t offset = dest_addr % sizeof(jint);
      volatile jint* dest_int = (volatile jint*)(dest_addr - offset);
      jint cur = *dest_int;
      jbyte* cur_as_bytes = (jbyte*)(&cur);
      jint new_val = cur;
      jbyte* new_val_as_bytes = (jbyte*)(&new_val);
      new_val_as_bytes[offset] = exchange_value;
      while (cur_as_bytes[offset] == compare_value) {
        jint res = cmpxchg(new_val, dest_int, cur);
        if (res == cur) break;
        cur = res;
        new_val = cur;
        new_val_as_bytes[offset] = exchange_value;
      }
      return cur_as_bytes[offset];
    }

    查看 atomic_linux_x86.inline.hpp,不同系统、不同 CPU 有不同的实现:https://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/fea2c7f50ce8/src/share/vm/runtime/atomic.inline.hpp

    Linux 的 x86 实现:https://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/fea2c7f50ce8/src/os_cpu/linux_x86/vm/atomic_linux_x86.inline.hpp

    inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
      int mp = os::is_MP();
      __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                        : "=a" (exchange_value)
                        : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                        : "cc", "memory");
      return exchange_value;
    }

    Windows 的 x86 实现:https://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/fea2c7f50ce8/src/os_cpu/windows_x86/vm/atomic_windows_x86.inline.hpp

    inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
      // alternative for InterlockedCompareExchange
      int mp = os::is_MP();
      __asm {
        mov edx, dest
        mov ecx, exchange_value
        mov eax, compare_value
        LOCK_IF_MP(mp)
        cmpxchg dword ptr [edx], ecx
      }
    }
    
    // Adding a lock prefix to an instruction on MP machine
    // VC++ doesn't like the lock prefix to be on a single line
    // so we can't insert a label after the lock prefix.
    // By emitting a lock prefix, we can define a label after it.
    #define LOCK_IF_MP(mp) __asm cmp mp, 0  
                           __asm je L0      
                           __asm _emit 0xF0 
                           __asm L0:

    https://benjaminwhx.com/2018/05/03/%E3%80%90%E7%BB%86%E8%B0%88Java%E5%B9%B6%E5%8F%91%E3%80%91%E8%B0%88%E8%B0%88CAS/

    https://www.cnblogs.com/noKing/p/9094983.html

  • 相关阅读:
    _ 下划线 Underscores __init__
    Page not found (404) 不被Django的exception中间件捕捉 中间件
    从装修儿童房时的门锁说起
    欧拉定理 费马小定理的推广
    线性运算 非线性运算
    Optimistic concurrency control 死锁 悲观锁 乐观锁 自旋锁
    Avoiding Full Table Scans
    批量的单向的ssh 认证
    批量的单向的ssh 认证
    Corrupted MAC on input at /usr/local/perl/lib/site_perl/5.22.1/x86_64-linux/Net/SSH/Perl/Packet.pm l
  • 原文地址:https://www.cnblogs.com/jhxxb/p/11533938.html
Copyright © 2011-2022 走看看