zoukankan      html  css  js  c++  java
  • ReentrantLock原理

    内部结构、工作过程、源码

    基本用法

    class Job{
        private ReentrantLock lock = new ReentrantLock();
        // 共享变量
        private String data = null;
    
        /**
         * 临界方法
         */
        public void deal(){
            // 加锁
            lock.lock();
            try {
                // TODO
                data = "";
            } finally {
                // 释放
                lock.unlock();
            }
        }
    }

    进入lock方法,调用的是sync的lock方法

    public void lock() {
        sync.lock();
    }

    那么这个sync是什么呢?它是ReentrantLock的静态内部类,了解一下其继承结构

    先从ReentrantLock里面的Sync及其子类开始

    先简单说一下这三个类都实现了什么:

    1. Sync提供了抽象方法lock,以供公平和非公平模式的子类实现。

    2. 公平和非公平模式的子类需要实现AQS类里的tryAcquire方法,即尝试获取锁的具体实现。

    3. 看其中的源码,我们还可以看到Sync类里还有个实现好的nonfairTryAcquire方法,那么问题来了,既然子类会完成tryAcquire方法的实现,为何还要多此一举?先看我画好的方法调用过程:

    对于非公平锁

    对于ReentrantLock里面的tryLock

     对于公平锁

    看完图,应该知道为什么在Sync里面实现nonfairTryAcquire了吧?

    无论公平或者非公平模式,ReentrantLock的tryLock方法都会调用同一个方法nonfairTryAcquire,所以把这个方法放在父类Sync。也就能理解了作者的注释:

    /**
    * Performs non-fair tryLock. tryAcquire is implemented in
    * subclasses, but both need nonfair try for trylock method.
    */

    类似的,两种模式释放锁的逻辑也是一样的,所以把tryRelease也放在了父类中。说白了就是代码复用

    现在看下这三个类的源码

    【Sync】

    /**
     * 锁同步控制的基础。并在这个基础上实现了公平和非公平版本。
     * 使用AQS的状态表示持有锁的数量。
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
    
        /**
         * 子类化的主要原因是允许非公平版本的快速途径。
         */
        abstract void lock();
    
        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            // 当前线程
            final Thread current = Thread.currentThread();
            // 同步状态
            int c = getState();
            // 同步状态为0,代表锁没有被占用,这个时候可以获取
            if (c == 0) {
                // CAS操作,如果内存值是预期值0,则用更新值acquires覆盖内存值,成功返回true
                if (compareAndSetState(0, acquires)) {
                    // 设置当前拥有独占访问权的线程。
                    setExclusiveOwnerThread(current);
                    // 锁获取成功
                    return true;
                }
            }
            // 如果当前线程已经持有这个锁了,那就重入
            else if (current == getExclusiveOwnerThread()) {
                // 当前状态加上acquires(一般是+1)
                int nextc = c + acquires;
                // 异常情况
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 设置状态
                setState(nextc);
                // 锁获取成功
                return true;
            }
            // 没有获取到,直接返回false
            return false;
        }
    
        // 如果当前线程只获取一次,那么一进一出,同步状态为0,返回true;如果重入了几次,那么同步状态不一定减到0,返回就是false了。
        protected final boolean tryRelease(int releases) {
            // 当前状态减去releases
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // 如果同步状态为0,证明此时无人持有锁
            if (c == 0) {
                // 只有在这种情况才是true
                free = true;
                // 没有线程拥有独占访问权
                setExclusiveOwnerThread(null);
            }
            // 设置同步状态
            setState(c);
            return free;
        }
    
        // 判断当前线程是否是拥有独占访问权的线程
        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    
        // new一个Condition
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
    
        // Methods relayed from outer class
    
        // 获取拥有独占访问权的线程
        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }
    
        // 获取重入次数
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }
    
        // 同步状态为0即没有锁
        final boolean isLocked() {
            return getState() != 0;
        }
    
        /**
         * Reconstitutes the instance from a stream (that is, deserializes it).
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

    【NonfairSync】

    /**
     * 非公平锁的实现
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
    
        
        final void lock() {
            // 如果当前无人占用锁,并且当前线程设置同步状态成功,那么这个线程就“抢占”成功了(说白了就是不管有没有等待很久的线程)
            if (compareAndSetState(0, 1))
                // 设置拥有独占访问权的线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 如果没有抢成功,则调用AQS的acquire方法(进而进入当前类的tryAcquire方法,非公平实现)
                acquire(1);
        }
    
        protected final boolean tryAcquire(int acquires) {
            // 尝试获取,调用的是父类的非公平实现
            return nonfairTryAcquire(acquires);
        }
    }

    【FairSync】

    /**
     * 公平锁的实现
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
    
        final void lock() {
            // 调用AQS的acquire方法(进而进入当前类的tryAcquire方法,公平实现)
            acquire(1);
        }
    
        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 区别在这里,先判断是否有人在等待,如果有就返回false;没有的话即尝试设置同步状态抢占锁
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 重入锁情况,和非公平实现一样一样的
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

    很简单的逻辑

    1. 锁空闲则尝试设置同步状态以抢占。

    2. 重入的情况,更新同步状态即可。

    3. 公平锁仅仅多了个判断当前等待队列是否为空。

    一切源于AbstractQueuedSynchronizer(AQS)

    内部结构大体可以分为两部分:CLH同步队列和Condition条件。

    根据CLH队列的Node结构来看,可以看出两种模式:共享模式和独占模式。(问题来了:共享模式和独占模式的作用是什么?为什么要搞两种?)

    独占模式:在一个线程持有锁的前提下,其他线程试图获取该锁将无法成功。

    共享模式:锁可能(但不一定)被多个线程共同持有。

    具体应用体现:ReentrantLock中是独占模式;CountDownLatch中是共享模式;ReentrantReadWriteLock中的ReadLock是共享模式,WriteLock是独占模式(表示可以多个线程同时读,只能一个线程写)

    我们知道Condition是线程通信的工具,那么Condition是怎样影响CLH队列的?这个在后面慢慢解释。

    先熟悉一下基础内容Node

    /**
     * AQS是“CLH”(发明人是Craig、Landin和Hagersten)锁队列的变体。
     * CLH锁通常用于自旋锁。但是这里使用相同的基本策略来实现同步。
     */
    static final class Node {
        /** 节点在共享模式下等待的标记 */
        static final Node SHARED = new Node();
        /** 节点在独占模式下等待的标记 */
        static final Node EXCLUSIVE = null;
    
        /** 表示线程已取消 */
        static final int CANCELLED =  1;
        /** 表示后继线程需要被阻塞.(用于独占锁) */
        static final int SIGNAL    = -1;
        /** 表示线程在Condition条件上阻塞等待 */
        static final int CONDITION = -2;
        /** 表示下一个acquireShared应该无条件传播。(用于共享锁) */
        static final int PROPAGATE = -3;
    
        /**
         *   SIGNAL:     此节点的后续节点被(或将要)阻塞(通过park),因此当前节点在释放或取消时必须取消其后续节点。
         *               为了避免争用,acquire方法必须首先表明它们需要一个信号,然后原子的重试acquire方法,如果失败,则阻塞。
         *
         *   CANCELLED:  此节点由于超时或中断节点从未离开此状态而被取消。特别是,具有已取消节点的线程将不再阻塞。
         *
         *   CONDITION:  此节点当前位于条件队列上。在传输之前,它不会被用作同步队列节点,此时状态将被设置为0。
         *
         *   PROPAGATE:  一个被释放的节点应该被传播到其他节点。这是在doReleaseShared中设置的(仅针对head节点),以确保传播能够继续,即使其他操作已经介入。
         *
         *   0:          None of the above
         *
         * 值以数字形式排列以简化使用。非负值意味着节点不需要发出信号。因此,大多数代码不需要检查特定的值,只需检查符号。
         *
         * 对于正常的同步节点,字段初始化为0
         */
        volatile int waitStatus;
    
        /**
         * 当前节点/线程的前驱节点。
         */
        volatile Node prev;
    
        /**
         * 当前节点/线程的后继节点。
         */
        volatile Node next;
    
        /**
         * 当前节点拥有的线程
         */
        volatile Thread thread;
    
        /**
         * 如果等于SHARED,则当前节点是共享模式。
         * 否则是独占模式,这个值也是Condition队列的下一个节点。
         */
        Node nextWaiter;
    
        /**
         * 如果当前节点以共享模式等待,返回true。
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
    
        /**
         * 返回前驱节点,如果为空则抛出空指针异常。
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
    
        Node() {    // 用于创建链表head或共享模式下的SHARED
        }
    
        Node(Thread thread, Node mode) {     // addWaiter方法中使用
            this.nextWaiter = mode;
            this.thread = thread;
        }
    
        Node(Thread thread, int waitStatus) { // Condition类中使用
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

    另外还有AQS的成员变量

    /**
     * 等待队列的头指针
     */
    private transient volatile Node head;
    /**
     * 等待队列的尾指针,仅仅通过enq(入队)方法添加新节点的方式来修改。
     */
    private transient volatile Node tail;
    /**
     * 同步状态(很多地方用到这个状态,比如CountDownLatch利用state当做计数器,而这里代表是否有线程持有锁以及重入次数)
     */
    private volatile int state;

    独占模式

    独占模式有四个主要方法
    1. acquire
    2. acquireInterruptibly
    3. tryAcquireNanos
    4. release

    1. acquire(int arg)

    // 以独占模式获取,忽略中断
    public final void acquire(int arg) {
        // 如果tryAcquire返回false获取锁失败(tryAcquire是交给子类来实现的)
        if (!tryAcquire(arg) &&
            // 则把当前线程放入等待队列
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 中断线程???
            selfInterrupt();
    }
    static void selfInterrupt() {
        // 这一方法实际上完成的是,在线程受到阻塞时抛出一个中断信号,这样线程就得以退出阻塞的状态。Thread.interrupt()方法不会中断一个正在运行的线程。
        Thread.currentThread().interrupt();
    }

    思路很简单,尝试获取锁,获取不到则等待。注意点是interrupt,见方法注释。

    addWaiter方法,这里是以独占模式作为参数的

    // 给当前线程以给定的模式创建并入队一个新节点
    private Node addWaiter(Node mode) {
        // 新Node
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // 如果队列已经创建,就将新节点插入队尾。
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 如果队列没有创建,通过enq方法创建队列,并插入新的节点。
        enq(node);
        return node;
    }
    // 入队方法
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 初始化
            if (t == null) { // Must initialize
                // 利用CAS设置头结点
                if (compareAndSetHead(new Node()))
                    // 尾指针指向头结点
                    tail = head;
            } else {
                // 新节点的前驱指向尾节点
                node.prev = t;
                // 设置新节点为新的tail
                if (compareAndSetTail(t, node)) {
                    // 原尾节点的next指向新尾节点(新Node)
                    t.next = node;
                    // 返回原尾节点
                    return t;
                }
            }
        }
    }

    关于入队过程

    添加完等待节点之后,执行acquireQueued方法

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 找到当前节点的前驱
                final Node p = node.predecessor();
                // 如果前驱是头结点,则执行tryAcquire尝试获取锁
                if (p == head && tryAcquire(arg)) {
                    // 获取锁成功,设置当前节点为头结点
                    setHead(node);
                    // 原头结点的next置空,这样下次垃圾回收的时候就会回收原头结点
                    p.next = null; // help GC
                    // 在finally里,因为这里是false,则不会执行cancelAcquire方法
                    failed = false;
                    // 这里返回false,就不会执行acquire方法里的selfInterrupt()方法了
                    return interrupted;
                }
                // 不满足上述条件:1. 判断是否应该阻塞。2. 应该阻塞的话,则阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                // 发生异常的话,取消获取锁的尝试,将当前Node的等待状态设置为CANCELLED
                cancelAcquire(node);
        }
    }
    
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 前一个节点的等待状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * 这个节点已经设置了请求释放信号的状态,所以它可以安全地阻塞
             */
            return true;
        if (ws > 0) {
            /*
             * 前一个节点是取消状态。跳过前驱并重试。
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 等待状态是 0 或者 PROPAGATE。表示需要个信号但不要阻塞。调用者需要重试,以确保在阻塞之前不能获取到锁。
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    private final boolean parkAndCheckInterrupt() {
        // 为线程调度的目的禁用当前线程,除非许可证可用
        LockSupport.park(this);
        return Thread.interrupted();
    }
    
    // java.util.concurrent.locks.LockSupport#park(java.lang.Object)
    // LockSupport是JDK中的底层类,用来创建锁和其他同步工具类的基本线程阻塞原语。
    // 通过调用LockSupport.park()和LockSupport.unpark()可以实现线程的阻塞和唤醒。
    // LockSupport提供了1个许可证供使用,如果这个许可还没有被占用,当前线程获取许可并继续执行;如果许可已经被占用,当前线程阻塞,等待获取许可。
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }

    cancelAcquire方法

    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;
    
        node.thread = null;
    
        // 跳过取消状态的阶段
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
    
        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;
    
        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;
    
        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }
    
            node.next = node; // help GC
        }
    }

    2. acquireInterruptibly(int arg)

    如果锁不被其他线程持有,则获取锁并立即返回,将锁持有计数设置为1。
    如果当前线程已经持有该锁,那么持有计数将增加1,该方法立即返回。

    (一)如果锁是由另一个线程持有,那么当前线程就会出于线程调度的目的而被禁用,并处于休眠状态,直到发生以下两种情况之一:
    1. 锁被当前线程获取;
    2. 其他线程中断当前线程。

    (二)如果锁被当前线程获取,那么锁持有计数被设置为1。如果当前线程:
    1. 在进入此方法时已设置其中断状态
    2. 在获取锁时被中断
    则抛出 InterruptedException,并清除当前线程的中断状态。

    在这个实现中,由于这个方法是一个显式的中断点,所以优先响应中断而不是正常的或可重入的锁获取。

    // 以独占模式获取,如果中断则中止。
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        // 如果当前线程已中断,则抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        // 如果尝试获取锁失败,则抛出异常终止
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    doAcquireInterruptibly方法

    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        // 把当前线程添加到等待队列,方法实现前面已经说过
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                // 找到当前节点的前驱
                final Node p = node.predecessor();
                // 如果前驱是头结点,并且tryAcquire尝试获取锁成功
                if (p == head && tryAcquire(arg)) {
                    // 设置当前节点为头结点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                // 不满足上述条件:1. 判断是否应该阻塞。2. 应该阻塞的话,则阻塞。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 3. 响应中断,抛出中断异常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                // 发生异常的话,取消获取锁的尝试,将当前Node的等待状态设置为CANCELLED
                cancelAcquire(node);
        }
    }

    3. tryAcquireNanos(int arg, long nanosTimeout)

    如果在给定的等待时间内另一个线程没有持有锁,并且当前线程不是中断状态,则获取锁,并立即返回值{@code true},将锁持有计数设置为1。
    如果当前线程已经持有该锁,那么持有计数将增加1,并且该方法返回{@code true}。

    如果这个锁被设置为使用公平的排序策略,那么如果任何其他线程正在等待这个锁,那么将不会获得一个可用的锁。


    (一)如果锁是由另一个线程持有,那么当前线程就会出于线程调度的目的而被禁用,并处于休眠状态,直到发生以下三种情况之一:
    1. 锁被当前线程获取
    2. 其他线程中断当前线程
    3. 指定的等待时间已经过了


    (二)如果获取了锁,则返回值{@code true},并将锁持有计数设置为1。如果当前线程:
    1. 在进入此方法时已设置其中断状态
    2. 在获取锁时被中断
    则抛出 InterruptedException,并清除当前线程的中断状态。


    如果指定的等待时间过期,则返回值{@code false}。如果时间小于或等于0,则该方法根本不会等待。

    在这个实现中,由于这个方法是一个显式的中断点,所以优先响应中断,而不是正常的或可重入的获取锁,或者报告等待时间的流逝。

    // 尝试以独占模式获取,如果中断则中止,如果给定超时时间超时则失败。
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        // 优先响应中断
        if (Thread.interrupted())
            throw new InterruptedException();
        // 尝试获取锁,成功返回true;如果tryAcquire返回false,则执行doAcquireNanos,在指定时间内尝试获取
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

    doAcquireNanos方法:

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        // 如果过期时间小于等于0,直接返回
        if (nanosTimeout <= 0L)
            return false;
        // 过期时间
        final long deadline = System.nanoTime() + nanosTimeout;
        // 将当前节点以独占模式添加到等待队列中
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                // 获取当前节点的前驱
                final Node p = node.predecessor();
                // 如果前驱是head并且尝试获取锁成功,则把当前节点设置为头结点并将当前头结点的next置空,方便回收。
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                // 判断是否超时,超时则直接返回
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                // 没有超时,则判断是否应该阻塞以及判断超时时间是否大于指定的自旋时间
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    // 如果nanosTimeout大于0,则阻塞
                    LockSupport.parkNanos(this, nanosTimeout);
                // 响应中断
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                // 发生异常则把当前线程设置为取消状态
                cancelAcquire(node);
        }
    }

    4. release(int arg)

    如果当前线程是这个锁的持有者,那么持有计数将递减。如果持有计数现在为零,则释放锁。

    // 以独占模式释放
    public final boolean release(int arg) {
        // 尝试释放锁
        if (tryRelease(arg)) {
            Node h = head;
            // 如果头结点不为空并且等待状态不为0
            if (h != null && h.waitStatus != 0)
                // 唤醒头结点的后继
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    unparkSuccessor方法

    /**
     * 唤醒节点的后继
     */
    private void unparkSuccessor(Node node) {
    
        // 如果当前节点的状态是负数(比如:-1是等待信号通知的状态),则尝试清理预期的信号状态。
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
    
        // 将要被取消阻塞的通常只是下一个节点。
        Node s = node.next;
        // 如果后继节点为空或者状态为cancelled,也就是1
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 则从tail向前遍历,以找到非取消状态的节点。
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 将最终确定的线程节点唤醒
        if (s != null)
            LockSupport.unpark(s.thread);
    }

    共享模式

    共享模式涉及4个方法:

    1. acquireShared
    2. acquireSharedInterruptibly
    3. tryAcquireSharedNanos
    4. releaseShared

    1. acquireShared(int arg)

    // 以共享模式获取,忽略中断
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    doAcquireShared方法

    private void doAcquireShared(int arg) {
        // 以共享模式创建一个新Node,添加到等待队列里
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 当前Node的前驱
                final Node p = node.predecessor();
                // 如果前驱是head
                if (p == head) {
                    // 尝试获取共享锁
                    int r = tryAcquireShared(arg);
                    // 获取成功
                    if (r >= 0) {
                        // 设置头结点并传播
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 判断是否需要阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    /**
     * 设置队列的头部并检查后继节点是否在共享模式下等待,如果是,并且propagate > 0或者已设置为PROPAGATE状态,则传播。
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * 尝试通知下一个节点如果:
         *   1. Propagation 由调用者表示,
         *     或者被之前的操作记录下来(在setHead方法之前或者之后设置为h.waitStatus)
         *     (注意:这将使用等待状态的信号检查,因为PROPAGATE状态可能会转换为SIGNAL。)
         * 
         *   2. 下一个节点在共享模式下等待,或者我们不知道,因为它看起来是空的
         *
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

    2. acquireSharedInterruptibly(int arg)

    // 以共享模式获取,如果中断则中止
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // 优先响应中断
        if (Thread.interrupted())
            throw new InterruptedException();
        // 如果尝试获取失败,调用doAcquireSharedInterruptibly方法
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    doAcquireSharedInterruptibly方法

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 以共享模式添加到等待队列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                // 当前节点的前驱
                final Node p = node.predecessor();
                // 如果前驱是头结点
                if (p == head) {
                    // 尝试获取共享锁
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 成功获取到则传播状态
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // 前驱不是头结点,则检查是否需要阻塞,阻塞之后如果被中断则抛出中断异常
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    3. tryAcquireSharedNanos(int arg, long nanosTimeout)

    // 尝试以共享模式获取,如果中断则中止,如果给定超时时间超时则失败
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        // 优先响应中断
        if (Thread.interrupted())
            throw new InterruptedException();
        // 尝试获取共享锁
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

    doAcquireSharedNanos方法

    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        // 如果超时时间小于等于0,直接返回
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        // 以共享模式添加到等待队列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                // 获取当前节点的前驱
                final Node p = node.predecessor();
                // 如果前驱是头结点
                if (p == head) {
                    // 尝试获取共享锁
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 获取成功,设置当前节点为头结点,并唤醒下一个等待的线程
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                // 超时返回
                if (nanosTimeout <= 0L)
                    return false;
                // 判断是否应该阻塞,并且超时时间大于指定的自旋时间
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    // 阻塞指定时间
                    LockSupport.parkNanos(this, nanosTimeout);
                // 中断则抛出中断异常
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    4. releaseShared(int arg)

    // 以共享模式释放
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    doReleaseShared方法

    private void doReleaseShared() {
        // 确保一个释放传播,即使有其他正在进行的acquires/releases。
        // 如果需要信号的话,按照正常的方式试图唤醒head的后继。
        // 如果不需要信号,则将状态设置为PROPAGATE,以确保在释放后传播仍将继续。
        // 此外,我们必须循环,以防在执行此操作时添加新节点。
        // 另外,与其它使用unparkSuccessor方法不同,这里如果CAS重置状态失败,将重新检查
        for (;;) {
            Node h = head;
            // 从头结点开始,如果当前节点不为空并且不是tail
            if (h != null && h != tail) {
                // 当前节点状态
                int ws = h.waitStatus;
                // 如果是等待通知的状态
                if (ws == Node.SIGNAL) {
                    // 重置状态,失败则重新循环
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 重置成功,唤醒后继线程
                    unparkSuccessor(h);
                }
                // 如果状态是0,则设置状态为PROPAGATE。设置失败重新循环
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

    独占模式和共享模式对比:

    ReentrantLock方法到AQS方法的调用关系:
    ReentrantLock#lock------------------------------------------------------------AbstractQueuedSynchronizer#acquire
    ReentrantLock#lockInterruptibly--------------------------------------------AbstractQueuedSynchronizer#acquireInterruptibly
    ReentrantLock#tryLock(long, java.util.concurrent.TimeUnit)--------AbstractQueuedSynchronizer#tryAcquireNanos
    ReentrantLock#unlock--------------------------------------------------------AbstractQueuedSynchronizer#release

    ReentrantLock#tryLock-------------------------------------------------------ReentrantLock.Sync#nonfairTryAcquire

    需要子类实现的方法

    // 独占模式下尝试获取锁
    java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquire
    // 独占模式下尝试释放锁
    java.util.concurrent.locks.AbstractQueuedSynchronizer#tryRelease
    // 共享模式下尝试获取锁
    java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquireShared
    // 共享模式下尝试释放锁
    java.util.concurrent.locks.AbstractQueuedSynchronizer#tryReleaseShared
    // 判断持有锁的是否是当前线程
    java.util.concurrent.locks.AbstractQueuedSynchronizer#isHeldExclusively

    关于AQS大概工作原理:

    获取同步状态失败的线程都会被封装成Node加入到等待队列并在队列中自旋,移出队列的条件是前驱节点为头节点且成功获取了同步状态。

    有机会争用锁的是head指向结点的后继,也就是队列中第二个结点,如果成功获取锁,这个结点会成为新的头结点。

    获取锁失败的线程会被阻塞,这得益于 LockSupport.park 方法,提供了基本线程阻塞原语,会让当前线程挂起到WAITING状态。被挂起的线程会等待 LockSupport.unpark 方法来唤醒,通俗的讲unpark方法提供了“许可”,告诉线程你可以活动了,不过这种许可是一次性的。具体为:每个线程都有个permit开关,park方法会将permit设置为0,unpark方法会将permit设置成1。

    在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后用 LockSupport.unpark 唤醒头节点的后继节点。

    注:本文实际写于2019-12-04,没有发布是因为自己有很多不懂的地方。过完年,我已经忘了哪里不懂。。。

    所以这篇文章我会在自己理解的基础上不断去完善,争取通俗易懂的给大家讲明白。

    .

  • 相关阅读:
    React 生命周期
    React 总结
    系统后台设置
    数据库的事务日志已满,起因为"LOG_BACKUP"。
    webpack 打包器
    地图API
    ES6 与 React
    前端流行的技术
    Javascript 函数声明、函数表达式与匿名函数自执行表达式
    Javascript 解读与思想
  • 原文地址:https://www.cnblogs.com/LUA123/p/11981140.html
Copyright © 2011-2022 走看看