zoukankan      html  css  js  c++  java
  • Java并发基础框架AbstractQueuedSynchronizer初探(ReentrantLock的实现分析)

    AbstractQueuedSynchronizer是实现Java并发类库的一个基础框架,Java中的各种锁(RenentrantLock, ReentrantReadWriteLock)以及同步工具类(Semaphore, CountDownLatch)等很多都是基于AbstractQueuedSynchronizer实现的。AbstractQueuedSynchronizer 一般简称AQS,Abstract表示他是一个抽象类,Queued表示他是基于先进先出 FIFO 等待队列实现的,Synchronizer表示他是一个同步器。

    基于队列的意思是,我们用锁来说明,比如多个线程想要获得同一个对象上的锁,那么这些线程会按照申请锁的先后顺序在该锁对象中的一个FIFO队列上排队等待(也就是将这些线程对象的引用插入到该锁的队列中)。AQS是Java并发的基础框架,同时AOS的实现的基础却是 sun.misc.Unsafe 和 volatile,当然还有LockSupport工具类,LockSupport也是借助于Unsafe,主要实现线程的“阻塞”(park)和线程的“唤醒阻塞”(unpark)。基本原理是 sun.misc.Unsafe 保证了内存操作的“原子性”,而volatile保证了内存“可见性”。Unsafe的源码可以参见:http://www.docjar.com/html/api/sun/misc/Unsafe.java.html ,它提供了各种原子性的内存CAS操作。

    本文从ReentrantLock的实现来初步探索AbstractQueuedSynchronizer。为了好把握方向,我们将ReentrantLock的源码(Java1.8.0_40)简化如下:

    public class ReentrantLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = 7373984872572414699L;
        /** Synchronizer providing all implementation mechanics */
        private final Sync sync;
    
        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = -5179523762034025860L;
    
            abstract void lock();
    
            final boolean nonfairTryAcquire(int acquires) {
                // ... ...
            }
    
            protected final boolean tryRelease(int releases) {
               // ... ...
            }
            // ... ...
        }
    
        /**
         * Sync object for non-fair locks
         */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    
        /**
         * Sync object for fair locks
         */
        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            final void lock() {
                acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
               // ... ...
            }
        }
    
        public ReentrantLock() {
            sync = new NonfairSync();
        }
    
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
        public void lock() {
            sync.lock();
        }
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
        public boolean tryLock() {
            return sync.nonfairTryAcquire(1);
        }
        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }
        public void unlock() {
            sync.release(1);
        }
        public Condition newCondition() {
            return sync.newCondition();
        }
    // ... ... }

    可以明显的看到,ReentrantLock 实现的所以接口都是借助于他的实例属性——同步器sync来实现的,从构造函数可以看出,ReentrantLock默认是非公平锁——使用非公平同步器NonfairSync,传入true时得到的是公平锁——使用公平同步器FairSync。而这两者都是继承于抽象类Sync,而抽象类Sync又继承于我们的AbstractQueuedSynchronizer。我们先整体看下AQS的实现代码:

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
        static final class Node {   
            volatile int waitStatus;
            volatile Node prev;
            volatile Node next;
            volatile Thread thread;
            Node nextWaiter;
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
        /**
         * Head of the wait queue, lazily initialized. 
         */
        private transient volatile Node head;
        /**
         * Tail of the wait queue, lazily initialized.  
         */
        private transient volatile Node tail;
        /**
         * The synchronization state.
         */
        private volatile int state;
        /**
         * Inserts node into queue, initializing if necessary. See picture above.
         * @param node the node to insert
         * @return node's predecessor
         */
        private Node enq(final Node node) {
           // ... ...
        }
    
        /**
         * Creates and enqueues node for current thread and given mode.
         * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
         * @return the new node
         */
        private Node addWaiter(Node mode) {
            // ... ...
        }
    
        /**
         * Sets head of queue to be node, thus dequeuing. Called only by
         * acquire methods.  Also nulls out unused fields for sake of GC
         * and to suppress unnecessary signals and traversals.
         * @param node the node
         */
        private void setHead(Node node) {
            // ... ...
        }
    public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; // ... ... } }

    AbstractQueuedSynchronizer的实现包含了两个内部类,Node 类和 ConditionObject类,而后者只有在使用 ReentrantLock.newCondition()时才会用到,暂时不去管它。Node类主要作为FIFO队列上的节点,存储在锁上等待的所有线程对象的信息。提供了enq(final Node node)方法用于插入队列尾部,addWaiter(Node mode)方法用于加入FIFO队列,setHead(Node node)用于初始化FIFO队列的头部。所以AbstractQueuedSynchronizer没有我们想象的那么复杂,它主要是用于实现一个FIFO的等待队列(我们暂时放下ConditionObject不管),以及管理同步器的状态status

    我们在看一下他继承的父类:

    public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
        protected AbstractOwnableSynchronizer() { }
        /**
         * The current owner of exclusive mode synchronization.
         */
        private transient Thread exclusiveOwnerThread;
    
        protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;
        }
    
        protected final Thread getExclusiveOwnerThread() {
            return exclusiveOwnerThread;
        }
    }

    很简单,就是实现了互斥同步器的所有者的功能,比如互斥锁正被哪个线程占有者。

    我们大体了解了AbstractQueuedSynchronizer之后,我们再从细节上仔细分析ReentrantLock的实现。

    1)ReentrantLock.lock实现分析

    ReentrantLock分为公平和非公平的锁,NonfairSync 和 FairSync的lock实现分别如下:

        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    
        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            final void lock() {
                acquire(1);
            }
    
            /**
             * Fair version of tryAcquire.  Don't grant access unless
             * recursive call or no waiters or is first.
             */
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }

    NonfairSync.lock 和 FairSync.lock实现差别只有两行代码:

    if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());

    就是这两行代码使得了 NonfairSync.lock 的锁的实现是非公平的,这两行代码的意思是:如果sync同步器的状态为0,也就是锁没有被占,那么就设置为1,也就是立刻获得锁,并且设置锁的拥有者。也就是说非公平锁,可以 不进入等待队列而直接获取锁,并且不管是否在他的前面已经有其它线程在等待着获取该锁,这就是“不公平”锁的原因之一。原因之二是它们的调用 acquire(1); 都是在 AQS 中,都分别调用了子类中的tryAcquire,而NonfairSync.tryAcquire 和 FairSync.tryAcquire实现又不同:

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    这里含义是:tryAcquire(arg)尝试去获得锁,并且调用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),将该申请锁的线程插入FIFO等待队列。而NonfairSync.tryAcquire的实现如下:

            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
            /**
             * Performs non-fair tryLock.  tryAcquire is implemented in
             * subclasses, but both need nonfair try for trylock method.
             */
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }

    而FairSync.tryAcquire的实现如下:

            /**
             * Fair version of tryAcquire.  Don't grant access unless
             * recursive call or no waiters or is first.
             */
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }

    可以明显看到公平锁的实现:

                if (c == 0) {
                    if (!hasQueuedPredecessors()

    即使 c==0 ,也就是锁没有被占有,它也要调用hasQueuedPredecessors()去判断是否在自己前面已经有线程在等待队列上了,所以这里就是实现了FIFO的公平,先到的先获得锁。所以公平锁和非公平锁的实现在上面的两个对方是有区别的。

    分析完了锁的公平和非公平的原因,我们再接着上面看如何实现加入FIFO队列,以及如何实现等待:

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    tryAcquire(arg)刚才分析完了,我们再看addWaiter(Node.EXCLUSIVE)

        /**
         * Creates and enqueues node for current thread and given mode.
         *
         * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
         * @return the new node
         */
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
        /**
         * Inserts node into queue, initializing if necessary. See picture above.
         * @param node the node to insert
         * @return node's predecessor
         */
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

    很简单,就是构造一个Node节点,然后插入到等待队列的尾部。

    再看acquireQueued(addWaiter(Node.EXCLUSIVE), arg):

        /**
         * Acquires in exclusive uninterruptible mode for thread already in
         * queue. Used by condition wait methods as well as acquire.
         *
         * @param node the node
         * @param arg the acquire argument
         * @return {@code true} if interrupted while waiting
         */
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

    这里就实现了在锁上的“阻塞”的功能。在一个死循环中,先判断Node是否是等待队列的头节点,如果是的话,然后调用tryAcquire(arg)去获得锁,然后就可以返回了,也就是获得锁成功了。如果Node不是头节点的话,线程就要被阻塞了:

        /**
         * Checks and updates status for a node that failed to acquire.
         * Returns true if thread should block. This is the main signal
         * control in all acquire loops.  Requires that pred == node.prev.
         *
         * @param pred node's predecessor holding status
         * @param node the node
         * @return {@code true} if thread should block
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }

    该函数的功能是将Node的前驱节点的等待状态pred.waitStatus设置为SIGNAL。这样设置的原因是方便实现Node节点的“唤醒阻塞”(unpark)。设置成功之后调用:parkAndCheckInterrupt(); 开始被“阻塞”:

        /**
         * Convenience method to park and then check if interrupted
         *
         * @return {@code true} if interrupted
         */
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }

    阻塞的实现利用了LockSupport类,而LockSupport类又使用了Unsafe:

        public static void park(Object blocker) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, 0L);
            setBlocker(t, null);
        }

    setBlocker(t, blocker) 设置了当前线程被谁阻塞了。UNSAFE.park(false, 0L);实现阻塞:

     /**
        * Block current thread, returning when a balancing
        * <tt>unpark</tt> occurs, or a balancing <tt>unpark</tt> has
        * already occurred, or the thread is interrupted, or, if not
        * absolute and time is not zero, the given time nanoseconds have
        * elapsed, or if absolute, the given deadline in milliseconds
        * since Epoch has passed, or spuriously (i.e., returning for no
        * "reason"). Note: This operation is in the Unsafe class only
        * because <tt>unpark</tt> is, so it would be strange to place it
        * elsewhere.
        */
     public native void park(boolean isAbsolute, long time);

    park方法可以被 unpark 唤醒,超时也会被唤醒,中断也会被唤醒。

    park方法被唤醒了之后,就会在上面那个死循环中,再次检查自己是否是 头结点:

                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                        interrupted = true;
                }

    如果是头结点的话, 那么重新调用tryAcquire(arg)去获得锁,然后返回,表示获得锁成功了。到这里 ReentrantLock.lock()方法的实现算是分析完了。

    2)ReentrantLock.unlock实现分析

        /**
         * Attempts to release this lock.
         *
         * <p>If the current thread is the holder of this lock then the hold
         * count is decremented.  If the hold count is now zero then the lock
         * is released.  If the current thread is not the holder of this
         * lock then {@link IllegalMonitorStateException} is thrown.
         *
         * @throws IllegalMonitorStateException if the current thread does not
         *         hold this lock
         */
        public void unlock() {
            sync.release(1);
        }
        /**
         * Releases in exclusive mode.  Implemented by unblocking one or
         * more threads if {@link #tryRelease} returns true.
         * This method can be used to implement method {@link Lock#unlock}.
         *
         * @param arg the release argument.  This value is conveyed to
         *        {@link #tryRelease} but is otherwise uninterpreted and
         *        can represent anything you like.
         * @return the value returned from {@link #tryRelease}
         */
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

    再看 tryRelease:

            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }

    很简单,就是修改 sync 的属性status。如果stauts等于0了,就表示锁已经被释放了。于是就可以唤醒FIFO队列的头节点了,unparkSuccessor(head):

        /**
         * Wakes up node's successor, if one exists.
         *
         * @param node the node
         */
        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }

    这里 t.waitStatus <= 0 小于0的包括了 我们在调用shouldParkAfterFailedAcquire时 设置waitStatus 为SIGNAL,因为SIGNAL==-1,所以这里的LockSupport.unpark(s.thread)刚好唤醒了前面的 park().

    所以lock() 和 unlock()方法也对应起来了。到这里ReentrantLock的lock和unlock方法分析完成。ReentrantLock的实现借助于AQS,而AQS有借助于LockSupport和Unsafe,以及volatile。ReentrantLock使用state表示锁被同一个线程获取了多少次,并且记录了锁的拥有者(线程)。可重入锁的可重入的原因就是因为记录了锁的拥有者和记录锁被获取的次数来实现的。另外锁的公平性的实现就是是否允许锁申请的插队。

    Semaphore, CountDownLatch的实现相比ReentrantLock而言更加简单,实现方式也是大体相似的。

    其实查看一些JDK关于并发的库,就可以知道:Java并发库的构建的基础基本就两个——Unsafe和volatile,前者保证“原子性”,后者保证“可见性”

  • 相关阅读:
    【可视化】指标块分析
    【可视化】可视化概况(一)
    webpack 打包编译优化之路
    Akka源码分析-Akka-Streams-概念入门
    Akka源码分析-Cluster-DistributedData
    Akka源码分析-Cluster-Sharding
    Akka源码分析-Cluster-Metrics
    Akka源码分析-Cluster-Distributed Publish Subscribe in Cluster
    Akka源码分析-Cluster-ClusterClient
    Akka源码分析-Cluster-Singleton
  • 原文地址:https://www.cnblogs.com/digdeep/p/4445128.html
Copyright © 2011-2022 走看看