zoukankan      html  css  js  c++  java
  • 并发编程学习笔记(十五、AQS同步器源码解析4,AQS条件锁Condition实现原理)

    目录:

    • 概述
    • 同步队列、条件队列
    • Condition源码解析

    概述

    首先我们知道Condition的await()、signa()内置锁synchronize配套的wait()及notify()的增强,可以更加细化的控制锁的唤醒条件

    那么我们这里来类比下它们之间的机制:

    • 同步:内置锁的wait()必须要在synchronize代码块中才能调用,也就是必须要获取到监视器锁之后才能执行;await()与之类似,也是必须获取到锁资源后才能执行。
    • 等待:调用wait()的线程会释放已经获取的锁,进入当前监视器锁的等待队列中;与之类似,await()也会释放自己的lock,进入到当前Condition的条件队列中。
    • 唤醒:调用notify()会唤醒等待在该监视器锁的线程,并参与监视器锁的竞争,并在获取锁之后从wait()处恢复执行;同理,调用signal()会唤醒对应线程,并获取锁资源,在获取后会在signal()处恢复执行。

    同步队列、条件队列

    上面我们说到了内置锁与Condition的等待,一个是放到等待队列中,一个是放到条件队列中,那么我们这里来分别说下。

    ——————————————————————————————————————————————————————————————————————

    1、同步队列(等待队列):

    前面我们在说独占锁的获取时,所有等待的线程都会被包装成一个Node进入同步队列。

    由图可知其是一个先进先出(FIFO)的同步阻塞队列,基于双向链表实现,使用prev、next来串联前驱及后继节点。

    其中有一个nextWaiter属性一直没有提到过,即使是共享模式下它也只是简单的标记了一下,指向了一个空节点。

    因此在同步队列中,并不会用它来串联节点。

    ——————————————————————————————————————————————————————————————————————

    2、条件队列:

    Condition同理,也会在调用await()时将其包装成一个Node放入到条件队列中,但值得注意的是这个条件队列是基于单向链表实现,以nextWaiter来标记下一个节点的指针。

    与同步队列恰好相反,条件队列并不会使用prev、next来串联链表,而是使用nextWaiter。

    其结构大致如下:

    你还记得AQS中Node的waitStatus的值嘛,没错有一个CONDITION就是来标记同步队列的,在源码中我会详述。

    ——————————————————————————————————————————————————————————————————————

    3、同步队列与条件队列的联系:

    一般情况下同步队列和条件队列是互相独立,没有任何关系的。但当调用某个条件队列中的signal()去唤醒线程时,会让这个线程和普通线程一样去竞争锁,如果没有抢到就会被加到同步队列中去,此时这个条件队列就会变成同步队列了。

    这里你可能会有一个疑问,为啥条件队列获取不到锁后不是重新回到条件队列,而是加入到同步队列去了呢,这不是有问题嘛。

    其实是没有问题的,因为你是满足条件后才会调用signal()唤醒线程的,所以线程尝试获取锁的时候其实已经是满足唤醒条件了,就算未获取到锁也只是重新入队(同步队列)而已,照样可以在下次被唤醒时重新竞争锁

    这里我们需要注意的是Node节点是一个一个转移过去的,即使是调用signalAll()也是一样,而不是将整个条件队列接在同步队列的队尾。

    那为什么是一个一个的转移呢,因为同步队列使用的是prev、next来串联链表,条件队列使用的是nextWaiter,它们在数据结构上是完全独立的。因此我们在转换的过程中需要把nextWaiter断开,建立新的prev、next,这也就是为啥要一个个转移的原因之一了。

    ——————————————————————————————————————————————————————————————————————

    4、入队和出队的锁状态:

    • 条件队列:入队前有锁 >>> 队列中释放锁 >>> 离开队列竞争锁 >>> 转到同步队列。
    • 同步队列:入队前无锁 >>> 队列中竞争锁 >>> 离开队列持有锁。

    好了,通过上面的介绍相信你对条件对口已经有个一个感性的认知,接下来我们来说说它的源码。

    Condition源码解析

    1、基本属性及构造函数:

    前面我们说到Condition接口的实现类就是ConditionObject,在学习其主要逻辑前我们还是照常先看下其基本属性及构造函数吧。

    1 /** 条件队列队头节点 */
    2 private transient Node firstWaiter;
    3 /** 条件队列队尾节点 */
    4 private transient Node lastWaiter;
    5 
    6 /**
    7  * 空的构造器,没啥好说的,但可以看出条件队列是延时初始化的。
    8  */
    9 public ConditionObject() { }

    2、await():

    然后我们来看下Condition的线程阻塞方法await()。

     1 public final void await() throws InterruptedException {
     2     // 若当前线程已经中断则抛出异常
     3     if (Thread.interrupted())
     4         throw new InterruptedException();
     5     // 将当前线程封装成Node添加到条件队列中
     6     Node node = addConditionWaiter();
     7     // 释放当前线程所占用的锁,并保存当前锁的状态
     8     int savedState = fullyRelease(node);
     9     int interruptMode = 0;
    10     // 若当前线程不在同步队列中,说明刚刚被await了(也就是条件队列),但还未调用signal()方法,则直接循环阻塞线程
    11     while (!isOnSyncQueue(node)) {
    12         // 阻塞线程,停止运行
    13         LockSupport.park(this);
    14         // 执行到这说明,要么已经调用signal()方法,要么是线程中断了
    15         // 所以这里检查下唤醒原因,如果是因中断而唤醒,则跳出循环
    16         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    17             break;
    18     }
    19     // 这部分非主流程,之后再解析
    20     /*if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    21         interruptMode = REINTERRUPT;
    22     if (node.nextWaiter != null) // clean up if cancelled
    23         unlinkCancelledWaiters();
    24     if (interruptMode != 0)
    25         reportInterruptAfterWait(interruptMode);*/
    26 }

    上面已经把await()的主要流程说明了下,其核心步骤如下,接下来我会一一解析:

    • 将当前线程封装成Node添加到条件队列中
    • 释放当前线程所占用的锁,并保存当前锁的状态
    • 循环阻塞线程

    ——————————————————————————————————————————————————————————————————————

    A、将当前线程封装成Node添加到条件队列中:addConditionWaiter()

     1 private Node addConditionWaiter() {
     2     // 获取队尾队列
     3     Node t = lastWaiter;
     4     // 如果队尾队列不是CONDITION状态,则遍历整个队列,清除所有被cancel的节点
     5     if (t != null && t.waitStatus != Node.CONDITION) {
     6         // 清除所有已经取消等待的线程
     7         unlinkCancelledWaiters();
     8         t = lastWaiter;
     9     }
    10     // 将当前队列包装成CONDITION的Node,并传入条件队列
    11     Node node = new Node(Thread.currentThread(), Node.CONDITION);
    12     if (t == null)
    13         // 若队尾为空,说明队列还无节点,此时将当前节点设置为头节点
    14         firstWaiter = node;
    15     else
    16         // 队尾非空,将队尾节点的nextWaiter指针指向当前节点,也就是当当前节点入到队尾
    17         t.nextWaiter = node;
    18     lastWaiter = node;
    19     return node;
    20 }

    这一步非常简单,而unlinkCancelledWaiters()也不过是对链表的操作它剔除了队列中状态不为CONDITION的节点,只要了解链表的话可以非常轻松的读懂,我就不再赘述。

     1 private void unlinkCancelledWaiters() {
     2     Node t = firstWaiter;
     3     Node trail = null;
     4     while (t != null) {
     5         Node next = t.nextWaiter;
     6         if (t.waitStatus != Node.CONDITION) {
     7             t.nextWaiter = null;
     8             if (trail == null)
     9                 firstWaiter = next;
    10             else
    11                 trail.nextWaiter = next;
    12             if (next == null)
    13                 lastWaiter = trail;
    14         }
    15         else
    16             trail = t;
    17         t = next;
    18     }
    19 }

    这里有一个值得注意的问题,就是是否存在两个不同的线程同时入队的情况

    答案是否定的,因为入队前是一定获取到锁的,所以不存在并发的情况,不需要CAS操作。

    ——————————————————————————————————————————————————————————————————————

    B、释放当前线程所占用的锁,并保存当前锁的状态:fullyRelease(node)

     1 final int fullyRelease(Node node) {
     2     boolean failed = true;
     3     try {
     4         int savedState = getState();
     5         if (release(savedState)) {
     6             failed = false;
     7             return savedState;
     8         } else {
     9             throw new IllegalMonitorStateException();
    10         }
    11     } finally {
    12         if (failed)
    13             node.waitStatus = Node.CANCELLED;
    14     }
    15 }

    首先我们要知道,当调用fullyRelease()方法时说明线程已经入队了。然后根据第5行代码可以看出,其是根据release()方法来释放锁的,这个之前介绍过就不再赘述。

    但有一点奇怪的是,明明只是释放当前线程占用的锁,那为啥函数名却叫fullyRelease呢,这是因为对于可重入锁来说,无论重入了所少次,都是一次性释放完的

    这个地方有一点需要注意下,就是调用release方法是可能会抛出IllegalMonitorStateException异常,这是因为当前线程可能并不是持有锁的线程

    可之前不是说调用await方法的线程一定是持有锁的嘛,哈哈话虽这样说,但其实await方法是并没有校验的,而是放到了fullyRelease()的tryRelease()来校验Thread.currentThread() == getExclusiveOwnerThread()。

    如ReentrantLock:

     1 protected final boolean tryRelease(int releases) {
     2     int c = getState() - releases;
     3     if (Thread.currentThread() != getExclusiveOwnerThread())
     4         throw new IllegalMonitorStateException();
     5     boolean free = false;
     6     if (c == 0) {
     7         free = true;
     8         setExclusiveOwnerThread(null);
     9     }
    10     setState(c);
    11     return free;
    12 }

    ——————————————————————————————————————————————————————————————————————

    C、循环阻塞线程:

    在线程锁释放完毕后,就会调用LockSupport.park(this)方法将当前线程挂起,等待被signal()方法唤醒。

    但在挂起线程前为啥要需要调用isOnSyncQueue()来确保这个不在同步队列中呢,之前不是说了同步队列和条件队列在数据结构上述无关的嘛。为啥会出现在同步队列的情况,这是因为可能其它线程获取到了锁,导致并发调用了signal(下文说明signal会详述)。

     1 final boolean isOnSyncQueue(Node node) {
     2     // 当前线程状态为CONDITION或前驱节点为空(条件队列的prev、next都是空)则不为同步队列
     3     if (node.waitStatus == Node.CONDITION || node.prev == null)
     4         return false;
     5     // 若node的后继节点非空,则肯定为同步队列的节点
     6     if (node.next != null) // If has successor, it must be on queue
     7         return true;
     8     // 从尾部向前搜索
     9     return findNodeFromTail(node);
    10 }
     1 private boolean findNodeFromTail(Node node) {
     2     // 获取尾部节点tail,只有在同步队列中tail才非空,条件队列的队尾是lastWaiter节点
     3     Node t = tail;
     4     for (;;) {
     5         if (t == node)
     6             return true;
     7         if (t == null)
     8             return false;
     9         // 从尾部向前遍历队列
    10         t = t.prev;
    11     }
    12 }

    ——————————————————————————————————————————————————————————————————————

    3、signal():

    唤醒方法分为signal(),唤醒单个线程;和signalAll(),唤醒多个线程。因signal和signalAll非常相似,所以我先说说signalAll()。

    signalAll():

    首先在看signalAll源码之前,我们先要知道调用signalAll方法的线程signAll方法要唤醒的线程(在条件队列里等待的线程)的区别:

    • 调用signalAll方法的线程本身已经持有锁了,现在准备去释放锁
    • signalAll方法要唤醒的线程是是条件队列里挂起了,等待被signal唤醒,然后去竞争锁

    ——————————————————————————————————————————————————————————————————————

    然后我们来看看signalAll的源码:

    1 public final void signalAll() {
    2     if (!isHeldExclusively())
    3         throw new IllegalMonitorStateException();
    4     Node first = firstWaiter;
    5     if (first != null)
    6         doSignalAll(first);
    7 }

    1、首先走了isHeldExclusively()逻辑,如果不满足则抛出监视器锁状态异常,我们通过查阅其源码可发现isHeldExclusively()内部的实现是直接抛异常的,也就是说这个方法肯定是交给子类实现的。

    1 protected boolean isHeldExclusively() {
    2     throw new UnsupportedOperationException();
    3 }

    那我们来看看其子类ReentrantLock,可以发现它回去拿当前线程与当前持有锁的线程做比较。

    为什么会这样比较呢,我们之前不是说到了嘛,Condition其实就是Object中wait和notify方法的扩展。

    所以调用signalAll的肯定也是需要获取到锁的,哈哈。

    1 protected final boolean isHeldExclusively() {
    2     return getExclusiveOwnerThread() == Thread.currentThread();
    3 }

    ——————————————————————————————————————————————————————————————————————

    2、然后我们将队头的节点交给first,并在其非空的情况下(也就是条件队列有数据存在调用doSignalAll()

    1 private void doSignalAll(Node first) {
    2     lastWaiter = firstWaiter = null;
    3     do {
    4         Node next = first.nextWaiter;
    5         first.nextWaiter = null;
    6         transferForSignal(first);
    7         first = next;
    8     } while (first != null);
    9 }

    首先将队首队尾的指针都指向空,也就是清空条件队列(因为是signalAll嘛,唤醒所有,当然是情况队列咯)。

    紧接着遍历整个条件队列,因为first之前的指针是指向firstWaiter的,所以是队头节点,它能够遍历整个队列。

    最后会依次调用transferForSignal,一个个添加到同步队列的队尾。

     1 final boolean transferForSignal(Node node) {
     2     // 先通过CAS将node节点的状态由CONDITION置为0,若修改失败说明该节点已经被CONDITION了,则直接返回操作下一个节点
     3     // 如果操作成功,说明已经将该节点从条件队列唤醒了,因此会被添加到条件队列的尾队等待竞争锁
     4     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
     5         return false;
     6 
     7     // 自旋的添加到同步队列队尾,并拿到当前节点的前驱节点
     8     Node p = enq(node);
     9     int ws = p.waitStatus;
    10     // 将当前节点的前驱节点的状态置为SIGNAL,也就是要前驱节点来唤醒当前节点
    11     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    12         LockSupport.unpark(node.thread);
    13     return true;
    14 }

    ——————————————————————————————————————————————————————————————————————

    在说signal方法前我们先来总结下signalAll:

    • 将条件队列清空(只是令lastWaiter = frstWaliter = null,队列中的结点和连接关系仍然还存在)。
    • 将条件队列中的头结点取出,使之成为孤立结点(nextWaiter的prev和next属性都为null)。
    • 如果该结点处于被Cancelled了的状态,则直接跳过该结点(由于是孤立结点,则会被GC回收)。
    • 如果该结点处于正常状态,则通过enq()法将 它添加到同步队列的末尾。
    • 判断是否需要将该结点唤醒(包括设置该结点的前驱结点的状态为SIGNAL),如有必要,直接唤醒该结点。
    • 重复2-5,直到整个条件队列中的结点都被处理完。

    ——————————————————————————————————————————————————————————————————————

    signal():

    弄懂了signalAll后signal应该会很轻松,因为它们大同小异,signal只是唤醒队列中第一个没有被Cancelled的线程,如果没有则将条件队列清空。

    1 public final void signal() {
    2     if (!isHeldExclusively())
    3         throw new IllegalMonitorStateException();
    4     Node first = firstWaiter;
    5     if (first != null)
    6         doSignal(first);
    7 }
    1 private void doSignal(Node first) {
    2     do {
    3         if ( (firstWaiter = first.nextWaiter) == null)
    4             lastWaiter = null;
    5         first.nextWaiter = null;
    6     } while (!transferForSignal(first) &&
    7              (first = firstWaiter) != null);
    8 }
  • 相关阅读:
    容器技术(三)搭建本地 Registry【15】
    容器技术(三) 使用公共 Registry【14】
    容器技术(三) 镜像命名的最佳实践【13】
    容器技术(三) RUN vs CMD vs ENTRYPOINT【12】
    容器技术(三) dockerfile常用指令【11】
    容器技术(三) dockerfile调试【10】
    容器技术(三) 镜像缓存特性【9】
    容器技术(三) Dockerfile 构建镜像【8】
    容器技术(三)构建镜像【7】
    layui的图标知识
  • 原文地址:https://www.cnblogs.com/bzfsdr/p/13168468.html
Copyright © 2011-2022 走看看