zoukankan      html  css  js  c++  java
  • AbstractQueuedSynchronizer(AQS)源码分析详解

    AbstractQueuedSynchronizer 是一个抽象的同步等待队列。juc下面大多数锁都是使用到了AQS。除了获取资源和释放资源的方法外,AQS基本上将其他同步等待队列的逻辑都实现了,比如线程进入队列同步等待、响应中断、唤醒线程等。如果我们想实现一个简单的同步等待队列,那只需要实现AQS的获取和释放资源的方法就行了(实际上还有一个用来判断是否是当前线程持有资源的方法需要实现)。

    1. AQS类的基本结构

    首先实现了AbstractOwnableSynchronizer类,里面维护了一个FIFO双向队列,也就是我们的等待队列,记录了队列的头节点和尾节点;还维护了一个状态信息state,对于不同的锁来说state有着不同的意义,但是可以肯定的是state跟线程释放进入等待队列是由密切关系的。

    在多线程情况下,对双向队列的头节点和尾节点以及state的修改都是使用的cas进行的修改(不存在多线程的话,就直接赋值了,如果独占锁释放锁资源,因为之会有一个线程持有锁资源)。

    1.1 AbstractOwnableSynchronizer类结构

    核心就是设置和获取exclusiveOwnerThreadexclusiveOwnerThread表示独占模式下持有锁的线程。

    public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
        /** Use serial ID even though all fields transient. */
        private static final long serialVersionUID = 3737899427754241961L;
        protected AbstractOwnableSynchronizer() { }
        /**
         * The current owner of exclusive mode synchronization.
         */
        private transient Thread exclusiveOwnerThread;
        /**
         * Sets the thread that currently owns exclusive access.
         * A {@code null} argument indicates that no thread owns access.
         * This method does not otherwise impose any synchronization or
         * {@code volatile} field accesses.
         * @param thread the owner thread
         */
        protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;
        }
        /**
         * Returns the thread last set by {@code setExclusiveOwnerThread},
         * or {@code null} if never set.  This method does not otherwise
         * impose any synchronization or {@code volatile} field accesses.
         * @return the owner thread
         */
        protected final Thread getExclusiveOwnerThread() {
            return exclusiveOwnerThread;
        }
    }
    

    1.2 等待队列

    同步等待队列中是个双向队列,在AQS中记录了头节点和尾节点(都用了volatile修饰,确保了headtail的可见性),一般来说头节点是一个空节点(这点后面会说到)。每个节点里面记录了当前节点的等待模式(独占模式或者共享模式),同时记录了当前节点的等待状态,prevnext记录了当前节点的上一个节点和下一个节点,thread表示节点对应的线程。我们可以将状态粗略的分为两类,<=0表示有效等待节点,>0表示无效节点。

    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;
    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;
    
    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;
        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;
        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;
        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;
        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;
        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;
        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
        Node() {    // Used to establish initial head or SHARED marker
        }
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
    

    上面两个有参构造方法,一个是被addWiter调用,一个是被Condition(也就是条件队列)调用,通过两个方法我们可以看到,addWaiter创建的node,waitStatus=0(初始状态);通过Condition创建的node,waitStatus=-2(CONDITION条件队列中等待)状态(addConditionWaiter方法中可以看到)。需要注意的是一般共享锁是不支持Condition的,所以通过第二个有参构造方法创建的节点是处于独占模式的,这样我们就可以直接根据nextWaiter就能判断出节点的等待模式,也就是上面对应的isShared方法。

    1.3 同步状态state

    在不同的锁实现中,有着不同的意义,比如在ReentractLock中,state表示当前线程获取锁的重入次数。在ReentractReadWriteLock中,state的高16位表示持有读锁的线程数,低16位表示写锁的重入次数。但是无论state怎么存数据,他都与锁的获取与释放有着密切的关系。

    /**
     * The synchronization state.
     */
    private volatile int state;
    

    1.4 几个核心方法

    tryAcquireacquire为例,tryAcquire只是进行锁资源的获取,获取成功就返回true,失败返回false,而acquire会根据tryAcquire的返回结果来判断,是否将当前线程阻塞放入等待队列中。看名字我们就可以很快分辨出方法之间的对应关系。在比如tryReleaseShared是尝试释放锁资源,如果返回true,就会去尝试唤醒后继处于等待状态的节点。

    以try开头的这些方法是我们在实现自定义同步等待队列的时候,需要重点关注的方法,我们需要重写方法,实现具体的逻辑。当然,如果你只是自定义一个独占锁,那你只需要实现tryAcquiretryReleaseisHeldExclusively就行,共享锁相关的,你可以不用管。如果你想实现例如读写锁,那就需要将共享模式对应的方法也要重写,进行实现。

    // 独占模式获取锁资源
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    // 独占模式释放锁资源
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    // 共享模式获取锁资源
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    // 共享模式释放锁资源
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    // 独占模式下,当前线程释放是持有锁的线程
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    

    2. 独占锁

    独占锁指的是只能有一个线程持有锁资源,如果其他线程想要持有锁,需要等待持有锁的线程释放资源之后,否则会进入等待队列等待(没有获取到锁资源),直到等到有线程释放了锁资源,并且对该等待线程进行了唤醒,此时线程就会去再次尝试获取锁资源。

    2.1 acquire

    首先判断获取锁资源释放成功,如果成功,后面的就不能执行了,如果失败,就需要进入等待队列,等待其他线程唤醒自己,唤醒后会更新中断状态,再次去尝试获取锁资源,如果获取锁成功,就根据中断状态处理中断(返回true就会执行selfInterrupt)。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    2.1.1 创建等待节点和初始化队列

    这里直接指定了当前线程和等待模式,创建了一个新的节点(上面我们在说等待队列的时候已经说过这个构造方法了),然后判断队列是否需要初始化。如果不需要初始化,就会将刚创建的节点设置为尾节点。这里其实进行了一下快速尝试将新创建的节点放到队列的尾部,他跟enq中的代码是重复。目的就是为了在外面进行一次快速尝试,如果成功了,就不进入enq了,因为他里面有个for循环。

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    

    enq方法干了两件事

    1. 如果发现队列没有初始化,就创建一个空节点,进行队列初始化,此时队列的头节点和尾节点指向了同一个空节点。
    2. 初始化队列之后,循环并没有退出,而是来设置新创建的节点为尾节点,设置成功后推出循环。此时队列的头节点就是一个空节点,队列的尾节点就是我们刚创建的的新节点。重点看一下else中的代码,他分了三步才成功将tail更新完成,并且我们知道该方法中的代码会被多线程执行,而且这里存在共享变量tail。下面我们简单分析两个线程执行else代码块。

    首先是一个完整的流程,如果1,2,3正常执行,没有被其他线程干扰。这里我们需要特别注意的是代码中nodet都是独立存在于每个线程的栈中的

                  tail  |                           tail(node)                          tail(node)
     __            __   |  __            __            __   |   __            __            __
    |  | <--prev- |  |  | |  | <--prev- |  | <--prev- |  |  |  |  | <--prev- |  | <--prev- |  |
    |  | -next--> |  |  | |  | -next--> |  |          |  |  |  |  | -next--> |  | -next--> |  |
     ——            ——   |  ——            ——            ——   |   ——            ——            —— 
         第1步                           第2步                               第3步
    

    然后这里是两个线程执行的情况。线程线程A执行了enq方法,刚执行完第2步,线程A挂起了(还没执行第3步),线程线程B执行,他获取到了tail存放到自己的栈中的局部变量中,也执行完了第2步,此时就会出现图4的这种情况,由于没有都没有执行第3步,导致next没有连上(后面讲到为什么倒序遍历的时候会用到,倒叙遍历用的是prev)。

                  tail                              tail(A-node)                          tail                                               tail(B-node)
     __            __   |  __            __            __   |   __            __            __    |  __            __            __            __     |  
    |  | <--prev- |  |  | |  | <--prev- |  | <--prev- |  |  |  |  | <--prev- |  | <--prev- |  |   | |  | <--prev- |  | <--prev- |  | <--prev- |  |    | 
    |  | -next--> |  |  | |  | -next--> |  |          |  |  |  |  | -next--> |  |          |  |   | |  | -next--> |  |          |  |          |  |    |
     ——            ——   |  ——            ——            ——   |   ——            ——            ——    |  ——            ——            ——            ——     |
         第1步                           第2步                               第3步                                    图4(线程B执行了第1步和第2步)
    
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t; // 1
                if (compareAndSetTail(t, node)) { // 2
                    t.next = node; // 3
                    return t;
                }
            }
        }
    }
    

    2.1.2 阻塞线程

    进入到该方法后,如果当前节点的前驱节点是头节点,会先尝试获取一下锁资源tryAcquire,所以一般来说,一个线程在调用acquire后,会进行两次锁资源获取,如果获取锁资源成功了,就将当前节点设置为头节点,最后返回中断状态(按照我们说的这个流程的话,interrupted的状态就是声明时候的false),会在上面我们说的selfInterrupt方法中处理中断(这里是在线程获取到锁资源之后处理中断状态)。

    如果尝试获取锁资源失败,就会执行shouldParkAfterFailedAcquire,来判断(当前节点的前驱节点必须是signal状态才进入阻塞状态)释放需要将当前线程挂起,如果返回true,就会执行后面的parkAndCheckInterrupt将当前线程阻塞。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    就做了一件事,就是让自己(当前节点)在一个signal状态的节点的屁股后面排着(确保自己的前驱节点释放资源后会唤醒自己),然后进入阻塞状态。如果当前节点的前驱节点是一个有效状态,就直接修改为signal状态,如果不是,说明前驱节点被取消了,就需要往前找,找到一个有效状态的节点,跟在后面。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) // 必须是signal状态,才能被阻塞,否则就会再次去尝试获取一下锁资源
            return true;
        if (ws > 0) { // 只有CANCELLED=1 这一个状态,如果当前节点的前驱节点被取消了,就需要往前找一个状态为有效状态的waitStatus<=0节点,跟在他后面。
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else { // 如果是有效状态,但是不是signal,就cas修改为signal
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    使用LockSupportpark方法将当前线程阻塞挂起。

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted(); // 被唤醒后的第一件事是返回中断状态,并重置中断状态
    }
    

    2.1.3 从等待队列中移除

    根据acquireQueued中的代码,我们可以知道,只有当当前节点未获取到锁资源,并且抛出了异常,failed就是初始值true,就会执行下面的cancelAcquire方法,该方法主要的目的是将当前节点从队列中移除,会同时移除掉当前节点前后waitStatus=canceled的节点,如果发现当前节点是head节点会去唤醒后继节点。

    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;
        node.thread = null; // 将当前节点thread置为null
        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev; // 从当前节点往前找,找到一个有效等待节点 waitStatus <=0
        Node predNext = pred.next;
        node.waitStatus = Node.CANCELLED; // 将当前节点的状态设置为cancelled(1)
        if (node == tail && compareAndSetTail(node, pred)) { // 如果他是tail节点,直接使用我们找到的pred节点替换他
            compareAndSetNext(pred, predNext, null); // 因为是tail节点,所以节点后面就不会有下一个节点了,cas将next设置为null,这里next=null,prev不变
        } else {
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) { // 如果找到的pred不是头节点,就将pred的waitStatus设置为signal
                Node next = node.next; // 当当前节点的next是一个有效等待节点,就cas设置给pred的next
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node); // 如果当前节点时head,就去唤醒他的后继节点。
            }
            node.next = node; // help GC
        }
    }
    

    2.1.4 中断处理

    将中断操作补上。selfInterrupt执行,是因为acquireQueued返回true,根据上面的代码我们可以看到acquireQueued返回的就是中断状态,并且只有获取到锁资源的线程(这里说的是正常请求,抛异常也是可以返回的),才能从acquireQueued中返回。如果是被中断了,但是没有获取到锁资源,那么就会将中断状态记录下来,等拿到锁资源后返回。所以这里的中断操作我们才说是补的。因为存在线程被中断了,但是一直没有拿到锁,导致不能马上响应中断操作。

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
    

    2.2 release

    release是释放锁资源,如果tryRelease释放锁资源成功(返回true),就会判断头节点是否有后继节点需要唤醒。根据前面分析的代码,我们可以知道,当节点初始化时waitStatus是0,当有新的节点挂在刚初始化的节点的后面时,waitStatus会被修改为signal(-1)状态,所以这里判断 h.waitStatus != 0的意思是说头节点后面还有后继节点。此时就会去唤醒后继节点。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    2.2.1 唤醒后继节点

    这里传入的node是head节点,首先是将head节点的waitStatus修改为0,并且拿到head的后继节点,去唤醒。如果候机节点不是一个有效等待节点,就从队列的尾部开始从后往前找,找到一个有效等待节点去唤醒。

    1. 目前看来将waitStatus=0会增加一次后继节点尝试获取锁资源的机会(将要唤醒的节点尝试获取锁的机会),可以去看一下acquireQueued的代码,唤醒后会尝试获取一次锁,如果获取锁失败,会执行shouldParkAfterFailedAcquire方法,因为waitStatus=0,如果第一次会将waitStatuscas为signal,然后返回false,此时就会再去尝试获取一下锁,如果依旧获取失败,就会执行被阻塞(因为shouldParkAfterFailedAcquire方法判断到waitStatus=signle,直接返回true)。所以waitStatus设置为0,会增加一次获取锁的机会。
    2. 我们知道,尾节点的next是null,这里我们获取到的s==null其实并不表示s就一定是tail节点(前面enq方法已经分析过了)。因为还存在新增加一个节点next还没来得及设置。也正是因为这原因,所以这里在找一个有效等待节点时,选择了从后往前找,因为prev的更新是可靠的,下面补一点前面enq中的代码,我们可以看到在添加一个节点时,第一步是将新增加的节点的prev指向当前tail节点,然后进行第二步将新增加的节点设置tail节点,最后是第三步将旧的tail节点的next只想新增加的节点。首先者三个操作肯定不是原子性操作。当第二部操作成功了,说明第一步操作也肯定成功了,此时就会出现一种情况旧的tail节点的next还没指向新的tail节点,就会出现unparkSuccessors=null(s不是tail节点)的请求,如果是从前往后遍历,因为next是不可靠的,会出现s=null但是s并不是tail节点的情况(没有遍历完所有节点)。所以使用从后往前遍历,因为prev的更新,在第一步操作的时候就完成了,新增节点能成为tail的前提是prev已经更新了。
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t; //   1
                if (compareAndSetTail(t, node)) { // 2
                    // 可能线程会被挂起,来不及执行第三步
                    t.next = node; // 3
                    return t;
                }
            }
        }
    }
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    2.3 其他

    2.3.1 acquireInterruptibly

    acquire相比,多了很多中断处理的判断,会对中断进行及时处理。不会像acquire那样,需要等到获取到锁资源才去处理中断。这里主要说与acquire相比的一些不同点。

    在进入方法后就会调用Thread.interrupted()获取当前中断状态,并重置中断状态,如果被中断了就抛出异常。然后是获取锁资源。可以看到这里换成了doAcquireInterruptibly处理排队。里面与acquireQueued很类似,核心逻辑没变。

    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
    

    这里将addWaiter方法放了进来,在parkAndCheckInterrupt之后,如果发现中断了,就直接抛异常,此时会执行cancelAcquire方法将当前节点从队列中移除。最后将异常抛出。

    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException(); // 这里直接抛出异常,而不是仅仅对中断状态进行记录
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    2.3.2 tryAcquireNanos

    不经会对中断进行及时处理,还设置了一个等待超时时间,核心是使用了LockSupport.parkNanos方法,当超过执行时间会返回。

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }
    

    首先会计算出绝对时间,如果获取到了锁资源就返回true,如果超时了就返回false。

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold) // 时间小于等于给定值,就直接不park了
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    3. 共享锁

    对于共享锁来说,锁资源与具体线程是不相关的,如果一个线程获取了锁资源,另一个线程再来获取锁资源时,如果锁资源还能满足当前线程需求,那么依旧可以获取锁资源。

    3.1 acquireShared

    首先是通过tryAcquireShared尝试获取锁资源,并根据返回值判断,是否获取锁资源成功,<0表示获取锁资源失败,=0说明获取到了锁资源,但是没有多余的资源了,>0说明获取到了锁资源,并且还有多余的资源。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    

    3.1.1 doAcquireShared

    与独占锁的实现很类似,首先调用addWaiter创建节点并添加到队列中,然后获取当前节点的前驱节点,如果前驱节点是头节点,就尝试获取锁资源,根据返回值判断是否获取到锁资源,如果获取到了锁资源,这里首先会执行setHeadAndPropagate方法,该方法主要是更行head,然后根据条件看是否需要区唤醒后继节点,这里的中断与acquire中基本一致,获取到锁资源后,判断记录的中断状态,然后响应中断。后面的代码和acquire中的基本一致,就不多说了。

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    3.1.2 setHeadAndPropagate

    这里的setHeadacquireQueued中的一样,调的是同一个方法,重点是后面的判断,propagate表示的是剩余的锁资源,根据if中的代码我们可以知道,只有当存在剩余锁资源或者头节点为null(一般来说head不会为null),或者头节点waiteStatus<0也就是需要被唤醒,就获取当前节点的后继节点判断,后继节点是否是null或者是共享节点(也就是判断是否是一个非独占模式的节点),就会执行doReleaseShared进行节点唤醒。

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
    

    3.1.3 doReleaseShared

    将头节点的是signal,说明需要唤醒后继节点,就从signal修改为0,如果修改成功,就调用unparkSuccessor唤醒后继节点。如果头节点的状态是0,就从0改为PROPAGATE,不做其他操作。

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    

    3.2 releaseShared

    共享锁释放资源,首先使用tryReleaseShared进行锁资源释放,根据返回结果判断是否执行doReleaseShared区唤醒后继节点。

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    

    3.3 其他

    3.3.1 acquireSharedInterruptibly

    共享模式下获取锁响应中断。跟acquireInterruptibly处理中断基本一致,看一眼下面代码就知道了。

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    3.3.2 tryAcquireSharedNanos

    共享模式下的获取锁并设置等待超时时间。

    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    4. 条件等待队列

    等待队列(Condition queue)相比前面我们说的同步等待队列(sync queue),两个队列是相互对立的,Condition queue是一个单向列表,通过nextWaiter连接。ConditionObject提供了awaitsignal两种方法,用来将节点进行阻塞放入到同步队列(Condition queue)中,或者将同步队列(Condition queue)中的阻塞节点进行唤醒。

    4.1 await

    addConditionWaiter我们在上面提到过,这就是调用了Node第二个有参构造方法的地方。addConditionWaiter创建一个新的节点,放入到条件等待队列中,并返回创建的节点,然后使用fullyRelease释放锁资源,然后根据节点释放在等待队列(sync queue)中,判断是否直接park。如果不在sync queue中,就直接park,如果在,就使用acquireQueued尝试获取锁资源的方式进行park。后面就是取消无效等待节点和响应中断了。

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    

    4.1.2 addConditionWaiter

    当前方法是线程安全的,因为只有获取到独占锁的线程才能调用await方法,这里首先是将无效等待节点移除,然后将创建的新节点加入到队列的尾部。

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        Node node = new Node(Thread.currentThread(), Node.CONDITION); // 指定waitStatus
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
    

    4.1.3 fullyRelease

    进行锁资源的释放,在线程进入阻塞状态前,先将获取到的锁资源释放,这与release操作基本一样

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
    

    4.1.4 isOnSyncQueue

    判断当前节点释放在等待队列(sync queue)中,如果waitStatuscondition说明在条件等待队列中;如果当前节点的pre是null,也说明在条件队列中,我们知道加入到等待队列(sync queue)的第1步操作就是设置prev;如果next有值说明在等待队列(sync queue)中,当目前为止,我们可以看到条件队列中的node并没有设置过next;最后就是findNodeFromTail,从队列的尾部往前找,然后对比。

    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }
    

    4.1.5 reportInterruptAfterWait

    响应中断状态。

    private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }
    

    4.2 signal

    唤醒条件队列中第一个等待的节点。

    public final void signal() {
        if (!isHeldExclusively()) // 如果当前线程不是持有锁的线程,直接抛出异常
            throw new IllegalMonitorStateException();
        Node first = firstWaiter; // 拿到当前条件队列头节点
        if (first != null)
            doSignal(first); // 唤醒
    }
    

    4.2.1

    从第一个等待节点开始往后找,核心是transferForSignal方法,如果成功,就退出循环。

    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    

    4.2.2 transferForSignal

    将当前节点放到等待队列(sync queue)的尾部,首先回将节点的状态从condition改为初始状态0,enq方法前面我们说过了,返回的是当前节点的前驱节点。这里看看if中的判断,当前驱节点ws>0说明前驱节点是一个无效等待节点,直接唤醒当前线程,后面的cas是将前驱节点修改为signal状态,如果修改失败,也会去唤醒当前线程。经过这些操作后,会将在条件队列中等待的节点,放到(条件队列中依旧有该节点)等待队列(sync queue)的尾部等待唤醒。

    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread); // 唤醒后,会执行acquireQueued方法中的shouldParkAfterFailedAcquire,去找到一个有效等待节点后面去排队。
        return true;
    }
    

    5. 总结

    AQS提供了线程同步的基本实现,提供了两种模式,独占模式和共享模式。使用链表的方式维护了一个双向队列,记录存放着一个一个等待的线程。通过对被volatile修改的state操作,进行锁资源的获取与释放,根据操作结果对队列中节点进行阻塞和唤醒。我们需要仔细分区在什么情况是存在多线程并发,在什么情况下同一时刻只会有一个线程执行(比如独占锁释放锁资源),从而确定我们的操作是否需要确保线程安全(cas操作)。其中独占和共享两种模式其实编码上很相似,只不过共享模式在每次获取完资源后,会判断是否是否有剩余资源,从而选择是否区唤醒后继节点。

    GitHub:https://github.com/godfunc
    博客园:http://www.cnblogs.com/godfunc
    Copyright ©2019 Godfunc
  • 相关阅读:
    Struts2SpringHibernate整合示例,一个HelloWorld版的在线书店(项目源码+详尽注释+单元测试)
    Java实现蓝桥杯勇者斗恶龙
    Java实现 LeetCode 226 翻转二叉树
    Java实现 LeetCode 226 翻转二叉树
    Java实现 LeetCode 226 翻转二叉树
    Java实现 LeetCode 225 用队列实现栈
    Java实现 LeetCode 225 用队列实现栈
    Java实现 LeetCode 225 用队列实现栈
    Java实现 LeetCode 224 基本计算器
    Java实现 LeetCode 224 基本计算器
  • 原文地址:https://www.cnblogs.com/Godfunc/p/15206358.html
Copyright © 2011-2022 走看看