zoukankan      html  css  js  c++  java
  • Condition的await()和signal()流程

    介绍

    Conditionj.u.c包下提供的一个接口。
    可以翻译成 条件对象,其作用是线程先等待,当外部满足某一条件时,在通过条件对象唤醒等待的线程。ArrayBlockingQueue就是通过Condition实现的。

    先看一下Condition接口提供了哪些方法:

    /**
     *  条件对象
     */
    public interface Condition {
    
        /**
         * 让线程进入等待,如果其他线程调用同一Condition对象的notify/notifyAll,那么等待的线程可能被唤醒
         */
        void await() throws InterruptedException;
    
        /**
         * 不抛出中断异常的await方法
         */
        void awaitUninterruptibly();
    
        /**
         * 带超时的await
         */
        long awaitNanos(long nanosTimeout) throws InterruptedException;
    
        /**
         * 带超时的await(可指定时间单位)
         */
        boolean await(long time, TimeUnit unit) throws InterruptedException;
    
        /**
         * 带超时的await(指定截止时间)
         */
        boolean awaitUntil(Date deadline) throws InterruptedException;
    
        /**
         * 唤醒等待的线程
         */
        void signal();
    
        /**
         * 唤醒所有线程
         */
        void signalAll();
    }
    

    Condition接口主要提供了两类方法——让线程等待的方法(await()等)和唤醒线程的方法(signal())。

    AQS内部提供了Condition接口的实现——ConditionalObject。它内部的字段如下:

        private static final long serialVersionUID = 1173984872572414699L;
        //该ConditionObject维护的等待队列的头节点
        private transient Node firstWaiter;
        //该ConditionObject维护的等待队列的尾节点
        private transient Node lastWaiter;
    

    非常简单,从上面的字段我们大概可以猜到Condition内部也维护了一个队列。
    上篇文章中,我们已经分析锁实现的远离就是通过节点构成队列:让队列中除头节点外的其他线程都被Park,当头节点释放锁时,头节点唤醒下一个节点(Unpark线程),同时更新头节点。
    举一反三,我们推测Condition唤醒功能的原理也是通过维护队列的节点。
    接下来就通过分析源码,(主要是await()signal()方法),验证我们的猜测。


    Condtion对象的获取

    Condition对象的获取主要是通过Lock.newCondition()方法。
    一个Lock对象可以返回多个Condition对象。
    在对Condition进行等待或者唤醒前,都需要先持有Condition关联Lock对象,否则会抛出IllegalMonitorStateException异常。


    Condition.await()过程

    public final void await() throws InterruptedException {
                //如果线程已经被标记为中断,则抛出异常
                if (Thread.interrupted())
                    throw new InterruptedException();
                //将线程添加进等待队列
                //注意等待队列和AQS维护的阻塞队列是两个不同的队列
                //正常流程当线程能调用await(),说明线程此时拥有锁,此时AQS的阻塞队列中,线程应该在head节点
                Node node = addConditionWaiter();
    
                //释放掉锁(如果释放失败,NODE的waitStatus被更新为CANCELLED)
                //同时因为释放掉了锁,该线程在阻塞队列中的节点也已经被移除
                int savedState = fullyRelease(node);
    
                //这里会将线程挂起,除非线程节点被移到AQS的阻塞队列或是线程被外部中断
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    //检查是否是由于被中断而唤醒,如果是,则跳出循环
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                //在阻塞队列中尝试获取锁
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                //节点已经在阻塞队列中,与Condition的等待队列联系断开
                //对于SIGNAL唤醒的线程而言,SIGNAL时除了将节点移到阻塞队列,同时也清空了node.nextWaiter
                //而对于中断唤醒的线程而言,只是将节点移到阻塞队列,并没有清空node.nextWaiter(因为此时线程不持有,操作等待线程并非线程安全)
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                //根据interruptMode 决定是否需要抛出异常
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    
    

    await()方法主要可以分为以下几个过程:
    1)线程将自己包装成节点,并添加到Condition的阻塞队列中
    2)线程主动释放掉锁
    3)线程进入自循环等待(主动通过LockSupport.park()),醒来时,检查自己是否已经被移动至Lock的阻塞队列
    4)线程在阻塞队列中等待,直到获取锁(线程在等待时可能又会被LockSupport.park()挂起)
    5)线程获取锁,检查自己在等待过程中(await()过程)是否有被中断
    6)如果有需要,则清理节点与等待队列之间的联系
    7)根据中断状态确定是否需要抛出异常,以便让await()的调用者可以响应线程的中断状态

    从上面的流程,我们可以清楚的了解到以上步骤1和2,以及步骤5,6,7是持有锁的,步骤3和4并没有持有锁。了解这一点很重要,因为涉及某些方法是否需要以CAS来保证线程安全。
    了解了大体流程,接下来就逐步分析各个步骤。

    步骤1.线程包装成节点,添加进Condition的等待队列

    这一步骤主要是addConditionWaiter()过程

        private Node addConditionWaiter() {
                Node t = lastWaiter;
    
                // 找出该ConditionObject的等待队列中 真正未被取消的最后一个节点,并更新为lastWaiter
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
    
                //将该线程包装成Node
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                //如果此时ConditionObject队列为空,初始化链表且头节点为node
                if (t == null)
                    firstWaiter = node;
                else //否则将node添加队尾
                    t.nextWaiter = node;
                //更新链表尾节点为node
                lastWaiter = node;
                return node;
            }
    
    

    主要找出等待队列的最后一个节点,将线程包装成Node,添加到队列的队尾。
    这里要注意的一点是此时Node的waitStatusCONDITION。节点的waitStatus对判断等待是否被取消很重要,在等待队列中等待的节点状态应该为CONDITION,如果状态不为CONDITION,说明线程已经取消了等待(如果waitStatus为0说明被唤醒或中断)。

    步骤2.线程释放锁

    释放锁的步骤比较简单。主要通过fullRelease()更新AQSstate为0并且将AQS的拥有者置为null,同时唤醒阻塞队列中的后继节点。

    final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                //获取锁被持有的此时
                int savedState = getState();
                //让锁直接释放被持有的次数
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                //如果释放失败了,则将节点标记为CANCELLED
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }
    

    步骤3.线程挂起,进入循环等待

    这一步比较关键,线程等待的动作都发生在这一步。

        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                //检查是否是由于被中断而唤醒,如果是,则跳出循环
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
        }
    

    线程执行到这,会被LockSupport.park()挂起。
    如果此时线程被唤醒,线程会检查是否是因为中断,如果发生中断,还需要确定中断是否发生在SIGNAL前(如果发生在SIGNAL前,之后线程需要抛出异常,让外部响应)。
    如果线程不是因为中断而唤醒,线程需要确认节点是否已经被移动至AQS的等待队列。如果没有被移动,则继续被挂起(防止假唤醒)。

    checkInterruptWhileWaiting()就是用来检测线程在等待的时候是否被中断。

    /**
             * 检查线程在WAITING状态期间,是否有被中断
             * 如果没有返回0;如果是在SIGNAL之前被中断,返回 THROW_IE;如果在SIGNAL之后被中断,则返回REINTERRUPT
             */
            private int checkInterruptWhileWaiting(Node node) {
                return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
            }
    

    如果线程被中断,还需要通过transferAferCancelledWait判断中断是否发生在SIGNAL之前。

    final boolean transferAfterCancelledWait(Node node) {
            //CAS操作,期待值是CONDITION,说明此时唤醒是被取消(中断),因为如果是SIGNAL,那么waitingStatus不会CONDITION,而是0(可以见SIGNAL流程)
            if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                //即使是取消的,也需要移到AQS的阻塞队列
                enq(node);
                return true;
            }
    
            //说明线程先收到了SIGNAL信号
            //此时要等SIGNAL信号处理完成
            while (!isOnSyncQueue(node))
                Thread.yield();
            return false;
        }
    

    前文说了Node节点的waitStatus是一个很重要的状态,从它可以推断出线程节点发生了什么操作。
    上述方法先用CAS尝试将更新节点的waitStatus为0,期待值为CONDITION
    如果尝试成功,说明此时节点未被操作过(SIGNAL信号),线程是中断唤醒的,此时需要通过enq()将节点添加AQS阻塞队列,因为此时没有锁,所以enq()方法以CAS重试的方式保证线程安全。
    如果尝试失败,说明线程收到了SIGNAL信号,节点将由负责SIGNAL的线程移动至阻塞队列。这里为了避免线程返回过早,在判断出线程还未移动至阻塞队列的情况下,会通过Thread.yeild()让出CPU时间。

    步骤4.线程在阻塞队列中重新等待锁

    这一步主要是通过acquireQueue()方法。该方法已经在上一篇文章中介绍过了,这里不过多介绍。
    需要注意的一点是,即使线程在等待时被中断,仍然需要在AQS的阻塞队列中等待获取锁。因为外部没有办法在线程获取锁之前发现中断状态,而且即使线程抛出了中断异常,此时线程也是持有锁的,外部需要显式的释放。

    步骤5.检查并设置中断状态

    这一步很简单,主要就是通过步骤3中的checkInterruptWhilewaiting()方法返回值:0表示未中断,-1表示中断发生在SIGNAl之前,1表示中断发生在SIGNAL之后。

    步骤6.清理节点与等待队列的联系

    这里有两种情况,如果线程是因为SIGNAL唤醒的,在唤醒时调用signal()的线程已经清理了被唤醒节点与等待队列的关系。 因为那时唤醒线程持有锁,操作是安全的。
    但是如果对于中断被唤醒的线程,唤醒时是不持有锁的,不能保证线程安全的清理唤醒节点与等待队列的关系。因此就将等待清理工作放在了获取锁之后。

    //此方法并不保证线程安全,因此调用此方法时,必须要在获取锁的情况下调用
            //此方法的目的是为了整理Condition的等待队列,将非CONDITION状态的节点从等待队列中移除
            private void unlinkCancelledWaiters() {
                Node t = firstWaiter;
                Node trail = null;
                while (t != null) {
                    Node next = t.nextWaiter;
                    if (t.waitStatus != Node.CONDITION) {
                        t.nextWaiter = null;
                        if (trail == null)
                            firstWaiter = next;
                        else
                            trail.nextWaiter = next;
                        if (next == null)
                            lastWaiter = trail;
                    }
                    else
                        trail = t;
                    t = next;
                }
            }
    

    步骤7.确定中断状态,并决定是否抛出异常

    这一步也很简单,就是根据interruptMode来确定是否抛出异常,如果interruptMode值为THROW_IE,说明线程被唤醒前先被中断,此时需要抛出InterruptedException


    Condition.signal()过程

    signal()过程比起await()要简单很多。既然await()过程是将节点添加到等待队列,那么signal()作为await()的逆过程,就是将节点从等待队列重新移动到AQS阻塞队列。

            /**
             * 唤醒等待节点
             * 主要的流程就是将节点从Condition的等待队列移到AQS的阻塞队列中,让其重新等待锁的获取
             */
            public final void signal() {
                //先验证唤醒者是否是锁的持有者
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                //唤醒等待队列的第一个节点
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    

    为保证安全,先确定唤醒线程是否为锁的持有者。
    之后找出等待队列中的头节点,将其唤醒。

            /**
             * 唤醒操作(此时占有锁,为线程安全),
             * 找出等待队列中第一个真正要被唤醒的节点,移动到阻塞队列,
             */
            private void doSignal(Node first) {
                do {
                    //找出第一个需要唤醒的节点
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    //断开和之后等待队列的联系,并更新等待队列的头节点
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null); //等待队列的头节点移动失败 且 等待队列中还有其他节点,则继续尝试其他节点
            }
    

    因为头节点可能已经被取消,所以这里会一直在等待队列中从前往后找一个节点,开始唤醒。直到一个节点唤醒成功或者等待队列中没有节点需要唤醒。
    上文也说了唤醒过程其实就是节点的移动过程。

        /**
         * 将节点从Condition的等待队列移动到AQS的阻塞队列(该方法只会在signal相关的方法中被调用)
         */
        final boolean transferForSignal(Node node) {
            //CAS操作更新节点的waitStatus,期待值为CONDITION
            //操作成功,说明没有其他线程在操作这个节点
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) //CAS操作失败,说明节点已经被中断操作过,waitStatus已经变成了0
                return false;
    
            //这里说明CAS操作成功
            //应该是线程安全的
            //将节点添加到阻塞队列的队尾
            Node p = enq(node);
            //判断此时阻塞队列是否有前继节点等待,有就Park线程,等待前继节点唤醒
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    
    

    transferForSignal()操作其实就是先通过CAS操作确定等待队列中的节点还未被取消。如果CAS操作成功,则将其添加到阻塞队列,如果此时阻塞队列还需要等待锁,则让await()的线程继续被挂起。将给线程唤醒的任务(此时已经是获取锁的任务,不再是之前的await()任务)交给阻塞队列中的前继节点。

    await()以及signal()过程的流程图

    跟着流程图在走一遍,可以帮助巩固上述的知识点。
    Condition的await()和signal()过程

    总结

    再借上篇文章重口味的比方梳理下这个流程。
    当你排队进了WC的包厢,想要方便时,你觉得太脏了,于是你在包厢内留了张纸条,希望有人能在厕所包厢干净了在叫你过来上厕所(调用Condition的await ()),随后主动让出了包厢的使用权(释放锁)。后面在排队等着方便的人便进去了。
    你离开后,到另一个地方边玩手机边等(移动到Condition的阻塞队列,并被Park),期间,也有一些人同样觉得厕所太脏,跑了出来,在外面等,并排在了你的后面。
    过了一段时间,有人把打电话给你说厕所已经变干净了,可以去用了,你就重新回到厕所那排起了队伍,等待轮到你用厕所(acquireQueue)。因为那个人仅通知了你,没有通知其他因为嫌弃厕所脏,而跑出来的人,所以那些只能继续在那等别人去叫他们。
    如果你在等待的时候突然有人用其他方式把你喊了回去(外部未给signal却被中断),你又主动从在外面等待走到了厕所前去排队等待继续去用厕所,但是等你重新排到包厢时,发现测试还是很脏(Condition的 条件未满足),你就会抛出异常。

  • 相关阅读:
    uva 10369 Arctic Network
    uvalive 5834 Genghis Khan The Conqueror
    uvalive 4848 Tour Belt
    uvalive 4960 Sensor Network
    codeforces 798c Mike And Gcd Problem
    codeforces 796c Bank Hacking
    codeforces 768c Jon Snow And His Favourite Number
    hdu 1114 Piggy-Bank
    poj 1276 Cash Machine
    bzoj 2423 最长公共子序列
  • 原文地址:https://www.cnblogs.com/insaneXs/p/12219097.html
Copyright © 2011-2022 走看看