zoukankan      html  css  js  c++  java
  • Java多线程:AQS源码分析

    前言

      AbstractQueuedSynchronizer类如其名,抽象的队列式的同步器,通常被称之为AQS的类,它是一个非常有用的父类,可用来定义锁以及依赖于排队阻塞线程的其他同步器;ReentrantLock,ReentrantReadWriteLock,CountDownLatch,CyclicBarrier和Semaphore等这些类都是基于AQS类实现的。AbstractQueuedLongSynchronizer 类提供相同的功能但扩展了对同步状态的 64 位的支持。两者都扩展了类 AbstractOwnableSynchronizer(一个帮助记录当前保持独占同步的线程的简单类)。

    框架

    这个类的框架如图所示:

    它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。

    1. AQS用的是一个32位的整型来表示同步状态的,它是用volatile修饰的:

    private volatile int state;

    在互斥锁中它表示着线程是否已经获取了锁,0未获取,1已经获取了,大于1表示重入数。同时AQS提供了getState()、setState()、compareAndSetState()方法来获取和修改该值:

    可重入锁指的是在一个线程中可以多次获取同一把锁,比如:
    一个线程在执行一个带锁的方法,该方法中又调用了另一个需要相同锁的方法,则该线程可以直接执行调用的方法,而无需重新获得锁。synchronized也可以看做重入锁

    所以可重入数大于1表示该线程可能调用了多个需要当前锁的方法,或同一个线程调用了多次lock()方法。

     1 protected final int getState() {
     2     return state;
     3 }
     4 
     5 protected final void setState(int newState) {
     6     state = newState;
     7 }
     8 
     9 protected final boolean compareAndSetState(int expect, int update) {
    10     return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    11 }

    这些方法需要java.util.concurrent.atomic包的支持,采用CAS操作,保证其原子性和可见性。

    2. AQS内部维护着一个FIFO的CLH队列,所以AQS并不支持基于优先级的同步策略。至于为何要选择CLH队列,主要在于CLH锁相对于MSC锁,他更加容易处理cancel和timeout,同时他具备进出队列快、无所、畅通无阻、检查是否有线程在等待也非常容易(head != tail,头尾指针不同)。当然相对于原始的CLH队列锁,AQS采用的是一种变种的CLH队列锁:

    1、原始CLH使用的locked自旋,而AQS的CLH则是在每个node里面使用一个状态字段来控制阻塞,而不是自旋。

    2、为了可以处理timeout和cancel操作,每个node维护一个指向前驱的指针。如果一个node的前驱被cancel,这个node可以前向移动使用前驱的状态字段。

    3、head结点使用的是傀儡结点。

      AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch),即我们常说的" 独占锁" 和 "共享锁"。

      不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

    • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
    • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
    • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
    • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    • tryReleaseShared(int):共享方式。尝试释放资源,成功则返回true,失败则返回false。

      以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

      再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

      一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

    源码解析

      我们使用 ReentrantLock 时,可以通过 lock() 方法获取到锁,但是这并是 ReentrantLock 类所做的事,而是在底层调用了AQS的方法来完成的。所以我们有必要深入去了解 AQS 是怎样实现对锁的管理,保证线程的逻辑正确执行的。

      我们分别对独占锁的管理流程 acquire-release 和 共享锁管理流程 acquireShared-releaseShare 进行分析。

    acquire(int)

      此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。获取到资源后,线程就可以去执行其临界区代码了。下面是acquire()的源码:

    1 public final void acquire(int arg) {
    2     if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    3         selfInterrupt();
    4 }

    执行流程:

    •  "当前线程"首先通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列排序等待(前面还有可能有需要线程在等待该锁)。
    • "当前线程"尝试失败的情况下,先通过addWaiter(Node.EXCLUSIVE)来将“当前线程"加入到"CLH队列(非阻塞的FIFO队列)"末尾。CLH队列就是线程等待队列。
    •  再执行完addWaiter(Node.EXCLUSIVE)之后,会调用acquireQueued()来获取锁。由于此时ReentrantLock是公平锁,它会根据公平性原则来获取锁。
    •  "当前线程"在执行acquireQueued()时,会进入到CLH队列中休眠等待,直到获取锁了才返回!如果"当前线程"在休眠等待过程中被中断过,acquireQueued会返回true,此时"当前线程"会调用selfInterrupt()来自己给自己产生一个中断。至于为什么要自己给自己产生一个中断,后面再介绍。

    所以我们对这里面的方法一个一个介绍

    1、tryAcquire(int)

    此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。这也正是tryLock()的语义,还是那句话,当然不仅仅只限于tryLock()。如下是tryAcquire()的源码:

    1     protected boolean tryAcquire(int arg) {
    2         throw new UnsupportedOperationException();
    3     } 

      因为AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现,所以AQS这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了。例如公平锁的tryAcquire()在ReentrantLock.java的FairSync类中实现。

      这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口,尽量减少不必要的工作量。

      我们可以看看 ReentrantLock 在公平锁状态下是如何实现这个方法的,公平锁的tryAcquire()在ReentrantLock.java的FairSync类中实现,源码如下:

     1 复制代码
     2 protected final boolean tryAcquire(int acquires) {
     3     // 获取“当前线程”
     4     final Thread current = Thread.currentThread();
     5     // 获取“独占锁”的状态
     6     int c = getState();
     7     // c=0意味着“锁没有被任何线程锁拥有”,
     8     if (c == 0) {
     9         // 若“锁没有被任何线程锁拥有”,
    10         // 则判断“当前线程”是不是CLH队列中的排在最前面的那个线程,
    11         // 若是的话,则获取该锁,设置锁的状态,并切设置锁的拥有者为“当前线程”。
    12         if (!hasQueuedPredecessors() &&
    13             compareAndSetState(0, acquires)) {
    14             setExclusiveOwnerThread(current);
    15             return true;
    16         }
    17     }
    18     else if (current == getExclusiveOwnerThread()) {
    19         // 如果“独占锁”的拥有者已经为“当前线程”,
    20         // 则将更新锁的状态,重入数加一
    21         int nextc = c + acquires;
    22         if (nextc < 0)
    23             throw new Error("Maximum lock count exceeded");
    24         setState(nextc); // 更新当前线程的重入数
    25         return true;
    26     }
    27     return false;
    28 }

    说明:根据代码,我们可以分析出,tryAcquire()的作用就是尝试去获取锁。注意,这里只是尝试!尝试成功的话,返回true;尝试失败的话,返回false,后续再通过其它办法来获取该锁。

    2. addWaiter(Node.EXCLUSIVE)

    在介绍此方法前我们需要对 Node 这个类进行介绍。

    Node就是CLH队列的节点。Node在AQS中实现,它的数据结构如下:

     1 private transient volatile Node head;    // CLH队列的队首
     2 private transient volatile Node tail;    // CLH队列的队尾
     3 
     4 // CLH队列的节点
     5 static final class Node {
     6     static final Node SHARED = new Node();
     7     static final Node EXCLUSIVE = null;
     8 
     9     // 线程已被取消,对应的waitStatus的值
    10     static final int CANCELLED =  1;
    11     // “当前线程的后继线程需要被unpark(唤醒)”,对应的waitStatus的值。
    12     // 一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
    13     static final int SIGNAL    = -1;
    14     // 线程(处在Condition休眠状态)在等待Condition唤醒,对应的waitStatus的值
    15     static final int CONDITION = -2;
    16     // (共享锁)其它线程获取到“共享锁”,对应的waitStatus的值
    17     static final int PROPAGATE = -3;
    18 
    19     // waitStatus为“CANCELLED, SIGNAL, CONDITION, PROPAGATE”时分别表示不同状态,
    20     // 若waitStatus=0,则意味着当前线程不属于上面的任何一种状态。
    21     volatile int waitStatus;
    22 
    23     // 前一节点
    24     volatile Node prev;
    25 
    26     // 后一节点
    27     volatile Node next;
    28 
    29     // 节点所对应的线程
    30     volatile Thread thread;
    31 
    32     // nextWaiter是“区别当前CLH队列是 ‘独占锁’队列 还是 ‘共享锁’队列 的标记”
    33     // 若nextWaiter=SHARED,则CLH队列是“独占锁”队列;
    34     // 若nextWaiter=EXCLUSIVE,(即nextWaiter=null),则CLH队列是“共享锁”队列。
    35     Node nextWaiter;
    36 
    37     // “共享锁”则返回true,“独占锁”则返回false。
    38     final boolean isShared() {
    39         return nextWaiter == SHARED;
    40     }
    41 
    42     // 返回前一节点
    43     final Node predecessor() throws NullPointerException {
    44         Node p = prev;
    45         if (p == null)
    46             throw new NullPointerException();
    47         else
    48             return p;
    49     }
    50 
    51     Node() {    // Used to establish initial head or SHARED marker
    52     }
    53 
    54     // 构造函数。thread是节点所对应的线程,mode是用来表示thread的锁是“独占锁”还是“共享锁”。
    55     Node(Thread thread, Node mode) {     // Used by addWaiter
    56         this.nextWaiter = mode;
    57         this.thread = thread;
    58     }
    59 
    60     // 构造函数。thread是节点所对应的线程,waitStatus是线程的等待状态。
    61     Node(Thread thread, int waitStatus) { // Used by Condition
    62         this.waitStatus = waitStatus;
    63         this.thread = thread;
    64     }
    65 }

    说明
    Node是CLH队列的节点,代表“等待锁的线程队列”。
    1. 每个Node都会一个线程对应。
    2. 每个Node会通过prev和next分别指向上一个节点和下一个节点,这分别代表上一个等待线程和下一个等待线程。
    3. Node通过waitStatus保存线程的等待状态。
    4. Node通过nextWaiter来区分线程是“独占锁”线程还是“共享锁”线程。如果是“独占锁”线程,则nextWaiter的值为EXCLUSIVE;如果是“共享锁”线程,则nextWaiter的值是SHARED。

    addWaiter(Node.EXCLUSIVE)的作用是,创建“当前线程”的Node节点,且Node中记录“当前线程”对应的锁是“独占锁”类型,并且将该节点添加到CLH队列的末尾。

     1 private Node addWaiter(Node mode) {
     2     //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
     3     Node node = new Node(Thread.currentThread(), mode);
     4     
     5     //尝试快速方式直接放到队尾。
     6     Node pred = tail;
     7     if (pred != null) {
     8         node.prev = pred;
     9         if (compareAndSetTail(pred, node)) {
    10             pred.next = node;
    11             return node;
    12         }
    13     }
    14     
    15     //上一步失败则通过enq入队。
    16     enq(node);
    17     return node;
    18 }

    说明:对于“公平锁”而言,addWaiter(Node.EXCLUSIVE)会首先创建一个Node节点,节点的类型是“独占锁”(Node.EXCLUSIVE)类型。然后,再将该节点添加到CLH队列的末尾。

    其中 compareAndSetTail() 和 enq() 方法源码如下:

     1 private final boolean compareAndSetTail(Node expect, Node update) {
     2     return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
     3 }
     4 
     5 private Node enq(final Node node) {
     6     //CAS"自旋",直到成功加入队尾
     7     for (;;) {
     8         Node t = tail;
     9         if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
    10             if (compareAndSetHead(new Node()))
    11                 tail = head;
    12         } else {//正常流程,放入队尾
    13             node.prev = t;
    14             if (compareAndSetTail(t, node)) {
    15                 t.next = node;
    16                 return t;
    17             }
    18         }
    19     }
    20 }

    说明

    1. compareAndSetTail也属于CAS函数,也是通过“本地方法”实现的。compareAndSetTail(expect, update)会以原子的方式进行操作,它的作用是判断CLH队列的队尾是不是为expect,是的话,就将队尾设为update。

    2. enq()的作用很简单。如果CLH队列为空,则新建一个CLH表头;然后将node添加到CLH末尾。否则,直接将node添加到CLH末尾。

    所以总的来说 addWaiter() 方法的作用,就是将当前线程添加到CLH队列中。这就意味着将当前线程添加到等待获取“锁”的等待线程队列中了。

    3. acquireQueued(Node, int)

    前面我们已经尝试获取锁失败。将当前线程添加到CLH队列中了。而acquireQueued()的作用就是逐步的去执行CLH队列的线程,如果当前线程获取到了锁,则返回;否则,当前线程进行休眠,直到唤醒并重新获取锁了才返回。下面,我们看看acquireQueued()的具体流程。

     1 final boolean acquireQueued(final Node node, int arg) {
     2     boolean failed = true;//标记是否成功拿到资源
     3     try {
     4         boolean interrupted = false;//标记等待过程中是否被中断过
     5         
     6         for (;;) {
     7             final Node p = node.predecessor();//拿到前驱
     8             //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
     9             if (p == head && tryAcquire(arg)) {
    10                 setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
    11                 p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
    12                 failed = false;
    13                 return interrupted;//返回等待过程中是否被中断过
    14             }
    15             
    16             //如果自己可以休息了,就进入waiting状态,直到被unpark()
    17             if (shouldParkAfterFailedAcquire(p, node) &&
    18                 parkAndCheckInterrupt())
    19                 interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
    20         }
    21     } finally {
    22         if (failed)
    23             cancelAcquire(node);
    24     }
    25 }

    前面部分代码注释写的很清楚了,这里就不多加描述,不过我们需要注意一下这个条件判断的写法 if (p == head && tryAcquire(arg)),这里涉及到锁的公平性问题,“当前线程”会根据公平性原则进行阻塞等待,直到获取锁为止;并且返回当前线程在等待过程中有没有被中断过。后面文章会单独说明。

    看看shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()具体干些什么。

     1 // 返回“当前线程是否应该阻塞”
     2 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     3     // 前继节点的状态
     4     int ws = pred.waitStatus;
     5     // 如果前继节点是SIGNAL状态,则意味这当前线程需要被阻塞。此时,返回true。
     6     if (ws == Node.SIGNAL)
     7         return true;
     8     // 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边
     9     if (ws > 0) {
    10         do {
    11             node.prev = pred = pred.prev;
    12         } while (pred.waitStatus > 0);
    13         pred.next = node;
    14     } else {
    15         // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。
    16         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    17     }
    18     return false;
    19 }

    说明
    (01) 关于waitStatus请参考下表(中扩号内为waitStatus的值),更多关于waitStatus的内容,可以参考前面的Node类的介绍。

    CANCELLED[1]  -- 当前线程已被取消
    SIGNAL[-1]    -- “当前线程的后继线程需要被unpark(唤醒)”。一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
    CONDITION[-2] -- 当前线程(处在Condition休眠状态)在等待Condition唤醒
    PROPAGATE[-3] -- (共享锁)其它线程获取到“共享锁”
    [0]           -- 当前线程不属于上面的任何一种状态。

    (02) shouldParkAfterFailedAcquire()通过以下规则,判断“当前线程”是否需要被阻塞。

    规则1:如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。
    规则2:如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。
    规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false。

    如果“规则1”发生,即“前继节点是SIGNAL”状态,则意味着“当前线程”需要被阻塞。接下来会调用parkAndCheckInterrupt()阻塞当前线程,直到当前线程被唤醒才从parkAndCheckInterrupt()中返回。

    parkAndCheckInterrupt()代码如下:

    private final boolean parkAndCheckInterrupt() {
        // 通过LockSupport的park()阻塞“当前线程”。
        LockSupport.park(this);
        // 返回线程的中断状态。
        return Thread.interrupted();
    }

    说明:parkAndCheckInterrupt()的作用是阻塞当前线程,并且返回“线程被唤醒之后”的中断状态。
    它会先通过LockSupport.park()阻塞“当前线程”,然后通过Thread.interrupted()返回线程的中断状态。

    这里介绍一下线程被阻塞之后如何唤醒。一般有2种情况:
    第1种情况:unpark()唤醒。“前继节点对应的线程”使用完锁之后,通过unpark()方式唤醒当前线程。
    第2种情况:中断唤醒。其它线程通过interrupt()中断当前线程。

    补充:LockSupport()中的park(),unpark()的作用 和 Object中的wait(),notify()作用类似,是阻塞/唤醒。
    它们的用法不同,park(),unpark()是轻量级的,而wait(),notify()是必须先通过Synchronized获取同步锁。
    关于LockSupport,我们会在之后的章节再专门进行介绍!

    这里提个问题:

    如果锁被占用,线程阻塞,如果调用阻塞线程的 interrupt() 方法,会取消获取锁吗?答案是否定的。

    首先要知道 LockSupport.park() 会响应中断,但不会抛出 InterruptedException,并且Thread.interrupted()返回线程的中断状态时会清空中断状态,当前线程会因为自旋再次进入阻塞状态。

    如果我们调用 lockInterruptibly() 方法,这个方法必须是当前锁未发生中断的情况下才能获取锁,它的实现原理是,如果 

    shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() 判断条件成立的话,不返回Thread.interrupted(),而是直接抛出InterruptedException,同时在 finally 语句中将CLH队列中代表当前线程的节点状态设置为 Node.CANCELLED

    4. selfInterrupt()

    最后我们来看看 selfInterrupted() ,为什么要自己产生一个中断,其源码如下:

    private static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

      这必须结合acquireQueued()进行分析。如果在acquireQueued()中,当前线程被中断过,则执行selfInterrupt();否则不会执行。

      在acquireQueued()中,即使是线程在阻塞状态被中断唤醒而获取到cpu执行权利;但是,它需要重新在循环中判断,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞,直到该线程被它的前面等待锁的线程锁唤醒;线程才会获取锁,然后“真正执行起来”!
      也就是说,在该线程“成功获取锁并真正执行起来”之前,它的中断会被忽略并且中断标记会被清除! 因为在parkAndCheckInterrupt()中,我们线程的中断状态时调用了Thread.interrupted()。该函数不同于Thread的isInterrupted()函数,isInterrupted()仅仅返回中断状态,而interrupted()在返回当前中断状态之后,还会清除中断状态。 正因为之前的中断状态被清除了,所以这里需要调用selfInterrupt()重新产生一个中断,记录线程的中断状态。

    小结

    好了,我们再重新捋一遍整个获取锁的过程

    1 public final void acquire(int arg) {
    2     if (!tryAcquire(arg) &&
    3         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    4         selfInterrupt();
    5 }

    1. 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;

    2. 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;

    3. acquireQueued()使线程在等待队列中等待,有机会时(按公平性原则,轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。

    4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

    整个流程可以用下图来表示

    release(int)

    看完如何获取到锁之后,我们再来看一下如何释放这个独占锁。

    以 ReentrantLock 为例,它释放锁的操作是 unlock(),当你看它底层具体实现时可以发现,其实他调用的正是 AQS 的 release(int) 方法。

    此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义,当然不仅仅只限于unlock()。下面是release()的源码:

    1 public final boolean release(int arg) {
    2     if (tryRelease(arg)) {
    3         Node h = head;//找到头结点
    4         if (h != null && h.waitStatus != 0)
    5             unparkSuccessor(h);//唤醒等待队列里的下一个线程
    6         return true;
    7     }
    8     return false;
    9 }

    说明:逻辑并不复杂。它调用tryRelease()来释放资源。有一点需要注意的是,它是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自定义同步器在设计tryRelease()的时候要明确这一点。

    1. tryRelease()

    此方法尝试去释放指定量的资源。下面是tryRelease()的源码:

    1 protected boolean tryRelease(int arg) {
    2     throw new UnsupportedOperationException();
    3 }

      跟tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。

    同样,我们这里也看一下在公平锁策略下的 ReentrantLock 是如何实现这个方法的,tryRelease()在ReentrantLock.java的Sync类中实现,源码如下:

     1 protected final boolean tryRelease(int releases) {
     2     // c是本次释放锁之后的状态
     3     int c = getState() - releases;
     4     // 如果“当前线程”不是“锁的持有者”,则抛出异常!
     5     if (Thread.currentThread() != getExclusiveOwnerThread())
     6         throw new IllegalMonitorStateException();
     7 
     8     boolean free = false;
     9     // 如果“锁”已经被当前线程彻底释放即可重入数为0时,则设置“锁”的持有者为null,即锁是可获取状态。
    10     if (c == 0) {
    11         free = true;
    12         setExclusiveOwnerThread(null);
    13     }
    14     // 如果当前重入数不为0表示当前是重入锁的一次释放,所以只更新可重入数为c,不将独占锁线程清空。
    15     setState(c);
    16     return free;
    17 }

    说明
    tryRelease()的作用是尝试释放锁。
    1.  如果“当前线程”不是“锁的持有者”,则抛出异常。
    2. 如果“当前线程”在本次释放锁操作之后,对锁的拥有状态是0(即,当前线程彻底释放该“锁”),则设置“锁”的持有者为null,即锁是可获取状态。同时,更新当前线程的锁的状态为0。

    getExclusiveOwnerThread(), setExclusiveOwnerThread()在AQS的父类AbstractOwnableSynchronizer.java中定义,源码如下:

     1 public abstract class AbstractOwnableSynchronizer
     2     implements java.io.Serializable {
     3 
     4     // “锁”的持有线程
     5     private transient Thread exclusiveOwnerThread;
     6 
     7     // 设置“锁的持有线程”为t
     8     protected final void setExclusiveOwnerThread(Thread t) {
     9         exclusiveOwnerThread = t;
    10     }
    11 
    12     // 获取“锁的持有线程”
    13     protected final Thread getExclusiveOwnerThread() {
    14         return exclusiveOwnerThread;
    15     }
    16    
    17     ...
    18 }

    2. unparkSuccessor()

    在release()中“当前线程”释放锁成功的话,会唤醒当前线程的后继线程。
    根据CLH队列的FIFO规则,“当前线程”(即已经获取锁的线程)肯定是head;如果CLH队列非空的话,则唤醒锁的下一个等待线程。
    面看看unparkSuccessor()的源码,它在AQS中实现

     1 private void unparkSuccessor(Node node) {
     2     // 获取当前线程的状态
     3     int ws = node.waitStatus;
     4     // 如果状态<0,则设置状态=0
     5     if (ws < 0)
     6         compareAndSetWaitStatus(node, ws, 0);
     7 
     8     // 获取当前节点的“有效的后继节点”,无效的话,则通过for循环进行获取。
     9     // 这里的有效,是指“后继节点对应的线程状态<=0”
    10     Node s = node.next;
    11     if (s == null || s.waitStatus > 0) {
    12         s = null;
    13         for (Node t = tail; t != null && t != node; t = t.prev)
    14             if (t.waitStatus <= 0)
    15                 s = t;
    16     }
    17     // 唤醒“后继节点对应的线程”
    18     if (s != null)
    19         LockSupport.unpark(s.thread);
    20 }

      这个函数并不复杂。一句话概括:用unpark()唤醒等待队列中最前边的那个未放弃线程,这里我们也用s来表示吧。此时,再和acquireQueued()联系起来,s被唤醒后,进入if (p == head && tryAcquire(arg))的判断(即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个离头节点最近的一个等待位置。这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑到head的next结点,下一次循环判断p==head就成立了),然后s把自己设置成head标杆结点,表示自己已经获取到资源了,acquire()也返回了。

    总结

    “释放锁”的过程相对“获取锁”的过程比较简单。释放锁时,主要进行的操作,是更新当前线程对应的锁的状态。如果当前线程对锁已经彻底释放(即state=0),则设置“锁”的持有线程为null,设置当前线程的状态为空,然后唤醒后继线程。

    acquireShared(int)

    此方法是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。下面是acquireShared()的源码:

    1 public final void acquireShared(int arg) {
    2     if (tryAcquireShared(arg) < 0)
    3         doAcquireShared(arg);
    4 }

    这里tryAcquireShared()依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。所以这里acquireShared()的流程就是:

    1. tryAcquireShared()尝试获取资源,成功则直接返回;
    2. 失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。

    这里就不举tryAcquireShared()的实现了,因为比较复杂,在后面会专门讲解这部分源码

    1. doAcquireShared(int)

    此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。下面是doAcquireShared()的源码:

     1 private void doAcquireShared(int arg) {
     2     final Node node = addWaiter(Node.SHARED);//加入队列尾部
     3     boolean failed = true;//是否成功标志
     4     try {
     5         boolean interrupted = false;//等待过程中是否被中断过的标志
     6         for (;;) {
     7             final Node p = node.predecessor();//前驱
     8             if (p == head) {//如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
     9                 int r = tryAcquireShared(arg);//尝试获取资源
    10                 if (r >= 0) {//成功
    11                     setHeadAndPropagate(node, r);//将head指向自己,还有剩余资源可以再唤醒之后的线程
    12                     p.next = null; // help GC
    13                     if (interrupted)//如果等待过程中被打断过,此时将中断补上。
    14                         selfInterrupt();
    15                     failed = false;
    16                     return;
    17                 }
    18             }
    19             
    20             //判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
    21             if (shouldParkAfterFailedAcquire(p, node) &&
    22                 parkAndCheckInterrupt())
    23                 interrupted = true;
    24         }
    25     } finally {
    26         if (failed)
    27             cancelAcquire(node);
    28     }
    29 }

    重点分析一下获取锁后的操作:setHeadAndPropagate

     1 private void setHeadAndPropagate(Node node, int propagate) {  
     2     Node h = head; // Record old head for check below   
     3     setHead(node); // head 指向自己
     4 
     5     /**
     6      *    如果读锁(共享锁)获取成功,或头部节点为空,或头节点取消,或刚获取读锁的线程的下一个节点为空,
     7      *    或在节点的下个节点也在申请读锁,则在CLH队列中传播下去唤醒线程,怎么理解这个传播呢,
     8      *    就是只要获取成功到读锁,那就要传播到下一个节点(如果一下个节点继续是读锁的申请,
     9      *    只要成功获取,就再下一个节点,直到队列尾部或为写锁的申请,停止传播)。具体请看doReleaseShared方法。
    10      *
    11      */
    12     if (propagate > 0 || h == null || h.waitStatus < 0) {    
    13         Node s = node.next;  
    14         if (s == null || s.isShared())      
    15             doReleaseShared();           
    16     }  
    17 }  
    18 
    19 private void setHead(Node node) {  
    20     head = node;  
    21     node.thread = null;  
    22     node.prev = null;  
    23 }

    小结

    acquireShared()也就结束了,让我们再梳理一下它的流程:

    1. tryAcquireShared()尝试获取资源,成功则直接返回;

    2. 失败则通过doAcquireShared()进入等待队列park(),直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。

    其实跟acquire()的流程大同小异,只不过多了个自己拿到资源后,还会去唤醒后继队友的操作,这体现了共享锁的定义

    releaseShared()

    看了前面获取所得过程,现在来看一下释放锁的过程。在AQS中是通过releaseShared()来释放共享锁的。此方法是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。下面是releaseShared()的源码:

    1 public final boolean releaseShared(int arg) {
    2     if (tryReleaseShared(arg)) {//尝试释放资源
    3         doReleaseShared();//唤醒后继结点
    4         return true;
    5     }
    6     return false;
    7 }

      此方法的流程也比较简单,一句话:释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于可重入的考量;而共享模式下的releaseShared()则没有这种要求,一是共享的实质--多线程可并发执行;二是共享模式基本也不会重入吧(至少我还没见过),所以自定义同步器可以根据需要决定返回值。

    1.doReleaseShared()

    此方法主要用于唤醒后继。下面是它的源码:

     1 private void doReleaseShared() {  
     2     for (;;) {  
     3         Node h = head;  
     4         if (h != null && h != tail) {   // 从队列的头部开始遍历每一个节点 
     5             int ws = h.waitStatus;  
     6             // 如果节点状态为 Node.SIGNAL,将状态设置为0,设置成功,唤醒线程。
     7             // 为什么会设置不成功,可能改节点被取消;还有一种情况就是有多个线程在运行该代码段,这就是PROPAGATE的含义吧。
     8             // 如果一个节点的状态设置为 Node.SIGNAL,则说明它有后继节点,并且处于阻塞状态
     9             if (ws == Node.SIGNAL) { 
    10                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))  
    11                     continue;            // loop to recheck cases  
    12                 unparkSuccessor(h);  
    13             }
    14             // 如果状态为0,则设置为Node.PROPAGATE,设置为传播,该值然后会在什么时候变化呢?
    15             // 在判断该节点的下一个节点是否需要阻塞时,会判断,如果状态不是Node.SIGNAL或取消状态,
    16             // 为了保险起见,会将前置节点状态设置为Node.SIGNAL,然后再次判断,是否需要阻塞。
    17             else if (ws == 0 &&  
    18                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    19                 continue;                // loop on failed CAS  
    20         }  
    21         /**
    22          * 如果处理过一次 unparkSuccessor 方法后,头节点没有发生变化,就退出该方法,那head在什么时候会改变呢?
    23          * 当然在是抢占锁成功的时候,head节点代表获取锁的节点。一旦获取锁成功,则又会进入setHeadAndPropagate方法,
    24          * 当然又会触发doReleaseShared方法,传播特性应该就是表现在这里吧。再想一下,同一时间,可以有多个多线程占有锁,
    25          * 那在锁释放时,写锁的释放比较简单,就是从头部节点下的第一个非取消节点,唤醒线程即可,
    26          * 为了在释放读锁的上下文环境中获取代表读锁的线程,将信息存入在 readHolds ThreadLocal变量中。
    27          */
    28         if (h == head)                   // loop if head changed 
    29             break;  
    30     }  
    31 }

    参考资料:

    Java并发之AQS详解

    Java多线程系列--“JUC锁”03之 公平锁(一)

    java并发锁ReentrantLock源码分析一 可重入支持中断锁的实现原理

  • 相关阅读:
    在C#中如何使用资源的方法
    C#调用windows API的一些方法
    Uml学习-类图简介
    Uml学习-用例建模简介
    sqlserver中DATE类型的数据转化 CONVERT
    mysql database和schema区别
    nginx buffer
    django pk 和id用法
    sed正则
    kong 插件开发分析
  • 原文地址:https://www.cnblogs.com/2015110615L/p/6754529.html
Copyright © 2011-2022 走看看