zoukankan      html  css  js  c++  java
  • Jdk1.6 JUC源码解析(1)-atomic-AtomicXXX

    转自:http://brokendreams.iteye.com/blog/2250109

    功能简介:
    • 原子量和普通变量相比,主要体现在读写的线程安全上。对原子量的是原子的(比如多线程下的共享变量i++就不是原子的),由CAS操作保证原子性。对原子量的读可以读到最新值,由volatile关键字来保证可见性
    • 原子量多用于数据统计(如接口调用次数)、一些序列生成(多线程环境下)以及一些同步数据结构中。
    源码分析:
    • 首先,原子量的一些较底层的操作都是来自sun.misc.Unsafe类,所以原子量内部有一个Unsafe的静态引用。
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    
           在openJdk代码中可以找到这个类,目录openJdk的jdk/share/classes/sun/misc/。
           这个类里面大多数方法都是native的,方法实现可以在openJdk的hotspot/share/vm/prims/unsafe.cpp里面找到。 
    • 接下来,先看下AtomicInteger的源码。
           在AtomicInteger源码中,由内部的一个int域来保存值:
    private volatile int value;
    
           注意到这个int域由volatile关键字修饰,可以保证可见性。
           细节:volatile怎么保证可见性呢?对于被 volatile修饰的域来说,对域进行的写入操作,在指令层面会在必要的时候(多核CPU)加入内存屏障(如:lock addl $0x0),这个内存屏障的作用是令本次写操作刷回主存,同时使其他CPU的cacheline中相应数据失效。所以当其他CPU需要访问相应数据的时 候,会到主存中访问,从而保证了多线程环境下相应域的可见性。
           接下来看一下CAS操作,AtomicInteger中的CAS操作体现在方法compareAndSet。它的实现在unsafe.cpp里面:
    /*
     *      Implementation of class sun.misc.Unsafe
     */
    ...
    UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
      UnsafeWrapper("Unsafe_CompareAndSwapInt");
      oop p = JNIHandles::resolve(obj);
      jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
      return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
    UNSAFE_END
    
            这里调用了Atomic的cmpxchg方法,继续找一下。这个方法定义在hotspot/share/vm/runtime/atomic.hpp 中,实现在hotspot/share/vm/runtime/atomic.cpp中,最终实现取决于底层OS,比如linux x86,实现内联在hotspot部分代码os_cpu/linux_x86/vm/atomic_linux_x86.inline.hpp:
    // Adding a lock prefix to an instruction on MP machine
    #define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "
    ...
    inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
      int mp = os::is_MP();
      __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                        : "=a" (exchange_value)
                        : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                        : "cc", "memory");
      return exchange_value;
    }
    
           从上面的代码中可以看到,如果是CPU是多核(multi processors)的话,会添加一个lock;前缀,这个lock;前缀也是内存屏障,它的作用是在执行后面指令的过程中锁总线(或者是锁 cacheline),保证一致性。后面的指令cmpxchgl就是x86的比较并交换指令了。
           接下来有一个和compareAndSet类似的方法,weakCompareAndSet。
           从注释看,这个方法会发生fail spuriously(伪失败),而且不保证(指令)顺序,只能在一些特性场景(一些计数和统计)下替换compareAndSet。但从方法实现上看和compareAndSet没什么区别:
        /**
         * Atomically sets the value to the given updated value
         * if the current value {@code ==} the expected value.
         *
         * <p>May <a href="package-summary.html#Spurious">fail spuriously</a>
         * and does not provide ordering guarantees, so is only rarely an
         * appropriate alternative to {@code compareAndSet}.
         *
         * @param expect the expected value
         * @param update the new value
         * @return true if successful.
         */
        public final boolean weakCompareAndSet(int expect, int update) {
            return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
        }
    
           但还是应该按照API的说明来使用这两个方法,以防未来方法内部(实现或者底层内部机制)发生变化。
           其余的大多数方法都是基于compareAndSet方法来实现的,来看其中一个,incrementAndGet方法:
        /**
         * Atomically increments by one the current value.
         *
         * @return the updated value
         */
        public final int incrementAndGet() {
            for (;;) {
                int current = get();
                int next = current + 1;
                if (compareAndSet(current, next))
                    return next;
            }
        }
     
           这个方法也体现了CAS一般是使用风格-CAS Loop,不断重试,直到成功。
           当然,这也不是绝对的,JVM底层完全可以用更好的方式也替换这些方法,比如使用内联的lock;xadd就会比cmpxchg指令更好一些。
           最后,AtomicInteger还有一个方法,lazySet:
        /**
         * Eventually sets to the given value.
         *
         * @param newValue the new value
         * @since 1.6
         */
        public final void lazySet(int newValue) {
            unsafe.putOrderedInt(this, valueOffset, newValue);
        }
           看下unsafe.cpp中putOrderedInt方法的实现:
        {CC"putOrderedInt",      CC"("OBJ"JI)V",             FN_PTR(Unsafe_SetOrderedInt)},
        ...
    // The non-intrinsified versions of setOrdered just use setVolatile
    UNSAFE_ENTRY(void, Unsafe_SetOrderedInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint x))
      UnsafeWrapper("Unsafe_SetOrderedInt");
      SET_FIELD_VOLATILE(obj, offset, jint, x);
    UNSAFE_END
    
           注:上面有句注释,说明这只是一个非内联的setOrdered方法的实现,使用了setVolatile(和setVolatile一样的效果)。
           其中,SET_FIELD_VOLATILE的定义如下:
    #define SET_FIELD_VOLATILE(obj, offset, type_name, x) 
      oop p = JNIHandles::resolve(obj); 
      OrderAccess::release_store_fence((volatile type_name*)index_oop_from_field_offset_long(p, offset), x);
    
           在hotspot/src/os_cpu/linux_x86/vm/orderAccess_linux_x86.inline.hpp找到了这个方法 的内联实现。(hotspot/src/share/vm/runtime/orderAccess.hpp这个头文件里面的注释值得留意一下):
    inline void     OrderAccess::release_store_fence(volatile jint*   p, jint   v) {
      __asm__ volatile (  "xchgl (%2),%0"
                        : "=r" (v)
                        : "0" (v), "r" (p)
                        : "memory");
    }
    
           可见,这里是通过xchgl这个指令来实现的setOrdered。
           但是,上面只是非内联的实现,我们看下内联的实现是什么样的。
           在hotspot/src/share/vm/classfile/vmSymbols.hpp中有如下代码:
      do_intrinsic(_putOrderedInt, sun_misc_Unsafe,putOrderedInt_name, putOrderedInt_signature,F_RN)
    
            然后找到hotspot/src/share/vm/opto/library_call.cpp中找到相应实现:
      case vmIntrinsics::_putOrderedInt:
        return inline_unsafe_ordered_store(T_INT);
      ...
    bool LibraryCallKit::inline_unsafe_ordered_store(BasicType type) {
      // This is another variant of inline_unsafe_access, differing in
      // that it always issues store-store ("release") barrier and ensures
      // store-atomicity (which only matters for "long").
      if (callee()->is_static())  return false;  // caller must have the capability!
      ...//省略不重要的部分
      insert_mem_bar(Op_MemBarRelease);
      insert_mem_bar(Op_MemBarCPUOrder);
      // Ensure that the store is atomic for longs:
      bool require_atomic_access = true;
      Node* store;
      if (type == T_OBJECT) // reference stores need a store barrier.
        store = store_oop_to_unknown(control(), base, adr, adr_type, val, type);
      else {
        store = store_to_memory(control(), adr, val, type, adr_type, require_atomic_access);
      }
      insert_mem_bar(Op_MemBarCPUOrder);
      return true;
    }
    
           我们看到这个方法里在保存动作的前后,有3个地方插入了内存屏障。
           再看下hotspot/src/cpu/x86/vm/x86_64.ad:
    instruct membar_release() %{
      match(MemBarRelease);
      ins_cost(400);
      size(0);
      format %{ "MEMBAR-release ! (empty encoding)" %}
      ins_encode( );
      ins_pipe(empty);
    %}
    ...
    instruct membar_volatile(eFlagsReg cr) %{
      match(MemBarVolatile);
      effect(KILL cr);
      ins_cost(400);
      format %{ 
        $$template
        if (os::is_MP()) {
          $$emit$$"LOCK ADDL [ESP + #0], 0	! membar_volatile"
        } else {
          $$emit$$"MEMBAR-volatile ! (empty encoding)"
        }
      %}
      ins_encode %{
        __ membar(Assembler::StoreLoad);
      %}
      ins_pipe(pipe_slow);
    %}
    
           可见,除了membar_volatile中会添加LOCK ADDL这些指令,其他的貌似没什么卵用。 所以上面那3个insert_mem_bar也相当于没有加任何内存屏障。在这种情况下,lazySet就相当于对一个普通域的写操作喽。
    • 再看下AtomicBoolean的源码。
           AtomicBoolean内部是用一个int域来表示布尔状态,1表示true;0表示false:
        private volatile int value;
        /**
         * Creates a new {@code AtomicBoolean} with the given initial value.
         *
         * @param initialValue the initial value
         */
        public AtomicBoolean(boolean initialValue) {
            value = initialValue ? 1 : 0;
        }
    
            CAS、lazySet方法也都分别调用unsafe的compareAndSwapInt和putOrderedInt,上面分析过了。
    • 继续看下AtomicLong的源码。
           AtomicLong内部是用一个long域来保存值:
        /**
         * Records whether the underlying JVM supports lockless
         * compareAndSwap for longs. While the Unsafe.compareAndSwapLong
         * method works in either case, some constructions should be
         * handled at Java level to avoid locking user-visible locks.
         */
        static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
        /**
         * Returns whether underlying JVM supports lockless CompareAndSet
         * for longs. Called only once and cached in VM_SUPPORTS_LONG_CAS.
         */
        private static native boolean VMSupportsCS8();
        static {
          try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicLong.class.getDeclaredField("value"));
          } catch (Exception ex) { throw new Error(ex); }
        }
        private volatile long value;
    
           这里注意到,AtomicLong中还提供了一个包内可见的静态域VM_SUPPORTS_LONG_CAS来表示底层是否支持Long类型(8字节)的lockless CAS操作。
           AtomicLong内部整体结构和AtomicInteger类似,主要来看下内部使用的unsafe的方法有什么不同,首先CAS操作使用了unsafe的compareAndSwapLong方法:
        /**
         * Atomically sets the value to the given updated value
         * if the current value {@code ==} the expected value.
         *
         * @param expect the expected value
         * @param update the new value
         * @return true if successful. False return indicates that
         * the actual value was not equal to the expected value.
         */
        public final boolean compareAndSet(long expect, long update) {
            return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
        }
    
           在unsafe.cpp中找到实现:
        {CC"compareAndSwapLong", CC"("OBJ"J""J""J"")Z",      FN_PTR(Unsafe_CompareAndSwapLong)},
        
        ...
    UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapLong(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jlong e, jlong x))
      UnsafeWrapper("Unsafe_CompareAndSwapLong");
      Handle p (THREAD, JNIHandles::resolve(obj));
      jlong* addr = (jlong*)(index_oop_from_field_offset_long(p(), offset));
      if (VM_Version::supports_cx8())
        return (jlong)(Atomic::cmpxchg(x, addr, e)) == e;
      else {
        jboolean success = false;
        ObjectLocker ol(p, THREAD);
        if (*addr == e) { *addr = x; success = true; }
        return success;
      }
    UNSAFE_END
    
           从实现中可以看到,如果平台不支持8字节的CAS操作,就会加锁然后进行设置操作;如果支持,就会调用Atomic::cmpxchg方法,方法实现可以 参考具体平台内联代码hotspot/os_cpu/linux_x86/vm/atomic_linux_x86.inline.hpp:
    // Adding a lock prefix to an instruction on MP machine
    #define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "
    inline jlong    Atomic::cmpxchg    (jlong    exchange_value, volatile jlong*    dest, jlong    compare_value) {
      bool mp = os::is_MP();
      __asm__ __volatile__ (LOCK_IF_MP(%4) "cmpxchgq %1,(%3)"
                            : "=a" (exchange_value)
                            : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                            : "cc", "memory");
      return exchange_value;
    }
    
           其实就是(多核情况下带lock前缀的)cmpxchgq指令。
           然后看一下lazySet中使用到的unsafe的putOrderedLong方法。
        /**
         * Eventually sets to the given value.
         *
         * @param newValue the new value
         * @since 1.6
         */
        public final void lazySet(long newValue) {
            unsafe.putOrderedLong(this, valueOffset, newValue);
        }
     
           同样在unsafe.cpp中可以找到该方法的实现:
    Cpp代码  收藏代码
    1.     {CC"putOrderedLong",     CC"("OBJ"JJ)V",             FN_PTR(Unsafe_SetOrderedLong)},  
    2.     ...  
    3. UNSAFE_ENTRY(void, Unsafe_SetOrderedLong(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jlong x))  
    4.   UnsafeWrapper("Unsafe_SetOrderedLong");  
    5. #if defined(SPARC) || defined(X86)  
    6.   // Sparc and X86 have atomic jlong (8 bytes) instructions  
    7.   SET_FIELD_VOLATILE(obj, offset, jlong, x);  
    8. #else  
    9.   // Keep old code for platforms which may not have atomic long (8 bytes) instructions  
    10.   {  
    11.     if (VM_Version::supports_cx8()) {  
    12.       SET_FIELD_VOLATILE(obj, offset, jlong, x);  
    13.     }  
    14.     else {  
    15.       Handle p (THREAD, JNIHandles::resolve(obj));  
    16.       jlong* addr = (jlong*)(index_oop_from_field_offset_long(p(), offset));  
    17.       ObjectLocker ol(p, THREAD);  
    18.       *addr = x;  
    19.     }  
    20.   }  
    21. #endif  
    22. UNSAFE_END  
           从实现上看,如果平台是SPARC或者X86或者平台支持8字节CAS,就相当于执行了一个volatile write;否则,加锁写。
           当然还要看一下内联方法,在hotspot/src/share/vm/classfile/vmSymbols.hpp中有如下代码:
    Cpp代码  收藏代码
    1. do_intrinsic(_putOrderedLong,           sun_misc_Unsafe,        putOrderedLong_name, putOrderedLong_signature, F_RN)    
           然后找到hotspot/src/share/vm/opto/library_call.cpp中找到相应实现。
    Cpp代码  收藏代码
    1. case vmIntrinsics::_putOrderedLong:  
    2.   return inline_unsafe_ordered_store(T_LONG);  
    3. ...  
    4. ol LibraryCallKit::inline_unsafe_ordered_store(BasicType type) {  
    5. // This is another variant of inline_unsafe_access, differing in  
    6. // that it always issues store-store ("release") barrier and ensures  
    7. // store-atomicity (which only matters for "long").  
    8. if (callee()->is_static())  return false;  // caller must have the capability!  
    9. ...//忽略不重要部分  
    10. // Ensure that the store is atomic for longs:  
    11. bool require_atomic_access = true;  
    12. Node* store;  
    13. if (type == T_OBJECT) // reference stores need a store barrier.  
    14.   store = store_oop_to_unknown(control(), base, adr, adr_type, val, type);  
    15. else {  
    16.   store = store_to_memory(control(), adr, val, type, adr_type, require_atomic_access);  
    17. }  
    18. insert_mem_bar(Op_MemBarCPUOrder);  
    19. return true;  
           从代码的注释上可以发现,require_atomic_access设置为true,为了保证long写操作的原子性。继续跟代码,找到hotspot/src/share/vm/opto/graphKit.cpp:
    Cpp代码  收藏代码
    1. Node* GraphKit::store_to_memory(Node* ctl, Node* adr, Node *val, BasicType bt,  
    2.                                 int adr_idx,  
    3.                                 bool require_atomic_access) {  
    4.   assert(adr_idx != Compile::AliasIdxTop, "use other store_to_memory factory" );  
    5.   const TypePtr* adr_type = NULL;  
    6.   debug_only(adr_type = C->get_adr_type(adr_idx));  
    7.   Node *mem = memory(adr_idx);  
    8.   Node* st;  
    9.   if (require_atomic_access && bt == T_LONG) {  
    10.     st = StoreLNode::make_atomic(C, ctl, mem, adr, adr_type, val);  
    11.   } else {  
    12.     st = StoreNode::make(_gvn, ctl, mem, adr, adr_type, val, bt);  
    13.   }  
    14.   st = _gvn.transform(st);  
    15.   set_memory(st, adr_idx);  
    16.   // Back-to-back stores can only remove intermediate store with DU info  
    17.   // so push on worklist for optimizer.  
    18.   if (mem->req() > MemNode::Address && adr == mem->in(MemNode::Address))  
    19.     record_for_igvn(st);  
    20.   return st;  
    21. }  
           可见,这里会针对long做原子写操作(这里的原子操作应该指的是将long的高4字节和低4字节的操作合并成一个原子操作,比如某些平台不支持非volatile的long/double域的原子操作)。
     
     
    • 最后看下AtomicReference的源码。
           AtomicReference内部构成和其他原子量基本一致,区别只是这个类内部保存一个对象引用。
    Java代码  收藏代码
    1. public class AtomicReference<V>  implements java.io.Serializable {  
    2.     private static final long serialVersionUID = -1848883965231344442L;  
    3.     private static final Unsafe unsafe = Unsafe.getUnsafe();  
    4.     private static final long valueOffset;  
    5.     static {  
    6.       try {  
    7.         valueOffset = unsafe.objectFieldOffset  
    8.             (AtomicReference.class.getDeclaredField("value"));  
    9.       } catch (Exception ex) { throw new Error(ex); }  
    10.     }  
    11.     private volatile V value;  
           重点看一下内部调用的unsafe的compareAndSwapObject和putOrderedObject方法,先看一下compareAndSwapObject:
    Java代码  收藏代码
    1. /** 
    2.  * Atomically sets the value to the given updated value 
    3.  * if the current value {@code ==} the expected value. 
    4.  * @param expect the expected value 
    5.  * @param update the new value 
    6.  * @return true if successful. False return indicates that 
    7.  * the actual value was not equal to the expected value. 
    8.  */  
    9. public final boolean compareAndSet(V expect, V update) {  
    10.     return unsafe.compareAndSwapObject(this, valueOffset, expect, update);  
    11. }  
           在unsafe.cpp中可以找到实现:
    Cpp代码  收藏代码
    1.     {CC"compareAndSwapObject", CC"("OBJ"J"OBJ""OBJ")Z",  FN_PTR(Unsafe_CompareAndSwapObject)},  
    2.     ...  
    3. UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapObject(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jobject e_h, jobject x_h))  
    4.   UnsafeWrapper("Unsafe_CompareAndSwapObject");  
    5.   oop x = JNIHandles::resolve(x_h);  
    6.   oop e = JNIHandles::resolve(e_h);  
    7.   oop p = JNIHandles::resolve(obj);  
    8.   HeapWord* addr = (HeapWord *)index_oop_from_field_offset_long(p, offset);  
    9.   if (UseCompressedOops) {  
    10.     update_barrier_set_pre((narrowOop*)addr, e);  
    11.   } else {  
    12.     update_barrier_set_pre((oop*)addr, e);  
    13.   }  
    14.   oop res = oopDesc::atomic_compare_exchange_oop(x, addr, e);  
    15.   jboolean success  = (res == e);  
    16.   if (success)  
    17.     update_barrier_set((void*)addr, x);  
    18.   return success;  
    19. UNSAFE_END  
           可以看到里面实际上是执行oopDesc::atomic_compare_exchange_oop这个方法。找到hotspot/src/share/vm/oops/oop.inline.hpp中该方法实现:
    Cpp代码  收藏代码
    1. inline oop oopDesc::atomic_compare_exchange_oop(oop exchange_value,  
    2.                                                 volatile HeapWord *dest,  
    3.                                                 oop compare_value) {  
    4.   if (UseCompressedOops) {  
    5.     // encode exchange and compare value from oop to T  
    6.     narrowOop val = encode_heap_oop(exchange_value);  
    7.     narrowOop cmp = encode_heap_oop(compare_value);  
    8.     narrowOop old = (narrowOop) Atomic::cmpxchg(val, (narrowOop*)dest, cmp);  
    9.     // decode old from T to oop  
    10.     return decode_heap_oop(old);  
    11.   } else {  
    12.     return (oop)Atomic::cmpxchg_ptr(exchange_value, (oop*)dest, compare_value);  
    13.   }  
    14. }  
           cmpxchg之前分析过,看看cmpxche_ptr,找到hotspot/os_cpu/linux_x86/vm/atomic_linux_x86.inline.hpp中实现:
    Cpp代码  收藏代码
    1. inline intptr_t Atomic::cmpxchg_ptr(intptr_t exchange_value, volatile intptr_t* dest, intptr_t compare_value) {  
    2.   return (intptr_t)cmpxchg((jlong)exchange_value, (volatile jlong*)dest, (jlong)compare_value);  
    3. }  
    4. inline jlong    Atomic::cmpxchg    (jlong    exchange_value, volatile jlong*    dest, jlong    compare_value) {  
    5.   bool mp = os::is_MP();  
    6.   __asm__ __volatile__ (LOCK_IF_MP(%4) "cmpxchgq %1,(%3)"  
    7.                         : "=a" (exchange_value)  
    8.                         : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)  
    9.                         : "cc", "memory");  
    10.   return exchange_value;  
    11. }  
           就是(多核下带lock前缀的)cmpxchgq命令了。
           putOrderedObject方法按之前几篇的查找方法,会发现内联之后,相当于一个普通写操作了。
     
           OK,源码分析到此结束!
      
  • 相关阅读:
    分组PARTITION BY及游标CURSOR的用法
    dotnet core 3.1+consul 学习(1)
    常用状态码
    docker 安装consul以及部署consul集群
    泛型
    redis面试题(1)
    asp net core 3.1启动过程源码解读
    .net core 2.x到3.x变化 -> Endpoint Routing
    jwt登录验证逻辑
    .Net Core3.1+Jenkins+Docker+Git实现自动化部署
  • 原文地址:https://www.cnblogs.com/wxgblogs/p/5590264.html
Copyright © 2011-2022 走看看