zoukankan      html  css  js  c++  java
  • Java并发编程3-抽象同步队列AQS详解

    AQS是AtractQueuedSynchronizer(队列同步器)的简写,是用来构建锁或其他同步组件的基础框架。主要通过一个int类型的state来表示同步状态,内部有一个FIFO的同步队列来实现。

    AQS的使用方式是通过子类继承来实现,子类继承同步器并且实现抽象方法来完成同步,实现过程中涉及到同步状态的方法主要有:

    getState():获取同步状态

    setState(int newState):设置同步状态

    compareAndSetState(int expect,int update):通过CAS操作来设置同步状态,保证操作的原子性

    一、AQS的使用

    AQS需要子类继承并实现抽象方法来实现,而需要重写的方法如下:

     protected boolean tryAcquire(int arg) : 独占式获取同步状态,试着获取,成功返回true,反之为false

     protected boolean tryRelease(int arg) :独占式释放同步状态,等待中的其他线程此时将有机会获取到同步状态;

     protected int tryAcquireShared(int arg) :共享式获取同步状态,返回值大于等于0,代表获取成功;反之获取失败;

     protected boolean tryReleaseShared(int arg) :共享式释放同步状态,成功为true,失败为false

     protected boolean isHeldExclusively() : 是否在独占模式下被线程占用。

    另外AQS还提供了大量的模板方法供子类使用,主要分成三类:独占式获取和释放同步状态、共享式获取和释放同步状态、查询同步队列中的等待线程信息

    acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;

    acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;

    tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;

    acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;

    acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;

    tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;

    release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;

    releaseShared(int arg):共享式释放同步状态;

    Collection<Thread> getQueuedThreads():获取等待在同步队列中的线程集合

    二、AQS的实现原理

    AQS内部依赖一个FIFO双向队列来完成同步状态的管理,当前线程获取同步状态失败时,会将当前线程及等待信息构成成一个Node节点加入到同步队列中,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

    同步队列中的节点主要属性有:获取同步状态失败的线程引用、等待状态、前驱节点、后继节点等,同步器拥有首节点和尾节点,没有成功获取同步状态的线程会成为节点并加入队列的尾部。如下图示:

    同步队列器中包含一个指向头节点的节点引用,一个指向尾节点的节点引用。当一个线程成功获取到了锁,则其他线程无法获取锁就会被构造成节点,加入到队列的尾部,由于尾节点只有一个,所以这个加入尾部的操作必须是线程安全的。

    同步对列器采用的CAS操作来对尾节点进行设置的。compareAndSetTail(Node expect,Node update).第一个参数是之前的尾节点,第二个参数是当前节点。

    1.队列的头节点就是当前线程获取到同步锁的节点,当头节点释放锁时,会唤醒后继节点,后继节点会尝试获取同步锁,获取成功就将自己设置成首节点。

    2.设置首节点是通过获取同步锁成功的线程来完成的,由于只有一个线程能够获取到同步锁,所以设置头节点的方法不需要采用CAS操作,只需要将首节点设置为旧的的首节点的next节点,并将next引用断开即可

     

    三、锁的获取和释放

    这里我们说下Node。Node结点是对每一个等待获取资源的线程的封装,其包含了需要同步的线程本身及其等待状态,如是否被阻塞、是否等待唤醒、是否已经被取消等。变量waitStatus则表示当前Node结点的等待状态,共有5种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、0。

    • CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。

    • SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。

    • CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。

    • PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。

    • 0:新结点入队时的默认状态。

    注意,负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常

    3.1、独占式同步状态的获取和释放源码解析

    独占式同步状态的获取是通过方法acquire(int arg)方法来获取的,源码如下:

    1     public final void acquire(int arg) {
    2         if (!tryAcquire(arg) &&
    3             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    4             selfInterrupt();
    5     }

    首先是执行子类实现的tryAcquire方法来独占式获取锁,如果获取成功则方法结束;如果获取失败则先通过addWaiter方法来创建节点并且加入到队列的尾部,源码如下:

     1 /**创建Node节点,并加入同步队列尾部*/
     2     private Node addWaiter(Node mode) {
     3         //1.给当前线程创建Node节点
     4         Node node = new Node(Thread.currentThread(), mode);
     5         // Try the fast path of enq; backup to full enq on failure
     6         //2.获取当前尾部节点,也就是新加入节点的前驱节点
     7         Node pred = tail;
     8         if (pred != null) {
     9             //3.当前驱节点不为空时,则将新节点的prev引用指向前驱节点
    10             node.prev = pred;
    11             //4.由于尾部节点可能存在并发情况,获取需要CAS操作来设置
    12             if (compareAndSetTail(pred, node)) {
    13                 //5.设置成功之后将旧的尾节点的next指向新节点,并返回
    14                 pred.next = node;
    15                 return node;
    16             }
    17         }
    18         //5.如果CAS失败,则进入死循环不停自旋来CAS设置尾部节点,直到成功为止
    19         enq(node);
    20         return node;
    21     }
     1  private Node enq(final Node node) {
     2         for (;;) {
     3             Node t = tail;
     4             if (t == null) { // Must initialize
     5                 if (compareAndSetHead(new Node()))
     6                     tail = head;
     7             } else {
     8                 node.prev = t;
     9                 if (compareAndSetTail(t, node)) {
    10                     t.next = node;
    11                     return t;
    12                 }
    13             }
    14         }
    15     }

     创建了节点并加入尾部之后通过acquiredQueued方法使得该节点不停自旋获取同步状态,获取不到则阻塞线程,而被阻塞的线程的唤醒需要依靠前驱节点的出队或阻塞线程被中断,源码如下:

     1  /**节点进入队列之后通过自旋来获取同步状态,直到获取同步状态成功*/
     2     final boolean acquireQueued(final Node node, int arg) {
     3         //失败标识
     4         boolean failed = true;
     5         try {
     6             //中断标识
     7             boolean interrupted = false;
     8             for (;;) {
     9                 //获取当前节点的前驱节点
    10                 final Node p = node.predecessor();
    11                 /**
    12                  * 当当前节点当前驱节点为head节点时,才尝试调用子类实现的tryAcquire方法来获取同步状态,否则直接返回
    13                  * 因为队列时FIFO的,当头节点释放同步状态后,只有后继节点才可获取同步状态,其他节点暂时无权获取
    14                 */
    15                 if (p == head && tryAcquire(arg)) {
    16                     setHead(node);
    17                     p.next = null; // help GC
    18                     failed = false;
    19                     return interrupted;
    20                 }
    21                 if (shouldParkAfterFailedAcquire(p, node) &&
    22                         parkAndCheckInterrupt())
    23                     interrupted = true;
    24             }
    25         } finally {
    26             if (failed)
    27                 cancelAcquire(node);
    28         }
    29     }

     第15行的判断意思是只有当节点前驱节点是头节点时才能够尝试获取同步锁。前驱节点为头节点且能够获取同步状态的判断条件和线程进入等待状态是获取同步状态的自旋过程。当同步状态获取成功之后,当前线程从acquire方法返回,也就是当前线程获取到了锁。

    当线程获取到了锁并执行了业务逻辑之后,会需要释放锁,并且唤醒后继节点。调用同步器的release方法可以释放同步锁,源码如下:

     1 /**独占式释放同步状态*/
     2     public final boolean release(int arg) {
     3         //调用子类释放同步状态方法
     4         if (tryRelease(arg)) {
     5             //释放成功获取当前head节点
     6             Node h = head;
     7             //当当前节点的状态不为0时,则尝试唤醒后继节点
     8             if (h != null && h.waitStatus != 0)
     9                 unparkSuccessor(h);
    10             return true;
    11         }
    12         return false;
    13     }

     3.2、共享式同步状态的获取和释放源码解析

    调用AQS的acquireShared方法获取同步状态,直接调用子类实现的tryAcquireShared方法,如果返回值大于等于0则表示获取同步状态成功直接返回;如果小于0则表示获取同步状态失败,则调用doAcquireShared方法

    1 /**共享式获取同步状态*/
    2     public final void acquireShared(int arg) {
    3         //调用子类获取同步状态方法,小于0表示获取同步状态失败
    4         if (tryAcquireShared(arg) < 0)
    5             doAcquireShared(arg);//失败之后调用AQS的doAcquireShared(int arg)方法
    6     }

     doAcquireShared方法源码如下:

     1 private void doAcquireShared(int arg) {
     2         //创建共享模式的节点调用addWaiter方法添加到同步队列的尾部
     3         final Node node = addWaiter(Node.SHARED);
     4         boolean failed = true;//失败标识
     5         try {
     6             boolean interrupted = false;//中断标识
     7             //死循环尝试获取同步状态
     8             for (;;) {
     9                 //获取新节点的前驱节点
    10                 final Node p = node.predecessor();
    11                 //当前驱节点为head节点时继续尝试执行tryAcquireShared方法
    12                 if (p == head) {
    13                     int r = tryAcquireShared(arg);
    14                     //当返回值大于等于0时表示获取同步状态成功,则
    15                     if (r >= 0) {
    16                         //设置head节点
    17                         setHeadAndPropagate(node, r);
    18                         p.next = null; // help GC
    19                         if (interrupted)
    20                             selfInterrupt();
    21                         failed = false;
    22                         return;
    23                     }
    24                 }
    25                 if (shouldParkAfterFailedAcquire(p, node) &&
    26                         parkAndCheckInterrupt())
    27                     interrupted = true;
    28             }
    29         } finally {
    30             if (failed)
    31                 cancelAcquire(node);
    32         }
    33     }

    可以看出共享式和独占式的获取同步状态的流程,都是先调用子类的获取同步状态的方法,如果成功则直接返回;如果失败则通过死循环来不停尝试当前驱节点为首节点时开始尝试获取同步状态,直到成功。

    共享式释放同步状态的源码如下示:

    1 /**共享式释放同步状态*/
    2     public final boolean releaseShared(int arg) {
    3         //当调用子类当tryReleaseShared方法成功时调用AQS的doReleaseShared方法;否则直接返回false
    4         if (tryReleaseShared(arg)) {
    5             doReleaseShared();
    6             return true;
    7         }
    8         return false;
    9     }
     1 private void doReleaseShared() {
     2         for (;;) {
     3             Node h = head;//获取当前首节点
     4             if (h != null && h != tail) {
     5                 int ws = h.waitStatus;
     6                 //判断首节点状态释放为signal,如果是则需要唤醒后继节点
     7                 if (ws == Node.SIGNAL) {
     8                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
     9                         continue;            // loop to recheck cases
    10                     unparkSuccessor(h);
    11                 }
    12                 else if (ws == 0 &&
    13                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    14                     continue;                // loop on failed CAS
    15             }
    16             if (h == head)                   // loop if head changed
    17                 break;
    18         }
    19     }

     共享式锁和独占式锁的主要区别在于同一时刻是否允许多个线程同时获取到锁。比如一个文件,写操作同一时间只允许一个线程在写,可以使用独占式锁,但是读操作是可以允许多个线程同时进行读操作的,则可以使用共享式锁。

    四、AQS等待通知机制

  • 相关阅读:
    苹果一体机发射Wi-Fi
    iphone 屏蔽系统自动更新,消除设置上的小红点
    data parameter is nil 异常处理
    copy与mutableCopy的区别总结
    java axis2 webservice
    mysql 远程 ip访问
    mysql 存储过程小问题
    mysql游标错误
    is not writable or has an invalid setter method错误的解决
    Struts2中关于"There is no Action mapped for namespace / and action name"的总结
  • 原文地址:https://www.cnblogs.com/jackion5/p/10663945.html
Copyright © 2011-2022 走看看