zoukankan      html  css  js  c++  java
  • AQS详解(AbstractQueuedSynchronizer)

    Intrinsic VS explicity

     1. 不一定保证公平              1. 提供公平和非公平的选择

     2. 无                          2. 提供超时的功能

     3. 无                          3. 提供对中断的响应

     4. 无                          4. 提供try... 的方法

     5. 保证获取锁和释放锁的一致性  5. 需要自己保证

     6. 只有一个conditionQueue    6. 可以有多个conditonQueue

                                    7.  使用灵活

     

     

    一个带有内部状态的类

    自定义同步协议需要注意的点:

    1. 获取锁的时候状态如何变化
    2. 释放锁的时候状态如何变化已经状态变化后是都对别的等待方有影响

     

     

     

    AQS的作用

    1. 提供对获取锁成功或失败后线程的管理比如是否进行阻塞
    2. 当释放锁后提供对等待的线程的通知功能
    3. 提供给用户对状态进行自定义处理的接口

     

    AQS的使用

    1. AQS依赖于一个FIFO队列和一个int表示的状态. 具体的实现类可以自己去保存别的状态, 但是只有这个int表示的状态是require和release的, 子类自己的状态是做辅助作用的.

    2. 想要使用AQS来作为一个同步器的基类,需要实现下面这几个方法.

    tryAcquire
    tryRelease   
    tryAcquireShared
    tryReleaseShared
    isHeldExclusively

    (注意一定要保证线程安全)

    3. 可以通过这些方法来对状态进行监控和修改 

    getState 

    setState 

    compareAndSetState 

     

    源码分析

    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */

    /**addWaiter()将新node加到队列的末尾, 如果队列没有初始化的话,进行初始化操作(如果head节点是null的话,创建一个新的node当head节点)

    acquireQueued() 如果当前节点是head的下一个节点,进行获取state操作,成功后不进行阻塞. 否则,在链表中向前寻找,直到找到一个没有被取消的节点,把当前节点作为找到的节点的下一个节点,然后把当前节点阻塞.

    cancelAcquire() 把当前节点的状态改为取消状态,然后向前找,直到找到一个没有被取消的节点,如果这个节点不是头节点,而且没有被取消,则把当前节点的下一个节点作为找到的这个节点的子节点. 否则,找到当前节点的下一个没有被取消的节点,然后把它唤醒.

    **/
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)

    // 找到下一个没有被取消的节点,然后把他唤醒.
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    Release操作不会去操作队列,只会去唤醒后边的没有取消的节点.而且release后会把当前节点的状态改为0.

    Head节点可能是初始化的那个new node()节点也有可能是获取状态成功的那个节点.

    所以,即使头节点的状态是0, 后续来的节点也会把头节点的状态改为Signal, 除非后面已经没有等待的节点了.

    就算中间的节点可能因为某种因素导致链条没有接上,也会从tail开始搜索来保证如果有是一定可以找到的就算因为竞争的问题导致新加入的节点没有接收到前面节点的通知,但是,这种情况下前面的节点,除了头节点,都已经被取消了所以在enq()操作的时候会直接去tryAcquire,也不会导致整个操作有问题还有一个保障措施是只有当前面的节点是signal的时候才会安心的去block,而在reslease这边,会把这个状态改成0,然后再去唤醒后边的几点.

    需要注意的是如果我们的release操作和acquire操作中,state的状态改动不一致,会导致死锁.

     

    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))

    // 和非中断的操作的区别是, 当当前节点被唤醒的时候, 如果被中断过,会抛出中断异常
            doAcquireInterruptibly(arg);
    }

    /**
     * Attempts to acquire in exclusive mode, aborting if interrupted,
     * and failing if the given timeout elapses.  Implemented by first
     * checking interrupt status, then invoking at least once {@link
     * #tryAcquire}, returning on success.  Otherwise, the thread is
     * queued, possibly repeatedly blocking and unblocking, invoking
     * {@link #tryAcquire} until success or the thread is interrupted
     * or the timeout elapses.  This method can be used to implement
     * method {@link Lock#tryLock(long, TimeUnit)}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @param nanosTimeout the maximum number of nanoseconds to wait
     * @return {@code true} if acquired; {@code false} if timed out
     * @throws InterruptedException if the current thread is interrupted
     */
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||

    // 和正常的区别是, 在shouldpark后, 会判断时间的长短,短的话, 会进行自旋. 长的话, 会调用parkNanos()进行阻塞
            doAcquireNanos(arg, nanosTimeout);
    }

     

    /**
     * Acquires in shared mode, ignoring interrupts.  Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)

    // 和正常的区别是, 在成功获取状态后,在setHead后,如果tryAcquire返回的值大于0,而且下一个节点也是shared的节点,且当前节点的状态是signal. 则唤醒下一个节点. 如果当前节点的状态是0,则改为PROPAGATE

         doAcquireShared(arg);
    }

     

     

    /**
     * Release action for shared mode -- signal successor and ensure
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    和release的区别是, 如果head的状态是0 , 会改为PROPAGATE

     

    private void doReleaseShared() {
        /*

    Condition

    /**
     * Implements interruptible condition wait.
     * <ol>
     * <li> If current thread is interrupted, throw InterruptedException.
     * <li> Save lock state returned by {@link #getState}.
     * <li> Invoke {@link #release} with
     *      saved state as argument, throwing
     *      IllegalMonitorStateException if it fails.
     * <li> Block until signalled or interrupted.
     * <li> Reacquire by invoking specialized version of
     *      {@link #acquire} with saved state as argument.
     * <li> If interrupted while blocked in step 4, throw InterruptedException.
     * </ol>
     */
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();

    // 1如果最后一个节点被取消了, 就把整个链表中取消的节点都删掉
    // 2 把这个节点加到链表的最后, 初始状态是condition

    Node node = addConditionWaiter();

    // 调用release操作, 把当前获取锁的node release掉(注意, 并没有保证conditionNode 和 当前持有锁的线程的node的对应)
        int savedState = fullyRelease(node);
        int interruptMode = 0;

    // 判断这个node是否在SyncQueue 中
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);

    // 被唤醒或被中断
            if ((interruptMode =

    // 如果当前线程没有被中断过,break, 但是中断后可能会很诡异

    否则判断是否在signal之前取消了,如果是, 把这个node转到SyncQueue中, 状态从condition转到0, 

    如果状态不是condition, 一直等到这个节点被放到syncQueue中,然后返回false

     checkInterruptWhileWaiting(node)) != 0)
                break;
        }

    // 获取锁
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)

    // 看下是否被中断过, 然后对中断做响应.
            reportInterruptAfterWait(interruptMode);
    }

    /**
     * Moves the longest-waiting thread, if one exists, from the
     * wait queue for this condition to the wait queue for the
     * owning lock.
     *
     * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
     *         returns {@code false}
     */
    public final void signal() {

    // 所以使用condition必须实现isHeldExclusively 这个方法
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

    /**
     * Removes and transfers nodes until hit non-cancelled one or
     * null. Split out from signal in part to encourage compilers
     * to inline the case of no waiters.
     * @param first (non-null) the first node on condition queue
     */
    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;

    // 1. 把节点的状态从condition转到0, 如果失败, 返回false

      2. 把这个节点插入到syncQueue中, 把这个节点的上一个节点置为signal. 如果上个节点是取消状态或者置为signal的操作失败,把这个节点的线程唤醒. 返回true. 这时这个节点已经在syncqueue中了, 整个流程可以正常跑通

    // 结论: 将这个node的线程作为一个正常的获取锁的线程, 把他放到syncQueue中, 等待被唤醒, 如果没有头节点, 或错过了头节点的唤醒, 就直接唤醒, 
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }

  • 相关阅读:
    阿里云 k8s 部署 Spring Cloud Alibaba 微服务实践 (四) 自动化部署
    阿里云 k8s 部署 Spring Cloud Alibaba 微服务实践 (三) 服务观测
    阿里云 k8s 部署 Spring Cloud Alibaba 微服务实践 (二) 部署微服务程序
    阿里云 k8s 部署 Spring Cloud Alibaba 微服务实践 (一) 部署 Nacos
    C++知识点
    libmkl 学习笔记
    基于tesseract-OCR进行中文识别
    poco编译与运行
    Linux下的I/O复用与epoll详解(转载)
    高并发网络编程之epoll详解(转载)
  • 原文地址:https://www.cnblogs.com/zhaoxinshanwei/p/7743029.html
Copyright © 2011-2022 走看看