zoukankan      html  css  js  c++  java
  • Condition的await()和signal()流程

    介绍

    Conditionj.u.c包下提供的一个接口。
    可以翻译成 条件对象,其作用是线程先等待,当外部满足某一条件时,在通过条件对象唤醒等待的线程。ArrayBlockingQueue就是通过Condition实现的。

    先看一下Condition接口提供了哪些方法:

    /**
     *  条件对象
     */
    public interface Condition {
    
        /**
         * 让线程进入等待,如果其他线程调用同一Condition对象的notify/notifyAll,那么等待的线程可能被唤醒
         */
        void await() throws InterruptedException;
    
        /**
         * 不抛出中断异常的await方法
         */
        void awaitUninterruptibly();
    
        /**
         * 带超时的await
         */
        long awaitNanos(long nanosTimeout) throws InterruptedException;
    
        /**
         * 带超时的await(可指定时间单位)
         */
        boolean await(long time, TimeUnit unit) throws InterruptedException;
    
        /**
         * 带超时的await(指定截止时间)
         */
        boolean awaitUntil(Date deadline) throws InterruptedException;
    
        /**
         * 唤醒等待的线程
         */
        void signal();
    
        /**
         * 唤醒所有线程
         */
        void signalAll();
    }
    

    Condition接口主要提供了两类方法——让线程等待的方法(await()等)和唤醒线程的方法(signal())。

    AQS内部提供了Condition接口的实现——ConditionalObject。它内部的字段如下:

        private static final long serialVersionUID = 1173984872572414699L;
        //该ConditionObject维护的等待队列的头节点
        private transient Node firstWaiter;
        //该ConditionObject维护的等待队列的尾节点
        private transient Node lastWaiter;
    

    非常简单,从上面的字段我们大概可以猜到Condition内部也维护了一个队列。
    上篇文章中,我们已经分析锁实现的远离就是通过节点构成队列:让队列中除头节点外的其他线程都被Park,当头节点释放锁时,头节点唤醒下一个节点(Unpark线程),同时更新头节点。
    举一反三,我们推测Condition唤醒功能的原理也是通过维护队列的节点。
    接下来就通过分析源码,(主要是await()signal()方法),验证我们的猜测。


    Condtion对象的获取

    Condition对象的获取主要是通过Lock.newCondition()方法。
    一个Lock对象可以返回多个Condition对象。
    在对Condition进行等待或者唤醒前,都需要先持有Condition关联Lock对象,否则会抛出IllegalMonitorStateException异常。


    Condition.await()过程

    public final void await() throws InterruptedException {
                //如果线程已经被标记为中断,则抛出异常
                if (Thread.interrupted())
                    throw new InterruptedException();
                //将线程添加进等待队列
                //注意等待队列和AQS维护的阻塞队列是两个不同的队列
                //正常流程当线程能调用await(),说明线程此时拥有锁,此时AQS的阻塞队列中,线程应该在head节点
                Node node = addConditionWaiter();
    
                //释放掉锁(如果释放失败,NODE的waitStatus被更新为CANCELLED)
                //同时因为释放掉了锁,该线程在阻塞队列中的节点也已经被移除
                int savedState = fullyRelease(node);
    
                //这里会将线程挂起,除非线程节点被移到AQS的阻塞队列或是线程被外部中断
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    //检查是否是由于被中断而唤醒,如果是,则跳出循环
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                //在阻塞队列中尝试获取锁
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                //节点已经在阻塞队列中,与Condition的等待队列联系断开
                //对于SIGNAL唤醒的线程而言,SIGNAL时除了将节点移到阻塞队列,同时也清空了node.nextWaiter
                //而对于中断唤醒的线程而言,只是将节点移到阻塞队列,并没有清空node.nextWaiter(因为此时线程不持有,操作等待线程并非线程安全)
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                //根据interruptMode 决定是否需要抛出异常
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    
    

    await()方法主要可以分为以下几个过程:
    1)线程将自己包装成节点,并添加到Condition的阻塞队列中
    2)线程主动释放掉锁
    3)线程进入自循环等待(主动通过LockSupport.park()),醒来时,检查自己是否已经被移动至Lock的阻塞队列
    4)线程在阻塞队列中等待,直到获取锁(线程在等待时可能又会被LockSupport.park()挂起)
    5)线程获取锁,检查自己在等待过程中(await()过程)是否有被中断
    6)如果有需要,则清理节点与等待队列之间的联系
    7)根据中断状态确定是否需要抛出异常,以便让await()的调用者可以响应线程的中断状态

    从上面的流程,我们可以清楚的了解到以上步骤1和2,以及步骤5,6,7是持有锁的,步骤3和4并没有持有锁。了解这一点很重要,因为涉及某些方法是否需要以CAS来保证线程安全。
    了解了大体流程,接下来就逐步分析各个步骤。

    步骤1.线程包装成节点,添加进Condition的等待队列

    这一步骤主要是addConditionWaiter()过程

        private Node addConditionWaiter() {
                Node t = lastWaiter;
    
                // 找出该ConditionObject的等待队列中 真正未被取消的最后一个节点,并更新为lastWaiter
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
    
                //将该线程包装成Node
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                //如果此时ConditionObject队列为空,初始化链表且头节点为node
                if (t == null)
                    firstWaiter = node;
                else //否则将node添加队尾
                    t.nextWaiter = node;
                //更新链表尾节点为node
                lastWaiter = node;
                return node;
            }
    
    

    主要找出等待队列的最后一个节点,将线程包装成Node,添加到队列的队尾。
    这里要注意的一点是此时Node的waitStatusCONDITION。节点的waitStatus对判断等待是否被取消很重要,在等待队列中等待的节点状态应该为CONDITION,如果状态不为CONDITION,说明线程已经取消了等待(如果waitStatus为0说明被唤醒或中断)。

    步骤2.线程释放锁

    释放锁的步骤比较简单。主要通过fullRelease()更新AQSstate为0并且将AQS的拥有者置为null,同时唤醒阻塞队列中的后继节点。

    final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                //获取锁被持有的此时
                int savedState = getState();
                //让锁直接释放被持有的次数
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                //如果释放失败了,则将节点标记为CANCELLED
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }
    

    步骤3.线程挂起,进入循环等待

    这一步比较关键,线程等待的动作都发生在这一步。

        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                //检查是否是由于被中断而唤醒,如果是,则跳出循环
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
        }
    

    线程执行到这,会被LockSupport.park()挂起。
    如果此时线程被唤醒,线程会检查是否是因为中断,如果发生中断,还需要确定中断是否发生在SIGNAL前(如果发生在SIGNAL前,之后线程需要抛出异常,让外部响应)。
    如果线程不是因为中断而唤醒,线程需要确认节点是否已经被移动至AQS的等待队列。如果没有被移动,则继续被挂起(防止假唤醒)。

    checkInterruptWhileWaiting()就是用来检测线程在等待的时候是否被中断。

    /**
             * 检查线程在WAITING状态期间,是否有被中断
             * 如果没有返回0;如果是在SIGNAL之前被中断,返回 THROW_IE;如果在SIGNAL之后被中断,则返回REINTERRUPT
             */
            private int checkInterruptWhileWaiting(Node node) {
                return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
            }
    

    如果线程被中断,还需要通过transferAferCancelledWait判断中断是否发生在SIGNAL之前。

    final boolean transferAfterCancelledWait(Node node) {
            //CAS操作,期待值是CONDITION,说明此时唤醒是被取消(中断),因为如果是SIGNAL,那么waitingStatus不会CONDITION,而是0(可以见SIGNAL流程)
            if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                //即使是取消的,也需要移到AQS的阻塞队列
                enq(node);
                return true;
            }
    
            //说明线程先收到了SIGNAL信号
            //此时要等SIGNAL信号处理完成
            while (!isOnSyncQueue(node))
                Thread.yield();
            return false;
        }
    

    前文说了Node节点的waitStatus是一个很重要的状态,从它可以推断出线程节点发生了什么操作。
    上述方法先用CAS尝试将更新节点的waitStatus为0,期待值为CONDITION
    如果尝试成功,说明此时节点未被操作过(SIGNAL信号),线程是中断唤醒的,此时需要通过enq()将节点添加AQS阻塞队列,因为此时没有锁,所以enq()方法以CAS重试的方式保证线程安全。
    如果尝试失败,说明线程收到了SIGNAL信号,节点将由负责SIGNAL的线程移动至阻塞队列。这里为了避免线程返回过早,在判断出线程还未移动至阻塞队列的情况下,会通过Thread.yeild()让出CPU时间。

    步骤4.线程在阻塞队列中重新等待锁

    这一步主要是通过acquireQueue()方法。该方法已经在上一篇文章中介绍过了,这里不过多介绍。
    需要注意的一点是,即使线程在等待时被中断,仍然需要在AQS的阻塞队列中等待获取锁。因为外部没有办法在线程获取锁之前发现中断状态,而且即使线程抛出了中断异常,此时线程也是持有锁的,外部需要显式的释放。

    步骤5.检查并设置中断状态

    这一步很简单,主要就是通过步骤3中的checkInterruptWhilewaiting()方法返回值:0表示未中断,-1表示中断发生在SIGNAl之前,1表示中断发生在SIGNAL之后。

    步骤6.清理节点与等待队列的联系

    这里有两种情况,如果线程是因为SIGNAL唤醒的,在唤醒时调用signal()的线程已经清理了被唤醒节点与等待队列的关系。 因为那时唤醒线程持有锁,操作是安全的。
    但是如果对于中断被唤醒的线程,唤醒时是不持有锁的,不能保证线程安全的清理唤醒节点与等待队列的关系。因此就将等待清理工作放在了获取锁之后。

    //此方法并不保证线程安全,因此调用此方法时,必须要在获取锁的情况下调用
            //此方法的目的是为了整理Condition的等待队列,将非CONDITION状态的节点从等待队列中移除
            private void unlinkCancelledWaiters() {
                Node t = firstWaiter;
                Node trail = null;
                while (t != null) {
                    Node next = t.nextWaiter;
                    if (t.waitStatus != Node.CONDITION) {
                        t.nextWaiter = null;
                        if (trail == null)
                            firstWaiter = next;
                        else
                            trail.nextWaiter = next;
                        if (next == null)
                            lastWaiter = trail;
                    }
                    else
                        trail = t;
                    t = next;
                }
            }
    

    步骤7.确定中断状态,并决定是否抛出异常

    这一步也很简单,就是根据interruptMode来确定是否抛出异常,如果interruptMode值为THROW_IE,说明线程被唤醒前先被中断,此时需要抛出InterruptedException


    Condition.signal()过程

    signal()过程比起await()要简单很多。既然await()过程是将节点添加到等待队列,那么signal()作为await()的逆过程,就是将节点从等待队列重新移动到AQS阻塞队列。

            /**
             * 唤醒等待节点
             * 主要的流程就是将节点从Condition的等待队列移到AQS的阻塞队列中,让其重新等待锁的获取
             */
            public final void signal() {
                //先验证唤醒者是否是锁的持有者
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                //唤醒等待队列的第一个节点
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    

    为保证安全,先确定唤醒线程是否为锁的持有者。
    之后找出等待队列中的头节点,将其唤醒。

            /**
             * 唤醒操作(此时占有锁,为线程安全),
             * 找出等待队列中第一个真正要被唤醒的节点,移动到阻塞队列,
             */
            private void doSignal(Node first) {
                do {
                    //找出第一个需要唤醒的节点
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    //断开和之后等待队列的联系,并更新等待队列的头节点
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null); //等待队列的头节点移动失败 且 等待队列中还有其他节点,则继续尝试其他节点
            }
    

    因为头节点可能已经被取消,所以这里会一直在等待队列中从前往后找一个节点,开始唤醒。直到一个节点唤醒成功或者等待队列中没有节点需要唤醒。
    上文也说了唤醒过程其实就是节点的移动过程。

        /**
         * 将节点从Condition的等待队列移动到AQS的阻塞队列(该方法只会在signal相关的方法中被调用)
         */
        final boolean transferForSignal(Node node) {
            //CAS操作更新节点的waitStatus,期待值为CONDITION
            //操作成功,说明没有其他线程在操作这个节点
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) //CAS操作失败,说明节点已经被中断操作过,waitStatus已经变成了0
                return false;
    
            //这里说明CAS操作成功
            //应该是线程安全的
            //将节点添加到阻塞队列的队尾
            Node p = enq(node);
            //判断此时阻塞队列是否有前继节点等待,有就Park线程,等待前继节点唤醒
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    
    

    transferForSignal()操作其实就是先通过CAS操作确定等待队列中的节点还未被取消。如果CAS操作成功,则将其添加到阻塞队列,如果此时阻塞队列还需要等待锁,则让await()的线程继续被挂起。将给线程唤醒的任务(此时已经是获取锁的任务,不再是之前的await()任务)交给阻塞队列中的前继节点。

    await()以及signal()过程的流程图

    跟着流程图在走一遍,可以帮助巩固上述的知识点。
    Condition的await()和signal()过程

    总结

    再借上篇文章重口味的比方梳理下这个流程。
    当你排队进了WC的包厢,想要方便时,你觉得太脏了,于是你在包厢内留了张纸条,希望有人能在厕所包厢干净了在叫你过来上厕所(调用Condition的await ()),随后主动让出了包厢的使用权(释放锁)。后面在排队等着方便的人便进去了。
    你离开后,到另一个地方边玩手机边等(移动到Condition的阻塞队列,并被Park),期间,也有一些人同样觉得厕所太脏,跑了出来,在外面等,并排在了你的后面。
    过了一段时间,有人把打电话给你说厕所已经变干净了,可以去用了,你就重新回到厕所那排起了队伍,等待轮到你用厕所(acquireQueue)。因为那个人仅通知了你,没有通知其他因为嫌弃厕所脏,而跑出来的人,所以那些只能继续在那等别人去叫他们。
    如果你在等待的时候突然有人用其他方式把你喊了回去(外部未给signal却被中断),你又主动从在外面等待走到了厕所前去排队等待继续去用厕所,但是等你重新排到包厢时,发现测试还是很脏(Condition的 条件未满足),你就会抛出异常。

  • 相关阅读:
    LeetCode 25. Reverse Nodes in k-Group
    LeetCode 66. Plus One
    LeetCode 69. Sqrt(x)
    很认真的聊一聊程序员的自我修养
    LeetCode 24. Swap Nodes in Pairs
    unordered_map和map的区别
    子查询一定要注意,别忘记加TOP 1,不然就GG了,过了好久测试给我测出来了
    Tree 通过父id找所有子节点
    SqlSugar CURD
    什么是.NET Framwork
  • 原文地址:https://www.cnblogs.com/insaneXs/p/12219097.html
Copyright © 2011-2022 走看看