zoukankan      html  css  js  c++  java
  • AbstractQueuedSynchronizer(AQS)简介

    AbstractQueuedSynchronizer(AQS)是JDK中实现并发编程的核心,平时我们工作中经常用到的ReentrantLock,CountDownLatch等都是基于它来实现的。

    AQS 可以用于构建锁或者其他相关同步装置的基础框架。AQS 利用了一个int数据 state( >=0 )来表示状态,期望它能够成为实现大部分同步需求的基础。

      synchronized : 是JVM层面上的锁,是JVM内部实现锁,锁操作是在宿主机上完成的。 

      Lock(基于AQS的锁) :是Java语言层面上的锁,在Java语言层面上就能完成锁操作。 

      java中实现Lock接口的锁,都是基于AQS的锁,是Java层面的锁。

       公平锁:先进来的线程先执行。当一个线程抢到锁时,会检查同步队列里是否有比当前线程先进来的线程。如果有当前线程就不执行,让给最先进来的线程执行。

      非公平锁 :抢到锁的线程先执行。当一个线程抢到锁时,不会检查同步队列,而是直接执行。

      【注意】:AQS的state与Node的waitStatus  是两个不同的概念。

      1  AQS内部维护一个双向链表(FIFO队列)

       

      1.1  AQS类中维护的队列节点Node类 ,如下

    static final class Node {
         // 共享模式下等待的标记
        static final Node SHARED = new Node();
        
        // 独占模式下等待的标记
        static final Node EXCLUSIVE = null;
    
        // 线程的等待状态 表示线程已经被取消
        static final int CANCELLED =  1;
        
        // 线程的等待状态 表示后继线程需要被唤醒
        static final int SIGNAL    = -1;
        
        // 线程的等待状态 表示线程在Condtion上
        static final int CONDITION = -2;
        
        // 表示下一个acquireShared需要无条件的传播
        static final int PROPAGATE = -3;
    
        /**
         *     节点状态
         *   CANCELLED(1):    一个节点由于超时或者中断需要在CLH队列中取消等待状态,被取消的节点不会再次等待
         *           自己的理解 :  --- 即当前线程正在执行(要么永远不执行--取消执行),不在同步队列里。
         *   
         *   0:  默认值 
         *            自己的理解 :当前线程在同步队列中,而且是头节点 --- 即将要获取到锁(得到执行线程的机会)
         *
         *   SIGNAL(-1):    当前节点的后继节点处于等待状态时,如果当前节点的同步状态被释放或者取消, 必须唤起它的后继节点
         *           自己的理解 :当前线程已经执行完毕,锁释放的状态,同时要通知后续线程进入等待状态 --- 即使next节点 状态更改为 0 
         *     
         *   CONDITION(-2): 当前节点在等待队列中,只有当节点的状态设为0的时候该节点才会被转移到同步队列
         *           自己的理解 :当前线程在等待集合里( wait set),不在同步队列里。
         *   
         *   PROPAGATE(-3): 下一次的共享模式同步状态的获取将会无条件的传播
         *           自己的理解 :   
         */
        volatile int waitStatus;
    
       /**
        *    当前节点的前驱节点,当前线程依赖它来检查waitStatus,在入队的时候才被分配,
        *    并且只在出队的时候才被取消(为了GC),头节点永远不会被取消,
        *    一个节点成为头节点仅仅是成功获取到锁的结果,
        *    一个被取消的线程永远也不会获取到锁,线程只取消自身,而不涉及其他节点 
        */
        volatile Node prev;
    
        /**
         *     当前节点的后继节点,当前线程释放的才被唤起,在入队时分配,在绕过被取消的前驱节点时调整,在出队列的时候取消(为了GC)
         *    如果一个节点的next为空,我们可以从尾部扫描它的prev,双重检查
         *    被取消节点的next设置为指向节点本身而不是null,为了isOnSyncQueue更容易操作
         */
        volatile Node next;
    
        
        //使该节点排队的线程。 在构造时初始化,使用后为空。
        volatile Thread thread;
    
       
        /**
         *     链接到下一个节点的等待条件,或特殊的值SHARED,因为条件队列只有在独占模式时才能被访问,
         *    所以我们只需要一个简单的连接队列在等待的时候保存节点,然后把它们转移到队列中重新获取
         *    因为条件只能是独占性的,我们通过使用特殊的值来表示共享模式
         */
        Node nextWaiter;
    
        //如果节点处于共享模式下等待直接返回true
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
    
      
        //此节点的前一个节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
    
        Node() {    // Used to establish initial head or SHARED marker
        }
        
        // 指定线程和模式的构造方法
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        // 指定线程和节点状态的构造方
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
    View Code

      1.2   Node是构成同步队列的基础,看一下Node的结构:

        

     

      1.3  同步队列中首节点是获取到锁的节点,它在释放的时候会唤醒后继节点,后继节点获取到锁的时候,会把自己设为首节点。  

        

      注意,设置首节点不需要使用CAS,因为在并发环境中只有一个线程都获取到锁,只有获取到锁的线程才能设置首节点。

      2  AQS类的成员变量:

    //head:等待队列头部,延迟初始化,直到调用enq()方法才真正初始化;
    private transient volatile Node head;
    
    //tail:等待队列尾部,延迟初始化,直到调用enq()方法才真正初始化;
    private transient volatile Node tail;
    
    //state:AQS状态位,通过try*方法维护;
    private volatile int state;

      2.1  state(AQS状态)的操作 : getState() ,setState()  ,compareAndSetState()

      3  实现自定义同步器时,需要重写以下方法。 

    protected boolean tryAcquire(int arg) 排它的获取这个状态。这个方法的实现需要查询当前状态是否允许获取,然后再进行获取(使用compareAndSetState来做)状态。
    protected boolean tryRelease(int arg)  释放状态。
    protected int tryAcquireShared(int arg) 共享的模式下获取状态。
    protected boolean tryReleaseShared(int arg) 共享的模式下释放状态。
    protected boolean isHeldExclusively() 在排它模式下,状态是否被占用。

     ===========================================================================================================================================================================================================

    独占锁的获取 

    ReentrantLock的非公平锁的lock()方法

    final void lock() {
            if (compareAndSetState(0, 1))  // CAS方式设置状态,若成功,设置当前线程为活动线程;否则 调用 acquire()方法 
                setExclusiveOwnerThread(Thread.currentThread());    // 设置当前线程为活动(执行)线程  
            else
                acquire(1);
    }

    acquire() 是AQS的方法 :

    public final void acquire(int arg) {
            /*  
             * tryAcquire() :获取 AQS 的状态state ,此方法是需要被重写的。
             * addWaiter(): 把当前线程加入到同步队列尾部 。  
             * acquireQueued():以死循环的方式获取同步状态 
             */
            if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt(); //中断线程   此方法的源码 Thread.currentThread().interrupt();  
    }

    ReentrantLock的非公平锁的  tryAcquire() 方法 : 

    protected final boolean tryAcquire(int acquires) {     
            final Thread current = Thread.currentThread();//获取当前线程
            int c = getState();                         // 获取AQS同步器的状态
            if (c == 0) {                               // 表示当前线程为等待执行线程,就是位于同步队列的头结点
                if (compareAndSetState(0, acquires)) {  // 设置AQS状态。若成功设置(state = 1 ),则执行当前线程;否则此方法返回false,表示没有获取到-- state=0
                    setExclusiveOwnerThread(current);   // 设置当前线程为活动(执行)线程    方法源码 : exclusiveOwnerThread = thread;
                    return true;                        // 表示获取到AQS状态 -- state= 1
                }
            }else if (current == getExclusiveOwnerThread()) {   // 比较当前线程与活动线程是否一致,
                int nextc = c + acquires;                       // 这就是可重入的体现  state >= 0 
    if (nextc < 0) // 若此处为false ,则 c = 1,即表示线程正在执行 throw new Error("Maximum lock count exceeded"); setState(nextc); // 设置状态 return true; } return false; }

    ReentrantLock的公平锁的  tryAcquire() 方法 :

    protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();         // 获取当前线程
            int c = getState();                                    // 获取AQS同步器的状态
            if (c == 0) {                                        // 表示当前线程为等待执行线程,就是位于同步队列的头结点
                /**
                 * hasQueuedPredecessors() :查询任何线程是否等待获取比当前线程更长的时间。 
                 *         true :当前线程之前有一个排队的线程;
                 *         false: 当前线程在队列的头部 或  同步队列为空,  
                 * compareAndSetState() :设置AQS状态
                 */
                if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);             // 设置当前线程为活动(执行)线程    方法源码 : exclusiveOwnerThread = thread;
                    return true;                                // 表示获取到AQS状态 --state= 1
                }
            } else if (current == getExclusiveOwnerThread()) {     // 比较当前线程与活动线程是否一致
                int nextc = c + acquires;                        // 此时c in( 1,-1,-2,-3)
                if (nextc < 0)                                    // 若此处为false ,则  c = 1,即表示线程正在执行
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);                                // 设置状态
                return true;
            }
            return false;
    }

    tryAcquire ( ) 总结  :

      AQS状态获取成功 :1)把AQS状态的值从 0 设置成 1 (state = 1 );2)当前线程设置成活动线程;

      AQS状态获取失败 :没有做任何改变 ,此时ASQ状态是 0  ( state =0 )

    AQS的 addWaiter()方法 :

    private Node addWaiter(Node mode) {     //把当前线程假如到同步队列中。
            //  mode :就是 独占模式下的 标识      static final Node EXCLUSIVE = null;
            //  Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; }      
            //  新建线程节点的 waitStatus = 0  --- 即 每一个新进来的线程状态都是 0  
            Node node = new Node(Thread.currentThread(), mode); //把当前线程封装成 Node,
            Node pred = tail;            //获取尾节点
            if (pred != null) {          //尾节点不为空,说明同步队列里有其他节点 ;
                node.prev = pred;
                //若成功把当前线程插入到同步队列尾部,compareAndSetTail()返回true
                //否则,返回false;然后把这个任务(就是把当前线程加入到同步队列中)交给enq()方法。
                if (compareAndSetTail(pred, node)) {  
                    pred.next = node;
                    return node;          //返回线程
                }
            }
            enq(node);                    // 死循环方式把节点插入到同步队列尾部
            return node;
    }

    AQS的 enq()方法 :

    private Node enq(final Node node) { //能确保当前线程加入到同步队列尾部
            for (;;) {             // 这是一个死循环 ,出口在  return t; 语句。
                Node t = tail;    // 现在    t 就是尾节点
                /*
                     同步队列 特点 : 
                         head节点 : 始终都为null节点      即 head.thread=null;
                         tail节点 :只有初始化时才为null节点 ;以后tail节点总是指向最后一个添加的线程节点。
                      tail节点是null,而不是null节点,说明同步队列还没有初始化。
                      由此得知,同步队列初始化时机:第一次使用时
               注意 :只有head节点能获取到同步状态(state),因为这是一个FIFO(先进先出的队列);当一个节点获取到状态后,就把自己设置为head节点(head节点是一个null节点)。
    */ if (t == null) { //初始化同步队列 if (compareAndSetHead(new Node())) tail = head; } else {                    // 向同步队列中添加线程节点 node.prev = t;              // 当前节点指向先前尾节点 if (compareAndSetTail(t, node)) {   // 设置尾结点指针指向当前节点 t.next = node; // 把先前尾节点的next指针,指向当前节点(即现在的尾节点) return t; // 返回先前尾节点,就是现在尾节点的prev节点 } } } }

    addWaiter ( ) 总结 :把当前线程以死循环的方式加入到同步队列尾部

    节点出入到同步队列tail的流程

        

    AQS的acquireQueued()方法:

    final boolean acquireQueued(final Node node, int arg) { //这个方法的目的:以死循环的方式获取AQS状态。
            boolean failed = true;
            try {
                boolean interrupted = false;              // 是否被中断
                
                for (;;) {                                // 死循环   出口是 return interrupted; 语句  结束死循环的依据是JVM根据上一次死循环事件来灵活设置的。
                    final Node p = node.predecessor();     // 当前线程的前一个节点     
                    // 1. 这里就是以死循环的方式不断获取状态。
                    // 当前线程能够获取到AQS状态时,说明当前线程必须设置为头结点,因为只有头结点有资格获取到锁 
                    // 头结点就是获取到AQS状态的节点,即  头节点就是获取到锁的节点。
                    if (p == head && tryAcquire(arg)) {    // 
                        setHead(node); //头结点是一个null节点
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;  //这是这个线程的我唯一出口
                    }
                    //2. 这里相当于维护同步队列的状态,以死循环的方式不断维护同步队列的状态:同步队列节点数,和prev node 的 wait status 的值
                    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
    }
    AQS的 shouldParkAfterFailedAcquire() 与 parkAndCheckInterrupt()方法:
    
    
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            // 前置节点状态是signal,说明前置节点 已经释放锁了,同时当前节点会被通知 ;
            // 那当前节点可以安全阻塞,因为前置节点承诺执行完之后会通知唤醒下一节点----- 即 当前节点
            if (ws == Node.SIGNAL)  return true;
            // 删除被中断的或是正在执行的线程节点,因为同步队列里存放的是等待执行的节点----就是阻塞的节点。          
            if (ws > 0) {     
                do {
                    node.prev = pred = pred.prev; //删除前置节点
                } while (pred.waitStatus > 0);
                pred.next = node;
                
            } else {  // 此时  ws = 0 / -2 / -3 ,  
    
                // 前置节点是0或者propagate状态,这里通过CAS把前置节点状态改成signal
                // 这里不返回true让当前节点阻塞,而是返回false,目的是让调用者再check一下当前线程是否能 成功获取锁,失败的话再阻塞,
                //这里说实话我也不是特别理解这么做的原因  
                //------自己的理解 :尝试获取修改前置节点状态,但只有前置节点释放锁后才能修改成功。           
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//设置前置节点的状态为 -1 
            }
            
            return false;
    }
    //此方法会中断线程,返回线程是否被挂起的boolean值 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 阻塞当前线程,监事是当前sync对象 return Thread.interrupted(); // 阻塞返回后,返回当前线程是否被中断 }
    //LockSupport的方法 //此方法是挂起当前线程,返回该线程是否被中断过,如果被中断过,直接设置interrupted = true public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); // 设置当前线程的监视器blocker UNSAFE.park(false, 0L); // 这里调用了native方法到JVM级别的阻塞机制阻塞当前线程 setBlocker(t, null); // 阻塞结束后把blocker置空 } //LockSupport的方法 private static void setBlocker(Thread t, Object arg) { UNSAFE.putObject(t, parkBlockerOffset, arg); }
    
    
    acquireQueued()总结 :每一个节点以死循环的方式做两件事:
      1. 获取AQS状态。
      2. 维护同步队列里的节点状态。

     同步队列中每一个节点自选获取锁的流程。

         

      当前节点成功获取锁的流程 :把当前节点设置为head节点

        

     如果获取同步状态失败,采用cancelAcquire方法取消当前节点 : 结束死循环判断依据是JVM根据上一次死循环的时间灵活设置的。

    // 取消当前节点
    private void cancelAcquire(Node node) {
        // 当前节点不存在的话直接忽略 
            if (node == null)
                return;
            node.thread = null;  // 把当前节点的线程设为null
            // 获取当前节点的前驱pred
        Node pred = node.prev;
        while (pred.waitStatus > 0)   // 如果prde的ws > 0,直接跳过pred继续往前遍历,直到pred的
            node.prev = pred = pred.prev;  // ws <= 0
     
            // 获取pred的后继predNext
            Node predNext = pred.next;
     
            // 把node节点的ws设为CANCELLED
            node.waitStatus = Node.CANCELLED;
     
            // 如果node是尾节点,利用CAS把pred设为尾节点,predNext为null
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // pred不是头结点 && pred的线程不为空 && pred.ws = singal
            // 利用CAS把node的next设为pred的next节点
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {   // node是头结点,唤起它的后继节点
                    unparkSuccessor(node);
                }
     
                node.next = node; // node指向自己,便于GC
        }
    }

    分三种情况进行考虑: 

       1. node本身就是尾节点,直接把node的prev设为尾节点

       2. node的prev不是头结点,直接把prev和node的next进行连接

       3. node的prev是头结点,使用unparkSuccessor唤醒后继节点

     这就是整个acquireQueued的流程,如果执行完acquireQueued方法返回线程被中断过,那线程最后要进行自我中断一下

     /**
         * 当前线程的自我中断
         */
    private static void selfInterrupt() {
            Thread.currentThread().interrupt();
    }

    问题:在acquireQueued(final Node node,int arg)方法中,当前线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态,

    这是为什么?原因有两个,如下:
      第一,头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。
      第二,维护同步队列的FIFO原则。该方法中,节点自旋获取同步状态的行为如下图所示

        

    由于非首节点线程前驱节点出队或者被中断而从等待状态返回,随后检查自己的前驱是否是头节点,如果是则尝试获取同步状态。

    节点和节点之间在循环检查的过程中基本不相互通信,而是简单地判断自己的前驱是否为头节点,这样就使得节点的释放规则符合FIFO,并且也便于对过早通知的处理(过早通知是指前驱节点不是头节点的线程由于中断而被唤醒)。

    节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说每个线程)都在自省地观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这个自旋过程中(并会阻塞节点的线程)。

    经过上面的分析,独占式锁的获取过程也就是acquire()方法的执行流程如下图所示:

     

        

    独占锁释放

    当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能够继续获取同步状态。

    通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)。

    源码如下:

    public final boolean release(int arg) {
        if (tryRelease(arg)) { //
            Node h = head;  //
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); //
            return true;
        }
        return false;
    }

    首先,会通过模板方法调用子类的tryRelease(),如果释放成功,获取当前头结点,

    如果头结点不为空,同时头结点的等待状态不等于0,则执行unparkSuccessor()方法,唤醒等待的队列中的下一个节点的线程。

     /**
         * 如果node存在唤醒它的后继节点
         */
        private void unparkSuccessor(Node node) {
            /*
             * 获取node的ws,如果ws<0,使用CAS把node的ws设为0,表示释放同步状态
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
     
            /*
             * 获取node的后继节点s,根据条件s = null 或者 s.ws > 0,从同步队列的尾部开始遍历,
             * 直到找到距node最近的满足ws <= 0的节点t,把t赋给s,唤醒s节点的线程
             * 如果s不为null && s的ws <= 0,直接唤醒s的线程
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
    }

    在unparkSuccessor()方法中,首先先获取头结点的等待状态,如果等待状态为-1,则将状态置为0。

    接下来,找到下一个需要唤醒的结点s,如果它为空或已取消,则从队列中去寻找最前边的那个未放弃的节点。

    最后,执行唤醒的操作,通过LockSupport提供的工具类。下一个等待节点的线程被唤醒后,它在自旋tryAcquire()方法会返回true,则表示自己拿到资源,将前一个头结点踢出队列,将自己设置为头结点。

    还记得我们之前提到的,当第一次构建队列的时候,此时头结点与尾节点全部都是空的,这时,首先会新增一个空的头结点,其实这里的设计非常精妙。

    我在看源码的时候,是很好奇为何要初始化的时候设置一个空的头结点,

      其主要的原因是:如果没有一个空的头结点,在acquireQueued()方法中的自旋就会出现问题,因为自旋是判断的自己的前一个节点是否为头节点,如果第一次构建队列,就把当前等待节点放置在头结点,它就没有前置节点了,它的自旋条件永远无法成立。因此,空的头结点的创建是非常必要的。

    好啦,我们刚刚分析了独占式同步状态获取与释放以及同步队列的原理,我们来总结一下这块: 
    - 线程获取锁失败,线程被封装成Node进行入队操作,核心方法在于addWaiter()和enq(),同时enq()完成对同步队列的头结点初始化工作以及CAS操作失败的重试; 
    - 线程获取锁是一个自旋的过程,当且仅当当前节点的前驱节点是头结点并且成功获得同步状态时,节点出队即该节点引用的线程获得锁,否则,当不满足条件时就会调用LookSupport.park()方法使得线程阻塞; 
    - 释放锁的时候会唤醒后继节点。

    总体来说:在获取同步状态时,AQS维护一个同步队列,获取同步状态失败的线程会加入到队列中进行自旋;移除队列(或停止自旋)的条件是前驱节点是头结点并且成功获得了同步状态。在释放同步状态时,同步器会调用unparkSuccessor()方法唤醒后继节点。

    --------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    独占可中断式获取同步状态

    我们在前面提到过,Lock锁的实现与synchronized相比,更加的灵活,可以响应中断以及超时时间设置等特性,而Lock的这些特性的实现也是基于AQS的acquireInterruptibly()方法实现的,我们现在来看一下源码的实现

        /**
         * 当前线程被中断后,直接抛出异常,否则的话,再次调用tryAcquire方法获取同步状态
         */
        public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);// 线程获取锁失败
        }
     
     
        /**
         *  以独占模式获取同步状态,线程被中断直接抛出异常
         */
        private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);//将节点插入到同步队列中
    boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } //如果发现被中断,直接抛出异常 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                // 线程中断抛异常
    throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

    从上面的代码中可以看到,基本上实现与acquire()一致,唯一的区别是当parkAndCheckInterrupt返回true时,即线程阻塞时该线程被中断,代码抛出被中断异常。

    ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    独占式超时获取同步状态

    AQS中的tryAcquireNanos()方法可以设置一个超时时间,该方法会在三种情况下才会返回:

    • 在超时时间内,当前线程成功获取了锁;
    • 当前线程在超时时间内被中断;
    • 超时时间结束,仍未获得锁返回false。

    我们来看一下其源码实现:

         /**
         * 以独占模式获取同步状态,线程被中断,直接抛出异常,如果在指定时间内没有获取到同步状态,
         * 直接返回false,表现获取同步状态失败.
         */
        public final boolean tryAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            return tryAcquire(arg) ||
                doAcquireNanos(arg, nanosTimeout); //实现超时等待效果
    }
    
    private boolean doAcquireNanos(int arg, long nanosTimeout)  throws InterruptedException {
            if (nanosTimeout <= 0L) return false;
         //1.根据超时时间和当前时间计算出截至时间 
    final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor();
              // 2. 当前线程获取锁出队列
    if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; }
              // 3.1 . 重新计算超时时间 nanosTimeout
    = deadline - System.nanoTime();
              //3.2 . 已经超时返回false
    if (nanosTimeout <= 0L) return false;
              //3.3 . 线程阻塞等待
    if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout);
              // 3.4 . 线程被中断 抛出中断异常
    if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

    程序逻辑如图所示:

         
    程序逻辑同独占锁可响应中断式获取基本一致,唯一的不同在于获取锁失败后,对超时时间的处理上,
    在第1步会先计算出按照现在时间和超时时间计算出理论上的截止时间,
    比如当前时间是8h10min,超时时间是10min,那么根据deadline = System.nanoTime() + nanosTimeout计算出刚好达到超时时间时的系统时间就是8h 10min+10min = 8h 20min。
    然后根据deadline - System.nanoTime()就可以判断是否已经超时了,
    比如,当前系统时间是8h 30min很明显已经超过了理论上的系统时间8h 20min,deadline - System.nanoTime()计算出来就是一个负数,自然而然会在3.2步中的If判断之间返回false。
    如果还没有超时即3.2步中的if判断为true时就会继续执行3.3步通过LockSupport.parkNanos使得当前线程阻塞,同时在3.4步增加了对中断的检测,若检测出被中断直接抛出被中断异常。

    ==================================================================================================================================================================================

  • 相关阅读:
    yzoj P2344 斯卡布罗集市 题解
    yzoj P2350 逃离洞穴 题解
    yzoj P2349 取数 题解
    JXOI 2017 颜色 题解
    NOIP 2009 最优贸易 题解
    CH 4302 Interval GCD 题解
    CH4301 Can you answer on these queries III 题解
    Luogu2533[AHOI2012]信号塔
    Luogu3320[SDOI2015]寻宝游戏
    Luogu3187[HNOI2007]最小矩形覆盖
  • 原文地址:https://www.cnblogs.com/wdp1990/p/11249212.html
Copyright © 2011-2022 走看看