一、Condition作用是什么
主要功能和Object中的wait、notify功能相对应,使某个线程 在某种情况下等待和唤醒的功能。
二、使用实例
1)实例1,单一生产者和消费者使用,并且只用一个condition对象控制生产者和消费者
注意:单一condition对象,生产者和消费者都在一个队列中排队,那么这个时候下一个被唤醒的可能是消费者也可能是生产者,那么如果生产者需要唤醒消费者但是被唤醒的确实生产者,则需要生产者继续阻塞排队(使用循环判断模式)
package com.test.lock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.LinkedList; import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ConditionTest4 { static Logger logger = LoggerFactory.getLogger(ConditionTest4.class ) ; public static int getRandom(){ Random random = new Random() ; int rid = random.nextInt(6); int val = rid+2; logger.info(String.format("生成随机数:%d", val )); return val; } //睡眠打印日志更易于观察 public static void waitN(Integer n ){ try { TimeUnit.SECONDS.sleep(n); } catch (InterruptedException e) { e.printStackTrace(); } return ; } public static void main(String[] args) throws InterruptedException { Lock lock = new ReentrantLock(true) ; Condition condition = lock.newCondition(); //队列允许的最大值 Integer count=10; //数据等待的队列 LinkedList<Integer> list = new LinkedList<>() ; Thread producter = new Thread(()->{ while (true){//保持生产者一直可以生产 lock.lock(); try { while(list.size()==count){//保证 condition被叫醒以后,集合中不是满的 try { logger.info("1111111111生产者进入等待队列"); waitN(2);//睡眠,使测试更容易被观看结果 condition.await(); logger.info("111111111生产者被唤醒"); } catch (InterruptedException e) { e.printStackTrace(); } } logger.info("111111111111生产者可以生产了"); waitN(2); list.add( getRandom()); condition.signal(); }finally { lock.unlock(); } } },"producter") ; producter.start(); Thread consumer = new Thread(()->{ while (true){ lock.lock(); while(list.size()==0){//每次唤醒判断如果是空的则继续排队 try { logger.info("22222222消费者进入等待队列"); waitN(5); condition.await(); logger.info("222222222消费者被唤醒"); } catch (InterruptedException e) { e.printStackTrace(); } } logger.info("2222222222消费者可以消费了"); waitN(2); int val =list.pop(); logger.info(String.format("2222222消费者可以消费了 %d ", val )); condition.signal(); //解锁一般放到finnaly中 lock.unlock(); } },"consumer") ; consumer.start(); } }
2)实例2,多个生产者和多个消费者使用,生产者和消费者分别使用自己的condition对象
当生产者的线程大于消费者的线程,则最终队列会在 满的地方徘徊,
package com.test.lock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.LinkedList; import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ConditionTest5 { static Logger logger = LoggerFactory.getLogger(ConditionTest5.class ) ; public static int getRandom(){ Random random = new Random() ; int rid = random.nextInt(6); int val = rid+2; logger.info(String.format("生成随机数:%d", val )); return val; } //睡眠打印日志更易于观察 public static void waitN(Integer n ){ try { TimeUnit.SECONDS.sleep(n); } catch (InterruptedException e) { e.printStackTrace(); } return ; } public static void main(String[] args) throws InterruptedException { //true 公平锁,这样生产者和消费者才能交替执行 Lock lock = new ReentrantLock(true) ; //2个condition对象,一个在生产者中等待,一个在消费者中等待 Condition produCondition = lock.newCondition(); Condition consuCondition = lock.newCondition(); Integer count=10; LinkedList<Integer> list = new LinkedList<>() ; for (int i = 0; i < 4 ; i++) { Thread producter = new Thread(()->{ while (true){//保持生产者一直可以生产 lock.lock(); while(list.size()==count){//保证 condition被叫醒以后,集合中不是满的 try { logger.info("1111111111生产者进入等待队列"); waitN(2); produCondition.await(); logger.info("111111111生产者被唤醒"); } catch (InterruptedException e) { e.printStackTrace(); } } logger.info("111111111111生产者可以生产了, {}" , list.toString()); waitN(2); list.add( getRandom()); consuCondition.signal(); lock.unlock(); } },"producter"+i) ; producter.start(); } for (int i = 0; i < 2; i++) { Thread consumer = new Thread(()->{ while (true){ lock.lock(); while(list.size()==0){ try { logger.info("22222222消费者进入等待队列"); waitN(5); consuCondition.await(); logger.info("222222222消费者被唤醒"); } catch (InterruptedException e) { e.printStackTrace(); } } logger.info("2222222222消费者可以消费了"); waitN(2); int val =list.pop(); logger.info(String.format("2222222消费者可以消费了 %d , %s ", val , list.toString() )); produCondition.signal(); lock.unlock(); } },"consumer"+i ) ; consumer.start(); } } }
三、源码主要方法解读
Condition这个类是AQS的内部类,通过lock进行实例化的,当我们使用condition对象内部方法的时候,是可以使用lock对象的属性的。
1)await方法
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//当前线程进入等待队列
Node node = addConditionWaiter();
long savedState = fullyRelease(node);
int interruptMode = 0;
//判断节点是否在lock的等待队列中,如果已经不在了则这个线程等待
while (!isOnSyncQueue(node)) {
//如果没在lock的等待队列中,则当前线程睡眠
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
2)当前线程入队方法
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. //如果队列中的状态不是CONDITION,则清除掉这个节点 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 形成一个节点 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) //如果第一个线程进入等待队列,则当前节点为第一个节点 firstWaiter = node; else //如果不是第一个进入等待,则最后一个节点的next指向当前节点 t.nextWaiter = node; //当前线程的节点变成最后的节点 lastWaiter = node; //返回当前线程的节点 return node; }
3)线程等待了,则会释放当前线程持有的锁
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); //调用外部类对象lock的方法进行锁释放 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { //如果释放锁失败则当前线程的节点状态为取消 if (failed) node.waitStatus = Node.CANCELLED; } }
4) 当前节点是否在lock的等待队列中
final boolean isOnSyncQueue(Node node) { // 当前节点状态为CONDITION ,并且是head节点(节点为head节点: node.prev == null) if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ return findNodeFromTail(node); }
5)从lock的队列的尾部 向前查找是否可以找到这个节点,找到则返回true,找到head节点还没找到
private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; //找到head节点还是没有,则返回false,head节点的prev是null if (t == null) return false; t = t.prev; } }
6)唤醒睡眠的线程
public final void signal() { //必须是当前获取锁的线程来执行 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) //唤醒队列中第一个线程 doSignal(first); }
7)唤醒第一个线程,清除第一个node对应的next信息
private void doSignal(Node first) { do { //如果只有一个节点,则清除lastWaiter ,如果first唤醒失败,则把first的next 赋值给firstWaiter ,在此执行唤醒 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; //清除nextWaiter信息 first.nextWaiter = null; } while (!transferForSignal(first) && //去唤醒第一个节点 (first = firstWaiter) != null); // 判断first!= null }
8 唤醒第一个节点去获取锁
final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ //设置节点状态 从 CONDITION 变为 0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) //如果失败,则直接返回 return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ //节点放到lock队列中去抢锁 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 //节点状态为1 ,被取消了 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //设置线程节点状态为SIGNAL,并判断成功与失败 //线程在上一个步骤已经放到了lock队列中,这里只是设置一下node状态,实际上在lock.unlock步骤会从队列中唤醒的 LockSupport.unpark(node.thread); return true; }