zoukankan      html  css  js  c++  java
  • Java多线程之JUC包:Condition源码学习笔记

    若有不正之处请多多谅解,并欢迎批评指正。

    请尊重作者劳动成果,转载请标明原文链接:

    http://www.cnblogs.com/go2sea/p/5630355.html

    Condition在JUC框架下提供了传统Java监视器风格的wait、notify和notifyAll相似的功能。

    Condition必须被绑定到一个独占锁上使用。ReentrantLock中获取Condition的方法为:

        public Condition newCondition() {
            return sync.newCondition();
        }
            
            final ConditionObject newCondition() {
                return new ConditionObject();
            }

    直接初始化并返回了一个AQS提供的ConditionObject对象。因此,Condition实际上是AQS框架的内容。ConditionObject通过维护两个成员变量:

            /** First node of condition queue. */
            private transient Node firstWaiter;
            /** Last node of condition queue. */
            private transient Node lastWaiter;

    来维护一个Condition等待队列,并通过signal操作将Condition队列中的线程移到Sync锁等待队列。

    源代码:

    public class ConditionObject implements Condition, java.io.Serializable {
            private static final long serialVersionUID = 1173984872572414699L;
            /** First node of condition queue. */
            private transient Node firstWaiter;
            /** Last node of condition queue. */
            private transient Node lastWaiter;
    
            /**
             * Creates a new {@code ConditionObject} instance.
             */
            public ConditionObject() { }
    
            // Internal methods
    
            /**
             * Adds a new waiter to wait queue.
             * @return its new wait node
             */
            private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    
            /**
             * Removes and transfers nodes until hit non-cancelled one or
             * null. Split out from signal in part to encourage compilers
             * to inline the case of no waiters.
             * @param first (non-null) the first node on condition queue
             */
            private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
    
            /**
             * Removes and transfers all nodes.
             * @param first (non-null) the first node on condition queue
             */
            private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }
    
            /**
             * Unlinks cancelled waiter nodes from condition queue.
             * Called only while holding lock. This is called when
             * cancellation occurred during condition wait, and upon
             * insertion of a new waiter when lastWaiter is seen to have
             * been cancelled. This method is needed to avoid garbage
             * retention in the absence of signals. So even though it may
             * require a full traversal, it comes into play only when
             * timeouts or cancellations occur in the absence of
             * signals. It traverses all nodes rather than stopping at a
             * particular target to unlink all pointers to garbage nodes
             * without requiring many re-traversals during cancellation
             * storms.
             */
            private void unlinkCancelledWaiters() {
                Node t = firstWaiter;
                Node trail = null;
                while (t != null) {
                    Node next = t.nextWaiter;
                    if (t.waitStatus != Node.CONDITION) {
                        t.nextWaiter = null;
                        if (trail == null)
                            firstWaiter = next;
                        else
                            trail.nextWaiter = next;
                        if (next == null)
                            lastWaiter = trail;
                    }
                    else
                        trail = t;
                    t = next;
                }
            }
    
            // public methods
    
            /**
             * Moves the longest-waiting thread, if one exists, from the
             * wait queue for this condition to the wait queue for the
             * owning lock.
             *
             * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
             *         returns {@code false}
             */
            public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    
            /**
             * Moves all threads from the wait queue for this condition to
             * the wait queue for the owning lock.
             *
             * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
             *         returns {@code false}
             */
            public final void signalAll() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignalAll(first);
            }
    
            /**
             * Implements uninterruptible condition wait.
             * <ol>
             * <li> Save lock state returned by {@link #getState}.
             * <li> Invoke {@link #release} with saved state as argument,
             *      throwing IllegalMonitorStateException if it fails.
             * <li> Block until signalled.
             * <li> Reacquire by invoking specialized version of
             *      {@link #acquire} with saved state as argument.
             * </ol>
             */
            public final void awaitUninterruptibly() {
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                boolean interrupted = false;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if (Thread.interrupted())
                        interrupted = true;
                }
                if (acquireQueued(node, savedState) || interrupted)
                    selfInterrupt();
            }
    
            /*
             * For interruptible waits, we need to track whether to throw
             * InterruptedException, if interrupted while blocked on
             * condition, versus reinterrupt current thread, if
             * interrupted while blocked waiting to re-acquire.
             */
    
            /** Mode meaning to reinterrupt on exit from wait */
            private static final int REINTERRUPT =  1;
            /** Mode meaning to throw InterruptedException on exit from wait */
            private static final int THROW_IE    = -1;
    
            /**
             * Checks for interrupt, returning THROW_IE if interrupted
             * before signalled, REINTERRUPT if after signalled, or
             * 0 if not interrupted.
             */
            private int checkInterruptWhileWaiting(Node node) {
                return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
            }
    
            /**
             * Throws InterruptedException, reinterrupts current thread, or
             * does nothing, depending on mode.
             */
            private void reportInterruptAfterWait(int interruptMode)
                throws InterruptedException {
                if (interruptMode == THROW_IE)
                    throw new InterruptedException();
                else if (interruptMode == REINTERRUPT)
                    selfInterrupt();
            }
    
            /**
             * Implements interruptible condition wait.
             * <ol>
             * <li> If current thread is interrupted, throw InterruptedException.
             * <li> Save lock state returned by {@link #getState}.
             * <li> Invoke {@link #release} with saved state as argument,
             *      throwing IllegalMonitorStateException if it fails.
             * <li> Block until signalled or interrupted.
             * <li> Reacquire by invoking specialized version of
             *      {@link #acquire} with saved state as argument.
             * <li> If interrupted while blocked in step 4, throw InterruptedException.
             * </ol>
             */
            public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    
            /**
             * Implements timed condition wait.
             * <ol>
             * <li> If current thread is interrupted, throw InterruptedException.
             * <li> Save lock state returned by {@link #getState}.
             * <li> Invoke {@link #release} with saved state as argument,
             *      throwing IllegalMonitorStateException if it fails.
             * <li> Block until signalled, interrupted, or timed out.
             * <li> Reacquire by invoking specialized version of
             *      {@link #acquire} with saved state as argument.
             * <li> If interrupted while blocked in step 4, throw InterruptedException.
             * </ol>
             */
            public final long awaitNanos(long nanosTimeout)
                    throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                final long deadline = System.nanoTime() + nanosTimeout;
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    if (nanosTimeout <= 0L) {
                        transferAfterCancelledWait(node);
                        break;
                    }
                    if (nanosTimeout >= spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                    nanosTimeout = deadline - System.nanoTime();
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
                return deadline - System.nanoTime();
            }
    
            /**
             * Implements absolute timed condition wait.
             * <ol>
             * <li> If current thread is interrupted, throw InterruptedException.
             * <li> Save lock state returned by {@link #getState}.
             * <li> Invoke {@link #release} with saved state as argument,
             *      throwing IllegalMonitorStateException if it fails.
             * <li> Block until signalled, interrupted, or timed out.
             * <li> Reacquire by invoking specialized version of
             *      {@link #acquire} with saved state as argument.
             * <li> If interrupted while blocked in step 4, throw InterruptedException.
             * <li> If timed out while blocked in step 4, return false, else true.
             * </ol>
             */
            public final boolean awaitUntil(Date deadline)
                    throws InterruptedException {
                long abstime = deadline.getTime();
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                boolean timedout = false;
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    if (System.currentTimeMillis() > abstime) {
                        timedout = transferAfterCancelledWait(node);
                        break;
                    }
                    LockSupport.parkUntil(this, abstime);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
                return !timedout;
            }
    
            /**
             * Implements timed condition wait.
             * <ol>
             * <li> If current thread is interrupted, throw InterruptedException.
             * <li> Save lock state returned by {@link #getState}.
             * <li> Invoke {@link #release} with saved state as argument,
             *      throwing IllegalMonitorStateException if it fails.
             * <li> Block until signalled, interrupted, or timed out.
             * <li> Reacquire by invoking specialized version of
             *      {@link #acquire} with saved state as argument.
             * <li> If interrupted while blocked in step 4, throw InterruptedException.
             * <li> If timed out while blocked in step 4, return false, else true.
             * </ol>
             */
            public final boolean await(long time, TimeUnit unit)
                    throws InterruptedException {
                long nanosTimeout = unit.toNanos(time);
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                final long deadline = System.nanoTime() + nanosTimeout;
                boolean timedout = false;
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    if (nanosTimeout <= 0L) {
                        timedout = transferAfterCancelledWait(node);
                        break;
                    }
                    if (nanosTimeout >= spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                    nanosTimeout = deadline - System.nanoTime();
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
                return !timedout;
            }
    
            //  support for instrumentation
    
            /**
             * Returns true if this condition was created by the given
             * synchronization object.
             *
             * @return {@code true} if owned
             */
            final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
                return sync == AbstractQueuedSynchronizer.this;
            }
    
            /**
             * Queries whether any threads are waiting on this condition.
             * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
             *
             * @return {@code true} if there are any waiting threads
             * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
             *         returns {@code false}
             */
            protected final boolean hasWaiters() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                    if (w.waitStatus == Node.CONDITION)
                        return true;
                }
                return false;
            }
    
            /**
             * Returns an estimate of the number of threads waiting on
             * this condition.
             * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
             *
             * @return the estimated number of waiting threads
             * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
             *         returns {@code false}
             */
            protected final int getWaitQueueLength() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                int n = 0;
                for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                    if (w.waitStatus == Node.CONDITION)
                        ++n;
                }
                return n;
            }
    
            /**
             * Returns a collection containing those threads that may be
             * waiting on this Condition.
             * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
             *
             * @return the collection of threads
             * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
             *         returns {@code false}
             */
            protected final Collection<Thread> getWaitingThreads() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                ArrayList<Thread> list = new ArrayList<Thread>();
                for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                    if (w.waitStatus == Node.CONDITION) {
                        Thread t = w.thread;
                        if (t != null)
                            list.add(t);
                    }
                }
                return list;
            }
        }
    View Code

    下面我们就来分析下Condition的工作流程。

    一、await 在条件变量上等待

    分别是Condition队列的头结点和尾节点。Condition在调用await方法之前,必须先获取锁,注意,这个锁必须是一个独占锁。我们先来看一下await中用到的几个方法:

    addConditionWaiter:

            private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }

    顾名思义,此方法在Condition队列中添加一个等待线程。首先,方法先检查一下队列尾节点是否还在等待Condition(如果被signal或者中断,waitStatus会被修改为0或者CANCELLED)。如果尾节点被取消或者中断,调用unlinkCancelledWaiters方法删除Condition队列中被cancel的节点。然后将当前线程封装在一个Node中,添加到Condition队列的尾部。这里由于我们在操纵Condition队列的时候已经获取了一个独占锁,因此不会发生竞争。

    值得注意的是,Condition队列与Sync队列(锁等待队列)有几点不同:①Condition队列是一个单向链表,而Sync队列是一个双向链表;②Sync队列在初始化的时候,会在队列头部添加一个空的dummy节点,它不持有任何线程,而Condition队列初始化时,头结点就开始持有等待线程了。

    我们有必要在这里提一下Node对象中的nextWaiter成员、SHARED成员和EXCLUSIVE成员:

            /** Marker to indicate a node is waiting in shared mode */
            static final Node SHARED = new Node();
            /** Marker to indicate a node is waiting in exclusive mode */
            static final Node EXCLUSIVE = null;
    
            Node nextWaiter;

    nextWaiter在共享模式下,被设置为SHARED,SHARED为一个final的空节点,用来表示当前模式是共享模式;默认情况下nextWaiter是null,EXCLUSIVE成员是一个final的null,因此默认模式是独占模式。在Condition队列中nextWaiter被用来指向队列里的下一个等待线程。在一个线程从Condition队列中被移除之后,nextWaiter被设置为空(EXCLUSIVE)。这再次表明:Condition必须被绑定在一个独占锁上使用。

    我们来看一下unlinkCancelledWaiters方法:

            private void unlinkCancelledWaiters() {
                Node t = firstWaiter;
                Node trail = null;
                while (t != null) {
                    Node next = t.nextWaiter;
                    if (t.waitStatus != Node.CONDITION) {
                        t.nextWaiter = null;
                        if (trail == null)
                            firstWaiter = next;
                        else
                            trail.nextWaiter = next;
                        if (next == null)
                            lastWaiter = trail;
                    }
                    else
                        trail = t;
                    t = next;
                }
            }

    unlinkCancelledWaiters方法很简单,从头到尾遍历Condition队列,移除被cancel或被中断的节点。由于这里我们在操纵Condition队列的时候已经获取了所绑定的独占锁,因此不用担心竞争的发生。

    我们再来看一下fullyRelease方法,这个方法用来释放锁:

        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }

    方法首先获取了state的值,这个值表示可锁被“重入”深度,并调用release释放全部的重入获取,如果成功,返货这个深度,如果失败,要将当前线程的waitStatus设置为CANCELLED。

    我们再来看一下isOnSyncQueue方法,这个方法返节点是否在Sync队列中等待锁:

        final boolean isOnSyncQueue(Node node) {
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            if (node.next != null) // If has successor, it must be on queue
                return true;
            /*
             * node.prev can be non-null, but not yet on queue because
             * the CAS to place it on queue can fail. So we have to
             * traverse from tail to make sure it actually made it.  It
             * will always be near the tail in calls to this method, and
             * unless the CAS failed (which is unlikely), it will be
             * there, so we hardly ever traverse much.
             */
            return findNodeFromTail(node);
        }

    node从Condition队列移除的第一步,就是设置waitStatus为其他值,因此是否等于Node.CONDITON可以作为判断标志,如果等于,说明还在Condition队列中,即不再Sync队列里。在node被放入Sync队列时,第一步就是设置node的prev为当前获取到的尾节点,所以如果发现node的prev为null的话,可以确定node尚未被加入Sync队列。

    相似的,node被放入Sync队列的最后一步是设置node的next,如果发现node的next不为null,说明已经完成了放入Sync队列的过程,因此可以返回true。

    当我们执行完两个if而仍未返回时,node的prev一定不为null,next一定为null,这个时候可以认为node正处于放入Sync队列的执行CAS操作执行过程中。而这个CAS操作有可能失败,因此我们再给node一次机会,调用findNodeFromTail来检测:

        private boolean findNodeFromTail(Node node) {
            Node t = tail;
            for (;;) {
                if (t == node)
                    return true;
                if (t == null)
                    return false;
                t = t.prev;
            }
        }

    findNodeFromTail方法从尾部遍历Sync队列,如果检查node是否在队列中,如果还不在,此时node也许在CAS自旋中,在不久的将来可能会进到Sync队列里。但我们已经等不了了,直接放回false。

    我们再来看一下checkInterruptWhileWaiting方法:

            /** Mode meaning to reinterrupt on exit from wait */
            private static final int REINTERRUPT =  1;
            /** Mode meaning to throw InterruptedException on exit from wait */
            private static final int THROW_IE    = -1;
    
            /**
             * Checks for interrupt, returning THROW_IE if interrupted
             * before signalled, REINTERRUPT if after signalled, or
             * 0 if not interrupted.
             */
            private int checkInterruptWhileWaiting(Node node) {
                return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
            }

    此方法在线程从park中醒来后调用,它的返回值有三种:0代表在park过程中没有发生中断;THORW_IE(1)代表发生了中断,且在后续我们需要抛出中断异常;REINTERRUPT表示发生了中断,但在后续我们不抛出中断异常,而是“补上”这次中断。当没有发生中断时,我们返回0即可,当中断发生时,返回THROW_IE or REINTERRUPT由transferAfterCancelledWait方法判断:

        final boolean transferAfterCancelledWait(Node node) {
            if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                enq(node);
                return true;
            }
            /*
             * If we lost out to a signal(), then we can't proceed
             * until it finishes its enq().  Cancelling during an
             * incomplete transfer is both rare and transient, so just
             * spin.
             */
            while (!isOnSyncQueue(node))
                Thread.yield();
            return false;
        }

    transferAfterCancelledWait方法并不在ConditionObject中定义,而是由AQS提供。这个方法根据是否中断发生时,是否有signal操作来“掺和”来返回结果。方法调用CAS操作将node的waitStatus从CONDITION设置为0,如果成功,说明当中断发生时,说明没有signal发生(signal的第一步是将node的waitStatus设置为0),在调用enq将线程放入Sync队列后直接返回true,表示中断先于signal发生,即中断在await等待过程中发生,根据await的语义,在遇到中断时需要抛出中断异常,返回true告诉上层方法返回THROW_IT,后续会根据这个返回值做抛出中断异常的处理。

    如果CAS操作失败,是否说明中断后于signal发生呢?只能说这时候我们不能确定中断和signal到底谁先发生,只是在我们做CAS操作的时候,他们俩已经都发生了(中断->interrupted检测->signal->CAS,或者signal->中断->interrupted检测->CAS都有可能),这时候我们无法判断到底顺序是怎样,这里的处理是不管怎样都返回false告诉上层方法返回REINTERRUPT,当做是signal先发生(线程被signal唤醒)来处理,后续根据这个返回值做“补上”中断的处理。在返回false之前,我们要先做一下等待,直到当前线程被成功放入Sync锁等待队列。

    因此,我们可以这样总结:transferAfterCancelledWait的返回值表示了线程是否因为中断从park中唤醒。

    至此,我们终于可以正式来看await方法了:

            public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }

    await方法是及时响应中断的。它首先检查了一下中断标志。然后调用addConditionWaiter将当前线程放入Condition队列的尾,并顺手清理了一下队列里的无用节点。紧接着调用fullyRelease方法释放当前线程持有的锁。然后是一个while循环,这个循环会循环检测线程的状态,直到线程被signal或者中断唤醒被放入Sync锁等待队列。如果中断发生的话,还需要调用checkInterruptWhileWaiting方法,根据中断发生的时机确定后去处理这次中断的方式,如果发生中断,退出while循环。

    退出while循环后,我们调用acquireQueued方法来获取锁,注意,acquireQueued方法的返回值表示在等待获取锁的过程中是否发生中断,如果发生中断 原来没有需要做抛出处理的中断发生时,我们将后续处理方式设置为REINTERRUPT(如果原来在await状态有中断发生,即interrruptMode==THROW_IE,依然保持THROW_IE)。

    如果是应为中断从park中唤醒(interruptMode==THROT_IE),当前线程仍在Condition队列中,但waitStatus已经变成0了,这里在调用unlinkCancelledWaiters做一次清理。

    最后,根据interruptMode的值,调用reportInterruptAfterWait做出相应处理:

            private void reportInterruptAfterWait(int interruptMode)
                throws InterruptedException {
                if (interruptMode == THROW_IE)
                    throw new InterruptedException();
                else if (interruptMode == REINTERRUPT)
                    selfInterrupt();
            }

    如果interruptMod==0,donothing,如果是THROW_IE,说明在await状态下发生中断,抛出中断异常,如果是REINTERRUPT,说明是signal“掺和”了中断,我们无法分辨具体的先后顺序,于是统一按照先signal再中断来处理,即成功获取锁之后要调用selfInterrupt“补上”这次中断。

    二、awaitNanos 限时的在条件变量上等待

            public final long awaitNanos(long nanosTimeout)
                    throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                final long deadline = System.nanoTime() + nanosTimeout;
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    if (nanosTimeout <= 0L) {
                        transferAfterCancelledWait(node);
                        break;
                    }
                    if (nanosTimeout >= spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                    nanosTimeout = deadline - System.nanoTime();
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
                return deadline - System.nanoTime();
            }

    awaitNanos方法与await方法大致相同,区别在于每次park是定时的,当被唤醒时,比较一下剩余等待时间Timeout与spinForTimeoutThreshold阈值的大小,如果小于,将不再park,spinForTimeoutThreshold阈值的作用在笔者的另一篇博文Semaphore源码学习笔记中已经分析过,作用是提高短时长的等待的相应效率。

    注意:当已经到达等待的deadline时,调用transferAfterCancelledWait方法,注意,此时可能发生中断(上次调用checkInterruptWhileWaiting之后被中断),再次的,我们无法判断这次中断与到时这两个的先后顺序,我们在这里的处理方式是直接忽略这次中断,统一认为是先到时后中断(体现在没有记录transferAfterCancelledWait方法的返回值),但在transferAfterCancelledWait方法中的处理是考虑了被中断的情况的,只不过这个中断标志位没有检测,留给后续来处理了。这个中断标志将会在调用acquireQueued方法并成功获取锁之后被检测并返回,最终影响interruptMode的值,并在reportInterruptAfterWait方法中被处理。可见,这次中断最终没有被遗漏,只是我们先处理的signal,回过头来再去处理它。

    最后方法的返回值是拍唤醒后的剩余等待时间,这个时间可能小于0。

    await(long time, TimeUnit unit)方法与awaitNanos方法十分类似,不再赘述。

    三、awaitUtil 指定结束时刻的在条件变量上等待

            public final boolean awaitUntil(Date deadline)
                    throws InterruptedException {
                long abstime = deadline.getTime();
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                boolean timedout = false;
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    if (System.currentTimeMillis() > abstime) {
                        timedout = transferAfterCancelledWait(node);
                        break;
                    }
                    LockSupport.parkUntil(this, abstime);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
                return !timedout;
            }

    awaitUtil方法在原理上与awaitNanos方法是也十分相似,只不过park操作调用的是LockSupportparkUtil方法,且没有spinForTimeoutThreshold阈值的应用。在返回值上也有些许差别:返回值timedout记录了transferAfterCancelledWait方法的返回值——线程是否因为中断从park中唤醒,如果是的话,表示还没有到等待的deadline。

    四、signal 唤醒Condition队列的头节点持有的线程

            public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }

    调用signal之前也需要获取锁,因此signal方法首先检测了一下当前线程是否获取了独占锁。然后调用doSignal唤醒队列中第一个等待线程。注意,这里的“唤醒”意思是将线程从Condition队列移到Sync队列,表示已经完成Condition的等待,具有了去竞争锁的资格。至此,我们可以发现,由于await会直接把线程放入Condition等待队列的尾部,因此Condition是公平的,即按照入列的顺序来signal。

            private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }

    doSignal方法先将first节点从队列中摘下,然后调用transferForSignal去改变first节点的waitStatus(所谓唤醒线程),这个方法有可能失败,因为等待线程可能已经到时或者被中断,因此while循环这个操作直到成功唤醒或队列为空。我们来看下transferForSignal方法:

        final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }

    这个方法并不在ConditionObject中定义,而是由AQS提供。方法首先调用CAS操作修改node的waitStatus,如果失败,表示线程已经放弃等待(到时或被中断),直接返回false。如果成功,调用enq方法将它放入Sync锁等待队列,返回值p是node在Sync队列中的前驱节点。紧接着检测一下前驱p的waitStatus,如果发现不为SIGNAL,需要将node持有的线程(注意不是当前线程)unpark,这里必须搞清楚,node线程是在哪里park的,显然,他还在await方法的那个while循环里。unpark之后,node线程将会从while循环中退出,然后去调用acquireQueued方法,这个方法是一个自旋,弄得线程会在自旋过程中清除已经为CANCELLED状态的前驱,然后注册前驱节点的waitStatus为SIGNAL。

    至此,signal方法已经完成了所有该做的,“唤醒”的线程已经成功加入Sync队列并已经参与锁的竞争了,返回true。

    五、signalAll 唤醒Condition队列的所有等待线程

            public final void signalAll() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignalAll(first);
            }

    signalAll方法同样先检测是否持有独占锁,然后对奥用doSignalAll方法:

            private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }

    doSignalAll方法循环调用transferForSignal方法“唤醒”队列的头结点,直到队列为空。

    总结:ConditionObject由AQS提供,它实现了类似wiat、notify和notifyAll类似的功能。Condition必须与一个独占锁绑定使用,在await或signal之前必须现持有独占锁。Condition队列是一个单向链表,他是公平的,按照先进先出的顺序从队列中被“唤醒”,所谓唤醒指的是完成Condition对象上的等待,被移到Sync锁等待队列中,有参与竞争锁的资格(Sync队列有公平&非公平两种模式,注意区别)。


    作者:开方乘十

    出处:http://www.cnblogs.com/go2sea/

    本文版权归作者开方乘十和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接,否则保留追究法律责任的权利。

    如有不正之处,欢迎邮件(hailong.ma@qq.com)指正,谢谢。

  • 相关阅读:
    nodejs入门学习笔记一——一个完整的http路由服务实现
    安装centos虚拟机
    test
    sql单列合并
    linux系统的安全小知识
    建造者模式组装mybatis参数Example()
    Https网站搭建——通过https://localhost:8443访问tomcat首页
    妈妈再也不用担心我找不到spring源码了!
    Webpack --- 模块打包器
    CSS3弹性盒---怪异盒
  • 原文地址:https://www.cnblogs.com/go2sea/p/5630355.html
Copyright © 2011-2022 走看看