zoukankan      html  css  js  c++  java
  • Java Concurrent之 AbstractQueuedSynchronizer

     ReentrantLock/CountDownLatch/Semaphore/FutureTask/ThreadPoolExecutor的源码中都会包含一个静态的内部类Sync,它继承了AbstractQueuedSynchronizer这个抽象类。

    AbstractQueuedSynchronizer是java.util.concurrent包中的核心组件之一,为并发包中的其他synchronizers提供了一组公共的基础设施。

         AQS会对进行acquire而被阻塞的线程进行管理,其管理方式是在AQS内部维护了一个FIFO的双向链表队列,队列的头部是一个空的结点,除此之外,每个结点持有着一个线程,结点中包含两个重要的属性waiteStatus和nextWaiter。结点的数据结构如下: Node中的属性waitStatus、prev、next、thread都使用了volatile修饰,这样直接的读写操作就具有内存可见性。 waitStatus表示了当前结点Node的状态

    Java代码  收藏代码
    1. static final class Node {  
    2.         /** waitStatus的值,表示此结点中的线程被取消 */  
    3.         static final int CANCELLED =  1;  
    4.         /** waitStatus value 表明后续结点中的线程需要unparking 唤醒 */  
    5.         static final int SIGNAL    = -1;  
    6.         /** waitStatus value 表明当前结点中的线程需要等待一个条件*/  
    7.         static final int CONDITION = -2;  
    8.         /** 表明结点是以共享模式进行等待(shared mode)的标记*/  
    9.         static final Node SHARED = new Node();  
    10.         /** 表明结点是以独占模式进行等待(exclusive mode)的标记*/  
    11.         static final Node EXCLUSIVE = null;  
    12.         /** 
    13.          * Status field, taking on only the values: 
    14.          *   SIGNAL: 后继结点现在(或即将)被阻塞(通过park) 那么当前结点在释放或者被取消的时候必须unpark它的后继结点 
    15.          *           为了避免竞态条件,acquire方法必须首先声明它需要一个signal,然后尝试原子的acquire 
    16.          *            如果失败了 就阻塞      
    17.          *   CANCELLED:当前结点由于超时或者中断而被取消  结点不会脱离这个状态    
    18.          *              尤其是,取消状态的结点中的线程永远不会被再次阻塞 
    19.          *   CONDITION: 当前结点在一个条件队列中。它将不会进入sync队列中直到它被transferred 
    20.          *              (这个值在这里的使用只是为了简化结构 跟其他字段的使用没有任何关系) 
    21.          *   0:          None of the above 非以上任何值 
    22.          * 
    23.          * 这些值通过数字来分类达到简化使用的效果 
    24.          * 非负的数字意味着结点不需要信号signal 这样大部分的代码不需要检查特定的值 just 检查符号就ok了 
    25.          * 
    26.          * 这个字段对于普通的sync结点初始化为0 对于条件结点初始化为CONDITION(-2) 本字段的值通过CAS操作进行修改 
    27.          */  
    28.         volatile int waitStatus;  
    29.         /** 
    30.          * 连接到当前结点或线程依赖的用于检查waitStatus等待状态的前驱结点。 
    31.          * 进入队列时赋值,出队列时置空(为GC考虑)。 
    32.          * 根据前驱结点的取消(CANCELLED),我们查找一个非取消结点的while循环短路,将总是会退出 ; 
    33.          * 因为头结点永远不会被取消:一个结点成为头结点只能通过一次成功过的acquire操作的结果 
    34.          * 一个取消的线程永远不会获取操作成功(acquire操作成功) 
    35.          * 一个线程只能取消它自己  不能是其他结点 
    36.          */  
    37.         volatile Node prev;  
    38.         /** 
    39.          * 连接到当前结点或线程释放时解除阻塞(unpark)的后继结点 
    40.          * 入队列时赋值,出队列时置空(为GC考虑) 
    41.          * 入队列时不会给前驱结点的next字段赋值,需要确认compareAndSetTail(pred, node)操作是否成功 (详见Node addWaiter(Node mode)方法) 
    42.          * 所以当我们发现结点的next为空时不一定就是tail尾结点 如果next为空,可以通过尾结点向前遍历即addWaiter中调用的enq(node)方法(个人觉 
    43.          * 这是对第一次处理失败的亡羊补牢之举)官方说法double-check 双层检查 
    44.          *  
    45.          * 被取消的结点next指向的是自己而不是空(详见cancelAcquire(Node node)中最后的node.next = node; )这让isOnSyncQueue变得简单 
    46.          * Upon cancellation, we cannot adjust this field, but can notice 
    47.          * status and bypass the node if cancelled.   
    48.          */  
    49.         volatile Node next;  
    50.         /** 
    51.          * 入队列结点中的线程,构造时初始化,使用完 就置空 
    52.          */  
    53.         volatile Thread thread;  
    54.         /** 
    55.          * 连接到下一个在条件上等待的结点 或者waitStatus为特殊值SHARED 共享模式 
    56.          * 因为条件队列只有在独占模式(exclusive)下持有时访问,当结点等待在条件上,我们只需要一个简单的链表队列来持有这些结点 
    57.          * 然后他们会转移到队列去进行re-acquire操作。 
    58.          * 由于条件只能是独占的,我们可以使用一个特殊的值来声明共享模式(shared mode)来节省一个字段 
    59.          */  
    60.         Node nextWaiter;  
    61.         /** 
    62.          * 如果结点以共享模式等待  就返回true 
    63.          */  
    64.         final boolean isShared() {  
    65.             return nextWaiter == SHARED;  
    66.         }  
    67.         /** 
    68.          * 返回当前结点的前驱结点如果为null就抛出NullPointException 
    69.          * @return the predecessor of this node 
    70.          */  
    71.         final Node predecessor() throws NullPointerException {  
    72.             Node p = prev;  
    73.             if (p == null)  
    74.                 throw new NullPointerException();  
    75.             else  
    76.                 return p;  
    77.         }  
    78.          //用于建立初始化头 或 共享标识  
    79.         Node() {      
    80.         }  
    81.          //入队列时使用  
    82.         Node(Thread thread, Node mode) {     // Used by addWaiter  
    83.             this.nextWaiter = mode;  
    84.             this.thread = thread;  
    85.         }  
    86.         //用于条件结点  
    87.         Node(Thread thread, int waitStatus) { // Used by Condition  
    88.             this.waitStatus = waitStatus;  
    89.             this.thread = thread;  
    90.         }  
    91.     }  

    acquire操作

    获取同步器

    Java代码  收藏代码
    1. if(尝试获取成功){  
    2.     return ;  
    3. }else{  
    4.     加入队列;park自己  
    5. }  

     释放同步器

    Java代码  收藏代码
    1. if(尝试释放成功){  
    2.     unpark等待队列中的第一个结点  
    3. }else{  
    4.     return false;  
    5. }  
    Java代码  收藏代码
    1. /** 
    2.      * 以独占模式(exclusive mode)排他地进行的acquire操作 ,对中断不敏感 完成synchronized语义 
    3.      * 通过调用至少一次的tryAcquire实现 成功时返回 
    4.      * 否则在成功之前,一直调用tryAcquire(int)将线程加入队列,线程可能反复的阻塞和解除阻塞(park/unpark)。 
    5.      * 这个方法可以用于实现Lock.lock()方法 
    6.      * acquire是通过tryAcquire(int)来实现的,直至成功返回时结束,故我们无需自定义这个方法就可用它来实现lock。 
    7.      * tryLock()是通过Sync.tryAquire(1)来实现的 
    8.      * @param arg the acquire argument. 这个值将会被传递给tryAcquire方法  
    9.      * 但他是不间断的 可以表示任何你喜欢的内容 
    10.      */  
    11.     public final void acquire(int arg) {  
    12.         if (!tryAcquire(arg) &&  
    13.             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  
    14.             selfInterrupt();  
    15.     }  
    16.     /** 
    17.      * 尝试以独占模式进行acquire操作 这个方法应该查询这个对象状态是否允许以独占模式进行acquire操作,如果允许就获取它 
    18.      *  
    19.      *  
    20.      * <p>This method is always invoked by the thread performing 
    21.      * acquire.  If this method reports failure, the acquire method 
    22.      * may queue the thread, if it is not already queued, until it is 
    23.      * signalled by a release from some other thread. This can be used 
    24.      * to implement method {@link Lock#tryLock()}. 
    25.      * 
    26.      * 默认实现抛出UnsupportedOperationException异常 
    27.      * 
    28.      * @param arg the acquire argument. This value is always the one 
    29.      *        passed to an acquire method, or is the value saved on entry 
    30.      *        to a condition wait.  The value is otherwise uninterpreted 
    31.      *        and can represent anything you like. 
    32.      * @return {@code true} if successful. Upon success, this object has 
    33.      *         been acquired. 
    34.      * @throws IllegalMonitorStateException if acquiring would place this 
    35.      *         synchronizer in an illegal state. This exception must be 
    36.      *         thrown in a consistent fashion for synchronization to work 
    37.      *         correctly. 
    38.      * @throws UnsupportedOperationException if exclusive mode is not supported 
    39.      */  
    40.     protected boolean tryAcquire(int arg) {  
    41.         throw new UnsupportedOperationException();  
    42.     }  
    43.     /** 
    44.      * 以独占不可中断模式 
    45.      * Acquires in exclusive uninterruptible mode for thread already in 
    46.      * queue. Used by condition wait methods as well as acquire. 
    47.      * 
    48.      * @param node the node 
    49.      * @param arg the acquire argument 
    50.      * @return {@code true} if interrupted while waiting 
    51.      */  
    52.     final boolean acquireQueued(final Node node, int arg) {  
    53.         try {  
    54.             boolean interrupted = false;//记录线程是否曾经被中断过  
    55.             for (;;) {//死循环 用于acquire获取失败重试  
    56.                 final Node p = node.predecessor();//获取结点的前驱结点  
    57.                 if (p == head && tryAcquire(arg)) {//若前驱为头结点  继续尝试获取  
    58.                     setHead(node);  
    59.                     p.next = null; // help GC  
    60.                     return interrupted;  
    61.                 }  
    62.                 ////检查是否需要等待(检查前驱结点的waitStatus的值>0/<0/=0) 如果需要就park当前线程  只有前驱在等待时才进入等待 否则继续重试  
    63.                 if (shouldParkAfterFailedAcquire(p, node) &&   
    64.                     parkAndCheckInterrupt())//线程进入等待需要,需要其他线程唤醒这个线程以继续执行  
    65.                     interrupted = true;//只要线程在等待过程中被中断过一次就会被记录下来  
    66.             }  
    67.         } catch (RuntimeException ex) {  
    68.             //acquire失败  取消acquire  
    69.             cancelAcquire(node);  
    70.             throw ex;  
    71.         }  
    72.     }  
    73.      /** 
    74.      * 检查并更新acquire获取失败的结点的状态 
    75.      * 信号控制的核心 
    76.      * Checks and updates status for a node that failed to acquire. 
    77.      * Returns true if thread should block. This is the main signal 
    78.      * control in all acquire loops.  Requires that pred == node.prev 
    79.      * 
    80.      * @param pred node's predecessor holding status 
    81.      * @param node the node 
    82.      * @return {@code true} if thread should block 
    83.      */  
    84.     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  
    85.         int s = pred.waitStatus;  
    86.         if (s < 0)  
    87.             /* 
    88.              * 这个结点已经设置状态要求对他释放一个信号 所以他是安全的等待 
    89.              * This node has already set status asking a release 
    90.              * to signal it, so it can safely park 
    91.              */  
    92.             return true;  
    93.         if (s > 0) {  
    94.             /* 
    95.              * 前驱结点被取消 跳过前驱结点 并尝试重试 知道找到一个未取消的前驱结点 
    96.              * Predecessor was cancelled. Skip over predecessors and 
    97.              * indicate retry. 
    98.              */  
    99.         do {  
    100.         node.prev = pred = pred.prev;  
    101.         } while (pred.waitStatus > 0);  
    102.         pred.next = node;  
    103.     }  
    104.         else  
    105.             /* 
    106.              * 前驱结点的状态为0时表示为新建的 需要设置成SIGNAL(-1) 
    107.              * 声明我们需要一个信号但是暂时还不park 调用者将需要重试保证它在parking之前不被acquire 
    108.              * Indicate that we need a signal, but don't park yet. Caller 
    109.              * will need to retry to make sure it cannot acquire before 
    110.              * parking. 
    111.              */  
    112.             compareAndSetWaitStatus(pred, 0, Node.SIGNAL);  
    113.         return false;  
    114.     }  
    115.     /** 
    116.      * park当前线程方便的方法 并且然后会检查当前线程是否中断 
    117.      * 
    118.      * @return {@code true} if interrupted 
    119.      */  
    120.     private final boolean parkAndCheckInterrupt() {  
    121.         LockSupport.park(this);  
    122.         return Thread.interrupted();  
    123.     }  

    添加结点到等待队列

    首先构建一个准备入队列的结点,如果当前队列不为空,则将mode的前驱指向tail(只是指定当前结点的前驱结点,这样下面的操作一即使失败了 也不会影响整个队列的现有连接关系),compareAndSetTail成功将mode设置为tail结点,则将原先的tail结点的后继节点指向mode。如果队列为空亦或者compareAndSetTail操作失败,没关系我们还有enq(node)为我们把关。

    Java代码  收藏代码
    1. /** 
    2.      *通过给定的线程和模式 创建结点和结点入队列操作 
    3.      * 
    4.      * @param current the thread 当前线程 
    5.      * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 独占和共享模式 
    6.      * @return the new node 
    7.      */  
    8.     private Node addWaiter(Node mode) {  
    9.         Node node = new Node(Thread.currentThread(), mode);  
    10.         // Try the fast path of enq; backup to full enq on failure  
    11.         Node pred = tail;  
    12.         if (pred != null) {  
    13.             node.prev = pred;//只是指定当前结点的前驱结点,这样下面的操作一即使失败了   也不会影响整个队列的现有连接关系  
    14.             if (compareAndSetTail(pred, node)) {//原子地设置node为tail结点 CAS操作 操作一  
    15.                 pred.next = node;  
    16.                 return node;  
    17.             }  
    18.         }  
    19.         enq(node);//操作一失败时  这里会重复检查亡羊补牢一下  官方说法 double-check  
    20.         return node;  
    21.     }  
    22.     /** 
    23.      * 将结点插入队列 必要时进行初始化操作 
    24.      * @param node 带插入结点 
    25.      * @return node's predecessor 返回当前结点的前驱结点 
    26.      */  
    27.     private Node enq(final Node node) {  
    28.         for (;;) {  
    29.             Node t = tail;  
    30.             if (t == null) { // Must initialize 当前队列为空 进行初始化操作  
    31.                 Node h = new Node(); // Dummy header 傀儡头结点  
    32.                 h.next = node;  
    33.                 node.prev = h;  
    34.                 if (compareAndSetHead(h)) {//原子地设置头结点  
    35.                     tail = node;//头尾同一结点  
    36.                     return h;  
    37.                 }  
    38.             }  
    39.             else {  
    40.                 node.prev = t;  
    41.                 if (compareAndSetTail(t, node)) {//原子地设置tail结点 上面操作一的增强操作  
    42.                     t.next = node;  
    43.                     return t;  
    44.                 }  
    45.             }  
    46.         }  
    47.     }  

    acquire 取消结点

    取消结点操作:首先会判断结点是否为null,若不为空,while循环查找距离当前结点最近的非取消前驱结点PN(方便GC处理取消的结点),然后取出这个前驱的后继结点指向,利用它来感知其他的取消或信号操作(例如 compareAndSetNext(pred, predNext, null)) 然后将当前结点的状态Status设置为CANCELLED

    • 当前结点如果是尾结点,就删除当前结点,将找到的非取消前驱结点PN设置为tail,并原子地将其后继指向为null

    • 当前结点存在后继结点SN,如果前驱结点需要signal,则将PN的后继指向SN;否则将通过unparkSuccessor(node);唤醒后继结点

    Java代码  收藏代码
    1. /** 
    2.      * 取消一个将要尝试acquire的结点 
    3.      * 
    4.      * @param node the node 
    5.      */  
    6.     private void cancelAcquire(Node node) {  
    7.     // 如果结点不存在就直接返回  
    8.         if (node == null)  
    9.         return;  
    10.     node.thread = null;  
    11.     // 跳过取消的结点 while循环直到找到一个未取消的结点  
    12.     Node pred = node.prev;  
    13.     while (pred.waitStatus > 0)  
    14.         node.prev = pred = pred.prev;  
    15.     //前面的操作导致前驱结点发送变化 但是pred的后继结点还是没有变化  
    16.     Node predNext = pred.next;//通过predNext来感知其他的取消或信号操作 例如 compareAndSetNext(pred, predNext, null)  
    17.     //这里用无条件的写来代替CAS操作  
    18.     node.waitStatus = Node.CANCELLED;  
    19.     // 如果当前node是tail结点 就删除当前结点   
    20.     if (node == tail && compareAndSetTail(node, pred)) {  
    21.         compareAndSetNext(pred, predNext, null);//原子地将node结点之前的第一个非取消结点设置为tail结点 并将其后继指向null  
    22.     } else {  
    23.         // 如果前驱不是头结点 并且前驱的状态为SIGNAL(或前驱需要signal)  
    24.         if (pred != head  
    25.         && (pred.waitStatus == Node.SIGNAL  
    26.             || compareAndSetWaitStatus(pred, 0, Node.SIGNAL))  
    27.         && pred.thread != null) {  
    28.         //如果node存在后继结点 将node的前驱结点的后继指向node的后继  
    29.         Node next = node.next;  
    30.         if (next != null && next.waitStatus <= 0)  
    31.             compareAndSetNext(pred, predNext, next);//原子地将pred的后继指向node的后继  
    32.         } else {  
    33.         //node没有需要signal的前驱,通知后继结点  
    34.         unparkSuccessor(node);  
    35.         }  
    36.         node.next = node; // help GC  
    37.     }  
    38.     }}  

     唤醒后继结点 unparkSuccessor

    唤醒后继结点操作:首先会尝试清除当前结点的预期信号,这里即使操作失败亦或是信号已经被其他等待线程改变 都不影响
    然后查找当前线程最近的一个非取消结点 并唤醒它
    Java代码  收藏代码
    1. /** 
    2.  * 如果存在后继结点 就唤醒它 
    3.  * 
    4.  * @param node the node 
    5.  */  
    6. private void unparkSuccessor(Node node) {  
    7.     /* 
    8.      * 尝试清除预期信号 如果操作失败或该状态被其他等待线程改变 也没关系 
    9.      */  
    10.     compareAndSetWaitStatus(node, Node.SIGNAL, 0);  
    11.     /* 
    12.      * 准备unpark的线程在后继结点里持有(通常就是下一个结点) 
    13.      * 但如果被取消或为空  那么就从tail向后开始遍历查找实际的非取消后继结点 
    14.      */  
    15.     Node s = node.next;  
    16.     if (s == null || s.waitStatus > 0) {  
    17.         s = null;  
    18.         for (Node t = tail; t != null && t != node; t = t.prev)  
    19.             if (t.waitStatus <= 0)  
    20.                 s = t;//找到一个后并不跳出for循环 为了找到一个距离node最近的非取消后继结点  
    21.     }  
    22.     if (s != null)//结点不为空 唤醒后继的等待线程  
    23.         LockSupport.unpark(s.thread);  
    24. }   

     回过头来总结一下:

    当我们调用acquire(int)时,会首先通过tryAcquire尝试获取锁,一般都是留给子类实现(例如ReetrantLock$FairSync中的实现)

    Java代码  收藏代码
    1. /** 
    2.    * tryAcquire的公平版本 
    3.    * Fair version of tryAcquire.  Don't grant access unless 
    4.    * recursive call or no waiters or is first. 
    5.    */  
    6.   protected final boolean tryAcquire(int acquires) {  
    7.       final Thread current = Thread.currentThread();  
    8.       int c = getState();  
    9.       if (c == 0) {  
    10.           if (isFirst(current) &&  
    11.               compareAndSetState(0, acquires)) {  
    12.               setExclusiveOwnerThread(current);  
    13.               return true;  
    14.           }  
    15.       }  
    16.       else if (current == getExclusiveOwnerThread()) {  
    17.           int nextc = c + acquires;  
    18.           if (nextc < 0)  
    19.               throw new Error("Maximum lock count exceeded");  
    20.           setState(nextc);  
    21.           return true;  
    22.       }  
    23.       return false;  
    24.   }  

    如果tryAcquire(int)返回为false,则说明没有获得到锁。 则!tryAcquire(int)为true,接着会继续调用acquireQueued(final Node node ,int arg)方法,当然这调用这个方法之前,我们需要将当前包装成Node加入到队列中(即调用addWaiter(Node mode))。

    在acquireQueued()方法体中,我们会发现一个死循环,唯一跳出死循环的途径是 直到找到一个(条件1)node的前驱是傀儡head结点并且子类的tryAcquire()返回true,那么就将当前结点设置为head结点并返回结点对于线程的中断状态。如果(条件1)不成立,则执行shouldParkAfterFailuredAcquire()

    在shouldParkAfterFailuredAcquire(Node pred,Node node)方法体中,

    首先会判断node结点的前驱结点pred的waitStatus的值:

    * 如果waitStatus>0,表明pred处于取消状态(CANCELLED)则从队列中移除pred。

    * 如果waitStatus<0,表明线程需要park住

    * 如果waitStatus=0,表明这是一个新建结点,需要设置成SIGNAL(-1),在下一次循环中如果不能获得锁就需要park住线程,parkAndCheckInterrupt()就是执行了park()方法来park线程并返回线程中断状态。

    Java代码  收藏代码
    1. private final boolean parkAndCheckInterrupt() {   
    2. LockSupport.park(this);  
    3.  return Thread.interrupted();  
    4.  }  

    如果中间抛出RuntimeException异常,则会调用cancelAcquire(Node)方法取消获取。取消其实也很简单,首先判断node是否为空,如果不为空,找到node最近的非取消前驱结点PN,并将node的status设置为CANCELLED;

    * 倘若node为tail,将node移除并将PN结点设置为tail PN的后继指向null

    * 倘若node存在后继结点SN,如果前驱结点PN需要signal,则将PN后继指向SN 否则调用unparkSuccessor(Node)唤醒后继SN

    AcquireShared共享锁

    Java代码  收藏代码
    1. /** 
    2.  * 以共享模式获取Acquire 对中断不敏感 
    3.  * 通过多次调用tryAcquireShared方法来实现 成功时返回 
    4.  * 否则线程加入Sync队列 可能重复进行阻塞和释放阻塞 调用tryAcquireShared知道成功 
    5.  * 
    6.  * @param arg the acquire argument.  This value is conveyed to 
    7.  *        {@link #tryAcquireShared} but is otherwise uninterpreted 
    8.  *        and can represent anything you like. 
    9.  */  
    10. public final void acquireShared(int arg) {  
    11.     if (tryAcquireShared(arg) < 0)  
    12.         doAcquireShared(arg);  
    13. }  
    14. /** 
    15.  * 以共享不可中断模式获取Acquire 
    16.  * Acquires in shared uninterruptible mode. 
    17.  * @param arg the acquire argument 
    18.  */  
    19. private void doAcquireShared(int arg) {  
    20.     final Node node = addWaiter(Node.SHARED);  
    21.     try {  
    22.         boolean interrupted = false;  
    23.         for (;;) {  
    24.             final Node p = node.predecessor();  
    25.             if (p == head) {  
    26.                 int r = tryAcquireShared(arg);  
    27.                 if (r >= 0) {  
    28.                     setHeadAndPropagate(node, r);  
    29.                     p.next = null; // help GC  
    30.                     if (interrupted)  
    31.                         selfInterrupt();  
    32.                     return;  
    33.                 }  
    34.             }  
    35.             if (shouldParkAfterFailedAcquire(p, node) &&  
    36.                 parkAndCheckInterrupt())  
    37.                 interrupted = true;  
    38.         }  
    39.     } catch (RuntimeException ex) {  
    40.         cancelAcquire(node);  
    41.         throw ex;  
    42.     }  
    43. }  
    44. /** 
    45.  * Sets head of queue, and checks if successor may be waiting 
    46.  * in shared mode, if so propagating if propagate > 0. 
    47.  * 
    48.  * @param pred the node holding waitStatus for node 
    49.  * @param node the node 
    50.  * @param propagate the return value from a tryAcquireShared 
    51.  */  
    52. private void setHeadAndPropagate(Node node, int propagate) {  
    53.     setHead(node);//队列向后移一位  
    54.     if (propagate > 0 && node.waitStatus != 0) {//propagate>0表明共享数值大于前面要求的数值  
    55.         /* 
    56.          * Don't bother fully figuring out successor.  If it 
    57.          * looks null, call unparkSuccessor anyway to be safe. 
    58.          */  
    59.         Node s = node.next;  
    60.         if (s == null || s.isShared())//如果剩下只有一个node或者node.next是共享的 需要park住该线程  
    61.             unparkSuccessor(node);  
    62.     }  
    63. }  

     条件Condition

    Condition是服务单个Lock,condition.await()等方法在Lock上形成一个condition等待队列

    condition.signal()方法在Lock上面处理condition等待队列然后将队列中的node加入到AQS的阻塞队列中等待对应的线程被unpark

    Java代码  收藏代码
    1. /** 
    2.  * 实现可中断的条件等待 
    3.  * <ol> 
    4.  * <li> If current thread is interrupted, throw InterruptedException 
    5.  * <li> Save lock state returned by {@link #getState} 
    6.  * <li> Invoke {@link #release} with 
    7.  *      saved state as argument, throwing 
    8.  *      IllegalMonitorStateException  if it fails. 
    9.  * <li> Block until signalled or interrupted 
    10.  * <li> Reacquire by invoking specialized version of 
    11.  *      {@link #acquire} with saved state as argument. 
    12.  * <li> If interrupted while blocked in step 4, throw exception 
    13.  * </ol> 
    14.  */  
    15. public final void await() throws InterruptedException {  
    16.     if (Thread.interrupted())  
    17.         throw new InterruptedException();  
    18.     Node node = addConditionWaiter();//加入到condition的对用lock的私有队列中,与AQS阻塞队列形成相似  
    19.     //释放这个condition对应的lock的锁 因为若这个await方法阻塞住而lock没有释放锁  
    20.     //那么对于其他线程的node来说肯定是阻塞住的  
    21.     //因为condition对应的lock获得了锁,肯定在AQS的header处,其他线程肯定是得不到锁阻塞在那里,这样两边都阻塞的话就死锁了  
    22.     //故这里需要释放对应的lock锁  
    23.     int savedState = fullyRelease(node);  
    24.     int interruptMode = 0;  
    25.     while (!isOnSyncQueue(node)) {//判断condition是否已经转化成AQS阻塞队列中的一个结点 如果没有park这个线程  
    26.         LockSupport.park(this);  
    27.         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)  
    28.             break;  
    29.     }  
    30.     //这一步需要signal()或signalAll()方法的执行 说明这个线程已经被unpark 然后运行直到acquireQueued尝试再次获得锁  
    31.     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  
    32.         interruptMode = REINTERRUPT;  
    33.     if (node.nextWaiter != null)  
    34.         unlinkCancelledWaiters();  
    35.     if (interruptMode != 0)  
    36.         reportInterruptAfterWait(interruptMode);  
    37. }  

    网上找到的一个帮助理解Condition的gif图



      

    这个AQS存在两中链表

    * 一种链表是AQS sync链表队列,可称为 横向链表

    * 一种链表是Condition的wait Node链表,相对于AQS sync是结点的一个纵向链表

    当纵向链表被signal通知后 会进入对应的Sync进行排队处理

    Java代码  收藏代码
    1. /** 
    2.  * Moves the longest-waiting thread, if one exists, from the 
    3.  * wait queue for this condition to the wait queue for the 
    4.  * owning lock. 
    5.  * 
    6.  * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 
    7.  *         returns {@code false} 
    8.  */  
    9. public final void signal() {  
    10.     if (!isHeldExclusively())  
    11.         throw new IllegalMonitorStateException();  
    12.     Node first = firstWaiter;  
    13.     if (first != null)  
    14.         doSignal(first);  
    15. }  
    16. /** 
    17.  * Removes and transfers nodes until hit non-cancelled one or 
    18.  * null. Split out from signal in part to encourage compilers 
    19.  * to inline the case of no waiters. 
    20.  * @param first (non-null) the first node on condition queue 
    21.  */  
    22. private void doSignal(Node first) {  
    23.     do {  
    24.         if ( (firstWaiter = first.nextWaiter) == null)//将旧的头结点移出 让下一个结点顶替上来  
    25.             lastWaiter = null;  
    26.         first.nextWaiter = null;  
    27.     } while (!transferForSignal(first) &&//将旧的头结点加入到AQS的等待队列中  
    28.              (first = firstWaiter) != null);  
    29. }  
    30. /** 
    31.  * Transfers a node from a condition queue onto sync queue. 
    32.  * Returns true if successful. 
    33.  * @param node the node 
    34.  * @return true if successfully transferred (else the node was 
    35.  * cancelled before signal). 
    36.  */  
    37. final boolean transferForSignal(Node node) {  
    38.     /* 
    39.      * If cannot change waitStatus, the node has been cancelled. 
    40.      */  
    41.     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))  
    42.         return false;  
    43.     /* 
    44.      * Splice onto queue and try to set waitStatus of predecessor to 
    45.      * indicate that thread is (probably) waiting. If cancelled or 
    46.      * attempt to set waitStatus fails, wake up to resync (in which 
    47.      * case the waitStatus can be transiently and harmlessly wrong). 
    48.      */  
    49.     Node p = enq(node);//进入AQS的阻塞队列  
    50.     int c = p.waitStatus;  
    51.     //该结点点的状态CANCELLED或者修改状态失败 就直接唤醒该结点内的线程  
    52.     //PS 正常情况下 这里是不会为true的故不会在这里唤醒该线程  
    53.     //只有发送signal信号的线程 调用了reentrantLock.unlock方法后(该线程已经加入到了AQS等待队列)才会被唤醒。  
    54.     if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))  
    55.         LockSupport.unpark(node.thread);  
    56.     return true;  
    57. }  

         转眼之间,2014已经与我渐行渐远  2015要开启源码研究之旅、fighting

  • 相关阅读:
    [例程]string.trim().length()的用法
    用各种look and feel打造swing界面
    深入浅出Java多线程(1)方法 join
    eclipse中cvs使用配置
    什么时候用Vector, 什么时候改用ArrayList?
    array,vertor,arraylist,hashable,hashmap等几个易混淆概念的区别
    java.lang.Class.getResource()这哥个方法主要是做什么用
    织梦dedecms实现按照字母搜索的实现方法
    浅析JTable与TableModel、TableCellRenderer、TableCellEditor接口——使用JComboBox显示单元格的值
    用java –jar 命令运行Jar包
  • 原文地址:https://www.cnblogs.com/SoniceryD/p/4206989.html
Copyright © 2011-2022 走看看