zoukankan      html  css  js  c++  java
  • 【JUC源码解析】AQS

    简介

    AQS,也即AbstractQueuedSynchronizer,抽象队列同步器,提供了一个框架,可以依赖它实现阻塞锁和相关同步器。有两种类型,独占式(Exclusive)和共享式(Share)。

    概述

    同步器,维护了一个共享状态(state)和一个同步队列(链表)。

    共享状态,表示共享资源的状态;初始时为0,表示未锁定,当有一个线程成功抢占此资源时,状态加1,释放资源时,状态减1;通过CAS改变state,一般需要子类实现具体的逻辑。

    同步队列(链表,Node),当一个线程抢占资源失败时,会为此线程创建一个Node,并添加到链表尾部。链表里只有排在前头的结点对应的线程才有资格竞争资源,成功则获得锁,访问资源结束后,释放锁,结点也从链表中移除,下次来竞争资源时,会重新为其创建结点。

     

    结点

    Node

     1     static final class Node {
     2         static final Node SHARED = new Node(); // 标记一个结点(对应的线程)在共享模式下等待
     3         static final Node EXCLUSIVE = null; // 标记一个结点(对应的线程)在独占模式下等待
     4         static final int CANCELLED = 1; // waitStatus的值,表示该结点(对应的线程)已被取消
     5         static final int SIGNAL = -1; // waitStatus的值,表示后继结点(对应的线程)需要被唤醒
     6         static final int CONDITION = -2; // waitStatus的值,表示该结点(对应的线程)在等待某一条件
     7         static final int PROPAGATE = -3; // waitStatus的值,表示有资源可用,新head结点需要继续唤醒后继结点(共享模式下,多线程并发释放资源,而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点)
     8         volatile int waitStatus; // 等待状态,取值范围,-3,-2,-1,0,1
     9         volatile Node prev; // 前驱结点
    10         volatile Node next; // 后继结点
    11         volatile Thread thread; // 结点对应的线程
    12         Node nextWaiter; // 等待队列里下一个等待条件的结点
    13 
    14         final boolean isShared() { // 判断是否为共享模式
    15             return nextWaiter == SHARED;
    16         }
    17 
    18         final Node predecessor() throws NullPointerException { // 前驱结点
    19             Node p = prev;
    20             if (p == null)
    21                 throw new NullPointerException();
    22             else
    23                 return p;
    24         }
    25 
    26         Node() { // 初始化head或share标记结点
    27         }
    28 
    29         Node(Thread thread, Node mode) { // 同步队列
    30             this.nextWaiter = mode;
    31             this.thread = thread;
    32         }
    33 
    34         Node(Thread thread, int waitStatus) { // 等待队列
    35             this.waitStatus = waitStatus;
    36             this.thread = thread;
    37         }
    38     }

     

    属性

    1     private transient volatile Node head; // 指向同步队列(wait queue)的头结点
    2     private transient volatile Node tail; // 指向同步队列(wait queue)的尾节点
    3     private volatile int state; // 同步状态

     

    独占模式

    获取资源入口

    acquire(int)

    1     public final void acquire(int arg) { // 入口
    2         // 竞争资源成功,直接返回;否则,入队等待,司机而动
    3         if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
    4             selfInterrupt(); // 补上中断
    5     }

     

    尝试获取资源

    tryAcquire(int)

    1     protected boolean tryAcquire(int arg) {
    2         throw new UnsupportedOperationException(); // 留给子类实现
    3     }

     

    快速添加结点

    addWaiter(Node)

     1     private Node addWaiter(Node mode) {
     2         Node node = new Node(Thread.currentThread(), mode); // 为当前线程创建结点
     3         Node pred = tail;
     4         if (pred != null) { // 队列不为空,尝试快速添加结点
     5             node.prev = pred;
     6             if (compareAndSetTail(pred, node)) { // 设置node为tail结点
     7                 pred.next = node; // 老tail结点的后继指向新tail结点
     8                 return node; // 返回老tail结点,注意,是老的tail结点
     9             }
    10         }
    11         enq(node); // 否则,自旋添加结点
    12         return node;
    13     }

     快速通道,当队列不为空时,将结点添加到队尾,CAS操作,成功,则返回老的末尾结点。如下图所示。如果失败,则进入自旋添加结点通道。

     

    自旋添加结点

    enq(Node)

     1     private Node enq(final Node node) {
     2         for (;;) { // 自旋
     3             Node t = tail;
     4             if (t == null) { // 队列为空
     5                 if (compareAndSetHead(new Node())) // 创建一个标记结点,head指向它
     6                     tail = head; // tail也指向此结点
     7             } else {
     8                 node.prev = t; // node的前驱指向tail结点
     9                 if (compareAndSetTail(t, node)) { // 设置node为tail结点
    10                     t.next = node; // 老tail结点的后继指向新tail结点
    11                     return t; // 返回老tail结点,注意,是老的tail结点
    12                 }
    13             }
    14         }
    15     }

    自旋添加结点,如果队列为空,创建一个标记结点,head和tail都指向它,否则,设置node结点为新的tail结点,CAS操作,失败重试,直至成功。 见下图。

    队列为空,添加标记结点,好处是,第一个入队的线程结点和后续结点是一样的逻辑,无需特别处理。

     

    入队等待,伺机而动

    acquireQueued(Node, int)

     1     final boolean acquireQueued(final Node node, int arg) {
     2         boolean failed = true; // 标记是否成功
     3         try {
     4             boolean interrupted = false; // 记录中断
     5             for (;;) { // 自旋
     6                 final Node p = node.predecessor(); // 前驱
     7                 if (p == head && tryAcquire(arg)) { // 如果前驱是head结点,表示自己可以竞争资源了(等待中被唤醒或中断)
     8                     setHead(node); // 成功后,将自己设置为head结点
     9                     p.next = null; // 老的head结点出队
    10                     failed = false; // 成功
    11                     return interrupted; // 返回
    12                 }
    13                 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 寻找安全停靠点,检查中断
    14                     interrupted = true;
    15             }
    16         } finally {
    17             if (failed)
    18                 cancelAcquire(node); // 失败则取消
    19         }
    20     }

    入队等待。如果当前结点的前驱是head结点,则尝试获取资源,成功则将此结点设置为head结点,老的head结点出队,返回。否则,寻找安全停靠点,并检查中断标记,如果未找到停靠点或被唤醒,则继续尝试竞争资源。线程总是在安全停靠点处打盹儿(挂起),或是在竞争资源的路上。 

     

    寻找安全停靠点

    shouldParkAfterFailedAcquire(Node, Node)

     1     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     2         int ws = pred.waitStatus; // 查看前驱的等待状态
     3         if (ws == Node.SIGNAL) // 释放锁后会唤醒自己
     4             return true; // 可以在此停靠(阻塞)
     5         if (ws > 0) { // 如果前驱已取消
     6             do { // 一直往前找
     7                 node.prev = pred = pred.prev; // 将自己的前驱往前移
     8             } while (pred.waitStatus > 0); // 直到安全停靠点(没有取消的结点)
     9             pred.next = node; // 并将自己设置为前驱的后继结点
    10         } else {
    11             compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 与前驱约定好,释放锁后唤醒自己
    12         }
    13         return false; // 暂时不能停靠,继续尝试获取锁
    14     }

    寻找安全停靠点。所谓安全停靠点,指的是,当前结点的前驱结点waitStatus = -1(SIGNAL)。否则,检查其是否是有效的,如果无效(已经取消)则往前找,直到找到第一个有效结点,并设置为自己的前驱结点,也把自己设置为它的后继结点(等于把无效结点截掉了);然后将前驱结点的waitStatus设置为-1,并继续竞争资源,暂时不能停靠。如下图。

     

    停靠并检查中断

    parkAndCheckInterrupt()

    1     private final boolean parkAndCheckInterrupt() {
    2         LockSupport.park(this); // 线程挂起
    3         // 检查中断,并清除中断标记,如果不清除,下次挂起时,会立刻响应中断,所以要清除;最后再把中断补上
    4         return Thread.interrupted();
    5     }

    线程挂起,被唤醒后,需要检查中断标记,同时清除中断标记,是为了下次挂起时,被上次的中断标记立刻中断,从而再也无法被挂起(因为一直有中断标记,一旦被挂起,就立刻响应中断)

    释放资源入口

    release(int)

    1     public final boolean release(int arg) {
    2         if (tryRelease(arg)) { // 成功释放
    3             Node h = head; // head结点
    4             if (h != null && h.waitStatus != 0) // waitStatus等于0,说明后面没有要释放的线程结点
    5                 unparkSuccessor(h); // 唤醒等待队列里下一个结点对应的线程
    6             return true; // 成功
    7         }
    8         return false; // 失败
    9     }

    独占式释放资源,这里肯定是单线程,无需考虑并发。head结点存在,并且waitStatus不为0(肯定也不为1,取消),则唤醒等待队列里下一个结点对应的线程。

     

    释放资源

    tryRelease(int)

    1     protected boolean tryRelease(int arg) {
    2         throw new UnsupportedOperationException(); // 留给子类实现
    3     }

     

    通知后继结点

    unparkSuccessor(Node)

     1     private void unparkSuccessor(Node node) {
     2         int ws = node.waitStatus;
     3         if (ws < 0)
     4             compareAndSetWaitStatus(node, ws, 0); // waitStatus置为0
     5         Node s = node.next; // 后继结点
     6         if (s == null || s.waitStatus > 0) { // 如果后继结点不存在,或者已取消
     7             s = null;
     8             // 从tail结点开始,往前搜索,直至第一次遇见取消的结点,并将改取消结点的后继结点作为待唤醒的结点
     9             for (Node t = tail; t != null && t != node; t = t.prev)
    10                 if (t.waitStatus <= 0)
    11                     s = t;
    12         }
    13         if (s != null) // 如果后继节点存在,则唤醒其所对应的线程
    14             LockSupport.unpark(s.thread);
    15     }

    通知后继结点。当前结点的waitStatus设置为0,不成功也没关系,可能被等待线程改变了(寻找安全停靠点的那个家伙);检查当前线程的后继结点,如果不存在(可能取消了),就从tail结点开始往前找,直到第一次遇见取消的结点,然后将这个取消结点的后继结点作为当前线程要唤醒的后继结点;最后,如果找到了要唤醒的结点,则唤醒此结点对应的线程。

     

    共享模式

    获取资源入口

    acquireShared(int)

    1     public final void acquireShared(int arg) {
    2         if (tryAcquireShared(arg) < 0) // 尝试获取资源
    3             doAcquireShared(arg); // 入队等待
    4     }

     

    尝试获取资源

    tryAcquireShared(int)

    1     protected int tryAcquireShared(int arg) {
    2         throw new UnsupportedOperationException(); // 留给子类实现
    3     }

     入队等待

    doAcquireShared(int)

     1     private void doAcquireShared(int arg) {
     2         final Node node = addWaiter(Node.SHARED); // 入队
     3         boolean failed = true; // 记录是否成功
     4         try {
     5             boolean interrupted = false; // 记录中断
     6             for (;;) { // 自旋
     7                 final Node p = node.predecessor(); // 前驱结点
     8                 if (p == head) { // 在head后面可以竞争资源
     9                     int r = tryAcquireShared(arg); // 尝试获取资源
    10                     if (r >= 0) { // 获取成功
    11                         setHeadAndPropagate(node, r); // 设置head指向当前结点,如果剩余资源,通知后面的结点
    12                         p.next = null; // help GC
    13                         if (interrupted) // 补上中断
    14                             selfInterrupt();
    15                         failed = false; // 成功
    16                         return;
    17                     }
    18                 }
    19                 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 寻找安全停靠点,检查中断
    20                     interrupted = true;
    21             }
    22         } finally {
    23             if (failed)
    24                 cancelAcquire(node); // 失败则取消
    25         }
    26     }

    共享模式下,入队等待。首先尝试获取资源,会返回一个整数,大于0说明还有资源,后面的线程可以继续抢占。否则,寻找安全停靠点,检查中断;如果找不到停靠点或者被唤醒,则继续竞争资源。

     

    通知后面的结点

    setHeadAndPropagate(Node, int)

     1     private void setHeadAndPropagate(Node node, int propagate) {
     2         Node h = head;
     3         setHead(node); // 设置head指向当前结点
     4         // 如果还有剩余资源,或者是老head结点为空,或者老head结点的等待状态小于0,或者是新head结点为空,或者新head结点的等待状态小于0
     5         if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
     6             Node s = node.next; // 后继结点
     7             if (s == null || s.isShared()) // 如果后继结点为空,或者后继结点处于共享模式
     8                 doReleaseShared(); // 释放
     9         }
    10     }

     以下几种情况均需要唤醒后面的线程,包括还剩余资源,或者head结点为空,或者head结点的等待状态小于0;或者新的head结点为空,或者新的head结点的等待状态小于0

     

    释放资源入口

    releaseShared(int)

    1     public final boolean releaseShared(int arg) {
    2         if (tryReleaseShared(arg)) { // 尝试释放资源
    3             doReleaseShared(); // 唤醒后面的结点对应的线程
    4             return true;
    5         }
    6         return false;
    7     }

    共享模式释放资源,也许是多个线程并发释放,所以需要考虑并发问题,同样是CAS操作(CPU指令原语,保证原子性)

     

    唤醒后面的线程

    doReleaseShared()

     1     private void doReleaseShared() {
     2         for (;;) { // 自旋,由于是共享模式,存在并发问题,CAS操作
     3             Node h = head; // head结点
     4             if (h != null && h != tail) { // 队列不为空
     5                 int ws = h.waitStatus; // 等待状态
     6                 if (ws == Node.SIGNAL) { // 如果是signal,表示后继结点线程需要被唤醒
     7                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 设置waitStatus为0,防止重复
     8                         continue;
     9                     unparkSuccessor(h); // 唤醒后继结点线程
    10                 // 保证传播,设置waitStatus为propagate, 结合setHeadAndPropagate(Node, int), waitStatus < 0时,,调用doReleaseShared()方法
    11                 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 
    12                     continue;
    13             }
    14             if (h == head) // 有新的结点入队(空队列时),或者有结点出队,导致head结点改变,需要重新释放
    15                 break;
    16         }
    17     }

    共享模式下,多线程并发释放资源,而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点。 

    传播是什么鬼

    共享模式下,要保证释放事件的传播。比如,一个共享资源,可以同时容纳3个线程同时访问。假如,有两个获得资源的线程访问结束,相继释放锁,而此时,临界区外只有一个线程在等待,且已经创建好结点排在同步队列里,就在head结点的后面。

    反过来想,假如没有propagate状态,如下图所示。

    1. 线程1释放资源,head结点的等待状态,ws = -1,不变,tail结点的线程被唤醒(还没有成为新的head结点,正在竞争资源的路上),此时,已经没有了等待线程;而这个时候,线程2也释放资源,由于head(还是原来的head)结点的等待状态,ws依然是-1(唤醒后继线程),还需要唤醒后面的线程,然而并没有待唤醒的线程;为了不重复这个动作,我们可以在所有的线程释放资源之前,安全地将head结点的ws设置为0,这样就能保证,只有一个线程能设置成功,因此避免了重复唤醒的问题(因为没有多余的等待线程)。

    2.接1,但是又有一个问题,假如,在tail线程正在获取资源时,又有新的线程进来了(入队寻找安全点),准备排在了tail结点后面,恰巧tail结点成为新的head结点时,它的ws还是0,老的head结点的ws也是0,调用setHeadAndPropagate方法时,并不会调用doReleaseShared方法(不满足条件),【注意,一个释放事件可能被忽略】,然后新生结点轮到CPU时间片(已经找到了安全点,准备跟前驱结点协商,醒后通知自己),把前驱结点的ws设置为-1,就park了。

    3.接2,此时的状态是,资源里面只有两个线程在跑,老的head和老的tail(也是当前head),而新生结点在等待被唤醒,但是资源同时容纳的线程数是3个,当然,又有结点释放资源后,肯定能唤醒新生结点,然而,这降低了并发性。于是乎,propagate状态应运而生。有了它,便能保证不重复唤醒线程,也不落下唤醒事件,而是将唤醒事件传播下去。

    4。接3,在doReleaseShared方法里如何保证新的head结点的ws已经被设置为-1了呢,(也许新生结点轮到CPU时间很少);其实不用保证,即使此刻还没协商成功,也没有关系,ws会设置为-3(propagate),唤醒事件已经保留,且会传播下去。等新结点再去设置前驱结点时,发现已经改变,那么便认为不是安全停靠点,于是继续检查,结果发现前驱结点已经是head结点了,太好了,此时可以去竞争资源了,不用被挂起了。

     

    取消

    cancelAcquire(Node)

     1     private void cancelAcquire(Node node) {
     2         if (node == null) // 为空,忽略
     3             return;
     4         node.thread = null; // 取消,不必记录线程
     5         Node pred = node.prev; // 前驱
     6         while (pred.waitStatus > 0) // 一直往前找,知道遇见第一个有效的结点,并设置为自己的前驱
     7             node.prev = pred = pred.prev;
     8         Node predNext = pred.next; // 前驱的后继结点
     9         node.waitStatus = Node.CANCELLED; // 设置waitStatus为1,取消
    10 
    11         if (node == tail && compareAndSetTail(node, pred)) { // 如果当前结点是最后一个,直接移除
    12             compareAndSetNext(pred, predNext, null);
    13         } else {
    14             int ws;
    15             // 如果前驱不是头结点,而且,前驱的等待状态是SIGNAL或不大于0的情况下设置成SIGNAL,并且线程不为空
    16             if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL
    17                     || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
    18                 Node next = node.next;
    19                 if (next != null && next.waitStatus <= 0) // 当前结点的后继结点不为空,且有效
    20                     compareAndSetNext(pred, predNext, next); // 移除当前结点,将其前驱和后继连在一起
    21             } else {
    22                 unparkSuccessor(node); // 否则,唤醒后继结点(pred是头结点,或者其他的动态情况,比如进来了新的结点,或者有别的结点也取消了,等等)
    23             }
    24 
    25             node.next = node; // 等待被回收
    26         }
    27     }

    线程取消,其所对应的结点,waitStatus设为1(CANCEL),并且尝试移除该结点。首先找出其前面的第一个有效的结点作为其前驱结点,并取得前驱结点的后继(可能是它自己,也可能不是);然后,如果此结点是末尾结点,那么直接移除,并设置前驱为新的末尾(tail)结点,否则查看前驱是否是头结点,以及前驱的等待状态还有线程,如果条件满足,则移除该结点.,将该结点的前驱和后继连接在一起;如果条件不满足,调用唤醒后继结点的方法,由于waitStatus已经设置为CANCEL,最后该结点一定会被移除的(无效),最后等待垃圾回收。

     

    条件Condition

    等待队列

    1         private transient Node firstWaiter; // 等待队列头结点
    2         private transient Node lastWaiter; // 等待队列尾结点

     

    中断模式

    1         private static final int REINTERRUPT = 1; // 在收到信号之后发生过中断
    2         private static final int THROW_IE = -1; // 在收到信号之前发生过中断

    等待

    await()

     1         public final void await() throws InterruptedException {
     2             if (Thread.interrupted())
     3                 throw new InterruptedException();
     4             Node node = addConditionWaiter(); // 构造结点,加入到等待队列尾部
     5             int savedState = fullyRelease(node); // 释放锁
     6             int interruptMode = 0;
     7             while (!isOnSyncQueue(node)) { // 判断当前结点是否已经在同步队列里,如果不在,就阻塞当前线程
     8                 LockSupport.park(this);
     9                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 等待过程中如果发生了中断,则退出循环
    10                     break;
    11             }
    12             // 被唤醒后,重新竞争锁,并记录中断状态
    13             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    14                 interruptMode = REINTERRUPT;
    15             if (node.nextWaiter != null)
    16                 unlinkCancelledWaiters(); // 清理一波已经取消的结点
    17             if (interruptMode != 0)
    18                 reportInterruptAfterWait(interruptMode); // 报告中断
    19         }

    添加结点到等待队列

    addConditionWaiter()

     1         private Node addConditionWaiter() {
     2             Node t = lastWaiter; // 末尾结点
     3             if (t != null && t.waitStatus != Node.CONDITION) { // 尾结点取消,清理一波
     4                 unlinkCancelledWaiters();
     5                 t = lastWaiter; // 有效末尾结点
     6             }
     7             Node node = new Node(Thread.currentThread(), Node.CONDITION); // 创建结点
     8             if (t == null)
     9                 firstWaiter = node;
    10             else
    11                 t.nextWaiter = node;
    12             lastWaiter = node; // 加入到尾部
    13             return node;
    14         }

    清除已经取消的结点

    unlinkCancelledWaiters()

     1         private void unlinkCancelledWaiters() {
     2             Node t = firstWaiter; // 头结点
     3             Node trail = null; // 蔓延结点,为了记录有效结点,一直向后蔓延;否则,只依赖firstWaiter,末了,还得从尾到头遍历一遍                                                           
     4             while (t != null) {
     5                 Node next = t.nextWaiter; // 后继结点
     6                 if (t.waitStatus != Node.CONDITION) { // 已经取消
     7                     t.nextWaiter = null; // 断开
     8                     if (trail == null) // trail为空,说明还没遇到有效结点,继续往后找
     9                         firstWaiter = next; // 头节点指向下一个
    10                     else
    11                         trail.nextWaiter = next; // 否则,已经找到有效结点,trail指向的就是目前为止,最靠后的一个有效结点,所以它指向下一个
    12                     if (next == null) // 遍历结束
    13                         lastWaiter = trail; // 设置尾结点
    14                 } else
    15                     trail = t; // trail总是指向后一个有效结点
    16                 t = next; // 向后找
    17             }
    18         }

     释放

    fullyRelease(Node)

     1     final int fullyRelease(Node node) {
     2         boolean failed = true;
     3         try {
     4             int savedState = getState(); // 获取状态
     5             if (release(savedState)) { // 释放
     6                 failed = false;
     7                 return savedState;
     8             } else {
     9                 throw new IllegalMonitorStateException();
    10             }
    11         } finally {
    12             if (failed) // 失败,则标记为取消
    13                 node.waitStatus = Node.CANCELLED;
    14         }
    15     }

    结点是否在同步队列里

    isOnSyncQueue(Node)

     1     final boolean isOnSyncQueue(Node node) {
     2         // 等待状态还是CONDITION,或者前驱为空,则说明不在,因为它不可能是head结点(同步队列里,只有head结点可以为null),而是被head唤醒的
     3         if (node.waitStatus == Node.CONDITION || node.prev == null) 
     4             return false;
     5         if (node.next != null) // 如果有后继结点,说明肯定在同步队列里了
     6             return true;
     7         return findNodeFromTail(node); // 如果后继结点为空,需要从尾部开始查找,因为不断地有新结点加入进来,但也在不远处
     8     }
     9 
    10     private boolean findNodeFromTail(Node node) {
    11         Node t = tail; // 从尾部开始
    12         for (;;) {
    13             if (t == node) // 找到返回
    14                 return true;
    15             if (t == null)
    16                 return false;
    17             t = t.prev; // 往前移
    18         }
    19     }

    等待过程中是否有中断

    checkInterruptWhileWaiting(Node)

    1         private int checkInterruptWhileWaiting(Node node) {
    2             return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
    3         }

    中断唤醒线程

    transferAfterCancelledWait(Node)

    1     final boolean transferAfterCancelledWait(Node node) { // interrupt
    2         if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { // 设置成功,直接入队
    3             enq(node);
    4             return true;
    5         }
    6         while (!isOnSyncQueue(node)) // 如果设置失败,说明已经丢了一个信号,说不定正在入队,自旋一会儿,等入队完成
    7             Thread.yield();
    8         return false;
    9     }

    报告中断

    reportInterruptAfterWait(int)

    1         private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
    2             if (interruptMode == THROW_IE)
    3                 throw new InterruptedException();
    4             else if (interruptMode == REINTERRUPT)
    5                 selfInterrupt();
    6         }

    信号

    signal()

    1         public final void signal() {
    2             if (!isHeldExclusively()) // 非独占式,直接抛异常
    3                 throw new IllegalMonitorStateException();
    4             Node first = firstWaiter; // 头节点
    5             if (first != null)
    6                 doSignal(first); // 发布信号
    7         }

    发布信号

    doSignal(Node)

    1         private void doSignal(Node first) {
    2             do {
    3                 if ((firstWaiter = first.nextWaiter) == null) // 判断是否为空
    4                     lastWaiter = null;
    5                 first.nextWaiter = null; // 断开
    6             } while (!transferForSignal(first) && (first = firstWaiter) != null); // 结点取消,则往后移动一个,继续通知
    7         }

    唤醒线程

    transferForSignal(Node)

     1     final boolean transferForSignal(Node node) { // Signal
     2         if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 修改失败,说明已经取消
     3             return false;
     4         Node p = enq(node); // 入队,返回前驱结点 
     5         int ws = p.waitStatus; // 前驱结点的等待状态
     6         if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 已经取消,或者waitStatus设置失败,(不是安全停靠点),则唤醒线程
     7             LockSupport.unpark(node.thread);
     8         return true;
     9     }

    行文至此结束。

    尊重他人的劳动,转载请注明出处:http://www.cnblogs.com/aniao/p/aniao_aqs.html

  • 相关阅读:
    [javaSE] 网络编程(浏览器客户端-自定义服务端)
    [javaSE] 网络编程(URLConnection)
    [javaSE] 网络编程(URL)
    [javaSE] IO流(FIle对象递归文件列表)
    tcping 与 telnet命令粗略使用
    让mysql监听ipv4
    shell脚本[] [[]] -n -z 的含义解析
    ansible的主机的默认配置部分
    MySQL备份与还原
    gunicorn 简介
  • 原文地址:https://www.cnblogs.com/aniao/p/aniao_aqs.html
Copyright © 2011-2022 走看看