zoukankan      html  css  js  c++  java
  • java高并发核心类 AQS(Abstract Queued Synchronizer)抽象队列同步器

    什么是AQS?

    • 全称: Abstract Queued Synchronizer: 抽象队列同步器
    • 是 java.util.concurrent.locks包下的一个抽象类
    • 其编写者: Doug Lea (并发大佬, 整个j.u.c包都是他写的)
    • 是 j.u.c 包的基础组件(核心)

    我们先来读一下该类的英文说明注释:

    /**
     * Provides a framework for implementing blocking locks and related synchronizers  
     * (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.  
     * 提供了一个实现阻塞式锁 及相关的 依赖于先进先出等待队列的 同步器(如信号量, 事件...) 的框架。
     *
     * This class is designed to be a useful basis for most kinds of synchronizers that rely on 
     * a single atomic {@code int} value to represent state.
     * 该类被设计为 所有依赖于单个原子性值 来代表其状态的同步器 的基石。
     *
     * Subclasses must define the protected methods that change this state, and which
     * define what that state means in terms of this object being acquired or released.  
     * 子类必须 通过定义 protected 方法 来改变 此状态, 并且 定义 定义该状态意味着 当前对象 被 获取了   
     * 或 被释放了。
     *
     * Given these, the other methods in this class carryout all queuing and blocking mechanics. 
     * 基于上述已知, 此类中的其他方法 实行了 所有的 队列等待 和 阻塞 机制。
     *
     * Subclasses can maintain other state fields, but only the atomically updated {@code int}
     * value manipulated using methods {@link #getState}, {@link #setState} and {@link 
     * #compareAndSetState} is tracked with respect to synchronization.
     * 子类可以维持其他状态域, 但是只有 被 getState, setState 和 compareAndSetState(CAS 比较并设置
     * 转态) 方法 管理的 且 被原子性 更新地 值 才会被同步化关联
     *
     * <p>Subclasses should be defined as non-public internal helper classes that are used to 
     * implement the synchronization properties of their enclosing class. 
     * 子类 应当被定义为 被用于实现它们的 封闭类 的 同步特性 的 非公开帮助类
     *
     * Class {@code AbstractQueuedSynchronizer} does not implement any synchronization 
     * interface.  
     * AQS 并没有实现任何同步接口。
     *
     * Instead it defines methods such as {@link #acquireInterruptibly} that can be invoked as
     * appropriate by concrete locks and related synchronizers to implement their public 
     * methods.
     * 它定义了诸如 acquireInterruptibly 等 可以 被 具体的锁 和 相关的同步器 请求 来实现 它们的公有方
     * 法
     *
     * <p>This class supports either or both a default <em>exclusive</em> mode and a 
     * <em>shared</em> mode. 
     * 该类支持 仅排除(排他)模式, 仅共享模式 以及 两种模式并存。
     *
     *
     * When acquired in exclusive mode, attempted acquires by other threads cannot succeed. 
     * 当一个线程 在 排他模式中 获取了锁, 其他线程的 获取(该锁) 就不能成功
     *
     * Shared mode acquires by multiple threads may (but need not) succeed. 
     * 共享模式下, 多个线程获取 一个锁 可能成功(但是 通常 不需要)
     *
     * This class does not &quot;understand&quot; these differences except in the mechanical 
     * sense that when a shared mode acquire succeeds, the next waiting thread (if 
     * one exists) must also determine whether it can acquire as well. 
     * 该类只有在 一个线程在共享模式下获取到了锁 才会 去判断 下一个线程是否能够获取该锁
     *
     * Threads waiting in the different modes share the same FIFO queue. 
     * 在不同模式下等待的线程 共享 先进先出队列
     *
     * Usually, implementation subclasses support only one of these modes, but both can come 
     * into play for example in a {@link ReadWriteLock}. 
     * 通常, 实现该抽象类的子类 只支持其中一种模式(共享, 排他), 但是 在 读写锁中 两种模式可以并存。
     *
     * Subclasses that support only exclusive or only shared modes need not define the methods 
     * supporting the unused mode.
     * 仅支持排他 或 仅支持共享 模式的子类 不需要定义 未被使用的 模式的方法
     *
     * <p>This class defines a nested {@link ConditionObject} class that
     * can be used as a {@link Condition} implementation by subclasses
     * supporting exclusive mode for which method {@link #isHeldExclusively} reports whether 
     * synchronization is exclusively held with respect to the current thread, method {@link 
     * #release} invoked with the current {@link #getState} value fully releases
     * this object, and {@link #acquire}, given this saved state value,
     * eventually restores this object to its previous acquired state. 
     * 该类定义了一个可被用作 支持排他模式的子类的状态应用 的 嵌套类。isHeldExclusively 方法 报告了
     * 同步对当前线程是否维持排他模式。release 方法 通过 getState请求当前 线程状态值, 并且充分释放该对
     * 象。acquire方法 根据已保存的状态值 最终 将该对象恢复到 它 上次被 获取之前的状态。
     *  衍生: semaphore(信号量) 中的 acquire, release
     *
     * No {@code AbstractQueuedSynchronizer} method otherwise creates such a
     * condition, so if this constraint cannot be met, do not use it.  
     * 没有抽象队列同步器 方法 就只好 创建 创建一个 状态, 如果这些约束不满足, 就不使用它
     *
     * The behavior of {@link ConditionObject} depends of course on the
     * semantics of its synchronizer implementation.
     * 状态对象 的行为 毫无疑问 取决于 它的 同步器应用 的 语义
     *
     * <p>This class provides inspection, instrumentation, and monitoring
     * methods for the internal queue, as well as similar methods for
     * condition objects. 
     * 该类对内部队列 以及相似的 状态对象 提供了 检查, 仪器化 和 监控 的方法
     *
     * These can be exported as desired into classes using an {@code AbstractQueuedSynchronizer} 
     * for their synchronization mechanics.
     * 这些方法可以根据需求 通过 AQS 来 导入 到 类中 以实现同步机制
     *
     * <p>Serialization of this class stores only the underlying atomic
     * integer maintaining state, so deserialized objects have empty
     * thread queues. 
     * 此类的序列化仅存储原子性的 整形 维持 状态, 所以
     *
     * Typical subclasses requiring serializability will define a {@code readObject} method that 
     * restores this to a known initial state upon deserialization.
     * 典型的需要序列化属性的 子类会通过 定义 一个 读取 对象的 方法 来将它 恢复为 取决于 反序列化的 已 
     * 知的的 初始状态 
     *
     * <h3>Usage</h3>
     *
     * <p>To use this class as the basis of a synchronizer, redefine the
     * following methods, as applicable, by inspecting and/or modifying
     * the synchronization state using {@link #getState}, {@link
     * #setState} and/or {@link #compareAndSetState}:
     * 要使用该类作为同步器的基础, 需要通过 使用 getState, setState, CAS 方法 来 检查 或 修改 重定义以下方法为 可执行的 方法。
     *
     * <ul>
     * <li> {@link #tryAcquire}
     * <li> {@link #tryRelease}
     * <li> {@link #tryAcquireShared}
     * <li> {@link #tryReleaseShared}
     * <li> {@link #isHeldExclusively}
     * </ul>
     *
     * Each of these methods by default throws {@link UnsupportedOperationException}. 
     * 以上每个方法都会默认抛出 不支持的操作选项 异常
     *
     * Implementations of these methods must be internally thread-safe, and should in general be 
     * short and not block.
     * 这些方法 的实现类 必须是 内部 线程 安全的m 并且普遍应当 量小 且 非阻塞
     *
     * Defining these methods is the <em>only</em> supported means of using this class. 
     * 以上这些方法 的 定义 仅用于 支持 该类
     *
     * All other methods are declared {@code final} because they cannot be independently varied.
     * 其他的方法都以 final 声明, 因为它们 不能是 无关变化的
     *
     * <p>You may also find the inherited methods from {@link AbstractOwnableSynchronizer} 
     * useful to keep track of the thread owning an exclusive synchronizer. 
     * 你可能也会发现 从 AQS 继承来的 方法 在追踪 一个 拥有 排他模式 同步器的 线程时 很 有用
     *
     * You are encouraged to use them -- this enables monitoring and diagnostic tools to assist 
     * users in determining which threads hold locks.
     * 使用他们 使得 负责监控 和 诊断的工具 能更好地 帮助用户确定 哪个线程占用着锁
     *
     * <p>Even though this class is based on an internal FIFO queue, it does not automatically  
     * enforce FIFO acquisition policies.  
     * 尽管该类 基于 一个 内部的 先进先出 队列, 他并没有 自动 强迫 使用 先进先出 的获取策略
     *
     * The core of exclusive synchronization takes the form:
     * 排他同步的核心为以下形式:
     * <pre>
     * Acquire:
     *      // 只要没有获取到参数
     *     while (!tryAcquire(arg)) {
     *        // 如果线程没有入队, 就让他入队
     *        <em>enqueue thread if it is not already queued</em>;
     *        // 也可能是当前线程被阻塞了
     *        <em>possibly block current thread</em>;
     *     }
     *
     * Release:
     *     // 如果释放参数成功
     *     if (tryRelease(arg))
     *        // 就解除队列头部的线程
     *        <em>unblock the first queued thread</em>;
     * </pre>
     *
     * (Shared mode is similar but may involve cascading signals.)
     * 共享模式也相似, 但是可能涉及 层级信号
     *
     * <p id="barging">Because checks in acquire are invoked before
     * enqueuing, a newly acquiring thread may <em>barge</em> ahead of
     * others that are blocked and queued.  
     * 因为在线程入队前需要请求检查, 一个新 获取的线程 可能 会 冲撞 到其他 阻塞 且处于 队列中的 线程(即
     * 插队) [非公平锁]
     *
     * However, you can, if desired, define {@code tryAcquire} and/or {@code tryAcquireShared} 
     * to disable barging by internally invoking one or more of the inspection
     * methods, thereby providing a <em>fair</em> FIFO acquisition order.
     * 有需求的话 也可以 在 tryAcquire 和/或 tryAcquireShared中 通过内部请求来 禁用 冲撞参数, 从而
     * 提供一个公平的 先进先出 获取顺序 [公平锁]
     *
     * In particular, most fair synchronizers can define {@code tryAcquire}
     * to return {@code false} if {@link #hasQueuedPredecessors} (a method
     * specifically designed to be used by fair synchronizers) returns
     * {@code true}.  Other variations are possible.
     * 事实上, 更公平的同步器可以定义 tryAcquire 方法 返回 false 如果队列中有前者(这是一个被特定用于定
     * 义公平同步器 的设计), 如果队列中没有前者就返回true 
     *
     * <p>Throughput and scalability are generally highest for the default barging (also known 
     * as <em>greedy</em>,<em>renouncement</em>, and <em>convoy-avoidance</em>) strategy.
     * 尽管默认使用 barging 机制 的 写入 和 可扩展性 (又称贪婪重申 避免护航) 是最高的。
     *
     * While this is not guaranteed to be fair or starvation-free, earlier
     * queued threads are allowed to recontend before later queued
     * threads, and each recontention has an unbiased chance to succeed
     * against incoming threads.  
     * 尽管这并没有保证 公平 或者说 无饥饿性, 更早入队的线程 被允许 在 后入队的线程 之后 重申 使用权, 
     * 每次重申都有 公平的 成功机会。
     *
     * Also, while acquires do not &quot;spin&quot; in the usual sense, they may perform 
     * multiple invocations of {@code tryAcquire} interspersed with other
     * computations before blocking.
     * 尽管 请求 并不会自旋, 通常 它们 会执行许多次 tryAcquire 方法调用,在阻塞之前, 其间夹杂着其他的
     * 计算
     *
     * This gives most of the benefits of spins when exclusive synchronization is only briefly   
     * held, without most of the liabilities when it isn't. 
     * 当处于排他同步模式时, 它具有了所有自旋的好处, 而当不处于排他模式时, 就不需要这些义务
     *
     * If so desired, you can augment this by preceding calls to acquire methods with "fast-
     * path" checks, possibly prechecking {@link #hasContended} and/or {@link #hasQueuedThreads} 
     * to only do so if the synchronizer is likely not to be contended.
     *
     * <p>This class provides an efficient and scalable basis for
     * synchronization in part by specializing its range of use to
     * synchronizers that can rely on {@code int} state, acquire, and
     * release parameters, and an internal FIFO wait queue. When this does
     * not suffice, you can build synchronizers from a lower level using
     * {@link java.util.concurrent.atomic atomic} classes, your own custom
     * {@link java.util.Queue} classes, and {@link LockSupport} blocking
     * support.
     * 同步器 <-依赖于- state,  acquire,release param, 内部等待队列
     * 修改 同步器的使用范围  -> 扩展 和 效用
     * 可使用原子类, 队列类 和 LockSupport(提供阻塞, park, unpark) 自己构造同步器
     *
     * <h3>Usage Examples</h3>
     *
     * <p>Here is a non-reentrant mutual exclusion lock class that uses the value zero to 
     * represent the unlocked state, and one to represent the locked state.
     * 这是一个不可重入的互斥锁类, 0: 未获得锁 1: 已获得锁
     *
     * While a non-reentrant lock does not strictly require recording of the current owner
     * thread, this class does so anyway to make usage easier to monitor.
     * 尽管 一个 非可重入锁, 并不严格要求记录当前线程的所有者, 该类确实这么做了, 以使使用和监控更方便
     *
     * It also supports conditions and exposes one of the instrumentation methods:
     * 它还提供了状况 状态 和 暴露设备方法 的支持
     *
     *  <pre> {@code
     * class Mutex implements Lock, java.io.Serializable {
     *
     *   // Our internal helper class 内部帮助类
     *   private static class Sync extends AbstractQueuedSynchronizer {
     *     // Reports whether in locked state 报告是否在使用锁的状态
     *     protected boolean isHeldExclusively() {
     *       return getState() == 1;
     *     }
     *
     *     // Acquires the lock if state is zero 如果状态为0就获取锁
     *     public boolean tryAcquire(int acquires) {
     *       assert acquires == 1; // Otherwise unused
     *       // CAS
     *       if (compareAndSetState(0, 1)) {
     *         setExclusiveOwnerThread(Thread.currentThread());
     *         return true;
     *       }
     *       return false;
     *     }
     *
     *     // Releases the lock by setting state to zero 通过将状态设置为 0 来 释放锁
     *     protected boolean tryRelease(int releases) {
     *       assert releases == 1; // Otherwise unused
     *       if (getState() == 0) throw new IllegalMonitorStateException();
     *       setExclusiveOwnerThread(null);
     *       setState(0);
     *       return true;
     *     }
     *
     *     // Provides a Condition 提供状态
     *     Condition newCondition() { return new ConditionObject(); }
     *
     *     // Deserializes properly 适当的反序列化
     *     private void readObject(ObjectInputStream s)
     *         throws IOException, ClassNotFoundException {
     *       s.defaultReadObject();
     *       setState(0); // reset to unlocked state 重设
     *     }
     *   }
     *
     *   // The sync object does all the hard work. We just forward to it.
     *   // 同步对象 做了 所有的 苦工
     *   private final Sync sync = new Sync();
     *
     *   public void lock()                { sync.acquire(1); }
     *   public boolean tryLock()          { return sync.tryAcquire(1); }
     *   public void unlock()              { sync.release(1); }
     *   public Condition newCondition()   { return sync.newCondition(); }
     *   public boolean isLocked()         { return sync.isHeldExclusively(); }
     *   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
     *   public void lockInterruptibly() throws InterruptedException {
     *     sync.acquireInterruptibly(1);
     *   }
     *   public boolean tryLock(long timeout, TimeUnit unit)
     *       throws InterruptedException {
     *     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
     *   }
     * }}</pre>
     *
     * <p>Here is a latch class that is like a {@link java.util.concurrent.CountDownLatch 
     * CountDownLatch} except that it only requires a single {@code signal} to
     * fire.
     * 类似CountDownLatch, 除了只需要 一个简单的信号 来启动
     *
     * Because a latch is non-exclusive, it uses the {@code shared}
     * acquire and release methods.
     * Latch 是非排他模式的, 使用共享模式的 获取 和 释放方法
     *
     *  <pre> {@code
     * class BooleanLatch {
     *
     *   private static class Sync extends AbstractQueuedSynchronizer {
     *     boolean isSignalled() { return getState() != 0; }
     *
     *     protected int tryAcquireShared(int ignore) {
     *       return isSignalled() ? 1 : -1;
     *     }
     *
     *     protected boolean tryReleaseShared(int ignore) {
     *       setState(1);
     *       return true;
     *     }
     *   }
     *
     *   private final Sync sync = new Sync();
     *   public boolean isSignalled() { return sync.isSignalled(); }
     *   public void signal()         { sync.releaseShared(1); }
     *   public void await() throws InterruptedException {
     *     sync.acquireSharedInterruptibly(1);
     *   }
     * }}</pre>
     *
     * @since 1.5
     * @author Doug Lea
     */
    

    AQS三板斧

    • CAS(compareAndSwap) [比较并交换]

      • 源码:

        /**
             * Atomically sets synchronization state to the given updated
             * value if the current state value equals the expected value.
             * 如果当前状态的值 等于 期望的状态的值, 原子性地设置了同步状态 为获取的更新的值
             * This operation has memory semantics of a {@code volatile} read
             * and write.
             * 该操作 有 与 内存 语义相关的 由 volatile(可见性, 汇编里面其实还是加了锁) 修饰的 读
             * 写操作
             * @param expect the expected value
             * @param update the new value
             * @return {@code true} if successful. False return indicates that the actual
             *         value was not equal to the expected value.
             */
            protected final boolean compareAndSetState(int expect, int update) {
                // See below for intrinsics setup to support this
                return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
            }
        	// sun.misc包 底层通过cpu晶体管来调度, 保证了原子性
            public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
        
    • Spin (自旋)

      • 理解上其实就是 写个 无限循环, 来控制线程, 不让它们跳出逻辑区之外。

      • 源码:

        /**
             * Inserts node into queue, initializing if necessary. See picture above.
             * 将节点插入到队列中如果需要就初始化
             * @param node the node to insert
             * @return node's predecessor 返回该节点的前者
             */
            private Node enq(final Node node) {
                // 自旋
                for (;;) {
                    // 将 节点t 设置为 内队列的尾部节点 其实理解为 将 t 指针 指向 队列尾部 也可以
                    Node t = tail;
                    if (t == null) { // Must initialize 如果尾节点为空, 必须初始化
                        if (compareAndSetHead(new Node()))
                            // 将队列尾部设置为头结点
                            tail = head;
                    } else {
                        // 如果尾节点不为空, 将当前节点的前一个节点设置为尾节点
                        node.prev = t;
                        if (compareAndSetTail(t, node)) {
                            // 将尾节点的下一个节点设置为当前节点
                            t.next = node;
                            return t;
                        }
                    }
                }
            }
        
    • LockSupport(提供阻塞)

      • park

             * @param blocker the synchronization object responsible for this
             *        thread parking
             * @since 1.6
             */
            public static void park(Object blocker) {
                 // 获取当前线程
                Thread t = Thread.currentThread();
                 // 设置线程的阻塞者
                setBlocker(t, blocker);
                 // 执行阻塞 false 为 isAbsolute 标签的值, 是否绝对时间, 0L 为绝对时间
                UNSAFE.park(false, 0L);
                setBlocker(t, null);
            }
        
      • unpark

             * @param thread the thread to unpark, or {@code null}, in which case
             *        this operation has no effect
             */
            public static void unpark(Thread thread) {
                // 如果线程不为空, 就解除阻塞
                if (thread != null)
                    UNSAFE.unpark(thread);
            }
           
        
    • AQS具备特性

      • 阻塞等待队列
      • 共享/独占
      • 公平/非公平
      • 可重入
      • 允许中断
    • 基于AQS的同步工具

      • ReentrantLock(可重入锁)
      • ReentrantReadWriteLock(可重入的读写锁)
      • Semaphore(信号量) [基本上所有语言都有]
      • CountDownLatch(技术插销) 类比golang waitGroups
      • FutureTask(未来任务)
      • SynchronousQueues(同步队列集)
    • CLH 队列

      • 源码注释:
      /**
           * Wait queue node class.
           *
           * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
           * Hagersten) lock queue. CLH locks are normally used for
           * spinlocks.  We instead use them for blocking synchronizers, but
           * use the same basic tactic of holding some of the control
           * information about a thread in the predecessor of its node.  A
           * "status" field in each node keeps track of whether a thread
           * should block.  A node is signalled when its predecessor
           * releases.  Each node of the queue otherwise serves as a
           * specific-notification-style monitor holding a single waiting
           * thread. The status field does NOT control whether threads are
           * granted locks etc though.  A thread may try to acquire if it is
           * first in the queue. But being first does not guarantee success;
           * it only gives the right to contend.  So the currently released
           * contender thread may need to rewait.
           *
           * <p>To enqueue into a CLH lock, you atomically splice it in as new
           * tail. To dequeue, you just set the head field.
           * <pre>
           *      +------+  prev +-----+       +-----+
           * head |      | <---- |     | <---- |     |  tail
           *      +------+       +-----+       +-----+
           * </pre>
           *
           * <p>Insertion into a CLH queue requires only a single atomic
           * operation on "tail", so there is a simple atomic point of
           * demarcation from unqueued to queued. Similarly, dequeuing
           * involves only updating the "head". However, it takes a bit
           * more work for nodes to determine who their successors are,
           * in part to deal with possible cancellation due to timeouts
           * and interrupts.
           *
           * <p>The "prev" links (not used in original CLH locks), are mainly
           * needed to handle cancellation. If a node is cancelled, its
           * successor is (normally) relinked to a non-cancelled
           * predecessor. For explanation of similar mechanics in the case
           * of spin locks, see the papers by Scott and Scherer at
           * http://www.cs.rochester.edu/u/scott/synchronization/
           *
           * <p>We also use "next" links to implement blocking mechanics.
           * The thread id for each node is kept in its own node, so a
           * predecessor signals the next node to wake up by traversing
           * next link to determine which thread it is.  Determination of
           * successor must avoid races with newly queued nodes to set
           * the "next" fields of their predecessors.  This is solved
           * when necessary by checking backwards from the atomically
           * updated "tail" when a node's successor appears to be null.
           * (Or, said differently, the next-links are an optimization
           * so that we don't usually need a backward scan.)
           *
           * <p>Cancellation introduces some conservatism to the basic
           * algorithms.  Since we must poll for cancellation of other
           * nodes, we can miss noticing whether a cancelled node is
           * ahead or behind us. This is dealt with by always unparking
           * successors upon cancellation, allowing them to stabilize on
           * a new predecessor, unless we can identify an uncancelled
           * predecessor who will carry this responsibility.
           *
           * <p>CLH queues need a dummy header node to get started. But
           * we don't create them on construction, because it would be wasted
           * effort if there is never contention. Instead, the node
           * is constructed and head and tail pointers are set upon first
           * contention.
           *
           * <p>Threads waiting on Conditions use the same nodes, but
           * use an additional link. Conditions only need to link nodes
           * in simple (non-concurrent) linked queues because they are
           * only accessed when exclusively held.  Upon await, a node is
           * inserted into a condition queue.  Upon signal, the node is
           * transferred to the main queue.  A special value of status
           * field is used to mark which queue a node is on.
           *
           * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
           * Scherer and Michael Scott, along with members of JSR-166
           * expert group, for helpful ideas, discussions, and critiques
           * on the design of this class.
           */
      
      • 它并不是原始的CLH双端队列, 而是其的一个变种。(未完)
  • 相关阅读:
    认识dojo
    CommonJS规范
    点滴
    快速排序
    npm常用命令
    http详解
    js经验点滴js apply/call/caller/callee/bind使用方法与区别分析
    给string添加新的函数
    大马隐藏锁定研究
    一键购买
  • 原文地址:https://www.cnblogs.com/ronnieyuan/p/11688367.html
Copyright © 2011-2022 走看看