zoukankan      html  css  js  c++  java
  • AbstractQueuedSynchronizer原理分析

      AbstractQueuedSynchronized 以下简称AQS,是用来构建锁或者其他同步组件的基础框架。

      在AQS中,为锁的获取和释放提供了一些模板方法,而实现锁的类(AQS的子类)需要实现这些模板方法中的同步方法。

      这些方法包括:

      ·tryAcquire():尝试以独占模式获取锁

      ·tryRelease():尝试释放独占模式的锁

      ·tryAcquireShared():尝试以共享模式获取锁

      ·tryReleaseShared():尝试释放共享模式的锁

      ·isHeldExclusiverly():返回是否以独占模式获有锁

      在分析AQS的原理之前,我们先看看LockSupportLockCondition、AQS、ReentrantLok等等之间的关系和使用方式。

      关系图

                 

     

      使用方式有两种:一种是不带条件的普通的锁,另一种是带条件的锁。

      不带条件

         /*
             * ReentrantLock是Lock接口的实现类之一
             * 实现的是一种可重入的锁
             */
            Lock lock  = new ReentrantLock();
            lock.lock();
            try {
                //同步处理
            }finally {
                lock.unlock();
            }

      带条件的锁

    Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();//创建和该锁关联的条件锁
        
        public void conditionWait() throws InterruptedException{
            lock.lock();
            try {
                condition.await();
            }finally {
                lock.unlock();
            }
        }
        public void ConditionSignal() throws InterruptedException{
            lock.lock();
            try {
                condition.signal();
            }finally {
                lock.unlock();
            }
        }

      从使用方式可以看到,主要调用的方法就是:lock.lock()、lock.unlock()、lock.newCondition()、condition.await()、condition.signal() 

      

    下面分别来看看这几个方法

      lock.lock():由于Lock是一个接口,所以需要通过其子类来实例化,所以lock.lock()其实调用的是子类的lock()方法,在上面的例子中自然调用的就是ReentrantLock.lock()方法。

        public void lock() {
            sync.lock();
        }

       可以看到,ReentrantLock.lock()方法调用的是同步器内部类的lock()方法,而ReentrantLock的内部同步器类Sync又分为FairSync和NoFairSync。

      源码如下(省略了一部分内容):

       abstract static class Sync extends AbstractQueuedSynchronizer {
            
            abstract void lock();
       }
        static final class NonfairSync extends Sync {
         
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
        }
    
        static final class FairSync extends Sync {
         final void lock() {
                acquire(1);
            }
        }

      可以看到在非公平的获取锁的方法中,会首先尝试直接去获取锁,而不会通过同步队列去排队获取锁。否则的话,就通过AQS同步器的acquire(1)去获取锁。

      ※RetrantLock在初始化的时候可以通过参数指定是公平的还是不公平的锁。默认是非公平的锁。

      lock.unlock():调用的是同步器中的release(1)方法,也就是AQS中的release(1)方法。

        public void unlock() {
            sync.release(1);
        }

      lock.newCondition():该方法调用的也是内部同步器类中的newCondition()方法。

        public Condition newCondition() {
            return sync.newCondition();
        }

      在ReentrantLock的内部同步器类中的newCondition()方法如下:

            final ConditionObject newCondition() {
                return new ConditionObject();
            }

      可以看到,该方法直接返回了一个AQS中的内部类ConditonObject对象。所以,在每次生成一个条件锁的时候,都会创建一个ConditionObject对象,

      而每个ConditionObject对象都会在内部维护一个条件等待队列。

      ConditonObject实现分析点此参考

      conditon.await():通过调用LockSupport中的park()方法,将当前挂起。

      源码如下:

           public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }

      condition.signal():通过调用LockSupport的unpark()方法,来唤醒线程。

      源码如下:

            public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    
            private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
      final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }

      了解完上面的概述之后,我们知道了,在不带条件的锁中,主要是通过调用AQS中的acquire(1)herelease(1)方法来获取锁和释放锁的。

    下面就来具体看看AQS中的实现

      在AQS中通过一个双向FIFO同步队列来维护获取对象锁的线程,当获取锁失败的时候,会用当前线程构建一个Node节点,加入到同步队列中去。

      在AQS中维护了同步队列的head和tail节点,和同步状态。。

      Node节点中维护了当前线程的status和前驱节点、后继节点、下一个等待节点(条件等待的时候用)。

      waitStatus包含如下状态默认值为0:

        CANCELLED = 1 : 表示当前线程在等待的时候已经超时了或者被取消了

        SIGNAL = -1  :当前线程释放了同步状态或者被取消的时候会通知后继节点,使后继节点得以运行

        CONDITINO = -2:节点在等待队列中,等待Condition,当其他线程在Condition上调用signal的时候,该线程会从等待队列转移到同步队列中去,加入到同步状态的获取。

        PROPAGATE = -3:表示下一次共享式同步状态会无条件的传播下去

      AQS中同步器的结构如下:

      

      当有节点加入到同步队列的时候,只需要对tail节点重新指向就可以了

       同步队列是一个FIFO的队列,在获取锁的时候总是首节点是获取同步状态的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点在获取同步状态成功时,会将自己设置为首节点。

     下面就来具体看看在获取锁的时候lock.lock()调用的同步器中的acquire(1)方法的具体实现

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

      以独占模式获取锁,并且不相应中断。在AQS中还有以下acquire方法:

        acquireInterruptibly(int arg) :以独占模式获取锁,并且响应中断

        acquireShared(int arg):以共享模式获取锁,并且不响应中断

        acquireSharedInterruptibly(int arg):以共享模式获取锁,并且响应中断

      该方法首先会调用tryAcquire(arg)来尝试获取锁。从源码可以看到,在AQS中该方法只是单纯的抛出一个UnsupportedOperationException异常,该方法需要实现AQS同步器的具体类中实现。

        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }

      我们看看ReentrantLock中的具体实现。

      非公平锁:

            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {  //如果当前的同步状态为0,就尝试直接设置同步状态和设置独占的线程为自己,来强制获取锁
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {  //如果当前同步状态不为0,就判断是不是自己获取了锁,这里是实现可重入的
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }

      公平锁:

            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {  
                    if (!hasQueuedPredecessors() &&  //当前state为0 ,并且同步队列中没有前继节点,就尝试设置自己为获得锁的线程
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {  //实现可重入
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }

       可以看到,不论是在公平锁还是非公平锁的tryAcquire中,当获取到锁的时候返回的都是true,否则返回false。

      所以,如果当前线程没有获取到锁的时候,则会继续执行后面的acquireQueued(addWaiter(Node.EXCLUSIVE),arg))

      我们先看看addWaiter(Node.EXCLUSIVE)的实现。

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // 首先为当前线程以指定模式构建一个Node节点,然后尝试快速入队的方式加入到同步队列中去
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);  //否则的话就调用enq(node)方法来加入到同步队列中去
            return node;
        }

      再看看enq(node)的源码实现

        private Node enq(final Node node) {
            for (;;) {  //通过一个无条件的循环,知道将构建一个空的head,然后将当前节点加入到空的head的后面,构成一个同步队列
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

      再看看acquireQueued(node,int)的源码实现

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {  //判断前一个节点是不是head节点,如果是的话,则会再次尝试去获取锁
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&  //如果前一个节点不是head节点,则设置前一个非取消节点的状态是signal,以确保在前一个线程释放锁的时候能唤醒当前线程
                        parkAndCheckInterrupt())  //挂起当前线程,并且返回当前线程是否被中断过(会清空中断状态,只有在被唤醒的时候才能从park()方法返回)
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);  //如果被中断过,则把该节点从同步队列中删除
            }
        }
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
        private void cancelAcquire(Node node) {
            // Ignore if node doesn't exist
            if (node == null)
                return;
    
            node.thread = null;
    
            // Skip cancelled predecessors
            Node pred = node.prev;
            while (pred.waitStatus > 0)
                node.prev = pred = pred.prev;
    
            // predNext is the apparent node to unsplice. CASes below will
            // fail if not, in which case, we lost race vs another cancel
            // or signal, so no further action is necessary.
            Node predNext = pred.next;
    
            // Can use unconditional write instead of CAS here.
            // After this atomic step, other Nodes can skip past us.
            // Before, we are free of interference from other threads.
            node.waitStatus = Node.CANCELLED;
    
            // If we are the tail, remove ourselves.
            if (node == tail && compareAndSetTail(node, pred)) {
                compareAndSetNext(pred, predNext, null);
            } else {
                // If successor needs signal, try to set pred's next-link
                // so it will get one. Otherwise wake it up to propagate.
                int ws;
                if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                    Node next = node.next;
                    if (next != null && next.waitStatus <= 0)
                        compareAndSetNext(pred, predNext, next);
                } else {
                    unparkSuccessor(node);
                }
    
                node.next = node; // help GC
            }
        }

       acquire(1)的总结

       ①首先会去调用子类实现的具体tryAcquire(1)方法来获取锁,根据子类不同,实现的也不同,例如ReentrantLock中的实现就分为公平锁和非公平锁。非公平锁就是不会去同步队列中排队,

      而是直接去获取锁。如果获取失败的话,就会跟公平锁一样,进入FIFO的同步队列排队。

       ②加入同步队列的时候,首先会判断tail节点是否为空,如果不为空,则会尝试快速入队,如果为空的话,则会先创建一个空的head节点,然后在将当前线程的节点加入到同步队列中去

       ③加入到同步队列之后,会再次判断前一个节点是不是head节点,如果是的话,则会再次尝试去获取锁,如果获取失败的话,则会挂起当前线程。

       ④直到当前线程被唤醒的时候,会判断当前线程是否被中断过,如果被中断过,则会从同步队列中删除当前线程节点。并且中断当前线程。

      下面在看看lock.unlock()方法调用的同步器中的sync.release(1)方法的具体实现

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

      首先会调用tryRelease(arg)方法来释放锁。然后唤醒后面的线程。

        protected boolean tryRelease(int arg) {
            throw new UnsupportedOperationException();
        }

      tryRelease()同样也是子类需要实现的方法。ReentrantLock中的实现如下:

            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }

      

       private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }

      

      release(1)的总结

      ①首先会判断当前线程是不是独占的拥有锁。如果是的话,则会去释放锁。如果当前线程都已经退出了获取锁(可重入的原因),则会设置当前线程的state为0,独占线程为null

      ②在释放锁之后,会唤醒下一个等待中的线程。

      

      AQS中条件队列的实现方式参考ConditonObject实现分析点此参考

  • 相关阅读:
    window.parent 、window.top及window.self 详解
    js中的变量提升和函数提升
    IE不支持ES6语法的解决方案——Babel
    JavaScript 文件拖拽上传插件 dropzone.js 介绍
    C# DataTable 增加行与列
    group by 与 order by 一起使用的时候
    window.open传递多个参数
    Jquery EasyUI tree 的异步加载(遍历指定文件夹,根据文件夹内的文件生成tree)
    ASP.NET中调用百度地图API
    C# 读取Excel中的数据到DataTable中
  • 原文地址:https://www.cnblogs.com/zerotomax/p/8967024.html
Copyright © 2011-2022 走看看