zoukankan      html  css  js  c++  java
  • condition学习小结

    参考 https://www.cnblogs.com/go2sea/p/5630355.html
    建议先了解AQS相关知识,可以参考 https://www.cnblogs.com/lllliuxiaoxia/p/15769401.html

    condition创建

    Condition在JUC框架下提供了传统Java监视器风格的wait、notify和notifyAll相似的功能。
    Condition必须被绑定到一个独占锁上使用。ReentrantLock中获取Condition的方法为:

    public Condition newCondition() {
    	return sync.newCondition();
    }
    

    查看AQS的newCondition()方法,实际调用:

    final ConditionObject newCondition() {
    	return new ConditionObject();
    }
    

    直接初始化并返回了一个AQS提供的ConditionObject对象。因此,Condition实际上是AQS框架的内容。ConditionObject通过维护firstWaiter和lastWaiter来维护Condition等待队列。
    通过signal操作将Condition等待队列中的线程移到Sync锁等待队列

    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
    

    await解析

    firstWaiter和lastWaiter分别是Condition队列的头结点和尾节点。
    Condition在调用await方法之前,必须先获取锁,注意,这个锁必须是一个独占锁。
    我们先来看一下await中用到的几个方法:

    • addConditionWaiter
      此方法在Condition队列中添加一个等待线程:
    private Node addConditionWaiter() {
    	Node t = lastWaiter;
    	// If lastWaiter is cancelled, clean out.
    	if (t != null && t.waitStatus != Node.CONDITION) {
    		unlinkCancelledWaiters();
    		t = lastWaiter;
    	}
    	Node node = new Node(Thread.currentThread(), Node.CONDITION);
    	if (t == null)
    		firstWaiter = node;
    	else
    		t.nextWaiter = node;
    	lastWaiter = node;
    	return node;
    }
    

    方法先检查一下队列尾节点状态是否还是Condition(如果被signal或者中断,waitStatus会被修改为0或者CANCELLED)。如果尾节点被取消或者中断,调用unlinkCancelledWaiters方法删除Condition队列中被cancel的节点,相当于对等待队列进行一次清扫。然后将当前线程封装在一个Node中,添加到Condition队列的尾部。这里由于我们在操纵Condition队列的时候已经获取了一个独占锁,因此不会发生竞争。
    值得注意的是,Condition队列与Sync队列(锁等待队列)有几点不同:①Condition队列是一个单向链表,而Sync队列是一个双向链表;②Sync队列在初始化的时候,会在队列头部添加一个空的dummy节点,它不持有任何线程,而Condition队列初始化时,头结点就开始持有等待线程了。
    我们有必要在这里提一下Node对象中的nextWaiter成员、SHARED成员和EXCLUSIVE成员:

    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;
    
    Node nextWaiter;
    

    nextWaiter在共享模式下,被设置为SHARED,SHARED为一个final的空节点,用来表示当前模式是共享模式;默认情况下nextWaiter是null,EXCLUSIVE成员是一个final的null,因此默认模式是独占模式。在Condition队列中nextWaiter被用来指向队列里的下一个等待线程。在一个线程从Condition队列中被移除之后,nextWaiter被设置为空(EXCLUSIVE)。这再次表明:Condition必须被绑定在一个独占锁上使用。

    • unlinkCancelledWaiters
    private void unlinkCancelledWaiters() {
    	Node t = firstWaiter;
    	Node trail = null;
    	while (t != null) {
    		Node next = t.nextWaiter;
    		if (t.waitStatus != Node.CONDITION) {
    			t.nextWaiter = null;
    			if (trail == null)
    				firstWaiter = next;
    			else
    				trail.nextWaiter = next;
    			if (next == null)
    				lastWaiter = trail;
    		}
    		else
    			trail = t;
    		t = next;
    	}
    }
    

    unlinkCancelledWaiters方法很简单,从头到尾遍历Condition队列,移除状态不为CONDITION的节点(被cancel或被中断的节点)。由于这里我们在操纵Condition队列的时候已经获取了所绑定的独占锁,因此不用担心竞争的发生。

    • fullyRelease
      我们再来看一下fullyRelease方法,这个方法用来释放锁:
    final int fullyRelease(Node node) {
    	boolean failed = true;
    	try {
    		int savedState = getState();
    		if (release(savedState)) {
    			failed = false;
    			return savedState;
    		} else {
    			throw new IllegalMonitorStateException();
    		}
    	} finally {
    		if (failed)
    			node.waitStatus = Node.CANCELLED;
    	}
    }
    

    方法首先获取了state的值,这个值表示可锁被“重入”深度,并调用release释放全部的重入获取,如果成功,返回这个深度,如果失败,要将当前线程的waitStatus设置为CANCELLED。

    • isOnSyncQueue
      我们再来看一下isOnSyncQueue方法,这个方法返节点是否在Sync队列中等待锁:
    final boolean isOnSyncQueue(Node node) {
    	if (node.waitStatus == Node.CONDITION || node.prev == null)
    		return false;
    	if (node.next != null) // If has successor, it must be on queue
    		return true;
    	/*
    	 * node.prev can be non-null, but not yet on queue because
    	 * the CAS to place it on queue can fail. So we have to
    	 * traverse from tail to make sure it actually made it.  It
    	 * will always be near the tail in calls to this method, and
    	 * unless the CAS failed (which is unlikely), it will be
    	 * there, so we hardly ever traverse much.
    	 */
    	return findNodeFromTail(node);
    }
    

    node从Condition队列移除的第一步,就是设置waitStatus为其他值,因此是否等于Node.CONDITON可以作为判断标志,如果等于,说明还在Condition队列中,即不再Sync队列里。在node被放入Sync队列时,第一步就是设置node的prev为当前获取到的尾节点,所以如果发现node的prev为null的话,可以确定node尚未被加入Sync队列。
    相似的,node被放入Sync队列的最后一步是设置node的next,如果发现node的next不为null,说明已经完成了放入Sync队列的过程,因此可以返回true。
    当我们执行完两个if而仍未返回时,node的prev一定不为null,next一定为null,这个时候可以认为node正处于放入Sync队列的执行CAS操作执行过程中。而这个CAS操作有可能失败,因此我们再给node一次机会,调用findNodeFromTail来检测:

    • findNodeFromTail
    private boolean findNodeFromTail(Node node) {
    	Node t = tail;
    	for (;;) {
    		if (t == node)
    			return true;
    		if (t == null)
    			return false;
    		t = t.prev;
    	}
    }
    

    findNodeFromTail方法从尾部遍历Sync队列,如果检查node是否在队列中,如果还不在,此时node也许在CAS自旋中,在不久的将来可能会进到Sync队列里。但我们已经等不了了,直接放回false。

    • checkInterruptWhileWaiting
    /** Mode meaning to reinterrupt on exit from wait */
    private static final int REINTERRUPT =  1;
    /** Mode meaning to throw InterruptedException on exit from wait */
    private static final int THROW_IE    = -1;
    
    /**
     * Checks for interrupt, returning THROW_IE if interrupted
     * before signalled, REINTERRUPT if after signalled, or
     * 0 if not interrupted.
     */
    private int checkInterruptWhileWaiting(Node node) {
    	return Thread.interrupted() ?
    		(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
    		0;
    }
    

    此方法在线程从park中醒来后调用,它的返回值有三种:0代表在park过程中没有发生中断;THORW_IE(1)代表发生了中断,且在后续我们需要抛出中断异常;REINTERRUPT(-1)表示发生了中断,但在后续我们不抛出中断异常,而是“补上”这次中断。当没有发生中断时,我们返回0即可,当中断发生时,返回THROW_IE or REINTERRUPT由transferAfterCancelledWait方法判断:

    • transferAfterCancelledWait
    final boolean transferAfterCancelledWait(Node node) {
    	if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
    		enq(node);
    		return true;
    	}
    	/*
    	 * If we lost out to a signal(), then we can't proceed
    	 * until it finishes its enq().  Cancelling during an
    	 * incomplete transfer is both rare and transient, so just
    	 * spin.
    	 */
    	while (!isOnSyncQueue(node))
    		Thread.yield();
    	return false;
    }
    

    transferAfterCancelledWait方法并不在ConditionObject中定义,而是由AQS提供。这个方法根据是否中断发生时,是否有signal操作来“掺和”来返回结果。方法调用CAS操作将node的waitStatus从CONDITION设置为0,如果成功,说明当中断发生时,说明没有signal发生(signal的第一步是将node的waitStatus设置为0),在调用enq将线程放入Sync队列后直接返回true,表示中断先于signal发生,即中断在await等待过程中发生,根据await的语义,在遇到中断时需要抛出中断异常,返回true告诉上层方法返回THROW_IT,后续会根据这个返回值做抛出中断异常的处理。此时的变化流程为(中断->interrupted检测->CAS)

    如果CAS操作失败,是否说明中断后于signal发生呢?只能说这时候我们不能确定中断和signal到底谁先发生,只是在我们做CAS操作的时候,他们俩已经都发生了(中断->interrupted检测->signal->CAS,或者signal->中断->interrupted检测->CAS都有可能),这时候我们无法判断到底顺序是怎样,这里的处理是不管怎样都返回false告诉上层方法返回REINTERRUPT,当做是signal先发生(线程被signal唤醒)来处理,后续根据这个返回值做“补上”中断的处理。在返回false之前,我们要先做一下等待,直到当前线程被成功放入Sync锁等待队列。因为这里认为是先signal后中断,所以把node放到Sync同步队列上。

    因此,我们可以这样总结:transferAfterCancelledWait的返回值表示了线程是否因为中断从park中唤醒。

    • await
    public final void await() throws InterruptedException {
    	if (Thread.interrupted())
    		throw new InterruptedException();
    	Node node = addConditionWaiter();
    	int savedState = fullyRelease(node);
    	int interruptMode = 0;
    	while (!isOnSyncQueue(node)) {
    		LockSupport.park(this);
    		if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    			break;
    	}
    	if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    		interruptMode = REINTERRUPT;
    	if (node.nextWaiter != null) // clean up if cancelled
    		unlinkCancelledWaiters();
    	if (interruptMode != 0)
    		reportInterruptAfterWait(interruptMode);
    }
    

    await方法是及时响应中断的。它首先检查了一下中断标志。然后调用addConditionWaiter将当前线程放入Condition队列的尾,并顺手清理了一下队列里的无用节点。紧接着调用fullyRelease方法释放当前线程持有的锁。然后是一个while循环,这个循环会循环检测线程的状态,直到线程被signal或者中断唤醒且被放入Sync锁等待队列。如果中断发生的话,还需要调用checkInterruptWhileWaiting方法,根据中断发生的时机确定后去处理这次中断的方式,如果发生中断,退出while循环。

    退出while循环后,我们调用acquireQueued方法来获取锁,注意,acquireQueued方法的返回值表示在等待获取锁的过程中是否发生中断,如果发生中断 且 原来没有需要做抛出处理的中断发生时,我们将后续处理方式设置为REINTERRUPT(如果原来在await状态有中断发生,即interrruptMode==THROW_IE,依然保持THROW_IE)。

    如果是应为中断从park中唤醒(interruptMode==THROT_IE),当前线程仍在Condition队列中,但waitStatus已经变成0了,这里在调用unlinkCancelledWaiters做一次清理。

    最后,根据interruptMode的值,调用reportInterruptAfterWait做出相应处理:

    private void reportInterruptAfterWait(int interruptMode)
    	throws InterruptedException {
    	if (interruptMode == THROW_IE)
    		throw new InterruptedException();
    	else if (interruptMode == REINTERRUPT)
    		selfInterrupt();
    }
    

    如果interruptMod==0,donothing,如果是THROW_IE,说明在await状态下发生中断,抛出中断异常,如果是REINTERRUPT,说明是signal“掺和”了中断,我们无法分辨具体的先后顺序,于是统一按照先signal再中断来处理,即成功获取锁之后要调用selfInterrupt“补上”这次中断。

    signal

    signal 唤醒Condition队列的头节点持有的线程

    public final void signal() {
    	if (!isHeldExclusively())
    		throw new IllegalMonitorStateException();
    	Node first = firstWaiter;
    	if (first != null)
    		doSignal(first);
    }
    

    调用signal之前也需要获取锁,因此signal方法首先检测了一下当前线程是否获取了独占锁。然后调用doSignal唤醒队列中第一个等待线程。注意,这里的“唤醒”意思是将线程从Condition队列移到Sync队列,表示已经完成Condition的等待,具有了去竞争锁的资格。至此,我们可以发现,由于await会直接把线程放入Condition等待队列的尾部,因此Condition是公平的,即按照入列的顺序来signal。

    private void doSignal(Node first) {
    	do {
    		if ( (firstWaiter = first.nextWaiter) == null)
    			lastWaiter = null;
    		first.nextWaiter = null;
    	} while (!transferForSignal(first) &&
    			 (first = firstWaiter) != null);
    }
    

    doSignal方法先将first节点从队列中摘下,然后调用transferForSignal去改变first节点的waitStatus(所谓唤醒线程),这个方法有可能失败,因为等待线程可能已经到时或者被中断,因此while循环这个操作直到成功唤醒或队列为空。我们来看下transferForSignal方法:

    final boolean transferForSignal(Node node) {
    	/*
    	 * If cannot change waitStatus, the node has been cancelled.
    	 */
    	if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    		return false;
    
    	/*
    	 * Splice onto queue and try to set waitStatus of predecessor to
    	 * indicate that thread is (probably) waiting. If cancelled or
    	 * attempt to set waitStatus fails, wake up to resync (in which
    	 * case the waitStatus can be transiently and harmlessly wrong).
    	 */
    	Node p = enq(node);
    	int ws = p.waitStatus;
    	if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    		LockSupport.unpark(node.thread);
    	return true;
    }
    

    这个方法并不在ConditionObject中定义,而是由AQS提供。方法首先调用CAS操作修改node的waitStatus,如果失败,表示线程已经放弃等待(到时或被中断),直接返回false。如果成功,调用enq方法将它放入Sync锁等待队列,返回值p是node在Sync队列中的前驱节点。检测一下前驱p的waitStatus,如果waitStatus为CANCELLED或者无法将前驱p的waitStatus设为SIGNAL,则直接唤醒node持有的线程,即unpark,否则doSignal方法循环退出,node等待前驱p唤醒。这里必须搞清楚,node线程是在哪里park的,显然,他还在await方法的那个while循环里。unpark之后,node线程将会从while循环中退出,然后去调用acquireQueued方法,这个方法是一个自旋,弄得线程会在自旋过程中清除已经为CANCELLED状态的前驱,然后注册前驱节点的waitStatus为SIGNAL。

    至此,signal方法已经完成了所有该做的,“唤醒”的线程已经成功加入Sync队列并已经参与锁的竞争了,返回true。

    signalAll

    signalAll 唤醒Condition队列的所有等待线程

    public final void signalAll() {
    	if (!isHeldExclusively())
    		throw new IllegalMonitorStateException();
    	Node first = firstWaiter;
    	if (first != null)
    		doSignalAll(first);
    }
    

    signalAll方法同样先检测是否持有独占锁,然后对奥用doSignalAll方法:

    private void doSignalAll(Node first) {
    	lastWaiter = firstWaiter = null;
    	do {
    		Node next = first.nextWaiter;
    		first.nextWaiter = null;
    		transferForSignal(first);
    		first = next;
    	} while (first != null);
    }
    

    doSignalAll方法循环调用transferForSignal方法“唤醒”队列的头结点,直到队列为空。

    总结:ConditionObject由AQS提供,它实现了类似wiat、notify和notifyAll类似的功能。Condition必须与一个独占锁绑定使用,在await或signal之前必须现持有独占锁。Condition队列是一个单向链表,他是公平的,按照先进先出的顺序从队列中被“唤醒”,所谓唤醒指的是完成Condition对象上的等待,被移到Sync锁等待队列中,有参与竞争锁的资格(Sync队列有公平&非公平两种模式,注意区别)。

  • 相关阅读:
    2022北航软件研究生入学考试991考试大纲-数据结构与C
    pgsql 学习
    Java开发必须掌握的 20+ 种 Spring 常用注解
    Spring 学习总结
    Spring MVC快速入门教程
    spring boot与spring mvc的区别是什么?
    Java知识体系最强总结(2020版)
    arthas(阿尔萨斯)使用实践----查看慢方法 /方法耗时等
    JVM --------jmap-----查看堆内存信息、生成heap dump 文件(转储堆内存快照到指定文件)
    [JVM】jstat命令详解---JVM的统计监测工具:jstat 堆内存各部分的使用量,以及加载类的数量。
  • 原文地址:https://www.cnblogs.com/lllliuxiaoxia/p/15772582.html
Copyright © 2011-2022 走看看