AQS之ConditionObject
一丶Condition
Condition(java.util.concurrent.locks.Condition) 分解 Object monitor方法( wait(), notify(), notifyAll() ) 进去不同的对象, 通过配合Lock的实现使用, 达到每个对象有个wait-set的效果.
Lock可用于替换synchronized关键字的使用, Condition可用于替换Object monitor 方法的使用.
多个Condition提供一种手段使得一个线程可以挂起, 直到某些状态条件成立时被另一个线程唤醒. 由于这些状态条件可以被不同线程访问, 它们必须被保护, 因此锁会以某种形式关联这些状态条件. 在使用Condition前, 必须先使用Lock获取锁, 获得锁后, 如果Condition不满足, 则调用Condition.wait()方法等待, 该方法会先释放获得的锁, 然后挂起当前线程, 直到condtion满足被通知唤醒, Condition.wait()方法就像Object.wati()方法.
Condition实例会被绑定到一个Lock实例上, Condition实例只能通过Lock.newCondition()方法获取
-- 以上文字翻译于Condition官方文档, 蕴含了本人的理解, 若要更原汁原味的理解, 可查阅源码注释
二丶ConditionObject
ConditionObject实现了Condition接口, 是AQS中的内部类
2.1) ConditionObject包含了头指针和尾指针, 内部维护了一个等待队列
/** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter;
2.2) Condition#await()实现 (阻塞等待对应条件出现)
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) // 检查是否被中断 throw new InterruptedException(); Node node = addConditionWaiter(); // 将本线程添加进ConditionObject等待队列 int savedState = fullyRelease(node); // 安全使用本方法必先获得锁, 由于已获得锁, 释放锁 int interruptMode = 0; while (!isOnSyncQueue(node)) { // 如果当前线程不在AQS同步队列中, 则阻塞当前线程 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //如果在AQS同步队列中,则尝试获取锁, 获取失败则阻塞当前线程 interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
2.3) Condition#signal() 实现 (对应条件已满足, 发信号唤醒线程)
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && //将在ConditionObject等待队列中的节点转移到AQS同步队列中 (first = firstWaiter) != null); }
/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); //进入AQS队列 int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
关于线程节点的等待转移过程可以参考此博文图
学习资料:
<java高并发程序设计>