zoukankan      html  css  js  c++  java
  • java并发

    AQS之ConditionObject

    一丶Condition

       Condition(java.util.concurrent.locks.Condition) 分解 Object monitor方法( wait(), notify(), notifyAll() ) 进去不同的对象,  通过配合Lock的实现使用,  达到每个对象有个wait-set的效果.

         Lock可用于替换synchronized关键字的使用, Condition可用于替换Object monitor 方法的使用.

      多个Condition提供一种手段使得一个线程可以挂起, 直到某些状态条件成立时被另一个线程唤醒. 由于这些状态条件可以被不同线程访问, 它们必须被保护, 因此锁会以某种形式关联这些状态条件在使用Condition前, 必须先使用Lock获取锁,  获得锁后, 如果Condition不满足, 则调用Condition.wait()方法等待, 该方法会先释放获得的锁, 然后挂起当前线程, 直到condtion满足被通知唤醒,  Condition.wait()方法就像Object.wati()方法.

      Condition实例会被绑定到一个Lock实例上, Condition实例只能通过Lock.newCondition()方法获取

      点此查看Condition使用例子

      -- 以上文字翻译于Condition官方文档, 蕴含了本人的理解, 若要更原汁原味的理解, 可查阅源码注释

     

     

    二丶ConditionObject

       ConditionObject实现了Condition接口, 是AQS中的内部类

      2.1) ConditionObject包含了头指针和尾指针, 内部维护了一个等待队列

            /** First node of condition queue. */
            private transient Node firstWaiter;
            /** Last node of condition queue. */
            private transient Node lastWaiter;

      2.2) Condition#await()实现  (阻塞等待对应条件出现)

            /**
             * Implements interruptible condition wait.
             * <ol>
             * <li> If current thread is interrupted, throw InterruptedException.
             * <li> Save lock state returned by {@link #getState}.
             * <li> Invoke {@link #release} with saved state as argument,
             *      throwing IllegalMonitorStateException if it fails.
             * <li> Block until signalled or interrupted.
             * <li> Reacquire by invoking specialized version of
             *      {@link #acquire} with saved state as argument.
             * <li> If interrupted while blocked in step 4, throw InterruptedException.
             * </ol>
             */
            public final void await() throws InterruptedException {
                if (Thread.interrupted())   // 检查是否被中断
                    throw new InterruptedException(); 
                Node node = addConditionWaiter();  // 将本线程添加进ConditionObject等待队列
                int savedState = fullyRelease(node); // 安全使用本方法必先获得锁, 由于已获得锁, 释放锁
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) { // 如果当前线程不在AQS同步队列中, 则阻塞当前线程
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //如果在AQS同步队列中,则尝试获取锁, 获取失败则阻塞当前线程
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }

      2.3) Condition#signal() 实现  (对应条件已满足, 发信号唤醒线程)

           /**
             * Moves the longest-waiting thread, if one exists, from the
             * wait queue for this condition to the wait queue for the
             * owning lock.
             *
             * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
             *         returns {@code false}
             */
            public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
            /**
             * Removes and transfers nodes until hit non-cancelled one or
             * null. Split out from signal in part to encourage compilers
             * to inline the case of no waiters.
             * @param first (non-null) the first node on condition queue
             */
            private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&    //将在ConditionObject等待队列中的节点转移到AQS同步队列中
                         (first = firstWaiter) != null);
            }
        /**
         * Transfers a node from a condition queue onto sync queue.
         * Returns true if successful.
         * @param node the node
         * @return true if successfully transferred (else the node was
         * cancelled before signal)
         */
        final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            Node p = enq(node);  //进入AQS队列
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }

       关于线程节点的等待转移过程可以参考此博文图

     

     

    学习资料:

      <java高并发程序设计>

      ConditionObject分析

     

     

    人生没有彩排,每一天都是现场直播
  • 相关阅读:
    1822. Sign of the Product of an Array
    1828. Queries on Number of Points Inside a Circle
    1480. Running Sum of 1d Array
    C++字符串
    Git&GitHb学习记录
    54. Spiral Matrix
    104. Maximum Depth of Binary Tree
    110. Balanced Binary Tree
    136. Single Number
    19、泛型入门
  • 原文地址:https://www.cnblogs.com/timfruit/p/10990790.html
Copyright © 2011-2022 走看看