zoukankan      html  css  js  c++  java
  • Java多线程之JUC包:AbstractQueuedSynchronizer(AQS)源码学习笔记

    若有不正之处请多多谅解,并欢迎批评指正。

    请尊重作者劳动成果,转载请标明原文链接:

    http://www.cnblogs.com/go2sea/p/5618628.html 

    AbstractQueuedSynchronizer(AQS)是一个同步器框架,在实现锁的时候,一般会实现一个继承自AQS的内部类sync,作为我们的自定义同步器。AQS内部维护了一个state成员和一个队列。其中state标识了共享资源的状态,队列则记录了等待资源的线程。以下这五个方法,在AQS中实现为直接抛出异常,这是我们自定义同步器需要重写的方法:

    ①isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

    ②tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败返回false。

    ③tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败返回false。

    ④tryAcquireShared(int):共享方式。尝试获取资源。成功返回true,失败返回false。

    ⑤tryReleaseShared(int):共享方式。尝试释放资源,成功则返回true,失败返回false。

    其中isHeldExclusively需要在使用Condition时重写,他在AQS中的调用全部发生在其内部类ConditionObject的方法中。②③和④⑤分别对应了AQS定义的两种资源共享的方式:Exclusive&share,例如ReentrantLock就是一种独占锁,CountDownLatch和Semaphore是共享锁。与CountDownLatch有一定相似性的CyclicBarrier并没有自己的共享同步器,而是使用Lock和Condition来实现的(关于CyclicBarrier的详解可以参考本人的另一篇博文http://www.cnblogs.com/go2sea/p/5615531.html)。

    下面是一个简单的独占锁的实现,它是不可重入的。它重写了AQS的tryAcquire方法和tryRelease方法:

     1 import java.io.Serializable;
     2 import java.util.concurrent.TimeUnit;
     3 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
     4 import java.util.concurrent.locks.Condition;
     5 import java.util.concurrent.locks.Lock;
     6 
     7 class Mutex implements Lock, Serializable {
     8     //自定义同步器,继承自AQS
     9    private static class Sync extends AbstractQueuedSynchronizer {
    10      //试图获取锁,当state为0时能成功获取,
    11      public boolean tryAcquire(int acquires) {
    12        assert acquires == 1; //这是一个对于state进行操作的量,含义自定义
    13        if (compareAndSetState(0, 1)) {    //注意:这是一个原子操作
    14          setExclusiveOwnerThread(Thread.currentThread());
    15          return true;
    16        }
    17        return false;
    18      }
    19      //释放锁,此时state应为1,Mutex处于被独占状态
    20      protected boolean tryRelease(int releases) {
    21        assert releases == 1; // Otherwise unused
    22        if (getState() == 0) throw new IllegalMonitorStateException();
    23        setExclusiveOwnerThread(null);
    24        setState(0);
    25        return true;
    26      }
    27      //返回一个Condition
    28      Condition newCondition() { return new ConditionObject(); }
    29    }
    30    
    31    private final Sync sync = new Sync();
    32    
    33    public void lock()                { sync.acquire(1); }
    34    public boolean tryLock()          { return sync.tryAcquire(1); }
    35    public void unlock()              { sync.release(1); }
    36    public Condition newCondition()   { return sync.newCondition(); }
    37    public void lockInterruptibly() throws InterruptedException {
    38      sync.acquireInterruptibly(1);
    39    }
    40    public boolean tryLock(long timeout, TimeUnit unit)
    41        throws InterruptedException {
    42      return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    43    }
    44  }

    我们可以看到,利用AQS实现一个简单的自定义锁看上去并不复杂,让我们以此为例,来学习一下AQS的内部原理吧。

    一、acquire 获取锁

    我们先来看一下Mutex重写的tryAcquire方法:

      //试图获取锁,当state为0时能成功获取,
         public boolean tryAcquire(int acquires) {
           assert acquires == 1; //这是一个对于state进行操作的量,含义自定义
           if (compareAndSetState(0, 1)) {    //注意:这是一个原子操作
             setExclusiveOwnerThread(Thread.currentThread());
             return true;
           }
           return false;
         }

    注意:当我们初始化一个Sync的时候,如果没有指定state的初值(无参数),那么state的默认初值是0。可以看到,方法开头首先有一个断言acquires==1,参数acquires代表要在state上做的改变的量(减去或增加),在Mutex中,我们定义state只有两个状态:0或1,0代表共享资源可以被获取,1表示共享资源正在被占用,因此Mutex是不可重入的。实际上,自定义同步器通过重写tryAcquire和tryRelease来定义state代表的意义和资源的共享方式,这是同步器的主要任务。Mutex的tryAcquire使用一个原子操作compareAndSetState来试图获取资源,这个原子操作由上层的AQS提供,如果成功,将当前线程设置为独占线程并返回true。

    Mutex的lock方法调用了sync的acquire方法,acquire方法实现为:

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    它首先调用tryAquire去获取共享资源,如果失败,调用addWaiter将当前线程放入等待队列,返回持有当前线程的Node对象,然后调用acquireQueued方法来监视等待队列并获取资源。acquireQueued方法会阻塞,直到成功获取。注意,acquire方法不能及时响应中断,只能在成功获取锁之后,再来处理。中断当前线程的操作跑出的异常在acquireQueued方法中被捕获,外部调用者没能看到这个异常,因此调用selfInterrupt来重置中断标识。

    我们需要详细了解addWaiter方法和acquireQueued方法,之后再来回顾acquire的过程,才能对整个获取锁的流程有比较详细的了解。

    我们先来看addWaiter方法:

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }

    addWaiter首先将当前线程包装在一个Node对象node中,然后获取了一下队列的尾节点,如果队列不为空(tail不为null)的话,调用一个CAS函数试图将node放入等待队列的尾部,注意,此时可能发生竞争,如果有另外一个线程在两个if之间抢先更新的队列的尾节点,CAS操作将会失败,这时会调用enq方法,继续试图将node放入队列:

        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

    enq方法会循环检测队列,如果队列为空,则调用CAS函数初始化队列(此时node==head==tail),否则调用CAS函数将node放入队列尾。注意,这两个CAS是由AQS提供的原子操作。如果CAS失败,enq会继续循环检测,直到成功将node入列。enq方法的这种方式有一个专用的名词:CAS自旋,这种方式在AQS中有多处应用。这里有一个隐含的知识点,即tail是一个volatile成员,确保某个线程更新队列后对其他线程的可见性。

    注意:队列为空的时候,第一个线程进入队列的情况有点tricky:第一个发现队列为空并初始化队列(head节点)的线程不一定优先拿到资源。head节点被初始化后,当前线程需要下一次旋转才有机会进入队列,在这期间,完全有可能半路杀出程咬金,将当前线程与它初始化出的head节点无情分开。我们来总结一下,当队列只有一个节点时(head=tail),有两种情况:第一种是这个队列刚刚被初始化,head并没有持有任何线程对象。这个状态不会持续太久,初始化队列的线程有很大机会在下次自旋时把自己接到队尾。第二种情况是,所有等待线程都已经获得资源并继续执行下去了,队列仅有的节点是最后一个获取共享资源的线程,等到下一个线程到达等待队列并将它踢出队列之后,它才有机会被回收。

    enq执行完毕,我们已经成功把当前线程放入等待队列,接下来的任务就是监视队列,等待获取资源。这个过程由acquireQueued方法实现:

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

    acquireQueued方法是一个很重要的方法,在分析这个方法之前,我们先来说一下AQS中的那个等待队列。这个队列实际上是一个CLH队列,它保证了竞争资源的线程按到达顺序来获取资源,避免了饥饿的发生。CLH队列的工作过程,就是acquireQueued方法的工作过程。很明显,这又是一个自旋。首先,我们调用predecessor方法获取当前线程的前驱节点,如果这个前驱是head节点,就紧接着调用tryAcquire去获取共享资源,当然这是有可能失败的,因为head节点可能刚刚“上位”,还没有释放资源。如果很幸运,我们拿到了资源,就调用setHead将node设置为队列的头结点,setHead方法同时会将node的prev置为null,紧接着将原先head的next也置为null,显然这是为了让其后续被回收。注意:acquireQueued方法在自旋过程中是不可被中断的,当然它会检测到中断(在parkAndCheckInterrupt方法中检测中断标志),但并不会因此结束自旋,只能在获得资源退出方法后,反馈给上层的方法:我刚刚被中断了。还记得acquire方法中的selfInterrupt的调用吗,就是为了“补上”这里没有响应的中断。

    好,我们继续往下。获取资源失败后(原因有二,head与我之间还有等待线程或者head节点的线程正在使用资源),调用shouldParkAfterFailedAcquire方法检测是否该去“休息”下,毕竟一直自旋很累嘛。如果可以休息就调用parkAndCheckInterrupt放心去休息。我们先来看一下shuldParkAfterFailedAcquire:

        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }

    我们首先了解一下waitStatus。Node对象维护了一个int成员waitStatus,他的可能取值如下:

    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    下面解释一下每个值的含义

    CANCELLED:因为超时或者中断,结点会被设置为取消状态,被取消状态的结点不应该去竞争锁,只能保持取消状态不变,不能转换为其他状态。处于这种状态的结点会被踢出队列,被GC回收;
    SIGNAL:表示这个结点的继任结点被阻塞了,到时需要通知它;
    CONDITION:表示这个结点在条件队列中,因为等待某个条件而被阻塞;
    PROPAGATE:使用在共享模式头结点有可能处于这种状态,表示锁的下一次获取可以无条件传播;
    0:None of the above,新结点会处于这种状态。

    在我们的Mutex的例子中,节点的waitStatus只可能有CANCELLED、SIGNAL和0三中状态(事实上,独占模式下所有不使用Condition的同步器都是这样)。

    我们继续来分析shouldParkAfterFailedAcquire方法:

    首先检测下node的前驱节点pred,如果pred状态已经被置为SIGNAL,直接返回true。否则,从node的前驱继续往前找,直到找到一个waitStatus小于等于0的节点,设置该点为node的前驱(注意:此时node与这个节点之间的节点从等待队列中被“摘下”,等待被回收了)并返回false。返回之后,上层的acquireQueued方法继续自旋,再次进入shouldParkAfterFailedAcquire方法之后,如果发现node前驱不是取消状态且waitStatus不等于SIGNAL,调用CAS函数进行注册。注意:这个操作可能失败,因此不能直接返回true,而是返回false由上层的自旋再次调用shouldParkAfterFailedAcquire直到确认注册成功。

    历尽曲折,我们终于可以安心休息了:

        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }

    parkAndCheckInterrupt方法十分简单,他调用LockSupport的静态方法park阻塞当前线程,直到被中断,这次中断会被acquireQueued记录,但不会立即响应,直到自旋完成。注意:返回操作中的interrupted方法会将中断标志复位,因此我们在上层需要将这个中断“补上”,再一次:还记得大明湖边的selfInterrupt吗?

    二、release 释放锁

    我们先来看一下Mutex中重写的tryRelease方法:

         //释放锁,此时state应为1,Mutex处于被独占状态
         protected boolean tryRelease(int releases) {
           assert releases == 1; // Otherwise unused
           if (getState() == 0) throw new IllegalMonitorStateException();
           setExclusiveOwnerThread(null);
           setState(0);
           return true;
         }

    逻辑比较简单,首先将独占线程置为null,紧接着将state设置为0,这里不会发生资源竞争,因此不需要用CAS去设置state值,直接置0即可。

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

    好,我们开始分析release方法。首先调用tryRelease试图释放共享资源,紧接着检测自己的waitStatus是否为SIGNAL,如果是的话,调用unparkSuccessor唤醒队列中的下一个线程。独占模式下,waitStatus!=0与waitStatus==-1等价(这里waitStatus不会为CANCELLED,因为已经获取资源了)。如果不为SIGNAL,说明如果有下个等待线程,它正在自旋。所以直接返回true即可。我们来看下unparkSuccessor方法:

        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }

    unparkSuccessor方法也会在共享模式的工作流程中被调用,因此方法开始做的判断是有必要的。对于独占模式而言,ws应该都是0。然后找到下一个需要被唤醒的线程并调用LockSupport的静态方法unpark唤醒等待线程。

    至此,我们比较详细地了解了acquire&release的工作流程。

    三、acquireShared 获取锁

    下面,我们来学习下共享模式下的获取&释放锁的工作流程。

        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }

    acquireShared方法首先调用tryAcquireShared试图获取共享资源。tryAcquireShared的返回值表示剩余资源个数,负值表示获取失败,0表示获取成功但已无剩余资源。如果获取失败,调用doAcquireShared方法完成独占模式下类似的操作,后面我们会详细分析。注意,doAcquireShared方法在等待资源的过程中也是不响应中断的,它能觉察到中断,但在成功获取资源之前不会处理。

        private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

    doAcquireShared方法与acquireQueued方法相似,不同的地方在于,共享模式下成功获取资源并将head指向自己之后,要检查并试图唤醒之后的等待线程。因为共享资源可能剩余,可以被后面的等待线程获取。

        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }

    setHeadAndPropagate中有一个长长的if,来判断是否应该去试图唤醒后面的线程。其中h==null的判断笔者始终不能理解,因为查看代码发现,之后队列尚未初始化的时候为空,后续都不可能为空了。关于这点希望各位看官不吝指教。其他情况,propagate大于0,表示尚有资源可被获取,显然应该继续判断;而当h.waitStatus小于0时,它有两种取值可能,SIGNAL和PROPAGATE,我们将在后面看到,这两种情况都是应该继续判断。后续是对node的后继进行的判断,注意,node此时可能已经不是head节点了,因为这是共享模式,所以可能有一个node的后继成功获取资源后,把自己设为head,将node踢出了队列。这种情况下node的后继s是可能为null的,但貌似这种情况doReleaseShared的调用没有意义。s.isShared的判断主要是考虑到读写锁的情况,在读写锁的使用过程中,申请写锁(独占模式)和申请读锁(共享模式)的线程可能同时存在,这个判断发现后即线程是共享模式的时候,调用soReleaseShared方法唤醒他。

    但总之,我们十分保守谨慎地调用了doReleaseShared方法试图唤醒后继线程:

        private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }

    又是一个自旋。我们首先获取head节点h,然后检查它的waitStatus是否为SIGNAL,如果是的话,调用CAS将h的waitStatus设置为0,并调用unparkSuccessor唤醒下一个等待线程。注意,这里调用CAS方法而不是直接赋值,是因为在共享模式下,这里可能发生竞争。doReleaseShared方法可能由head节点在使用完共享资源后主动调用(后续在releaseShared方法中可以看到),也可能由刚刚“上位”的等待线程调用,在上位之后,原来的head线程已被踢出队列。

    因此,doReleaseShared方法的执行情况变得比较复杂,需要细致分析。

    第一种情况,只有刚刚释放资源的head线程调用,这时候没有竞争,waitStatus是SIGNAL,就去唤醒下个线程,是0,就重置为PROPAGATE。

    第二种情况,刚刚释放完资源的旧head,和刚刚上位的新head同时调用doReleaseShared方法,这时候最新的head获取的都是自己,若干被踢出的旧head获取的可能是旧head,也可能是新head,这些被踢出的旧head线程也在根据自己获取的head(不管新旧)的状态进行CAS操作和unparkSuccessor操作,幸运的是(必须幸运啊。。),这些操作不会造成错误,只是多了一些唤醒而已(这些唤醒可能导致一个线程获得资源,也可能是一个“虚晃”)。

    我们可以发现,不管head引用怎样更迭,最终新head的waitStatus都会被顺利处理。注意,可能有多个旧head同时参与这个过程,都不影响正确性。

    我们注意到,一个新head,在他刚上位的时候有机会调用一次setHeadAndPropagate进而调用doReleaseShared,在他释放资源之后,又一次调用doReleaseShared(这次是必然的)。第一次调用时,不管新head的waitStatus是0还是SIGNAL,最终状态都被PROPAGATE(当然,被踢出队列的head可能还没来得及设置成PROPAGATE,但新上位的head最终会被设置),这也符合PROPAGATE的语义:使用在共享模式头结点有可能处于这种状态,表示锁的下一次获取可以无条件传播。

    还有一个问题,它是由SIGNAL-->0-->PROPAGATE变化而来的,为什么不是SIGNAL-->PROPAGA这样直接变化呢?原因是unparkSuccessor方法会试图将当前node的waitStatus复位成0,如果我们直接SIGNAL-->PROPAGA后,那么又被复位成0,还需要一次CAS操作置为PROPAGATE。

    四、releaseShared 释放锁

        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }

    我们可以看到,调用tryReleaseShared成功释放共享资源之后,最终要再次调用doReleaseShared试图唤醒后面的等待线程。

    五、ConditionObject

    关于Condition的内容请看笔者的另一篇博文Condition源码学习笔记

    至此,我们对独占模式和共享模式下、不响应中断的、没有等待时间参数的获取资源和释放资源的流程有了初步了解。这时去看JUC包中的锁的源码,相信会有更深的理解。


    作者:开方乘十

    出处:http://www.cnblogs.com/go2sea/

    本文版权归作者开方乘十和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接,否则保留追究法律责任的权利。

    如有不正之处,欢迎邮件(hailong.ma@qq.com)指正,谢谢。

  • 相关阅读:
    css文本在标签<text>内平均分布
    ES6实现去重,排序,加升序
    uni-app项目打包成小程序
    uni-app项目( uniapp滚动监听元素)
    运行vue项目:Module build failed: Error: Cannot find module 'node-sass'报错问题
    笨方法实现数量的输入与加一减一 、以及对边界值的判断禁用
    基于nuxt的前端商城pc端项目(bug记录)
    基于nuxt的商城项目pc端项目记录
    Vue学习笔记整理-长期更新
    程序员,不要创业!
  • 原文地址:https://www.cnblogs.com/go2sea/p/5618628.html
Copyright © 2011-2022 走看看