zoukankan      html  css  js  c++  java
  • 深入浅出AQS源码解析

    最近一直在研究AQS的源码,希望可以更深刻的理解AQS的实现原理。虽然网上有很多关于AQS的源码分析,但是看完以后感觉还是一知半解。于是,我将自己的整个理解过程记录下来了,希望对大家有所帮助。

    基本原理

    AQS是Java中锁的基础,主要由两个队列组成。一个队列是同步队列,另一个是条件队列

    同步队列的原理

    • 同步队列的队列头部是head,队列尾部是tail节点,head节点是一个空节点,同步队列是一个双向链表,通过nextprev连接所有节点
    • 所有的线程在竞争锁的时候都会创建一个Node节点,线程与节点绑定在一起,(如果是同步锁和排他锁不同之处是通过nextWaiter来区分的)并且添加到同步队列的尾部
    • head的第一个节点获取锁,其余节点都需要等待被唤醒
    • 同步队列中的节点会存在取消和null的情况(如:线程超时中断、线程更新节点的中间态),被取消和null的节点不能被唤醒,将会被视为无效节点
    • 一个线程只能被有效的前驱节点(取消和null的节点除外)唤醒
    • 持有锁的线程只能是有一个,其他有效节点对应的线程都会被挂起

    条件队列的原理

    • 一个同步队列可以对应多个条件队列
    • 条件队列是一个单向链表,通过nextWaiter来连接起来,条件队列的头节点是firstWaiter,尾节点是lastWaiter
    • 某个条件队列中满足条件的节点(被signalsignalAll方法唤醒的节点)才会被转移到同步队列
    • 条件队列中的被转移到同步队列的节点是从头节点开始,条件队列中被阻塞的线程会添加到队列的尾部

    同步队列的实现

    首先,了解以下同步队列中队列的节点Node的数据结构

    static final class Node {
            /** 共享锁的标识 */
            static final Node SHARED = new Node();
            /** 排他锁的标识 */
            static final Node EXCLUSIVE = null;
    
            /** 线程取消 */
            static final int CANCELLED =  1;
            /** 持有锁的线程的后继线程被挂起 */
            static final int SIGNAL    = -1;
            /** 条件队列标识 */
            static final int CONDITION = -2;
            /**
             * 共享锁情况下,通知所有其他节点
             */
            static final int PROPAGATE = -3;
    
            /**
             * waitStatus的取值如下:
             *   SIGNAL(-1): 当前节点的后继节点应该被挂起
             *   CANCELLED(1): 当前节点被取消
             *   CONDITION(-2): 当前节点在条件队列
             *   PROPAGATE(-3): 释放共享锁时需要通知所有节点
             *   0: 初始值
             *
             */
            volatile int waitStatus;
    
            /**
             * 前驱节点
             */
            volatile Node prev;
    
            /**
             * 后继节点
             */
            volatile Node next;
    
            /**
             * 节点对应的线程
             */
            volatile Thread thread;
    
            /**
             * 在共享锁的情况下,该节点的值为SHARED
             * 在排他锁的情况下,该节点的值为EXCLUSIVE
             * 在条件队列的情况下,链接的是下一个等待条件的线程
             */
            Node nextWaiter;
    }
    

    其次,我们来看一下同步队列的链表结构
    同步队列链表

    接着,我们根据同步队列的原理来分析以下acquirerelease需要做哪些事情:

    实现acquire功能需要做的事情

    1. 创建一个Node节点node(该节点可能是排他锁,也可以能是共享锁)
    2. node添加到同步队列尾部,如果同步队列为空(初始情况下),需要先创建一个空的头节点,然后再添加到队列的尾部
    3. 如果node的前驱节点是head,说明node是第一个节点,能够获取锁,需要将head修改成node,释放前驱节点的资源
    4. 如果node的前驱节点不是head,说明获取锁失败,需要检测是否需要将node绑定的线程挂起,分以下几种情况:
      • 如果nodewaitStatus已经被设置为SIGNAL 表示需要被挂起
      • 如果nodewaitStatus设置为CANCEL表示该节点已经被取消,需要被去掉,并修改 nodeprev,直到链接上一个有效的节点为止
      • 否则将nodewaitStatus设置为SIGNAL,表示即将要被挂起
    5. 如果需要将node绑定的线程挂起,则让出CPU,直到当前驱节点来唤起node才会开始继续从步骤3开始执行

    与acquire功能相关的代码

    • acquire方法:获取排他锁
    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
    1. tryAcquire(arg):对外提供的一个扩展方法,常用的锁都要实现这个方法,具体实现与锁相关

    2. addWaiter(Node.EXCLUSIVE): 创建一个排他锁节点,并将该节点添加到同步队列尾部,代码如下:

    private Node addWaiter(Node mode) {
            // 创建一个node,EXCLUSIVE类型
            Node node = new Node(mode);
    
            for (;;) {
                // 获取尾节点
                Node oldTail = tail;
                if (oldTail != null) {
                    // 设置即将成为尾节点的前驱
                    node.setPrevRelaxed(oldTail);
                    // CAS操作设置尾节点
                    if (compareAndSetTail(oldTail, node)) {
                        // 将新尾节点的前驱节点与新的尾节点关联起来
                        oldTail.next = node;
                        // 返回添加的节点
                        // 这个节点现在不一定是尾节点,因为如果有多个线程调用这个方法时,
                        // 可能还有节点添加在这个节点后面
                        return node;
                    }
                } else {
                    // 如果队列为空,初始化头节点
                    initializeSyncQueue();
                }
            }
        }
    
    1. acquireQueued同步队列中的节点获取排他锁
    final boolean acquireQueued(final Node node, int arg) {
            try {
                // 线程是否中断
                boolean interrupted = false;
                for (;;) {
                    // 获取前驱节点
                    final Node p = node.predecessor();
                    // 如果前驱节点是头节点,获取锁
                    if (p == head && tryAcquire(arg)) {
                        // 修改头节点
                        setHead(node);
                        // 释放头节点的资源
                        p.next = null; // help GC
                        // 返回线程中断的状态
                        // 这也是该方法唯一的返回值
                        // 没有获取锁的线程会一直执行该方法直到获取锁以后再返回
                        return interrupted;
                    }
                    // 获取锁失败后是否需要将线程挂起
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt()) // 线程挂起并返回是否被中断
                        interrupted = true;
                }
            } catch (Throwable t) {
                // 取消该节点
                cancelAcquire(node);
                throw t;
            }
        }
    
    1. shouldParkAfterFailedAcquire:检测线程获取锁失败以后是否需要被挂起
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            // 前驱节点的状态
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * 状态已经设置成SIGNAL,可以直接挂起该节点
                 */
                return true;
            // 节点被取消
            if (ws > 0) {
                /*
                 * 找到pred第一个有效的前驱节点
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                // pred可能是一个新的节点,需要将pred的next重写设置为node
                pred.next = node;
            } else {
                /*
                 * CAS操作将pred节点的状态设置为SIGNAL
                 */
                pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
            }
            // 只有当pred节点的waitStatus已经是SIGNAL状态时,才可以安全的挂起线程
            // 否则需要不能被挂起
            return false;
        }
    
    1. parkAndCheckInterrupt:将当前线程挂起,并检测当前线程是否中断
    private final boolean parkAndCheckInterrupt() {
            // 线程挂起
            LockSupport.park(this);
            // 检测线程是否中断
            return Thread.interrupted();
        }
    
    1. cancelAcquire:取消节点
     private void cancelAcquire(Node node) {
            // 如果节点为空,什么都不做
            if (node == null)
                return;
            // 释放线程
            node.thread = null;
    
            // 从后往前过滤掉所有的被取消的节点
            Node pred = node.prev;
            while (pred.waitStatus > 0)
                node.prev = pred = pred.prev;
    
            // 有效前驱节点的nex节点
            Node predNext = pred.next;
    
            // 将node设置为CANCELLED
            node.waitStatus = Node.CANCELLED;
    
            // 如果是尾节点,设置新的尾节点
            if (node == tail && compareAndSetTail(node, pred)) {
                // 将新的尾节点的后续设置为null
                pred.compareAndSetNext(predNext, null);
            } else {
                // If successor needs signal, try to set pred's next-link
                // so it will get one. Otherwise wake it up to propagate.
                int ws;
                // 如果前驱节点的线程不为null并且waitStatus为SIGNAL
                if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                     (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                    Node next = node.next;
                    // 将node设置成pred的后继节点
                    if (next != null && next.waitStatus <= 0)
                        pred.compareAndSetNext(predNext, next);
                } else {
                    // 唤起node节点的后继节点
                    // 因为node节点已经释放锁了
                    unparkSuccessor(node);
                }
    
                node.next = node; // help GC
            }
        }
    
    1. unparkSuccessor:唤醒后继节点
    private void unparkSuccessor(Node node) {
            /*
             * 获取node节点的waitStatus
             */
            int ws = node.waitStatus;
           // 用CSA操作将waitStatus设置成初始状态
           // 不管设置是否成功,都无所谓,因为该节点即将被销毁
            if (ws < 0)
                node.compareAndSetWaitStatus(ws, 0);
            /*
             * 获取node的后继节点
             */
            Node s = node.next;
            // 如果后继节点为null或者被取消,
            // 通过从同步队列的尾节点开始一直往前找到一个有效的后继节点
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node p = tail; p != node && p != null; p = p.prev)
                    if (p.waitStatus <= 0)
                        s = p;
            }
            // 如果后继节点不为空
            if (s != null)
                LockSupport.unpark(s.thread);// 唤醒后继节点的线程
        }
    

    acquire方法类似的还有acquireInterruptiblytryAcquireNanosacquireSharedacquireSharedInterruptiblytryAcquireSharedNanos,我们都一一分析以下

    • acquireInterruptibly方法:获取可中断的排他锁
    public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted()) // 如果线程中断,直接返回
                throw new InterruptedException();
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg); // 中断式的获取锁
        }
    
    1. doAcquireInterruptibly:可中断式的获取锁
    private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
           // 创建一个排他节点加入同步队列
            final Node node = addWaiter(Node.EXCLUSIVE);
            try {
                for (;;) {
                    // 获取前驱节点
                    final Node p = node.predecessor();
                    // 如果前驱节点是头节点,说明已经获取的锁
                    if (p == head && tryAcquire(arg)) {
                        // 修改头节点
                        setHead(node);
                        p.next = null; // help GC
                        return;
                    }
                    // 如果没有获取锁,检测是否需要挂起
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException(); // 如果发现线程已经被中断,需要抛出异常
                }
            } catch (Throwable t) {
                // 发生异常取消节点
                cancelAcquire(node);
                throw t;
            }
        }
    
    • tryAcquireNanos方法:超时中断获取排他锁
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException(); // 线程中断直接返回
            return tryAcquire(arg) ||
                doAcquireNanos(arg, nanosTimeout); // 超时获取排他锁
        }
    
    1. doAcquireNanos:超时获取排他锁
    private boolean doAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            // 如果超时直接返回
            if (nanosTimeout <= 0L)
                return false;
            // 获取超时时长
            final long deadline = System.nanoTime() + nanosTimeout;
            // 添加一个排他节点到同步队列尾部
            final Node node = addWaiter(Node.EXCLUSIVE);
            try {
                for (;;) {
                     // 获取前驱节点
                    final Node p = node.predecessor();
                    // 已经获取锁
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        return true;
                    }
                    nanosTimeout = deadline - System.nanoTime();
                    // 如果超时了就取消
                    if (nanosTimeout <= 0L) {
                        cancelAcquire(node);
                        return false;
                    }
                    // 检测节点是否需要被挂起
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                        // 如果需要挂起,且超时时长大于SPIN_FOR_TIMEOUT_THRESHOLD
                        // 线程挂起nanosTimeout时间
                        LockSupport.parkNanos(this, nanosTimeout); 
                    if (Thread.interrupted())
                        throw new InterruptedException();
                }
            } catch (Throwable t) {
                // 发生异常取消节点
                cancelAcquire(node);
                throw t;
            }
        }
    
    • acquireShared方法:获取共享锁
    public final void acquireShared(int arg) {
            // 对外提供的一个扩展方法,常用的锁都要实现这个方法,
            // 该方法的实现与锁的用途有关
            if (tryAcquireShared(arg) < 0) 
                doAcquireShared(arg); // 获取共享锁
        }
    
    1. doAcquireShared:获取共享锁
     private void doAcquireShared(int arg) {
            // 添加一个共享节点到同步队列尾部
            final Node node = addWaiter(Node.SHARED);
            try {
                boolean interrupted = false;
                for (;;) {
                    // 获取前驱节点
                    final Node p = node.predecessor();
                    if (p == head) {
                        // 返回结果大于等于0表示获取共享锁
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            // 设置头节点并广播通知其他获取共享锁的节点
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            // 如果线程被中断,将该线程中断
                            // 共享锁会被多个线程获取,如果需要中断
                            // 所有获取共享锁的线程都要被中断
                            if (interrupted)
                                selfInterrupt();
                            return;
                        }
                    }
                    // 检测是否需要挂起
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt()) // 挂起并中断
                        interrupted = true;
                }
            } catch (Throwable t) {
                // 发生异常取消节点
                cancelAcquire(node);
                throw t;
            }
        }
    
    1. setHeadAndPropagate:设置头节点并广播其他节点来获取锁
     private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // 记录旧的头节点
            setHead(node);// 设置新的头节点
            /*
             * 如果头节点为null或者是不是取消状态,尝试唤醒后继节点
             */
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                // node节点的next是SHARED,即共享锁
                if (s == null || s.isShared())
                    // 唤起获取共享锁的线程
                    doReleaseShared();
            }
        }
    
    1. doReleaseShared:唤醒等待共享锁的节点
     private void doReleaseShared() {
            /*
             * 唤醒时是从头节点开始先唤醒第一个共享节点,
             * 第一个共享节点被唤醒后会在doAcquireShared方法里继续执行(之前就是在这个方法里被挂起的)
             * 第一个共享节点如果获取锁会调用setHeadAndPropagate方法修改头节点,然后再调用doReleaseShared方法
             * 唤醒第二个共享节点,以此类推,最后把所有的共享节点都唤醒
             */
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    // 获取头节点的状态
                    int ws = h.waitStatus;
                    // 如果头节点是SIGNAL,需要将状态设置为0,表示已经即将被唤醒
                    if (ws == Node.SIGNAL) {
                        if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                            continue;            // 如果失败了说明有其他线程在修改头节点,需要继续重试
                        unparkSuccessor(h); // 唤醒头节点的后继节点
                    }
                    else if (ws == 0 &&
                             !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                        continue;                // 将头节点状态从0设置成PROPAGATE,如果失败了继续,因为也有其他获取共享锁的线程在更改头节点
                }
                // 如果头节点未改变(因为没有后继节点需要等待共享锁),跳出循环
                if (h == head)
                    break;
            }
        }
    
    1. selfInterrupt:中断当前线程
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
    
    • acquireSharedInterruptibly方法:可中断的获取共享锁
    public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException(); // 如果线程被中断抛出异常
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg); // 可中断的方式获取共享锁
        }
    
    1. doAcquireSharedInterruptibly:可中断的方式后去共享锁
     private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            // 添加共享锁节点到同步队列尾部
            final Node node = addWaiter(Node.SHARED);
            try {
                for (;;) {
                    // 获取前驱节点
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            // 获取共享锁以后修改头节点,通知其他等待共享锁的节点
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            return;
                        }
                    }
                    // 线程获取共享锁失败后需要挂起,并且发现线程被中断,所以抛出异常
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } catch (Throwable t) {
                // 发生异常取消节点
                cancelAcquire(node);
                throw t;
            }
        }
    
    • tryAcquireSharedNanos方法:超时中断获取共享锁
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted()) // 线程如果中断了,直接抛出异常
                throw new InterruptedException();
            return tryAcquireShared(arg) >= 0 ||
                doAcquireSharedNanos(arg, nanosTimeout); // 超时获取共享锁
        }
    
    1. doAcquireSharedNanos:超时的方式获取中断锁
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            // 超时直接返回
            if (nanosTimeout <= 0L)
                return false;
            final long deadline = System.nanoTime() + nanosTimeout;
            // 添加共享节点到同步队列尾部
            final Node node = addWaiter(Node.SHARED);
            try {
                for (;;) {
                    // 获取前驱节点
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            // 获取锁,修改头节点,通知所有其他等待共享锁的节点
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            return true;
                        }
                    }
                    nanosTimeout = deadline - System.nanoTime();
                    if (nanosTimeout <= 0L) {
                        // 超时取消节点
                        cancelAcquire(node);
                        return false;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                        // 如果需要挂起,且超时时长大于SPIN_FOR_TIMEOUT_THRESHOLD
                        // 线程挂起nanosTimeout时间
                        LockSupport.parkNanos(this, nanosTimeout);
                    if (Thread.interrupted())
                        throw new InterruptedException(); // 中断了抛出异常
                }
            } catch (Throwable t) {
                // 发生异常取消节点
                cancelAcquire(node);
                throw t;
            }
        }
    

    实现release功能需要做的事情

    1. 释放当前获取锁的线程持有的资源
    2. 唤醒有效的一个后继节点

    与release功能相关的代码

    • release方法:释放排他锁
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                // 头节点不能是一个中间态
                if (h != null && h.waitStatus != 0)
                    // 唤醒后继节点
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    • release方法:释放共享锁
    public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                // 释放共享锁,从头节点开始一个一个的释放
                // 如果存在多个共享节点在同步队列时,doReleaseShared方式其实是递归调用
                doReleaseShared();
                return true;
            }
            return false;
        }
    

    至此,将所有获取锁和释放锁的方法相关的源码全部分析完

    条件队列的实现

    我们来看一下条件队列的链表结构
    条件队列的链表结构

    实现await功能需要做的事情

    1. 创建一个CONDITION类型的节点,将该节点添加到条件队列
    2. 释放已经获取的锁(因为只有当前线程先获取了锁才可能再调用Condition.await()方法)
    3. 如果无法获取锁,线程挂起

    与await功能相关的代码

    • await方法:等待条件
    public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException(); // 如果线程中断,直接抛出异常
                // 创建一个CONDITION类型的节点,将该节点添加到条件队列尾部
                Node node = addConditionWaiter();
                // 释放锁
                // 在调用await方法之前都会调用lock方法,这个时候已经获取锁了
                // 有时候锁还是可重入的,所以需要将所有的资源都释放掉
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                // 如果节点不再同步队列,全部都要挂起
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    // 如果在等待期间发生过中断(不管是调用signal之前还是之后),直接退出
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                // 让线程尝试去获取锁,如果无法获取锁就挂起
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                // 清除所有在条件队列中是取消状态的节点
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                // 发生中断,上报中断情况
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    
    1. addConditionWaiter:在条件队列中添加一个节点
     private Node addConditionWaiter() {
                Node t = lastWaiter;
                // 清除条件队列中无效的节点
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                // 创建一个节点
                Node node = new Node(Node.CONDITION);
                // 添加到条件队列尾部
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    
    1. unlinkCancelledWaiters:清除在条件队列中被取消的节点
    private void unlinkCancelledWaiters() {
                Node t = firstWaiter;
                Node trail = null;
                // 遍历条件队列将所有不是CONDITION状态的节点全部清除掉
                // 这些节点都是取消状态的节点
                while (t != null) {
                    Node next = t.nextWaiter;
                    if (t.waitStatus != Node.CONDITION) {
                        t.nextWaiter = null;
                        if (trail == null)
                            firstWaiter = next;
                        else
                            trail.nextWaiter = next;
                        if (next == null)
                            lastWaiter = trail;
                    }
                    else
                        trail = t;
                    t = next;
                }
            }
    
    1. fullyRelease:释放线程持有的所有锁资源
    final int fullyRelease(Node node) {
            try {
                int savedState = getState();
                // 释放所有的资源
                // 如果是可重入锁,savedState就是重入的次数
                if (release(savedState))
                    return savedState;
                throw new IllegalMonitorStateException();
            } catch (Throwable t) {
                // 发生异常就取消该节点
                node.waitStatus = Node.CANCELLED;
                throw t;
            }
        }
    
    1. isOnSyncQueue:判断节点是否在同步队列
    final boolean isOnSyncQueue(Node node) {
            // waitStatus是CONDITION或者node没有前驱节点,说明node不在同步队列
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            if (node.next != null) // 有后继节点一定在同步队列
                return true;
            /*
             * 在同步队列中查找node,看是否在同步队列中
             */
            return findNodeFromTail(node);
        }
    
    1. findNodeFromTail:在同步队列中查找节点
    private boolean findNodeFromTail(Node node) {
            // 从尾节点开始查找
            for (Node p = tail;;) {
                if (p == node) // 找到了
                    return true;
                if (p == null) // 找到头了还没找到
                    return false;
                p = p.prev;
            }
        }
    
    1. checkInterruptWhileWaiting:检测中断的情况
    private int checkInterruptWhileWaiting(Node node) {
                // 没有发生中断返回0
                // 调用signal之前发生中断返回THROW_IE
                // 调用signal之后发生中断返回REINTERRUPT
                return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
            }
    
    1. transferAfterCancelledWait:清除在条件队列中被取消的节点
    // 只有线程处于中断状态,才会调用此方法
    // 如果需要的话,将这个已经取消等待的节点转移到阻塞队列
    // 返回 true,如果此线程在 signal 之前被取消,否则返回false
    final boolean transferAfterCancelledWait(Node node) {
      
            // 用 CAS 将节点状态设置为 0 
            // 如果这步 CAS 成功,说明是 signal 方法之前发生的中断,
           // 因为如果 signal 先发生的话,signal 中会将 waitStatus 设置为 0
            if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
                enq(node); // 将节点放入阻塞队列
                return true;
            }
            // 到这里是因为 CAS 失败,肯定是因为 signal 方法已经将 waitStatus 设置为了 0
            // signal 方法会将节点转移到阻塞队列,但是可能还没完成,这边自旋等待其完成
            // 当然,这种事情还是比较少的吧:signal 调用之后,没完成转移之前,发生了中断
            while (!isOnSyncQueue(node))
                Thread.yield();
            return false;
        }
    
    1. enq:把节点添加到同步队列
    private Node enq(Node node) {
            // 无限循环,将节点添加到同步队列尾部
            for (;;) {
                Node oldTail = tail;
                if (oldTail != null) {
                    node.setPrevRelaxed(oldTail);
                    if (compareAndSetTail(oldTail, node)) {
                        oldTail.next = node;
                        return oldTail;
                    }
                } else {
                    // 如果同步队列为空,初始化
                    initializeSyncQueue();
                }
            }
        }
    
    1. reportInterruptAfterWait:中断处理
    private void reportInterruptAfterWait(int interruptMode)
                throws InterruptedException {
                // 如果是THROW_IE状态,抛异常
                if (interruptMode == THROW_IE)
                    throw new InterruptedException();
                else if (interruptMode == REINTERRUPT) // 再次中断,因为中断状态被使用过一次
                    selfInterrupt();
            }
    

    awaitNanosawaitUntilawait(long time, TimeUnit unit)这几个方法的整体逻辑是一样的,就不再分析了

    实现signal功能需要做的事情

    1. 将条件队列中的节点加入同步队列
    2. 唤醒线程

    与signal功能相关的代码

    • signal方法:唤醒等待条件的节点
     public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                // 获取条件队列中的第一个节点
                Node first = firstWaiter;
                if (first != null)
                    // 唤醒等待条件的节点
                    doSignal(first); 
            }
    
    1. doSignal:唤醒等待条件的节点
    private void doSignal(Node first) {
                do {
                    // 去掉无效的节点
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&  // 将节点转移到同步队列
                         (first = firstWaiter) != null);
            }
    
    1. transferForSignal:将节点转移到同步队列
    final boolean transferForSignal(Node node) {
            /*
             * 取消的节点不需要转移
             */
            if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
                return false;
    
            /*
             * 将节点加入同步队列尾部
             */
            Node p = enq(node);
            int ws = p.waitStatus;
            // ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程
            // 如果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用
            // 节点入队后,需要把前驱节点的状态设为SIGNAL
            if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
                // 如果前驱节点取消或者 CAS 失败,会进到这里唤醒线程
                LockSupport.unpark(node.thread);
            return true;
        }
    
    • signalAlll方法:唤醒所有等待条件的节点
    public final void signalAll() {
                // 如果是当前线程
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    // 唤醒所有等待条件的节点
                    doSignalAll(first);
            }
    
    1. doSignalAll:唤醒所有等待条件的节点
    // 将所有的节点都转移到同步队列
    private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }
    

    现在将与AQS相关的核心代码都整理了一遍,里面如果有描述不清晰或者不准确的地方希望大家可以帮忙指出!

  • 相关阅读:
    linux下apache(httpd)安装部署
    基于redis分布式缓存实现(新浪微博案例)
    python--字典
    django(四)
    django(三)
    django(二)
    django(一)
    053(八十)
    053(七十九)
    053(七十八)
  • 原文地址:https://www.cnblogs.com/pinxiong/p/13288201.html
Copyright © 2011-2022 走看看