zoukankan      html  css  js  c++  java
  • Condition的使用与源码进阶

     
    一、Condition作用是什么
      主要功能和Object中的wait、notify功能相对应,使某个线程 在某种情况下等待和唤醒的功能。
     
     
    二、使用实例
    1)实例1,单一生产者和消费者使用,并且只用一个condition对象控制生产者和消费者
    注意:单一condition对象,生产者和消费者都在一个队列中排队,那么这个时候下一个被唤醒的可能是消费者也可能是生产者,那么如果生产者需要唤醒消费者但是被唤醒的确实生产者,则需要生产者继续阻塞排队(使用循环判断模式)
     
     
    package com.test.lock;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.LinkedList;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class ConditionTest4 {
    
        static Logger logger = LoggerFactory.getLogger(ConditionTest4.class ) ;
    
        public static int getRandom(){
            Random random = new Random() ;
            int rid = random.nextInt(6);
            int val =  rid+2;
            logger.info(String.format("生成随机数:%d", val ));
            return  val;
        }
    
    
        //睡眠打印日志更易于观察
        public static void waitN(Integer n ){
            try {
                TimeUnit.SECONDS.sleep(n);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return   ;
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            Lock lock = new ReentrantLock(true) ;
            Condition condition = lock.newCondition();
    
            //队列允许的最大值
            Integer count=10;
           //数据等待的队列
            LinkedList<Integer> list = new LinkedList<>() ;
    
    
            Thread producter = new Thread(()->{
                while (true){//保持生产者一直可以生产
                    lock.lock();
                    try {
                        while(list.size()==count){//保证 condition被叫醒以后,集合中不是满的
                            try {
                                logger.info("1111111111生产者进入等待队列");
                                waitN(2);//睡眠,使测试更容易被观看结果
                                condition.await();
                                logger.info("111111111生产者被唤醒");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        logger.info("111111111111生产者可以生产了");
                        waitN(2);
                        list.add( getRandom());
                        condition.signal();
                    }finally {
                        lock.unlock();
                    }
                }
            },"producter") ;
            producter.start();
    
            Thread consumer = new Thread(()->{
                while (true){
                    lock.lock();
    
                    while(list.size()==0){//每次唤醒判断如果是空的则继续排队
                        try {
                            logger.info("22222222消费者进入等待队列");
                            waitN(5);
                            condition.await();
                            logger.info("222222222消费者被唤醒");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    logger.info("2222222222消费者可以消费了");
                    waitN(2);
                    int val =list.pop();
                    logger.info(String.format("2222222消费者可以消费了 %d ", val ));
                    condition.signal();
    
                    //解锁一般放到finnaly中
                    lock.unlock();
                }
            },"consumer") ;
            consumer.start();
        }
    }
    View Code
    2)实例2,多个生产者和多个消费者使用,生产者和消费者分别使用自己的condition对象
     当生产者的线程大于消费者的线程,则最终队列会在 满的地方徘徊,
    package com.test.lock;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.LinkedList;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class ConditionTest5 {
    
        static Logger logger = LoggerFactory.getLogger(ConditionTest5.class ) ;
    
        public static int getRandom(){
            Random random = new Random() ;
            int rid = random.nextInt(6);
            int val =  rid+2;
            logger.info(String.format("生成随机数:%d", val ));
            return  val;
        }
    
        //睡眠打印日志更易于观察
        public static void waitN(Integer n ){
            try {
                TimeUnit.SECONDS.sleep(n);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return   ;
        }
    
        public static void main(String[] args) throws InterruptedException {
            //true 公平锁,这样生产者和消费者才能交替执行
            Lock lock = new ReentrantLock(true) ;
            //2个condition对象,一个在生产者中等待,一个在消费者中等待
            Condition produCondition = lock.newCondition();
            Condition consuCondition = lock.newCondition();
    
            Integer count=10;
            LinkedList<Integer> list = new LinkedList<>() ;
    
            for (int i = 0; i < 4 ; i++) {
                Thread producter = new Thread(()->{
                    while (true){//保持生产者一直可以生产
                        lock.lock();
    
                        while(list.size()==count){//保证 condition被叫醒以后,集合中不是满的
                            try {
                                logger.info("1111111111生产者进入等待队列");
                                waitN(2);
                                produCondition.await();
                                logger.info("111111111生产者被唤醒");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        logger.info("111111111111生产者可以生产了, {}" , list.toString());
                        waitN(2);
                        list.add( getRandom());
                        consuCondition.signal();
    
                        lock.unlock();
                    }
                },"producter"+i) ;
                producter.start();
            }
    
            for (int i = 0; i < 2; i++) {
                Thread consumer = new Thread(()->{
                    while (true){
                        lock.lock();
                        while(list.size()==0){
                            try {
                                logger.info("22222222消费者进入等待队列");
                                waitN(5);
                                consuCondition.await();
                                logger.info("222222222消费者被唤醒");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        logger.info("2222222222消费者可以消费了");
                        waitN(2);
                        int val =list.pop();
                        logger.info(String.format("2222222消费者可以消费了 %d ,  %s ", val ,  list.toString()  ));
                        produCondition.signal();
                        lock.unlock();
                    }
                },"consumer"+i ) ;
                consumer.start();
            }
        }
    }
    View Code
    三、源码主要方法解读
     
    Condition这个类是AQS的内部类,通过lock进行实例化的,当我们使用condition对象内部方法的时候,是可以使用lock对象的属性的。
     
    1)await方法
     
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //当前线程进入等待队列
        Node node = addConditionWaiter();
        long savedState = fullyRelease(node);
        int interruptMode = 0;
       //判断节点是否在lock的等待队列中,如果已经不在了则这个线程等待
        while (!isOnSyncQueue(node)) {
            //如果没在lock的等待队列中,则当前线程睡眠
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

     2)当前线程入队方法

      

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        //如果队列中的状态不是CONDITION,则清除掉这个节点
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        // 形成一个节点
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
       
        if (t == null)
            //如果第一个线程进入等待队列,则当前节点为第一个节点 
            firstWaiter = node;
        else
            //如果不是第一个进入等待,则最后一个节点的next指向当前节点
            t.nextWaiter = node;
        //当前线程的节点变成最后的节点
        lastWaiter = node;
       //返回当前线程的节点
        return node;
    }

    3)线程等待了,则会释放当前线程持有的锁

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            //调用外部类对象lock的方法进行锁释放
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            //如果释放锁失败则当前线程的节点状态为取消 
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

     4) 当前节点是否在lock的等待队列中

    final boolean isOnSyncQueue(Node node) {
        // 当前节点状态为CONDITION ,并且是head节点(节点为head节点: node.prev == null)
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

    5)从lock的队列的尾部 向前查找是否可以找到这个节点,找到则返回true,找到head节点还没找到

      

    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
             //找到head节点还是没有,则返回false,head节点的prev是null
            if (t == null)
                return false;
            t = t.prev;
        }
    }

     6)唤醒睡眠的线程

      

    public final void signal() {
        //必须是当前获取锁的线程来执行
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            //唤醒队列中第一个线程
            doSignal(first);
    }

     7)唤醒第一个线程,清除第一个node对应的next信息

        

    private void doSignal(Node first) {
        do {
            //如果只有一个节点,则清除lastWaiter ,如果first唤醒失败,则把first的next 赋值给firstWaiter ,在此执行唤醒
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            //清除nextWaiter信息
            first.nextWaiter = null;
        } while (!transferForSignal(first) && //去唤醒第一个节点 
                 (first = firstWaiter) != null);  // 判断first!= null 
    }

     8  唤醒第一个节点去获取锁

      

    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        //设置节点状态 从 CONDITION 变为 0
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            //如果失败,则直接返回
            return false;
    
    
        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        //节点放到lock队列中去抢锁
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0  //节点状态为1 ,被取消了 
              || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //设置线程节点状态为SIGNAL,并判断成功与失败
            //线程在上一个步骤已经放到了lock队列中,这里只是设置一下node状态,实际上在lock.unlock步骤会从队列中唤醒的
            LockSupport.unpark(node.thread);
        return true;
    }
     
     
     
  • 相关阅读:
    angular 前端路由不生效解决方案
    LinqMethod 实现 LeftJoin
    Newtonsoft.Json 序列化踩坑之 IEnumerable
    Newtonsoft.Json 指定某个属性使用特定的时间格式
    [svc]Linux中Swap与Memory内存简单介绍
    [svc]Linux vmstat命令实战详解
    [svc]ansible自动化模块
    [svc]ssh+gg二步认证
    [svc][cpu][jk]cpu的核心查看及什么是cpu的负载
    [vt][xen]xenserver初始安装增加第二块硬盘&xen图形界面安装vm&设置xen里vm开机启动
  • 原文地址:https://www.cnblogs.com/lean-blog/p/13736049.html
Copyright © 2011-2022 走看看