zoukankan      html  css  js  c++  java
  • 【Java多线程】队列同步器AQS(十一)

    一、Lock接口

      Lock实现和synchronized关键字类似的同步功能,只是Lock在使用时需要显式地获取和释放锁,synchronized实现的隐式的获取所和释放锁。

      虽然Lock它缺少了(通过synchronized块或者方法所提供的)隐式获取释放锁的便捷性,但是却拥有了锁获取与释放的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性,何以见得,举个简单的实例:

      假设我们需要先获得锁A,然后在获取锁B,当锁B获得后,释放锁A同时获取锁C,当锁C获得后,在释放B同时获得锁D。。。是不是已经被绕晕了,很显然如果使用synchronized实现的话,不但其过程复杂难以控制,并且稍微出错可以说是一种灾难性的后果。

    1.1 Lock接口特性

      

    1.2 Lock接口简单实用

    1 Lock lock = new ReentrantLock();
    2 try {
    3     lock.lock();
    4 } finally {
    5     lock.unlock();
    6

    1.3 Lock各接口的含义

      

      Lock接口定义了实现一个锁应该具有的方法,下边看一下AQS

     二、队列同步器AQS

      队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。

           同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法(getState()、setState(int newState) 和 compareAndSetState(int expect,int update))来进行操作,因为它们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。

           同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者之间的关系:锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。

    2.1 队列同步器的接口和示例

      同步器的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。  

      重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态。

      (1) getState():获取当前同步状态。

      (2) setState(int newState):设置当前同步状态。

      (3) compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性。

      

      同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。

      

      注:模板方法基本分为三类:独占式同步状态获取与释放、共享式同步状态获取与释放和查询同步队列中等待线程情况。自定义同步组件将使用同步器提供的模板方法来实现自己的同步语义。

      以独占锁的示例来深入了解一下同步器的工作原理:

      独占锁:同一时刻只有一个线程能够获取锁,而其他获取锁的线程只能处于同步队列中等待,只有获取锁的线程释放了锁,后续的线程才能够获取锁。

     1 class Mutex implements Lock {
     2 
     3     // 静态内部类,自定义同步器
     4     private static class Sync extends AbstractQueuedSynchronizer {
     5         // 是否处于占用状态
     6         protected boolean isHeldExclusively() {
     7             return getState() == 1;
     8         }
     9 
    10         // 当状态为0的时候获取锁
    11         public boolean tryAcquire(int acquires) {
    12             if (compareAndSetState(0, 1)) {
    13                 setExclusiveOwnerThread(Thread.currentThread());
    14                 return true;
    15             }
    16             return false;
    17         }
    18 
    19         // 释放锁,将状态设置为0
    20         protected boolean tryRelease(int releases) {
    21             if (getState() == 0) throw new IllegalMonitorStateException();
    22             setExclusiveOwnerThread(null);
    23             setState(0);
    24             return true;
    25         }
    26 
    27         // 返回一个Condition,每个condition都包含了一个condition队列
    28         Condition newCondition() {
    29             return new ConditionObject();
    30         }
    31 
    32     }
    33 
    34     // 仅需要将操作代理到Sync上即可
    35     private final Sync sync = new Sync();
    36 
    37     // 获取锁
    38     public void lock() {
    39         sync.acquire(1);
    40     }
    41 
    42     // 尝试非阻塞的获取锁
    43     public boolean tryLock() {
    44         return sync.tryAcquire(1);
    45     }
    46 
    47     // 释放锁
    48     public void unlock() {
    49         sync.release(1);
    50     }
    51 
    52     public Condition newCondition() {
    53         return sync.newCondition();
    54     }
    55 
    56     public boolean isLocked() {
    57         return sync.isHeldExclusively();
    58     }
    59 
    60     public boolean hasQueuedThreads() {
    61         return sync.hasQueuedThreads();
    62     }
    63 
    64     public void lockInterruptibly() throws InterruptedException {
    65         sync.acquireInterruptibly(1);
    66     }
    67 
    68     public boolean tryLock(long timeout, TimeUnit unit)
    69             throws InterruptedException {
    70         return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    71     }
    72 
    73 }

      独占锁Mutex是一个自定义同步组件,它在同一时刻只允许一条线程占有锁。Mutex定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。在tryAcquire(int acquire)方法中,如果经过CAS设置成功(同步状态设置为1),则代表获取了同步状态,而在tryRelease(int release)方法中只是将同步状态重置为0。

      用户使用Mutex时,并不会直接和内部同步器实现打交道。而是调用Mutex提供的方法,在Mutex的实现中,以获取锁的lock()方法为例:只需要在方法实现中调用同步器的模板方法acquire(int args)即可。当前线程调用该方法获取同步状态失败后会被加入到同步队列中等待,这样就大大降低了实现一个可靠自定义组件的门槛。

    2.2 队列同步器的实现分析

    2.2.1 同步队列

      同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

         节点(Node):用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点信息。看看节点属性类型和名称以及描述:

      

        

      节点是构成同步队列(即等待队列)的基础,同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程将会称为节点加入队列的尾部。

      同步队列的结构如下:

      

      同步器包含两个节点类型的引用,一个指向头节点,一个指向尾节点。线程加入队列的过程必须保证线程安全,同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Node update),保证线程安全。

      为什么必须保证线程安全:同时有多条线程没有获取同步状态要加入同步队列,这时如果不是线程安全的,请问谁先谁后呢?所以在此处的这个操作必须是线程安全的。它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。

      同步器将节点加入同步队列的过程:

      

      

      同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点:

      

      设置首节点是通过获取同步状态成功的线程完成的,由于只有一个线程能够获取到同步状态,因此设置头节点的方法并不需要CAS来保障,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的next引用即可。

    2.2.2 独占式同步状态获取与释放 

    (1)获取 

      通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是说由于线程获取同步状态失败后进入同步队列中,后继对线程进行中断操作时,线程不会从同步队列移除。acquire方法:

    1 public final void acquire(int arg){
    2     if(!tryAcquire(arg)&& acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
    3         selfInterrupt();
    4 }

      首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该节点以“死循环”的方式获取同步状态。如果获取不到就阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。

         节点的构造以及加入同步队列依靠于addWaiter和enq方法:

     1 private Node addWaiter(Node mode){
     2     Node node = new Node(Thread.currentThread(),mode);
     3     //快速尝试在尾部添加
     4     Node pred = tail; 
     5     if(pred != null){
     6         node.pred = pred;
     7         //通过compareAndSetTail(Node expect, Node update)方法来确保节点能够被线程安全添加。
     8         if(compareAndSetTail(pred,node)){
     9             pred.next = node;
    10             return node;
    11         }
    12     }
    13     enq(node);
    14     return node; 
    15 }
    16  
    17 private Node enq(final Node node){
    18     for(;;){
    19         Node t = tail;
    20         if(t==null){
    21             if(compareAndSetHead(new Node()))
    22                 tail = head;
    23         }else{
    24             node.pred = t;
    25             if(compareAndSetTail(t,node)){
    26                 t.next = node;
    27                 return t;
    28             }
    29         }
    30     }
    31 }

      在enq(final Node node)中,同步器通过死循环的方式来确保节点的添加,在死循环中只有通过CAS将当前节点设置为尾节点之后,当前线程才能从该方法返回,否则的话当前线程不断地尝试设置。

      节点进入队列后,就进入了一个自旋状态,每个节点(或者说每个线程),都在自省观察,当条件满足,获取到同步状态,就可以从这个自旋过程中退出,否则依旧留在自旋过程中:

     1 final boolean acquireQueued(final Node node, int arg){
     2     boolean failed = true;
     3     try{
     4         boolean interrupted = false;
     5         for(;;){
     6             final Node p = node.predecessor();
     7             if(p == head && tryAcquire(arg)){
     8                 setHead(node);
     9                 p.next = null;
    10                 failed = false;
    11                 return interrupted;
    12             }
    13             if(shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt()){
    14                 interrupted = true;
    15             }
    16         }
    17     }finally{
    18         if(failed){
    19             cancleAcquire(node);
    20         }
    21     }
    22 }

      在acquireQueued(final Node node,int arg)方法中,当前线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态,这是为什么?原因有两个,如下。  

      第一,头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。

      第二,维护同步队列的FIFO原则。该方法中节点自旋获取同步状态的行为如下图: 

      

      

      由于非首节点线程前驱节点出队或被中断而从等待状态返回,随后检查自己的前驱是否是头节点,如果是则尝试获取同步状态,可以看到节点与及节点之间在循环检查的过程中基本上不相互通信,而是简单地判断自己的前驱是否为头节点,这样就使得节点的释放符合FIFO,并且对于方便对过早通知进行处理(过早通知指的是前驱节点不是头节点的线程由于中断被唤醒)。

      独占式同步状态获取流程,也就是acquire(int arg)方法调用流程:

      

      在上图中前驱节点为头节点且能够获取同步状态的判断条件和线程进入等待状态是:获取同步状态的自旋过程(acquireQueued方法的死循环),当同步状态获取成功,当前线程从acquire(int arg)方法返回,这也就代表着当前线程获得了锁。

    (2)释放

      当前线程获取同步状态完成相应逻辑后,需要释放同步状态,通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释放了同步状态后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)

     1 public final boolean release(int arg){
     2     try(tryRelease(arg)){
     3         Node h = head;
     4         if(h!=null && h.waitStatus != 0){
     5             unparkSuccessor(h);
     6         }
     7         return true;
     8     }
     9     return false;
    10 }

       该方法执行时,会唤醒头节点的后继节点线程,unparkSuccerssor(Node node)方法使用LcokSupport(后面讲)来唤醒处理等待状态的线程。

      (3)独占式同步状态获取和释放的总结

      在获取同步状态时,同步器会维持一个同步队列,获取失败的线程都会被加入到同步队列中,并在同步队列中自旋(判断自己前驱节点为头节点)。移出队列(停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。

           在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。

    2.2.3 独占式超时获取同步状态

      通过调用同步器的doAcquireNanos(int arg,long nanosTimeout)方法可以超时获取同步状态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回true,否则,返回false。该方法提供了传统Java同步操作(比如synchronized关键字)所不具备的特性。

         在Java 5之前,当一个线程获取不到锁而被阻塞在synchronized之外时,对该线程进行中断操作,此时该线程的中断标志位会被修改,但线程依旧会阻塞在synchronized上,等待着获取锁。在Java 5中,同步器提供了acquireInterruptibly(int arg)方法,这个方法在等待获取同步状态时,如果当前线程被中断,会立刻返回,并抛出InterruptedException。
         超时获取同步状态过程可以被视作:响应中断获取同步状态过程的“增强版”,doAcquireNanos(int arg,long nanosTimeout)方法在支持响应中断的基础上,增加了超时获取的特性。

          针对超时获取,主要需要计算出需要睡眠的时间间隔nanosTimeout,为了防止过早通知,nanosTimeout计算公式为:nanosTimeout-=now-lastTime,其中now为当前唤醒时间,lastTime为上次唤醒时间,如果nanosTimeout大于0则表示超时时间未到,需要继续睡眠nanosTimeout纳秒,反之,表示已经超时。

       doAcquireNanos(int arg,long nanosTimeout):

      该方法在自旋过程中,当节点的前驱节点为头节点时尝试获取同步状态,如果获取成功则从该方法返回,这个过程和独占式同步获取的过程类似,但是在同步状态获取失败的处理上不同。

      如果当前线程获取同步状态失败,则判断是否超时(nanosTimeout小于0表示超时),如果没有超时,重新计算超时间隔nanosTimeout,然后使线程等待nanosTimeout纳秒(当已到设置的超时时间,该线程会从LockSupport.parkNanos(Object blocker, long nanos)方法返回)。

      如果nanosTimeout小于等于spinForTimeoutThreshold(1000纳秒)时,将不会使该线程进行超时等待,而是进入快速的自旋过程。原因:非常短的超时等待无法做到十分精确,如果这时再进行超时等待,相反会让nanosTimeout的超时从整体上表现的不精确。

           因此:在超时非常短的场景下,同步器会进入无条件的快速自旋。独占式超时获取同步状态的流程图:

      

      

       独占式超时获取同步状态doAcquireNanos(int arg,long nanosTimeout)和独占式获取同步状态acquire(int args)在流程上非常相似,其主要区别在于未获取到同步状态时的处理逻辑。acquire(int args)在未获取到同步状态时,将会使当前线程一直处于等待状态,而doAcquireNanos(int arg,long nanosTimeout)会使当前线程等待nanosTimeout纳秒,如果当前线程在nanosTimeout纳秒内没有获取到同步状态,将会从等待逻辑中自动返回。

    2.2.4 共享式同步状态获取与释放

       共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态

      

      

      从图可以看到,共享是可以在同一时刻所有共享线程对资源进行访问的,而独占的话是在同一时刻只有一个线程能够访问。

      通过调用同步器的acquireShared(int arg)方法可以共享式地获取同步状态;

      在acquireShared(int arg)方法中,同步器调用tryAcquireShared(int arg)方法尝试获取同步状态,tryAcquireShared(int arg)方法返回值为int型,当返回值大于等于0时,表示能够获取到同步状态。在共享式获取自选状态过程中,成功获取到同步状态并退出自旋的条件就是tryAcquireShared(int arg)方法的返回值大于等于0。

      可以看到在doAcquireShared(int arg)方法的自旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于0,表示该次获取同步状态成功,并从自旋过程中退出。

      与独占式相同,共享式获取也需要释放同步状态,通过调用releaseShared(int arg)方法可以释放同步状态:

    1 public final boolean releaseShared(int arg){
    2     try(tryReleaseShared(arg)){
    3        doReleaseShared(); 
    4        return true;
    5     }
    6     return false;
    7 }

      该方法释放同步状态之后,将会唤醒后续处于等待状态的节点。对于能够支持多个线程同时访问的并发组件,它和独占式主要区别在于tryReleaseShared(int arg)方法必须确保同步状态(或者资源数)线程安全释放,一般都是通过CAS和循环来保证的,因为释放同步状态的操作会同时来自多个线程。

    三、具体AQS应用分析

      1、【Java多线程】ReentrantLock锁原理分析(十二)

      

    参考文章:

      1、《Java并发编程的艺术》

      2、https://blog.csdn.net/xlgen157387/article/details/78341626

      3、https://blog.csdn.net/qq_40645822/article/details/90668367

  • 相关阅读:
    Codeforces 749C【模拟】
    Codeforces 358D【DP】
    Lightoj1122 【数位DP】
    Codeforces 744C【DFS】
    大晚上就是想说说话
    HDU5997 【线段树】
    codeforces743D 【DFS】
    lightoj 1422【区间DP·分类区间首元素的情况】
    lightoj 1125【背包·从n个选m个】
    Lightoj 1147【DP】
  • 原文地址:https://www.cnblogs.com/h--d/p/14561229.html
Copyright © 2011-2022 走看看