zoukankan      html  css  js  c++  java
  • 关于java的wait方法的深入分析

    本文出自:    https://blog.csdn.net/qq_26222859/article/details/53930941

    wait() 的jvm实现

    先查看 jdk/src/share/native/java/lang/Object.c

    #include <stdio.h>
    #include <signal.h>
    #include <limits.h>
     
    #include "jni.h"
    #include "jni_util.h"
    #include "jvm.h"
     
    #include "java_lang_Object.h"
     
    static JNINativeMethod methods[] = {
        {"hashCode",    "()I",                    (void *)&JVM_IHashCode},
        {"wait",        "(J)V",                   (void *)&JVM_MonitorWait},
        {"notify",      "()V",                    (void *)&JVM_MonitorNotify},
        {"notifyAll",   "()V",                    (void *)&JVM_MonitorNotifyAll},
        {"clone",       "()Ljava/lang/Object;",   (void *)&JVM_Clone},

    可以看到 wait 对应的 native 方法是 JVM_MonitorWait,继续查看 JVM_MonitorWait 的实现

    JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms))
      JVMWrapper("JVM_MonitorWait");
      Handle obj(THREAD, JNIHandles::resolve_non_null(handle));  //调用 JNIHandles::resolve_non_null 函数将 jobject 类型的 handle 转化为 oop
      assert(obj->is_instance() || obj->is_array(), "JVM_MonitorWait must apply to an object");
      JavaThreadInObjectWaitState jtiows(thread, ms != 0);
      if (JvmtiExport::should_post_monitor_wait()) {  
        JvmtiExport::post_monitor_wait((JavaThread *)THREAD, (oop)obj(), ms);
      }
      ObjectSynchronizer::wait(obj, ms, CHECK); //重点分析这句
    JVM_END

    接着分析ObjectSynchronizer::wait()方法:

    void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
      if (UseBiasedLocking) {
        BiasedLocking::revoke_and_rebias(obj, false, THREAD);
        assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
      }
      if (millis < 0) {
        TEVENT (wait - throw IAX) ;
        THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
      }
      ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj());
      DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);
      monitor->wait(millis, true, THREAD);  //重点分析这句
     
      /* This dummy call is in place to get around dtrace bug 6254741.  Once
         that's fixed we can uncomment the following line and remove the call */
      // DTRACE_MONITOR_PROBE(waited, monitor, obj(), THREAD);
      dtrace_waited_probe(monitor, obj, THREAD);
    }

    最终通过ObjectMonitor的void wait(jlong millis, bool interruptible, TRAPS);实现

    void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
       Thread * const Self = THREAD ;
       assert(Self->is_Java_thread(), "Must be Java thread!");
       JavaThread *jt = (JavaThread *)THREAD;
     
       DeferredInitialize () ;
     
       // Throw IMSX or IEX.
       CHECK_OWNER();
     
       EventJavaMonitorWait event;
     
       // check for a pending interrupt
       if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
         // post monitor waited event.  Note that this is past-tense, we are done waiting.
         if (JvmtiExport::should_post_monitor_waited()) {
            // Note: 'false' parameter is passed here because the
            // wait was not timed out due to thread interrupt.
            JvmtiExport::post_monitor_waited(jt, this, false);
         }
         if (event.should_commit()) {
           post_monitor_wait_event(&event, 0, millis, false);
         }
         TEVENT (Wait - Throw IEX) ;
         THROW(vmSymbols::java_lang_InterruptedException());
         return ;
       }
     
       TEVENT (Wait) ;
     
       assert (Self->_Stalled == 0, "invariant") ;
       Self->_Stalled = intptr_t(this) ;
       jt->set_current_waiting_monitor(this);
     
       // create a node to be put into the queue
       // Critically, after we reset() the event but prior to park(), we must check
       // for a pending interrupt.
       ObjectWaiter node(Self);//重点分析这句
       node.TState = ObjectWaiter::TS_WAIT ;
       Self->_ParkEvent->reset() ;
       OrderAccess::fence();          // ST into Event; membar ; LD interrupted-flag
     
       // Enter the waiting queue, which is a circular doubly linked list in this case
       // but it could be a priority queue or any data structure.
       // _WaitSetLock protects the wait queue.  Normally the wait queue is accessed only
       // by the the owner of the monitor *except* in the case where park()
       // returns because of a timeout of interrupt.  Contention is exceptionally rare
       // so we use a simple spin-lock instead of a heavier-weight blocking lock.
     
       Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
       AddWaiter (&node) ; //重点分析这句
       Thread::SpinRelease (&_WaitSetLock) ;
     
       if ((SyncFlags & 4) == 0) {
          _Responsible = NULL ;
       }
       intptr_t save = _recursions; // record the old recursion count
       _waiters++;                  // increment the number of waiters
       _recursions = 0;             // set the recursion level to be 1
       exit (true, Self) ;                    // exit the monitor
       guarantee (_owner != Self, "invariant") ;
     
       // As soon as the ObjectMonitor's ownership is dropped in the exit()
       // call above, another thread can enter() the ObjectMonitor, do the
       // notify(), and exit() the ObjectMonitor. If the other thread's
       // exit() call chooses this thread as the successor and the unpark()
       // call happens to occur while this thread is posting a
       // MONITOR_CONTENDED_EXIT event, then we run the risk of the event
       // handler using RawMonitors and consuming the unpark().
       //
       // To avoid the problem, we re-post the event. This does no harm
       // even if the original unpark() was not consumed because we are the
       // chosen successor for this monitor.
       if (node._notified != 0 && _succ == Self) {
          node._event->unpark();
       }
     
       // The thread is on the WaitSet list - now park() it.
       // On MP systems it's conceivable that a brief spin before we park
       // could be profitable.
       //
       // TODO-FIXME: change the following logic to a loop of the form
       //   while (!timeout && !interrupted && _notified == 0) park()
     
       int ret = OS_OK ;
       int WasNotified = 0 ;
       { // State transition wrappers
         OSThread* osthread = Self->osthread();
         OSThreadWaitState osts(osthread, true);
         {
           ThreadBlockInVM tbivm(jt);
           // Thread is in thread_blocked state and oop access is unsafe.
           jt->set_suspend_equivalent();
     
           if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
               // Intentionally empty
           } else
           if (node._notified == 0) {
             if (millis <= 0) {
                Self->_ParkEvent->park () ;  //重点分析这句
             } else {
                ret = Self->_ParkEvent->park (millis) ; //重点分析这句
             }
           }
     
           // were we externally suspended while we were waiting?
           if (ExitSuspendEquivalent (jt)) {
              // TODO-FIXME: add -- if succ == Self then succ = null.
              jt->java_suspend_self();
           }
     
         } // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm
     
     
         // Node may be on the WaitSet, the EntryList (or cxq), or in transition
         // from the WaitSet to the EntryList.
         // See if we need to remove Node from the WaitSet.
         // We use double-checked locking to avoid grabbing _WaitSetLock
         // if the thread is not on the wait queue.
         //
         // Note that we don't need a fence before the fetch of TState.
         // In the worst case we'll fetch a old-stale value of TS_WAIT previously
         // written by the is thread. (perhaps the fetch might even be satisfied
         // by a look-aside into the processor's own store buffer, although given
         // the length of the code path between the prior ST and this load that's
         // highly unlikely).  If the following LD fetches a stale TS_WAIT value
         // then we'll acquire the lock and then re-fetch a fresh TState value.
         // That is, we fail toward safety.
     
         if (node.TState == ObjectWaiter::TS_WAIT) {
             Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
             if (node.TState == ObjectWaiter::TS_WAIT) {
                DequeueSpecificWaiter (&node) ;       // unlink from WaitSet
                assert(node._notified == 0, "invariant");
                node.TState = ObjectWaiter::TS_RUN ;
             }
             Thread::SpinRelease (&_WaitSetLock) ;
         }
     
         // The thread is now either on off-list (TS_RUN),
         // on the EntryList (TS_ENTER), or on the cxq (TS_CXQ).
         // The Node's TState variable is stable from the perspective of this thread.
         // No other threads will asynchronously modify TState.
         guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ;
         OrderAccess::loadload() ;
         if (_succ == Self) _succ = NULL ;
         WasNotified = node._notified ;
     
         // Reentry phase -- reacquire the monitor.
         // re-enter contended monitor after object.wait().
         // retain OBJECT_WAIT state until re-enter successfully completes
         // Thread state is thread_in_vm and oop access is again safe,
         // although the raw address of the object may have changed.
         // (Don't cache naked oops over safepoints, of course).
     
         // post monitor waited event. Note that this is past-tense, we are done waiting.
         if (JvmtiExport::should_post_monitor_waited()) {
           JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT);
         }
     
         if (event.should_commit()) {
           post_monitor_wait_event(&event, node._notifier_tid, millis, ret == OS_TIMEOUT);
         }
     
         OrderAccess::fence() ;
     
         assert (Self->_Stalled != 0, "invariant") ;
         Self->_Stalled = 0 ;
     
         assert (_owner != Self, "invariant") ;
         ObjectWaiter::TStates v = node.TState ;
         if (v == ObjectWaiter::TS_RUN) {
             enter (Self) ;
         } else {
             guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
             ReenterI (Self, &node) ;
             node.wait_reenter_end(this);
         }
     
         // Self has reacquired the lock.
         // Lifecycle - the node representing Self must not appear on any queues.
         // Node is about to go out-of-scope, but even if it were immortal we wouldn't
         // want residual elements associated with this thread left on any lists.
         guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ;
         assert    (_owner == Self, "invariant") ;
         assert    (_succ != Self , "invariant") ;
       } // OSThreadWaitState()
     
       jt->set_current_waiting_monitor(NULL);
     
       guarantee (_recursions == 0, "invariant") ;
       _recursions = save;     // restore the old recursion count
       _waiters--;             // decrement the number of waiters
     
       // Verify a few postconditions
       assert (_owner == Self       , "invariant") ;
       assert (_succ  != Self       , "invariant") ;
       assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
     
       if (SyncFlags & 32) {
          OrderAccess::fence() ;
       }
     
       // check if the notification happened
       if (!WasNotified) {
         // no, it could be timeout or Thread.interrupt() or both
         // check for interrupt event, otherwise it is timeout
         if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
           TEVENT (Wait - throw IEX from epilog) ;
           THROW(vmSymbols::java_lang_InterruptedException());
         }
       }
     
       // NOTE: Spurious wake up will be consider as timeout.
       // Monitor notify has precedence over thread interrupt.
    }

    这段函数相当的长,重点在于以下语句:

    ObjectWaiter node(Self);   Self 是Thread 对象,将当前线程封装成ObjectWaiter对象node;

    ObjectMonitor::AddWaiter() 将 node 加入到 ObjectMonitor 的 _WaitSet 中;

    exit (true, Self) ; // exit the monitor  线程退出monitor;

    Self->_ParkEvent->park () ;  最终底层的park方法挂起线程;

    分析park()方法

    http://hg.openjdk.java.net/jdk7u/jdk7u/hotspot/file/677234770800/src/os/linux/vm/os_linux.cpp

    void os::PlatformEvent::park() {       // AKA "down()"
      // Invariant: Only the thread associated with the Event/PlatformEvent
      // may call park().
      // TODO: assert that _Assoc != NULL or _Assoc == Self
      int v ;
      for (;;) {
          v = _Event ;
          if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
      }
      guarantee (v >= 0, "invariant") ;
      if (v == 0) {
         // Do this the hard way by blocking ...
         int status = pthread_mutex_lock(_mutex);
         assert_status(status == 0, status, "mutex_lock");
         guarantee (_nParked == 0, "invariant") ;
         ++ _nParked ;
         while (_Event < 0) {
            status = pthread_cond_wait(_cond, _mutex);  //重点分析这句
            // for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
            // Treat this the same as if the wait was interrupted
            if (status == ETIME) { status = EINTR; }
            assert_status(status == 0 || status == EINTR, status, "cond_wait");
         }
         -- _nParked ;
     
        // In theory we could move the ST of 0 into _Event past the unlock(),
        // but then we'd need a MEMBAR after the ST.
        _Event = 0 ;
         status = pthread_mutex_unlock(_mutex);
         assert_status(status == 0, status, "mutex_unlock");
      }
      guarantee (_Event >= 0, "invariant") ;
    }

    从代码中可以看出,最终wait() 在linux 下 调用 pthread_cond_wait()阻塞在条件变量上。

    notify()的jvm实现

    同理,notify()方法最终通过ObjectMonitor的notify(TRAPS)实现;

    1、如果当前_WaitSet为空,即没有正在等待的线程,直接返回;

    2、通过ObjectMonitor::DequeueWaiter()方法,获取_WaitSet列表的第一个ObjectWaiter节点,然后根据不同的调度策略,选择头插入法或者尾插入法放到entryList或者cxq;

    3、如果调度策略不是上述几种(Policy 不为 0~3,对应代码中最后的 else 分支),则不放入队列,而是直接调用unpark()方法唤醒阻塞在条件变量上的线程。

    // Consider:
    // If the lock is cool (cxq == null && succ == null) and we're on an MP system
    // then instead of transferring a thread from the WaitSet to the EntryList
    // we might just dequeue a thread from the WaitSet and directly unpark() it.
    //我们可能仅将一个线程从WaitSet取出,并直接调用unpark()方法
     
    void ObjectMonitor::notify(TRAPS) {
      CHECK_OWNER();
      if (_WaitSet == NULL) {
         TEVENT (Empty-Notify) ;
         return ;
      }
      DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
     
      int Policy = Knob_MoveNotifyee ;
     
      Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
      ObjectWaiter * iterator = DequeueWaiter() ; //重点分析这句
      if (iterator != NULL) {
         TEVENT (Notify1 - Transfer) ;
         guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
         guarantee (iterator->_notified == 0, "invariant") ;
         if (Policy != 4) {
            iterator->TState = ObjectWaiter::TS_ENTER ;
         }
         iterator->_notified = 1 ;
         Thread * Self = THREAD;
         iterator->_notifier_tid = Self->osthread()->thread_id();
     
         ObjectWaiter * List = _EntryList ;
         if (List != NULL) {
            assert (List->_prev == NULL, "invariant") ;
            assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
            assert (List != iterator, "invariant") ;
         }
     
         if (Policy == 0) {       // prepend to EntryList
             if (List == NULL) {
                 iterator->_next = iterator->_prev = NULL ;
                 _EntryList = iterator ;
             } else {
                 List->_prev = iterator ;
                 iterator->_next = List ;//链表头插入法插入entryList
                 iterator->_prev = NULL ;
                 _EntryList = iterator ;
            }
         } else
         if (Policy == 1) {      // append to EntryList
             if (List == NULL) {
                 iterator->_next = iterator->_prev = NULL ;
                 _EntryList = iterator ;
             } else {
                // CONSIDER:  finding the tail currently requires a linear-time walk of
                // the EntryList.  We can make tail access constant-time by converting to
                // a CDLL instead of using our current DLL.
                ObjectWaiter * Tail ;
                for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
                assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
                Tail->_next = iterator ;  //链表尾插入法插入entryList
                iterator->_prev = Tail ;
                iterator->_next = NULL ;
            }
         } else
         if (Policy == 2) {      // prepend to cxq
             // prepend to cxq
             if (List == NULL) {
                 iterator->_next = iterator->_prev = NULL ;
                 _EntryList = iterator ;
             } else {
                iterator->TState = ObjectWaiter::TS_CXQ ;
                for (;;) {
                    ObjectWaiter * Front = _cxq ;
                    iterator->_next = Front ;  //头插入法插入cxq
                    if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                        break ;
                    }
                }
             }
         } else
         if (Policy == 3) {      // append to cxq
            iterator->TState = ObjectWaiter::TS_CXQ ;
            for (;;) {
                ObjectWaiter * Tail ;
                Tail = _cxq ;
                if (Tail == NULL) {
                    iterator->_next = NULL ;
                    if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
                       break ;
                    }
                } else {
                    while (Tail->_next != NULL) Tail = Tail->_next ;
                    Tail->_next = iterator ;  //尾插入法插入cxq
                    iterator->_prev = Tail ;
                    iterator->_next = NULL ;
                    break ;
                }
            }
         } else {
            ParkEvent * ev = iterator->_event ;
            iterator->TState = ObjectWaiter::TS_RUN ;
            OrderAccess::fence() ;
            ev->unpark() ;  //重点分析这句
         }
     
         if (Policy < 4) {
           iterator->wait_reenter_begin(this);
         }
     
         // _WaitSetLock protects the wait queue, not the EntryList.  We could
         // move the add-to-EntryList operation, above, outside the critical section
         // protected by _WaitSetLock.  In practice that's not useful.  With the
         // exception of  wait() timeouts and interrupts the monitor owner
         // is the only thread that grabs _WaitSetLock.  There's almost no contention
         // on _WaitSetLock so it's not profitable to reduce the length of the
         // critical section.
      }
     
      Thread::SpinRelease (&_WaitSetLock) ;
     
      if (iterator != NULL && ObjectMonitor::_sync_Notifications != NULL) {
         ObjectMonitor::_sync_Notifications->inc() ;
      }

    分析unpark()方法

    http://hg.openjdk.java.net/jdk7u/jdk7u/hotspot/file/677234770800/src/os/linux/vm/os_linux.cpp

    void os::PlatformEvent::unpark() {
      // Transitions for _Event:
      //    0 :=> 1
      //    1 :=> 1
      //   -1 :=> either 0 or 1; must signal target thread
      //          That is, we can safely transition _Event from -1 to either
      //          0 or 1. Forcing 1 is slightly more efficient for back-to-back
      //          unpark() calls.
      // See also: "Semaphores in Plan 9" by Mullender & Cox
      //
      // Note: Forcing a transition from "-1" to "1" on an unpark() means
      // that it will take two back-to-back park() calls for the owning
      // thread to block. This has the benefit of forcing a spurious return
      // from the first park() call after an unpark() call which will help
      // shake out uses of park() and unpark() without condition variables.
     
      if (Atomic::xchg(1, &_Event) >= 0) return;
     
      // Wait for the thread associated with the event to vacate
      int status = pthread_mutex_lock(_mutex);
      assert_status(status == 0, status, "mutex_lock");
      int AnyWaiters = _nParked;
      assert(AnyWaiters == 0 || AnyWaiters == 1, "invariant");
      if (AnyWaiters != 0 && WorkAroundNPTLTimedWaitHang) {
        AnyWaiters = 0;
        pthread_cond_signal(_cond);
      }
      status = pthread_mutex_unlock(_mutex);
      assert_status(status == 0, status, "mutex_unlock");
      if (AnyWaiters != 0) {
        status = pthread_cond_signal(_cond);
        assert_status(status == 0, status, "cond_signal");
      }
     
      // Note that we signal() _after dropping the lock for "immortal" Events.
      // This is safe and avoids a common class of  futile wakeups.  In rare
      // circumstances this can cause a thread to return prematurely from
      // cond_{timed}wait() but the spurious wakeup is benign and the victim will
      // simply re-test the condition and re-park itself.
    }

    从上面代码可以看出,最终notify()方法在linux下调用pthread_cond_signal()唤醒阻塞在条件变量上的线程。

  • 相关阅读:
    android-为应用单元测试
    android手机拨号器实现
    android模拟器使用
    android开发环境搭建
    C语言之关键字
    linux shell脚本基础-----3
    linux shell脚本基础-----2
    linux shell脚本基础-----1
    Android学习计划
    MySql 绿色版配置
  • 原文地址:https://www.cnblogs.com/geektcp/p/10589507.html
Copyright © 2011-2022 走看看