zoukankan      html  css  js  c++  java
  • 走进 AQS 瞧一瞧看一看

    并发中有一块很重要的东西就是AQS。接下来一周的目标就是它。

    看复杂源码时,一眼望过去,这是什么?不要慌,像剥洋葱一样,一层层剥开(哥,喜欢"扒开"这个词)。

    参考资源:

    https://www.cnblogs.com/waterystone/p/4920797.html

    https://javadoop.com/post/AbstractQueuedSynchronizer#toc4

    一概述

    大师Doug Lea依赖FIFO(First-in-first-out)等待队列,创建了AQS框架来实现锁和同步器的操作。AQS是LimitLatch,countDownLacth,ReentrantLock,etc 这些类的基础。它们都是继承了AQS类,继承的时候,protected的方法根据业务需要必须重写,也就是tryAcquire,tryRelease,tryAcquireShared,tryReleaseShared,isHeldExclusively中的一部分或是全部。

    二AQS结构

    1.我们看看AQS的属性:

    静态内部类 Node (为什么要写静态内部类,个人觉得,Node是链表结构,在这里作为阻塞队列来使用,只有单独一个地方(AQS中)使用,所以写成静态内部类,也提高了封装性)

    // 头结点

    private transient volatile Node head;

    // 尾结点

    private transient volatile Node tail;

    // 资源状态,等于0:没有被任何thread占有;大于0:资源已经被其他线程占有

     private volatile int state;

    2.Node的内部构造

    Node类至关重要,我懒了,贴一下代码,发现里面的注释真的很详细,就不再翻译一遍了。

     1         /** Marker to indicate a node is waiting in shared mode */
    // 共享模式
    2 static final Node SHARED = new Node(); 3 /** Marker to indicate a node is waiting in exclusive mode */
    // 独占模式
    4 static final Node EXCLUSIVE = null; 5 6 /** waitStatus value to indicate thread has cancelled */ 7 static final int CANCELLED = 1;
    // 在这里,大家注意一下。waitStatus 值有1,-1,-2,中间越过了0.
    // 其实,waitStatus 的值,有0 这种情况,初始值为0
    8 /** waitStatus value to indicate successor's thread needs unparking */ 9 static final int SIGNAL = -1; 10 /** waitStatus value to indicate thread is waiting on condition */ 11 static final int CONDITION = -2; 12 /** 13 * waitStatus value to indicate the next acquireShared should 14 * unconditionally propagate 15 */ 16 static final int PROPAGATE = -3; 17 18 /** 19 * Status field, taking on only the values: 20 * SIGNAL: The successor of this node is (or will soon be) 21 * blocked (via park), so the current node must 22 * unpark its successor when it releases or 23 * cancels. To avoid races, acquire methods must 24 * first indicate they need a signal, 25 * then retry the atomic acquire, and then, 26 * on failure, block. 27 * CANCELLED: This node is cancelled due to timeout or interrupt. 28 * Nodes never leave this state. In particular, 29 * a thread with cancelled node never again blocks. 30 * CONDITION: This node is currently on a condition queue. 31 * It will not be used as a sync queue node 32 * until transferred, at which time the status 33 * will be set to 0. (Use of this value here has 34 * nothing to do with the other uses of the 35 * field, but simplifies mechanics.) 36 * PROPAGATE: A releaseShared should be propagated to other 37 * nodes. This is set (for head node only) in 38 * doReleaseShared to ensure propagation 39 * continues, even if other operations have 40 * since intervened. 41 * 0: None of the above 42 * 43 * The values are arranged numerically to simplify use. 44 * Non-negative values mean that a node doesn't need to 45 * signal. So, most code doesn't need to check for particular 46 * values, just for sign. 47 * 48 * The field is initialized to 0 for normal sync nodes, and 49 * CONDITION for condition nodes. It is modified using CAS 50 * (or when possible, unconditional volatile writes). 51 */ 52 volatile int waitStatus; 53 65 volatile Node prev; 79 */ 80 volatile Node next; 81 82 /** 83 * The thread that enqueued this node. Initialized on 84 * construction and nulled out after use. 85 */ 86 volatile Thread thread; 87 98 Node nextWaiter;

    接下来,进入到AQS源码分析环节。

    概述中提到过下面这几个方法tryAcquire,tryRelease,tryAcquireShared,tryReleaseShared

    它们分别是独占锁的获取和释放,共享锁的获取和释放。

    这里,我们从独占锁的获取开始讲起。

    3.独占锁方法解析

    3.1 acquire 方法

     1     /**
     2      * Acquires in exclusive mode, ignoring interrupts.  Implemented
     3      * by invoking at least once {@link #tryAcquire},
     4      * returning on success.  Otherwise the thread is queued, possibly
     5      * repeatedly blocking and unblocking, invoking {@link
     6      * #tryAcquire} until success.  This method can be used
     7      * to implement method {@link Lock#lock}.
     8      *
     9      * @param arg the acquire argument.  This value is conveyed to
    10      *        {@link #tryAcquire} but is otherwise uninterpreted and
    11      *        can represent anything you like.
    12      */
    // 尝试获取锁,tryAcquire放回true,表示成功获取锁,就不会再往下走了。
    13 public final void acquire(int arg) {
    // 尝试获取锁失败,并且,acquireQueued成功,那么就会进入方法中,执行自我中断
    // 接下来,开始剥洋葱,方法逐个分析解惑
    14 if (!tryAcquire(arg) && 15 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 16 selfInterrupt(); 17 }

    3.2 tryAcquire 方法

     1     /**
     2      * Attempts to acquire in exclusive mode. This method should query
     3      * if the state of the object permits it to be acquired in the
     4      * exclusive mode, and if so to acquire it.
     5      *
     6      * <p>This method is always invoked by the thread performing
     7      * acquire.  If this method reports failure, the acquire method
     8      * may queue the thread, if it is not already queued, until it is
     9      * signalled by a release from some other thread. This can be used
    10      * to implement method {@link Lock#tryLock()}.
    11      *
    12      * <p>The default
    13      * implementation throws {@link UnsupportedOperationException}.
    14      *
    15      * @param arg the acquire argument. This value is always the one
    16      *        passed to an acquire method, or is the value saved on entry
    17      *        to a condition wait.  The value is otherwise uninterpreted
    18      *        and can represent anything you like.
    19      * @return {@code true} if successful. Upon success, this object has
    20      *         been acquired.
    21      * @throws IllegalMonitorStateException if acquiring would place this
    22      *         synchronizer in an illegal state. This exception must be
    23      *         thrown in a consistent fashion for synchronization to work
    24      *         correctly.
    25      * @throws UnsupportedOperationException if exclusive mode is not supported
    26      */
    // 哇咔咔,这么长注释,不过别着急。慢慢看。
    // 这个方法用来查询对象的state是否允许以独占方式来获取锁,如果可以,尝试获取。
    // 接下里大家会疑问,为什么这个方法里面只有throw这一行代码。因为,这个方法需要在子类继承的时候需要被重写,这个就是设计模式中的模板方法。
    // 同时,这个方法没有被做成abstract方法,因为,子类继承的时候为了自己的需求,只需要实现独占模式的方法或是共享模式的方法即可,不用都去实现。
    27 protected boolean tryAcquire(int arg) { 28 throw new UnsupportedOperationException(); 29 }

    3.3 acquireQueued方法

     1 /**
     2      * Acquires in exclusive uninterruptible mode for thread already in
     3      * queue. Used by condition wait methods as well as acquire.
     4      *
     5      * @param node the node
     6      * @param arg the acquire argument
     7      * @return {@code true} if interrupted while waiting
     8      */
    // 线程挂起后,被解锁,就是在这个方法里实现的
    // 方法返回结果:等待时,是否被中断
    9 final boolean acquireQueued(final Node node, int arg) {
    // failed true:表示没有拿到资源 false:表示成功拿到资源
    10 boolean failed = true; 11 try {
    // interrupted 是否被中断
    12 boolean interrupted = false;
    // 又一个自旋的使用哦
    13 for (;;) {
    // 获取前一个节点Node
    14 final Node p = node.predecessor(); 15 if (p == head && tryAcquire(arg)) {
    // 设置头结点,将原先的头结点与node节点的连接,断开
    16 setHead(node); 17 p.next = null; // help GC 18 failed = false; 19 return interrupted; 20 }
    // 获取锁失败,是否应该挂起当前线程
    // p 是前驱节点,node是当前线程节点
    21 if (shouldParkAfterFailedAcquire(p, node) &&
    // 获取锁失败,挂起线程的操作在下面方法里实施 22 parkAndCheckInterrupt()) 23 interrupted = true; 24 } 25 } finally { 26 if (failed) 27 cancelAcquire(node); 28 } 29 }

    3.3.1 shouldParkAfterFailedAcquire方法

     1 /**
     2      * Checks and updates status for a node that failed to acquire.
     3      * Returns true if thread should block. This is the main signal
     4      * control in all acquire loops.  Requires that pred == node.prev.
     5      *
     6      * @param pred node's predecessor holding status
     7      * @param node the node
     8      * @return {@code true} if thread should block
     9      */
    // 这个方法用途:获取锁失败,判断是否需要挂起线程
    10 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 11 int ws = pred.waitStatus;
    // 等待状态 SIGNAL表示前驱节点状态正常,当前线程需要挂起,直接返回true
    12 if (ws == Node.SIGNAL) 13 /* 14 * This node has already set status asking a release 15 * to signal it, so it can safely park. 16 */
    // 这个节点已经让请求release的状态来标识,所以它可以安全的park了
    17 return true; 18 if (ws > 0) { 19 /* 20 * Predecessor was cancelled. Skip over predecessors and 21 * indicate retry. 22 */
    // ws大于0的状态1,表示取消了排队。
    // 如果取消了排队,接着再去找前一个,前一个也被取消了,就找前一个
    的前一个,总会有一个没被取消的。
    23 do { 24 node.prev = pred = pred.prev; 25 } while (pred.waitStatus > 0); 26 pred.next = node; 27 } else { 28 /* 29 * waitStatus must be 0 or PROPAGATE. Indicate that we 30 * need a signal, but don't park yet. Caller will need to 31 * retry to make sure it cannot acquire before parking. 32 */
    // else的情况,就是waitStatus 为0,-2,-3
    // 用CAS将前驱节点状态设为-1(Node.SIGNAL) 33 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 34 } 35 return false; 36 }

    3.3.2 parkAndCheckInterrupt方法

    1 /**
    2      * Convenience method to park and then check if interrupted
    3      *
    4      * @return {@code true} if interrupted
    5      */
    // 在shouldParkAfterFailedAcquire返回true的时候,才会执行这个方法,这个方法的作用就是挂起线程
    6 private final boolean parkAndCheckInterrupt() {
    // 调用park方法,使线程挂起 (使用unpark方法来唤醒线程)
    7 LockSupport.park(this);
    // 查看线程是否被中断
    8 return Thread.interrupted(); 9 }

    3.4 addWaiter 方法

     1     /**
     2      * Creates and enqueues node for current thread and given mode.
     3      *
     4      * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     5      * @return the new node
     6      */
    // 将Node放进阻塞队列的末尾
    7 private Node addWaiter(Node mode) {
    // 生成一个node节点,保存当前的线程和占用模式(独占模式或是共享模式)
    8 Node node = new Node(Thread.currentThread(), mode); 9 // Try the fast path of enq; backup to full enq on failure 10 Node pred = tail;
    // 查询tail节点是否有值,有值代表的是此链表不为空
    11 if (pred != null) {
    // node相连接,很简单,就是改变指针指向,可以参考数据结构 链表
    12 node.prev = pred;
    // compareAndSetTail这个方法的操作就是比较赋值,经典的CAS方法,不明白的可以参照下面资源
    // http://www.cnblogs.com/lihao007/p/8654787.html
    13 if (compareAndSetTail(pred, node)) { 14 pred.next = node; 15 return node; 16 } 17 } 18 enq(node); 19 return node; 20 }
    
    

    3.4.1 eng()方法

     1 /**
     2      * Inserts node into queue, initializing if necessary. See picture above.
     3      * @param node the node to insert
     4      * @return node's predecessor
     5      */
    // 来到这个方法有两种可能。一是等待线程队列为空,二是其他线程更新了队列
    6 private Node enq(final Node node) {
    // 又是自旋方法(乐观锁),AQS源码中出现多次,因为需要线程的不安全
    7 for (;;) { 8 Node t = tail; 9 if (t == null) { // Must initialize 10 if (compareAndSetHead(new Node())) 11 tail = head; 12 } else {
    // 将node放在队列最后,有线程竞争的话,排不上重排
    13 node.prev = t; 14 if (compareAndSetTail(t, node)) { 15 t.next = node; 16 return t; 17 } 18 } 19 } 20 }

     独占模式获取锁,就分析到这里了。

    4.release独占模式 释放锁

     1 /**
     2      * Releases in exclusive mode.  Implemented by unblocking one or
     3      * more threads if {@link #tryRelease} returns true.
     4      * This method can be used to implement method {@link Lock#unlock}.
     5      *
     6      * @param arg the release argument.  This value is conveyed to
     7      *        {@link #tryRelease} but is otherwise uninterpreted and
     8      *        can represent anything you like.
     9      * @return the value returned from {@link #tryRelease}
    10      */
    // 独占模式下,释放锁
    11 public final boolean release(int arg) {
    // tryRelease方法是模板方法,留给子类定义。
    12 if (tryRelease(arg)) { 13 Node h = head;
    // 首先判断阻塞队列是否为空。接下来,判断waitStatus的状态,0的状态是初始化是被赋予的值。只有是非0的状态,才说明head节点后面是有其它节点的。
    14 if (h != null && h.waitStatus != 0) 15 unparkSuccessor(h); 16 return true; 17 } 18 return false; 19 }

    4.1 unparkSuccessor

     1 /**
     2      * Wakes up node's successor, if one exists.
     3      *
     4      * @param node the node
     5      */
     6     private void unparkSuccessor(Node node) {
     7         /*
     8          * If status is negative (i.e., possibly needing signal) try
     9          * to clear in anticipation of signalling.  It is OK if this
    10          * fails or if status is changed by waiting thread.
    11          */
    12         int ws = node.waitStatus;
    13         if (ws < 0)
    14             compareAndSetWaitStatus(node, ws, 0);
    15 
    16         /*
    17          * Thread to unpark is held in successor, which is normally
    18          * just the next node.  But if cancelled or apparently null,
    19          * traverse backwards from tail to find the actual
    20          * non-cancelled successor.
    21          */
    22         Node s = node.next;
    23         if (s == null || s.waitStatus > 0) {
    24             s = null;
    // 这里,是从队列末尾向前查找没有被取消的节点
    // 这里,我当时对为什么从后向前查找有疑问,后来看了文章明白了。地址:https://javadoop.com/post/AbstractQueuedSynchronizer#toc4
    25 for (Node t = tail; t != null && t != node; t = t.prev) 26 if (t.waitStatus <= 0) 27 s = t; 28 } 29 if (s != null) 30 LockSupport.unpark(s.thread); 31 }

     就写到这里了,仍需努力,以后有想法了,慢慢补充,写的不对的地方,欢迎指正,共同探讨。

     
  • 相关阅读:
    Java第二次作业
    JAVA学习计划
    学生选课系统
    抽奖
    Java第二次作业
    Java第六次作业修改版
    Java第六次作业
    JAVA第五次作业
    Java第四次作业
    JAVA第三次作业
  • 原文地址:https://www.cnblogs.com/lihao007/p/8647851.html
Copyright © 2011-2022 走看看