zoukankan      html  css  js  c++  java
  • Java并发基石——所谓“阻塞”:Object Monitor和AQS(3)

    ====================================
    (接上文《Java并发基石——所谓“阻塞”:Object Monitor和AQS(2)》)

    • 使用AQS实现的Mutex

    现在我们来看看在AbstractQueuedSynchronizer官方文档中(实际上就是JDK源代码上的注释说明)给出的另一个示例性代码Mutex。这是一个使用AQS实现互斥锁的例子,完全可以用在正式环境下。以下代码片段中,英文注释信息是示例中自带的,中文信息是笔者加上的详细功能说明:

    // ........省略掉import区域的信息
    /**
     * 这是一个java AbstractQueuedSynchronizer中自带的Mutex互斥锁的实现,实际上就是基于AQS的独占机制实现。本文将详细讲解它的工作过程。
     * @author Doug Lea
     */
    class Mutex implements Lock, Serializable {
      // Our internal helper class
      private static class Sync extends AbstractQueuedSynchronizer {
        /**
         * Reports whether in locked state<br>
         * isHeldExclusively该方被重写,这个方法用于向当前线程反馈,是否AQS正处于独占操作状态下。<br>
         * 当AQS目前state状态为1时,就说明当前AQS队列正处于独占操作状态。
         */
        protected boolean isHeldExclusively() {
          return getState() == 1;
        }
        /**
         * Acquires the lock if state is zero<br>
         * tryAcquire也在示例中被重写,当前线程试图获取AQS独占操作权时,该方法将被调用。
         * */
        public boolean tryAcquire(int acquires) {
          assert acquires == 1;// Otherwise unused
          /*
           * 在上篇文章中已经介绍过compareAndSetState这个方法非常重要,多线程下状态的线程安全性主要也在这里体现
           * ——因为在多线程抢占执行compareAndSetState这句代码时,只有一个线程能够执行成功
           * 放在这个示例代码段落中,具体的工作特点是:比较当前AQS的state状态,如果为0,则设置为state状态为1,并返回true;
           * 其它情况下放弃对state的设定操作,并返回false。
           * */
          if (compareAndSetState(0, 1)) {
            // 如果成功设置当前AQS的state状态为1,则说明当前线程获得了AQS的独占操作权
            // 那么通过setExclusiveOwnerThread方法,设定AQS的独占操作线程为当前线程
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
          }
          return false;
        }
        
        /**
         * Releases the lock by setting state to zero
         * tryRelease在示例中被重写,当前线程释放AQS队列的独占操作权,该方法被调用
         * */
        protected boolean tryRelease(int releases) {
          assert releases == 1; // Otherwise unused
          // 如果调用该方法时,发现state状态不为1,就抛出异常
          // 这是为什么呢?这是因为,至少有一个线程在没有获得独占操作权的情况下,就调用了释放操作权的操作。
          if (getState() == 0) {
            throw new IllegalMonitorStateException();
          }
          
          // 通过setExclusiveOwnerThread方法,设定AQS的独占操作线程为null
          // 并且设定state状态为0
          setExclusiveOwnerThread(null);
          setState(0);
          return true;
        }
    
        // Provides a Condition
        // 关于java中的java.util.concurrent.locks.Condition接口,请参见后文介绍
        Condition newCondition() {
          return new ConditionObject();
        }
      }
    
      // The sync object does all the hard work. We just forward to it.
      private final Sync sync = new Sync();
    
      // 该方法进行加锁处理,实际上就是调用AQS的acquire方法。这里的参数1,会出现在tryAcquire方法中
      // 一旦加锁成功(获取操作权的操作成功),其它试图加锁(试图获取操作权)但又没有成功的线程,则进入阻塞状态。
      public void lock() {
        sync.acquire(1); 
      }
      // acquireInterruptibly方法和acquire方法的区别,在上文中已经有了说明
      public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
      }
      // 尝试加锁(尝试获取独占操作权),如果成功了(也就是说state因为当前线程的这个操作被置为1),则返回true;
      // 如果加锁操作没有成功,则返回false,这时当前线程还可以继续动作。
      public boolean tryLock() {
        return sync.tryAcquire(1); 
      }
      // 在一段持续的时间内,尝试加锁(尝试获取独占操作权),如果成功则返回true;
      // 如果加锁操作没有成功,则返回false,这是当前线程还可以继续动作。
      public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
      }
      // 解除独占状态,AQS将在执行过程中调用Mutex.Sync中的tryRelease方法。
      public void unlock() {
        sync.release(1); 
      }
      public Condition newCondition() {
        return sync.newCondition();
      }
      // 查询当前AQS队列是否已经被某个线程独占。
      // 实际上就是调用isHeldExclusively方法,看状态是否为1,为1就是说明被独占了
      public boolean isLocked() {
        return sync.isHeldExclusively(); 
      }
      // 查询AQS中是否有任何线程正在等待获取。当AQS中的head != tail时,说明AQS中有等待队列
      public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads(); 
      }
    }
    

    示例代码中的注释信息已经非常清楚了,这里就不再赘述。

    • Mutex的运行使用效果

    接下来我们看一下Mutex的运行效果,既然是基于AQS中的独占模式进行的实现,那么在某一个线程获得独占操作权的情况下,其它线程就不能获得操作权了——要么进入阻塞等待获得独占操作权,要么放弃抢占操作权继续执行当前线程的后续操作。

    情况一:某个线程抢得了操作权,其它线程等待

    public static void main(String[] args) {
      final Mutex mutex = new Mutex();
      for(int index = 0 ; index < 3 ; index++) {
        new Thread(() -> {
          try {
            mutex.lock();
          } finally {
            // 如果不调用unlock,那么独占状态将不会解除,其它线程将一直处于阻塞状态
            mutex.unlock();
          }
        } , "index" + index).start();
      } 
    }
    

    情况二:某个线程抢得了操作权,其它线程尝试抢占操作权一段时间后,放弃抢占操作权的操作

    public static void main(String[] args) {
      final Mutex mutex = new Mutex();
      for(int index = 0 ; index < 3 ; index++) {
        new Thread(() -> {
          boolean isMutex = false;
          try {
            // 尝试获得独占操作权(10秒的时间内)
            if(mutex.tryLock(10, TimeUnit.SECONDS)) {
              isMutex = true;
              System.out.println("本线程尝试获取独占操作权成功");
            } else {
              System.out.println("本线程尝试获取独占操作权失败");
            }
          } catch(InterruptedException e) {
            e.printStackTrace(System.out);
          } finally {
            if(isMutex) {
              mutex.unlock();
            }
          }
        } , "index" + index).start();
      }
    }
    

    3.3、AQS代码分析

    本届中我们主要分析AQS中几个关键方法,包括独占工作模式下的acquire()方法和release()方法,以及共享工作模式下的acquireShared()方法和releaseShared()方法。

    3.3.1、AQS中的Node和Node构成的AQS队列

    要理解AQS内部如何基于CAS原理和LockSupport工具完成工作的,就需要先把AQS中关于Node的关键定义搞清楚:

    在这里插入图片描述

    上图是对上文中AQS队列中,每一个Node节点结构图的细化说明。这个队列是一个双向的带头、尾标识的队列,在AQS队列初始化时,其中的Tail节点为null。在一个Node节点涉及到的独占模式工作效果中,主要包括了几个重要的部分:

    static final class Node {
    	volatile int waitStatus;
    	volatile Node prev;
    	volatile Node next;
    	volatile Thread thread;
    	// 。。。。。先省略,后续还有其它补充
    }	
    
    • next:指向当前Node节点的下一个节点,如果当前节点为尾节点,则next属性为null。

    • prev:指向当前Node节点的上一个Node节点,如果当前节点为head节点,则prev属性为null。

    • thread:AQS的双向队列说白了是对进程中若干Thread集合的管理,那么Node节点就需要一个属性来标识其管理的/代表的线程对象。

    • waitStatus:节点等待状态,这个状态信息非常重要:包括这些状态标识:

      • 1:CANCELLED,指示当前节点所代表的线程由于等待超时或者interrupt中断信号的原因,已经取消了其在AQS队列中的抢占操作,处于这种标识状态下的节点不会再被本AQS队列继续阻塞;
      • -1:SIGNAL,指示当前节点的后续节点所代表的线程,已经被(或者很快将被)阻塞(通过parking模式),也就是说当前后续节点所代表的线程需要被执行unpark后,才能解除阻塞。
      • -2:CONDITION,指示当前对象所代表的线程,需要等待一个Condition对象的激活——在Condition队列中被阻塞。
      • -3:PROPAGATE,当节点处于“共享”模式下,该值指示共享操作权的释放信号还可以扩散到AQS队列中的其它节点上去。
      • 0:节点不处于以上任何状态。

    这里我们只是介绍了waitStatus状态属性的字面含义,实际上这些字面含义只要查看java.util.concurrent.locks.AbstractQueuedSynchronizer.Node的源代码就能看到,最重要的还是这些状态如何在Node的工作过程中发挥作用。

    3.3.2、独占模式下的acquire()的工作过程

    有了以上小节的对Node中各个属性的基础描述作为铺垫,我们就可以首先开始分析AQS独占模式下的acquire()方法、release()工作过程。当我们调用AbstractQueuedSynchronizer.acquire(int)方法时,AQS做了什么工作呢?首先上代码——acquire(int)方法中的详细代码片段:

    //......
    public final void acquire(int arg) {
      // 请注意这个 Node.EXCLUSIVE 值,从源代码可以看到这个对象代表的就是一个null值
      if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
    }
    //......
    

    acquire 方法中一共执行了三个关键方法:

    • tryAcquire(int) 这个方法在上文中都已经介绍过,就是需要开发人员在使用AQS独占模式之前需要重写的方法。当这个方法返回true,就表示当前线程获得了独占操作权;从acquire方法的代码详情可以看出,如果tryAcquire(int)方法返回true,则整个方法就结束,不再进行任何处理——包括也不需要进行AQS队列的任何变动了。

    • addWaiter(Node) 方法的作用创建一个Node节点,并试图将这个节点挂载到AQS的队尾(以CAS线程安全的方式)。根据AQS的工作模式(独占或者共享)创建的节点特点是不一样的。这里我们先介绍在独占模式下创建节点的过程。

    /**
     * Creates and enqueues node for current thread and given mode.
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     */
    private Node addWaiter(Node mode) {
      // 根据源代码的,当独占模式的acquire方法调用addWaiter这个方法的时候,其传参的mode的值为null
      Node node = new Node(Thread.currentThread(), mode);
      // Try the fast path of enq; backup to full enq on failure
      // 最常见的this.tail为null的情况就是当前AQS队列中还没有节点
      Node pred = this.tail;
      if (pred != null) {
        node.prev = pred;
        // 这个CAS方法的含义是,判断当前AQS的尾部节点是pred,如果是则重新设置为当前node
        // 设定成功,则退出
        if (compareAndSetTail(pred, node)) {
          pred.next = node;
          return node;
        }
      }
      enq(node);
      return node;
    }
    

    这个addWaiter方法中的CAS操作只进行了一次,并不是如果没有成功就进入enq方法。enq方法实际上就是addWaiter方法的“无限循环”操作,也就是说不将新增的节点添加到AQS队列中决不罢休。代码如下:

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * 反正笔者是不知道picture在哪里
     * @param node the node to insert
     */
    private Node enq(final Node node) {
      // 一直试图添加,直到添加到了AQS队列的头部或者尾部未知
      for (;;) {
        Node t = tail;
    	if (t == null) { // Must initialize
    	  if (compareAndSetHead(new Node()))
    	    tail = head;
    	} else {
    	  node.prev = t;
    	  if (compareAndSetTail(t, node)) {
    		t.next = node;
    	    return t;
    	  }
    	}
      }
    }
    
    • acquireQueued(Node , int) 通过addWaiter方法我们将一个代表线程的AQS节点进行了创建,并且加入到了AQS队列中,但是我们还没有让这个节点所代表的线程进入阻塞状态,也没有让这个节点和AQS队列的整体建立交互机制(参与独占操作权的抢占)。所以这个acquireQueued方法就是让这个新加入AQS队列的节点根据调用者的设定逻辑进入不同的阻塞状态,并且让这个节点所代表的线程参与独占权抢占。代码详情如下:
    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     * @return true:if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
      boolean failed = true;
      try {
        boolean interrupted = false;
        for (;;) {
          // 取得当前的节点的前驱节点(前置节点)
          final Node p = node.predecessor();
          // 这个if段落的代码有得一读。仔细看
          // 如果当前节点的前驱节点是头结点,那么再次试图执行tryAcquire方法。
          // 如果执行成功则当前节点为头部节点,通过if段落中的代码去掉当前的头结点信息
          if (p == head && tryAcquire(arg)) {
            setHead(node);
            p.next = null; // help GC
            failed = false;
            return interrupted;
          }
          
          // 如果当前节点前驱节点不是头部节点,或者执行tryAcquire再次失败,那么就执行到这个if块
          if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) 
            interrupted = true;
        }
      } finally {
        if (failed) 
          cancelAcquire(node);
      }
    }
    

    acquireQueued方法中如果当前节点的前驱节点是AQS队列的head节点,且再次试图执行tryAcquire方法又成功的特殊情况这里就不再进行赘述了,我们主要讲解后面的两个方法shouldParkAfterFailedAcquire和parkAndCheckInterrupt。

    shouldParkAfterFailedAcquire方法是判断当前节点是否在获取独占操作权失败后进入阻塞状态;parkAndCheckInterrupt方法用于让当前节点所代表的线程进入阻塞状态,并监控其是否收到了“中断”信号。先来看一下shouldParkAfterFailedAcquire方法的代码详情:

    // 参数为node:当前节点;pred:当前节点的前置节点
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
      int ws = pred.waitStatus;
      // 前置节点Node的waitStatus状态属性为SIGNAL(-1),就说明可以进行阻塞,于是返回true
      if (ws == Node.SIGNAL)
        // This node has already set status asking a release to signal it, so it can safely park.
        return true;
      // waitStatus状态属性大于0的,就只有一种叫做CANCELLED的状态
      // 文档中对于这种状态的描述也非常清楚,就是这些节点由于超时或者终止的原因不在需要进入阻塞状态。
      // 也不会再参与独占操作权的抢占操作,于是循环执行以下代码,将这些节点剔除
      if (ws > 0) {
        // Predecessor was cancelled. Skip over predecessors and indicate retry.
        do {
          node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
      } else {
        // waitStatus must be 0 or PROPAGATE. Indicate that we need a signal, but don't park yet.
        // Caller will need to retry to make sure it cannot acquire before parking.
        // 这个代码很明确,不再说明了
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
      }
      return false;
    }
    

    一旦shouldParkAfterFailedAcquire方法的返回值为false,则重新进入acquireQueued方法的“for ( ; ; ) 死循环”;如果返回值为true,则调用parkAndCheckInterrupt方法。parkAndCheckInterrupt方法的详情如下:

    /**
     * Convenience method to park and then check if interrupted
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
      LockSupport.park(this);
      return Thread.interrupted();
    }
    

    ================================================================
    (接下文)

  • 相关阅读:
    Flink sql 写 Hbase 忽略空列
    python协程系列(一)——生成器generator以及yield表达式详解
    Python中可迭代对象,迭代器和生成器的异同点
    Python生成器: send函数、close函数与yield关键字协作
    sqlalchemy的基本用法
    Windows10激活后又提示未激活解决办法
    parseaddr函数和formataddr函数的用法
    VisualStudio Code Remote 调试方法(错误Containers Docker version 17.12.0 or later required.)
    SpringBoot Jar Windows CMD 运行卡顿
    使用Aspose.cells(java)将excel转为图片等
  • 原文地址:https://www.cnblogs.com/liulaolaiu/p/11744215.html
Copyright © 2011-2022 走看看