zoukankan      html  css  js  c++  java
  • Java并发(十一):Condition条件

    先做总结:

    1、为什么使用Condition条件?

    synchronized配合Object的wait()、notify()系列方法可以实现等待/通知模式。

    Lock提供了条件Condition,对线程的等待、唤醒操作更加详细和灵活。

    Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。

    2、Condition条件实现原理:

    (1)lock.newCondition(),new一个ConditionObject对象,ConditionObject有一个单向等待队列

    (2)condition等待队列:线程已经拿到锁,但是因为不满足某个条件,被放入等待队列并释放锁,不能获取锁

       AQS同步队列:线程抢锁失败,被放入AQS阻塞队列,等前面线程释放锁之后自己再获取锁

    (3)await():当前线程T加入条件等待队列 释放锁 park()当前线程

       signal():当前线程T节点出条件等待队列 T节点加入AQS同步队列 unpark()T线程

    一、Condition实现生产者消费者问题

    class BoundedBuffer {
        final Lock lock = new ReentrantLock();
        // condition 依赖于 lock 来产生
        final Condition notFull = lock.newCondition();
        final Condition notEmpty = lock.newCondition();
    
        final Object[] items = new Object[100];
        int putptr, takeptr, count;
    
        // 生产
        public void put(Object x) throws InterruptedException {
            lock.lock();
            try {
                while (count == items.length)
                    notFull.await();  // 队列已满,等待,直到 not full 才能继续生产
                items[putptr] = x;
                if (++putptr == items.length) putptr = 0;
                ++count;
                notEmpty.signal(); // 生产成功,队列已经 not empty 了,发个通知出去
            } finally {
                lock.unlock();
            }
        }
    
        // 消费
        public Object take() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0)
                    notEmpty.await(); // 队列为空,等待,直到队列 not empty,才能继续消费
                Object x = items[takeptr];
                if (++takeptr == items.length) takeptr = 0;
                --count;
                notFull.signal(); // 被我消费掉一个,队列 not full 了,发个通知出去
                return x;
            } finally {
                lock.unlock();
            }
        }
    }

    二、lock与condition关系

        // 一个lock可以new多个条件
        public static void main(String[] args) {
            ReentrantLock lock = new ReentrantLock();
            lock.lock();
            Condition newCondition1 = lock.newCondition();
            newCondition1.await();
            newCondition1.signal();
            Condition newCondition2 = lock.newCondition();
            newCondition2.await();
            newCondition2.signal();
            lock.unlock();
        }
    
        // ReentrantLock.newCondition()
        public Condition newCondition() {
            return sync.newCondition();
        }
        
        // ReentrantLock.Sync.newCondition()
        final ConditionObject newCondition() {
            return new ConditionObject();
        }

    三、条件队列

    每个Condition对象都包含着一个FIFO队列,该队列是Condition对象通知/等待功能的关键。在队列中每一个节点(使用的AQS的节点)都包含着一个线程引用,该线程就是在该Condition对象上等待的线程。

    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        private transient Node firstWaiter; //头节点
        private transient Node lastWaiter; //尾节点
        public ConditionObject() {
        }
        
        /**
         * 通过addConditionWaiter()方法理解等待队列数据结构
         * 将当前线程加入条件等待队列
         * 1.Node就是AQS的Node
         * 2.单向链表,通过nextWaiter连接
         * 3.waitStatus==Node.CONDITION才能在等待队列中
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
        
        /**
         * 清除队列中不是等待状态的线程
         */
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null; // 用于保存前一个节点
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }
    }

    四、等待await()

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter(); // 当前线程new Node()加入条件队列
            int savedState = fullyRelease(node); // 释放当先线程的锁
            int interruptMode = 0;
            
            /**
             * 自旋:
             * 1.当前节点不在同步队列(刚new的节点肯定不在),挂起当前线程,等待被唤醒
             * 2.当其他线程调用同一个ConditionObject的signal方法时,会将队列里的节点放入同步队列,并unpark线程(排队唤醒)
             * 3.如果该节点被唤醒,再自旋检查是否在同步队列。发现已经在队列中,就可以跳出循环,获取lock
             */
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 处理打断
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 获取锁
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

    五、唤醒signal()

    public final void signal() {
            if (!isHeldExclusively()) // 检查是否拿到锁
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
        
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) && // 唤醒队列第一个节点
                     (first = firstWaiter) != null);
        }
        
        final boolean transferForSignal(Node node) {
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
            Node p = enq(node); // 加入同步队列
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread); // 唤醒线程
            return true;
        }

    await() :造成当前线程在接到信号或被中断之前一直处于等待状态。

    await(long time, TimeUnit unit) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。

    awaitNanos(long nanosTimeout) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout – 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。

    awaitUninterruptibly() :造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。

    awaitUntil(Date deadline) :造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false。

    signal():唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。

    signal()All:唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。

    参考资料 / 相关推荐

    一行一行源码分析清楚 AbstractQueuedSynchronizer (二)

    【死磕Java并发】—–J.U.C之Condition

    Java多线程系列--“JUC锁”06之 Condition条件

  • 相关阅读:
    my34_脚本冥等添加自动任务-mysql监控部署
    zabbix4.2 安装
    面试题12:字符串无重复子串的最大长度
    面试题11:二叉树的非递归前、中、后、层级遍历
    面试题10:二叉树的最大路径和
    面试题9:数组堆化、堆的插入、堆的删除、堆排序
    面试题8:无序数组的最大差值
    面试题7:判断链表是否有环,返回环的入口点
    面试题6:二叉树最近公共节点(LCA)《leetcode236》
    面试题6:二叉树转单链表
  • 原文地址:https://www.cnblogs.com/hexinwei1/p/9968446.html
Copyright © 2011-2022 走看看