zoukankan      html  css  js  c++  java
  • AbstractQueuedSynchronizer(AQS)源码分析详解

    AbstractQueuedSynchronizer 是一个抽象的同步等待队列。juc下面大多数锁都是使用到了AQS。除了获取资源和释放资源的方法外,AQS基本上将其他同步等待队列的逻辑都实现了,比如线程进入队列同步等待、响应中断、唤醒线程等。如果我们想实现一个简单的同步等待队列,那只需要实现AQS的获取和释放资源的方法就行了(实际上还有一个用来判断是否是当前线程持有资源的方法需要实现)。

    1. AQS类的基本结构

    首先实现了AbstractOwnableSynchronizer类,里面维护了一个FIFO双向队列,也就是我们的等待队列,记录了队列的头节点和尾节点;还维护了一个状态信息state,对于不同的锁来说state有着不同的意义,但是可以肯定的是state跟线程释放进入等待队列是由密切关系的。

    在多线程情况下,对双向队列的头节点和尾节点以及state的修改都是使用的cas进行的修改(不存在多线程的话,就直接赋值了,如果独占锁释放锁资源,因为之会有一个线程持有锁资源)。

    1.1 AbstractOwnableSynchronizer类结构

    核心就是设置和获取exclusiveOwnerThreadexclusiveOwnerThread表示独占模式下持有锁的线程。

    public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
        /** Use serial ID even though all fields transient. */
        private static final long serialVersionUID = 3737899427754241961L;
        protected AbstractOwnableSynchronizer() { }
        /**
         * The current owner of exclusive mode synchronization.
         */
        private transient Thread exclusiveOwnerThread;
        /**
         * Sets the thread that currently owns exclusive access.
         * A {@code null} argument indicates that no thread owns access.
         * This method does not otherwise impose any synchronization or
         * {@code volatile} field accesses.
         * @param thread the owner thread
         */
        protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;
        }
        /**
         * Returns the thread last set by {@code setExclusiveOwnerThread},
         * or {@code null} if never set.  This method does not otherwise
         * impose any synchronization or {@code volatile} field accesses.
         * @return the owner thread
         */
        protected final Thread getExclusiveOwnerThread() {
            return exclusiveOwnerThread;
        }
    }
    

    1.2 等待队列

    同步等待队列中是个双向队列,在AQS中记录了头节点和尾节点(都用了volatile修饰,确保了headtail的可见性),一般来说头节点是一个空节点(这点后面会说到)。每个节点里面记录了当前节点的等待模式(独占模式或者共享模式),同时记录了当前节点的等待状态,prevnext记录了当前节点的上一个节点和下一个节点,thread表示节点对应的线程。我们可以将状态粗略的分为两类,<=0表示有效等待节点,>0表示无效节点。

    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;
    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;
    
    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;
        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;
        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;
        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;
        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;
        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;
        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
        Node() {    // Used to establish initial head or SHARED marker
        }
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
    

    上面两个有参构造方法,一个是被addWiter调用,一个是被Condition(也就是条件队列)调用,通过两个方法我们可以看到,addWaiter创建的node,waitStatus=0(初始状态);通过Condition创建的node,waitStatus=-2(CONDITION条件队列中等待)状态(addConditionWaiter方法中可以看到)。需要注意的是一般共享锁是不支持Condition的,所以通过第二个有参构造方法创建的节点是处于独占模式的,这样我们就可以直接根据nextWaiter就能判断出节点的等待模式,也就是上面对应的isShared方法。

    1.3 同步状态state

    在不同的锁实现中,有着不同的意义,比如在ReentractLock中,state表示当前线程获取锁的重入次数。在ReentractReadWriteLock中,state的高16位表示持有读锁的线程数,低16位表示写锁的重入次数。但是无论state怎么存数据,他都与锁的获取与释放有着密切的关系。

    /**
     * The synchronization state.
     */
    private volatile int state;
    

    1.4 几个核心方法

    tryAcquireacquire为例,tryAcquire只是进行锁资源的获取,获取成功就返回true,失败返回false,而acquire会根据tryAcquire的返回结果来判断,是否将当前线程阻塞放入等待队列中。看名字我们就可以很快分辨出方法之间的对应关系。在比如tryReleaseShared是尝试释放锁资源,如果返回true,就会去尝试唤醒后继处于等待状态的节点。

    以try开头的这些方法是我们在实现自定义同步等待队列的时候,需要重点关注的方法,我们需要重写方法,实现具体的逻辑。当然,如果你只是自定义一个独占锁,那你只需要实现tryAcquiretryReleaseisHeldExclusively就行,共享锁相关的,你可以不用管。如果你想实现例如读写锁,那就需要将共享模式对应的方法也要重写,进行实现。

    // 独占模式获取锁资源
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    // 独占模式释放锁资源
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    // 共享模式获取锁资源
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    // 共享模式释放锁资源
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    // 独占模式下,当前线程释放是持有锁的线程
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    

    2. 独占锁

    独占锁指的是只能有一个线程持有锁资源,如果其他线程想要持有锁,需要等待持有锁的线程释放资源之后,否则会进入等待队列等待(没有获取到锁资源),直到等到有线程释放了锁资源,并且对该等待线程进行了唤醒,此时线程就会去再次尝试获取锁资源。

    2.1 acquire

    首先判断获取锁资源释放成功,如果成功,后面的就不能执行了,如果失败,就需要进入等待队列,等待其他线程唤醒自己,唤醒后会更新中断状态,再次去尝试获取锁资源,如果获取锁成功,就根据中断状态处理中断(返回true就会执行selfInterrupt)。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    2.1.1 创建等待节点和初始化队列

    这里直接指定了当前线程和等待模式,创建了一个新的节点(上面我们在说等待队列的时候已经说过这个构造方法了),然后判断队列是否需要初始化。如果不需要初始化,就会将刚创建的节点设置为尾节点。这里其实进行了一下快速尝试将新创建的节点放到队列的尾部,他跟enq中的代码是重复。目的就是为了在外面进行一次快速尝试,如果成功了,就不进入enq了,因为他里面有个for循环。

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    

    enq方法干了两件事

    1. 如果发现队列没有初始化,就创建一个空节点,进行队列初始化,此时队列的头节点和尾节点指向了同一个空节点。
    2. 初始化队列之后,循环并没有退出,而是来设置新创建的节点为尾节点,设置成功后推出循环。此时队列的头节点就是一个空节点,队列的尾节点就是我们刚创建的的新节点。重点看一下else中的代码,他分了三步才成功将tail更新完成,并且我们知道该方法中的代码会被多线程执行,而且这里存在共享变量tail。下面我们简单分析两个线程执行else代码块。

    首先是一个完整的流程,如果1,2,3正常执行,没有被其他线程干扰。这里我们需要特别注意的是代码中nodet都是独立存在于每个线程的栈中的

                  tail  |                           tail(node)                          tail(node)
     __            __   |  __            __            __   |   __            __            __
    |  | <--prev- |  |  | |  | <--prev- |  | <--prev- |  |  |  |  | <--prev- |  | <--prev- |  |
    |  | -next--> |  |  | |  | -next--> |  |          |  |  |  |  | -next--> |  | -next--> |  |
     ——            ——   |  ——            ——            ——   |   ——            ——            —— 
         第1步                           第2步                               第3步
    

    然后这里是两个线程执行的情况。线程线程A执行了enq方法,刚执行完第2步,线程A挂起了(还没执行第3步),线程线程B执行,他获取到了tail存放到自己的栈中的局部变量中,也执行完了第2步,此时就会出现图4的这种情况,由于没有都没有执行第3步,导致next没有连上(后面讲到为什么倒序遍历的时候会用到,倒叙遍历用的是prev)。

                  tail                              tail(A-node)                          tail                                               tail(B-node)
     __            __   |  __            __            __   |   __            __            __    |  __            __            __            __     |  
    |  | <--prev- |  |  | |  | <--prev- |  | <--prev- |  |  |  |  | <--prev- |  | <--prev- |  |   | |  | <--prev- |  | <--prev- |  | <--prev- |  |    | 
    |  | -next--> |  |  | |  | -next--> |  |          |  |  |  |  | -next--> |  |          |  |   | |  | -next--> |  |          |  |          |  |    |
     ——            ——   |  ——            ——            ——   |   ——            ——            ——    |  ——            ——            ——            ——     |
         第1步                           第2步                               第3步                                    图4(线程B执行了第1步和第2步)
    
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t; // 1
                if (compareAndSetTail(t, node)) { // 2
                    t.next = node; // 3
                    return t;
                }
            }
        }
    }
    

    2.1.2 阻塞线程

    进入到该方法后,如果当前节点的前驱节点是头节点,会先尝试获取一下锁资源tryAcquire,所以一般来说,一个线程在调用acquire后,会进行两次锁资源获取,如果获取锁资源成功了,就将当前节点设置为头节点,最后返回中断状态(按照我们说的这个流程的话,interrupted的状态就是声明时候的false),会在上面我们说的selfInterrupt方法中处理中断(这里是在线程获取到锁资源之后处理中断状态)。

    如果尝试获取锁资源失败,就会执行shouldParkAfterFailedAcquire,来判断(当前节点的前驱节点必须是signal状态才进入阻塞状态)释放需要将当前线程挂起,如果返回true,就会执行后面的parkAndCheckInterrupt将当前线程阻塞。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    就做了一件事,就是让自己(当前节点)在一个signal状态的节点的屁股后面排着(确保自己的前驱节点释放资源后会唤醒自己),然后进入阻塞状态。如果当前节点的前驱节点是一个有效状态,就直接修改为signal状态,如果不是,说明前驱节点被取消了,就需要往前找,找到一个有效状态的节点,跟在后面。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) // 必须是signal状态,才能被阻塞,否则就会再次去尝试获取一下锁资源
            return true;
        if (ws > 0) { // 只有CANCELLED=1 这一个状态,如果当前节点的前驱节点被取消了,就需要往前找一个状态为有效状态的waitStatus<=0节点,跟在他后面。
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else { // 如果是有效状态,但是不是signal,就cas修改为signal
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    使用LockSupportpark方法将当前线程阻塞挂起。

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted(); // 被唤醒后的第一件事是返回中断状态,并重置中断状态
    }
    

    2.1.3 从等待队列中移除

    根据acquireQueued中的代码,我们可以知道,只有当当前节点未获取到锁资源,并且抛出了异常,failed就是初始值true,就会执行下面的cancelAcquire方法,该方法主要的目的是将当前节点从队列中移除,会同时移除掉当前节点前后waitStatus=canceled的节点,如果发现当前节点是head节点会去唤醒后继节点。

    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;
        node.thread = null; // 将当前节点thread置为null
        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev; // 从当前节点往前找,找到一个有效等待节点 waitStatus <=0
        Node predNext = pred.next;
        node.waitStatus = Node.CANCELLED; // 将当前节点的状态设置为cancelled(1)
        if (node == tail && compareAndSetTail(node, pred)) { // 如果他是tail节点,直接使用我们找到的pred节点替换他
            compareAndSetNext(pred, predNext, null); // 因为是tail节点,所以节点后面就不会有下一个节点了,cas将next设置为null,这里next=null,prev不变
        } else {
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) { // 如果找到的pred不是头节点,就将pred的waitStatus设置为signal
                Node next = node.next; // 当当前节点的next是一个有效等待节点,就cas设置给pred的next
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node); // 如果当前节点时head,就去唤醒他的后继节点。
            }
            node.next = node; // help GC
        }
    }
    

    2.1.4 中断处理

    将中断操作补上。selfInterrupt执行,是因为acquireQueued返回true,根据上面的代码我们可以看到acquireQueued返回的就是中断状态,并且只有获取到锁资源的线程(这里说的是正常请求,抛异常也是可以返回的),才能从acquireQueued中返回。如果是被中断了,但是没有获取到锁资源,那么就会将中断状态记录下来,等拿到锁资源后返回。所以这里的中断操作我们才说是补的。因为存在线程被中断了,但是一直没有拿到锁,导致不能马上响应中断操作。

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
    

    2.2 release

    release是释放锁资源,如果tryRelease释放锁资源成功(返回true),就会判断头节点是否有后继节点需要唤醒。根据前面分析的代码,我们可以知道,当节点初始化时waitStatus是0,当有新的节点挂在刚初始化的节点的后面时,waitStatus会被修改为signal(-1)状态,所以这里判断 h.waitStatus != 0的意思是说头节点后面还有后继节点。此时就会去唤醒后继节点。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    2.2.1 唤醒后继节点

    这里传入的node是head节点,首先是将head节点的waitStatus修改为0,并且拿到head的后继节点,去唤醒。如果候机节点不是一个有效等待节点,就从队列的尾部开始从后往前找,找到一个有效等待节点去唤醒。

    1. 目前看来将waitStatus=0会增加一次后继节点尝试获取锁资源的机会(将要唤醒的节点尝试获取锁的机会),可以去看一下acquireQueued的代码,唤醒后会尝试获取一次锁,如果获取锁失败,会执行shouldParkAfterFailedAcquire方法,因为waitStatus=0,如果第一次会将waitStatuscas为signal,然后返回false,此时就会再去尝试获取一下锁,如果依旧获取失败,就会执行被阻塞(因为shouldParkAfterFailedAcquire方法判断到waitStatus=signle,直接返回true)。所以waitStatus设置为0,会增加一次获取锁的机会。
    2. 我们知道,尾节点的next是null,这里我们获取到的s==null其实并不表示s就一定是tail节点(前面enq方法已经分析过了)。因为还存在新增加一个节点next还没来得及设置。也正是因为这原因,所以这里在找一个有效等待节点时,选择了从后往前找,因为prev的更新是可靠的,下面补一点前面enq中的代码,我们可以看到在添加一个节点时,第一步是将新增加的节点的prev指向当前tail节点,然后进行第二步将新增加的节点设置tail节点,最后是第三步将旧的tail节点的next只想新增加的节点。首先者三个操作肯定不是原子性操作。当第二部操作成功了,说明第一步操作也肯定成功了,此时就会出现一种情况旧的tail节点的next还没指向新的tail节点,就会出现unparkSuccessors=null(s不是tail节点)的请求,如果是从前往后遍历,因为next是不可靠的,会出现s=null但是s并不是tail节点的情况(没有遍历完所有节点)。所以使用从后往前遍历,因为prev的更新,在第一步操作的时候就完成了,新增节点能成为tail的前提是prev已经更新了。
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t; //   1
                if (compareAndSetTail(t, node)) { // 2
                    // 可能线程会被挂起,来不及执行第三步
                    t.next = node; // 3
                    return t;
                }
            }
        }
    }
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    2.3 其他

    2.3.1 acquireInterruptibly

    acquire相比,多了很多中断处理的判断,会对中断进行及时处理。不会像acquire那样,需要等到获取到锁资源才去处理中断。这里主要说与acquire相比的一些不同点。

    在进入方法后就会调用Thread.interrupted()获取当前中断状态,并重置中断状态,如果被中断了就抛出异常。然后是获取锁资源。可以看到这里换成了doAcquireInterruptibly处理排队。里面与acquireQueued很类似,核心逻辑没变。

    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
    

    这里将addWaiter方法放了进来,在parkAndCheckInterrupt之后,如果发现中断了,就直接抛异常,此时会执行cancelAcquire方法将当前节点从队列中移除。最后将异常抛出。

    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException(); // 这里直接抛出异常,而不是仅仅对中断状态进行记录
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    2.3.2 tryAcquireNanos

    不经会对中断进行及时处理,还设置了一个等待超时时间,核心是使用了LockSupport.parkNanos方法,当超过执行时间会返回。

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }
    

    首先会计算出绝对时间,如果获取到了锁资源就返回true,如果超时了就返回false。

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold) // 时间小于等于给定值,就直接不park了
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    3. 共享锁

    对于共享锁来说,锁资源与具体线程是不相关的,如果一个线程获取了锁资源,另一个线程再来获取锁资源时,如果锁资源还能满足当前线程需求,那么依旧可以获取锁资源。

    3.1 acquireShared

    首先是通过tryAcquireShared尝试获取锁资源,并根据返回值判断,是否获取锁资源成功,<0表示获取锁资源失败,=0说明获取到了锁资源,但是没有多余的资源了,>0说明获取到了锁资源,并且还有多余的资源。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    

    3.1.1 doAcquireShared

    与独占锁的实现很类似,首先调用addWaiter创建节点并添加到队列中,然后获取当前节点的前驱节点,如果前驱节点是头节点,就尝试获取锁资源,根据返回值判断是否获取到锁资源,如果获取到了锁资源,这里首先会执行setHeadAndPropagate方法,该方法主要是更行head,然后根据条件看是否需要区唤醒后继节点,这里的中断与acquire中基本一致,获取到锁资源后,判断记录的中断状态,然后响应中断。后面的代码和acquire中的基本一致,就不多说了。

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    3.1.2 setHeadAndPropagate

    这里的setHeadacquireQueued中的一样,调的是同一个方法,重点是后面的判断,propagate表示的是剩余的锁资源,根据if中的代码我们可以知道,只有当存在剩余锁资源或者头节点为null(一般来说head不会为null),或者头节点waiteStatus<0也就是需要被唤醒,就获取当前节点的后继节点判断,后继节点是否是null或者是共享节点(也就是判断是否是一个非独占模式的节点),就会执行doReleaseShared进行节点唤醒。

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
    

    3.1.3 doReleaseShared

    将头节点的是signal,说明需要唤醒后继节点,就从signal修改为0,如果修改成功,就调用unparkSuccessor唤醒后继节点。如果头节点的状态是0,就从0改为PROPAGATE,不做其他操作。

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    

    3.2 releaseShared

    共享锁释放资源,首先使用tryReleaseShared进行锁资源释放,根据返回结果判断是否执行doReleaseShared区唤醒后继节点。

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    

    3.3 其他

    3.3.1 acquireSharedInterruptibly

    共享模式下获取锁响应中断。跟acquireInterruptibly处理中断基本一致,看一眼下面代码就知道了。

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    3.3.2 tryAcquireSharedNanos

    共享模式下的获取锁并设置等待超时时间。

    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    4. 条件等待队列

    等待队列(Condition queue)相比前面我们说的同步等待队列(sync queue),两个队列是相互对立的,Condition queue是一个单向列表,通过nextWaiter连接。ConditionObject提供了awaitsignal两种方法,用来将节点进行阻塞放入到同步队列(Condition queue)中,或者将同步队列(Condition queue)中的阻塞节点进行唤醒。

    4.1 await

    addConditionWaiter我们在上面提到过,这就是调用了Node第二个有参构造方法的地方。addConditionWaiter创建一个新的节点,放入到条件等待队列中,并返回创建的节点,然后使用fullyRelease释放锁资源,然后根据节点释放在等待队列(sync queue)中,判断是否直接park。如果不在sync queue中,就直接park,如果在,就使用acquireQueued尝试获取锁资源的方式进行park。后面就是取消无效等待节点和响应中断了。

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    

    4.1.2 addConditionWaiter

    当前方法是线程安全的,因为只有获取到独占锁的线程才能调用await方法,这里首先是将无效等待节点移除,然后将创建的新节点加入到队列的尾部。

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        Node node = new Node(Thread.currentThread(), Node.CONDITION); // 指定waitStatus
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
    

    4.1.3 fullyRelease

    进行锁资源的释放,在线程进入阻塞状态前,先将获取到的锁资源释放,这与release操作基本一样

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
    

    4.1.4 isOnSyncQueue

    判断当前节点释放在等待队列(sync queue)中,如果waitStatuscondition说明在条件等待队列中;如果当前节点的pre是null,也说明在条件队列中,我们知道加入到等待队列(sync queue)的第1步操作就是设置prev;如果next有值说明在等待队列(sync queue)中,当目前为止,我们可以看到条件队列中的node并没有设置过next;最后就是findNodeFromTail,从队列的尾部往前找,然后对比。

    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }
    

    4.1.5 reportInterruptAfterWait

    响应中断状态。

    private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }
    

    4.2 signal

    唤醒条件队列中第一个等待的节点。

    public final void signal() {
        if (!isHeldExclusively()) // 如果当前线程不是持有锁的线程,直接抛出异常
            throw new IllegalMonitorStateException();
        Node first = firstWaiter; // 拿到当前条件队列头节点
        if (first != null)
            doSignal(first); // 唤醒
    }
    

    4.2.1

    从第一个等待节点开始往后找,核心是transferForSignal方法,如果成功,就退出循环。

    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    

    4.2.2 transferForSignal

    将当前节点放到等待队列(sync queue)的尾部,首先回将节点的状态从condition改为初始状态0,enq方法前面我们说过了,返回的是当前节点的前驱节点。这里看看if中的判断,当前驱节点ws>0说明前驱节点是一个无效等待节点,直接唤醒当前线程,后面的cas是将前驱节点修改为signal状态,如果修改失败,也会去唤醒当前线程。经过这些操作后,会将在条件队列中等待的节点,放到(条件队列中依旧有该节点)等待队列(sync queue)的尾部等待唤醒。

    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread); // 唤醒后,会执行acquireQueued方法中的shouldParkAfterFailedAcquire,去找到一个有效等待节点后面去排队。
        return true;
    }
    

    5. 总结

    AQS提供了线程同步的基本实现,提供了两种模式,独占模式和共享模式。使用链表的方式维护了一个双向队列,记录存放着一个一个等待的线程。通过对被volatile修改的state操作,进行锁资源的获取与释放,根据操作结果对队列中节点进行阻塞和唤醒。我们需要仔细分区在什么情况是存在多线程并发,在什么情况下同一时刻只会有一个线程执行(比如独占锁释放锁资源),从而确定我们的操作是否需要确保线程安全(cas操作)。其中独占和共享两种模式其实编码上很相似,只不过共享模式在每次获取完资源后,会判断是否是否有剩余资源,从而选择是否区唤醒后继节点。

    GitHub:https://github.com/godfunc
    博客园:http://www.cnblogs.com/godfunc
    Copyright ©2019 Godfunc
  • 相关阅读:
    昨天一个人KTV啦 哈哈....
    久违的大雪终于来了
    我心目中的英雄李连杰
    今天终于可以回家了
    ajax 笔记不用刷新实现数据的分页显示 2 (下)
    学Linux可不知道怎么入手呀
    在web.config里配制连接Access数据库字符串
    昨天头请我们吃饭
    PDA程序读取xml文件的想法
    关于layui踩过的坑
  • 原文地址:https://www.cnblogs.com/Godfunc/p/15206358.html
Copyright © 2011-2022 走看看