write and debug by 张艳涛
wait()和notify()的通常用法
- A线程取得锁,执行wait(),释放锁;
- B线程取得锁,完成业务后执行notify(),再释放锁;
- B线程释放锁之后,A线程取得锁,继续执行wait()之后的代码;
关于synchronize修饰的代码块
通常,对于synchronize(lock){…}这样的代码块,编译后会生成monitorenter和monitorexit指令,线程执行到monitorenter指令时会尝试取得lock对应的monitor的所有权(CAS设置对象头),取得后即获取到锁,执行monitorexit指令时会释放monitor的所有权即释放锁;
一个完整的demo
为了深入学习wait()和notify(),先用完整的demo程序来模拟场景吧,以下是源码:
package com.zyt.wait_notify; public class NotifyDemo { private static void sleep(long sleepVal){ try{ Thread.sleep(sleepVal); }catch(Exception e){ e.printStackTrace(); } } private static void log(String desc){ System.out.println(Thread.currentThread().getName() + " : " + desc); } Object lock = new Object(); public void startThreadA(){ new Thread(() -> { synchronized (lock){ log("get lock"); startThreadB(); log("start wait"); try { }catch(InterruptedException e){ e.printStackTrace(); } log("get lock after wait"); log("release lock"); } }, "thread-A").start(); } public void startThreadB(){ new Thread(()->{ synchronized (lock){ log("get lock"); startThreadC(); sleep(100); log("start notify"); lock.notify(); log("release lock"); } },"thread-B").start(); } public void startThreadC(){ new Thread(() -> { synchronized (lock){ log("get lock"); log("release lock"); } }, "thread-C").start(); } public static void main(String[] args){ new NotifyDemo().startThreadA(); } }
以上就是本次实战用到的demo,代码功能简述如下:
启动线程A,取得锁之后先启动线程B再执行wait()方法,释放锁并等待;
线程B启动之后会等待锁,A线程执行wait()之后,线程B取得锁,然后启动线程C,再执行notify唤醒线程A,最后退出synchronize代码块,释放锁;
线程C启动之后就一直在等待锁,这时候线程B还没有退出synchronize代码块,锁还在线程B手里;
线程A在线程B执行notify()之后就一直在等待锁,这时候线程B还没有退出synchronize代码块,锁还在线程B手里;
线程B退出synchronize代码块,释放锁之后,线程A和线程C竞争锁;
把上面的代码在Openjdk8下面执行,反复执行多次,都得到以下结果:
thread-A : get lock
thread-A : start wait
thread-B : get lock
thread-C : c thread is start
thread-B : start notify
thread-B : release lock
thread-A : after wait, acquire lock again
thread-A : release lock
thread-C : get lock
thread-C : release lock
针对以上结果,问题来了:
第一个问题:
将以上代码反复执行多次,结果都是B释放锁之后A会先得到锁,这又是为什么呢?C为何不能先拿到锁呢?
第二个问题:
线程C自开始就执行了monitorenter指令,它能得到锁是容易理解的,但是线程A呢?在wait()之后并没有没有monitorenter指令,那么它又是如何取得锁的呢?
wait()、notify()这些方法都是native方法,所以只有从JVM源码寻找答案了,本次阅读的是openjdk8的源码;
带上问题去看JVM源码
按照demo代码执行顺序,我整理了如下问题,带着这些问题去看JVM源码可以聚焦主线,不要被一些支线的次要的代码卡住(例如一些异常处理,监控和上报等):
线程A在wait()的时候做了什么?
线程C启动后,由于此时线程B持有锁,那么线程C此时在干啥?
线程B在notify()的时候做了什么?
线程B释放锁的时候做了什么?
好了,接下来看源码分析问题吧:
线程A在wait()的时候做了什么
java代码中的
lock.wait();
这个lock是一个object对象
如果你看源码的时候能看到这个是一个native方法,jvm对于native方法的处理有俩种方法,
- 一种是自定义,使用javah 生成对应的.hpp的头,在用c++写对于的.cpp方法实现,其中会调用jin.h的env方法,在使用gcc编译成.so文件,
ObjTest.java package jni; class A {} public class ObjTest extends A{ static { System.loadLibrary("ObjTest"); } public ObjTest(){ System.out.println("default"); } public ObjTest(int age){ System.out.println("param Construtor,age->"+age); } public native static void test(Object a); public static void main(String[] args){ test(new ObjTest()); } }
2.另外的一种就是系统自带的native,
如果你要看代码的话
#include <stdio.h> #include <signal.h> #include <limits.h> #include "jni.h" #include "jni_util.h" #include "jvm.h" #include "java_lang_Object.h" static JNINativeMethod methods[] = { {"hashCode", "()I", (void *)&JVM_IHashCode}, {"wait", "(J)V", (void *)&JVM_MonitorWait}, {"notify", "()V", (void *)&JVM_MonitorNotify}, {"notifyAll", "()V", (void *)&JVM_MonitorNotifyAll}, {"clone", "()Ljava/lang/Object;", (void *)&JVM_Clone}, }; JNIEXPORT void JNICALL Java_java_lang_Object_registerNatives(JNIEnv *env, jclass cls) { (*env)->RegisterNatives(env, cls, methods, sizeof(methods)/sizeof(methods[0])); } JNIEXPORT jclass JNICALL Java_java_lang_Object_getClass(JNIEnv *env, jobject this) { if (this == NULL) { JNU_ThrowNullPointerException(env, NULL); return 0; } else { return (*env)->GetObjectClass(env, this); } }
会发现它使用了 RegisterNatives这种方法,wait对于的方法,为 (void *)&JVM_MonitorWait
在share/vm/prims/jvm.cpp文件中定义的
JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms)) JVMWrapper("JVM_MonitorWait"); Handle obj(THREAD, JNIHandles::resolve_non_null(handle)); JavaThreadInObjectWaitState jtiows(thread, ms != 0); if (JvmtiExport::should_post_monitor_wait()) { JvmtiExport::post_monitor_wait((JavaThread *)THREAD, (oop)obj(), ms); } ObjectSynchronizer::wait(obj, ms, CHECK); JVM_END
重要的是,他要进入ObjectSynchronizer.wait方法
void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) { //UseBiasedLocking默认为true if (UseBiasedLocking) { //撤销对象头中包含的偏向锁 BiasedLocking::revoke_and_rebias(obj, false, THREAD); assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now"); } if (millis < 0) { TEVENT (wait - throw IAX) ; THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative"); } //分配一个关联的ObjectMonitor实例 ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj()); DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis); //调用其wait方法 monitor->wait(millis, true, THREAD); /* This dummy call is in place to get around dtrace bug 6254741. Once that's fixed we can uncomment the following line and remove the call */ // DTRACE_MONITOR_PROBE(waited, monitor, obj(), THREAD); dtrace_waited_probe(monitor, obj, THREAD); }
这里就能看到了
//调用其wait方法
monitor->wait(millis, true, THREAD);
线程B在这个时候做了什么
在线程A调用wait方法的同时,线程B也已经启动了,就是jvm源码调试中的Thread-10
能看到这个时候线程B,已经进入了synchronized代码块,其中对应的指令就是monitorenter
对于monitorenter指令进行简单分析
先打印一下bt信息
#0 ObjectMonitor::enter (this=0x7fb1280036e0, __the_thread__=0x7fb11c001800) at /home/atzhang/atzhang/openjdksource/openjdk8/openjdk/hotspot/src/share/vm/runtime/objectMonitor.cpp:323 #1 0x00007fb14a8de3e6 in ObjectSynchronizer::slow_enter (obj=..., lock=0x7fb1345fc6a8, __the_thread__=0x7fb11c001800) at /home/atzhang/atzhang/openjdksource/openjdk8/openjdk/hotspot/src/share/vm/runtime/synchronizer.cpp:258 #2 0x00007fb14a8ddf70 in ObjectSynchronizer::fast_enter (obj=..., lock=0x7fb1345fc6a8, attempt_rebias=true, __the_thread__=0x7fb11c001800) at /home/atzhang/atzhang/openjdksource/openjdk8/openjdk/hotspot/src/share/vm/runtime/synchronizer.cpp:180 #3 0x00007fb14a538e2f in InterpreterRuntime::monitorenter (thread=0x7fb11c001800, elem=0x7fb1345fc6a8) at /home/atzhang/atzhang/openjdksource/openjdk8/openjdk/hotspot/src/share/vm/interpreter/interpreterRuntime.cpp:573
如果看网上的一般文章那么分析的起点就是InterpreterRuntime::monitorenter ,其实是不对的,也就是99.8%的文章都是差了那么一乃乃,
在上下求索的过程中,发现连mashibing的公开课讲的课,黄俊将的synchronized底层实现都是错的,他错在了哪里了? 他用了openjdk1.8早期的
源码版本来讲偏向锁,其实这个源码还没有实现偏向锁,我也遇到了这个问题,发现怎么看也看不懂,那么真正的入口是
bytecodeInterpreter.cpp
/* monitorenter and monitorexit for locking/unlocking an object */ CASE(_monitorenter): { oop lockee = STACK_OBJECT(-1); // derefing's lockee ought to provoke implicit null check CHECK_NULL(lockee); // find a free monitor or one already allocated for this object // if we find a matching object then we need a new monitor // since this is recursive enter BasicObjectLock* limit = istate->monitor_base(); BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base(); BasicObjectLock* entry = NULL; while (most_recent != limit ) { if (most_recent->obj() == NULL) entry = most_recent; else if (most_recent->obj() == lockee) break; most_recent++; } if (entry != NULL) { entry->set_obj(lockee); int success = false; uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place; markOop mark = lockee->mark(); intptr_t hash = (intptr_t) markOopDesc::no_hash; // implies UseBiasedLocking // code 3:如果锁对象的mark word的状态是偏向模式 if (mark->has_bias_pattern()) {//has_bias_pattern las 3bit is 101 ? uintptr_t thread_ident; uintptr_t anticipated_bias_locking_value; thread_ident = (uintptr_t)istate->thread(); anticipated_bias_locking_value = (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) & ~((uintptr_t) markOopDesc::age_mask_in_place); // code 5:如果偏向的线程是自己且epoch等于class的epoch if (anticipated_bias_locking_value == 0) { // already biased towards this thread, nothing to do if (PrintBiasedLockingStatistics) { (* BiasedLocking::biased_lock_entry_count_addr())++; } success = true; } /** * 初始化:偏向锁默认是延时初始化的,延迟的时间通过参数BiasedLockingStartupDelay控制,默认是4000ms默认是4000ms。 * 初始化是在安全点下通过VMThread完成的,初始化时会把由SystemDictionary维护的所有已加载类的 * Klass的prototype_header修改成匿名偏向锁对象头,并把_biased_locking_enabled静态属性置为true, * 后续加载新的Klass时发现该属性为true,会将Klass的prototype_header修改成匿名偏向锁对象头。 * 当创建某个Klass的oop时,会利用Klass的prototype_header来初始化该oop的对象头,即偏向锁初始化完成后, * 后续所有创建的oop的初始对象头都是匿名偏向锁的,在此之前创建的oop初始对象头都是无锁状态的。 */ else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) { // try revoke bias markOop header = lockee->klass()->prototype_header(); if (hash != markOopDesc::no_hash) { header = header->copy_set_hash(hash); } //_biased_locking_enabled静态属性为false,默认是4000ms,尚未开启偏向锁,所以撤销偏向锁 if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) { if (PrintBiasedLockingStatistics) (*BiasedLocking::revoked_lock_entry_count_addr())++; } } // code 7:如果epoch不等于class中的epoch,则尝试重偏向 else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) { // try rebias markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident); if (hash != markOopDesc::no_hash) { new_header = new_header->copy_set_hash(hash); } if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark) == mark) { if (PrintBiasedLockingStatistics) (* BiasedLocking::rebiased_lock_entry_count_addr())++; } else { // 重偏向失败,代表存在多线程竞争,则调用monitorenter方法进行锁升级 CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception); } success = true; } else { // 走到这里说明当前要么偏向别的线程,要么是匿名偏向(即没有偏向任何线程) // code 8:下面构建一个匿名偏向的mark word,尝试用CAS指令替换掉锁对象的mark word // try to bias towards thread in case object is anonymously biased markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place | (uintptr_t)markOopDesc::age_mask_in_place | epoch_mask_in_place)); if (hash != markOopDesc::no_hash) { header = header->copy_set_hash(hash); } markOop new_header = (markOop) ((uintptr_t) header | thread_ident); // debugging hint DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);) if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) { // CAS修改成功 if (PrintBiasedLockingStatistics) (* BiasedLocking::anonymously_biased_lock_entry_count_addr())++; } else { // 如果修改失败说明存在多线程竞争,所以进入monitorenter方法 CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception); } success = true; } } //if (mark->has_bias_pattern()) {//has_bias_pattern las 3bit is 101 ? } end of code //如果没符合的mark->has_bias_pattern就到这里了, // traditional lightweight locking // 如果偏向线程不是当前线程或没有开启偏向模式等原因都会导致success==false??存疑 if (!success) { // 轻量级锁的逻辑 //code 9: 构造一个无锁状态的Displaced Mark Word,并将Lock Record的lock指向它 markOop displaced = lockee->mark()->set_unlocked(); entry->lock()->set_displaced_header(displaced); //如果指定了-XX:+UseHeavyMonitors,则call_vm=true,代表禁用偏向锁和轻量级锁 bool call_vm = UseHeavyMonitors; // 利用CAS将对象头的mark word替换为指向Lock Record的指针,(如果是无锁状态,直接升级轻量级锁) if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) { // Is it simple recursive case? // 判断是不是锁重入,进入这里说明obj是不是无锁状态, if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) { //code 10: 如果是锁重入,则直接将Displaced Mark Word设置为null entry->lock()->set_displaced_header(NULL); } else { CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception); } } } UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1); } else { // lock record不够,重新执行 istate->set_msg(more_monitors); UPDATE_PC_AND_RETURN(0); // Re-execute } } /** * JVM中的每个类也有一个类似mark word的prototype_header,用来标记该class的epoch和偏向开关等信息。 * 上面的代码中lockee->klass()->prototype_header()即获取class的prototype_header。 code 1,从当前线程的栈中找到一个空闲的Lock Record(即代码中的BasicObjectLock,下文都用Lock Record代指), 判断Lock Record是否空闲的依据是其obj字段 是否为null。 注意这里是按内存地址从低往高找到最后一个可用的Lock Record,换而言之,就是找到内存地址最高的可用Lock Record。 code 2,获取到Lock Record后,首先要做的就是为其obj字段赋值。 code 3,判断锁对象的mark word是否是偏向模式,即低3位是否为101。 code 4,这里有几步位运算的操作 anticipated_bias_locking_value = (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) & ~((uintptr_t) markOopDesc::age_mask_in_place); 这个位运算可以分为3个部分。 第一部分((uintptr_t)lockee->klass()->prototype_header() | thread_ident) 将当前线程id和类的prototype_header相或,这样得到的值为 (当前线程id + prototype_header中的(epoch + 分代年龄 + 偏向锁标志 + 锁标志位)) ,注意prototype_header的分代年龄那4个字节为0 第二部分 ^ (uintptr_t)mark 将上面计算得到的结果与锁对象的markOop进行异或, 相等的位全部被置为0,只剩下不相等的位。 第三部分 & ~((uintptr_t) markOopDesc::age_mask_in_place) markOopDesc::age_mask_in_place为...0001111000, 取反后,变成了...1110000111,除了分代年龄那4位,其他位全为1; 将取反后的结果再与上面的结果相与,将上面异或得到的结果中分代年龄给忽略掉。 code 5,anticipated_bias_locking_value==0代表偏向的线程是当前线程且mark word的epoch等于class的epoch,这种情况下什么都不用做。 code 6,(anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0 代表class的prototype_header或对象的mark word中偏向模式是关闭的, 又因为能走到这已经通过了mark->has_bias_pattern()判断, 即对象的mark word中偏向模式是开启的,那也就是说class的prototype_header不是偏向模式。 然后利用CAS指令Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark撤销偏向锁, 我们知道CAS会有几个参数,1是预期的原值,2是预期修改后的值 ,3是要修改的对象, 与之对应,cmpxchg_ptr方法第一个参数是预期修改后的值,第2个参数是修改的对象,第3个参数是预期原值, 方法返回实际原值,如果等于预期原值则说明修改成功。 code 7,如果epoch已过期,则需要重偏向,利用CAS指令将锁对象的mark word替换为一个偏向当前线程且epoch为类的epoch的新的mark word。 code 8,CAS将偏向线程改为当前线程,如果当前是匿名偏向则能修改成功,否则进入锁升级的逻辑。 code 9,这一步已经是轻量级锁的逻辑了。从上图的mark word的格式可以看到, 轻量级锁中mark word存的是指向Lock Record的指针。这里构造一个无锁状态的mark word, 然后存储到Lock Record(Lock Record的格式可以看第一篇文章)。设置mark word是无锁状态的原因是: 轻量级锁解锁时是将对象头的mark word设置为Lock Record中的Displaced Mark Word, 所以创建时设置为无锁状态,解锁时直接用CAS替换就好了。 code 10, 如果是锁重入,则将Lock Record的Displaced Mark Word设置为null,起到一个锁重入计数的作用。 以上是偏向锁加锁的流程(包括部分轻量级锁的加锁流程),如果当前锁已偏向其他线程||epoch值过期||偏向模式关闭||获取偏向锁的过程中存在并发冲突, 都会进入到InterpreterRuntime::monitorenter方法, 在该方法中会对偏向锁撤销和升级。 */
这个比较复杂,如果你下载的版本是hotspot-87ee5ee27509那么你会看到源码为
/* monitorenter and monitorexit for locking/unlocking an object */ CASE(_monitorenter): { oop lockee = STACK_OBJECT(-1); // derefing's lockee ought to provoke implicit null check CHECK_NULL(lockee); // find a free monitor or one already allocated for this object // if we find a matching object then we need a new monitor // since this is recursive enter BasicObjectLock* limit = istate->monitor_base(); BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base(); BasicObjectLock* entry = NULL; while (most_recent != limit ) { if (most_recent->obj() == NULL) entry = most_recent; else if (most_recent->obj() == lockee) break; most_recent++; } if (entry != NULL) { entry->set_obj(lockee); markOop displaced = lockee->mark()->set_unlocked(); entry->lock()->set_displaced_header(displaced); if (Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) { // Is it simple recursive case? if (THREAD->is_lock_owned((address) displaced->clear_lock_bits())) { entry->lock()->set_displaced_header(NULL); } else { CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception); } } UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1); } else { istate->set_msg(more_monitors); UPDATE_PC_AND_RETURN(0); // Re-execute } }
这个真是个坑,那么分析一下这个过程
接着进入fast_enter
直接进入,
如果进入inflate方法,
会有其中情况:
- // The mark can be in one of the following states:
- // * Inflated - just return
- // * Stack-locked - coerce it to inflated
- // * INFLATING - busy wait for conversion to complete
- // * Neutral - aggressively inflate the object.
- // * BIASED - Illegal. We should never see this
- ===以上是源码注解
- 如果膨胀过了,锁已经是 ObjectMonitor,那么直接返回
- 如果是Stack-locked,就是轻量级锁,那么锁膨胀
- 如果膨胀中
- 如果无锁Neutral,直接膨胀为重量级锁
- 如果biased偏向锁,非法,因为上一个方法有判断,如果是偏向锁,直接返回不会到到达这里的
这里只贴了,锁此刻为重量级锁的时候,
那么返回的 ObjectMonitor * m 进入了enter方法
void ATTR ObjectMonitor::enter(TRAPS) { // The following code is ordered to check the most common cases first // and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors. Thread * const Self = THREAD ; void * cur ; //原子的设置owner属性,如果_owner属性是NULL就将其设置为Self,否则返回当前的_owner属性 cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ; if (cur == NULL) { //设置成功,说明该Monitor没有被人占用 // Either ASSERT _recursions == 0 or explicitly set _recursions = 0. assert (_recursions == 0 , "invariant") ; assert (_owner == Self, "invariant") ; // CONSIDER: set or assert OwnerIsThread == 1 return ; } if (cur == Self) { //设置失败,说明该Monitor就是当前线程占用的,此处进入enter是嵌套加锁情形 // TODO-FIXME: check for integer overflow! BUGID 6557169. _recursions ++ ; return ; } //轻量级锁膨胀成重量级锁时,将owner设置为lock属性 if (Self->is_lock_owned ((address)cur)) { assert (_recursions == 0, "internal state error"); _recursions = 1 ; // Commute owner from a thread-specific on-stack BasicLockObject address to // a full-fledged "Thread *". _owner = Self ; OwnerIsThread = 1 ; return ; } //该Monitor被其他某个线程占用了,需要抢占 // We've encountered genuine contention. assert (Self->_Stalled == 0, "invariant") ; //记录需要抢占的Monitor指针 Self->_Stalled = intptr_t(this) ; //Knob_SpinEarly默认为1,即为true //TrySpin让当前线程自旋,自旋的次数默认可以自适应调整,如果进入安全点同步则退出自旋,返回1表示抢占成功 // Try one round of spinning *before* enqueueing Self // and before going through the awkward and expensive state // transitions. The following spin is strictly optional ... // Note that if we acquire the monitor from an initial spin // we forgo posting JVMTI events and firing DTRACE probes. if (Knob_SpinEarly && TrySpin (Self) > 0) { assert (_owner == Self , "invariant") ; assert (_recursions == 0 , "invariant") ; assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ; Self->_Stalled = 0 ; return ; } //自旋若干次数后依然抢占失败 assert (_owner != Self , "invariant") ; assert (_succ != Self , "invariant") ; assert (Self->is_Java_thread() , "invariant") ; JavaThread * jt = (JavaThread *) Self ; assert (!SafepointSynchronize::is_at_safepoint(), "invariant") ; assert (jt->thread_state() != _thread_blocked , "invariant") ; assert (this->object() != NULL , "invariant") ; assert (_count >= 0, "invariant") ; //原子的将_count属性加1,表示增加了一个抢占该锁的线程 // Prevent deflation at STW-time. See deflate_idle_monitors() and is_busy(). // Ensure the object-monitor relationship remains stable while there's contention. Atomic::inc_ptr(&_count); EventJavaMonitorEnter event; { // Change java thread status to indicate blocked on monitor enter. //修改Java线程状态为BLOCKED_ON_MONITOR_ENTER,此代码块退出后还原成原来的 JavaThreadBlockedOnMonitorEnterState jtbmes(jt, this); DTRACE_MONITOR_PROBE(contended__enter, this, object(), jt); if (JvmtiExport::should_post_monitor_contended_enter()) { JvmtiExport::post_monitor_contended_enter(jt, this); } OSThreadContendState osts(Self->osthread()); ThreadBlockInVM tbivm(jt); Self->set_current_pending_monitor(this); // TODO-FIXME: change the following for(;;) loop to straight-line code. for (;;) { jt->set_suspend_equivalent(); // cleared by handle_special_suspend_equivalent_condition() // or java_suspend_self() //会通过自旋,park等方式不断循环尝试获取锁,直到成功获取锁为止 EnterI (THREAD) ; if (!ExitSuspendEquivalent(jt)) break ; // // We have acquired the contended monitor, but while we were // waiting another thread suspended us. We don't want to enter // the monitor while suspended because that would surprise the // thread that suspended us. // _recursions = 0 ; _succ = NULL ; exit (false, Self) ; jt->java_suspend_self(); } Self->set_current_pending_monitor(NULL); } Atomic::dec_ptr(&_count); assert (_count >= 0, "invariant") ; Self->_Stalled = 0 ; // Must either set _recursions = 0 or ASSERT _recursions == 0. assert (_recursions == 0 , "invariant") ; assert (_owner == Self , "invariant") ; assert (_succ != Self , "invariant") ; assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ; // The thread -- now the owner -- is back in vm mode. // Report the glorious news via TI,DTrace and jvmstat. // The probe effect is non-trivial. All the reportage occurs // while we hold the monitor, increasing the length of the critical // section. Amdahl's parallel speedup law comes vividly into play. // // Another option might be to aggregate the events (thread local or // per-monitor aggregation) and defer reporting until a more opportune // time -- such as next time some thread encounters contention but has // yet to acquire the lock. While spinning that thread could // spinning we could increment JVMStat counters, etc. DTRACE_MONITOR_PROBE(contended__entered, this, object(), jt); if (JvmtiExport::should_post_monitor_contended_entered()) { JvmtiExport::post_monitor_contended_entered(jt, this); } if (event.should_commit()) { event.set_klass(((oop)this->object())->klass()); event.set_previousOwner((TYPE_JAVALANGTHREAD)_previous_owner_tid); event.set_address((TYPE_ADDRESS)(uintptr_t)(this->object_addr())); event.commit(); } if (ObjectMonitor::_sync_ContendedLockAttempts != NULL) { ObjectMonitor::_sync_ContendedLockAttempts->inc() ; } } // Caveat: TryLock() is not necessarily serializing if it returns failure. // Callers must compensate as needed. int ObjectMonitor::TryLock (Thread * Self) { for (;;) { void * own = _owner ; if (own != NULL) return 0 ; if (Atomic::cmpxchg_ptr (Self, &_owner, NULL) == NULL) { // Either guarantee _recursions == 0 or set _recursions = 0. assert (_recursions == 0, "invariant") ; assert (_owner == Self, "invariant") ; // CONSIDER: set or assert that OwnerIsThread == 1 return 1 ; } // The lock had been free momentarily, but we lost the race to the lock. // Interference -- the CAS failed. // We can either return -1 or retry. // Retry doesn't make as much sense because the lock was just acquired. if (true) return -1 ; } }
标记了比较重要的方法,主要流程就是
- cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ; cas,期望无人使用锁,(即 MonitorObject._owner为0x0),
- 比如这个时候线程A 会在wati方法在释放锁之前这个_owner的值为ThreadA线程的一个包装对象,那么此时需要等待锁
- 在ThreadA释放锁之后,这个时候如果进入执行这个方法那么,直接回返回,代表已经获取了锁
- 锁进行等待的代码主要是EnterI ,主要实现如下
-
for (;;) { if (TryLock (Self) > 0) break ; assert (_owner != Self, "invariant") ; if ((SyncFlags & 2) && _Responsible == NULL) { Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ; } // park self if (_Responsible == Self || (SyncFlags & 1)) { TEVENT (Inflated enter - park TIMED) ; Self->_ParkEvent->park ((jlong) RecheckInterval) ; // Increase the RecheckInterval, but clamp the value. RecheckInterval *= 8 ; if (RecheckInterval > 1000) RecheckInterval = 1000 ; } else { TEVENT (Inflated enter - park UNTIMED) ; Self->_ParkEvent->park() ; } if (TryLock(Self) > 0) break ; // The lock is still contested. // Keep a tally of the # of futile wakeups. // Note that the counter is not protected by a lock or updated by atomics. // That is by design - we trade "lossy" counters which are exposed to // races during updates for a lower probe effect. TEVENT (Inflated enter - Futile wakeup) ; if (ObjectMonitor::_sync_FutileWakeups != NULL) { ObjectMonitor::_sync_FutileWakeups->inc() ; } ++ nWakeups ; // Assuming this is not a spurious wakeup we'll normally find _succ == Self. // We can defer clearing _succ until after the spin completes // TrySpin() must tolerate being called with _succ == Self. // Try yet another round of adaptive spinning. if ((Knob_SpinAfterFutile & 1) && TrySpin (Self) > 0) break ; // We can find that we were unpark()ed and redesignated _succ while // we were spinning. That's harmless. If we iterate and call park(), // park() will consume the event and return immediately and we'll // just spin again. This pattern can repeat, leaving _succ to simply // spin on a CPU. Enable Knob_ResetEvent to clear pending unparks(). // Alternately, we can sample fired() here, and if set, forgo spinning // in the next iteration. if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) { Self->_ParkEvent->reset() ; OrderAccess::fence() ; } if (_succ == Self) _succ = NULL ; // Invariant: after clearing _succ a thread *must* retry _owner before parking. OrderAccess::fence() ; }
上边的方法里面可以主要看出
- TryLock(Self),试着cas一把,如果没人占用锁就,占用锁
-
Self->_ParkEvent->park ((jlong) RecheckInterval) ; 线程挂起一段时间
- 醒了之后,接着TryLock(Self),若不成功则一直循环,
接着看线程A在wait()的时候做了什么
此时线程B在一直循环等待获取锁,线程A开始执行wait方法,2者是并行执行的
可以看到其实Thread9 就是线程A在jvm中对应的线程
那么看到在threadA执行完wait方法之后,先释放了锁.在把自己所对应的linux线程挂起
那么还查threadB.notify分析
流程:
lock.notify() //调用lock对象的方法
--> //这个obj就是这个lock,将lock作为参数
ObjectSynchronizer::notify(obj, CHECK);
--> //找到obj对象上的重量锁ObjectMonitor
ObjectSynchronizer::inflate(THREAD, obj())->notify(THREAD);
--> // 调用锁上的notify方法
void ObjectMonitor::notify(TRAPS)
那么现在进入关键步骤
进入的去DequeueWaiter()
接着
打印
(gdb) p _WaitSet
$1 = (ObjectWaiter * volatile) 0x7f88bbbfa110
(gdb) p *node $6 = (ObjectWaiter) { <StackObj> = { <AllocatedObj> = { _vptr.AllocatedObj = 0x7f88d66797f0 <vtable for ObjectWaiter+16> }, <No data fields>}, members of ObjectWaiter: _next = 0x7f88bbbfa110, _prev = 0x7f88bbbfa110, _thread = 0x7f88d0160800, _notifier_tid = 140225242177872, _event = 0x7f88d0161b00, _notified = 0, TState = ObjectWaiter::TS_WAIT, _Sorted = (unknown: 3019902016), _active = false }
看这个链表节点的next和priev都是自己
ObjectWaiter* next = node->_next; 找到下一个节点node3
if (next == node) { //如果next指向自己说明,只有一个node节点,那么久就将_WaitSet置空
assert(node->_prev == node, "invariant check");
_WaitSet = NULL;
} else { //不进入这里,这里是中间节点的情况
ObjectWaiter* prev = node->_prev; 找到上一个节点 ,命名node1
next->_prev = prev; 令下一个节点的上一个指向 node1
prev->_next = next; 令node1的_next指针指向node3
if (_WaitSet == node) {
_WaitSet = next;
}
}
node->_next = NULL; 接着道这里,将node节点的next ,priev置空
node->_prev = NULL;
这就
那么这样就取出来了objectWaiter 对象
接下来对ObjectWaiter对象的处理方式,根据Policy的不同而不同:
Policy == 0:放入_EntryList队列的排头位置;
Policy == 1:放入_EntryList队列的末尾位置;
Policy == 2:_EntryList队列为空就放入_EntryList,否则放入_cxq队列的排头位置;
那么接着进入ThreadB的monitorexit指令
小结一下,线程B释放了锁之后,执行的操作如下:
偏向锁逻辑,此处未命中;
根据QMode的不同,将ObjectWaiter从_cxq或者_EntryList中取出后唤醒;
唤醒的元素会继续执行挂起前的代码,按照我们之前的分析,线程唤醒后,就会通过CAS去竞争锁,此时由于线程B已经释放了锁,那么此时应该能竞争成功;
唤醒目标线程,就是A之前wait方法里面挂起了,那么接着看wait方法的实现
还有
//ReenterI和EnterI的逻辑基本相同,用于获取对象锁 void ATTR ObjectMonitor::ReenterI (Thread * Self, ObjectWaiter * SelfNode) { assert (Self != NULL , "invariant") ; assert (SelfNode != NULL , "invariant") ; assert (SelfNode->_thread == Self , "invariant") ; assert (_waiters > 0 , "invariant") ; //校验目标对象的对象头就是当前ObjectMonitor的指针 assert (((oop)(object()))->mark() == markOopDesc::encode(this) , "invariant") ; assert (((JavaThread *)Self)->thread_state() != _thread_blocked, "invariant") ; JavaThread * jt = (JavaThread *) Self ; int nWakeups = 0 ; for (;;) { ObjectWaiter::TStates v = SelfNode->TState ; //校验状态 guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ; assert (_owner != Self, "invariant") ; //尝试获取锁 if (TryLock (Self) > 0) break ; //尝试自旋获取锁 if (TrySpin (Self) > 0) break ; TEVENT (Wait Reentry - parking) ; { //修改线程状态 OSThreadContendState osts(Self->osthread()); ThreadBlockInVM tbivm(jt); jt->set_suspend_equivalent(); //SyncFlags默认是0 if (SyncFlags & 1) { Self->_ParkEvent->park ((jlong)1000) ; } else { Self->_ParkEvent->park () ; } // were we externally suspended while we were waiting? for (;;) { //ExitSuspendEquivalent默认返回false if (!ExitSuspendEquivalent (jt)) break ; if (_succ == Self) { _succ = NULL; OrderAccess::fence(); } jt->java_suspend_self(); jt->set_suspend_equivalent(); } } //尝试获取锁 if (TryLock(Self) > 0) break ; TEVENT (Wait Reentry - futile wakeup) ; ++ nWakeups ; // Assuming this is not a spurious wakeup we'll normally // find that _succ == Self. if (_succ == Self) _succ = NULL ; // Invariant: after clearing _succ a contending thread // *must* retry _owner before parking. OrderAccess::fence() ; if (ObjectMonitor::_sync_FutileWakeups != NULL) { ObjectMonitor::_sync_FutileWakeups->inc() ; } }//for循环结束 //for循环结束,已经获取了锁 assert (_owner == Self, "invariant") ; assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ; //从链表中移除 UnlinkAfterAcquire (Self, SelfNode) ; if (_succ == Self) _succ = NULL ; assert (_succ != Self, "invariant") ; //修改状态为TS_RUN SelfNode->TState = ObjectWaiter::TS_RUN ; OrderAccess::fence() ; // see comments at the end of EnterI() }
本文结束