zoukankan      html  css  js  c++  java
  • Condition分析

    Condition是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件,只有满足条件时,线程才会被唤醒。

    结论:

    阻塞:await()方法中,在线程释放锁资源之后,如果节点不在 AQS 等待队列,则阻塞当前线程,如果在AQS等待队列,则自旋等待尝试获取锁。

    释放:signal()后,节点会从 condition 队列移动到 AQS 等待队列,则进入正常锁的获取流程。

    基本使用

    public class ConditionAwaitDemo implements Runnable {
        private ReentrantLock lock;
        private Condition condition;
    
        public ConditionAwaitDemo(ReentrantLock lock, Condition condition) {
            this.lock = lock;
            this.condition = condition;
        }
    
        @Override
        public void run() {
            System.out.println("begin---await");
            try {
                lock.lock();
                condition.await();
                System.out.println("end---await");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
    
    public class ConditionSignalDemo implements Runnable {
        private ReentrantLock lock;
        private Condition condition;
    
        public ConditionSignalDemo(ReentrantLock lock, Condition condition) {
            this.lock = lock;
            this.condition = condition;
        }
    
        @Override
        public void run() {
            System.out.println("begin---signal");
            try {
                lock.lock();
                condition.signal();
                System.out.println("end---await");
            } finally {
                lock.unlock();
            }
        }
    }
    
    public class Test {
        public static void main(String[] args) {
            ReentrantLock lock = new ReentrantLock();
            Condition condition = lock.newCondition();
            new Thread(new ConditionAwaitDemo(lock, condition)).start();
            new Thread(new ConditionSignalDemo(lock, condition)).start();
        }
    }
    

    源码分析 Condition.await()

    ​ 调用Condition,需要获得Lock锁,所以会意味着存在一个AQS同步队列,先看Condition.await()方法。它会使线程进入等待队列并释放锁,同时将线程状态变为等待状态。

    1,AQS.await(),其实阻塞调用的还是LockSupport.park()方法。

        public final void await() throws InterruptedException {
            //表示await允许被中断
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();        
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //判断这个节点是否在AQS队列上,第一次判断肯定是false,因为前面已经释放锁了
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);// 第一次总是park自己,开始阻塞等待
                // 线程判断自己在等待过程中是否被中断了,如果没有中断,则再次循环,会在 isOnSyncQueue 中判断自己是否在队列上
                // isOnSyncQueue 判断当前node还在队列上且不是CONDITION状态了,就结束循环和阻塞.
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
            // interruptMode != THROW_IE -> 表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了.
            // 将这个变量设置成 REINTERRUPT.
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            // 如果 node 的下一个等待者不是 null, 则进行清理,清理 Condition 队列上的节点.
            // 如果是 null ,就没有什么好清理的了.
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            // 如果线程被中断了,需要抛出异常.或者什么都不做
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
    

    2,AQS.addConditionWaiter()

        //把当前线程封装成 Node,添加到等待队列。这里的队列不再是双向链表,而是单向链表
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
    

    3,AQS.fullyRelease(Node )

        //彻底的释放锁,什么叫彻底呢,就是如果当前锁存在多次重入,那么在这个方法中只需要释放一次就会把所有的重入次数归零。
    	final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();	//获取当前重入次数
                //释放锁并且唤醒下一个同步队列中的线程
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }
    

    4,AQS.isOnSyncQueue(Node )

    ​ condition队列中的节点会重新加入到aqs队列去重新竞争锁,也就是当调用signal的时候,会把当前节点从condition队列转移到aqs队列。所以需要判断节点是否在aqs队列中。

    a,如果不在aqs队列,则说明当前节点没有被唤醒去争抢锁,所以需要阻塞等待其它线程调用signal唤醒。

    b,如果在aqs队列,则说明当前节点需要去竞争锁获得执行权

    node.waitStatus == Node.CONDITION || node.prev == null 判断不在aqs队列?

    ​ aqs队列中的节点一定没有状态为condition的节点。在aqs队列中只有head节点的prev==null,head节点又是获得锁的节点,所以不可能。

    ​ node.next不为空一定存在condition的队列,因为aqs是双向列表

    c,findNodeFromTail:从tail节点往前扫描aqs队列,一但发现aqs队列有节点和当前节点相等,那么该节点也一定存在于aqs队列。

        //判断当前节点是否在同步队列中,返回 false 表示不在,返回 true 表示在
    	final boolean isOnSyncQueue(Node node) {
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            if (node.next != null) // If has successor, it must be on queue
                return true;
            return findNodeFromTail(node);
        }
    

    5,ConditionObject.checkInterruptWhileWaiting(Node )

    ​ 判断线程在condition队列被阻塞的过程中,有没有被其他线程触发过中断请求

        private int checkInterruptWhileWaiting(Node node) {
            //如果被中断,则调用transferAfterCancelledWait判断后续的处理应该是抛出异常还是重新中断
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
        }
    

    6,transferAfterCancelledWait(Node )

        final boolean transferAfterCancelledWait(Node node) {
            //如果设置成功,证明线程在signal之前已经被中断过,所以重新假如aqs队列
            if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                enq(node);
                return true;
            }
            //其实如果调用signal唤醒,则线程已经在aqs队列,该步循环判断是否在aqs队列上,如果不在让出CPU执行权
            while (!isOnSyncQueue(node))
                Thread.yield();
            return false;
        }
    

    7,AQS.acquireQueued(Node , int )

    ​ 该方法,aqs分析过,被唤醒的线程去抢占同步锁,并且回复到原来的重入次数。

    8,reportInterruptAfterWait(int )

    根据 checkInterruptWhileWaiting 方法返回的中断标识来进行中断上报。

        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();	//抛出中断异常
            else if (interruptMode == REINTERRUPT)	//则重新响应中断
                selfInterrupt();
        }
    

    源码分析 Condition.signal()

    1,AQS.signal()

        public final void signal() {
            //先判断当前线程是否获得了锁,逻辑是:判断当前线程和获得锁的线程是否相等
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter; // 拿到 Condition 队列上第一个节点
            if (first != null)
                doSignal(first);
        }
    

    2,ConditionObject.doSignal(Node )

    ​ 对condition队列从首部开始第一个节点执行transferForSignal操作,将将node从condition队移动到aqs队列中,同时修改aqs队列中原先尾节点的状态。

        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
    

    3,AQS.transferForSignal(Node )

        final boolean transferForSignal(Node node) {
            //更新节点的状态为0,如果更新失败只有一种可能就是这个节点被CANCELLED了
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
            //把当前节点添加到AQS队列。并且返回返回按当前节点的上一个节点,也就是原tail节点
            Node p = enq(node);
            int ws = p.waitStatus;
            // 如果上一个节点的状态被取消了, 或者尝试设置上一个节点的状态为 SIGNAL 失败了(SIGNAL 表示: 他的 next 节点需要停止阻塞)
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            //如果 node 的 prev 节点已经是signal 状态,那么被阻塞的 ThreadA 的唤醒工作由 AQS 队列来完成
            return true;
        }
    

  • 相关阅读:
    反射学习系列3反射实例应用
    (转)华为牛人在华为工作十年的感悟!
    利用日志记录所有LINQ的增,删,改解决方案
    qt5摄像头
    opencvcartToPolar笛卡尔坐标转极坐标
    逆矩阵
    方阵的行列式
    qt5右键菜单
    矩阵的转置
    opencvpyrDown降采样和pyrUp升采样
  • 原文地址:https://www.cnblogs.com/gaojf/p/12774133.html
Copyright © 2011-2022 走看看