zoukankan      html  css  js  c++  java
  • jvm源码解读--17 Java的wait()、notify()学习

    write and debug by 张艳涛

    wait()和notify()的通常用法

    1. A线程取得锁,执行wait(),释放锁;
    2. B线程取得锁,完成业务后执行notify(),再释放锁;
    3. B线程释放锁之后,A线程取得锁,继续执行wait()之后的代码;

    关于synchronize修饰的代码块
    通常,对于synchronize(lock){…}这样的代码块,编译后会生成monitorenter和monitorexit指令,线程执行到monitorenter指令时会尝试取得lock对应的monitor的所有权(CAS设置对象头),取得后即获取到锁,执行monitorexit指令时会释放monitor的所有权即释放锁;

    一个完整的demo
    为了深入学习wait()和notify(),先用完整的demo程序来模拟场景吧,以下是源码:

    package com.zyt.wait_notify;
    
    public class NotifyDemo {
    
        private static void sleep(long sleepVal){
            try{
                Thread.sleep(sleepVal);
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    
        private static void log(String desc){
            System.out.println(Thread.currentThread().getName() + " : " + desc);
        }
    
        Object lock = new Object();
    
        public void startThreadA(){
            new Thread(() -> {
                synchronized (lock){
                    log("get lock");
                    startThreadB();
                    log("start wait");
                    try {
                        
                    }catch(InterruptedException e){
                        e.printStackTrace();
                    }
    
                    log("get lock after wait");
                    log("release lock");
                }
            }, "thread-A").start();
        }
    
        public void startThreadB(){
            new Thread(()->{
                synchronized (lock){
                    log("get lock");
                    startThreadC();
                    sleep(100);
                    log("start notify");
                    lock.notify();
                    log("release lock");
    
                }
            },"thread-B").start();
        }
    
        public void startThreadC(){
            new Thread(() -> {
                synchronized (lock){
                    log("get lock");
                    log("release lock");
                }
            }, "thread-C").start();
        }
    
        public static void main(String[] args){
            new NotifyDemo().startThreadA();
        }
    }

    以上就是本次实战用到的demo,代码功能简述如下:

    启动线程A,取得锁之后先启动线程B再执行wait()方法,释放锁并等待;
    线程B启动之后会等待锁,A线程执行wait()之后,线程B取得锁,然后启动线程C,再执行notify唤醒线程A,最后退出synchronize代码块,释放锁;
    线程C启动之后就一直在等待锁,这时候线程B还没有退出synchronize代码块,锁还在线程B手里;
    线程A在线程B执行notify()之后就一直在等待锁,这时候线程B还没有退出synchronize代码块,锁还在线程B手里;
    线程B退出synchronize代码块,释放锁之后,线程A和线程C竞争锁;
    把上面的代码在Openjdk8下面执行,反复执行多次,都得到以下结果:

    thread-A : get lock
    thread-A : start wait
    thread-B : get lock
    thread-C : c thread is start
    thread-B : start notify
    thread-B : release lock
    thread-A : after wait, acquire lock again
    thread-A : release lock
    thread-C : get lock
    thread-C : release lock

    针对以上结果,问题来了:
    第一个问题:
    将以上代码反复执行多次,结果都是B释放锁之后A会先得到锁,这又是为什么呢?C为何不能先拿到锁呢?

    第二个问题:
    线程C自开始就执行了monitorenter指令,它能得到锁是容易理解的,但是线程A呢?在wait()之后并没有没有monitorenter指令,那么它又是如何取得锁的呢?

    wait()、notify()这些方法都是native方法,所以只有从JVM源码寻找答案了,本次阅读的是openjdk8的源码;

    带上问题去看JVM源码
    按照demo代码执行顺序,我整理了如下问题,带着这些问题去看JVM源码可以聚焦主线,不要被一些支线的次要的代码卡住(例如一些异常处理,监控和上报等):

    线程A在wait()的时候做了什么?
    线程C启动后,由于此时线程B持有锁,那么线程C此时在干啥?
    线程B在notify()的时候做了什么?
    线程B释放锁的时候做了什么?

    好了,接下来看源码分析问题吧:

    线程A在wait()的时候做了什么

    java代码中的

    lock.wait();

    这个lock是一个object对象

    如果你看源码的时候能看到这个是一个native方法,jvm对于native方法的处理有俩种方法,

    1. 一种是自定义,使用javah 生成对应的.hpp的头,在用c++写对于的.cpp方法实现,其中会调用jin.h的env方法,在使用gcc编译成.so文件,
    ObjTest.java
    
    package jni;
    class A {}
    public class ObjTest extends A{
        static {
            System.loadLibrary("ObjTest");
        }
        public ObjTest(){
            System.out.println("default");
        }
        public ObjTest(int age){
            System.out.println("param Construtor,age->"+age);
        }
        public native static void test(Object a);
    
        public static void main(String[] args){
            test(new ObjTest());
        }
    }

      2.另外的一种就是系统自带的native,

    如果你要看代码的话

    
    #include <stdio.h>
    #include <signal.h>
    #include <limits.h>
    
    #include "jni.h"
    #include "jni_util.h"
    #include "jvm.h"
    
    #include "java_lang_Object.h"
    
    static JNINativeMethod methods[] = {
        {"hashCode",    "()I",                    (void *)&JVM_IHashCode},
        {"wait",        "(J)V",                   (void *)&JVM_MonitorWait},
        {"notify",      "()V",                    (void *)&JVM_MonitorNotify},
        {"notifyAll",   "()V",                    (void *)&JVM_MonitorNotifyAll},
        {"clone",       "()Ljava/lang/Object;",   (void *)&JVM_Clone},
    };
    
    JNIEXPORT void JNICALL
    Java_java_lang_Object_registerNatives(JNIEnv *env, jclass cls)
    {
        (*env)->RegisterNatives(env, cls,
                                methods, sizeof(methods)/sizeof(methods[0]));
    }
    
    JNIEXPORT jclass JNICALL
    Java_java_lang_Object_getClass(JNIEnv *env, jobject this)
    {
        if (this == NULL) {
            JNU_ThrowNullPointerException(env, NULL);
            return 0;
        } else {
            return (*env)->GetObjectClass(env, this);
        }
    }

    会发现它使用了 RegisterNatives这种方法,wait对于的方法,为   (void *)&JVM_MonitorWait

    在share/vm/prims/jvm.cpp文件中定义的

    JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms))
      JVMWrapper("JVM_MonitorWait");
      Handle obj(THREAD, JNIHandles::resolve_non_null(handle));
      JavaThreadInObjectWaitState jtiows(thread, ms != 0);
      if (JvmtiExport::should_post_monitor_wait()) {
        JvmtiExport::post_monitor_wait((JavaThread *)THREAD, (oop)obj(), ms);
      }
      ObjectSynchronizer::wait(obj, ms, CHECK);
    JVM_END

    重要的是,他要进入ObjectSynchronizer.wait方法

    void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
        //UseBiasedLocking默认为true
      if (UseBiasedLocking) {
          //撤销对象头中包含的偏向锁
        BiasedLocking::revoke_and_rebias(obj, false, THREAD);
        assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
      }
      if (millis < 0) {
        TEVENT (wait - throw IAX) ;
        THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
      }
        //分配一个关联的ObjectMonitor实例
      ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj());
      DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);
        //调用其wait方法
      monitor->wait(millis, true, THREAD);
    
      /* This dummy call is in place to get around dtrace bug 6254741.  Once
         that's fixed we can uncomment the following line and remove the call */
      // DTRACE_MONITOR_PROBE(waited, monitor, obj(), THREAD);
      dtrace_waited_probe(monitor, obj, THREAD);
    }

    这里就能看到了

    //调用其wait方法
    monitor->wait(millis, true, THREAD);

    线程B在这个时候做了什么

    在线程A调用wait方法的同时,线程B也已经启动了,就是jvm源码调试中的Thread-10

    能看到这个时候线程B,已经进入了synchronized代码块,其中对应的指令就是monitorenter

    对于monitorenter指令进行简单分析

    先打印一下bt信息

    #0  ObjectMonitor::enter (this=0x7fb1280036e0, __the_thread__=0x7fb11c001800) at /home/atzhang/atzhang/openjdksource/openjdk8/openjdk/hotspot/src/share/vm/runtime/objectMonitor.cpp:323
    #1  0x00007fb14a8de3e6 in ObjectSynchronizer::slow_enter (obj=..., lock=0x7fb1345fc6a8, __the_thread__=0x7fb11c001800) at /home/atzhang/atzhang/openjdksource/openjdk8/openjdk/hotspot/src/share/vm/runtime/synchronizer.cpp:258
    #2  0x00007fb14a8ddf70 in ObjectSynchronizer::fast_enter (obj=..., lock=0x7fb1345fc6a8, attempt_rebias=true, __the_thread__=0x7fb11c001800) at /home/atzhang/atzhang/openjdksource/openjdk8/openjdk/hotspot/src/share/vm/runtime/synchronizer.cpp:180
    #3  0x00007fb14a538e2f in InterpreterRuntime::monitorenter (thread=0x7fb11c001800, elem=0x7fb1345fc6a8) at /home/atzhang/atzhang/openjdksource/openjdk8/openjdk/hotspot/src/share/vm/interpreter/interpreterRuntime.cpp:573

    如果看网上的一般文章那么分析的起点就是InterpreterRuntime::monitorenter ,其实是不对的,也就是99.8%的文章都是差了那么一乃乃,

    在上下求索的过程中,发现连mashibing的公开课讲的课,黄俊将的synchronized底层实现都是错的,他错在了哪里了? 他用了openjdk1.8早期的

    源码版本来讲偏向锁,其实这个源码还没有实现偏向锁,我也遇到了这个问题,发现怎么看也看不懂,那么真正的入口是

    bytecodeInterpreter.cpp 

         /* monitorenter and monitorexit for locking/unlocking an object */
    
          CASE(_monitorenter): {
            oop lockee = STACK_OBJECT(-1);
            // derefing's lockee ought to provoke implicit null check
            CHECK_NULL(lockee);
            // find a free monitor or one already allocated for this object
            // if we find a matching object then we need a new monitor
            // since this is recursive enter
            BasicObjectLock* limit = istate->monitor_base();
            BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
            BasicObjectLock* entry = NULL;
            while (most_recent != limit ) {
              if (most_recent->obj() == NULL) entry = most_recent;
              else if (most_recent->obj() == lockee) break;
              most_recent++;
            }
            if (entry != NULL) {
              entry->set_obj(lockee);
              int success = false;
              uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;
    
              markOop mark = lockee->mark();
              intptr_t hash = (intptr_t) markOopDesc::no_hash;
              // implies UseBiasedLocking
              // code 3:如果锁对象的mark word的状态是偏向模式
              if (mark->has_bias_pattern()) {//has_bias_pattern las 3bit is 101 ?
                uintptr_t thread_ident;
                uintptr_t anticipated_bias_locking_value;
                thread_ident = (uintptr_t)istate->thread();
                anticipated_bias_locking_value =
                  (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
                  ~((uintptr_t) markOopDesc::age_mask_in_place);
                 // code 5:如果偏向的线程是自己且epoch等于class的epoch
                if  (anticipated_bias_locking_value == 0) {
                  // already biased towards this thread, nothing to do
                  if (PrintBiasedLockingStatistics) {
                    (* BiasedLocking::biased_lock_entry_count_addr())++;
                  }
                  success = true;
                }
                   /**
                   *  初始化:偏向锁默认是延时初始化的,延迟的时间通过参数BiasedLockingStartupDelay控制,默认是4000ms默认是4000ms。
                   *  初始化是在安全点下通过VMThread完成的,初始化时会把由SystemDictionary维护的所有已加载类的
                   *  Klass的prototype_header修改成匿名偏向锁对象头,并把_biased_locking_enabled静态属性置为true,
                   *  后续加载新的Klass时发现该属性为true,会将Klass的prototype_header修改成匿名偏向锁对象头。
                   *  当创建某个Klass的oop时,会利用Klass的prototype_header来初始化该oop的对象头,即偏向锁初始化完成后,
                   *  后续所有创建的oop的初始对象头都是匿名偏向锁的,在此之前创建的oop初始对象头都是无锁状态的。
                   */
                else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
                  // try revoke bias
    
                  markOop header = lockee->klass()->prototype_header();
                  if (hash != markOopDesc::no_hash) {
                    header = header->copy_set_hash(hash);
                  }
                  //_biased_locking_enabled静态属性为false,默认是4000ms,尚未开启偏向锁,所以撤销偏向锁
                  if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) {
                    if (PrintBiasedLockingStatistics)
                      (*BiasedLocking::revoked_lock_entry_count_addr())++;
                  }
                }
                 // code 7:如果epoch不等于class中的epoch,则尝试重偏向
                else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
                  // try rebias
                  markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident);
                  if (hash != markOopDesc::no_hash) {
                    new_header = new_header->copy_set_hash(hash);
                  }
                  if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark) == mark) {
                    if (PrintBiasedLockingStatistics)
                      (* BiasedLocking::rebiased_lock_entry_count_addr())++;
                  }
                  else {
                    // 重偏向失败,代表存在多线程竞争,则调用monitorenter方法进行锁升级
                      CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
                  }
                  success = true;
                }
                else {
                    // 走到这里说明当前要么偏向别的线程,要么是匿名偏向(即没有偏向任何线程)
                    // code 8:下面构建一个匿名偏向的mark word,尝试用CAS指令替换掉锁对象的mark word
                    // try to bias towards thread in case object is anonymously biased
                  markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |
                                                                  (uintptr_t)markOopDesc::age_mask_in_place |
                                                                  epoch_mask_in_place));
                  if (hash != markOopDesc::no_hash) {
                    header = header->copy_set_hash(hash);
                  }
                  markOop new_header = (markOop) ((uintptr_t) header | thread_ident);
                  // debugging hint
                  DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)
                  if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
                    // CAS修改成功
                      if (PrintBiasedLockingStatistics)
                      (* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
                  }
                  else {
                    // 如果修改失败说明存在多线程竞争,所以进入monitorenter方法
                    CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
                  }
                  success = true;
                }
              }
              //if (mark->has_bias_pattern()) {//has_bias_pattern las 3bit is 101 ? } end of code
              //如果没符合的mark->has_bias_pattern就到这里了,
              // traditional lightweight locking
              // 如果偏向线程不是当前线程或没有开启偏向模式等原因都会导致success==false??存疑
    
              if (!success) {
                  // 轻量级锁的逻辑
                    //code 9: 构造一个无锁状态的Displaced Mark Word,并将Lock Record的lock指向它
                markOop displaced = lockee->mark()->set_unlocked();
                entry->lock()->set_displaced_header(displaced);
                //如果指定了-XX:+UseHeavyMonitors,则call_vm=true,代表禁用偏向锁和轻量级锁
                bool call_vm = UseHeavyMonitors;
                 // 利用CAS将对象头的mark word替换为指向Lock Record的指针,(如果是无锁状态,直接升级轻量级锁)
                if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
                  // Is it simple recursive case?
                  // 判断是不是锁重入,进入这里说明obj是不是无锁状态,
                  if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
                    //code 10: 如果是锁重入,则直接将Displaced Mark Word设置为null
                      entry->lock()->set_displaced_header(NULL);
                  } else {
                    CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
                  }
                }
              }
              UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
            } else {
               // lock record不够,重新执行
              istate->set_msg(more_monitors);
              UPDATE_PC_AND_RETURN(0); // Re-execute
            }
          }
          /**
           * JVM中的每个类也有一个类似mark word的prototype_header,用来标记该class的epoch和偏向开关等信息。
           * 上面的代码中lockee->klass()->prototype_header()即获取class的prototype_header。
    
    code 1,从当前线程的栈中找到一个空闲的Lock Record(即代码中的BasicObjectLock,下文都用Lock Record代指),
           判断Lock Record是否空闲的依据是其obj字段 是否为null。
           注意这里是按内存地址从低往高找到最后一个可用的Lock Record,换而言之,就是找到内存地址最高的可用Lock Record。
    
    code 2,获取到Lock Record后,首先要做的就是为其obj字段赋值。
    
    code 3,判断锁对象的mark word是否是偏向模式,即低3位是否为101。
    
    code 4,这里有几步位运算的操作 anticipated_bias_locking_value =
           (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) & ​
           ~((uintptr_t) markOopDesc::age_mask_in_place); 这个位运算可以分为3个部分。
    
    第一部分((uintptr_t)lockee->klass()->prototype_header() | thread_ident)
           将当前线程id和类的prototype_header相或,这样得到的值为
           (当前线程id + prototype_header中的(epoch + 分代年龄 + 偏向锁标志 + 锁标志位))
           ,注意prototype_header的分代年龄那4个字节为0
    
    第二部分 ^ (uintptr_t)mark 将上面计算得到的结果与锁对象的markOop进行异或,
           相等的位全部被置为0,只剩下不相等的位。
    
    第三部分 & ~((uintptr_t) markOopDesc::age_mask_in_place) markOopDesc::age_mask_in_place为...0001111000,
           取反后,变成了...1110000111,除了分代年龄那4位,其他位全为1;
           将取反后的结果再与上面的结果相与,将上面异或得到的结果中分代年龄给忽略掉。
    
    code 5,anticipated_bias_locking_value==0代表偏向的线程是当前线程且mark word的epoch等于class的epoch,这种情况下什么都不用做。
    
    code 6,(anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0
           代表class的prototype_header或对象的mark word中偏向模式是关闭的,
           又因为能走到这已经通过了mark->has_bias_pattern()判断,
           即对象的mark word中偏向模式是开启的,那也就是说class的prototype_header不是偏向模式。
    
    然后利用CAS指令Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark撤销偏向锁,
           我们知道CAS会有几个参数,1是预期的原值,2是预期修改后的值 ,3是要修改的对象,
           与之对应,cmpxchg_ptr方法第一个参数是预期修改后的值,第2个参数是修改的对象,第3个参数是预期原值,
           方法返回实际原值,如果等于预期原值则说明修改成功。
    
    code 7,如果epoch已过期,则需要重偏向,利用CAS指令将锁对象的mark word替换为一个偏向当前线程且epoch为类的epoch的新的mark word。
    
    code 8,CAS将偏向线程改为当前线程,如果当前是匿名偏向则能修改成功,否则进入锁升级的逻辑。
    
    code 9,这一步已经是轻量级锁的逻辑了。从上图的mark word的格式可以看到,
           轻量级锁中mark word存的是指向Lock Record的指针。这里构造一个无锁状态的mark word,
           然后存储到Lock Record(Lock Record的格式可以看第一篇文章)。设置mark word是无锁状态的原因是:
           轻量级锁解锁时是将对象头的mark word设置为Lock Record中的Displaced Mark Word,
           所以创建时设置为无锁状态,解锁时直接用CAS替换就好了。
    
    code 10, 如果是锁重入,则将Lock Record的Displaced Mark Word设置为null,起到一个锁重入计数的作用。
    
    以上是偏向锁加锁的流程(包括部分轻量级锁的加锁流程),如果当前锁已偏向其他线程||epoch值过期||偏向模式关闭||获取偏向锁的过程中存在并发冲突,
           都会进入到InterpreterRuntime::monitorenter方法, 在该方法中会对偏向锁撤销和升级。
           */

    这个比较复杂,如果你下载的版本是hotspot-87ee5ee27509那么你会看到源码为

         /* monitorenter and monitorexit for locking/unlocking an object */
    
          CASE(_monitorenter): {
            oop lockee = STACK_OBJECT(-1);
            // derefing's lockee ought to provoke implicit null check
            CHECK_NULL(lockee);
            // find a free monitor or one already allocated for this object
            // if we find a matching object then we need a new monitor
            // since this is recursive enter
            BasicObjectLock* limit = istate->monitor_base();
            BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
            BasicObjectLock* entry = NULL;
            while (most_recent != limit ) {
              if (most_recent->obj() == NULL) entry = most_recent;
              else if (most_recent->obj() == lockee) break;
              most_recent++;
            }
            if (entry != NULL) {
              entry->set_obj(lockee);
              markOop displaced = lockee->mark()->set_unlocked();
              entry->lock()->set_displaced_header(displaced);
              if (Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
                // Is it simple recursive case?
                if (THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
                  entry->lock()->set_displaced_header(NULL);
                } else {
                  CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
                }
              }
              UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
            } else {
              istate->set_msg(more_monitors);
              UPDATE_PC_AND_RETURN(0); // Re-execute
            }
          }
    

    这个真是个坑,那么分析一下这个过程

    接着进入fast_enter

    直接进入,

    如果进入inflate方法,

    会有其中情况:

    • // The mark can be in one of the following states:
    • // * Inflated - just return
    • // * Stack-locked - coerce it to inflated
    • // * INFLATING - busy wait for conversion to complete
    • // * Neutral - aggressively inflate the object.
    • // * BIASED - Illegal. We should never see this
    • ===以上是源码注解
    • 如果膨胀过了,锁已经是 ObjectMonitor,那么直接返回
    • 如果是Stack-locked,就是轻量级锁,那么锁膨胀
    • 如果膨胀中
    • 如果无锁Neutral,直接膨胀为重量级锁
    • 如果biased偏向锁,非法,因为上一个方法有判断,如果是偏向锁,直接返回不会到到达这里的

    这里只贴了,锁此刻为重量级锁的时候,

    那么返回的 ObjectMonitor * m 进入了enter方法

    void ATTR ObjectMonitor::enter(TRAPS) {
      // The following code is ordered to check the most common cases first
      // and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors.
      Thread * const Self = THREAD ;
      void * cur ;
    //原子的设置owner属性,如果_owner属性是NULL就将其设置为Self,否则返回当前的_owner属性
      cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
      if (cur == NULL) {
          //设置成功,说明该Monitor没有被人占用
         // Either ASSERT _recursions == 0 or explicitly set _recursions = 0.
         assert (_recursions == 0   , "invariant") ;
         assert (_owner      == Self, "invariant") ;
         // CONSIDER: set or assert OwnerIsThread == 1
         return ;
      }
    
      if (cur == Self) {
          //设置失败,说明该Monitor就是当前线程占用的,此处进入enter是嵌套加锁情形
         // TODO-FIXME: check for integer overflow!  BUGID 6557169.
         _recursions ++ ;
         return ;
      }
    
        //轻量级锁膨胀成重量级锁时,将owner设置为lock属性
      if (Self->is_lock_owned ((address)cur)) {
        assert (_recursions == 0, "internal state error");
        _recursions = 1 ;
        // Commute owner from a thread-specific on-stack BasicLockObject address to
        // a full-fledged "Thread *".
        _owner = Self ;
        OwnerIsThread = 1 ;
        return ;
      }
    
        //该Monitor被其他某个线程占用了,需要抢占
      // We've encountered genuine contention.
      assert (Self->_Stalled == 0, "invariant") ;
    
        //记录需要抢占的Monitor指针
      Self->_Stalled = intptr_t(this) ;
      //Knob_SpinEarly默认为1,即为true
        //TrySpin让当前线程自旋,自旋的次数默认可以自适应调整,如果进入安全点同步则退出自旋,返回1表示抢占成功
      // Try one round of spinning *before* enqueueing Self
      // and before going through the awkward and expensive state
      // transitions.  The following spin is strictly optional ...
      // Note that if we acquire the monitor from an initial spin
      // we forgo posting JVMTI events and firing DTRACE probes.
      if (Knob_SpinEarly && TrySpin (Self) > 0) {
         assert (_owner == Self      , "invariant") ;
         assert (_recursions == 0    , "invariant") ;
         assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
         Self->_Stalled = 0 ;
         return ;
      }
    //自旋若干次数后依然抢占失败
      assert (_owner != Self          , "invariant") ;
      assert (_succ  != Self          , "invariant") ;
      assert (Self->is_Java_thread()  , "invariant") ;
      JavaThread * jt = (JavaThread *) Self ;
      assert (!SafepointSynchronize::is_at_safepoint(), "invariant") ;
      assert (jt->thread_state() != _thread_blocked   , "invariant") ;
      assert (this->object() != NULL  , "invariant") ;
      assert (_count >= 0, "invariant") ;
    
      //原子的将_count属性加1,表示增加了一个抢占该锁的线程
      // Prevent deflation at STW-time.  See deflate_idle_monitors() and is_busy().
      // Ensure the object-monitor relationship remains stable while there's contention.
      Atomic::inc_ptr(&_count);
    
      EventJavaMonitorEnter event;
    
      { // Change java thread status to indicate blocked on monitor enter.
          //修改Java线程状态为BLOCKED_ON_MONITOR_ENTER,此代码块退出后还原成原来的
        JavaThreadBlockedOnMonitorEnterState jtbmes(jt, this);
    
        DTRACE_MONITOR_PROBE(contended__enter, this, object(), jt);
        if (JvmtiExport::should_post_monitor_contended_enter()) {
          JvmtiExport::post_monitor_contended_enter(jt, this);
        }
    
        OSThreadContendState osts(Self->osthread());
        ThreadBlockInVM tbivm(jt);
    
        Self->set_current_pending_monitor(this);
    
        // TODO-FIXME: change the following for(;;) loop to straight-line code.
        for (;;) {
          jt->set_suspend_equivalent();
          // cleared by handle_special_suspend_equivalent_condition()
          // or java_suspend_self()
    //会通过自旋,park等方式不断循环尝试获取锁,直到成功获取锁为止
          EnterI (THREAD) ;
    
          if (!ExitSuspendEquivalent(jt)) break ;
    
          //
          // We have acquired the contended monitor, but while we were
          // waiting another thread suspended us. We don't want to enter
          // the monitor while suspended because that would surprise the
          // thread that suspended us.
          //
              _recursions = 0 ;
          _succ = NULL ;
          exit (false, Self) ;
    
          jt->java_suspend_self();
        }
        Self->set_current_pending_monitor(NULL);
      }
    
      Atomic::dec_ptr(&_count);
      assert (_count >= 0, "invariant") ;
      Self->_Stalled = 0 ;
    
      // Must either set _recursions = 0 or ASSERT _recursions == 0.
      assert (_recursions == 0     , "invariant") ;
      assert (_owner == Self       , "invariant") ;
      assert (_succ  != Self       , "invariant") ;
      assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
    
      // The thread -- now the owner -- is back in vm mode.
      // Report the glorious news via TI,DTrace and jvmstat.
      // The probe effect is non-trivial.  All the reportage occurs
      // while we hold the monitor, increasing the length of the critical
      // section.  Amdahl's parallel speedup law comes vividly into play.
      //
      // Another option might be to aggregate the events (thread local or
      // per-monitor aggregation) and defer reporting until a more opportune
      // time -- such as next time some thread encounters contention but has
      // yet to acquire the lock.  While spinning that thread could
      // spinning we could increment JVMStat counters, etc.
    
      DTRACE_MONITOR_PROBE(contended__entered, this, object(), jt);
      if (JvmtiExport::should_post_monitor_contended_entered()) {
        JvmtiExport::post_monitor_contended_entered(jt, this);
      }
    
      if (event.should_commit()) {
        event.set_klass(((oop)this->object())->klass());
        event.set_previousOwner((TYPE_JAVALANGTHREAD)_previous_owner_tid);
        event.set_address((TYPE_ADDRESS)(uintptr_t)(this->object_addr()));
        event.commit();
      }
    
      if (ObjectMonitor::_sync_ContendedLockAttempts != NULL) {
         ObjectMonitor::_sync_ContendedLockAttempts->inc() ;
      }
    }
    
    
    // Caveat: TryLock() is not necessarily serializing if it returns failure.
    // Callers must compensate as needed.
    
    int ObjectMonitor::TryLock (Thread * Self) {
       for (;;) {
          void * own = _owner ;
          if (own != NULL) return 0 ;
          if (Atomic::cmpxchg_ptr (Self, &_owner, NULL) == NULL) {
             // Either guarantee _recursions == 0 or set _recursions = 0.
             assert (_recursions == 0, "invariant") ;
             assert (_owner == Self, "invariant") ;
             // CONSIDER: set or assert that OwnerIsThread == 1
             return 1 ;
          }
          // The lock had been free momentarily, but we lost the race to the lock.
          // Interference -- the CAS failed.
          // We can either return -1 or retry.
          // Retry doesn't make as much sense because the lock was just acquired.
          if (true) return -1 ;
       }
    }
    
    

    标记了比较重要的方法,主要流程就是

    • cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ; cas,期望无人使用锁,(即 MonitorObject._owner为0x0),
    • 比如这个时候线程A 会在wati方法在释放锁之前这个_owner的值为ThreadA线程的一个包装对象,那么此时需要等待锁
    • 在ThreadA释放锁之后,这个时候如果进入执行这个方法那么,直接回返回,代表已经获取了锁
    • 锁进行等待的代码主要是EnterI ,主要实现如下
    •     for (;;) {
      
              if (TryLock (Self) > 0) break ;
              assert (_owner != Self, "invariant") ;
      
              if ((SyncFlags & 2) && _Responsible == NULL) {
                 Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
              }
      
              // park self
              if (_Responsible == Self || (SyncFlags & 1)) {
                  TEVENT (Inflated enter - park TIMED) ;
                  Self->_ParkEvent->park ((jlong) RecheckInterval) ;
                  // Increase the RecheckInterval, but clamp the value.
                  RecheckInterval *= 8 ;
                  if (RecheckInterval > 1000) RecheckInterval = 1000 ;
              } else {
                  TEVENT (Inflated enter - park UNTIMED) ;
                  Self->_ParkEvent->park() ;
              }
      
              if (TryLock(Self) > 0) break ;
      
              // The lock is still contested.
              // Keep a tally of the # of futile wakeups.
              // Note that the counter is not protected by a lock or updated by atomics.
              // That is by design - we trade "lossy" counters which are exposed to
              // races during updates for a lower probe effect.
              TEVENT (Inflated enter - Futile wakeup) ;
              if (ObjectMonitor::_sync_FutileWakeups != NULL) {
                 ObjectMonitor::_sync_FutileWakeups->inc() ;
              }
              ++ nWakeups ;
      
              // Assuming this is not a spurious wakeup we'll normally find _succ == Self.
              // We can defer clearing _succ until after the spin completes
              // TrySpin() must tolerate being called with _succ == Self.
              // Try yet another round of adaptive spinning.
              if ((Knob_SpinAfterFutile & 1) && TrySpin (Self) > 0) break ;
      
              // We can find that we were unpark()ed and redesignated _succ while
              // we were spinning.  That's harmless.  If we iterate and call park(),
              // park() will consume the event and return immediately and we'll
              // just spin again.  This pattern can repeat, leaving _succ to simply
              // spin on a CPU.  Enable Knob_ResetEvent to clear pending unparks().
              // Alternately, we can sample fired() here, and if set, forgo spinning
              // in the next iteration.
      
              if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) {
                 Self->_ParkEvent->reset() ;
                 OrderAccess::fence() ;
              }
              if (_succ == Self) _succ = NULL ;
      
              // Invariant: after clearing _succ a thread *must* retry _owner before parking.
              OrderAccess::fence() ;
          }

      上边的方法里面可以主要看出

      • TryLock(Self),试着cas一把,如果没人占用锁就,占用锁
      • Self->_ParkEvent->park ((jlong) RecheckInterval) ; 线程挂起一段时间
      • 醒了之后,接着TryLock(Self),若不成功则一直循环,

    接着看线程A在wait()的时候做了什么

    此时线程B在一直循环等待获取锁,线程A开始执行wait方法,2者是并行执行的

    可以看到其实Thread9 就是线程A在jvm中对应的线程

    那么看到在threadA执行完wait方法之后,先释放了锁.在把自己所对应的linux线程挂起

    那么还查threadB.notify分析

    流程:

    lock.notify()   //调用lock对象的方法

    -->                 //这个obj就是这个lock,将lock作为参数

      ObjectSynchronizer::notify(obj, CHECK); 

    -->                 //找到obj对象上的重量锁ObjectMonitor 

    ObjectSynchronizer::inflate(THREAD, obj())->notify(THREAD);
    -->                 // 调用锁上的notify方法

    void ObjectMonitor::notify(TRAPS) 

    那么现在进入关键步骤

    进入的去DequeueWaiter()

    接着

     打印

    (gdb) p _WaitSet
    $1 = (ObjectWaiter * volatile) 0x7f88bbbfa110

    (gdb) p *node 
    $6 = (ObjectWaiter) {
      <StackObj> = {
        <AllocatedObj> = {
          _vptr.AllocatedObj = 0x7f88d66797f0 <vtable for ObjectWaiter+16>
        }, <No data fields>}, 
      members of ObjectWaiter: 
      _next = 0x7f88bbbfa110, 
      _prev = 0x7f88bbbfa110, 
      _thread = 0x7f88d0160800, 
      _notifier_tid = 140225242177872, 
      _event = 0x7f88d0161b00, 
      _notified = 0, 
      TState = ObjectWaiter::TS_WAIT, 
      _Sorted = (unknown: 3019902016), 
      _active = false
    }

    看这个链表节点的next和priev都是自己

      

    ObjectWaiter* next = node->_next;   找到下一个节点node3
       if (next == node) {  //如果next指向自己说明,只有一个node节点,那么久就将_WaitSet置空
      assert(node->_prev == node, "invariant check");
      _WaitSet = NULL;
     } else {    //不进入这里,这里是中间节点的情况
      ObjectWaiter* prev = node->_prev;  找到上一个节点 ,命名node1

      next->_prev = prev;    令下一个节点的上一个指向 node1
      prev->_next = next;     令node1的_next指针指向node3
      if (_WaitSet == node) {    
      _WaitSet = next;
        }
      }
      node->_next = NULL; 接着道这里,将node节点的next ,priev置空
      node->_prev = NULL;

    这就

    那么这样就取出来了objectWaiter 对象

    接下来对ObjectWaiter对象的处理方式,根据Policy的不同而不同:
    Policy == 0:放入_EntryList队列的排头位置;
    Policy == 1:放入_EntryList队列的末尾位置;
    Policy == 2:_EntryList队列为空就放入_EntryList,否则放入_cxq队列的排头位置;

    那么接着进入ThreadB的monitorexit指令

    小结一下,线程B释放了锁之后,执行的操作如下:

    偏向锁逻辑,此处未命中;
    根据QMode的不同,将ObjectWaiter从_cxq或者_EntryList中取出后唤醒;
    唤醒的元素会继续执行挂起前的代码,按照我们之前的分析,线程唤醒后,就会通过CAS去竞争锁,此时由于线程B已经释放了锁,那么此时应该能竞争成功;

    唤醒目标线程,就是A之前wait方法里面挂起了,那么接着看wait方法的实现

    还有

    //ReenterI和EnterI的逻辑基本相同,用于获取对象锁
    void ATTR ObjectMonitor::ReenterI (Thread * Self, ObjectWaiter * SelfNode) {
        assert (Self != NULL                , "invariant") ;
        assert (SelfNode != NULL            , "invariant") ;
        assert (SelfNode->_thread == Self   , "invariant") ;
        assert (_waiters > 0                , "invariant") ;
        //校验目标对象的对象头就是当前ObjectMonitor的指针
        assert (((oop)(object()))->mark() == markOopDesc::encode(this) , "invariant") ;
        assert (((JavaThread *)Self)->thread_state() != _thread_blocked, "invariant") ;
        JavaThread * jt = (JavaThread *) Self ;
     
        int nWakeups = 0 ;
        for (;;) {
            ObjectWaiter::TStates v = SelfNode->TState ;
            //校验状态
            guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
            assert    (_owner != Self, "invariant") ;
            
            //尝试获取锁
            if (TryLock (Self) > 0) break ;
            //尝试自旋获取锁
            if (TrySpin (Self) > 0) break ;
     
            TEVENT (Wait Reentry - parking) ;
     
            {   
               //修改线程状态
               OSThreadContendState osts(Self->osthread());
               ThreadBlockInVM tbivm(jt);
     
               jt->set_suspend_equivalent();
               //SyncFlags默认是0
               if (SyncFlags & 1) {
                  Self->_ParkEvent->park ((jlong)1000) ;
               } else {
                  Self->_ParkEvent->park () ;
               }
     
               // were we externally suspended while we were waiting?
               for (;;) {
                  //ExitSuspendEquivalent默认返回false
                  if (!ExitSuspendEquivalent (jt)) break ;
                  if (_succ == Self) { _succ = NULL; OrderAccess::fence(); }
                  jt->java_suspend_self();
                  jt->set_suspend_equivalent();
               }
            }
     
            //尝试获取锁
            if (TryLock(Self) > 0) break ;
     
            TEVENT (Wait Reentry - futile wakeup) ;
            ++ nWakeups ;
     
            // Assuming this is not a spurious wakeup we'll normally
            // find that _succ == Self.
            if (_succ == Self) _succ = NULL ;
     
            // Invariant: after clearing _succ a contending thread
            // *must* retry  _owner before parking.
            OrderAccess::fence() ;
     
            if (ObjectMonitor::_sync_FutileWakeups != NULL) {
              ObjectMonitor::_sync_FutileWakeups->inc() ;
            }
        }//for循环结束
     
        //for循环结束,已经获取了锁
        assert (_owner == Self, "invariant") ;
        assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
        //从链表中移除
        UnlinkAfterAcquire (Self, SelfNode) ;
        if (_succ == Self) _succ = NULL ;
        assert (_succ != Self, "invariant") ;
        //修改状态为TS_RUN
        SelfNode->TState = ObjectWaiter::TS_RUN ;
        OrderAccess::fence() ;      // see comments at the end of EnterI()
    }

    本文结束

  • 相关阅读:
    2019-07-26
    2019-07-25
    免费的论文查重网站
    图片转链接
    hexo GitHub创建博客是遇到的问题
    苹果手机安装小火箭
    旅游--南京
    查看电脑基本信息以及电脑性能分析
    Android 不能root
    x-mind ZEN 指定安装路径(其他默认安装c盘可参考)
  • 原文地址:https://www.cnblogs.com/zytcomeon/p/14866326.html
Copyright © 2011-2022 走看看