zoukankan      html  css  js  c++  java
  • Java并发之ReentrantLock源码解析(二)

    在了解如何加锁时候,我们再来了解如何解锁。可重入互斥锁ReentrantLock的解锁方法unlock()并不区分是公平锁还是非公平锁,Sync类并没有实现release(int arg)方法,这里会实现调用其父类AbstractQueuedSynchronizer的release(int arg)方法。在release(int arg)方法中,会先调用其子类实现的tryRelease(int arg)方法,这里我们看看ReentrantLock.Sync.tryRelease(int releases)的实现。
    正常情况下,这段代码只能由占有锁的线程调用,所以这里会先获取锁的引用计数,再减去释放次数,即:c = getState() - releases,然后判断尝试释放锁的线程,是否是独占锁的线程,如果不是则抛出IllegalMonitorStateException异常。如果独占线程在释放锁后锁的引用计数为0,则设置独占线程为null,再设置state为0,代表锁成为无主状态。
    如果确定锁成为无主状态后,release(int arg)会检查队列中是否有需要唤醒的线程,如果头节点header为null,则代表除了释放锁的线程,没有任何线程抢锁,如果头节点的等待状态为0,代表头节点目前没有需要唤醒的后继节点。如果头节点不为null且等待状态不为0,则代表队列中可能存在需要唤醒的线程,会进而执行unparkSuccessor(Node node),将头节点作为参数传入。
    当我们将头节点传入unparkSuccessor(Node node),如果判断头节点的等待状态<0,则会将头节点的等待状态设置为0,如果头节点的后继节点不为null,或者后继节点尚未被取消,会直接唤醒后继节点,后继节点会退出parkAndCheckInterrupt()方法,acquireQueued(final Node node, int arg)的循环中重新竞争锁,如果竞争成功,则后继节点成为头节点,退出抢锁逻辑开始访问资源,如果竞争失败,这里最多循环两次执行shouldParkAfterFailedAcquire(Node pred, Node node),第一次先将后继节点的前驱节点的等待状态由0改为SIGNAL(1),表示前驱节点的后继节点处于等待唤醒状态,第二次循环如果还是抢锁失败,shouldParkAfterFailedAcquire(Node pred, Node node)判断前驱节点的等待状态为SIGNAL,返回true,后继节点的线程陷入阻塞。需要注意的是,即便是公平锁,也可能存在不公平的情况,公平锁头节点的后继节点,也有可能存在抢锁失败的情况,比如之前说过,公平锁在调用tryLock()时是不保证公平的。
    这里我们需要注意一下,后继节点是可能存在被移除的情况,这种情况会在后续讲解tryLock(long timeout, TimeUnit unit)的时候说明,如果一旦出现后继节点被移除的情况(waitStatus > 0),在唤醒后继节点时,会从尾节点向前驱节点遍历,找到最靠近头节点的有效节点。

    public class ReentrantLock implements Lock, java.io.Serializable {
    	//...
        private final Sync sync;
    	//...
        abstract static class Sync extends AbstractQueuedSynchronizer {
    		//...
            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    		//...
    	}
    	//...
        public void unlock() {
            sync.release(1);
        }
    	//...
    }
    
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
    	//...
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    	//...
    	//由子类实现尝试解锁方法。
    	protected boolean tryRelease(int arg) {
            throw new UnsupportedOperationException();
        }
    	//...
    	private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                node.compareAndSetWaitStatus(ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
    			//如果后继节点不为null但被移除,会从尾节点向前驱节点遍历,找到最靠前的有效节点
                for (Node p = tail; p != node && p != null; p = p.prev)
                    if (p.waitStatus <= 0)
                        s = p;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    	//...
    }
    

      

    可重入互斥锁的tryLock()方法不分公平锁或非公平锁,统一调用ReentrantLock.Sync.nonfairTryAcquire(int acquires),在JUC长征篇之ReentrantLock源码解析(一)已经介绍过nonfairTryAcquire(int acquires)方法了,这里就不再赘述了。

    public class ReentrantLock implements Lock, java.io.Serializable {
    	//...
        private final Sync sync;
    	//...
        abstract static class Sync extends AbstractQueuedSynchronizer {
    		//...
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    		//...
    	}
    	//...
        public boolean tryLock() {
            return sync.nonfairTryAcquire(1);
        }
    	//...
    }
    

      

    调用ReentrantLock.tryLock(long timeout, TimeUnit unit)方法时,会调用Sync父类AQS的tryAcquireNanos(int arg, long nanosTimeout)方法,如果线程已被标记为中断,则会进入<1>处的分支并抛出InterruptedException异常,否则先调用tryAcquire(int acquires)尝试抢夺,如果抢锁失败,则会调用doAcquireNanos(int arg, long nanosTimeout)陷入计时阻塞,如果在阻塞期间内线程被中断,则会抛出InterruptedException异常,如果到达有效期后线程还未获得锁,tryAcquireNanos(int arg, long nanosTimeout)将返回false。
    当进入doAcquireNanos(int arg, long nanosTimeout)后,会先在<2>处计算获取锁的截止时间,之后和原先的acquireQueued()很像,先把当前线程封装成一个Node对象并加入到等待队列,再判断当前节点的前驱节点是否是头节点,是的话再尝试抢锁,如果抢锁成功则退出。否则用截止时间减去系统当前时间,算出线程剩余的抢锁时间(nanosTimeout)。如果剩余时间<=0的话,代表到达截止时间后线程依旧未占用锁,于是调用<3>处的代表将线程对应的Node节点从等待队列中移除,返回false表示抢锁失败。如果在后面的循环中如果发现当前时间大于截止时间,而线程还未获得锁,代表抢锁失败。
    如果剩余时间大于0,则会调用shouldParkAfterFailedAcquire(),如果前驱节点的状态状态为0,则将前驱节点的等待状态设置为-1表示其后继节点等待唤醒,然后在下一次循环的时候,shouldParkAfterFailedAcquire()判断前置节点的等待状态为-1,就有机会阻塞当前线程。但相比acquireQueued()不同的是,这里会判断剩余时间是否大于1000纳秒,如果剩余时间小于等于1000纳秒的话,这里就不会阻塞线程,而是用自旋的方式,直到抢锁成功,或者锁超时抢锁失败。如果自旋期间或者阻塞期间线程被中断,则会在<6>处抛出InterruptedException异常。

    public class ReentrantLock implements Lock, java.io.Serializable {
    	//...
        private final Sync sync;
    	//...
        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }
    	//...
    }
    
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
    	static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
    	//...
        public final boolean tryAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())//<1>
                throw new InterruptedException();
            return tryAcquire(arg) ||
                doAcquireNanos(arg, nanosTimeout);
        }
    	//...
        private boolean doAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (nanosTimeout <= 0L)
                return false;
            final long deadline = System.nanoTime() + nanosTimeout;//<2>
            final Node node = addWaiter(Node.EXCLUSIVE);
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        return true;
                    }
                    nanosTimeout = deadline - System.nanoTime();
                    if (nanosTimeout <= 0L) {
                        cancelAcquire(node);//<3>
                        return false;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&//<4>
                        nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)//<5>
                        LockSupport.parkNanos(this, nanosTimeout);
                    if (Thread.interrupted())
                        throw new InterruptedException();//<6>
                }
            } catch (Throwable t) {
                cancelAcquire(node);
                throw t;
            }
        }
    	//...
    }
    

      

    下面,我们来看看AQS是如何移除一个节点的,当要移除一个节点时,会先置空它的线程引用,再在<1>处遍历当前节点的前驱节点,直到找到有效节点(等待状态<=0),找到最靠近当前节点的有效节点pred后,再找到有效节点的后继节点predNext(pred.next),然后我们将当前节点的等待状态置为CANCELLED(1),如果当前节点是尾节点,则用CAS的方式设置尾节点为有效节点pred,如果pred成功设置为为节点,这里还会用CAS的方式设置pred的后继为null,因为尾节点不应该有后继节点,这里用CAS设置尾节点的后继节点是防止pred成为尾节点后,还未置空尾节点的后继节点前,又有新的节点入队成为新的尾节点,并且设置pred的后继节点为最新的尾节点。如果<2>出的代码执行成功,代表当前没有新的节点入队,如果执行失败,代表有新的节点入队,pred已经不是尾节点,且pred的后继已经被修改。
    如果node不是尾节点,或者在执行compareAndSetTail(node, pred)的时候,有新节点入队,tail引用指向的对象已经不是node本身,或者在执行compareAndSetTail()执行失败,则会进入<3>处的分支。进入<3>处的分支后,会先判断当前节点的前驱节点是不是头节点,如果是头节点的话,会进入<5>处的分支,唤醒当前节点的后继节点来竞争锁,如果当前节点的后继节点为null或者被取消的话,则会从尾节点开始遍历前驱节点,找到队列最前的有效节点,唤醒有效节点竞争锁。
    如果前驱节点不是头节点,且前驱节点的等待状态为SIGNAL或者前驱节点的等待状态为有效状态(waitStatus<=0)且成功设置前驱节点的等待状态为SIGNAL,再判断前驱节点前驱节点的thread引用是否为null,如果不为null则代表前驱节点还不是头节点或者尚未被取消,此时就可以进入<4>处的分支,这里如果判断当前节点的后继节点不为null或者尚未被取消,则用CAS的方式设置前驱节点的后继节点,为当前节点的后继节点。

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
    	//...
        private void cancelAcquire(Node node) {
            // Ignore if node doesn't exist
            if (node == null)
                return;
    
            node.thread = null;
    
            // Skip cancelled predecessors
            Node pred = node.prev;
            while (pred.waitStatus > 0)//<1>
                node.prev = pred = pred.prev;
    
            // predNext is the apparent node to unsplice. CASes below will
            // fail if not, in which case, we lost race vs another cancel
            // or signal, so no further action is necessary, although with
            // a possibility that a cancelled node may transiently remain
            // reachable.
            Node predNext = pred.next;
    
            // Can use unconditional write instead of CAS here.
            // After this atomic step, other Nodes can skip past us.
            // Before, we are free of interference from other threads.
            node.waitStatus = Node.CANCELLED;
    
            // If we are the tail, remove ourselves.
            if (node == tail && compareAndSetTail(node, pred)) {
                pred.compareAndSetNext(predNext, null);//<2>
            } else {//<3>
                // If successor needs signal, try to set pred's next-link
                // so it will get one. Otherwise wake it up to propagate.
                int ws;
                if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                     (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
                    pred.thread != null) {//<4>
                    Node next = node.next;
                    if (next != null && next.waitStatus <= 0)
                        pred.compareAndSetNext(predNext, next);
                } else {//<5>
                    unparkSuccessor(node);
                }
    
                node.next = node; // help GC
            }
        }
    	//...
    }
    

      

    先前我们说过,会存在后继节点指向被取消节点的情况,就是发生在cancelAcquire(Node node)方法里,下图是一个锁的等待队列,N1是队头,N1所对应的线程T1正占有锁进行资源访问,N2和N5调用lock()方法采用非计时阻塞请求锁,除非N2和N5对应的线程获取到锁,否则将永远阻塞;N3和N4调用tryLock(long timeout, TimeUnit unit)方法采用计时阻塞请求锁,如果超时对应的线程还未获取到锁,N3和N4将会从队列中移除,返回抢锁失败。

    我们假定N3和N4已经超时,要从队列中移除,看看并发场景下是如何出现有效节点的后继引用指向无效节点,这里笔者稍微简化cancelAcquire(Node node),我们只要专注可能出现有效节点指向无效节点的代码。

    假定N3和N4两个节点对应的线程是T3和T4,T3要从等待队列中移除N3,先获取N3的前驱节点pred(N3)为N2,N2是一个有效节点(waitStatus<=0),所以不需要在<1>处遍历,接着在<2>处获取N2的后继节点predNext(N2)为N3,再将N3的等待状态改为CANCELLED,此时T3挂起,T4开始执行。T4线程同样获取N4前驱节点pred(n4)为N3,然后发现N3的等待状态>0,会一直往前遍历到N2,所以N4的前驱节点pred(N4)会为N2,接着T4执行<2>处的代码,predNext(N2)也是N3,此时T4挂起,T3恢复执行。T3判断N3不是尾节点,于是进入分值<4>,T3判断N3的前驱节点N2不是头节点,N2的状态为为SIGNAL,且N2的thread字段不为空,表明N2既没有被取消,也不是头节点,于是进入<5>处的分值,这里获取N3的后继节点N4,由于N4对应的线程T4尚未执行到<3>处的代码,N4的等待状态依旧为SIGNAL,所以T3会进入<6>处的分支,将N2的后继节点指向N4。此时T4开始执行,将N4的等待状态改为CANCELLED,T4进行<4>处的分支,N4的前驱N2不是头节点,等待状态为SIGNAL,且N2的线程引用不为怒null,继而进入<5>处的分值,获取到N4的后继N5,N5不Wie空,且N5的等待状态<=0,进而进入到<6>分支,最后要用CAS的方式尝试将N2的后继节点设置为N5,但这里的设置一定会失败,因为此时N2的后继节点为N4,而T4原先获取到N2的后继节点为N3,出现了有效节点指向无效节点的情况。

    private void cancelAcquire(Node node) {
    	//...
    	Node pred = node.prev;
    	while (pred.waitStatus > 0)
    		node.prev = pred = pred.prev;//<1>
    	Node predNext = pred.next;//<2>
    	node.waitStatus = Node.CANCELLED;//<3>
    	if (node == tail && compareAndSetTail(node, pred)) {
    		//...
    	} else {//<4>
    		// If successor needs signal, try to set pred's next-link
    		// so it will get one. Otherwise wake it up to propagate.
    		int ws;
    		if (pred != head &&
    			((ws = pred.waitStatus) == Node.SIGNAL ||
    			 (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
    			pred.thread != null) {//<5>
    			Node next = node.next;
    			if (next != null && next.waitStatus <= 0)//<6>
    				pred.compareAndSetNext(predNext, next);
    		} else {
    			//...
    		}
    		//...
    	}
    }
    

      

    最后等待队列的布局如下所示,N2指向无效节点N4,而非N4的后继节点。同时这里也引出另一个问题,哪怕N4不是无效节点,在上面移除节点的代码中,只设置了N2的后继引用指向N4,却没设置N4的前驱引用指向N2,所以这里N4的前驱依旧指向N3。

    那么如果真的出现上述这样的情况,ReentrantLock是如何来修复这个队列呢?答案在释放锁的时候调用unparkSuccessor(Node node)。笔者先前在介绍这个方法时,就有意提到后继节点可能指向无效节点,当N1对应的线程使用完锁释放之后,N2对应的线程T2接着使用锁并释放锁,在N2释放锁的时候,发现N2的后继节点已经成为失效节点(waitStatus > 0),这里会从尾节点开始找到队列中最前面的有效节点,然后将其唤醒,这里也就是N5。

    private void unparkSuccessor(Node node) {
    	//...
    	Node s = node.next;
    	if (s == null || s.waitStatus > 0) {
    		s = null;
    		for (Node p = tail; p != node && p != null; p = p.prev)
    			if (p.waitStatus <= 0)
    				s = p;
    	}
    	if (s != null)
    		LockSupport.unpark(s.thread);
    }
    

      

    N5在被唤醒后,会调用shouldParkAfterFailedAcquire(Node pred, Node node)发现原先的前驱节点N4的等待状态处于被移除,会进入<1>处的分支,查找到最靠近自己的有效前驱节点,并将前驱节点的后继节点指向自己。这里也能回答我们先前的问题,为何在要移除一个节点时,只修改前驱节点的后继引用为被移除节点的后继,却不将被移除节点的后继节点的前驱引用,指向其前驱,因为在释放锁的时候,会唤醒头节点的后继,如果被唤醒的后继发现自己的前驱已经被移除,会往前查找最靠近自己的有效前驱,这里一般是头节点,接着将头节点的后继引用指向自己,再往后就是我们熟悉的流程了,如果前驱节点是头节点且抢锁成功,则退出acquireQueued()方法进行资源的访问,如果抢锁失败,则最多执行两次shouldParkAfterFailedAcquire()然后陷入阻塞,等待下一次的唤醒。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    	int ws = pred.waitStatus;
    	if (ws == Node.SIGNAL)
    		//...
    	if (ws > 0) {//<1>
    		/*
    		 * Predecessor was cancelled. Skip over predecessors and
    		 * indicate retry.
    		 */
    		do {
    			node.prev = pred = pred.prev;
    		} while (pred.waitStatus > 0);
    		pred.next = node;
    	} else {
    		//...
    	}
    	return false;
    }
    

      

    我们已经了解了lcok()、tryLock()以及tryLock(long timeout, TimeUnit unit),下面的lockInterruptibly()就变得尤为简单,lockInterruptibly()顾名思义也是无限期陷入阻塞,直到获得锁或者被中断,如果线程本身被中断,在抢锁时会直接抛出InterruptedException异常,否则就开始抢锁,如果抢锁成功则皆大欢喜,抢锁失败则执行doAcquireInterruptibly(int arg),这个方法相信不需要笔者做过多的介绍,基本上很多步骤和方法在上面已经介绍过了。将线程封装成Node节点并入队,判断Node节点的前驱是否是头节点,是的话则试图抢锁,如果不是的话则最多循环两次执行shouldParkAfterFailedAcquire(),然后陷入阻塞,直到前驱节点成为头节点并释放锁将其唤醒,或者线程被中断,抛出InterruptedException异常。

    public class ReentrantLock implements Lock, java.io.Serializable {
    	//...
    	private final Sync sync;
    	//...
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
    	//..
    }
    
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
    	//...
        public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }
    	//...
        private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        return;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } catch (Throwable t) {
                cancelAcquire(node);
                throw t;
            }
        }
    	//...
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    	//...
    }
    

      

    最后,笔者就简单介绍下公平锁(FairSync)的tryAcquire(int acquires)方法,如果我们查看ReentrantLock的源码会发现,在执行lock()、tryLock(long timeout, TimeUnit unit)和lockInterruptibly(),这几个方法最终都会调用到AQS对应的三个方法:acquire(int arg)、tryAcquireNanos(int arg, long nanosTimeout)、acquireInterruptibly(int arg),这三个方法在抢锁的时候会优先执行子类实现的tryAcquire(int arg)方法,也就是公平锁(FairSync)或者非公平锁(NonfairSync)实现的tryAcquire(int arg)方法,抢锁失败再执行AQS自身实现的入队、阻塞方法。

    下面,我们来看下公平锁实现的tryAcquire(int acquires)方法,其实这个方法的实现也非常简单,先判断锁目前是否处于无主状态,是的话再判断队列中是否有等待线程,确认锁是无主状态且队列中没有等待线程,便开始尝试抢锁,抢锁成功则直接返回。公平锁相较于非公平锁的抢锁逻辑,也仅仅是多了一步而已,判断锁是否无主,是的话再判断队列中是否有等待线程,不像非公平锁,只要判断是无主线程便不再查看等待队列,直接尝试抢锁。

        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
            /**
             * Fair version of tryAcquire.  Don't grant access unless
             * recursive call or no waiters or is first.
             */
            @ReservedStackAccess
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
    			//如果state为0,且队列中没有等待线程,则尝试抢锁
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    

      

  • 相关阅读:
    Codis的源码编译生成tar包
    Jenkins安装war版本
    Eclipse中src/main/resources配置文件启动问题
    关于Web项目的pom文件处理
    spark streaming的理解和应用
    Azkaban安装
    Mysql安装
    Oracle递归操作
    WebService 入门
    BaseAction 类
  • 原文地址:https://www.cnblogs.com/beiluowuzheng/p/14944477.html
Copyright © 2011-2022 走看看