zoukankan      html  css  js  c++  java
  • Java并发编程--AQS

    概述

      抽象队列同步器(AbstractQueuedSynchronizer,简称AQS)是用来构建锁或者其他同步组件的基础框架,它使用一个整型的volatile变量(命名为state)来维护同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

      volatile变量的读写和CAS是concurrent包得以实现的基础。CAS表示如果当前状态值等于预期值,则以原子方式将同步状态设置为给定的更新值,此操作具有volatile读和写的内存语义。AQS通过volatile的读/写和CAS所具有的volatile读和写的内存语义来实现线程之间的通信。

      高层类   Lock  同步器  阻塞队列  Executor  并发容器
      基础类 AQS  非阻塞数据结构  原子变量类
      volatile变量的读/写  CAS

      concurrent包的实现结构如上图所示,AQS、非阻塞数据结构和原子变量类等基础类都是基于volatile变量的读/写和CAS实现,而像Lock、同步器、阻塞队列、Executor和并发容器等高层类又是基于基础类实现。

    AQS的域和方法

      域

    1 private transient volatile Node head; //同步队列的head节点
    2 private transient volatile Node tail; //同步队列的tail节点
    3 private volatile int state; //同步状态

      方法

        AQS提供的可以修改同步状态的3个方法:

    1 protected final int getState();  //获取同步状态
    2 protected final void setState(int newState);  //设置同步状态
    3 protected final boolean compareAndSetState(int expect, int update);  //CAS设置同步状态

        AQS提供的模板方法,主要有以下三类:

          1)独占式获取和释放同步状态

    1 public final void acquire(int arg) //独占式获取同步状态,如果不成功会进入同步队列等待。
    2 public final void acquireInterruptibly(int arg) //与acquire不同的是,能响应中断
    3 public final boolean tryAcquireNanos(int arg, long nanosTimeout) //增加超时机制
    4 
    5 public final boolean release(int arg) //独占式释放同步状态,该方法会调用重写的tryRelease(int arg)。

            以上三种获取同步状态的方法都会调用自定义的tryAcquire(int arg)方法,acquire的获取失败时的入队等待机制、acquireInterruptibly的响应中断机制、tryAcquireNanos的超时机制等AQS已经实现好,这样开发人员只需要实现自己的获取同步状态机制,就可以大大降低了实现一个可靠自定义同步组件的门槛。

          2)共享式获取和释放同步状态

    1 public final void acquireShared(int arg) //共享式获取同步状态,如果不成功会进入同步队列等待。与独占式不同的是,同一时刻可以有多个线程获取到同步状态。
    2 public final void acquireSharedInterruptibly(int arg) //可响应中断
    3 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) //超时机制
    4 
    5 public final boolean releaseShared(int arg) //共享式释放同步状态,该方法会调用重写的tryReleaseShared(int arg)。

            同样以上三种获取同步状态的方法会调用自定义的tryAcquireShared方法。

          3)查询同步队列中的等待线程情

    1 publicfinalCollection<Thread>getQueuedThreads()
    2 publicfinalbooleanhasQueuedThreads()//返回包含可能正在等待获取的线程列表,需要遍历链表。返回的只是个估计值,且是无序的。这个方法的主要是为子类提供的监视同步队列措施而设计。
    3 。。。

        

        AQS提供的自定义方法

          以上AQS的方法都为final方法,不能被子类重写,因为它们对于任何自定义同步器应该是不需要更改的,下面为AQS提供的可以重写的方法。开发者需要根据自定义同步组件的特点,重写以下方法。这些方法的实现在内部必须是线程安全的,通常应该很短并且不被阻塞。

    1 protected boolean tryAcquire(int arg) //独占式获取同步状态,此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。返回值语义:true代表获取成功,false代表获取失败。
    2 protected boolean tryRelease(int arg) //独占式释放同步状态
    3 
    4 protected int tryAcquireShared(int arg) //共享式获取同步状态,返回值语义:负数代表获取失败、0代表获取成功但没有剩余资源、正数代表获取成功,还有剩余资源。
    5 protected boolean tryReleaseShared(int arg) //共享式释放同步状态
    6 
    7 protected boolean isHeldExclusively() //AQS是否被当前线程所独占

    AQS的使用

      怎么使用AQS实现自定义同步组件?

      自定义同步组件实例一:独占锁(使用独占式的获取与释放)。

        Mutex同步组件同一时刻只允许一个线程占用锁,不支持可重入。0表示未锁定状态,1表示锁定状态。 

     1 class Mutex implements Lock, java.io.Serializable {
     2  
     3     //静态内部类,自定义同步器
     4     private static class Sync extends AbstractQueuedSynchronizer {
     5     
     6         // 释放处于占用状态(重写isHeldExclusively)Report whether in locked state
     7         protected boolean isHeldExclusively() { 
     8             return getState() == 1; 
     9         }
    10 
    11         // 独占式获取锁(重写tryAcquire) Acquire the lock if state is zero
    12         public boolean tryAcquire(int acquires) {
    13             assert acquires == 1; // Otherwise unused
    14             if (compareAndSetState(0, 1)) {    //CAS设置状态为1。
    15                 setExclusiveOwnerThread(Thread.currentThread());
    16                 return true;
    17             }
    18             return false;
    19         }
    20 
    21         // 独占式释放锁(重写tryRelease) Release the lock by setting state to zero
    22         protected boolean tryRelease(int releases) {
    23             assert releases == 1; // Otherwise unused
    24             if (getState() == 0) //获取状态
    25                 throw new IllegalMonitorStateException();
    26             setExclusiveOwnerThread(null);
    27             setState(0);    //设置状态为0
    28             return true;
    29         }
    30        
    31         // Provide a Condition
    32         //每个Condition都包含一个队列
    33         Condition newCondition() { return new ConditionObject(); }
    34 
    35         // Deserialize properly
    36         private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
    37             s.defaultReadObject();
    38             setState(0); // reset to unlocked state
    39         }
    40     }
    41 
    42     // The sync object does all the hard work. We just forward to it.
    43     private final Sync sync = new Sync();
    44     
    45     //仅需要将操作代理到sync
    46     public void lock()                { sync.acquire(1); }    //调用AQS的模板方法,
    47     public boolean tryLock()          { return sync.tryAcquire(1); }
    48     public void unlock()              { sync.release(1); }
    49     public Condition newCondition()   { return sync.newCondition(); }
    50     public boolean isLocked()         { return sync.isHeldExclusively(); }
    51     public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
    52     public void lockInterruptibly() throws InterruptedException { 
    53         sync.acquireInterruptibly(1);
    54     }
    55     public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
    56         return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    57     }
    58 }

      自定义同步组件实例二:锁存器(使用共享的获取与释放方法)

        BooleanLatch可用在多个线程需要等待某个事件发生才能继续执行的情况中。初始状态state=0, 此时所有线程获取同步状态方法tryAcquireShared返回-1,即获取失败,入等待队列。直到有线程调用tryReleaseShared释放同步状态,被阻塞的状态才会进行执行。

     1 class BooleanLatch {
     2 
     3     private static class Sync extends AbstractQueuedSynchronizer {
     4         boolean isSignalled() { return getState() != 0; }
     5 
     6         protected int tryAcquireShared(int ignore) {
     7             return isSignalled()? 1 : -1;
     8         }
     9         
    10         protected boolean tryReleaseShared(int ignore) {
    11             setState(1);
    12             return true;
    13         }
    14     }
    15 
    16     private final Sync sync = new Sync();
    17     public boolean isSignalled() { return sync.isSignalled(); }
    18     public void signal()         { sync.releaseShared(1); }
    19     public void await() throws InterruptedException {
    20         sync.acquireSharedInterruptibly(1);
    21     }
    22 }

    AQS的实现原理

      同步队列

        同步队列中的节点Node用来保存获取同步状态失败的线程引用、等待状态、以及前驱和后继节点。

        AQS中两个域:head节点和tail节点,组成一个FIFO的双向队列。

          private transient volatile Node head;

          private transient volatile Node tail;

        Node源码如下。

     1 static final class Node {
     2     /** Marker to indicate a node is waiting in shared mode */
     3     static final Node SHARED = new Node();    //共享方式
     4     /** Marker to indicate a node is waiting in exclusive mode */
     5     static final Node EXCLUSIVE = null;        //独占方式
     6 
     7     /** waitStatus value to indicate thread has cancelled */
     8     static final int CANCELLED =  1;    //waitStatus=1为取消状态
     9     /** waitStatus value to indicate successor's thread needs unparking */
    10     static final int SIGNAL    = -1;    //后继节点的线程处于等待状态,需要被唤醒。
    11     /** waitStatus value to indicate thread is waiting on condition */
    12     static final int CONDITION = -2;    //当前线程在condition上等待
    13     static final int PROPAGATE = -3;    //表示下一次共享式同步状态获取将会无条件的被传播下去。
    14 
    15     volatile int waitStatus;    //等待状态,0-初始状态
    16     volatile Node prev;            //前驱节点
    17     volatile Node next;            //后继节点
    18     volatile Thread thread;        //获取同步的线程
    19     Node nextWaiter;            
    20 
    21     final boolean isShared() {
    22         return nextWaiter == SHARED;
    23     }
    24     //返回前驱节点
    25     final Node predecessor() throws NullPointerException {
    26         Node p = prev;
    27         if (p == null)
    28             throw new NullPointerException();
    29         else
    30             return p;
    31     }
    32 
    33     Node() {    // Used to establish initial head or SHARED marker
    34     }
    35 
    36     Node(Thread thread, Node mode) {     // Used by addWaiter
    37         this.nextWaiter = mode;
    38         this.thread = thread;
    39     }
    40 
    41     Node(Thread thread, int waitStatus) { // Used by Condition
    42         this.waitStatus = waitStatus;
    43         this.thread = thread;
    44     }
    45 }

      独占式获取同步状态

        1)调用自定义的tryAcquire方法,该方法要保证线程线程安全的获取同步状态(如Mutex中的tryAcquire使用CAS更新保证原子性),如果获取成功则return。

        2)如果获取失败,构造Node,并通过addWaiter方法将节点插入队列尾部。Node.EXCLUSIVE表示节点以独占方式等待。

        3)acquireQueued方法中该节点自旋方式尝试获取同步状态。如果获取不到同步状态,则阻塞节点中的线程,被阻塞线程唤醒依靠前驱节点的出队或阻塞线程被终端来实现。

     1 //该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出.
     2 public final void acquire(int arg) {
     3     if (!tryAcquire(arg) &&
     4         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
     5         selfInterrupt();
     6 }
     7 
     8 //将节点添加到队尾
     9 private Node addWaiter(Node mode) {
    10     Node node = new Node(Thread.currentThread(), mode);
    11     // Try the fast path of enq; backup to full enq on failure
    12     //快速尝试入队,如果失败则需要调用enq(node)方法入队,这样做有什么好处?有一定概率减少一次方法调用
    13     //compareAndSetTail保证Node入队是线程安全的
    14     Node pred = tail;
    15     if (pred != null) {
    16         node.prev = pred;
    17         if (compareAndSetTail(pred, node)) {
    18             pred.next = node;
    19             return node;
    20         }
    21     }
    22     enq(node);
    23     return node;
    24 }
    25 
    26 //初始化或自旋CAS直到入队成功
    27 private Node enq(final Node node) {
    28     for (;;) {
    29         Node t = tail;
    30         if (t == null) { // Must initialize
    31             if (compareAndSetHead(new Node()))
    32                 tail = head;
    33         } else {
    34             node.prev = t;
    35             if (compareAndSetTail(t, node)) {
    36                 t.next = node;
    37                 return t;
    38             }
    39         }
    40     }
    41 }

        入同步队列之后怎么获取同步状态?阻塞机制是怎样的?

     1 //自旋方式尝试获取同步状态
     2 final boolean acquireQueued(final Node node, int arg) {
     3     boolean failed = true;
     4     try {
     5         boolean interrupted = false;
     6         for (;;) {
     7             final Node p = node.predecessor();    //获取当前节点的前驱节点
     8             if (p == head && tryAcquire(arg)) {    //如果前驱节点是head节点则尝试获取同步状态
     9                 setHead(node);
    10                 p.next = null; // help GC
    11                 failed = false;
    12                 return interrupted;
    13             }
    14             if (shouldParkAfterFailedAcquire(p, node) &&
    15                 parkAndCheckInterrupt())    //判断当前线程是否应该被阻塞,如果是则阻塞直到被唤醒继续循环,如果不是则再次尝试获取同步状态。
    16                 interrupted = true;
    17         }
    18     } finally {
    19         if (failed)
    20             cancelAcquire(node);
    21     }
    22 }
    23 
    24 //判断当前线程是否应该被阻塞,如果线程应该被阻塞则返回true。检查和更新获取同步状态失败Node的前驱节点的waitStatus。
    25 //其中pred为node的前驱节点
    26 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    27     int ws = pred.waitStatus;
    28     if (ws == Node.SIGNAL)
    29         /*
    30          * This node has already set status asking a release
    31          * to signal it, so it can safely park.
    32          */
    33         //前驱节点已经设置为SIGNAL状态,在前驱节点的线程释放同步状态会唤醒当前Node的线程。
    34         return true;
    35     if (ws > 0) {
    36         /*
    37          * Predecessor was cancelled. Skip over predecessors and
    38          * indicate retry.
    39          */
    40          //前驱节点是cancelled状态,跳过被取消的Node,直到向前找到waitStatus > 0的Node作为当前节点的前驱,然后重试获取同步状态。
    41         do {
    42             node.prev = pred = pred.prev;
    43         } while (pred.waitStatus > 0);
    44         pred.next = node;
    45     } else {
    46         /* waitStatus = 0是初始状态。
    47          * waitStatus must be 0 or PROPAGATE.  Indicate that we
    48          * need a signal, but don't park yet.  Caller will need to
    49          * retry to make sure it cannot acquire before parking.
    50          */
    51         //将前驱节点的等待状态改为SIGNAL。
    52         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    53     }
    54     return false;
    55 }
    56 
    57 //阻塞线程
    58 private final boolean parkAndCheckInterrupt() {
    59     LockSupport.park(this);    //使用LockSupport阻塞当前线程
    60     return Thread.interrupted();
    61 }

        在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。

      独占式释放同步状态

        在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。

     1 public final boolean release(int arg) {
     2     if (tryRelease(arg)) {
     3         Node h = head;
     4         if (h != null && h.waitStatus != 0)
     5             unparkSuccessor(h);    //唤醒后继节点线程
     6         return true;
     7     }
     8     return false;
     9 }
    10 
    11 //唤醒后继节点
    12 private void unparkSuccessor(Node node) {
    13     /*
    14      * If status is negative (i.e., possibly needing signal) try
    15      * to clear in anticipation of signalling.  It is OK if this
    16      * fails or if status is changed by waiting thread.
    17      */
    18      //将当前节点的waitStatus改为0-原始状态,目的是什么?
    19     int ws = node.waitStatus;
    20     if (ws < 0)
    21         compareAndSetWaitStatus(node, ws, 0);
    22 
    23     /*
    24      * Thread to unpark is held in successor, which is normally
    25      * just the next node.  But if cancelled or apparently null,
    26      * traverse backwards from tail to find the actual
    27      * non-cancelled successor.
    28      */
    29      //如果后继节点为null或被取消,则从tail向前找到正常的后继节点
    30     Node s = node.next;
    31     if (s == null || s.waitStatus > 0) {
    32         s = null;
    33         for (Node t = tail; t != null && t != node; t = t.prev)
    34             if (t.waitStatus <= 0)
    35                 s = t;
    36     }
    37     if (s != null)
    38         LockSupport.unpark(s.thread);    //唤醒后继节点
    39 }

      共享式获取同步状态

        1)首先调用自定义方法tryAcquireShared尝试获取同步状态,至少调用一次tryAcquireShared方法,如果返回值>=0,则获取成功,return;否则执行步骤2),

        2)当获取失败时,为当前线程以共享方式创建Node并插入同步队列。

        3)入队后,以自旋方式尝试获取同步状态,如果前驱节点为head节点,则尝试获取同步状态,获取失败,则阻塞线程。

     1 //共享式获取同步状态,忽略异常。
     2 //注意:实现自定义方法tryAcquireShared时,要遵循AQS定义的返回值语义,负数代表获取失败、0代表获取成功但没有剩余资源、正数代表获取成功,还有剩余资源。
     3 public final void acquireShared(int arg) {
     4     if (tryAcquireShared(arg) < 0)    //
     5         doAcquireShared(arg);
     6 }
     7 
     8 private void doAcquireShared(int arg) {
     9     final Node node = addWaiter(Node.SHARED);    //为当前线程以共享方式创建Node并插入同步队列尾部。
    10     boolean failed = true;
    11     try {
    12         boolean interrupted = false;
    13         for (;;) {
    14             final Node p = node.predecessor();
    15             if (p == head) {    //如果前驱节点为head节点,则尝试获取同步状态
    16                 int r = tryAcquireShared(arg);
    17                 if (r >= 0) {    //获取成功
    18                     setHeadAndPropagate(node, r);    //设置node为head节点,还有剩余资源则继续唤醒后继的node
    19                     p.next = null; // help GC
    20                     if (interrupted)    //如果等待过程中被中断过,则中断当前线程
    21                         selfInterrupt();    //Thread.currentThread().interrupt();
    22                     failed = false;
    23                     return;
    24                 }
    25             }
    26             //shouldParkAfterFailedAcquire方法判断当前线程是否应该被阻塞,如果是则调用parkAndCheckInterrupt阻塞当前线程
    27             if (shouldParkAfterFailedAcquire(p, node) &&
    28                 parkAndCheckInterrupt())
    29                 interrupted = true;
    30         }
    31     } finally {
    32         if (failed)
    33             cancelAcquire(node);
    34     }
    35 }
    36 
    37 //获取成功后,设置node为head节点,
    38 private void setHeadAndPropagate(Node node, int propagate) {
    39     Node h = head; // Record old head for check below
    40     setHead(node);    //设置node为head节点,因此node出队
    41     //如果还有剩余资源,尝试唤醒node节点的后继节点
    42     /*
    43      * Try to signal next queued node if:
    44      *   Propagation was indicated by caller,
    45      *     or was recorded (as h.waitStatus) by a previous operation
    46      *     (note: this uses sign-check of waitStatus because
    47      *      PROPAGATE status may transition to SIGNAL.)
    48      * and
    49      *   The next node is waiting in shared mode,
    50      *     or we don't know, because it appears null
    51      *
    52      * The conservatism in both of these checks may cause
    53      * unnecessary wake-ups, but only when there are multiple
    54      * racing acquires/releases, so most need signals now or soon
    55      * anyway.
    56      */
    57     if (propagate > 0 || h == null || h.waitStatus < 0) {
    58         Node s = node.next;
    59         if (s == null || s.isShared())
    60             doReleaseShared();
    61     }
    62 }
    63 
    64 //设置node为head节点,因此node出队
    65 private void setHead(Node node) {
    66     head = node;
    67     node.thread = null;
    68     node.prev = null;
    69 }

      共享式释放同步状态

     1 public final boolean releaseShared(int arg) {
     2     if (tryReleaseShared(arg)) {
     3         doReleaseShared();
     4         return true;
     5     }
     6     return false;
     7 }
     8 
     9 //唤醒后继节点线程并确保被传播
    10 private void doReleaseShared() {
    11     /*
    12      * Ensure that a release propagates, even if there are other
    13      * in-progress acquires/releases.  This proceeds in the usual
    14      * way of trying to unparkSuccessor of head if it needs
    15      * signal. But if it does not, status is set to PROPAGATE to
    16      * ensure that upon release, propagation continues.
    17      * Additionally, we must loop in case a new node is added
    18      * while we are doing this. Also, unlike other uses of
    19      * unparkSuccessor, we need to know if CAS to reset status
    20      * fails, if so rechecking.
    21      */
    22     for (;;) {
    23         Node h = head;
    24         if (h != null && h != tail) {
    25             int ws = h.waitStatus;
    26             if (ws == Node.SIGNAL) {
    27                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    28                     continue;            // loop to recheck cases
    29                 unparkSuccessor(h);    //唤醒后继节点
    30             }
    31             else if (ws == 0 &&
    32                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    33                 continue;                // loop on failed CAS
    34         }
    35         if (h == head)                   // loop if head changed
    36             break;
    37     }
    38 }

      独占式可中断获取

        可响应中断获取与普通获取的区别:当线程被中断时,会立即返回,并抛出InterruptedException,执行cancelAcquire方法取消获取同步状态;而普通获取只是将中断标志位置为true,但线程依旧会阻塞在等待队列中。

     1 public final void acquireInterruptibly(int arg)
     2         throws InterruptedException {
     3     if (Thread.interrupted())
     4         throw new InterruptedException();    //抛出中断异常
     5     if (!tryAcquire(arg))
     6         doAcquireInterruptibly(arg);
     7 }
     8 
     9 private void doAcquireInterruptibly(int arg)
    10     throws InterruptedException {
    11     final Node node = addWaiter(Node.EXCLUSIVE);
    12     boolean failed = true;
    13     try {
    14         for (;;) {
    15             final Node p = node.predecessor();
    16             if (p == head && tryAcquire(arg)) {
    17                 setHead(node);
    18                 p.next = null; // help GC
    19                 failed = false;
    20                 return;
    21             }
    22             if (shouldParkAfterFailedAcquire(p, node) &&
    23                 parkAndCheckInterrupt())
    24                 throw new InterruptedException();    //唯一的区别,抛出中断异常。而普通的获取操作,只是将中断标志位置为true。
    25         }
    26     } finally {
    27         if (failed)
    28             cancelAcquire(node);
    29     }
    30 }

        如何取消线程?

     1 private void cancelAcquire(Node node) {
     2     // Ignore if node doesn't exist
     3     if (node == null)
     4         return;
     5 
     6     node.thread = null;    //将线程置为null
     7 
     8     // Skip cancelled predecessors
     9     Node pred = node.prev;
    10     while (pred.waitStatus > 0)    //即waitStatus=1,为cancelled状态。跳过状态为取消的前驱节点
    11         node.prev = pred = pred.prev;
    12 
    13     // predNext is the apparent node to unsplice. CASes below will
    14     // fail if not, in which case, we lost race vs another cancel
    15     // or signal, so no further action is necessary.
    16     Node predNext = pred.next;
    17 
    18     // Can use unconditional write instead of CAS here.
    19     // After this atomic step, other Nodes can skip past us.
    20     // Before, we are free of interference from other threads.
    21     node.waitStatus = Node.CANCELLED;    //当前节点置为cancelled状态
    22 
    23     // If we are the tail, remove ourselves.
    24     if (node == tail && compareAndSetTail(node, pred)) {    //如果当前节点为tail节点,删除该节点
    25         compareAndSetNext(pred, predNext, null);
    26     } else {
    27         // If successor needs signal, try to set pred's next-link
    28         // so it will get one. Otherwise wake it up to propagate.
    29         //
    30         int ws;
    31         if (pred != head &&
    32             ((ws = pred.waitStatus) == Node.SIGNAL ||
    33              (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
    34             pred.thread != null) {
    35             //如果前驱节点waitStatus为SIGNAL或者CAS更新为SIGNAL成功,则pred释放同步状态时会通知后继节点
    36             //并且pred.thread不为null,则cas将pred的后继节点置为node.next,
    37             Node next = node.next;
    38             if (next != null && next.waitStatus <= 0)
    39                 compareAndSetNext(pred, predNext, next);
    40         } else {
    41             unparkSuccessor(node);    //唤醒后继节点
    42         }
    43 
    44         node.next = node; // help GC next指向自己,node从等待队列中移除。
    45     }
    46 }

      独占式可超时获取

        在响应中断的基础上增加了超时机制

     1 public final boolean tryAcquireNanos(int arg, long nanosTimeout)
     2         throws InterruptedException {
     3     if (Thread.interrupted())
     4         throw new InterruptedException();
     5     return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
     6 }
     7 
     8 private boolean doAcquireNanos(int arg, long nanosTimeout)
     9     throws InterruptedException {
    10     long lastTime = System.nanoTime();
    11     final Node node = addWaiter(Node.EXCLUSIVE);
    12     boolean failed = true;
    13     try {
    14         for (;;) {
    15             final Node p = node.predecessor();
    16             if (p == head && tryAcquire(arg)) {
    17                 setHead(node);
    18                 p.next = null; // help GC
    19                 failed = false;
    20                 return true;
    21             }
    22             if (nanosTimeout <= 0)    //判断是否超时,nanosTimeout<=0表示超时,如果超时则return false.
    23                 return false;
    24             if (shouldParkAfterFailedAcquire(p, node) &&
    25                 nanosTimeout > spinForTimeoutThreshold)
    26                 LockSupport.parkNanos(this, nanosTimeout);    //如果还剩余的时间nanosTimeout>阈值(spinForTimeoutThreshold=1000纳秒),则阻塞当前线程nanosTimeout纳秒。当nanosTimeout<1000纳秒时,则不阻塞当前线程,而是进入快速的自旋过程。原因在于,非常短的超时等待无法做到十分精确,如果这时再进行超时等待,相反会让nanosTimeout的超时从整体上表现得反而不精确。因此,在超时非常短的场景下,同步器会进入无条件的快速自旋。
    27             long now = System.nanoTime();    //当前时间
    28             //lastTime表示上次唤醒时间,now - lastTime表示已经睡眠的时间
    29             nanosTimeout -= now - lastTime;    //nanosTimeout表示还剩余的时间,nanosTimeout>0表示表示超时时间未到
    30             lastTime = now;
    31             if (Thread.interrupted())
    32                 throw new InterruptedException();    //抛异常
    33         }
    34     } finally {
    35         if (failed)
    36             cancelAcquire(node);
    37     }
    38 }

      总结,三种方式获取同步状态方式的对比,主要区别在于获取同步状态失败时的处理逻辑:

        acquire方法直接阻塞线程,不响应中断,只是将中断标记置为true,但线程依旧会阻塞在等待队列中。

        acquireInterruptibly方法直接阻塞线程,响应中断,当线程被中断时,会立即返回,并抛出InterruptedException。

        tryAcquireNanos方法将线程阻塞nanosTimeout秒,如何超时还未获取到同步状态,则返回。同时支持响应中断。

    1 public final void acquire(int arg)    //独占式获取同步状态,如果不成功会进入同步队列等待。
    2 public final void acquireInterruptibly(int arg)    //与acquire不同的是,能响应中断
    3 public final boolean tryAcquireNanos(int arg, long nanosTimeout)    //增加超时机制

    LockSupport

      在阻塞和唤醒线程时,使用了LockSupport类。LockSupport提供了一系列阻塞和唤醒线程的公共方法。底层使用unsafe提供的方法实现。

     1 void park()    //阻塞当前线程,只有调用unpark或中断才能从park方法中返回
     2 void parkNanos(long nanos)    //超时阻塞当前线程,超时则返回
     3 void parkUntil(long deadline)    //截止时间阻塞当前线程,直到deadline
     4 
     5 void unpark(Thread thread)    //唤醒处于阻塞状态的线程
     6 
     7 //jdk1.6中增加了以下方法,blocker表示当前线程要等待的对象。线程dump信息比使用park方法要多,方便问题排查和监控。
     8 void park(Object blocker)
     9 void parkNanos(Object blocker, long nanos)
    10 void parkUntil(Object blocker, long deadline)

    参考资料:

      《Java并发编程的艺术》

      《Java并发之AQS详解》http://www.cnblogs.com/waterystone/p/4920797.html

  • 相关阅读:
    MySQL+Navicat for MySQL安装
    intellij idea14 +svn配置
    java重载(实现同一方法名,不同参数)
    Java连接MySQL数据库及操作
    通过Chrome的inspect对手机webview进行调试
    使用fiddler对手机上的程序进行抓包
    开始一个Android的appium实例
    Android模拟器内安装应用
    Appium的inspector使用
    python webdriver启动IE浏览器
  • 原文地址:https://www.cnblogs.com/zaizhoumo/p/7749820.html
Copyright © 2011-2022 走看看