zoukankan      html  css  js  c++  java
  • AbstractQueuedSynchronizer(AQS)

    AQS:抽象的队列同步器,是JUC包中构建锁或者其他同步组件的基础框架。

      Java中的大部分同步类(ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore)都是基于AbstractQueuedSynchronizer(AQS) 实现的。

      AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。

    原理: 

      AQS使用一个volatile的int类型的成员变量state来表示同步状态通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对state值的修改。

      volatile能够保证多线程下的可见性,当status=1则代表当前对象锁已经被占有

      核心思想是:如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;

          如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH变体的虚拟双向队列(双向链表)实现的,将暂时获取不到锁的线程加入到队列中(AQS通过将每条请求共享资源的线程封装成队列中的一个节点来实现锁的分配)。

    AQS数据结构队列的节点Node

       /**
         * 等待队列节点类
         *
         * 等待队列是“CLH”的变体(Craig、Landin和Hagersten)锁队列。CLH锁通常用于自旋锁
         * <p>
         * 每个节点中的“status”字段跟踪线程是否应该阻塞。节点在其前置任务释放时被通知(signalled)。
         * 队列的每个节点都充当一个特定的通知样式监视器,其中包含一个等待线程。
         * 但是,status字段不控制线程是否被授予锁等。
         * 如果线程是队列中的第一个线程,它可能会尝试获取。但是,第一并不能保证成功,只给了了竞争锁的权利。
         * 所以当前发布的竞争者释放锁的线程可能需要重新等待。
         * <p>To enqueue into a CLH lock, you atomically splice it in as new
         * tail. To dequeue, you just set the head field.
         * <pre>
         *      +------+  prev +-----+       +-----+
         * head |      | <---- |     | <---- |     |  tail
         *      +------+       +-----+       +-----+
         * </pre>
         */
        static final class Node {
            /**
             * Marker to indicate a node is waiting in shared mode
             * 标识表示一个节点以共享的模式等待锁
             */
            static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();
            /**
             * Marker to indicate a node is waiting in exclusive mode
             * 标识表示一个节点以独占的模式等待锁
             */
            static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;
    
            /**
             * waitStatus value to indicate thread has cancelled
             * 表示线程获取锁的请求已经取消了
             */
            static final int CANCELLED = 1;
            /**
             * waitStatus value to indicate successor's thread needs unparking
             * 表示线程已经准备好了,就等资源释放了
             */
            static final int SIGNAL = -1;
            /**
             * waitStatus value to indicate thread is waiting on condition
             * 表示节点在等待队列中,节点线程等待唤醒
             */
            static final int CONDITION = -2;
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             * 表示线程处在SHARED情况下,该字段才会使用
             */
            static final int PROPAGATE = -3;
    
            /**
             * 当前节点在队列中的状态
             *
             * Status field, taking on only the values:
             *   SIGNAL:     The successor of this node is (or will soon be)
             *               blocked (via park), so the current node must
             *               unpark its successor when it releases or
             *               cancels. To avoid races, acquire methods must
             *               first indicate they need a signal,
             *               then retry the atomic acquire, and then,
             *               on failure, block.
             *   CANCELLED:  This node is cancelled due to timeout or interrupt.
             *               Nodes never leave this state. In particular,
             *               a thread with cancelled node never again blocks.
             *   CONDITION:  This node is currently on a condition queue.
             *               It will not be used as a sync queue node
             *               until transferred, at which time the status
             *               will be set to 0. (Use of this value here has
             *               nothing to do with the other uses of the
             *               field, but simplifies mechanics.)
             *   PROPAGATE:  A releaseShared should be propagated to other
             *               nodes. This is set (for head node only) in
             *               doReleaseShared to ensure propagation
             *               continues, even if other operations have
             *               since intervened.
             *   0:          None of the above
             *
             * The values are arranged numerically to simplify use.
             * Non-negative values mean that a node doesn't need to
             * signal. So, most code doesn't need to check for particular
             * values, just for sign.
             *
             * The field is initialized to 0 for normal sync nodes, and
             * CONDITION for condition nodes.  It is modified using CAS
             * (or when possible, unconditional volatile writes).
             */
            volatile int waitStatus;
    
            /**
             * 前驱指针
             *
             * Link to predecessor node that current node/thread relies on
             * for checking waitStatus. Assigned during enqueuing, and nulled
             * out (for sake of GC) only upon dequeuing.  Also, upon
             * cancellation of a predecessor, we short-circuit while
             * finding a non-cancelled one, which will always exist
             * because the head node is never cancelled: A node becomes
             * head only as a result of successful acquire. A
             * cancelled thread never succeeds in acquiring, and a thread only
             * cancels itself, not any other node.
             */
            volatile AbstractQueuedSynchronizer.Node prev;
    
            /**
             * 后继指针
             *
             * Link to the successor node that the current node/thread
             * unparks upon release. Assigned during enqueuing, adjusted
             * when bypassing cancelled predecessors, and nulled out (for
             * sake of GC) when dequeued.  The enq operation does not
             * assign next field of a predecessor until after attachment,
             * so seeing a null next field does not necessarily mean that
             * node is at end of queue. However, if a next field appears
             * to be null, we can scan prev's from the tail to
             * double-check.  The next field of cancelled nodes is set to
             * point to the node itself instead of null, to make life
             * easier for isOnSyncQueue.
             */
            volatile AbstractQueuedSynchronizer.Node next;
    
            /**
             * 该节点的线程
             * The thread that enqueued this node.  Initialized on
             * construction and nulled out after use.
             */
            volatile Thread thread;
    
            /**
             * 指向下一个处于CONDITION状态的节点
             *
             * Link to next node waiting on condition, or the special
             * value SHARED.  Because condition queues are accessed only
             * when holding in exclusive mode, we just need a simple
             * linked queue to hold nodes while they are waiting on
             * conditions. They are then transferred to the queue to
             * re-acquire. And because conditions can only be exclusive,
             * we save a field by using special value to indicate shared
             * mode.
             */
            AbstractQueuedSynchronizer.Node nextWaiter;
    
            /**
             * Returns true if node is waiting in shared mode.
             */
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
            /**
             * 返回前驱节点,如果为null,抛出NPE
             *
             * Returns previous node, or throws NullPointerException if null.
             * Use when predecessor cannot be null.  The null check could
             * be elided, but is present to help the VM.
             *
             * @return the predecessor of this node
             */
            final AbstractQueuedSynchronizer.Node predecessor() throws NullPointerException {
                AbstractQueuedSynchronizer.Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    // Used to establish initial head or SHARED marker
            }
    
            Node(Thread thread, AbstractQueuedSynchronizer.Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }

    同步状态state

        /**
         * The synchronization state.
         */
        private volatile int state;

    访问state字段的几个方法:

    这几个方法都是final修饰的,子类无法重写。

      /**
         * 获取state的值
         */
        protected final int getState() {
            return state;
        }
    
        /**
         * 设置state的值
         */
        protected final void setState(int newState) {
            state = newState;
        }
    
        /**
         * CAS原子方式更新state
         */
        protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }

     通过修改State字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程):

                                                                     

    自定义同步器可实现的方法

     AQS提供的几个可用于自定义同步器实现的protected修饰的方法(ReentrantLock即实现了以下方法):

        /**
         * 尝试获取锁以独占方式。
         * 这个方法应该先查询对象的状态是否允许在独占模式下获取它,如果允许,则可以获取它
         *
         * @param arg 获取锁的次数
         * @return 成功则返回true,失败则返回false
         * @throws UnsupportedOperationException if exclusive mode is not supported
         */
        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    
        /**
         * 尝试释放锁以独占模式
         *
         * @param arg 释放锁的次数
         * @return 成功则返回true,失败则返回false
         * @throws UnsupportedOperationException if exclusive mode is not supported
         */
        protected boolean tryRelease(int arg) {
            throw new UnsupportedOperationException();
        }
    
        /**
         * 尝试获取锁以共享模式
         * 这个方法应该先查询对象的状态是否允许在共享模式下获取它,如果允许,则可以获取它
         * @param arg 获取锁的次数
         * @return 负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
         * @throws UnsupportedOperationException if shared mode is not supported
         */
        protected int tryAcquireShared(int arg) {
            throw new UnsupportedOperationException();
        }
    
        /**
         * 尝试释放锁以共享模式
         *
         * @param arg 释放锁的次数
         * @return 如果释放后允许唤醒后续等待结点返回true,否则返回false
         * @throws UnsupportedOperationException if shared mode is not supported
         */
        protected boolean tryReleaseShared(int arg) {
            throw new UnsupportedOperationException();
        }
    
        /**
         * 该线程是否正在独占资源。只有用到Condition才需要去实现它
         */
        protected boolean isHeldExclusively() {
            throw new UnsupportedOperationException();
        }

    一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。

    AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。ReentrantLock是独占锁,所以实现了tryAcquire-tryRelease,如ReentrantLock的非公平锁:

    Sync:

     1 abstract static class Sync extends AbstractQueuedSynchronizer {
     2 
     3         abstract void lock();
     4 
     5         /**
     6          * Performs non-fair tryLock.  tryAcquire is implemented in
     7          * subclasses, but both need nonfair try for trylock method.
     8          */
     9         final boolean nonfairTryAcquire(int acquires) {
    10             final Thread current = Thread.currentThread();
    11             int c = getState();
    12             if (c == 0) {
    13                 if (compareAndSetState(0, acquires)) {
    14                     setExclusiveOwnerThread(current);
    15                     return true;
    16                 }
    17             }
    18             else if (current == getExclusiveOwnerThread()) {
    19                 int nextc = c + acquires;
    20                 if (nextc < 0) // overflow
    21                     throw new Error("Maximum lock count exceeded");
    22                 setState(nextc);
    23                 return true;
    24             }
    25             return false;
    26         }
    27 
    28         protected final boolean tryRelease(int releases) {
    29             int c = getState() - releases;
    30             if (Thread.currentThread() != getExclusiveOwnerThread())
    31                 throw new IllegalMonitorStateException();
    32             boolean free = false;
    33             if (c == 0) {
    34                 free = true;
    35                 setExclusiveOwnerThread(null);
    36             }
    37             setState(c);
    38             return free;
    39         }
    40     ...
    41     }

    NonfairSync:

     1     /**
     2      * Sync object for non-fair locks
     3      */
     4     static final class NonfairSync extends Sync {
     5         private static final long serialVersionUID = 7316153563782823691L;
     6 
     7         /**
     8          * Performs lock.  Try immediate barge, backing up to normal
     9          * acquire on failure.
    10          */
    11         final void lock() {
    12             if (compareAndSetState(0, 1))
    13                 setExclusiveOwnerThread(Thread.currentThread());
    14             else
    15                 acquire(1);
    16         }
    17 
    18         protected final boolean tryAcquire(int acquires) {
    19             return nonfairTryAcquire(acquires);
    20         }
    21     }

    非公平锁与AQS之间方法的关联之处

     ReentrantLock和AQS之间方法的交互过程,以非公平锁的加锁和解锁为例:

       

     ReentrantLock加锁解锁时API层核心方法的映射关系

     

    等待队列

      1、线程加入时机

      当执行 acquire(1) 时,会通过tryAcquire()获取锁。如果获取锁失败,就会调用addWaiter加入到等待队列中去。

      2、线程如何加入

      addWaiter(Node.EXCLUSIVE):返回当前线程节点

     1     /**
     2      * Creates and enqueues node for current thread and given mode.
     3      *
     4      * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     5      * @return the new node
     6      */
     7     private Node addWaiter(Node mode) {
     8         Node node = new Node(Thread.currentThread(), mode);
     9         // Try the fast path of enq; backup to full enq on failure
    10         Node pred = tail;
    11         if (pred != null) {
    12             node.prev = pred;
    13             if (compareAndSetTail(pred, node)) {
    14                 pred.next = node;
    15                 return node;
    16             }
    17         }
    18         enq(node);
    19         return node;
    20     }
    21 
    22 
    23     /**
    24      * CAS tail field. Used only by enq.
    25      */
    26     private final boolean compareAndSetTail(Node expect, Node update) {
    27         return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    28     }

      流程

        1)、通过当前线程和锁模式新建一个节点 node

        2)、将新节点node的prev指针指向等待队列的tail节点

        3)、设置尾节点为新节点node;通过compareAndSetTail方法,完成尾节点的设置。这个方法主要是对tailOffset和Expect进行比较,如果tailOffset的Node和Expect的Node地址是相同的,那么设置Tail的值为Update的值

       

     另外:如果Pred指针是Null(说明等待队列中没有元素),或者当前Pred指针和Tail指向的位置不同(说明被别的线程已经修改),就需要看一下enq的方法。

        /**
         * Inserts node into queue, initializing if necessary. See picture above.
         * @param node the node to insert
         * @return node's predecessor
         */
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

        1)、如果没有被初始化,就新建节点作为头结点。但是,头结点并不是当前线程节点,而是调用了无参构造函数的节点

        2)、如果队列已经初始化或者并发导致队列中已经有元素,则与上面相同,将当前线程节点添加到队列尾部

       3、等待队列中线程出队时机

        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

        addWaiter(Node.EXCLUSIVE)方法返回新加入队列的线程节点,来作为acquireQueued方法的参数。acquireQueued方法可以对排队中的线程进行 “获锁” 操作。acquireQueued方法会把加入队列中的线程不断去获取锁,直到成功或者不再需要获取(中断)。

    final boolean acquireQueued(final Node node, int arg) {
        // 标记是否成功拿到资源
        boolean failed = true;
        try {
            // 标记等待过程中是否中断过
            boolean interrupted = false;
            // 开始自旋,要么获取锁,要么中断
            for (;;) {
                // 获取当前节点的前驱节点
                final Node p = node.predecessor();
                // 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
                if (p == head && tryAcquire(arg)) {
                    // 获取锁成功,头指针移动到当前node
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    解锁

      

    END.

  • 相关阅读:
    目标检测——Faster R_CNN使用smooth L1作为bbox的回归损失函数原因
    [LeetCode] 2. Add Two Numbers
    XAF 非持久化的详细视图界面全部清空
    4月份开发的问题汇总
    XAF GroupOperator合并查询筛选条件
    C#判断字符判断为空或者空格
    如何去掉C#字符串前后的空格
    IIS 发布出去未能加载文件或程序集“UHFReader”或它的某一个依赖项。试图加载格式不正确
    《图解HTTP》笔记
    Win10中的控制台程序会被鼠标单击暂停运行的解决办法
  • 原文地址:https://www.cnblogs.com/yangyongjie/p/13847717.html
Copyright © 2011-2022 走看看