zoukankan      html  css  js  c++  java
  • 并发编程实践三:Condition

    Condition实例始终被绑定到一个锁(Lock)上。Lock替代了Java的synchronized方法,而Condition则替代了Object的监视器方法,包含wait、notify和notifyAll(想很多其它的了解能够看我的博客:Java并发编程3-等待、通知和中断)。而在Condition中相应为await、signal和signalAll。这篇文章主要讲述Condition的用法。以及它的实现机制。

    Condition的使用

    与Object的监视器方法不同。每一个Lock能够相应多个Condition对象,这样等待的线程就能够分散到多个等待集合中。就能够针对不同的等待集合来依次唤醒线程。实现唤醒效率的提高(不再须要唤醒全部线程)。看以下的样例:

    public class BoundedBuffer {
    	final Lock lock = new ReentrantLock();
    	final Condition notFull = lock.newCondition();
    	final Condition notEmpty = lock.newCondition();
    
    	final Object[] items = new Object[100];
    	int putptr, takeptr, count;
    
    	public void put(Object x) throws InterruptedException {
    		lock.lock();
    		try {
    			while (count == items.length)
    				notFull.await();
    			items[putptr] = x;
    			if (++putptr == items.length)
    				putptr = 0;
    			++count;
    			notEmpty.signal();    //唤醒一个take线程
    		} finally {
    			lock.unlock();
    		}
    	}
    
    	public Object take() throws InterruptedException {
    		lock.lock();
    		try {
    			while (count == 0)
    				notEmpty.await();
    			Object x = items[takeptr];
    			if (++takeptr == items.length)
    				takeptr = 0;
    			--count;
    			notFull.signal();   //唤醒一个put线程
    			return x;
    		} finally {
    			lock.unlock();
    		}
    	}
    }


    以下我们来看看Condition的主要方法:

    await

    造成当前线程在接到信号或被中断之前一直处于等待状态。
    与此Condition相关的锁以原子方式释放,而且出于线程调度的目的,将禁用当前线程,且在发生下面四种情况之中的一个曾经,当前线程将一直处于休眠状态:
     1)其它某个线程调用此Condition的signal()方法,而且碰巧将当前线程选为被唤醒的线程。或者
     2)其它某个线程调用此Condition的signalAll()方法。或者
     3)其它某个线程中断当前线程,且支持中断线程的挂起;或者
     4)已超过指定的等待时间。或者
     5)发生“虚假唤醒”。


    在全部情况下。在此方法返回到当前线程前。都必须又一次获取与此条件有关的锁。
    await支持无參数版本号(一直等待)、带时间參数的版本号(仅仅等待指定时间或等待至某个时间)和支持不可中断的等待。

    signal

    唤醒一个等待线程。


    假设全部的线程都在等待此条件。则选择当中的一个唤醒。在从 await 返回之前,该线程必须又一次获取锁。

    signalAll

    唤醒全部等待线程。
    假设全部的线程都在等待此条件,则唤醒全部线程。在从 await 返回之前,每一个线程都必须又一次获取锁。

    在使用Condition时,须要注意的是Condition的实例本身也是一个Object,也带有wait、notify和notifyAll方法,注意不要搞混。

    Condition的实现

    AbstractQueuedLongSynchronizer.ConditionObject是Condition的详细实现类,使用了一个FIFO队列来保存等待的线程,await将一个线程放入等待队列中,signal每次唤醒等待时间最长的线程(而notify则是随意唤醒一个线程)。signalAll则唤醒全部等待线程。等待队列的节点使用和AQS的队列同样的节点(见上一篇:“并发编程实践二:AbstractQueuedSynchronizer”),队列的head和tail的定义例如以下:

    public class ConditionObject implements Condition, java.io.Serializable {
    	private transient Node firstWaiter;
    	private transient Node lastWaiter;
    	
    	。。。

    。。。 }


     

    和AQS不同的是,ConditionObject使用nextWaiter指向下一个节点(AQS中使用prev和next),而且waitStatus属性值为Node.CONDITION。

    当一个线程获取了锁后,它能够调用该锁相应的Condition的await方法将自己堵塞:
     1)假设当前线程被中断,则抛出中断异常;
     2)将当前线程放置到Condition的等待队列中;
     3)释放当前线程的锁,而且保存锁定状态;
     4)在收到信号、中断或超时前,一直堵塞;
     5)使用保存的锁定状态又一次获取锁;
     6)假设步骤4的堵塞过程中发生中断,则抛出中断异常。

    public final void await() throws InterruptedException {
    	if (Thread.interrupted())  //1
    		throw new InterruptedException();
    	Node node = addConditionWaiter();  //2
    	int savedState = fullyRelease(node); //3
    	int interruptMode = 0;
    	while (!isOnSyncQueue(node)) {     //4
    		LockSupport.park(this);
    		if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    			break;
    	}
    	if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  //5
    		interruptMode = REINTERRUPT;
    	if (node.nextWaiter != null) // clean up if cancelled
    		unlinkCancelledWaiters();
    	if (interruptMode != 0)  //6
    		reportInterruptAfterWait(interruptMode);
    }


     

    整个过程并不复杂,须要注意的是堵塞须要放在一个循环中。防止“虚假唤醒”,之所以要保存锁定状态,是为了使用排它模式来获取锁。

    线程能够调用signal来将当前Condition的等待队列中的第一个节点移动到拥有锁的等待队列:
     1)假设不是排它模式。则抛出IllegalMonitorStateException异常。
     2)将等待队列的第一个节点出队列,并将其增加AQS的锁队列。

    public final void signal() {
    	if (!isHeldExclusively()) //1
    		throw new IllegalMonitorStateException();
    	Node first = firstWaiter;
    	if (first != null)
    		doSignal(first);  //2
    }
    private void doSignal(Node first) {
    	do {
    		if ( (firstWaiter = first.nextWaiter) == null)
    			lastWaiter = null;
    		first.nextWaiter = null;
    	} while (!transferForSignal(first) &&
    			 (first = firstWaiter) != null);
    }
    final boolean transferForSignal(Node node) {
    	if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    		return false;
    	Node p = enq(node);
    	int ws = p.waitStatus;
    	if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    		LockSupport.unpark(node.thread);
    	return true;
    }


    将因为signal总是从队列的第一个节点開始处理。因此总是能够保持唤醒的次序。
    signal一開始就运行isHeldExclusively推断是否为排它模式,在ReentrantLock中的实现例如以下:

    protected final boolean isHeldExclusively() {
    	return getExclusiveOwnerThread() == Thread.currentThread();
    }


    也就是当当前线程为锁的拥有者时。才继续运行。而在transferForSignal中,假设节点的waitStatus不是CONDITION,那么就仅仅会是CANCELLED(在await操作中运行fullyRelease时。假设失败会将节点的waitStatus设置到CANCELLED);enq将节点增加AQS的堵塞队列,返回节点的前续节点,当前续节点被取消(ws > 0),或者更改状态失败(这里同意失败,失败后被唤醒的线程在acquireQueued中会再次设置前续节点的状态,直到成功)后,将运行唤醒线程的操作。

    线程也能够调用signalAll将全部线程从此Condition的等待队列移动到拥有锁的等待队列。

    public final void signalAll() {
    	if (!isHeldExclusively())
    		throw new IllegalMonitorStateException();
    	Node first = firstWaiter;
    	if (first != null)
    		doSignalAll(first);
    }
    private void doSignalAll(Node first) {
    	lastWaiter = firstWaiter = null;
    	do {
    		Node next = first.nextWaiter;
    		first.nextWaiter = null;
    		transferForSignal(first);
    		first = next;
    	} while (first != null);
    }


    signalAll在doSignalAll中依次调用transferForSignal将Condition的等待队列中的全部节点移动到锁的等待队列中。

    结束语

    Condition在设计时就充分考虑了Object的监视器方法的缺陷。一个lock能够相应多个Condition,从而能够使线程分散到多个等待队列中,使应用更为灵活,而且在实现上使用了FIFO队列来保存等待线程,确保了能够做到使用signal按FIFO方式唤醒等待线程。避免每次唤醒全部线程导致数据竞争。
    Condition这种设计相同也导致使用上要比Object的监视器方法更为复杂,你须要考虑使用多少个Condition。在什么地方使用哪个condition等等?因为Condition是和Lock配合使用的。所以是否使用Condition须要和Lock一起综合考虑。

  • 相关阅读:
    android-exploitme(六):基础加密
    错误:error libGL.so: cannot open shared object file: No such file or directory
    android-exploitme(五):不安全的数据存储
    android-exploitme(四):参数篡改
    android-exploitme(三):安全连接
    android-exploitme(二):安装apk熟悉测试环境
    android-exploitme(一):生成apk
    Ubuntu rsync同步
    phantomjs + selenium headless test
    Fatal error: cannot allocate memory for the buffer pool
  • 原文地址:https://www.cnblogs.com/mengfanrong/p/5109351.html
Copyright © 2011-2022 走看看