zoukankan      html  css  js  c++  java
  • ReentrantLock源码解析

    ReentrantLock

    ReentrantLock可重入排他锁,基于AQS同步框架实现锁机制。可重入锁支持公平和非公平模式,默认使用非公平模式。

    内部通过AbstractQueuedSynchronizer中的内部类ConditionObject可以实现条件等待,调用Condition#awaitCondition#signal的线程需要先获取到锁,否则将会抛出异常。

    AQS实现

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
    
        abstract void lock();
    		
        /**
         * 获取非公平锁
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // state = 0,锁未被任何线程持有,立即尝试获取一次
            if (c == 0) {
                // 与公平锁的区别在于非公平不论同步队列中是否有线程等待,都立即获取一次锁
                // 公平锁在会先检查同步队列中是否有在排队的,如果有立即进入队列排队
                if (compareAndSetState(0, acquires)) {
                    // cas成功,获取锁成功,则设置当前持有锁线程为当前线程
                    setExclusiveOwnerThread(current);
                    // 返回true,表示获取锁成功
                    return true;
                }
            }
            // 锁已被持有,但是锁可能被重入
            // 锁被当前线程持有,持有次数+acquires,立即返回获取锁成功
            else if (current == getExclusiveOwnerThread()) {
                // 增加获取次数
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 因为锁被当前线程持有,所以不需要加锁,直接更新即可
                setState(nextc);
                return true;
            }
            return false;
        }
    
        /**
         * 释放锁
         */
        protected final boolean tryRelease(int releases) {
            // 释放后持有锁次数
            int c = getState() - releases;
            // 锁不是当前线程持有,抛出异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            // 标识锁是否已经全部释放
            boolean free = false;
            // c=0,表明锁已经完全释放
            if (c == 0) {
                // 锁完全释放
                free = true;
                // 置空持有锁线程
                setExclusiveOwnerThread(null);
            }
            // 因为锁被当前线程持有,所以不需要加锁,直接更新即可
            setState(c);
            return free;
        }
    
        /**
         * 是否是当前线程持有锁
         */
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
    
    	/**
    	 * 当前谁持有锁。如果state = 0,表明没有线程持有锁,否则返回持有线程
    	 */
        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }
    
        /**
         * 持有锁次数
         * 如果是当前线程持有锁,则返回持有次数,否则返回0
         */
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }
    
        /**
         * 锁是否被持有
         */
        final boolean isLocked() {
            return getState() != 0;
        }
    }
    
    /**
     * 非公平锁
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
    
        /**
         * 非公平锁获取锁
         */
        final void lock() {
            // 不论前面是否有线程在排队,直接获取一次
            if (compareAndSetState(0, 1))
                // 获取成功,设置持所线程为当前线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 第一次获取失败,调用AQS#acquire方法再次获取
                acquire(1);
        }
    
        /**
         * 该方法将被AQS#acquire调用
         */
        protected final boolean tryAcquire(int acquires) {
            // 调用非公平获取
            return nonfairTryAcquire(acquires);
        }
    }
    
    
    /**
     * 公平锁实现
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
    
        /** 
         * 公平锁获取锁
         */
        final void lock() {
            // 调用AQS#acquire方法再次获取
            acquire(1);
        }
    
        /**
         * 该方法将被AQS#acquire调用
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // 锁还未被持有
            if (c == 0) {
                if (!hasQueuedPredecessors() &&  // 是否有线程在排队
                    compareAndSetState(0, acquires)) { // CAS获取锁
                    // 获取成功,设置持所线程为当前线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 锁已被持有
            // 锁被当前线程持有,持有次数+acquires,立即返回获取锁成功
            else if (current == getExclusiveOwnerThread()) {
                // 增加获取次数
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                // 因为锁被当前线程持有,所以不需要加锁,直接更新即可
                setState(nextc);
                return true;
            }
            // 获取锁失败,进入排队
            return false;
        }
    }
    

    获取锁

    1、void lock()方法

    1.1、非公平锁:上来直接CAS抢占一次锁,如果获取获取成功返回,失败才调用acquireacquire中线程直接获取获取锁(CAS或重入),获取失败才进入排队。

    1.2、公平锁:直接调用acquire获取。acquire中线程先检查是否有线程在排队,如果没有才尝试获取锁(CAS或重入),获取失败进入排队。

    2、boolean tryLock()方法

    无论非公平锁或者公平锁,都直接同样逻辑,不论是否有线程在排队,都会直接CAS抢占一次锁或者重入,抢占失败,线程返回。即tryLock()只会尝试获取一次,获取失败也不会进入排队。

    3、void lockInterruptibly()方法、boolean tryLock(long timeout, TimeUnit unit)方法

    如果出现线程中断,立即抛出异常

    /**
     * 尝试获取锁,获取失败,进入排队
     */
    public void lock() {   
        // 调用Sync#lock --> AQS#acquire
        sync.lock();
    }
    
    public void lockInterruptibly() throws InterruptedException {
    	// 调用AQS#acquireInterruptibly
        sync.acquireInterruptibly(1);
    }
    
    /**
     * 无论公平或者非公平都只获取一次,获取失败,立即返回
     */
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }
    
    /**
     * 尝试获取锁,如果超时立即返回,获取失败会进入排队
     */
    public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    
    // 非公平锁lock
    
    

    非公平锁

    lock()方法

    /**
     * 尝试获取锁,获取失败,进入排队
     */
    public void lock() {   
        // 调用Sync#lock --> AQS#acquire
        sync.lock();
    }
    
    /**
     * NonfairSync#lock
     */ 
    final void lock() {
        // 非公平锁不论是否前面是否有线程在等待都会CAS抢占一次锁
        if (compareAndSetState(0, 1))
            // 抢占成功,更新持锁线程为当前线程
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // 直接抢占锁失败,进入acquire正常获取锁
            acquire(1);
    }
    
    /**
     * AQS#acquire
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && // CAS抢占锁失败,,会来到这里正常获取锁
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    /**
     * NonfairSync#tryAcquire
     */
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    
    /**
     * Sync#nonfairTryAcquire
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // state = 0,锁未被任何线程持有,立即尝试获取一次
        if (c == 0) {
            // 与公平锁的区别在于非公平不论同步队列中是否有线程等待,都立即获取一次锁
            // 公平锁在会先检查同步队列中是否有在排队的,如果有立即进入队列排队
            if (compareAndSetState(0, acquires)) {
                // cas成功,获取锁成功,则设置当前持有锁线程为当前线程
                setExclusiveOwnerThread(current);
                // 返回true,表示获取锁成功
                return true;
            }
        }
        // 锁已被持有,但是锁可能被重入
        // 锁被当前线程持有,持有次数+acquires,立即返回获取锁成功
        else if (current == getExclusiveOwnerThread()) {
            // 增加获取次数
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            // 因为锁被当前线程持有,所以不需要加锁,直接更新即可
            setState(nextc);
            return true;
        }
        return false;
    }
    

    公平锁

    lock()

    public void lock() {
        sync.lock();
    }
    
    /**
     * 公平锁获取锁
     * 非公平锁会直接CAS抢占一次锁,抢占失败才会调用acquire()获取一次锁
     */ 
    final void lock() {
        acquire(1);
    }
    
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 无论锁是否被其他线程持有,都会判断队列中是否有其他线程在等待,如果没有才尝试获取锁
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    释放锁

    如果调用release()方法的线程不是持有锁的线程,将会抛出异常。锁释放完全,Sync#tryRelease返回true,否则返回false,返回true,意味着当前线程锁完全释放,使用unparkSuccessor下一个线程,唤醒后的线程将继续在acquireQueued() 方法中继续自旋获取锁。

    public void unlock() {
        // 调用AQS#release --> Sync#tryRelease
        sync.release(1);
    }
    
    /**
     * 释放锁。如果当前线程已经完全释放锁,则唤醒后驱节点
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    /**
     * 释放锁。
     * 1、当前线程不是持锁线程,抛出异常
     * 2、锁完全释放,置空持锁线程,返回true
     * 3、锁未完全释放,返回false
     */
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    
    /**
     * 找到第一个等待唤醒的节点,唤醒线程,唤醒后的线程继续在acquireQueued()方法中自旋获取锁
     */
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        // 当前节点的后驱节点不存在或者已取消
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 从尾节点找到第一个等待唤醒的节点,
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 唤醒第一个等待唤醒的后驱节点
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    创建Condition

    // 直接调用Sync#newCondition()
    public Condition newCondition() {
        return sync.newCondition();
    }
    
    // 创建Condition对象(AQS#ConditionObject)
    final ConditionObject newCondition() {
        return new ConditionObject();
    }
    

    进入条件队列

    await()

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 创建条件队列节点,移除队列中已取消节点,并入队
        Node node = addConditionWaiter();
        // 完全释放锁,调用该方法的线程必须是持锁线程
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 当前节点不在同步队列,阻塞当前线程
        while (!isOnSyncQueue(node)) {
            // 阻塞当前线程
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        
        // 向上:线程完全释放锁(重入次数将会被保存,之后唤醒线程后,再获取等次数重入)并进入条件队列,排队阻塞,等待条件出现
        //////////////////////////////////////////////
        // 向下:条件出现,sigal方法中线程从条件队列移除,线程被唤醒,继续执行
        
        // 当节点收到signal被唤醒后,重新获取阻塞前的持锁次数,然后向下执行
        
        // 进入自旋获取锁 如果线程被打断,处理中断
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        // 清除条件队列中已被取消的线程
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        // 中断处理
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    
    /**
     * 创建条件队列等待节点
     */
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        if (t != null && t.waitStatus != Node.CONDITION) {
            // 从头节点开始清除已取消节点
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        // 创建条件队列
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        // 当前新增节点为尾节点
        lastWaiter = node;
        return node;
    }
    
    /**
     * 从头部开始移除已取消节点
     * 条件队列是单链表,所以从头部开始遍历移除
     */
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            // 节点已取消
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }
    
    /**
     * 完全释放锁
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            // 完全释放锁
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
    

    await(long nanosTimeout) 可超时

    public final long awaitNanos(long nanosTimeout)
        throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;
        // 如果超时时间大于自旋时间,那么阻塞线程;否则,线程自旋
        while (!isOnSyncQueue(node)) {
            // 已超时,节点从条件队列进入同步队列
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);
                break;
            }
            // 超时时间大于自旋时间,直接阻塞
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            // 剩余超时时间
            nanosTimeout = deadline - System.nanoTime();
        }
        // 自旋获取锁
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return deadline - System.nanoTime();
    }
    
    /**
     * 将节点从条件队列进入同步队列
     */
    final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }
    

    等待条件出现,唤醒条件队列中线程signal

    /**
     * 条件出现,唤醒条件队列头节点
     * 如果当前线程不是持锁线程,抛出异常
     * 如果条件队列头节点不为null,唤醒头节点,为null则什么也不错
     */
    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        // 唤醒条件队列头节点
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }
    
    private void doSignal(Node first) {
        do {
            // 第二个等待节点是null,则清空条件队列,lastWaiter、firstWaiter都为空
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) && (first = firstWaiter) != null);
    }
    
    /**
     * 唤醒节点,节点由条件队列转入同步队列
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
    
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            // 唤醒
            LockSupport.unpark(node.thread);
        return true;
    }
    
    /**
     * 唤醒全部条件队列上的节点
     */
    public final void signalAll() {
        // 持锁线程不是当前线程,立即抛出异常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);
    }
    
    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        // 从头节点开始逐个遍历唤醒
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            transferForSignal(first);
            first = next;
        } while (first != null);
    }
    
    
  • 相关阅读:
    什么人一亏再亏,什么人亿万富翁? —兼谈本周经济与股市
    数组排序
    倒水
    倒水
    lua string
    lua string
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
  • 原文地址:https://www.cnblogs.com/QullLee/p/12247734.html
Copyright © 2011-2022 走看看