  • Using java.util.concurrent.locks.LockSupport

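      While reading the internals of AQS, I noticed that java.util.concurrent.locks.LockSupport is used in many places. For example, CountDownLatch.await uses it to block, and blocking-queue methods such as take also rely on it when the calling thread has to block. This post looks at its main usage.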
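
    The claim about CountDownLatch.await can be observed from the outside. The following is a minimal sketch, not part of the original post: the class name LatchParkDemo and the method observe() are made up for illustration. It starts a thread that blocks in await(), then uses Thread.getState() and LockSupport.getBlocker() to see that the thread is parked.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class; the name is an assumption made for this sketch.
class LatchParkDemo {

    /** Returns { observed thread state, class name of the recorded blocker }. */
    static String[] observe() {
        CountDownLatch latch = new CountDownLatch(1);
        Thread waiter = new Thread(() -> {
            try {
                latch.await();                       // blocks until countDown()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "latch-waiter");
        waiter.start();

        // Poll until the waiter is parked with a blocker recorded.
        Object blocker;
        while ((blocker = LockSupport.getBlocker(waiter)) == null
                || waiter.getState() != Thread.State.WAITING) {
            LockSupport.parkNanos(5_000_000L);       // pause about 5 ms between polls
        }
        String[] seen = { Thread.State.WAITING.name(), blocker.getClass().getName() };

        latch.countDown();                           // lets the waiter continue
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = observe();
        System.out.println("state = " + seen[0] + ", blocker class = " + seen[1]);
    }
}
```

    The blocker class printed is an internal synchronizer class of the latch, so its exact name may differ between JDK versions.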

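    1. A quick look at thread states

    When first learning about threads, the states are introduced as follows:

    (1) NEW
    
      The thread has been created but has not started running. For example, after Thread t1 = new Thread(); t1 is a thread in the NEW state.
    
    (2) RUNNABLE
    
      Once start() has been called on a new thread, it is in the RUNNABLE state. A RUNNABLE thread may be executing in the Java virtual machine, or it may be waiting for processor time: a thread has to be given the CPU before it can run the body of its run() method, and otherwise it waits in a queue.
    
    (3) BLOCKED
    
      A thread that is waiting for a monitor lock in order to enter a synchronized block or method is in the BLOCKED state.
    
    (4) WAITING
    
      A thread is in the WAITING state after calling Object.wait() without a timeout, Thread.join() without a timeout, or LockSupport.park().
    
    (5) TIMED_WAITING
    
      A thread is in the TIMED_WAITING state after calling, with a specified positive waiting time, Object.wait(), Thread.join(), Thread.sleep(), LockSupport.parkNanos(), or LockSupport.parkUntil().
    
    (6) TERMINATED
    
      Once the thread is terminated or its run() method has finished, the thread is in the TERMINATED state. A terminated thread cannot run again.

      As the list shows, a thread enters the WAITING state after calling park().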
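
    The states above can be walked through in one small program. This is a sketch under assumptions: ThreadStateDemo, awaitState and observe are hypothetical names, and the polling helper simply pauses with parkNanos between checks. The worker parks inside loops guarded by flags, so a spurious return from park cannot skip a step.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class; all names here are assumptions made for this sketch.
class ThreadStateDemo {

    /** Polls until the thread is seen in the wanted state and returns that state. */
    static Thread.State awaitState(Thread t, Thread.State wanted) {
        Thread.State s;
        while ((s = t.getState()) != wanted) {
            LockSupport.parkNanos(5_000_000L);   // pause about 5 ms between polls
        }
        return s;
    }

    /** Drives one worker thread through five states and returns them as text. */
    static String observe() {
        Object monitor = new Object();
        AtomicBoolean leaveWaiting = new AtomicBoolean();
        AtomicBoolean leaveTimedWaiting = new AtomicBoolean();

        Thread t = new Thread(() -> {
            synchronized (monitor) { /* contends for the monitor held by the caller */ }
            while (!leaveWaiting.get()) {
                LockSupport.park();                      // WAITING
            }
            while (!leaveTimedWaiting.get()) {
                LockSupport.parkNanos(Long.MAX_VALUE);   // TIMED_WAITING
            }
        });

        StringBuilder seen = new StringBuilder();
        seen.append(t.getState());                       // NEW: not started yet
        synchronized (monitor) {
            t.start();
            seen.append(' ').append(awaitState(t, Thread.State.BLOCKED));
        }
        seen.append(' ').append(awaitState(t, Thread.State.WAITING));

        leaveWaiting.set(true);
        LockSupport.unpark(t);
        seen.append(' ').append(awaitState(t, Thread.State.TIMED_WAITING));

        leaveTimedWaiting.set(true);
        LockSupport.unpark(t);
        seen.append(' ').append(awaitState(t, Thread.State.TERMINATED));
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(observe()); // NEW BLOCKED WAITING TIMED_WAITING TERMINATED
    }
}
```

    RUNNABLE is left out on purpose: it is the state between the others and is hard to catch reliably by polling.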

    2. Main API

    1. The main API

    The source code is as follows:

    /*
     * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
     */
    
    /*
     * Written by Doug Lea with assistance from members of JCP JSR-166
     * Expert Group and released to the public domain, as explained at
     * http://creativecommons.org/publicdomain/zero/1.0/
     */
    
    package java.util.concurrent.locks;
    import sun.misc.Unsafe;
    
    /**
     * Basic thread blocking primitives for creating locks and other
     * synchronization classes.
     *
     * <p>This class associates, with each thread that uses it, a permit
     * (in the sense of the {@link java.util.concurrent.Semaphore
     * Semaphore} class). A call to {@code park} will return immediately
     * if the permit is available, consuming it in the process; otherwise
     * it <em>may</em> block.  A call to {@code unpark} makes the permit
     * available, if it was not already available. (Unlike with Semaphores
     * though, permits do not accumulate. There is at most one.)
     *
     * <p>Methods {@code park} and {@code unpark} provide efficient
     * means of blocking and unblocking threads that do not encounter the
     * problems that cause the deprecated methods {@code Thread.suspend}
     * and {@code Thread.resume} to be unusable for such purposes: Races
     * between one thread invoking {@code park} and another thread trying
     * to {@code unpark} it will preserve liveness, due to the
     * permit. Additionally, {@code park} will return if the caller's
     * thread was interrupted, and timeout versions are supported. The
     * {@code park} method may also return at any other time, for "no
     * reason", so in general must be invoked within a loop that rechecks
     * conditions upon return. In this sense {@code park} serves as an
     * optimization of a "busy wait" that does not waste as much time
     * spinning, but must be paired with an {@code unpark} to be
     * effective.
     *
     * <p>The three forms of {@code park} each also support a
     * {@code blocker} object parameter. This object is recorded while
     * the thread is blocked to permit monitoring and diagnostic tools to
     * identify the reasons that threads are blocked. (Such tools may
     * access blockers using method {@link #getBlocker(Thread)}.)
     * The use of these forms rather than the original forms without this
     * parameter is strongly encouraged. The normal argument to supply as
     * a {@code blocker} within a lock implementation is {@code this}.
     *
     * <p>These methods are designed to be used as tools for creating
     * higher-level synchronization utilities, and are not in themselves
     * useful for most concurrency control applications.  The {@code park}
     * method is designed for use only in constructions of the form:
     *
     *  <pre> {@code
     * while (!canProceed()) { ... LockSupport.park(this); }}</pre>
     *
     * where neither {@code canProceed} nor any other actions prior to the
     * call to {@code park} entail locking or blocking.  Because only one
     * permit is associated with each thread, any intermediary uses of
     * {@code park} could interfere with its intended effects.
     *
     * <p><b>Sample Usage.</b> Here is a sketch of a first-in-first-out
     * non-reentrant lock class:
     *  <pre> {@code
     * class FIFOMutex {
     *   private final AtomicBoolean locked = new AtomicBoolean(false);
     *   private final Queue<Thread> waiters
     *     = new ConcurrentLinkedQueue<Thread>();
     *
     *   public void lock() {
     *     boolean wasInterrupted = false;
     *     Thread current = Thread.currentThread();
     *     waiters.add(current);
     *
     *     // Block while not first in queue or cannot acquire lock
     *     while (waiters.peek() != current ||
     *            !locked.compareAndSet(false, true)) {
     *       LockSupport.park(this);
     *       if (Thread.interrupted()) // ignore interrupts while waiting
     *         wasInterrupted = true;
     *     }
     *
     *     waiters.remove();
     *     if (wasInterrupted)          // reassert interrupt status on exit
     *       current.interrupt();
     *   }
     *
     *   public void unlock() {
     *     locked.set(false);
     *     LockSupport.unpark(waiters.peek());
     *   }
     * }}</pre>
     */
    public class LockSupport {
        private LockSupport() {} // Cannot be instantiated.
    
        private static void setBlocker(Thread t, Object arg) {
            // Even though volatile, hotspot doesn't need a write barrier here.
            UNSAFE.putObject(t, parkBlockerOffset, arg);
        }
    
        /**
         * Makes available the permit for the given thread, if it
         * was not already available.  If the thread was blocked on
         * {@code park} then it will unblock.  Otherwise, its next call
         * to {@code park} is guaranteed not to block. This operation
         * is not guaranteed to have any effect at all if the given
         * thread has not been started.
         *
         * @param thread the thread to unpark, or {@code null}, in which case
         *        this operation has no effect
         */
        public static void unpark(Thread thread) {
            if (thread != null)
                UNSAFE.unpark(thread);
        }
    
        /**
         * Disables the current thread for thread scheduling purposes unless the
         * permit is available.
         *
         * <p>If the permit is available then it is consumed and the call returns
         * immediately; otherwise
         * the current thread becomes disabled for thread scheduling
         * purposes and lies dormant until one of three things happens:
         *
         * <ul>
         * <li>Some other thread invokes {@link #unpark unpark} with the
         * current thread as the target; or
         *
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * the current thread; or
         *
         * <li>The call spuriously (that is, for no reason) returns.
         * </ul>
         *
         * <p>This method does <em>not</em> report which of these caused the
         * method to return. Callers should re-check the conditions which caused
         * the thread to park in the first place. Callers may also determine,
         * for example, the interrupt status of the thread upon return.
         *
         * @param blocker the synchronization object responsible for this
         *        thread parking
         * @since 1.6
         */
        public static void park(Object blocker) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, 0L);
            setBlocker(t, null);
        }
    
        /**
         * Disables the current thread for thread scheduling purposes, for up to
         * the specified waiting time, unless the permit is available.
         *
         * <p>If the permit is available then it is consumed and the call
         * returns immediately; otherwise the current thread becomes disabled
         * for thread scheduling purposes and lies dormant until one of four
         * things happens:
         *
         * <ul>
         * <li>Some other thread invokes {@link #unpark unpark} with the
         * current thread as the target; or
         *
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * the current thread; or
         *
         * <li>The specified waiting time elapses; or
         *
         * <li>The call spuriously (that is, for no reason) returns.
         * </ul>
         *
         * <p>This method does <em>not</em> report which of these caused the
         * method to return. Callers should re-check the conditions which caused
         * the thread to park in the first place. Callers may also determine,
         * for example, the interrupt status of the thread, or the elapsed time
         * upon return.
         *
         * @param blocker the synchronization object responsible for this
         *        thread parking
         * @param nanos the maximum number of nanoseconds to wait
         * @since 1.6
         */
        public static void parkNanos(Object blocker, long nanos) {
            if (nanos > 0) {
                Thread t = Thread.currentThread();
                setBlocker(t, blocker);
                UNSAFE.park(false, nanos);
                setBlocker(t, null);
            }
        }
    
        /**
         * Disables the current thread for thread scheduling purposes, until
         * the specified deadline, unless the permit is available.
         *
         * <p>If the permit is available then it is consumed and the call
         * returns immediately; otherwise the current thread becomes disabled
         * for thread scheduling purposes and lies dormant until one of four
         * things happens:
         *
         * <ul>
         * <li>Some other thread invokes {@link #unpark unpark} with the
         * current thread as the target; or
         *
         * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
         * current thread; or
         *
         * <li>The specified deadline passes; or
         *
         * <li>The call spuriously (that is, for no reason) returns.
         * </ul>
         *
         * <p>This method does <em>not</em> report which of these caused the
         * method to return. Callers should re-check the conditions which caused
         * the thread to park in the first place. Callers may also determine,
         * for example, the interrupt status of the thread, or the current time
         * upon return.
         *
         * @param blocker the synchronization object responsible for this
         *        thread parking
         * @param deadline the absolute time, in milliseconds from the Epoch,
         *        to wait until
         * @since 1.6
         */
        public static void parkUntil(Object blocker, long deadline) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(true, deadline);
            setBlocker(t, null);
        }
    
        /**
         * Returns the blocker object supplied to the most recent
         * invocation of a park method that has not yet unblocked, or null
         * if not blocked.  The value returned is just a momentary
         * snapshot -- the thread may have since unblocked or blocked on a
         * different blocker object.
         *
         * @param t the thread
         * @return the blocker
         * @throws NullPointerException if argument is null
         * @since 1.6
         */
        public static Object getBlocker(Thread t) {
            if (t == null)
                throw new NullPointerException();
            return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
        }
    
        /**
         * Disables the current thread for thread scheduling purposes unless the
         * permit is available.
         *
         * <p>If the permit is available then it is consumed and the call
         * returns immediately; otherwise the current thread becomes disabled
         * for thread scheduling purposes and lies dormant until one of three
         * things happens:
         *
         * <ul>
         *
         * <li>Some other thread invokes {@link #unpark unpark} with the
         * current thread as the target; or
         *
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * the current thread; or
         *
         * <li>The call spuriously (that is, for no reason) returns.
         * </ul>
         *
         * <p>This method does <em>not</em> report which of these caused the
         * method to return. Callers should re-check the conditions which caused
         * the thread to park in the first place. Callers may also determine,
         * for example, the interrupt status of the thread upon return.
         */
        public static void park() {
            UNSAFE.park(false, 0L);
        }
    
        /**
         * Disables the current thread for thread scheduling purposes, for up to
         * the specified waiting time, unless the permit is available.
         *
         * <p>If the permit is available then it is consumed and the call
         * returns immediately; otherwise the current thread becomes disabled
         * for thread scheduling purposes and lies dormant until one of four
         * things happens:
         *
         * <ul>
         * <li>Some other thread invokes {@link #unpark unpark} with the
         * current thread as the target; or
         *
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * the current thread; or
         *
         * <li>The specified waiting time elapses; or
         *
         * <li>The call spuriously (that is, for no reason) returns.
         * </ul>
         *
         * <p>This method does <em>not</em> report which of these caused the
         * method to return. Callers should re-check the conditions which caused
         * the thread to park in the first place. Callers may also determine,
         * for example, the interrupt status of the thread, or the elapsed time
         * upon return.
         *
         * @param nanos the maximum number of nanoseconds to wait
         */
        public static void parkNanos(long nanos) {
            if (nanos > 0)
                UNSAFE.park(false, nanos);
        }
    
        /**
         * Disables the current thread for thread scheduling purposes, until
         * the specified deadline, unless the permit is available.
         *
         * <p>If the permit is available then it is consumed and the call
         * returns immediately; otherwise the current thread becomes disabled
         * for thread scheduling purposes and lies dormant until one of four
         * things happens:
         *
         * <ul>
         * <li>Some other thread invokes {@link #unpark unpark} with the
         * current thread as the target; or
         *
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * the current thread; or
         *
         * <li>The specified deadline passes; or
         *
         * <li>The call spuriously (that is, for no reason) returns.
         * </ul>
         *
         * <p>This method does <em>not</em> report which of these caused the
         * method to return. Callers should re-check the conditions which caused
         * the thread to park in the first place. Callers may also determine,
         * for example, the interrupt status of the thread, or the current time
         * upon return.
         *
         * @param deadline the absolute time, in milliseconds from the Epoch,
         *        to wait until
         */
        public static void parkUntil(long deadline) {
            UNSAFE.park(true, deadline);
        }
    
        /**
         * Returns the pseudo-randomly initialized or updated secondary seed.
         * Copied from ThreadLocalRandom due to package access restrictions.
         */
        static final int nextSecondarySeed() {
            int r;
            Thread t = Thread.currentThread();
            if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
                r ^= r << 13;   // xorshift
                r ^= r >>> 17;
                r ^= r << 5;
            }
            else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
                r = 1; // avoid zero
            UNSAFE.putInt(t, SECONDARY, r);
            return r;
        }
    
        // Hotspot implementation via intrinsics API
        private static final sun.misc.Unsafe UNSAFE;
        private static final long parkBlockerOffset;
        private static final long SEED;
        private static final long PROBE;
        private static final long SECONDARY;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> tk = Thread.class;
                parkBlockerOffset = UNSAFE.objectFieldOffset
                    (tk.getDeclaredField("parkBlocker"));
                SEED = UNSAFE.objectFieldOffset
                    (tk.getDeclaredField("threadLocalRandomSeed"));
                PROBE = UNSAFE.objectFieldOffset
                    (tk.getDeclaredField("threadLocalRandomProbe"));
                SECONDARY = UNSAFE.objectFieldOffset
                    (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
            } catch (Exception ex) { throw new Error(ex); }
        }
    
    }
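
    The Javadoc above says park is meant to be used in the form while (!canProceed()) { ... LockSupport.park(this); }. A minimal sketch of that form follows. ParkLoopDemo and its methods are hypothetical names, not JDK API, and the interrupt handling copies the idea from the FIFOMutex sample in the Javadoc.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical one-shot gate; the class and method names are assumptions.
class ParkLoopDemo {
    private volatile boolean ready;      // the condition that park waits for
    private volatile Thread waiter;      // published before 'ready' is checked

    /** Blocks the caller until signalReady() has been called. */
    void awaitReady() {
        boolean wasInterrupted = false;
        waiter = Thread.currentThread();
        while (!ready) {                 // re-check after every return from park
            LockSupport.park(this);
            if (Thread.interrupted()) {  // clear the flag so the next park can block
                wasInterrupted = true;
            }
        }
        if (wasInterrupted) {
            Thread.currentThread().interrupt(); // reassert interrupt status on exit
        }
    }

    /** Sets the condition first, then wakes the waiter if one has registered. */
    void signalReady() {
        ready = true;
        LockSupport.unpark(waiter);      // unpark(null) has no effect
    }

    /** Runs one waiter and one signal; returns true if the waiter finished. */
    static boolean demo() {
        ParkLoopDemo gate = new ParkLoopDemo();
        Thread t = new Thread(gate::awaitReady, "waiter");
        t.start();
        gate.signalReady();
        try {
            t.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !t.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("waiter finished: " + demo());
    }
}
```

    The waiter writes waiter and then reads ready, while the signaller writes ready and then reads waiter. Because both fields are volatile, at least one side sees the other's write, so the wake-up is not lost whichever thread runs first.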

    2. Usage

    import java.util.concurrent.locks.LockSupport;
    
    public class PlainTest {
    
        public static void main(String[] args) throws InterruptedException {
            Thread thread = new Thread(() -> {
                System.out.println("111222");
                LockSupport.park();
                System.out.println("222333");
                try {
                    Thread.sleep(3*1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("333444");
            });
            thread.start();
    
            Thread.sleep(1*1000);
            System.out.println("thread.getState(): " + thread.getState() + "\t1");
            LockSupport.unpark(thread);
            Thread.sleep(1*1000);
            System.out.println("thread.getState(): " + thread.getState() + "\t2");
            Thread.sleep(3*1000);
            System.out.println("thread.getState(): " + thread.getState() + "\t3");
        }
    }

    Output:

    111222
    thread.getState(): WAITING    1
    222333
    thread.getState(): TIMED_WAITING    2
    333444
    thread.getState(): TERMINATED    3

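    2. The park method

      park suspends the current thread, which enters the WAITING or TIMED_WAITING state. The thread resumes when another thread calls unpark on it, when another thread interrupts it, or, for the timed variants, when the specified time is reached. As the Javadoc above notes, park may also return spuriously.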
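
    Of the wake-up conditions above, interruption is the easiest to get wrong: park returns without throwing InterruptedException and leaves the interrupt flag set. The sketch below illustrates this; ParkInterruptDemo and interruptedAfterPark are hypothetical names.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class; the names are assumptions made for this sketch.
class ParkInterruptDemo {

    /** Returns the worker's interrupt flag as read after park() has returned. */
    static boolean interruptedAfterPark() {
        boolean[] flag = new boolean[1];
        Thread t = new Thread(() -> {
            // The loop guards against spurious returns: leave only once interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                LockSupport.park();
            }
            // No exception was thrown, and the flag is still set at this point.
            flag[0] = Thread.currentThread().isInterrupted();
            // While the flag is set, a further park() returns at once instead of blocking.
            LockSupport.park();
        }, "parked");
        t.start();
        t.interrupt();
        try {
            t.join();                    // join makes the write to flag[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return flag[0];
    }

    public static void main(String[] args) {
        System.out.println("interrupt flag after park: " + interruptedAfterPark());
    }
}
```

    Code that wants to keep waiting after an interrupt has to clear the flag with Thread.interrupted(), as the FIFOMutex sample in the Javadoc does; otherwise the wait loop spins.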

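    1. park can be called with or without a blocker argument. When one is supplied, tools can report what the thread is blocked on.

    setBlocker uses Unsafe and parkBlockerOffset (the offset of the parkBlocker field within the Thread object) to store the blocker in java.lang.Thread#parkBlocker.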

    java.lang.Thread#parkBlocker:

        /**
         * The argument supplied to the current call to
         * java.util.concurrent.locks.LockSupport.park.
         * Set by (private) java.util.concurrent.locks.LockSupport.setBlocker
         * Accessed using java.util.concurrent.locks.LockSupport.getBlocker
         */
        volatile Object parkBlocker;

    (1) Without a blocker:

    import java.util.concurrent.locks.LockSupport;
    
    public class PlainTest {
    
        public static void main(String[] args) {
            LockSupport.park();
        }
    }

    Thread information from jstack:

    "main" #1 prio=5 os_prio=0 tid=0x00000224f146d000 nid=0x3524 waiting on condition [0x00000081e89fe000]
       java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)
            at PlainTest.main(PlainTest.java:6)

    (2) With a blocker

    Code:

    import java.util.concurrent.locks.LockSupport;
    
    public class PlainTest {
    
        public static void main(String[] args) {
            LockSupport.park(new Object());
        }
    }

    Output:

    "main" #1 prio=5 os_prio=0 tid=0x000002402c6ad800 nid=0x4a14 waiting on condition [0x000000d0ed6ff000]
       java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x000000076caeab80> (a java.lang.Object)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at PlainTest.main(PlainTest.java:6)
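
    The blocker that jstack prints can also be read programmatically with LockSupport.getBlocker(Thread). A small sketch, assuming the hypothetical names BlockerDemo and observe:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class; the names are assumptions made for this sketch.
class BlockerDemo {

    /** Returns { blocker seen while parked is ours, blocker is null after unblocking }. */
    static boolean[] observe() {
        Object blocker = new Object();
        AtomicBoolean release = new AtomicBoolean();
        Thread t = new Thread(() -> {
            while (!release.get()) {
                LockSupport.park(blocker);       // records 'blocker' while parked
            }
        }, "parked");
        t.start();

        // Poll until the parked thread has a blocker recorded.
        Object seen;
        while ((seen = LockSupport.getBlocker(t)) == null) {
            LockSupport.parkNanos(5_000_000L);   // pause about 5 ms between polls
        }
        boolean sameObject = (seen == blocker);

        release.set(true);
        LockSupport.unpark(t);
        while (t.isAlive()) {
            LockSupport.parkNanos(5_000_000L);
        }
        // park(blocker) resets the field to null once it returns.
        boolean clearedAfter = (LockSupport.getBlocker(t) == null);
        return new boolean[] { sameObject, clearedAfter };
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println("same object: " + r[0] + ", cleared afterwards: " + r[1]);
    }
}
```

    As the Javadoc says, the value returned by getBlocker is only a momentary snapshot, which is why the sketch polls for it.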

    (3) With a timeout

    import java.util.concurrent.locks.LockSupport;
    
    public class PlainTest {
    
        public static void main(String[] args) {
            LockSupport.parkNanos(Long.MAX_VALUE);
        }
    }

    Output:

    "main" #1 prio=5 os_prio=0 tid=0x000001e39c82c800 nid=0x3e70 waiting on condition [0x000000215d5ff000]
       java.lang.Thread.State: TIMED_WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:338)
            at PlainTest.main(PlainTest.java:6)
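
    Because parkNanos can return before the timeout (unpark, interrupt, or a spurious return), timed waits are usually written as a loop that recomputes the remaining time. The following sketch shows one way to do that; TimedParkDemo and await are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical helper; the names are assumptions made for this sketch.
class TimedParkDemo {

    /**
     * Waits until the flag is true or the timeout has elapsed.
     * Returns true if the flag was seen set, false on timeout.
     */
    static boolean await(AtomicBoolean flag, long timeoutNanos) {
        final long deadline = System.nanoTime() + timeoutNanos;
        while (!flag.get()) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0L) {
                return false;                    // timed out
            }
            // May return early; the loop re-checks the flag and the remaining time.
            LockSupport.parkNanos(flag, remaining);
        }
        return true;
    }

    public static void main(String[] args) {
        AtomicBoolean flag = new AtomicBoolean();
        System.out.println("flag unset: " + await(flag, 50_000_000L));
        flag.set(true);
        System.out.println("flag set:   " + await(flag, 50_000_000L));
    }
}
```

    The sketch does not clear the interrupt flag, so an interrupted caller would keep looping without blocking until the deadline; real code would decide how to handle that case.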

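    3. unpark takes the thread to unblock as its argument

      unpark releases a parked thread. It is also possible to call unpark first and park afterwards; in that case the later park call does not block. Calling unpark several times has the same effect as calling it once: it satisfies only one park.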
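
    Both points can be tried on the current thread. In the sketch below (PermitDemo and run are hypothetical names), two unpark calls are followed by an untimed park and then a timed park. The first park finds the permit and returns; the second has no permit left and normally waits for its timeout.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class; the names are assumptions made for this sketch.
class PermitDemo {

    /** Returns { nanos spent in the first park, nanos spent in the second park }. */
    static long[] run() {
        Thread self = Thread.currentThread();
        LockSupport.unpark(self);            // makes the permit available
        LockSupport.unpark(self);            // no extra effect: permits do not accumulate

        long t0 = System.nanoTime();
        LockSupport.park();                  // consumes the permit, does not block
        long t1 = System.nanoTime();
        LockSupport.parkNanos(200_000_000L); // no permit left: normally waits about 200 ms
        long t2 = System.nanoTime();
        return new long[] { t1 - t0, t2 - t1 };
    }

    public static void main(String[] args) {
        long[] r = run();
        System.out.println("first park:  " + r[0] / 1_000_000 + " ms");
        System.out.println("second park: " + r[1] / 1_000_000 + " ms");
    }
}
```

    The second figure can occasionally be shorter than 200 ms, because a timed park is also allowed to return spuriously.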

      The next section looks at how this works.

    3. How it works

    1. How park works

    park ultimately calls sun.misc.Unsafe#park, which is a native method:

        public native void park(boolean var1, long var2);

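      It takes two parameters: the first (isAbsolute) says whether the time is absolute, and the second is the time itself.

      In LockSupport, only the two parkUntil overloads pass true for the first parameter (an absolute deadline in milliseconds since the Epoch). Every other method passes false (a relative time in nanoseconds, where 0 means no timeout).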
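
    At the Java level the difference shows up in how the argument is computed. A short sketch, with DeadlineDemo and sleepUntil as hypothetical names: parkUntil receives a wall-clock deadline, while parkNanos receives a duration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class; the names are assumptions made for this sketch.
class DeadlineDemo {

    /** Parks until the wall clock reaches deadlineMillis; returns the time then observed. */
    static long sleepUntil(long deadlineMillis) {
        long now;
        while ((now = System.currentTimeMillis()) < deadlineMillis) {
            LockSupport.parkUntil(deadlineMillis);   // absolute: milliseconds since the Epoch
        }
        return now;
    }

    public static void main(String[] args) {
        // Absolute form: compute a deadline from the current wall-clock time.
        long deadline = System.currentTimeMillis() + 100;
        long wokeAt = sleepUntil(deadline);
        System.out.println("woke at or after the deadline: " + (wokeAt >= deadline));

        // Relative form: pass a duration in nanoseconds instead.
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
    }
}
```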

    Next, follow the call into the C++ code.

    1. The method in \openjdk\hotspot\src\share\vm\prims\unsafe.cpp:

    UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
      UnsafeWrapper("Unsafe_Park");
      EventThreadPark event;
    #ifndef USDT2
      HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
    #else /* USDT2 */
       HOTSPOT_THREAD_PARK_BEGIN(
                                 (uintptr_t) thread->parker(), (int) isAbsolute, time);
    #endif /* USDT2 */
      JavaThreadParkedState jtps(thread, time != 0);
      thread->parker()->park(isAbsolute != 0, time);
    #ifndef USDT2
      HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
    #else /* USDT2 */
      HOTSPOT_THREAD_PARK_END(
                              (uintptr_t) thread->parker());
    #endif /* USDT2 */
      if (event.should_commit()) {
        oop obj = thread->current_park_blocker();
        event.set_klass((obj != NULL) ? obj->klass() : NULL);
        event.set_timeout(time);
        event.set_address((obj != NULL) ? (TYPE_ADDRESS) cast_from_oop<uintptr_t>(obj) : 0);
        event.commit();
      }
    UNSAFE_END

     The key line is thread->parker()->park(isAbsolute != 0, time); which obtains the thread's Parker through parker() and then calls its park method. (Every thread object has its own Parker object.)

    The Parker class: openjdk\hotspot\src\share\vm\runtime\park.hpp

    class Parker : public os::PlatformParker {
    private:
      volatile int _counter ;
      Parker * FreeNext ;
      JavaThread * AssociatedWith ; // Current association
    
    public:
      Parker() : PlatformParker() {
        _counter       = 0 ;
        FreeNext       = NULL ;
        AssociatedWith = NULL ;
      }
    protected:
      ~Parker() { ShouldNotReachHere(); }
    public:
      // For simplicity of interface with Java, all forms of park (indefinite,
      // relative, and absolute) are multiplexed into one call.
      void park(bool isAbsolute, jlong time);
      void unpark();
    
      // Lifecycle operators
      static Parker * Allocate (JavaThread * t) ;
      static void Release (Parker * e) ;
    private:
      static Parker * volatile FreeList ;
      static volatile int ListLock ;
    
    };

      The _counter field is the one that matters most.

      Its parent class holds the mutex and the condition variables: \openjdk\hotspot\src\os\linux\vm\os_linux.hpp

    class PlatformParker : public CHeapObj<mtInternal> {
      protected:
        enum {
            REL_INDEX = 0,
            ABS_INDEX = 1
        };
        int _cur_index;  // which cond is in use: -1, 0, 1
        pthread_mutex_t _mutex [1] ;
        pthread_cond_t  _cond  [2] ; // one for relative times and one for abs.
    
      public:       // TODO-FIXME: make dtor private
        ~PlatformParker() { guarantee (0, "invariant") ; }
    
      public:
        PlatformParker() {
          int status;
          status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());
          assert_status(status == 0, status, "cond_init rel");
          status = pthread_cond_init (&_cond[ABS_INDEX], NULL);
          assert_status(status == 0, status, "cond_init abs");
          status = pthread_mutex_init (_mutex, NULL);
          assert_status(status == 0, status, "mutex_init");
          _cur_index = -1; // mark as unused
        }
    };
    
    #endif // OS_LINUX_VM_OS_LINUX_HPP

    2. The park method is delegated to an operating-system-specific implementation:

    For example: openjdk\hotspot\src\os\solaris\vm\os_solaris.cpp

    void Parker::park(bool isAbsolute, jlong time) {
      // Ideally we'd do something useful while spinning, such
      // as calling unpackTime().
    
      // Optional fast-path check:
      // Return immediately if a permit is available.
      // We depend on Atomic::xchg() having full barrier semantics
      // since we are doing a lock-free update to _counter.
      // If _counter is greater than 0, a permit is available: consume it and return immediately
      if (Atomic::xchg(0, &_counter) > 0) return;
    
      // Optional fast-exit: Check interrupt before trying to wait
      Thread* thread = Thread::current();
      assert(thread->is_Java_thread(), "Must be JavaThread");
      JavaThread *jt = (JavaThread *)thread;
      if (Thread::is_interrupted(thread, false)) {
        return;
      }
    
      // First, demultiplex/decode time arguments
      timespec absTime;
      // If time is negative, or the time is absolute and equal to 0, return without waiting
      if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
        return;
      }
      if (time > 0) {
        // Warning: this code might be exposed to the old Solaris time
        // round-down bugs.  Grep "roundingFix" for details.
        // Convert the time argument and store the result in absTime
        unpackTime(&absTime, isAbsolute, time);
      }
    
      // Enter safepoint region
      // Beware of deadlocks such as 6317397.
      // The per-thread Parker:: _mutex is a classic leaf-lock.
      // In particular a thread must never block on the Threads_lock while
      // holding the Parker:: mutex.  If safepoints are pending both the
      // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
      ThreadBlockInVM tbivm(jt);
    
      // Don't wait if cannot get lock since interference arises from
      // unblocking.  Also. check interrupt before trying wait
      // If the thread has been interrupted, or the trylock on the mutex fails (for example because another thread holds it), return immediately
      if (Thread::is_interrupted(thread, false) ||
          os::Solaris::mutex_trylock(_mutex) != 0) {
        return;
      }
    
      int status ;
    
      // Re-check under the mutex: if _counter is greater than 0, a permit is available, so reset it to 0 and return without waiting
      if (_counter > 0)  { // no wait needed
        _counter = 0;
        // Unlock the mutex
        status = os::Solaris::mutex_unlock(_mutex);
        assert (status == 0, "invariant") ;
        // Paranoia to ensure our locked and lock-free paths interact
        // correctly with each other and Java-level accesses.
        OrderAccess::fence();
        return;
      }
    
    #ifdef ASSERT
      // Don't catch signals while blocked; let the running threads have the signals.
      // (This allows a debugger to break into the running thread.)
      sigset_t oldsigs;
      sigset_t* allowdebug_blocked = os::Solaris::allowdebug_blocked_signals();
      thr_sigsetmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
    #endif
    
      OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
      jt->set_suspend_equivalent();
      // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
    
      // Do this the hard way by blocking ...
      // See http://monaco.sfbay/detail.jsf?cr=5094058.
      // TODO-FIXME: for Solaris SPARC set fprs.FEF=0 prior to parking.
      // Only for SPARC >= V8PlusA
    #if defined(__sparc) && defined(COMPILER2)
      if (ClearFPUAtPark) { _mark_fpu_nosave() ; }
    #endif
    
      if (time == 0) {
        status = os::Solaris::cond_wait (_cond, _mutex) ;
      } else {
        status = os::Solaris::cond_timedwait (_cond, _mutex, &absTime);
      }
      // Note that an untimed cond_wait() can sometimes return ETIME on older
      // versions of the Solaris.
      assert_status(status == 0 || status == EINTR ||
                    status == ETIME || status == ETIMEDOUT,
                    status, "cond_timedwait");
    
    #ifdef ASSERT
      thr_sigsetmask(SIG_SETMASK, &oldsigs, NULL);
    #endif
      _counter = 0 ;
      status = os::Solaris::mutex_unlock(_mutex);
      assert_status(status == 0, status, "mutex_unlock") ;
      // Paranoia to ensure our locked and lock-free paths interact
      // correctly with each other and Java-level accesses.
      OrderAccess::fence();
    
      // If externally suspended while waiting, re-suspend
      if (jt->handle_special_suspend_equivalent_condition()) {
        jt->java_suspend_self();
      }
    }

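    (1) ThreadBlockInVM tbivm(jt); switches the thread state to blocked. The statement constructs a ThreadBlockInVM object named tbivm with jt as the constructor argument. The constructor moves the thread from _thread_in_vm to _thread_blocked, and the destructor moves it back when park returns.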

    \openjdk\hotspot\src\share\vm\runtime\interfaceSupport.hpp

    class ThreadBlockInVM : public ThreadStateTransition {
     public:
      ThreadBlockInVM(JavaThread *thread)
      : ThreadStateTransition(thread) {
        // Once we are blocked vm expects stack to be walkable
        thread->frame_anchor()->make_walkable(thread);
        trans_and_fence(_thread_in_vm, _thread_blocked);
      }
      ~ThreadBlockInVM() {
        trans_and_fence(_thread_blocked, _thread_in_vm);
        // We don't need to clear_walkable because it will happen automagically when we return to java
      }
    };

     The call then reaches transition_and_fence in \openjdk\hotspot\src\share\vm\runtime\interfaceSupport.hpp:

      // transition_and_fence must be used on any thread state transition
      // where there might not be a Java call stub on the stack, in
      // particular on Windows where the Structured Exception Handler is
      // set up in the call stub. os::write_memory_serialize_page() can
      // fault and we can't recover from it on Windows without a SEH in
      // place.
      static inline void transition_and_fence(JavaThread *thread, JavaThreadState from, JavaThreadState to) {
        assert(thread->thread_state() == from, "coming from wrong thread state");
        assert((from & 1) == 0 && (to & 1) == 0, "odd numbers are transitions states");
        // Change to transition state (assumes total store ordering!  -Urs)
        thread->set_thread_state((JavaThreadState)(from + 1));
    
        // Make sure new state is seen by VM thread
        if (os::is_MP()) {
          if (UseMembar) {
            // Force a fence between the write above and read below
            OrderAccess::fence();
          } else {
            // Must use this rather than serialization page in particular on Windows
            InterfaceSupport::serialize_memory(thread);
          }
        }
    
        if (SafepointSynchronize::do_call_back()) {
          SafepointSynchronize::block(thread);
        }
        thread->set_thread_state(to);
    
        CHECK_UNHANDLED_OOPS_ONLY(thread->clear_unhandled_oops();)
      }

    The thread states are defined in openjdk\hotspot\src\share\vm\utilities\globalDefinitions.hpp:

    enum JavaThreadState {
      _thread_uninitialized     =  0, // should never happen (missing initialization)
      _thread_new               =  2, // just starting up, i.e., in process of being initialized
      _thread_new_trans         =  3, // corresponding transition state (not used, included for completness)
      _thread_in_native         =  4, // running in native code
      _thread_in_native_trans   =  5, // corresponding transition state
      _thread_in_vm             =  6, // running in VM
      _thread_in_vm_trans       =  7, // corresponding transition state
      _thread_in_Java           =  8, // running in Java or in stub code
      _thread_in_Java_trans     =  9, // corresponding transition state (not used, included for completness)
      _thread_blocked           = 10, // blocked in vm
      _thread_blocked_trans     = 11, // corresponding transition state
      _thread_max_state         = 12  // maximum thread state+1 - used for statistics allocation
    };

    2. How unpark works

    The unpark method in \openjdk\hotspot\src\os\solaris\vm\os_solaris.cpp is as follows:

    void Parker::unpark() {
      int s, status ;
      // Acquire the mutex
      status = os::Solaris::mutex_lock (_mutex) ;
      assert (status == 0, "invariant") ;
      // s records the previous value of _counter
      s = _counter;
      // Set _counter to 1 (make the permit available)
      _counter = 1;
      // Release the mutex
      status = os::Solaris::mutex_unlock (_mutex) ;
      assert (status == 0, "invariant") ;
    
      // If the previous _counter was 0, a thread may be blocked in park waiting for the signal, so signal the condition variable; the parked thread then continues past its wait
      if (s < 1) {
        status = os::Solaris::cond_signal (_cond) ;
        assert (status == 0, "invariant") ;
      }
    }

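      unpark itself just sets _counter to 1 and signals the condition variable so that a thread blocked in park can stop waiting. If unpark is called several times in a row, _counter is already 1 on the later calls, so s < 1 is false and cond_signal is skipped.

    cond_signal is defined in \openjdk\hotspot\src\os\solaris\vm\os_solaris.hpp as follows: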

      static int cond_signal(cond_t *cv)            { return _cond_signal(cv); }

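    Summary: every thread has a Parker object whose _counter field acts as a permit. park waits for the permit (waits for _counter to become 1) and consumes it by resetting _counter to 0; unpark sets _counter to 1.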
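
    The summary can be modelled in a few lines of Java. This is only a toy model written for this post's explanation, not how HotSpot is implemented: ToyParker and permits() are hypothetical names, a ReentrantLock and Condition stand in for the pthread mutex and condition variable, and the int field plays the role of _counter.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Toy model of the Parker idea; not the HotSpot implementation.
class ToyParker {
    private final ReentrantLock mutex = new ReentrantLock();
    private final Condition cond = mutex.newCondition();
    private int counter;                 // 0 = no permit, 1 = permit available

    /** Waits for the permit and consumes it. */
    void park() {
        mutex.lock();
        try {
            while (counter == 0) {       // no permit yet: wait for unpark
                cond.awaitUninterruptibly();
            }
            counter = 0;                 // consume the permit
        } finally {
            mutex.unlock();
        }
    }

    /** Makes the permit available; repeated calls do not add up. */
    void unpark() {
        mutex.lock();
        try {
            int previous = counter;
            counter = 1;
            if (previous < 1) {          // someone may be waiting: wake one waiter
                cond.signal();
            }
        } finally {
            mutex.unlock();
        }
    }

    /** Current value of the counter, for inspection only. */
    int permits() {
        mutex.lock();
        try {
            return counter;
        } finally {
            mutex.unlock();
        }
    }

    public static void main(String[] args) {
        ToyParker parker = new ToyParker();
        Thread worker = new Thread(() -> {
            parker.park();
            System.out.println("worker resumed");
        });
        worker.start();
        parker.unpark();
    }
}
```

    The model differs from the C++ code shown above in several ways: it loops until the permit is really there (the real park may return spuriously or on interrupt), it signals while still holding the lock (Condition.signal requires that), and it is a standalone object rather than one Parker per thread.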

  • Original article: https://www.cnblogs.com/qlqwjy/p/15553946.html