zoukankan      html  css  js  c++  java
  • AQS之Condition

    一、引言

           一般我们在使用锁的Condition时,我们一般都是这么使用,以ReentrantLock为例,

    ReentrantLock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    
    lock.lock();
    try{
       condition.await();
    	
    }finally{
    	
       lock.unlock();
    }
    
    lock.lock();
    try{
       condition.signal();
    	
    }finally{
    	
       lock.unlock();
    }
    

      从上面可以知道,我们调用Condition的await和signal方法必须是在获取得到锁的情况下,首先我们以这个为基础,先不管是如何获取得到锁的,那么上面的程序在condition.await()时阻塞当前调用的线程,而调用 condition.signal()方法的时候可能唤起一个正在await阻塞的线程,我这里说的是可能不是一定。为什么这么说,我们来看下await()方法主要做了什么事情。

    二、分析

    下面是await 方法的在jdk8的源码,

    下面是await 方法的在jdk8的源码,
      public final void await() throws InterruptedException {
                if (Thread.interrupted())  // ①
                    throw new InterruptedException();
                Node node = addConditionWaiter(); //②
                int savedState = fullyRelease(node); //③
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) { //④
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // ⑤
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //⑥
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled ⑦
                    unlinkCancelledWaiters();
                if (interruptMode != 0) //  ⑧
                    reportInterruptAfterWait(interruptMode);
      }

    执行过程如下,
    ① ,判断当前显示是否被interrupt? 是的话,会抛出中断异常InterruptedException,
    ② ,调用addConditionWaiter方法,主要是new 一个以当前线程为数据的节点Node,然后添加到condition条件队列里。
    ③ ,调用fullyRelease方法,该方法内部调用release方法,而release方法主要是在AQS队列里面唤起第一个节点(即head的next节点)的线程(如果head后继节点存在的话)。这时,在ASQ同步器内至少有2个活动的线程(一个是当前线程(可能是头节点),另一个是唤起的线程(如果存在))。如果唤起失败,会抛异常IllegalMonitorStateException。注:当存在唤起的线程的时候,这个线程就可以去争取获取锁。
    ④ ,通过isOnSyncQueue方法判断该节点是否在AQS队列中,当调用await时候,肯定不在AQS上,因为addConditionWaiter方法是new 一个新的Node.接着会进入while循环里面。调用 LockSupport.park(this);阻塞当前线程,相当于释放锁。
    ⑤ ,当当前线程被唤起的时候(可能是 另一个await线程唤起的第一个节点可能是这个线程,或者在signal下,前驱节点已经cancel时,第一个firstWaiter节点是该当前节点),需要判断是否被中断,存储在interruptMode,如果被中断则break,否则checkInterruptWhileWaiting返回0,那么会接着判断node节点是否在AQS中,如果还是不在的话,park当前线程,否则跳出while循环。那么node节点是什么时候被加入到AQS上的,答案是在signal方法上。
    ⑥ ,当node节点在AQS队列时,我们需要获取锁,只有当前线程的节点Node在AQS队列上,才能去争取锁。争取锁就是通过调用acquireQueued方法。等下来分析下acquireQueued方法。
    ⑦ , 如果当前节点的node.nextWaiter不为空,说明还有其他线程在该condition上,并且当前的线程已经获取锁,接着清除条件队列上的cancel类型节点
    ⑧ , 如果interruptMode 是InterruptedException类型或者REINTERRUPT类型。则进行相应的抛中断异常或者线程自我中断标志位设置。


    接着,来分析下signal方法

    public final void signal() {
                if (!isHeldExclusively()) // ①
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first); // ②
       }
    

    执行过程如下,
    ① ,如果当前线程不是持有该condition的锁,那么执行抛IllegalMonitorStateException异常。
    ② ,调用doSignal方法,并且条件队列的首节点传入。

        private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null) // ①
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) && (first = firstWaiter) != null); // ②
        } 
    

     

    ① ,这种条件队列firstWaiter指针为next节点,因为当前的节点需要被移除条件队列,并且next节点为空,那么lastWaiter置为null,说明是空条件队列,接着把first.nextWaiter=null,说明移除了条件队列
    ② ,在这里有2步操作,一是transferForSignal,二是 first = firstWaiter,如果我们当前first节点入AQS队列成功,那么transferForSignal返回true,则doSignal的while循环结束,
    如果当前的first节点入AQS返回失败,则需要next的节点重新signal,保证有一个成功的firstWaiter节点入AQS队列。接着来看下transferForSignal 方法主要做了什么事情。

    final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // ①
                return false;
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            Node p = enq(node); // ②
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))  // ③
                LockSupport.unpark(node.thread);
            return true;
        }

    ① ,首先传入的节点node是条件队列的第一个节点(在外部已经移除),改变其状态CONDITION,为初始状态0,如果改变失败,说明该节点已经不是条件节点,直接返回false,doSignal方法重新调用新firstWaiter节点入AQS队列,
    ② ,把首节点入AQS节点,enq()方法返回的是入节点的前驱节点。从这里核心方法可以知道,signal() 方法的作用其实只是把等待队列中第一个非取消节点转移到AQS的同步队列尾部。转移后的节点很可能正在在同步队列阻塞着,什么时候唤醒,取决于它的前驱节点是否是头节点。
    ③ ,如果当前前驱节点的waitStatus>0(说明是CANCELLED状态),前驱节点已经Canncel(说明前驱节点已经中断等情况),则可以调用LockSupport.unpark(node.thread)唤起线程,则await方法的park返回可以立即返回,预先将AQS同步队列中取消的节点移除掉,而不用等到获取同步状态失败的时候再去判断了,起到一定的优化作用。


    最后来分析下获取锁方法 acquireQueued

    执行如下:

    final boolean acquireQueued(final Node node, int arg) { // ①
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) { 
                    final Node p = node.predecessor();  
                    if (p == head && tryAcquire(arg)) { //②
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // ③
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

    ① ,传入node 为需要获取锁的节点,arg为之前state状态.
    ② ,如果传入的节点的前驱节点是head节点,说明当前节点是AQS队列的首节点,可以尝试去获取锁,即我们需要是要实现的同步语义方法tryAcquire,如果同步语义获取锁成功,则设置当前头节点为头节点。
    这里注意返回的值得语义是是否发生中断,而不是获取锁是否成功。
    ③ ,调用 shouldParkAfterFailedAcquire方法,该方法用来判断获取锁失败后是否需要park当前线程,如果需要park线程,则接着判断该线程是否有中断标志。


    接着我们来看下shouldParkAfterFailedAcquire 方法。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)  // ①
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {  // ②
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //③
            }
            return false;
        }

    ① 前驱节点pred的waitStatus为SIGNAL,说明当前节点有效,返回true,代表需要park当前线程,
    ② 前驱节点已经取消,则删除去取消掉的前驱节点,返回false,外面继续for循环获取锁
    ③ 处在该条件语句的前驱节点的waitStatus必定是 0,或者是传播PROPAGATE,则设置传播节点为SIGNAL,然后返回false,则接着去for循环获取锁,并且失败的时候,调用shouldParkAfterFailedAcquire时知道前驱为SIGNAL(之前由③设置),则需要park线程。
    从这里可以知道transferForSignal方法中,!compareAndSetWaitStatus(p, ws, Node.SIGNAL)语句,如果对其前驱节点设置Node.SIGNAL失败,则不需要等到acquireQueued去判断是否需要park线程,直接unpark线程即可

  • 相关阅读:
    常见的eclipse和真机出现的问题
    volley+okhttp封装,一行代码就可访问网络
    android异步任务处理(网络等耗时操作)
    android手机短信获取
    Android从启动到程序运行整个过程的整理
    android中的广播
    图片旋转问题
    Android Satudio的使用记录
    百度地图初学者
    简单的图片上传和下载
  • 原文地址:https://www.cnblogs.com/liferecord/p/12093024.html
Copyright © 2011-2022 走看看