zoukankan      html  css  js  c++  java
  • Condition的使用与源码进阶

     
    一、Condition作用是什么
      主要功能和Object中的wait、notify功能相对应,使某个线程 在某种情况下等待和唤醒的功能。
     
     
    二、使用实例
    1)实例1,单一生产者和消费者使用,并且只用一个condition对象控制生产者和消费者
    注意:单一condition对象,生产者和消费者都在一个队列中排队,那么这个时候下一个被唤醒的可能是消费者也可能是生产者,那么如果生产者需要唤醒消费者但是被唤醒的确实生产者,则需要生产者继续阻塞排队(使用循环判断模式)
     
     
    package com.test.lock;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.LinkedList;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class ConditionTest4 {
    
        static Logger logger = LoggerFactory.getLogger(ConditionTest4.class ) ;
    
        public static int getRandom(){
            Random random = new Random() ;
            int rid = random.nextInt(6);
            int val =  rid+2;
            logger.info(String.format("生成随机数:%d", val ));
            return  val;
        }
    
    
        //睡眠打印日志更易于观察
        public static void waitN(Integer n ){
            try {
                TimeUnit.SECONDS.sleep(n);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return   ;
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            Lock lock = new ReentrantLock(true) ;
            Condition condition = lock.newCondition();
    
            //队列允许的最大值
            Integer count=10;
           //数据等待的队列
            LinkedList<Integer> list = new LinkedList<>() ;
    
    
            Thread producter = new Thread(()->{
                while (true){//保持生产者一直可以生产
                    lock.lock();
                    try {
                        while(list.size()==count){//保证 condition被叫醒以后,集合中不是满的
                            try {
                                logger.info("1111111111生产者进入等待队列");
                                waitN(2);//睡眠,使测试更容易被观看结果
                                condition.await();
                                logger.info("111111111生产者被唤醒");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        logger.info("111111111111生产者可以生产了");
                        waitN(2);
                        list.add( getRandom());
                        condition.signal();
                    }finally {
                        lock.unlock();
                    }
                }
            },"producter") ;
            producter.start();
    
            Thread consumer = new Thread(()->{
                while (true){
                    lock.lock();
    
                    while(list.size()==0){//每次唤醒判断如果是空的则继续排队
                        try {
                            logger.info("22222222消费者进入等待队列");
                            waitN(5);
                            condition.await();
                            logger.info("222222222消费者被唤醒");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    logger.info("2222222222消费者可以消费了");
                    waitN(2);
                    int val =list.pop();
                    logger.info(String.format("2222222消费者可以消费了 %d ", val ));
                    condition.signal();
    
                    //解锁一般放到finnaly中
                    lock.unlock();
                }
            },"consumer") ;
            consumer.start();
        }
    }
    View Code
    2)实例2,多个生产者和多个消费者使用,生产者和消费者分别使用自己的condition对象
     当生产者的线程大于消费者的线程,则最终队列会在 满的地方徘徊,
    package com.test.lock;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.LinkedList;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class ConditionTest5 {
    
        static Logger logger = LoggerFactory.getLogger(ConditionTest5.class ) ;
    
        public static int getRandom(){
            Random random = new Random() ;
            int rid = random.nextInt(6);
            int val =  rid+2;
            logger.info(String.format("生成随机数:%d", val ));
            return  val;
        }
    
        //睡眠打印日志更易于观察
        public static void waitN(Integer n ){
            try {
                TimeUnit.SECONDS.sleep(n);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return   ;
        }
    
        public static void main(String[] args) throws InterruptedException {
            //true 公平锁,这样生产者和消费者才能交替执行
            Lock lock = new ReentrantLock(true) ;
            //2个condition对象,一个在生产者中等待,一个在消费者中等待
            Condition produCondition = lock.newCondition();
            Condition consuCondition = lock.newCondition();
    
            Integer count=10;
            LinkedList<Integer> list = new LinkedList<>() ;
    
            for (int i = 0; i < 4 ; i++) {
                Thread producter = new Thread(()->{
                    while (true){//保持生产者一直可以生产
                        lock.lock();
    
                        while(list.size()==count){//保证 condition被叫醒以后,集合中不是满的
                            try {
                                logger.info("1111111111生产者进入等待队列");
                                waitN(2);
                                produCondition.await();
                                logger.info("111111111生产者被唤醒");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        logger.info("111111111111生产者可以生产了, {}" , list.toString());
                        waitN(2);
                        list.add( getRandom());
                        consuCondition.signal();
    
                        lock.unlock();
                    }
                },"producter"+i) ;
                producter.start();
            }
    
            for (int i = 0; i < 2; i++) {
                Thread consumer = new Thread(()->{
                    while (true){
                        lock.lock();
                        while(list.size()==0){
                            try {
                                logger.info("22222222消费者进入等待队列");
                                waitN(5);
                                consuCondition.await();
                                logger.info("222222222消费者被唤醒");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        logger.info("2222222222消费者可以消费了");
                        waitN(2);
                        int val =list.pop();
                        logger.info(String.format("2222222消费者可以消费了 %d ,  %s ", val ,  list.toString()  ));
                        produCondition.signal();
                        lock.unlock();
                    }
                },"consumer"+i ) ;
                consumer.start();
            }
        }
    }
    View Code
    三、源码主要方法解读
     
    Condition这个类是AQS的内部类,通过lock进行实例化的,当我们使用condition对象内部方法的时候,是可以使用lock对象的属性的。
     
    1)await方法
     
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //当前线程进入等待队列
        Node node = addConditionWaiter();
        long savedState = fullyRelease(node);
        int interruptMode = 0;
       //判断节点是否在lock的等待队列中,如果已经不在了则这个线程等待
        while (!isOnSyncQueue(node)) {
            //如果没在lock的等待队列中,则当前线程睡眠
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

     2)当前线程入队方法

      

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        //如果队列中的状态不是CONDITION,则清除掉这个节点
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        // 形成一个节点
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
       
        if (t == null)
            //如果第一个线程进入等待队列,则当前节点为第一个节点 
            firstWaiter = node;
        else
            //如果不是第一个进入等待,则最后一个节点的next指向当前节点
            t.nextWaiter = node;
        //当前线程的节点变成最后的节点
        lastWaiter = node;
       //返回当前线程的节点
        return node;
    }

    3)线程等待了,则会释放当前线程持有的锁

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            //调用外部类对象lock的方法进行锁释放
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            //如果释放锁失败则当前线程的节点状态为取消 
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

     4) 当前节点是否在lock的等待队列中

    final boolean isOnSyncQueue(Node node) {
        // 当前节点状态为CONDITION ,并且是head节点(节点为head节点: node.prev == null)
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

    5)从lock的队列的尾部 向前查找是否可以找到这个节点,找到则返回true,找到head节点还没找到

      

    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
             //找到head节点还是没有,则返回false,head节点的prev是null
            if (t == null)
                return false;
            t = t.prev;
        }
    }

     6)唤醒睡眠的线程

      

    public final void signal() {
        //必须是当前获取锁的线程来执行
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            //唤醒队列中第一个线程
            doSignal(first);
    }

     7)唤醒第一个线程,清除第一个node对应的next信息

        

    private void doSignal(Node first) {
        do {
            //如果只有一个节点,则清除lastWaiter ,如果first唤醒失败,则把first的next 赋值给firstWaiter ,在此执行唤醒
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            //清除nextWaiter信息
            first.nextWaiter = null;
        } while (!transferForSignal(first) && //去唤醒第一个节点 
                 (first = firstWaiter) != null);  // 判断first!= null 
    }

     8  唤醒第一个节点去获取锁

      

    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        //设置节点状态 从 CONDITION 变为 0
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            //如果失败,则直接返回
            return false;
    
    
        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        //节点放到lock队列中去抢锁
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0  //节点状态为1 ,被取消了 
              || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //设置线程节点状态为SIGNAL,并判断成功与失败
            //线程在上一个步骤已经放到了lock队列中,这里只是设置一下node状态,实际上在lock.unlock步骤会从队列中唤醒的
            LockSupport.unpark(node.thread);
        return true;
    }
     
     
     
  • 相关阅读:
    prototype.js超强的javascript类库
    MySQL Server Architecture
    Know more about RBA redo block address
    MySQL无处不在
    利用Oracle Enterprise Manager Cloud Control 12c创建DataGuard Standby
    LAMP Stack
    9i中DG remote archive可能导致Primary Database挂起
    Oracle数据库升级与补丁
    Oracle为何会发生归档日志archivelog大小远小于联机重做日志online redo log size的情况?
    Oracle Ksplice如何工作?How does Ksplice work?
  • 原文地址:https://www.cnblogs.com/lean-blog/p/13736049.html
Copyright © 2011-2022 走看看