zoukankan      html  css  js  c++  java
  • AQS(AbstractQueuedSynchronizer)解析

    AQS概念

    AQS是用于构建锁和同步器的框架。ReentrantLock、信号量、闭锁、FutureTask等等都是基于AQS构建的。其内部封装了对等待线程队列维护,同时开放接口自定义共享资源的获取和释放。

    AQS负责管理同步器类中状态的整数信息,通过getState,setState和compareAndSetState等方法操作状态。

    支持共享和独占两种获取资源的方式。如果是独占获取,需要实现 tryAcquire 、 tryRelease 和 isHeldExclusively 方法。如果是共享获取,实现 tryAcquireShared 和 tryReleaseShared 等方法。

     boolean tryAcquire(int arg) :独占方式。获取资源成功后返回true,否则返回false

     boolean tryRelease(int arg) :独占方式。释放资源成功后返回true,否则返回false

     int tryAcquireShared(int arg) :共享方式。获取资源失败返回负数,获取成功则返回剩余资源数量,0表示无剩余资源

     boolean tryReleaseShared(int arg) :共享方式。释放资源,允许唤醒后续节点返回true,否则返回false

    AQS源码

    1 重要的内部类

     1 static final class Node {
     2     //共享模式
     3     static final Node SHARED = new Node();
     4     //独占模式
     5     static final Node EXCLUSIVE = null;
     6 
     7     //表示当前节点已经取消调度,如超时和中断,是终结状态
     8     static final int CANCELLED =  1;
     9     //表示当前节点的下一个节点等待当前节点唤醒,当节点入队时会将前一个节点置为该状态
    10     static final int SIGNAL    = -1;
    11     //表示当前节点等待在Condition上
    12     static final int CONDITION = -2;
    13     //共享模式下,前继节点会唤醒之后的一个及多个相邻节点
    14     static final int PROPAGATE = -3;
    15 
    16     //当前节点等待资源的状态
    17     volatile int waitStatus;
    18     //前一个节点
    19     volatile Node prev;
    20     //后一个节点
    21     volatile Node next;
    22     //当前节点代表的线程
    23     volatile Thread thread;
    24     Node nextWaiter;
    25 
    26     final boolean isShared() {
    27         return nextWaiter == SHARED;
    28     }
    29 
    30     Node() {}
    31 
    32     Node(Thread thread, Node mode) {
    33         this.nextWaiter = mode;
    34         this.thread = thread;
    35     }
    36 
    37     Node(Thread thread, int waitStatus) {
    38         this.waitStatus = waitStatus;
    39         this.thread = thread;
    40     }
    41 }

      Node节点代表了在队列中等待资源的线程,头节点除外,初始状态为0。

    2 acquire方法

    1 public final void acquire(int arg) {
    2     if (!tryAcquire(arg) &&
    3         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    4         selfInterrupt();
    5 }

      独占访问资源的顶层入口。

      1.tryAcquire方法由实现类自己实现,定义独占访问资源的逻辑。修改state的值代表占用了资源。

      2.addWaiter方法将当前线程加入到等待队列

      3.acquireQueued将不停尝试tryAcquire方法直到成功,期间不响应中断。如果被通知中断则在成功后返回true

      4.如果重试期间被中断过,在获得资源后才会通过selfInterrupt中断自己

      一上来就会尝试获取锁,而不管等待队列中或许还有其他线程在等待,这里体现了非公平性。

      2.1 addWaiter

     1 private Node addWaiter(Node mode) {
     2     Node node = new Node(Thread.currentThread(), mode);
     3     //tail记录了队列的尾节点
     4     Node pred = tail;
     5     if (pred != null) {
     6         //将新节点的前指针指向旧的尾节点
     7         node.prev = pred;
     8         /**
     9          * 这个方法内部:unsafe.compareAndSwapObject(this, tailOffset, pred, node);
    10          * tailOffset是tail属性在对象内存中的偏移量,根据偏移量找到tail存储的对象,比较和pred是否相同,相同则用node替换tail现有的值
    11          */
    12         if (compareAndSetTail(pred, node)) {
    13             //将旧的尾节点后指针指向新节点
    14             pred.next = node;
    15             return node;
    16         }
    17     }
    18     //失败或者首次插入调用enq
    19     enq(node);
    20     return node;
    21 }

       

      2.2 enq

     1 private Node enq(final Node node) {
     2     //CAS+自旋
     3     for (; ; ) {
     4         Node t = tail;
     5         if (t == null) {
     6             /**
     7              * 第一个放入队列
     8              * 这个方法内部:unsafe.compareAndSwapObject(this, headOffset, null, update);
     9              * 将新节点设置到head属性
    10              */
    11             if (compareAndSetHead(new Node()))
    12                 tail = head;
    13         } else {
    14             node.prev = t;
    15             if (compareAndSetTail(t, node)) {
    16                 t.next = node;
    17                 return t;
    18             }
    19         }
    20     }
    21 }

       注意标红处,头节点是一个默认占位节点表示已经占有资源的节点。后续节点占有资源之后,会将自己设置为头节点,并释放内部封装的线程。

      2.3 acquireQueued

     1 final boolean acquireQueued(final Node node, int arg) {
     2     boolean failed = true;
     3     try {
     4         //标记轮询过程中是否遭遇中断
     5         boolean interrupted = false;
     6         for (;;) {
     7             //前指针指向的节点
     8             final Node p = node.predecessor();
     9             //前指针是头节点,则开始尝试获取资源
    10             if (p == head && tryAcquire(arg)) {
    11                 //获取成功,将自己设置为头节点,并将自己的前向指针和封装的线程置为空。头节点必然是正在占有资源的节点
    12                 setHead(node);
    13                 //将前节点后指针置空,方便GC回收
    14                 p.next = null;
    15                 failed = false;
    16                 return interrupted;
    17             }
    18             /**
    19              * shouldParkAfterFailedAcquire:将前置节点的状态设置为SIGNAL,以便自己陷入等待后,前置节点释放资源后会将自己唤醒。设置完毕返回true,这个方法会删除无效节点
    20              * parkAndCheckInterrupt:进入wait状态,被唤醒后返回中断状态
    21              */
    22             if (shouldParkAfterFailedAcquire(p, node) &&
    23                     parkAndCheckInterrupt())
    24                 //到了这里说明是被中断唤醒的,设置中断标识
    25                 interrupted = true;
    26         }
    27     } finally {
    28         if (failed)
    29             cancelAcquire(node);
    30     }
    31 }

      如果在队列中处于第2名,会尝试竞争资源,成功后自己变为头节点。否则通知还在等待的前节点,让他有幸获得资源,使用完释放后通知自己,自己会在行23处等待。

      2.4 shouldParkAfterFailedAcquire

     1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     2     int ws = pred.waitStatus;
     3     if (ws == Node.SIGNAL)
     4         //如果前置节点的状态已经是SIGNAL(代表了前置节点释放完资源后,会唤醒自己),可以安心进入等待状态
     5         return true;
     6     if (ws > 0) {
     7         //如果前置节点状态>0,说明它已经咸鱼了,这个时候需要跳过这个节点
     8         //下面的循环不停前推,直到找到一个还在努力的少年
     9         do {
    10             node.prev = pred = pred.prev;
    11         } while (pred.waitStatus > 0);
    12         //这里完成新的前节点的相互关联,中间的咸鱼链因为不再被引用,会被GC回收。
    13         pred.next = node;
    14     } else {
    15         //前置节点是正常的节点,会将它的状态设置为SIGNAL,CAS失败会回到外层继续循环
    16         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    17     }
    18     return false;
    19 }

      2.5 parkAndCheckInterrupt

    1 private final boolean parkAndCheckInterrupt() {
    2     //进入wait状态,跟竞争内置锁失败进入阻塞状态不同
    3     LockSupport.park(this);
    4     //被唤醒了,可能是unPark或者被中断,返回自己的中断状态
    5     return Thread.interrupted();
    6 }

    3 release方法

     1 public final boolean release(int arg) {
     2     //释放资源
     3     if (tryRelease(arg)) {
     4         Node h = head;
     5         if (h != null && h.waitStatus != 0)
     6             //唤醒后续的节点
     7             unparkSuccessor(h);
     8         return true;
     9     }
    10     return false;
    11 }

      1.tryRelease释放资源,即将state的值还原

      2.唤醒队列中正在等待的节点,是最靠前的节点,也就是等待最久的节点,这部分体现了公平性。

      3.1 unparkSuccessor

     1 private void unparkSuccessor(Node node) {
     2     int ws = node.waitStatus;
     3     if (ws < 0)
     4         //将节点的等待状态设置为初始状态
     5         compareAndSetWaitStatus(node, ws, 0);
     6 
     7     Node s = node.next;
     8     if (s == null || s.waitStatus > 0) {
     9         //状态大于0的节点是无效节点,它已经放弃竞争了
    10         s = null;
    11         //从尾节点开始反向遍历,寻找最靠前的等待节点。为什么不从前向后?
    12         for (Node t = tail; t != null && t != node; t = t.prev)
    13             //如果当前节点是有效节点,记录下来
    14             if (t.waitStatus <= 0)
    15                 s = t;
    16     }
    17     if (s != null)
    18         //使用unpark唤醒
    19         LockSupport.unpark(s.thread);
    20 }

      这里唤醒的是离head最近的有效节点,虽然其前置节点不一定就是头节点,但是经过2.4的shouldParkAfterFailedAcquire方法删除无效节点之后,其前置节点一定是头节点。

      回答标红的位置,直观上讲肯定是从前向后更快,但是看新节点进入等待队列的逻辑,2.1和2.2,逻辑是差不多的,都是先将新节点的前向指针关联到尾节点,然后CAS尝试设置将尾节点设置成新节点,再将旧的尾节点后向指针设置到新节点,关键在最后一步,在并发场景下,发生时间片切换,当前线程被挂起了。从前向后遍历的话,到了这里就无法找到下一个节点,主要还是两个节点设置前后关联这个操作不是原子的。

    4 acquireShared方法

    1 public final void acquireShared(int arg) {
    2     if (tryAcquireShared(arg) < 0)
    3         //尝试获取资源,如果失败则进入等待队列
    4         doAcquireShared(arg);
    5 }

      4.1 doAcquireShared

     1 private void doAcquireShared(int arg) {
     2     //在队列尾添加新节点
     3     final Node node = addWaiter(Node.SHARED);
     4     boolean failed = true;
     5     try {
     6         boolean interrupted = false;
     7         //循环尝试获取资源
     8         for (;;) {
     9             final Node p = node.predecessor();
    10             //正处在第二节点
    11             if (p == head) {
    12                 //尝试获取资源
    13                 int r = tryAcquireShared(arg);
    14                 //获取成功
    15                 if (r >= 0) {
    16                     //将自己设置为头结点,并且可能的情况下唤醒后续节点
    17                     setHeadAndPropagate(node, r);
    18                     p.next = null;
    19                     if (interrupted)
    20                         //如果等待过程中被中断,这里恢复中断
    21                         selfInterrupt();
    22                     failed = false;
    23                     return;
    24                 }
    25             }
    26             //等待获取资源
    27             if (shouldParkAfterFailedAcquire(p, node) &&
    28                     parkAndCheckInterrupt())
    29                 interrupted = true;
    30         }
    31     } finally {
    32         if (failed)
    33             cancelAcquire(node);
    34     }
    35 }

      注意代码11行的这个判断条件,必须是第二个节点才会尝试获取资源。获取资源有两种途径,一个是tryAcquireShared直接获取,不需要设置头节点。一种是doAcquireShared,在等待队列中被唤醒并获取资源,这种情况需要设置自己是头节点。

      这里有一个问题,同时获取资源的线程有多个,那么在等待队列中被唤醒的节点也会有多个,一种特殊的情况,头节点尚未释放资源,第二个节点被其余线程唤醒了,那么走到13行,将自己设置成新的头节点,并将自己的前置指针置空。这种情况是否会出问题?不会有问题,之前的头节点代表的线程使用完毕后依然通过releaseShared释放资源

      4.2 setHeadAndPropagate

     1 private void setHeadAndPropagate(Node node, int propagate) {
     2     Node h = head;
     3     //将当前节点设置为头结点,获取了资源的节点都是头节点
     4     setHead(node);
     5     //如果还有剩余资源 后面这部分判断条件没有看懂,请参考:https://blog.csdn.net/anlian523/article/details/106319294/
     6     if (propagate > 0 || h == null
     7             || h.waitStatus < 0
     8             || (h = head) == null 
     9             || h.waitStatus < 0) {
    10         Node s = node.next;
    11         if (s == null || s.isShared())
    12             //
    13             doReleaseShared();
    14     }
    15 }

      上面行7是判断的原头节点,头节点的等待状态可能小于0,需要看下面5.1的解释,也就是同时有多个线程释放了资源并执行了 doReleaseShared 唤醒队列中等待的节点。行8又重新赋值了一下,因为这中间可能其他节点将头节点替换了,需要再次判断下。

      跟独占方式不同的是,自己获得资源之后,如果发现还有资源剩余,是会继续唤醒后续节点的,不用等到释放资源的时候唤醒。

      获取资源的方式有两处,一处是 acquireShared 每个线程第一次会尝试获取,第二处是 doAcquireShared 在等待队列中被唤醒后尝试获取。

    5 releaseShared方法

    1 public final boolean releaseShared(int arg) {
    2     //尝试释放资源
    3     if (tryReleaseShared(arg)) {
    4         //唤醒其他节点
    5         doReleaseShared();
    6         return true;
    7     }
    8     return false;
    9 }

      5.1 doReleaseShared

     1 private void doReleaseShared() {
     2 
     3     for (; ; ) {
     4         Node h = head;
     5         if (h != null && h != tail) {
     6             int ws = h.waitStatus;
     7             //有节点让自己释放时通知
     8             if (ws == Node.SIGNAL) {
     9                 /**
    10                  * 先将头节点的状态还原。
    11                  * 当访问头节点时,头节点状态为-1,代表头节点正占用资源,后面有节点正等待唤醒
    12                  * 若头节点状态为0,代表头节点已经释放了资源,正准备唤醒节点 或者 已经唤醒了节点,但后续节点尚未将自己设置为新的头节点
    13                  */
    14                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    15                     continue;
    16                 //唤醒下一个最靠前的等待节点
    17                 unparkSuccessor(h);
    18             }
    19             //到这里,说明有两条或以上的线程准备释放资源
    20             //1. 头节点已经释放过资源了,此时它的状态是0,并且唤醒的那个节点还未替换头节点
    21             //2. 有其他线程通过releaseShared方法释放了资源,尝试唤醒其他节点
    22             //出现这种情况会将头节点状态设置为-3
    23             else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    24                 continue;
    25         }
    26         //如果头节点发生变化,说明已经成功唤醒了,退出循环
    27         if (h == head)
    28             break;
    29     }
    30 }

      在并发场景下,doReleaseShared可能被同时执行多次,第一次执行的线程,将head的状态由-1设置为0,并唤醒新的节点,在新节点尚未替换head时,第二次执行的线程,又将head的状态从0设置为-3,注意只会设置两次,第三次的话会进入自旋等待!这里说明一下共享模式下头节点状态的含义:

      1. 头节点状态为-1,说明还没有开始唤醒其他节点,可能是没有资源剩余了,也可能是代码还没执行到下面

      2. 头节点状态为0,正在唤醒 或 已经唤醒下一个节点但新节点还没替换头节点

      3. 头节点状态为-3,在2的基础上,又有一个线程释放了资源,此时需要额外唤醒一个节点,这个唤醒操作是由2产生的新节点来执行的,请看4.2#6代码。新节点替换了头节点之后,发现原头节点的状态小于0,即使此时可能 propagate == 0,自己也应该继续唤醒另一个节点了。  

      doReleaseShared的入口有两处,一处是 releaseShared 每个线程用完之后释放并唤醒其余节点,第二处是 setHeadAndPropagate 当有节点被唤醒并获得资源后,若资源还有余,会唤醒其余节点。

        

    关于公平锁和非公平锁

      AQS获取资源有两种方式:一种是执行 acquire / acquireShared 方法,不论等待队列是否有线程,都会通过 tryAcquire / tryAcquireShared 方法直接尝试获取。第二种是在等待队列中,按照先入先出的顺序唤醒。

      是否是公平锁交给实现类规定,AQS只定义了等待队列中获取资源是公平的,如果实现类在 tryAcquire / tryAcquireShared 中规定必须在等待队列为空的情况下才能获取资源,那么就是公平锁,否则是非公平锁。

    关于LockSupport

      https://blog.csdn.net/tangtong1/article/details/102829724?utm_medium=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-1.control&dist_request_id=&depth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-1.control

      

    参考:https://www.cnblogs.com/waterystone/p/4920797.html

    人生就像蒲公英,看似自由,其实身不由己。
  • 相关阅读:
    性能测试1 概述
    CURRENMONTH TAG in Automation Framework
    [luoguP1076] 寻宝(模拟)
    [luoguP1220] 关路灯(DP)
    [luoguP1373] 小a和uim之大逃离(DP)
    [HDU2089] 不要62 (数位DP)
    [luoguP1095] 守望者的逃离(DP)
    [luoguP1156] 垃圾陷阱(DP)
    [luoguP1736] 创意吃鱼法(DP)
    [luoguP1417] 烹调方案(背包DP)
  • 原文地址:https://www.cnblogs.com/walker993/p/14619231.html
Copyright © 2011-2022 走看看