zoukankan      html  css  js  c++  java
  • Java编程的逻辑 (72)

    上节我们介绍了显式锁,本节介绍关联的显式条件,介绍其用法和原理。显式条件也可以被称做条件变量、条件队列、或条件,后文我们可能会交替使用。

    用法

    基本概念和方法

    锁用于解决竞态条件问题,条件是线程间的协作机制。显式锁与synchronzied相对应,而显式条件与wait/notify相对应。wait/notify与synchronized配合使用,显式条件与显式锁配合使用。

    条件与锁相关联,创建条件变量需要通过显式锁,Lock接口定义了创建方法:

    Condition newCondition();

    Condition表示条件变量,是一个接口,它的定义为:

    复制代码
    public interface Condition {
      void await() throws InterruptedException;
      void awaitUninterruptibly();
      long awaitNanos(long nanosTimeout) throws InterruptedException;
      boolean await(long time, TimeUnit unit) throws InterruptedException;
      boolean awaitUntil(Date deadline) throws InterruptedException;
      void signal();
      void signalAll();
    }
    复制代码

    await()对应于Object的wait(),signal()对应于notify,signalAll()对应于notifyAll(),语义也是一样的。

    与Object的wait方法类似,await也有几个限定等待时间的方法,但功能更多一些:

    复制代码
    //等待时间是相对时间,如果由于等待超时返回,返回值为false,否则为true
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    //等待时间也是相对时间,但参数单位是纳秒,返回值是nanosTimeout减去实际等待的时间
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    //等待时间是绝对时间,如果由于等待超时返回,返回值为false,否则为true
    boolean awaitUntil(Date deadline) throws InterruptedException;
    复制代码

    这些await方法都是响应中断的,如果发生了中断,会抛出InterruptedException,但中断标志位会被清空。Condition还定义了一个不响应中断的等待方法:

    void awaitUninterruptibly();

    该方法不会由于中断结束,但当它返回时,如果等待过程中发生了中断,中断标志位会被设置。

    一般而言,与Object的wait方法一样,调用await方法前需要先获取锁,如果没有锁,会抛出异常IllegalMonitorStateException。await在进入等待队列后,会释放锁,释放CPU,当其他线程将它唤醒后,或等待超时后,或发生中断异常后,它都需要重新获取锁,获取锁后,才会从await方法中退出。

    另外,与Object的wait方法一样,await返回后,不代表其等待的条件就一定满足了,通常要将await的调用放到一个循环内,只有条件满足后才退出。

    一般而言,signal/signalAll与notify/notifyAll一样,调用它们需要先获取锁,如果没有锁,会抛出异常IllegalMonitorStateException。signal与notify一样,挑选一个线程进行唤醒,signalAll与notifyAll一样,唤醒所有等待的线程,但这些线程被唤醒后都需要重新竞争锁,获取锁后才会从await调用中返回。

    用法示例

    ReentrantLock实现了newCondition方法,通过它,我们来看下条件的基本用法。我们实现与67节类似的例子WaitThread,一个线程启动后,在执行一项操作前,等待主线程给它指令,收到指令后才执行,示例代码为:

    复制代码
    public class WaitThread extends Thread {
        private volatile boolean fire = false;
        private Lock lock = new ReentrantLock();
        private Condition condition = lock.newCondition();
    
        @Override
        public void run() {
            try {
                lock.lock();
                try {
                    while (!fire) {
                        condition.await();
                    }
                } finally {
                    lock.unlock();
                }
                System.out.println("fired");
            } catch (InterruptedException e) {
                Thread.interrupted();
            }
        }
    
        public void fire() {
            lock.lock();
            try {
                this.fire = true;
                condition.signal();
            } finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            WaitThread waitThread = new WaitThread();
            waitThread.start();
            Thread.sleep(1000);
            System.out.println("fire");
            waitThread.fire();
        }
    }
    复制代码

    需要特别注意的是,不要将signal/signalAll与notify/notifyAll混淆,notify/notifyAll是Object中定义的方法,Condition对象也有,稍不注意就会误用,比如,对上面例子中的fire方法,可能会写为:

    复制代码
    public void fire() {
        lock.lock();
        try {
            this.fire = true;
            condition.notify();
        } finally {
            lock.unlock();
        }
    }
    复制代码

    写成这样,编译器不会报错,但运行时会抛出IllegalMonitorStateException,因为notify的调用不在synchronized语句内。

    同样,避免将锁与synchronzied混用,那样非常令人混淆,比如:

    复制代码
    public void fire() {
        synchronized(lock){
            this.fire = true;
            condition.signal();
        }
    }
    复制代码

    记住,显式条件与显式锁配合,wait/notify与synchronized配合。

    生产者/消费者模式

    在67节,我们用wait/notify实现了生产者/消费者模式,我们提到了wait/notify的一个局限,它只能有一个条件等待队列,分析等待条件也很复杂。在生产者/消费者模式中,其实有两个条件,一个与队列满有关,一个与队列空有关。使用显式锁,可以创建多个条件等待队列。下面,我们用显式锁/条件重新实现下其中的阻塞队列,代码为:

    复制代码
    static class MyBlockingQueue<E> {
        private Queue<E> queue = null;
        private int limit;
        private Lock lock = new ReentrantLock();
        private Condition notFull  = lock.newCondition();
        private Condition notEmpty = lock.newCondition();
    
    
        public MyBlockingQueue(int limit) {
            this.limit = limit;
            queue = new ArrayDeque<>(limit);
        }
    
        public void put(E e) throws InterruptedException {
            lock.lockInterruptibly();
            try{
                while (queue.size() == limit) {
                    notFull.await();
                }
                queue.add(e);
                notEmpty.signal();    
            }finally{
                lock.unlock();
            }
        }
    
        public E take() throws InterruptedException {
            lock.lockInterruptibly();
            try{
                while (queue.isEmpty()) {
                    notEmpty.await();
                }
                E e = queue.poll();
                notFull.signal();
                return e;    
            }finally{
                lock.unlock();
            }
        }
    }
    复制代码

    定义了两个等待条件:不满(notFull)、不空(notEmpty),在put方法中,如果队列满,则在notFull上等待,在take方法中,如果队列空,则在notEmpty上等待,put操作后通知notEmpty,take操作后通知notFull。

    这样,代码更为清晰易读,同时避免了不必要的唤醒和检查,提高了效率。Java并发包中的类ArrayBlockingQueue就采用了类似的方式实现。

    实现原理
    ConditionObject
    理解了显式条件的概念和用法,我们来看下ReentrantLock是如何实现它的,其newCondition()的代码为:

    public Condition newCondition() {
        return sync.newCondition();
    }

    sync是ReentrantLock的内部类对象,其newCondition()代码为:

    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    ConditionObject是AQS中定义的一个内部类,不了解AQS请参看上节。ConditionObject的实现也比较复杂,我们通过一些主要代码来简要探讨其实现原理。ConditionObject内部也有一个队列,表示条件等待队列,其成员声明为:

    //条件队列的头节点
    private transient Node firstWaiter;
    //条件队列的尾节点
    private transient Node lastWaiter;

    ConditionObject是AQS的成员内部类,它可以直接访问AQS中的数据,比如AQS中定义的锁等待队列。

    我们看下几个方法的实现,先看await方法。

    await实现分析

    下面是await方法的代码,我们通过添加注释解释其基本思路。

    复制代码
    public final void await() throws InterruptedException {
        // 如果等待前中断标志位已被设置,直接抛异常
        if (Thread.interrupted())
            throw new InterruptedException();
        // 1.为当前线程创建节点,加入条件等待队列
        Node node = addConditionWaiter();
        // 2.释放持有的锁
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 3.放弃CPU,进行等待,直到被中断或isOnSyncQueue变为true
        // isOnSyncQueue为true表示节点被其他线程从条件等待队列
        // 移到了外部的锁等待队列,等待的条件已满足
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // 4.重新获取锁
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        // 5.处理中断,抛出异常或设置中断标志位
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    复制代码

    awaitNanos实现分析

    awaitNanos与await的实现是基本类似的,区别主要是会限定等待的时间,如下所示:

    复制代码
    public final long awaitNanos(long nanosTimeout) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        long lastTime = System.nanoTime();
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                //等待超时,将节点从条件等待队列移到外部的锁等待队列
                transferAfterCancelledWait(node);
                break;
            }
            //限定等待的最长时间
            LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
    
            long now = System.nanoTime();
            //计算下次等待的最长时间
            nanosTimeout -= now - lastTime;
            lastTime = now;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return nanosTimeout - (System.nanoTime() - lastTime);
    }
    复制代码

    signal实现分析

    signal方法代码为:

    复制代码
    public final void signal() {
        //验证当前线程持有锁
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        //调用doSignal唤醒等待队列中第一个线程
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }
    复制代码

    doSignal的代码就不列举了,其基本逻辑是:

    1. 将节点从条件等待队列移到锁等待队列
    2. 调用LockSupport.unpark将线程唤醒

    小结

    本节介绍了显式条件的用法和实现原理。它与显式锁配合使用,与wait/notify相比,可以支持多个条件队列,代码更为易读,效率更高,使用时注意不要将signal/signalAll误写为notify/notifyAll。

    从70节到本节,我们介绍了Java并发包的基础 - 原子变量和CAS、显式锁和条件,基于这些,Java并发包还提供了很多更为易用的高层数据结构、工具和服务,从下一节开始,我们先探讨一些并发数据结构。

  • 相关阅读:
    opencv 单应矩阵
    对极约束
    opencv Mat 操作
    两个视角得到世界坐标系
    opencv storage 操作C++
    Python操作mysql数据库
    java——保留一位、两位小数
    数据库 select from 库名 表名 的用法
    python 使用国内镜像下载插件及报错Could not fetch URL https://pypi.org/simple/pywinauto/: There was a problem co解决方法
    pycharm下载第三方库AttributeError: module 'pip' has no attribute 'main'问题解决
  • 原文地址:https://www.cnblogs.com/ivy-xu/p/12364391.html
Copyright © 2011-2022 走看看