zoukankan      html  css  js  c++  java
  • Java并发之AQS同步器学习

    AQS队列同步器学习

    在学习并发的时候,我们一定会接触到 JUC 当中的工具,JUC 当中为我们准备了很多在并发中需要用到的东西,但是它们都是基于AQS(AbstractQueuedSynchronizer)队列同步器来实现的,也就是我们如果能够去梳理清楚AQS当中的知识点,对我们以后了解其他并发功能键有很大的帮助。

    CLH队列

    队列同步器(AbstractQueuedSynchronizer),是用来构建锁或者其他同步组件的基础框架,它使用了一个int变量来表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,并发包的作者Doug Lea期望她能够成为实现大部分同步需求的基础。

    而这个内置的队列就是CLH双向队列,当前线程如果获取锁失败的时候,会将当前线程、状态等信息封装成一个Node节点添加到CLH队列当中去--也就是一个Node节点其实就是一个线程,而当有线程释放时,会唤醒CLH队列并取其首节点进行再次获取:

      static final class Node {
              /** Marker to indicate a node is waiting in shared mode */
           //共享模式节点
              static final Node SHARED = new Node();
    
              /** Marker to indicate a node is waiting in exclusive mode */
           //独占模式节点
              static final Node EXCLUSIVE = null;
      ​
              /** waitStatus value to indicate thread has cancelled */
           //处于取消的等待状态
           /* 因为超时或中断就会处于该状态,并且处于该状态的节点不会转变为其他状态
              处于该状态的节点不会再次被阻塞*/
              static final int CANCELLED =  1;
    
              /** waitStatus value to indicate successor's thread needs unparking */
           //等待状态
           /*  表示后继节点是否需要被唤醒 */
              static final int SIGNAL    = -1;
    
              /** waitStatus value to indicate thread is waiting on condition */
           /* 该节点处于条件队列当中,该节点不会用作同步队列直到设置状态0用来传输时才会移到同步队列当中,并且加入对同步状态的获取 */
              static final int CONDITION = -2;
              /**
               * waitStatus value to indicate the next acquireShared should
               * unconditionally propagate
               */
           /* 表示下一次共享式同步状态获取将会无条件地传播下去 */
              static final int PROPAGATE = -3;
      ​
           //线程等待状态
              volatile int waitStatus;
      ​
             //当前节点的前置节点
              volatile Node prev;
      ​
              //当前节点的后置节点
              volatile Node next;
      ​
              //节点所在的线程
              volatile Thread thread;
      ​
             //条件队列当中的下一个等待节点
              Node nextWaiter;
      ​
              /**
               * 判断节点是否共享模式
               */
              final boolean isShared() {
                  return nextWaiter == SHARED;
              }
      ​
              /**
               * 获取前置节点
               */
              final Node predecessor() throws NullPointerException {
                  Node p = prev;  //获取前置节点
                  if (p == null)  //为空则抛空指针异常
                      throw new NullPointerException();
                  else
                      return p;
              }
      ​
              Node() {    // Used to establish initial head or SHARED marker
              }
      ​
              Node(Thread thread, Node mode) {     // Used by addWaiter
                  this.nextWaiter = mode;
                  this.thread = thread;
              }
      ​
              Node(Thread thread, int waitStatus) { // Used by Condition
                  this.waitStatus = waitStatus;
                  this.thread = thread;
              }
          }
    

    通过上面对Node节点的源代码进解说,我想对于之后的内容会有很大的帮助的,因为后面的方法当中会有特别多的状态判断。

    CLH队列.jpg

    当我们重写同步器的时候,需要使用同步器的3个方法来访问和修改同步的状态。分别是:

    • getState():获取当前同步状态

    • setState(int newState):设置当前同步状态

    • compareAndSetState(int expect, int update):通过CAS来设置当前状态,该方法可以保证设置状态操作的原子性

    入列

    我们在上面既然已经讲到了AQS当中维护着的是CLH双向队列,并且是FIFO,既然是队列,那肯定就存在着入列和出列的操作,我们来先从入列看起:

    acquire(int arg)方法

    该方法是独占模式下线程获取同步状态的入口,如果当前线程获取同步状态成功,则由该方法返回,如获取不成功将会进入CLH队列当中进行等待。

    在该方法当中会调用重写的tryAcquire(int arg)方法。

      public final void acquire(int arg) {
          if (!tryAcquire(arg) &&
              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
              selfInterrupt();
      }
    
    • tryAcquire(int arg)

      很多人刚看到这个方法的时候,会不会有种一脸懵逼的感觉,方法体居然只是返回一个异常而已,说好的业务逻辑代码呢?

      回到我们一开始说的,AQS实际上只是作为一个同步组件的基础框架,具体的实现要交由自定义的同步器去自己实现,所以该方法当中只有一句异常。

    此方法由用户自定义的同步器去实现,尝试获取独占资源,如果成功则返回true,如果失败则返回false

          protected boolean tryAcquire(int arg) {
              throw new UnsupportedOperationException();
          }
    
    • addWaiter(Node mode)

      将当前线程添加到CLH队列的队尾,并且指定独占模式。

      Node有两种模式,分别是独占模式和共享模式,也就是Node.EXCLUSIVENode.SHARED

        private Node addWaiter(Node mode) {
                  //将当前线程以指定模式来创建Node节点
                  Node node = new Node(Thread.currentThread(), mode);
                  // Try the fast path of enq; backup to full enq on failure
                  Node pred = tail;  //获取队列尾部给变量pred
                  if (pred != null) {  //若队尾不为空
                      node.prev = pred;  //将当前节点的前置节点指向原来的tail
                      if (compareAndSetTail(pred, node)) {  //通过CAS将tail设置为Node
                          /*
                          *如果设置成功,表示此操作没有别的线程执行成功
          */ 
                          pred.next = node;  //将原来tail节点的后置节点指向node节点
                          return node;  //返回node节点
                      }
                  }
                  enq(node);
                  return node;
              }
    
    • enq(Node )

      该方法是将节点插入到CLH队列的尾部,并且通过自旋(死循环)来保证Node节点的正确添加

          private Node enq(final Node node) {
                  for (;;) {  //自旋--死循环添加节点
                      Node t = tail;  //获取原来tial节点至t变量
                      if (t == null) { // Must initialize  队列为空
                          if (compareAndSetHead(new Node()))  //设置一个空节点作为head节点
                              tail = head;  //head和tail是同一个节点
                      } else {  //队列不为空的正常情况
                          node.prev = t;  //设置当前节点的前置节点为原tail节点
                          if (compareAndSetTail(t, node)) {  //通过CAS设置当前节点为tail节点
                              t.next = node;  //原tail节点后置节点是当前节点
                              return t;  //返回原tail节点结束循环
                          }
                      }
                  }
              }
    
    • acquireQueued(final Node node, int arg)

      来到这个方法,证明已经通过tryAcquire获取同步状态失败了,并且调用了addWaiter方法将当前线程添加至CLH队列的尾部了,剩下的就是在等待状态当中等其他线程来唤醒自己去获取同步状态了。

      对于已经处于CLH队列当中的线程,是以独占并且不可中断的模式去获取同步状态。

          final boolean acquireQueued(final Node node, int arg) {
              boolean failed = true;  //成功获取资源的状态
              try {
                  boolean interrupted = false;  //是否被中断的状态
                  for (;;) { //自旋--死循环
                      final Node p = node.predecessor();  //获取当前节点的前置节点
                      //如果前置节点是首节点,并且已经成功获取同步状态
          if (p == head && tryAcquire(arg)) { 
                          setHead(node);  //设置当前节点为head节点,并且将当前node节点的前置节点置null
                          p.next = null; //设置原head节点的后置节点为null,方便GC回收原来的head节点
          failed = false; 
                          return interrupted; //返回是否被中断
                      }
                      //获取同步状态失败后,判断是否需要阻塞或中断
                      if (shouldParkAfterFailedAcquire(p, node) &&
                          parkAndCheckInterrupt())
                          interrupted = true;  //如果被中断过,设置标记为true
                  }
              } finally {
                  if (failed)
                      cancelAcquire(node);  //取消当前节点继续获取同步状态的尝试
              }
          }
    
    • shouldParkAfterFailedAcquire(Node pred, Node node)

      对于获取状态失败的节点,检查并更新其状态,如果线程阻塞就返回true,这是所有获取状态循环的信号控制方法。

      要求pred == node.prev

    实际上除非锁获取成功,要不然都会被阻塞起来

          private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
              int ws = pred.waitStatus;  //获取前驱节点的状态
              //状态为-1,表示后继节点已经处于waiting等待状态,等该节点释放或取消,就会通知后继节点
          if (ws == Node.SIGNAL) 
                  return true;
              //如果状态大于0--取消状态,就跳过该节点循环往前找,找到一个非cancel状态的节点
              if (ws > 0) {
                  do {
                      node.prev = pred = pred.prev;
                  } while (pred.waitStatus > 0);
                  //赋值pred的后继节点为node节点
                  pred.next = node;
              } else {  //如果状态小于0
                  //必须是PROPAGATE或者0--表示无状态,当是-2的时候,在condition queue队列当中
                  //通过CAS设置pred节点状态为signal
                  compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
              }
              return false;
          }
    
    • parkAndCheckInterrupt()

      还有当该节点的前驱节点状态为signal时,才可以将该节点所在线程pack起来,否则无法将线程pack。

          private final boolean parkAndCheckInterrupt() {
              //通过LockSupport工具阻塞当前线程
              LockSupport.park(this);
              return Thread.interrupted();  //清除中断标识,返回清除前的标识
          }
    
    • cancelAcquire(Node node)

      该方法是取消节点所在线程对同步状态的获取,那说白了就是将节点的状态改为cancelled.

          private void cancelAcquire(Node node) {
              // Ignore if node doesn't exist
              if (node == null)  //节点为空则返回
                  return;
          ​
              node.thread = null;  //节点所在线程设为null
          ​
              // Skip cancelled predecessors
              //获取node节点的前驱节点
              Node pred = node.prev;
              //循环获取前驱节点的状态,找到第一个状态不为cancelled的前驱节点
              while (pred.waitStatus > 0)
                  node.prev = pred = pred.prev;
          ​
              // predNext is the apparent node to unsplice. CASes below will
              // fail if not, in which case, we lost race vs another cancel
              // or signal, so no further action is necessary.
              //获取pred节点的后继节点
              Node predNext = pred.next;
    
              //设置node节点状态为CANCELLED
              node.waitStatus = Node.CANCELLED;
          ​
              //如果node节点是tail节点,通过CAS设置tail节点为pred
              if (node == tail && compareAndSetTail(node, pred)) {
                  //通过CAS将pred节点的next节点设置null
                  compareAndSetNext(pred, predNext, null);
              } else {  //如果不是tail节点
          ​
                  int ws;  //初始化node节点状态变量
    
                  /*
                  *如果pred不是head节点,并且状态是SIGNAL或者状态小于0并且设置pred
                  *状态为SIGNAL成功,。并且pred所封装的线程不为空
                  */
                  if (pred != head &&
                      ((ws = pred.waitStatus) == Node.SIGNAL ||
                       (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                      pred.thread != null) {
                      //获取node节点的后继节点
                      Node next = node.next;
                      //如果后继节点部位null并且状态不为cancelled
                      if (next != null && next.waitStatus <= 0)
                          //设置pred的后继节点为next,也就是将pred的后继节点不再是node
                          compareAndSetNext(pred, predNext, next);
                  } else {
                      unparkSuccessor(node);  //释放后继节点
                  }
          ​
                  node.next = node; // help GC
              }
          }
    
    • unparkSuccessor(Node node)
          private void unparkSuccessor(Node node) {
              //获取node节点的状态
              int ws = node.waitStatus;
              if (ws < 0)  //如果状态小于0,为SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3
                  //通过CAS将node节点状态设置为0
                  compareAndSetWaitStatus(node, ws, 0);
          ​
          //获取node节点的后继节点 
              Node s = node.next;
              //如果后继节点为空或者状态大于0--cancelled
              if (s == null || s.waitStatus > 0) {
                  //后继节点置为空
                  s = null;
                  //从tail节点开始往前遍历
                  for (Node t = tail; t != null && t != node; t = t.prev)
                      if (t.waitStatus <= 0)  //判断状态小于等于0,就是为了找到状态不为cancelled的节点
                          s = t;  //找到最前的状态小于等于0的节点
              }
              if (s != null)  //如果由以上方法找到的节点不为空
                  //通过LockSupport工具释放s节点封装的线程
                  LockSupport.unpark(s.thread);
          }
    

    经过了以上的分析,我想我们对入列的代码也有了一个比较好的了解吧,那我们也可以尝试画一下入列的流程图。
    AQS入列流程.jpg

    出列

    出列的操作相对于入列来说就真的是简单的多了,毕竟入列的时候需要考虑的因素太多,要考虑前驱和后继节点,还要考虑节点的状态等等一堆因素,而出列就是指CLH队列的头部节点,所以麻烦的因素就会少了很多。

    release(int arg)

    我们废话都不多说了,直接上代码吧。

    这也是以独占模式来释放对象

      public final boolean release(int arg) {
          if (tryRelease(arg)) {
              Node h = head;  //获取head节点
              //如果head节点不为空并且状态不为0,也就是初始节点
      if (h != null && h.waitStatus != 0) 
                  unparkSuccessor(h);  //唤醒后继节点
              return true;
          }
          return false;
      }
    
    • tryRelease(int arg)

      这个方法与入列的tryAcquire一样,是只有一个异常的,也就是证明这个方法也是由自定义的同步组件自己去实现,在AQS同步器当中只是定义一个方法而已。

          protected boolean tryRelease(int arg) {
              throw new UnsupportedOperationException();
          }
    
    • unparkSuccessor(Node node)

      这个方法实际在入列的时候已经讲过了,我直接搬上面的代码解释下来。

          private void unparkSuccessor(Node node) {
              //获取node节点的状态
              int ws = node.waitStatus;
              if (ws < 0)  //如果状态小于0,为SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3
                  //通过CAS将node节点状态设置为0
                  compareAndSetWaitStatus(node, ws, 0);
          ​
          //获取node节点的后继节点 
              Node s = node.next;
              //如果后继节点为空或者状态大于0--cancelled
              if (s == null || s.waitStatus > 0) {
                  //后继节点置为空
                  s = null;
                  //从tail节点开始往前遍历
                  for (Node t = tail; t != null && t != node; t = t.prev)
                      if (t.waitStatus <= 0)  //判断状态小于等于0,就是为了找到状态不为cancelled的节点
                          s = t;  //找到最前的状态小于等于0的节点
              }
              if (s != null)  //如果由以上方法找到的节点不为空
                  //通过LockSupport工具释放s节点封装的线程
                  LockSupport.unpark(s.thread);
          }
    

    这上面就是出列也就是释放的代码了,其实看起来不是很难。

    小结

    花了整整3天左右的时间去看了一下AQS的源码,会去看也纯属是想要把自己的并发方面的知识能够丰富起来,但是这次看源码也还是不太顺利,因为很多代码或者方法,单独分开来看的时候或许能理解,感觉方法的作用也的确是那么回事,但是当一整个流程串起来的时候也还是不太明白这样做的具体作用,以及整个的执行流程。更加没办法理解那些自旋里的代码,每一次执行会出现怎样的结果,对CLH队列的影响。

    不过,自己也是有收获的,至少相较于一开始来说,自己对AQS有了一点皮毛的理解,不至于以后闻起来完完全全是一问三不知的状态。

    同时也希望我这篇文章能够对想要了解AQS的程序猿能够起一点作用,以后自己也还是将自己的一些学习心得或者资料共享出来。

    参考资料

    方腾飞:《Java并发编程的艺术》

    如需转载,请务必注明出处,毕竟一块块搬砖也不是容易的事情。

  • 相关阅读:
    切片
    docker基础
    第18课 脚本练习二(找出文件下最大文件)
    第17课 脚本练习一(添加新用户)
    第十四课 脚本编程(重定向+变量)
    第十课 新建共享文件夹
    第九课 Linux文本处理
    第八课 正则表达式
    第七课 VI全屏文本编辑器
    第六课 Linux文件系统文本操作命令
  • 原文地址:https://www.cnblogs.com/KingJack/p/9521734.html
Copyright © 2011-2022 走看看