zoukankan      html  css  js  c++  java
  • 并发编程专题五:抽象队列同步器AQS应用

    并发之父

       生平不识Doug Lea,学懂并发也枉然

      Java并发编程核心在于java.util.concurrent包而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于AbstractQueuedSynchronizer简称AQS,AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器。

    ReentrantLock

      ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。而且它具有比synchronized更多的特性,比如它支持手动加锁与解锁,支持加锁的公平性。

    1 使用ReentrantLock进行同步
    2 ReentrantLock lock = new ReentrantLock(false);//false为非公平锁,true为公平锁
    3 lock.lock() //加锁
    4 lock.unlock() //解锁
      ReentrantLock如何实现synchronized不具备的公平与非公平性呢?
      在ReentrantLock内部定义了一个Sync的内部类,该类继承AbstractQueuedSynchronized,对该抽象类的部分方法做了实现;并且还定义了两个子类:
      1、FairSync 公平锁的实现
      2、NonfairSync 非公平锁的实现
      这两个类都继承自Sync,也就是间接继承了AbstractQueuedSynchronized,所以这一个ReentrantLock同时具备公平与非公平特性。
      上面主要涉及的设计模式:模板模式-子类根据需要做具体业务实现

    AQS具备特性

      * 阻塞等待队列

      * 共享/独占

      * 公平/非公平

      * 可重入

      * 允许中断

      除了Lock外,Java.util.concurrent当中同步器的实现如Latch,Barrier,BlockingQueue等,都是基于AQS框架实现

      * 一般通过定义内部类Sync继承AQS

      * 将同步器所有调用都映射到Sync对应的方法

      AQS内部维护属性volatile int state (32位)

      * state表示资源的可用状态

      State三种访问方式
        getState()、setState()、compareAndSetState()
      AQS定义两种资源共享方式

      * Exclusive-独占,只有一个线程能执行,如ReentrantLock

      * Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch

      AQS定义两种队列

      * 同步等待队列

      * 条件等待队列

      不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

      * isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

      * tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。

      * tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

      * tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

      * tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

    同步等待队列

      AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。

    条件等待队列

      Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备时,这些等待线程才会被唤醒,从而重新争夺锁

    AQS源码分析

       1 public abstract class AbstractQueuedSynchronizer
       2         extends AbstractOwnableSynchronizer
       3         implements java.io.Serializable {
       4     private static final long serialVersionUID = 7373984972572414691L;
       5 
       6     /**
       7      * Creates a new {@code AbstractQueuedSynchronizer} instance
       8      * with initial synchronization state of zero.
       9      */
      10     protected AbstractQueuedSynchronizer() { }
      11 
      12     /**
      13      * Wait queue node class.
      14      *
      15      * 不管是条件队列,还是CLH等待队列
      16      * 都是基于Node类
      17      * 
      18      * AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人
      19      * 发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的
      20      * CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
      21      */
      22     static final class Node {
      23         /**
      24          * 标记节点未共享模式
      25          * */
      26         static final Node SHARED = new Node();
      27         /**
      28          *  标记节点为独占模式
      29          */
      30         static final Node EXCLUSIVE = null;
      31 
      32         /**
      33          * 在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待
      34          * */
      35         static final int CANCELLED =  1;
      36         /**
      37          *  后继节点的线程处于等待状态,而当前的节点如果释放了同步状态或者被取消,
      38          *  将会通知后继节点,使后继节点的线程得以运行。
      39          */
      40         static final int SIGNAL    = -1;
      41         /**
      42          *  节点在等待队列中,节点的线程等待在Condition上,当其他线程对Condition调用了signal()方法后,
      43          *  该节点会从等待队列中转移到同步队列中,加入到同步状态的获取中
      44          */
      45         static final int CONDITION = -2;
      46         /**
      47          * 表示下一次共享式同步状态获取将会被无条件地传播下去
      48          */
      49         static final int PROPAGATE = -3;
      50 
      51         /**
      52          * 标记当前节点的信号量状态 (1,0,-1,-2,-3)5种状态
      53          * 使用CAS更改状态,volatile保证线程可见性,高并发场景下,
      54          * 即被一个线程修改后,状态会立马让其他线程可见。
      55          */
      56         volatile int waitStatus;
      57 
      58         /**
      59          * 前驱节点,当前节点加入到同步队列中被设置
      60          */
      61         volatile Node prev;
      62 
      63         /**
      64          * 后继节点
      65          */
      66         volatile Node next;
      67 
      68         /**
      69          * 节点同步状态的线程
      70          */
      71         volatile Thread thread;
      72 
      73         /**
      74          * 等待队列中的后继节点,如果当前节点是共享的,那么这个字段是一个SHARED常量,
      75          * 也就是说节点类型(独占和共享)和等待队列中的后继节点共用同一个字段。
      76          */
      77         Node nextWaiter;
      78 
      79         /**
      80          * Returns true if node is waiting in shared mode.
      81          */
      82         final boolean isShared() {
      83             return nextWaiter == SHARED;
      84         }
      85 
      86         /**
      87          * 返回前驱节点
      88          */
      89         final Node predecessor() throws NullPointerException {
      90             Node p = prev;
      91             if (p == null)
      92                 throw new NullPointerException();
      93             else
      94                 return p;
      95         }
      96         //空节点,用于标记共享模式
      97         Node() {    // Used to establish initial head or SHARED marker
      98         }
      99         //用于同步队列CLH
     100         Node(Thread thread, Node mode) {     // Used by addWaiter
     101             this.nextWaiter = mode;
     102             this.thread = thread;
     103         }
     104         //用于条件队列
     105         Node(Thread thread, int waitStatus) { // Used by Condition
     106             this.waitStatus = waitStatus;
     107             this.thread = thread;
     108         }
     109     }
     110     
     111     /**
     112      * 指向同步等待队列的头节点
     113      */
     114     private transient volatile Node head;
     115 
     116     /**
     117      * 指向同步等待队列的尾节点
     118      */
     119     private transient volatile Node tail;
     120 
     121     /**
     122      * 同步资源状态
     123      */
     124     private volatile int state;
     125 
     126     /**
     127      * 
     128      * @return current state value
     129      */
     130     protected final int getState() {
     131         return state;
     132     }
     133 
     134     protected final void setState(int newState) {
     135         state = newState;
     136     }
     137 
     138     /**
     139      * Atomically sets synchronization state to the given updated
     140      * value if the current state value equals the expected value.
     141      * This operation has memory semantics of a {@code volatile} read
     142      * and write.
     143      *
     144      * @param expect the expected value
     145      * @param update the new value
     146      * @return {@code true} if successful. False return indicates that the actual
     147      *         value was not equal to the expected value.
     148      */
     149     protected final boolean compareAndSetState(int expect, int update) {
     150         // See below for intrinsics setup to support this
     151         return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
     152     }
     153 
     154     // Queuing utilities
     155 
     156     /**
     157      * The number of nanoseconds for which it is faster to spin
     158      * rather than to use timed park. A rough estimate suffices
     159      * to improve responsiveness with very short timeouts.
     160      */
     161     static final long spinForTimeoutThreshold = 1000L;
     162 
     163     /**
     164      * 节点加入CLH同步队列
     165      */
     166     private Node enq(final Node node) {
     167         for (;;) {
     168             Node t = tail;
     169             if (t == null) { // Must initialize
     170                 //队列为空需要初始化,创建空的头节点
     171                 if (compareAndSetHead(new Node()))
     172                     tail = head;
     173             } else {
     174                 node.prev = t;
     175                 //set尾部节点
     176                 if (compareAndSetTail(t, node)) {//当前节点置为尾部
     177                     t.next = node; //前驱节点的next指针指向当前节点
     178                     return t;
     179                 }
     180             }
     181         }
     182     }
     183 
     184     /**
     185      * Creates and enqueues node for current thread and given mode.
     186      *
     187      * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     188      * @return the new node
     189      */
     190     private Node addWaiter(Node mode) {
     191         // 1. 将当前线程构建成Node类型
     192         Node node = new Node(Thread.currentThread(), mode);
     193         // Try the fast path of enq; backup to full enq on failure
     194         Node pred = tail;
     195         // 2. 1当前尾节点是否为null?
     196         if (pred != null) {
     197             // 2.2 将当前节点尾插入的方式
     198             node.prev = pred;
     199             // 2.3 CAS将节点插入同步队列的尾部
     200             if (compareAndSetTail(pred, node)) {
     201                 pred.next = node;
     202                 return node;
     203             }
     204         }
     205         enq(node);
     206         return node;
     207     }
     208 
     209     /**
     210      * Sets head of queue to be node, thus dequeuing. Called only by
     211      * acquire methods.  Also nulls out unused fields for sake of GC
     212      * and to suppress unnecessary signals and traversals.
     213      *
     214      * @param node the node
     215      */
     216     private void setHead(Node node) {
     217         head = node;
     218         node.thread = null;
     219         node.prev = null;
     220     }
     221 
     222     /**
     223      *
     224      */
     225     private void unparkSuccessor(Node node) {
     226         //获取wait状态
     227         int ws = node.waitStatus;
     228         if (ws < 0)
     229             compareAndSetWaitStatus(node, ws, 0);// 将等待状态waitStatus设置为初始值0
     230 
     231         /**
     232          * 若后继结点为空,或状态为CANCEL(已失效),则从后尾部往前遍历找到最前的一个处于正常阻塞状态的结点
     233          * 进行唤醒
     234          */
     235         Node s = node.next; //head.next = Node1 ,thread = T3
     236         if (s == null || s.waitStatus > 0) {
     237             s = null;
     238             for (Node t = tail; t != null && t != node; t = t.prev)
     239                 if (t.waitStatus <= 0)
     240                     s = t;
     241         }
     242         if (s != null)
     243             LockSupport.unpark(s.thread);//唤醒线程,T3唤醒
     244     }
     245 
     246     /**
     247      * 把当前结点设置为SIGNAL或者PROPAGATE
     248      * 唤醒head.next(B节点),B节点唤醒后可以竞争锁,成功后head->B,然后又会唤醒B.next,一直重复直到共享节点都唤醒
     249      * head节点状态为SIGNAL,重置head.waitStatus->0,唤醒head节点线程,唤醒后线程去竞争共享锁
     250      * head节点状态为0,将head.waitStatus->Node.PROPAGATE传播状态,表示需要将状态向后继节点传播
     251      */
     252     private void doReleaseShared() {
     253         for (;;) {
     254             Node h = head;
     255             if (h != null && h != tail) {
     256                 int ws = h.waitStatus;
     257                 if (ws == Node.SIGNAL) {//head是SIGNAL状态
     258                     /* head状态是SIGNAL,重置head节点waitStatus为0,E这里不直接设为Node.PROPAGAT,
     259                      * 是因为unparkSuccessor(h)中,如果ws < 0会设置为0,所以ws先设置为0,再设置为PROPAGATE
     260                      * 这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
     261                      */
     262                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
     263                         continue; //设置失败,重新循环
     264                     /* head状态为SIGNAL,且成功设置为0之后,唤醒head.next节点线程
     265                      * 此时head、head.next的线程都唤醒了,head.next会去竞争锁,成功后head会指向获取锁的节点,
     266                      * 也就是head发生了变化。看最底下一行代码可知,head发生变化后会重新循环,继续唤醒head的下一个节点
     267                      */
     268                     unparkSuccessor(h);
     269                     /*
     270                      * 如果本身头节点的waitStatus是出于重置状态(waitStatus==0)的,将其设置为“传播”状态。
     271                      * 意味着需要将状态向后一个节点传播
     272                      */
     273                 }
     274                 else if (ws == 0 &&
     275                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
     276                     continue;                // loop on failed CAS
     277             }
     278             if (h == head) //如果head变了,重新循环
     279                 break;
     280         }
     281     }
     282 
     283     /**
     284      * 把node节点设置成head节点,且Node.waitStatus->Node.PROPAGATE
     285      */
     286     private void setHeadAndPropagate(Node node, int propagate) {
     287         Node h = head; //h用来保存旧的head节点
     288         setHead(node);//head引用指向node节点
     289         /* 这里意思有两种情况是需要执行唤醒操作
     290          * 1.propagate > 0 表示调用方指明了后继节点需要被唤醒
     291          * 2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点
     292          */
     293         if (propagate > 0 || h == null || h.waitStatus < 0 ||
     294                 (h = head) == null || h.waitStatus < 0) {
     295             Node s = node.next;
     296             if (s == null || s.isShared())//node是最后一个节点或者 node的后继节点是共享节点
     297                 /* 如果head节点状态为SIGNAL,唤醒head节点线程,重置head.waitStatus->0
     298                  * head节点状态为0(第一次添加时是0),设置head.waitStatus->Node.PROPAGATE表示状态需要向后继节点传播
     299                  */
     300                 doReleaseShared();
     301         }
     302     }
     303 
     304     // Utilities for various versions of acquire
     305 
     306     /**
     307      * 终结掉正在尝试去获取锁的节点
     308      * @param node the node
     309      */
     310     private void cancelAcquire(Node node) {
     311         // Ignore if node doesn't exist
     312         if (node == null)
     313             return;
     314 
     315         node.thread = null;
     316 
     317         // 剔除掉一件被cancel掉的节点
     318         Node pred = node.prev;
     319         while (pred.waitStatus > 0)
     320             node.prev = pred = pred.prev;
     321 
     322         // predNext is the apparent node to unsplice. CASes below will
     323         // fail if not, in which case, we lost race vs another cancel
     324         // or signal, so no further action is necessary.
     325         Node predNext = pred.next;
     326 
     327         // Can use unconditional write instead of CAS here.
     328         // After this atomic step, other Nodes can skip past us.
     329         // Before, we are free of interference from other threads.
     330         node.waitStatus = Node.CANCELLED;
     331 
     332         // If we are the tail, remove ourselves.
     333         if (node == tail && compareAndSetTail(node, pred)) {
     334             compareAndSetNext(pred, predNext, null);
     335         } else {
     336             // If successor needs signal, try to set pred's next-link
     337             // so it will get one. Otherwise wake it up to propagate.
     338             int ws;
     339             if (pred != head &&
     340                     ((ws = pred.waitStatus) == Node.SIGNAL ||
     341                             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
     342                     pred.thread != null) {
     343                 Node next = node.next;
     344                 if (next != null && next.waitStatus <= 0)
     345                     compareAndSetNext(pred, predNext, next);
     346             } else {
     347                 unparkSuccessor(node);
     348             }
     349 
     350             node.next = node; // help GC
     351         }
     352     }
     353 
     354     /**
     355      * 
     356      */
     357     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     358         int ws = pred.waitStatus;
     359         if (ws == Node.SIGNAL)
     360             /*
     361              * 若前驱结点的状态是SIGNAL,意味着当前结点可以被安全地park
     362              */
     363             return true;
     364         if (ws > 0) {
     365             /*
     366              * 前驱节点状态如果被取消状态,将被移除出队列
     367              */
     368             do {
     369                 node.prev = pred = pred.prev;
     370             } while (pred.waitStatus > 0);
     371             pred.next = node;
     372         } else {
     373             /*
     374              * 当前驱节点waitStatus为 0 or PROPAGATE状态时
     375              * 将其设置为SIGNAL状态,然后当前结点才可以可以被安全地park
     376              */
     377             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
     378         }
     379         return false;
     380     }
     381 
     382     /**
     383      * 中断当前线程
     384      */
     385     static void selfInterrupt() {
     386         Thread.currentThread().interrupt();
     387     }
     388 
     389     /**
     390      * 阻塞当前节点,返回当前Thread的中断状态
     391      * LockSupport.park 底层实现逻辑调用系统内核功能 pthread_mutex_lock 阻塞线程
     392      */
     393     private final boolean parkAndCheckInterrupt() {
     394         LockSupport.park(this);//阻塞
     395         return Thread.interrupted();
     396     }
     397 
     398     /**
     399      * 已经在队列当中的Thread节点,准备阻塞等待获取锁
     400      */
     401     final boolean acquireQueued(final Node node, int arg) {
     402         boolean failed = true;
     403         try {
     404             boolean interrupted = false;
     405             for (;;) {//死循环
     406                 final Node p = node.predecessor();//找到当前结点的前驱结点
     407                 if (p == head && tryAcquire(arg)) {//如果前驱结点是头结点,才tryAcquire,其他结点是没有机会tryAcquire的。
     408                     setHead(node);//获取同步状态成功,将当前结点设置为头结点。
     409                     p.next = null; // help GC
     410                     failed = false;
     411                     return interrupted;
     412                 }
     413                 /**
     414                  * 如果前驱节点不是Head,通过shouldParkAfterFailedAcquire判断是否应该阻塞
     415                  * 前驱节点信号量为-1,当前线程可以安全被parkAndCheckInterrupt用来阻塞线程
     416                  */
     417                 if (shouldParkAfterFailedAcquire(p, node) &&
     418                         parkAndCheckInterrupt())
     419                     interrupted = true;
     420             }
     421         } finally {
     422             if (failed)
     423                 cancelAcquire(node);
     424         }
     425     }
     426 
     427     /**
     428      * 与acquireQueued逻辑相似,唯一区别节点还不在队列当中需要先进行入队操作
     429      */
     430     private void doAcquireInterruptibly(int arg)
     431             throws InterruptedException {
     432         final Node node = addWaiter(Node.EXCLUSIVE);//以独占模式放入队列尾部
     433         boolean failed = true;
     434         try {
     435             for (;;) {
     436                 final Node p = node.predecessor();
     437                 if (p == head && tryAcquire(arg)) {
     438                     setHead(node);
     439                     p.next = null; // help GC
     440                     failed = false;
     441                     return;
     442                 }
     443                 if (shouldParkAfterFailedAcquire(p, node) &&
     444                         parkAndCheckInterrupt())
     445                     throw new InterruptedException();
     446             }
     447         } finally {
     448             if (failed)
     449                 cancelAcquire(node);
     450         }
     451     }
     452 
     453     /**
     454      * 独占模式定时获取
     455      */
     456     private boolean doAcquireNanos(int arg, long nanosTimeout)
     457             throws InterruptedException {
     458         if (nanosTimeout <= 0L)
     459             return false;
     460         final long deadline = System.nanoTime() + nanosTimeout;
     461         final Node node = addWaiter(Node.EXCLUSIVE);//加入队列
     462         boolean failed = true;
     463         try {
     464             for (;;) {
     465                 final Node p = node.predecessor();
     466                 if (p == head && tryAcquire(arg)) {
     467                     setHead(node);
     468                     p.next = null; // help GC
     469                     failed = false;
     470                     return true;
     471                 }
     472                 nanosTimeout = deadline - System.nanoTime();
     473                 if (nanosTimeout <= 0L)
     474                     return false;//超时直接返回获取失败
     475                 if (shouldParkAfterFailedAcquire(p, node) &&
     476                         nanosTimeout > spinForTimeoutThreshold)
     477                     //阻塞指定时长,超时则线程自动被唤醒
     478                     LockSupport.parkNanos(this, nanosTimeout);
     479                 if (Thread.interrupted())//当前线程中断状态
     480                     throw new InterruptedException();
     481             }
     482         } finally {
     483             if (failed)
     484                 cancelAcquire(node);
     485         }
     486     }
     487 
     488     /**
     489      * 尝试获取共享锁
     490      */
     491     private void doAcquireShared(int arg) {
     492         final Node node = addWaiter(Node.SHARED);//入队
     493         boolean failed = true;
     494         try {
     495             boolean interrupted = false;
     496             for (;;) {
     497                 final Node p = node.predecessor();//前驱节点
     498                 if (p == head) {
     499                     int r = tryAcquireShared(arg); //非公平锁实现,再尝试获取锁
     500                     //state==0时tryAcquireShared会返回>=0(CountDownLatch中返回的是1)。
     501                     // state为0说明共享次数已经到了,可以获取锁了
     502                     if (r >= 0) {//r>0表示state==0,前继节点已经释放锁,锁的状态为可被获取
     503                         //这一步设置node为head节点设置node.waitStatus->Node.PROPAGATE,然后唤醒node.thread
     504                         setHeadAndPropagate(node, r);
     505                         p.next = null; // help GC
     506                         if (interrupted)
     507                             selfInterrupt();
     508                         failed = false;
     509                         return;
     510                     }
     511                 }
     512                 //前继节点非head节点,将前继节点状态设置为SIGNAL,通过park挂起node节点的线程
     513                 if (shouldParkAfterFailedAcquire(p, node) &&
     514                         parkAndCheckInterrupt())
     515                     interrupted = true;
     516             }
     517         } finally {
     518             if (failed)
     519                 cancelAcquire(node);
     520         }
     521     }
     522 
     523     /**
     524      * Acquires in shared interruptible mode.
     525      * @param arg the acquire argument
     526      */
     527     private void doAcquireSharedInterruptibly(int arg)
     528             throws InterruptedException {
     529         final Node node = addWaiter(Node.SHARED);
     530         boolean failed = true;
     531         try {
     532             for (;;) {
     533                 final Node p = node.predecessor();
     534                 if (p == head) {
     535                     int r = tryAcquireShared(arg);
     536                     if (r >= 0) {
     537                         setHeadAndPropagate(node, r);
     538                         p.next = null; // help GC
     539                         failed = false;
     540                         return;
     541                     }
     542                 }
     543                 if (shouldParkAfterFailedAcquire(p, node) &&
     544                         parkAndCheckInterrupt())
     545                     throw new InterruptedException();
     546             }
     547         } finally {
     548             if (failed)
     549                 cancelAcquire(node);
     550         }
     551     }
     552 
     553     /**
     554      * Acquires in shared timed mode.
     555      *
     556      * @param arg the acquire argument
     557      * @param nanosTimeout max wait time
     558      * @return {@code true} if acquired
     559      */
     560     private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
     561             throws InterruptedException {
     562         if (nanosTimeout <= 0L)
     563             return false;
     564         final long deadline = System.nanoTime() + nanosTimeout;
     565         final Node node = addWaiter(Node.SHARED);
     566         boolean failed = true;
     567         try {
     568             for (;;) {
     569                 final Node p = node.predecessor();
     570                 if (p == head) {
     571                     int r = tryAcquireShared(arg);
     572                     if (r >= 0) {
     573                         setHeadAndPropagate(node, r);
     574                         p.next = null; // help GC
     575                         failed = false;
     576                         return true;
     577                     }
     578                 }
     579                 nanosTimeout = deadline - System.nanoTime();
     580                 if (nanosTimeout <= 0L)
     581                     return false;
     582                 if (shouldParkAfterFailedAcquire(p, node) &&
     583                         nanosTimeout > spinForTimeoutThreshold)
     584                     LockSupport.parkNanos(this, nanosTimeout);
     585                 if (Thread.interrupted())
     586                     throw new InterruptedException();
     587             }
     588         } finally {
     589             if (failed)
     590                 cancelAcquire(node);
     591         }
     592     }
     593 
     594     // Main exported methods
     595 
     596     /**
     597      * 尝试获取独占锁,可指定锁的获取数量
     598      */
     599     protected boolean tryAcquire(int arg) {
     600         throw new UnsupportedOperationException();
     601     }
     602 
     603     /**
     604      * 尝试释放独占锁,在子类当中实现
     605      */
     606     protected boolean tryRelease(int arg) {
     607         throw new UnsupportedOperationException();
     608     }
     609 
     610     /**
     611      * 共享式:共享式地获取同步状态。对于独占式同步组件来讲,同一时刻只有一个线程能获取到同步状态,
     612      * 其他线程都得去排队等待,其待重写的尝试获取同步状态的方法tryAcquire返回值为boolean,这很容易理解;
     613      * 对于共享式同步组件来讲,同一时刻可以有多个线程同时获取到同步状态,这也是“共享”的意义所在。
     614      * 本方法待被之类覆盖实现具体逻辑
     615      *  1.当返回值大于0时,表示获取同步状态成功,同时还有剩余同步状态可供其他线程获取;
     616      *
     617      * 2.当返回值等于0时,表示获取同步状态成功,但没有可用同步状态了;
     618 
     619      * 3.当返回值小于0时,表示获取同步状态失败。
     620      */
     621     protected int tryAcquireShared(int arg) {
     622         throw new UnsupportedOperationException();
     623     }
     624 
     625     /**
     626      * 释放共享锁,具体实现在子类当中实现
     627      */
     628     protected boolean tryReleaseShared(int arg) {
     629         throw new UnsupportedOperationException();
     630     }
     631 
     632     /**
     633      * 当前线程是否持有独占锁
     634      */
     635     protected boolean isHeldExclusively() {
     636         throw new UnsupportedOperationException();
     637     }
     638 
     639     /**
     640      * 获取独占锁
     641      */
     642     public final void acquire(int arg) {
     643         //尝试获取锁
     644         if (!tryAcquire(arg) &&
     645                 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//独占模式
     646             selfInterrupt();
     647     }
     648 
     649     /**
     650      * 
     651      */
     652     public final void acquireInterruptibly(int arg)
     653             throws InterruptedException {
     654         if (Thread.interrupted())
     655             throw new InterruptedException();
     656         if (!tryAcquire(arg))
     657             doAcquireInterruptibly(arg);
     658     }
     659 
     660     /**
     661      * 获取独占锁,设置最大等待时间
     662      */
     663     public final boolean tryAcquireNanos(int arg, long nanosTimeout)
     664             throws InterruptedException {
     665         if (Thread.interrupted())
     666             throw new InterruptedException();
     667         return tryAcquire(arg) ||
     668                 doAcquireNanos(arg, nanosTimeout);
     669     }
     670 
     671     /**
     672      * 释放独占模式持有的锁
     673      */
     674     public final boolean release(int arg) {
     675         if (tryRelease(arg)) {//释放一次锁
     676             Node h = head;
     677             if (h != null && h.waitStatus != 0)
     678                 unparkSuccessor(h);//唤醒后继结点
     679             return true;
     680         }
     681         return false;
     682     }
     683 
     684     /**
     685      * 请求获取共享锁
     686      */
     687     public final void acquireShared(int arg) {
     688         if (tryAcquireShared(arg) < 0)//返回值小于0,获取同步状态失败,排队去;获取同步状态成功,直接返回去干自己的事儿。
     689             doAcquireShared(arg);
     690     }
     691 
     692 
     693     /**
     694      * Releases in shared mode.  Implemented by unblocking one or more
     695      * threads if {@link #tryReleaseShared} returns true.
     696      *
     697      * @param arg the release argument.  This value is conveyed to
     698      *        {@link #tryReleaseShared} but is otherwise uninterpreted
     699      *        and can represent anything you like.
     700      * @return the value returned from {@link #tryReleaseShared}
     701      */
     702     public final boolean releaseShared(int arg) {
     703         if (tryReleaseShared(arg)) {
     704             doReleaseShared();
     705             return true;
     706         }
     707         return false;
     708     }
     709 
     710     // Queue inspection methods
     711 
     712     public final boolean hasQueuedThreads() {
     713         return head != tail;
     714     }
     715 
     716     public final boolean hasContended() {
     717         return head != null;
     718     }
     719 
     720     public final Thread getFirstQueuedThread() {
     721         // handle only fast path, else relay
     722         return (head == tail) ? null : fullGetFirstQueuedThread();
     723     }
     724 
     725     /**
     726      * Version of getFirstQueuedThread called when fastpath fails
     727      */
     728     private Thread fullGetFirstQueuedThread() {
     729         Node h, s;
     730         Thread st;
     731         if (((h = head) != null && (s = h.next) != null &&
     732                 s.prev == head && (st = s.thread) != null) ||
     733                 ((h = head) != null && (s = h.next) != null &&
     734                         s.prev == head && (st = s.thread) != null))
     735             return st;
     736 
     737         Node t = tail;
     738         Thread firstThread = null;
     739         while (t != null && t != head) {
     740             Thread tt = t.thread;
     741             if (tt != null)
     742                 firstThread = tt;
     743             t = t.prev;
     744         }
     745         return firstThread;
     746     }
     747 
     748     /**
     749      * 判断当前线程是否在队列当中
     750      */
     751     public final boolean isQueued(Thread thread) {
     752         if (thread == null)
     753             throw new NullPointerException();
     754         for (Node p = tail; p != null; p = p.prev)
     755             if (p.thread == thread)
     756                 return true;
     757         return false;
     758     }
     759 
     760     final boolean apparentlyFirstQueuedIsExclusive() {
     761         Node h, s;
     762         return (h = head) != null &&
     763                 (s = h.next)  != null &&
     764                 !s.isShared()         &&
     765                 s.thread != null;
     766     }
     767 
     768     /**
     769      * 判断当前节点是否有前驱节点
     770      */
     771     public final boolean hasQueuedPredecessors() {
     772         Node t = tail; // Read fields in reverse initialization order
     773         Node h = head;
     774         Node s;
     775         return h != t &&
     776                 ((s = h.next) == null || s.thread != Thread.currentThread());
     777     }
     778 
     779 
     780     // Instrumentation and monitoring methods
     781 
     782     /**
     783      * 同步队列长度
     784      */
     785     public final int getQueueLength() {
     786         int n = 0;
     787         for (Node p = tail; p != null; p = p.prev) {
     788             if (p.thread != null)
     789                 ++n;
     790         }
     791         return n;
     792     }
     793 
     794     /**
     795      * 获取队列等待thread集合
     796      */
     797     public final Collection<Thread> getQueuedThreads() {
     798         ArrayList<Thread> list = new ArrayList<Thread>();
     799         for (Node p = tail; p != null; p = p.prev) {
     800             Thread t = p.thread;
     801             if (t != null)
     802                 list.add(t);
     803         }
     804         return list;
     805     }
     806 
     807     /**
     808      * 获取独占模式等待thread线程集合
     809      */
     810     public final Collection<Thread> getExclusiveQueuedThreads() {
     811         ArrayList<Thread> list = new ArrayList<Thread>();
     812         for (Node p = tail; p != null; p = p.prev) {
     813             if (!p.isShared()) {
     814                 Thread t = p.thread;
     815                 if (t != null)
     816                     list.add(t);
     817             }
     818         }
     819         return list;
     820     }
     821 
     822     /**
     823      * 获取共享模式等待thread集合
     824      */
     825     public final Collection<Thread> getSharedQueuedThreads() {
     826         ArrayList<Thread> list = new ArrayList<Thread>();
     827         for (Node p = tail; p != null; p = p.prev) {
     828             if (p.isShared()) {
     829                 Thread t = p.thread;
     830                 if (t != null)
     831                     list.add(t);
     832             }
     833         }
     834         return list;
     835     }
     836 
     837 
     838     // Internal support methods for Conditions
     839 
     840     /**
     841      * 判断节点是否在同步队列中
     842      */
     843     final boolean isOnSyncQueue(Node node) {
     844         //快速判断1:节点状态或者节点没有前置节点
     845         //注:同步队列是有头节点的,而条件队列没有
     846         if (node.waitStatus == Node.CONDITION || node.prev == null)
     847             return false;
     848         //快速判断2:next字段只有同步队列才会使用,条件队列中使用的是nextWaiter字段
     849         if (node.next != null) // If has successor, it must be on queue
     850             return true;
     851         //上面如果无法判断则进入复杂判断
     852         return findNodeFromTail(node);
     853     }
     854 
     855     private boolean findNodeFromTail(Node node) {
     856         Node t = tail;
     857         for (;;) {
     858             if (t == node)
     859                 return true;
     860             if (t == null)
     861                 return false;
     862             t = t.prev;
     863         }
     864     }
     865 
     866     /**
     867      * 将节点从条件队列当中移动到同步队列当中,等待获取锁
     868      */
     869     final boolean transferForSignal(Node node) {
     870         /*
     871          * 修改节点信号量状态为0,失败直接返回false
     872          */
     873         if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
     874             return false;
     875 
     876         /*
     877          * 加入同步队列尾部当中,返回前驱节点
     878          */
     879         Node p = enq(node);
     880         int ws = p.waitStatus;
     881         //前驱节点不可用 或者 修改信号量状态失败
     882         if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
     883             LockSupport.unpark(node.thread); //唤醒当前节点
     884         return true;
     885     }
     886 
     887     final boolean transferAfterCancelledWait(Node node) {
     888         if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
     889             enq(node);
     890             return true;
     891         }
     892         /*
     893          * If we lost out to a signal(), then we can't proceed
     894          * until it finishes its enq().  Cancelling during an
     895          * incomplete transfer is both rare and transient, so just
     896          * spin.
     897          */
     898         while (!isOnSyncQueue(node))
     899             Thread.yield();
     900         return false;
     901     }
     902 
     903     /**
     904      * 入参就是新创建的节点,即当前节点
     905      */
     906     final int fullyRelease(Node node) {
     907         boolean failed = true;
     908         try {
     909             //这里这个取值要注意,获取当前的state并释放,这从另一个角度说明必须是独占锁
     910             //可以考虑下这个逻辑放在共享锁下面会发生什么?
     911             int savedState = getState();
     912             if (release(savedState)) {
     913                 failed = false;
     914                 return savedState;
     915             } else {
     916                 //如果这里释放失败,则抛出异常
     917                 throw new IllegalMonitorStateException();
     918             }
     919         } finally {
     920             /**
     921              * 如果释放锁失败,则把节点取消,由这里就能看出来上面添加节点的逻辑中
     922              * 只需要判断最后一个节点是否被取消就可以了
     923              */
     924             if (failed)
     925                 node.waitStatus = Node.CANCELLED;
     926         }
     927     }
     928 
     929     // Instrumentation methods for conditions
     930 
     931     public final boolean hasWaiters(ConditionObject condition) {
     932         if (!owns(condition))
     933             throw new IllegalArgumentException("Not owner");
     934         return condition.hasWaiters();
     935     }
     936 
     937     /**
     938      * 获取条件队列长度
     939      */
     940     public final int getWaitQueueLength(ConditionObject condition) {
     941         if (!owns(condition))
     942             throw new IllegalArgumentException("Not owner");
     943         return condition.getWaitQueueLength();
     944     }
     945 
     946     /**
     947      * 获取条件队列当中所有等待的thread集合
     948      */
     949     public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
     950         if (!owns(condition))
     951             throw new IllegalArgumentException("Not owner");
     952         return condition.getWaitingThreads();
     953     }
     954 
     955     /**
     956      * 条件对象,实现基于条件的具体行为
     957      */
     958     public class ConditionObject implements Condition, java.io.Serializable {
     959         private static final long serialVersionUID = 1173984872572414699L;
     960         /** First node of condition queue. */
     961         private transient Node firstWaiter;
     962         /** Last node of condition queue. */
     963         private transient Node lastWaiter;
     964 
     965         /**
     966          * Creates a new {@code ConditionObject} instance.
     967          */
     968         public ConditionObject() { }
     969 
     970         // Internal methods
     971 
     972         /**
     973          * 1.与同步队列不同,条件队列头尾指针是firstWaiter跟lastWaiter
     974          * 2.条件队列是在获取锁之后,也就是临界区进行操作,因此很多地方不用考虑并发
     975          */
     976         private Node addConditionWaiter() {
     977             Node t = lastWaiter;
     978             //如果最后一个节点被取消,则删除队列中被取消的节点
     979             //至于为啥是最后一个节点后面会分析
     980             if (t != null && t.waitStatus != Node.CONDITION) {
     981                 //删除所有被取消的节点
     982                 unlinkCancelledWaiters();
     983                 t = lastWaiter;
     984             }
     985             //创建一个类型为CONDITION的节点并加入队列,由于在临界区,所以这里不用并发控制
     986             Node node = new Node(Thread.currentThread(), Node.CONDITION);
     987             if (t == null)
     988                 firstWaiter = node;
     989             else
     990                 t.nextWaiter = node;
     991             lastWaiter = node;
     992             return node;
     993         }
     994 
     995         /**
     996          * 发信号,通知遍历条件队列当中的节点转移到同步队列当中,准备排队获取锁
     997          */
     998         private void doSignal(Node first) {
     999             do {
    1000                 if ( (firstWaiter = first.nextWaiter) == null)
    1001                     lastWaiter = null;
    1002                 first.nextWaiter = null;
    1003             } while (!transferForSignal(first) && //转移节点
    1004                     (first = firstWaiter) != null);
    1005         }
    1006 
    1007         /**
    1008          * 通知所有节点移动到同步队列当中,并将节点从条件队列删除
    1009          */
    1010         private void doSignalAll(Node first) {
    1011             lastWaiter = firstWaiter = null;
    1012             do {
    1013                 Node next = first.nextWaiter;
    1014                 first.nextWaiter = null;
    1015                 transferForSignal(first);
    1016                 first = next;
    1017             } while (first != null);
    1018         }
    1019 
    1020         /**
    1021          * 删除条件队列当中被取消的节点
    1022          */
    1023         private void unlinkCancelledWaiters() {
    1024             Node t = firstWaiter;
    1025             Node trail = null;
    1026             while (t != null) {
    1027                 Node next = t.nextWaiter;
    1028                 if (t.waitStatus != Node.CONDITION) {
    1029                     t.nextWaiter = null;
    1030                     if (trail == null)
    1031                         firstWaiter = next;
    1032                     else
    1033                         trail.nextWaiter = next;
    1034                     if (next == null)
    1035                         lastWaiter = trail;
    1036                 }
    1037                 else
    1038                     trail = t;
    1039                 t = next;
    1040             }
    1041         }
    1042 
    1043         // public methods
    1044 
    1045         /**
    1046          * 发新号,通知条件队列当中节点到同步队列当中去排队
    1047          */
    1048         public final void signal() {
    1049             if (!isHeldExclusively())//节点不能已经持有独占锁
    1050                 throw new IllegalMonitorStateException();
    1051             Node first = firstWaiter;
    1052             if (first != null)
    1053                 /**
    1054                  * 发信号通知条件队列的节点准备到同步队列当中去排队
    1055                  */
    1056                 doSignal(first);
    1057         }
    1058 
    1059         /**
    1060          * 唤醒所有条件队列的节点转移到同步队列当中
    1061          */
    1062             public final void signalAll() {
    1063             if (!isHeldExclusively())
    1064                 throw new IllegalMonitorStateException();
    1065             Node first = firstWaiter;
    1066             if (first != null)
    1067                 doSignalAll(first);
    1068         }
    1069 
    1070         /**
    1071          * Implements uninterruptible condition wait.
    1072          * <ol>
    1073          * <li> Save lock state returned by {@link #getState}.
    1074          * <li> Invoke {@link #release} with saved state as argument,
    1075          *      throwing IllegalMonitorStateException if it fails.
    1076          * <li> Block until signalled.
    1077          * <li> Reacquire by invoking specialized version of
    1078          *      {@link #acquire} with saved state as argument.
    1079          * </ol>
    1080          */
    1081         public final void awaitUninterruptibly() {
    1082             Node node = addConditionWaiter();
    1083             int savedState = fullyRelease(node);
    1084             boolean interrupted = false;
    1085             while (!isOnSyncQueue(node)) {
    1086                 LockSupport.park(this);
    1087                 if (Thread.interrupted())
    1088                     interrupted = true;
    1089             }
    1090             if (acquireQueued(node, savedState) || interrupted)
    1091                 selfInterrupt();
    1092         }
    1093 
    1094         /** 该模式表示在退出等待时重新中断 */
    1095         private static final int REINTERRUPT =  1;
    1096         /** 异常中断 */
    1097         private static final int THROW_IE    = -1;
    1098 
    1099         /**
    1100          * 这里的判断逻辑是:
    1101          * 1.如果现在不是中断的,即正常被signal唤醒则返回0
    1102          * 2.如果节点由中断加入同步队列则返回THROW_IE,由signal加入同步队列则返回REINTERRUPT
    1103          */
    1104         private int checkInterruptWhileWaiting(Node node) {
    1105             return Thread.interrupted() ?
    1106                     (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
    1107                     0;
    1108         }
    1109 
    1110         /**
    1111          * 根据中断时机选择抛出异常或者设置线程中断状态
    1112          */
    1113         private void reportInterruptAfterWait(int interruptMode)
    1114                 throws InterruptedException {
    1115             if (interruptMode == THROW_IE)
    1116                 throw new InterruptedException();
    1117             else if (interruptMode == REINTERRUPT)
    1118                 selfInterrupt();
    1119         }
    1120 
    1121         /**
    1122          * 加入条件队列等待,条件队列入口
    1123          */
    1124         public final void await() throws InterruptedException {
    1125 
    1126             //T2进来
    1127             //如果当前线程被中断则直接抛出异常
    1128             if (Thread.interrupted())
    1129                 throw new InterruptedException();
    1130             //把当前节点加入条件队列
    1131             Node node = addConditionWaiter();
    1132             //释放掉已经获取的独占锁资源
    1133             int savedState = fullyRelease(node);//T2释放锁
    1134             int interruptMode = 0;
    1135             //如果不在同步队列中则不断挂起
    1136             while (!isOnSyncQueue(node)) {
    1137                 LockSupport.park(this);//T1被阻塞
    1138                 //这里被唤醒可能是正常的signal操作也可能是中断
    1139                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    1140                     break;
    1141             }
    1142             /**
    1143              * 走到这里说明节点已经条件满足被加入到了同步队列中或者中断了
    1144              * 这个方法很熟悉吧?就跟独占锁调用同样的获取锁方法,从这里可以看出条件队列只能用于独占锁
    1145              * 在处理中断之前首先要做的是从同步队列中成功获取锁资源
    1146              */
    1147             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    1148                 interruptMode = REINTERRUPT;
    1149             //走到这里说明已经成功获取到了独占锁,接下来就做些收尾工作
    1150             //删除条件队列中被取消的节点
    1151             if (node.nextWaiter != null) // clean up if cancelled
    1152                 unlinkCancelledWaiters();
    1153             //根据不同模式处理中断
    1154             if (interruptMode != 0)
    1155                 reportInterruptAfterWait(interruptMode);
    1156         }
    1157 
    1158 
    1159         /**
    1160          * Implements timed condition wait.
    1161          * <ol>
    1162          * <li> If current thread is interrupted, throw InterruptedException.
    1163          * <li> Save lock state returned by {@link #getState}.
    1164          * <li> Invoke {@link #release} with saved state as argument,
    1165          *      throwing IllegalMonitorStateException if it fails.
    1166          * <li> Block until signalled, interrupted, or timed out.
    1167          * <li> Reacquire by invoking specialized version of
    1168          *      {@link #acquire} with saved state as argument.
    1169          * <li> If interrupted while blocked in step 4, throw InterruptedException.
    1170          * <li> If timed out while blocked in step 4, return false, else true.
    1171          * </ol>
    1172          */
    1173         public final boolean await(long time, TimeUnit unit)
    1174                 throws InterruptedException {
    1175             long nanosTimeout = unit.toNanos(time);
    1176             if (Thread.interrupted())
    1177                 throw new InterruptedException();
    1178             Node node = addConditionWaiter();
    1179             int savedState = fullyRelease(node);
    1180             final long deadline = System.nanoTime() + nanosTimeout;
    1181             boolean timedout = false;
    1182             int interruptMode = 0;
    1183             while (!isOnSyncQueue(node)) {
    1184                 if (nanosTimeout <= 0L) {
    1185                     timedout = transferAfterCancelledWait(node);
    1186                     break;
    1187                 }
    1188                 if (nanosTimeout >= spinForTimeoutThreshold)
    1189                     LockSupport.parkNanos(this, nanosTimeout);
    1190                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    1191                     break;
    1192                 nanosTimeout = deadline - System.nanoTime();
    1193             }
    1194             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    1195                 interruptMode = REINTERRUPT;
    1196             if (node.nextWaiter != null)
    1197                 unlinkCancelledWaiters();
    1198             if (interruptMode != 0)
    1199                 reportInterruptAfterWait(interruptMode);
    1200             return !timedout;
    1201         }
    1202 
    1203 
    1204         final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
    1205             return sync == AbstractQueuedSynchronizer.this;
    1206         }
    1207 
    1208         /**
    1209          * Queries whether any threads are waiting on this condition.
    1210          * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
    1211          *
    1212          * @return {@code true} if there are any waiting threads
    1213          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
    1214          *         returns {@code false}
    1215          */
    1216         protected final boolean hasWaiters() {
    1217             if (!isHeldExclusively())
    1218                 throw new IllegalMonitorStateException();
    1219             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
    1220                 if (w.waitStatus == Node.CONDITION)
    1221                     return true;
    1222             }
    1223             return false;
    1224         }
    1225 
    1226         /**
    1227          * Returns an estimate of the number of threads waiting on
    1228          * this condition.
    1229          * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
    1230          *
    1231          * @return the estimated number of waiting threads
    1232          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
    1233          *         returns {@code false}
    1234          */
    1235         protected final int getWaitQueueLength() {
    1236             if (!isHeldExclusively())
    1237                 throw new IllegalMonitorStateException();
    1238             int n = 0;
    1239             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
    1240                 if (w.waitStatus == Node.CONDITION)
    1241                     ++n;
    1242             }
    1243             return n;
    1244         }
    1245 
    1246         /**
    1247          * 得到同步队列当中所有在等待的Thread集合
    1248          */
    1249         protected final Collection<Thread> getWaitingThreads() {
    1250             if (!isHeldExclusively())
    1251                 throw new IllegalMonitorStateException();
    1252             ArrayList<Thread> list = new ArrayList<Thread>();
    1253             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
    1254                 if (w.waitStatus == Node.CONDITION) {
    1255                     Thread t = w.thread;
    1256                     if (t != null)
    1257                         list.add(t);
    1258                 }
    1259             }
    1260             return list;
    1261         }
    1262     }
    1263 
    1264     /**
    1265      * Setup to support compareAndSet. We need to natively implement
    1266      * this here: For the sake of permitting future enhancements, we
    1267      * cannot explicitly subclass AtomicInteger, which would be
    1268      * efficient and useful otherwise. So, as the lesser of evils, we
    1269      * natively implement using hotspot intrinsics API. And while we
    1270      * are at it, we do the same for other CASable fields (which could
    1271      * otherwise be done with atomic field updaters).
    1272      * unsafe魔法类,直接绕过虚拟机内存管理机制,修改内存
    1273      */
    1274     private static final Unsafe unsafe = Unsafe.getUnsafe();
    1275     //偏移量
    1276     private static final long stateOffset;
    1277     private static final long headOffset;
    1278     private static final long tailOffset;
    1279     private static final long waitStatusOffset;
    1280     private static final long nextOffset;
    1281 
    1282     static {
    1283         try {
    1284             //状态偏移量
    1285             stateOffset = unsafe.objectFieldOffset
    1286                     (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
    1287             //head指针偏移量,head指向CLH队列的头部
    1288             headOffset = unsafe.objectFieldOffset
    1289                     (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
    1290             tailOffset = unsafe.objectFieldOffset
    1291                     (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
    1292             waitStatusOffset = unsafe.objectFieldOffset
    1293                     (Node.class.getDeclaredField("waitStatus"));
    1294             nextOffset = unsafe.objectFieldOffset
    1295                     (Node.class.getDeclaredField("next"));
    1296 
    1297         } catch (Exception ex) { throw new Error(ex); }
    1298     }
    1299 
    1300     /**
    1301      * CAS 修改头部节点指向. 并发入队时使用.
    1302      */
    1303     private final boolean compareAndSetHead(Node update) {
    1304         return unsafe.compareAndSwapObject(this, headOffset, null, update);
    1305     }
    1306 
    1307     /**
    1308      * CAS 修改尾部节点指向. 并发入队时使用.
    1309      */
    1310     private final boolean compareAndSetTail(Node expect, Node update) {
    1311         return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    1312     }
    1313 
    1314     /**
    1315      * CAS 修改信号量状态.
    1316      */
    1317     private static final boolean compareAndSetWaitStatus(Node node,
    1318                                                          int expect,
    1319                                                          int update) {
    1320         return unsafe.compareAndSwapInt(node, waitStatusOffset,
    1321                 expect, update);
    1322     }
    1323 
    1324     /**
    1325      * 修改节点的后继指针.
    1326      */
    1327     private static final boolean compareAndSetNext(Node node,
    1328                                                    Node expect,
    1329                                                    Node update) {
    1330         return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    1331     }
    1332 }
    1333 
    1334 
    1335 AQS框架具体实现-独占锁实现ReentrantLock
    1336 
    1337 public class ReentrantLock implements Lock, java.io.Serializable {
    1338     private static final long serialVersionUID = 7373984872572414699L;
    1339     /**
    1340      * 内部调用AQS的动作,都基于该成员属性实现
    1341      */
    1342     private final Sync sync;
    1343 
    1344     /**
    1345      * ReentrantLock锁同步操作的基础类,继承自AQS框架.
    1346      * 该类有两个继承类,1、NonfairSync 非公平锁,2、FairSync公平锁
    1347      */
    1348         abstract static class Sync extends AbstractQueuedSynchronizer {
    1349         private static final long serialVersionUID = -5179523762034025860L;
    1350 
    1351         /**
    1352          * 加锁的具体行为由子类实现
    1353          */
    1354         abstract void lock();
    1355 
    1356         /**
    1357          * 尝试获取非公平锁
    1358          */
    1359         final boolean nonfairTryAcquire(int acquires) {
    1360             //acquires = 1
    1361             final Thread current = Thread.currentThread();
    1362             int c = getState();
    1363             /**
    1364              * 不需要判断同步队列(CLH)中是否有排队等待线程
    1365              * 判断state状态是否为0,不为0可以加锁
    1366              */
    1367             if (c == 0) {
    1368                 //unsafe操作,cas修改state状态
    1369                 if (compareAndSetState(0, acquires)) {
    1370                     //独占状态锁持有者指向当前线程
    1371                     setExclusiveOwnerThread(current);
    1372                     return true;
    1373                 }
    1374             }
    1375             /**
    1376              * state状态不为0,判断锁持有者是否是当前线程,
    1377              * 如果是当前线程持有 则state+1
    1378              */
    1379             else if (current == getExclusiveOwnerThread()) {
    1380                 int nextc = c + acquires;
    1381                 if (nextc < 0) // overflow
    1382                     throw new Error("Maximum lock count exceeded");
    1383                 setState(nextc);
    1384                 return true;
    1385             }
    1386             //加锁失败
    1387             return false;
    1388         }
    1389 
    1390         /**
    1391          * 释放锁
    1392          */
    1393         protected final boolean tryRelease(int releases) {
    1394             int c = getState() - releases;
    1395             if (Thread.currentThread() != getExclusiveOwnerThread())
    1396                 throw new IllegalMonitorStateException();
    1397             boolean free = false;
    1398             if (c == 0) {
    1399                 free = true;
    1400                 setExclusiveOwnerThread(null);
    1401             }
    1402             setState(c);
    1403             return free;
    1404         }
    1405 
    1406         /**
    1407          * 判断持有独占锁的线程是否是当前线程
    1408          */
    1409         protected final boolean isHeldExclusively() {
    1410             return getExclusiveOwnerThread() == Thread.currentThread();
    1411         }
    1412 
    1413         //返回条件对象
    1414         final ConditionObject newCondition() {
    1415             return new ConditionObject();
    1416         }
    1417 
    1418 
    1419         final Thread getOwner() {
    1420             return getState() == 0 ? null : getExclusiveOwnerThread();
    1421         }
    1422 
    1423         final int getHoldCount() {
    1424             return isHeldExclusively() ? getState() : 0;
    1425         }
    1426 
    1427         final boolean isLocked() {
    1428             return getState() != 0;
    1429         }
    1430 
    1431         /**
    1432          * Reconstitutes the instance from a stream (that is, deserializes it).
    1433          */
    1434         private void readObject(java.io.ObjectInputStream s)
    1435                 throws java.io.IOException, ClassNotFoundException {
    1436             s.defaultReadObject();
    1437             setState(0); // reset to unlocked state
    1438         }
    1439     }
    1440 
    1441     /**
    1442      * 非公平锁
    1443      */
    1444     static final class NonfairSync extends Sync {
    1445         private static final long serialVersionUID = 7316153563782823691L;
    1446         /**
    1447          * 加锁行为
    1448          */
    1449         final void lock() {
    1450             /**
    1451              * 第一步:直接尝试加锁
    1452              * 与公平锁实现的加锁行为一个最大的区别在于,此处不会去判断同步队列(CLH队列)中
    1453              * 是否有排队等待加锁的节点,上来直接加锁(判断state是否为0,CAS修改state为1)
    1454              * ,并将独占锁持有者 exclusiveOwnerThread 属性指向当前线程
    1455              * 如果当前有人占用锁,再尝试去加一次锁
    1456              */
    1457             if (compareAndSetState(0, 1))
    1458                 setExclusiveOwnerThread(Thread.currentThread());
    1459             else
    1460                 //AQS定义的方法,加锁
    1461                 acquire(1);
    1462         }
    1463 
    1464         /**
    1465          * 父类AbstractQueuedSynchronizer.acquire()中调用本方法
    1466          */
    1467         protected final boolean tryAcquire(int acquires) {
    1468             return nonfairTryAcquire(acquires);
    1469         }
    1470     }
    1471 
    1472     /**
    1473      * 公平锁
    1474      */
    1475     static final class FairSync extends Sync {
    1476         private static final long serialVersionUID = -3000897897090466540L;
    1477         final void lock() {
    1478             acquire(1);
    1479         }
    1480         /**
    1481          * 重写aqs中的方法逻辑
    1482          * 尝试加锁,被AQS的acquire()方法调用
    1483          */
    1484         protected final boolean tryAcquire(int acquires) {
    1485             final Thread current = Thread.currentThread();
    1486             int c = getState();
    1487             if (c == 0) {
    1488                 /**
    1489                  * 与非公平锁中的区别,需要先判断队列当中是否有等待的节点
    1490                  * 如果没有则可以尝试CAS获取锁
    1491                  */
    1492                 if (!hasQueuedPredecessors() &&
    1493                         compareAndSetState(0, acquires)) {
    1494                     //独占线程指向当前线程
    1495                     setExclusiveOwnerThread(current);
    1496                     return true;
    1497                 }
    1498             }
    1499             else if (current == getExclusiveOwnerThread()) {
    1500                 int nextc = c + acquires;
    1501                 if (nextc < 0)
    1502                     throw new Error("Maximum lock count exceeded");
    1503                 setState(nextc);
    1504                 return true;
    1505             }
    1506             return false;
    1507         }
    1508     }
    1509 
    1510     /**
    1511      * 默认构造函数,创建非公平锁对象
    1512      */
    1513     public ReentrantLock() {
    1514         sync = new NonfairSync();
    1515     }
    1516 
    1517     /**
    1518      * 根据要求创建公平锁或非公平锁
    1519      */
    1520     public ReentrantLock(boolean fair) {
    1521         sync = fair ? new FairSync() : new NonfairSync();
    1522     }
    1523 
    1524     /**
    1525      * 加锁
    1526      */
    1527     public void lock() {
    1528         sync.lock();
    1529     }
    1530 
    1531     /**
    1532      * 尝试获去取锁,获取失败被阻塞,线程被中断直接抛出异常
    1533      */
    1534     public void lockInterruptibly() throws InterruptedException {
    1535         sync.acquireInterruptibly(1);
    1536     }
    1537 
    1538     /**
    1539      * 尝试加锁
    1540      */
    1541     public boolean tryLock() {
    1542         return sync.nonfairTryAcquire(1);
    1543     }
    1544 
    1545     /**
    1546      * 指定等待时间内尝试加锁
    1547      */
    1548     public boolean tryLock(long timeout, TimeUnit unit)
    1549             throws InterruptedException {
    1550         return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    1551     }
    1552 
    1553     /**
    1554      * 尝试去释放锁
    1555      */
    1556     public void unlock() {
    1557         sync.release(1);
    1558     }
    1559 
    1560     /**
    1561      * 返回条件对象
    1562      */
    1563     public Condition newCondition() {
    1564         return sync.newCondition();
    1565     }
    1566 
    1567     /**
    1568      * 返回当前线程持有的state状态数量
    1569      */
    1570     public int getHoldCount() {
    1571         return sync.getHoldCount();
    1572     }
    1573 
    1574     /**
    1575      * 查询当前线程是否持有锁
    1576      */
    1577     public boolean isHeldByCurrentThread() {
    1578         return sync.isHeldExclusively();
    1579     }
    1580 
    1581     /**
    1582      * 状态表示是否被Thread加锁持有
    1583      */
    1584     public boolean isLocked() {
    1585         return sync.isLocked();
    1586     }
    1587 
    1588     /**
    1589      * 是否公平锁?是返回true 否则返回 false
    1590      */
    1591     public final boolean isFair() {
    1592         return sync instanceof FairSync;
    1593     }
    1594 
    1595     /**
    1596      * 获取持有锁的当前线程
    1597      */
    1598     protected Thread getOwner() {
    1599         return sync.getOwner();
    1600     }
    1601 
    1602     /**
    1603      * 判断队列当中是否有在等待获取锁的Thread节点
    1604      */
    1605     public final boolean hasQueuedThreads() {
    1606         return sync.hasQueuedThreads();
    1607     }
    1608 
    1609     /**
    1610      * 当前线程是否在同步队列中等待
    1611      */
    1612     public final boolean hasQueuedThread(Thread thread) {
    1613         return sync.isQueued(thread);
    1614     }
    1615 
    1616     /**
    1617      * 获取同步队列长度
    1618      */
    1619     public final int getQueueLength() {
    1620         return sync.getQueueLength();
    1621     }
    1622 
    1623     /**
    1624      * 返回Thread集合,排队中的所有节点Thread会被返回
    1625      */
    1626     protected Collection<Thread> getQueuedThreads() {
    1627         return sync.getQueuedThreads();
    1628     }
    1629 
    1630     /**
    1631      * 条件队列当中是否有正在等待的节点
    1632      */
    1633     public boolean hasWaiters(Condition condition) {
    1634         if (condition == null)
    1635             throw new NullPointerException();
    1636         if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
    1637             throw new IllegalArgumentException("not owner");
    1638         return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
    1639     }
    1640 
    1641 }
  • 相关阅读:
    android studio 中如何合并冲突(转)
    关于学习ZigBee的书籍
    多一点学习之外的人文思考
    有关技术文档的一点感想
    有关文学知识对我大学生活的影响
    【转】华为PCB布线规范
    【转】怎么样从一个疯狂下载者成为一个学习者!!!值得反省下的问题·~~
    时钟1
    关于有源滤波器和无源滤波器
    【转】zz个人的制板习惯流程
  • 原文地址:https://www.cnblogs.com/Mapi/p/14397300.html
Copyright © 2011-2022 走看看