zoukankan      html  css  js  c++  java
  • 从BlockingQueue 看 Condition配合使用实现生产/消费阻塞

    BlockingQueue之所以叫阻塞队列这个名字,主要就是以下两个特点

    1. 当队列元素满时,添加元素的操作(put())会阻塞直到队列有空位
    2. 当队列为空时,获取元素的操作(poll())会阻塞直到队列不为空(可以设置获取超时时间,超时返回null)

    实现以上特性主要是使用了ReentrantLock+Condition两个juc的类,以LinkedBlockingQueue源码为例,我们简单解析下它的代码实现
    LinkedBlockingQueue除了会阻塞的put poll方法外还有offer、take等不阻塞的方法,可以根据实际情况选用

    由于Condition是绑定在ReentrantLock上的,我们首先看下相关的定义

    Condition的特性:
    1.Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的这些方法是和同步锁捆绑使用的;而Condition是需要与互斥锁/共享锁捆绑使用的。
    2.Condition它更强大的地方在于:能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,在不同的情况下使用不同的Condition。
    例如,假如多线程读/写同一个缓冲区:当向缓冲区中写入数据之后,唤醒"读线程";当从缓冲区读出数据之后,唤醒"写线程";并且当缓冲区满的时候,"写线程"需要等待;当缓冲区为空时,"读线程"需要等待。
    如果采用Object类中的wait(), notify(), notifyAll()实现该缓冲区,当向缓冲区写入数据之后需要唤醒"读线程"时,不可能通过notify()或notifyAll()明确的指定唤醒"读线程",而只能通过notifyAll唤醒所有线程(但是notifyAll无法区分唤醒的线程是读线程,还是写线程)。 但是,通过Condition,就能明确的指定唤醒读线程。

    *** LinkedBlockingQueue关键变量如下 ***

      /** 队列容量,默认大小为Integer.MAX_VALUE!!可能导致OOM */
        private final int capacity;
    
        /** 当前队列中元素个数 */
        private final AtomicInteger count = new AtomicInteger();
    
        /**
         * 列表头结点
         */
        transient Node<E> head;
    
        /**
         * 列表尾结点
         */
        private transient Node<E> last;
    
        /** 取元素 锁*/
        private final ReentrantLock takeLock = new ReentrantLock();
    
        /** 控制获取操作阻塞/执行 */
        private final Condition notEmpty = takeLock.newCondition();
    
        /** 放元素 锁 */
        private final ReentrantLock putLock = new ReentrantLock();
    
        /** 控制放置操作阻塞/执行 */
        private final Condition notFull = putLock.newCondition();
    
    

    这里可能有人有疑问,为什么有了两个ReentrantLock 还要建两个 Condition
    其实主要是逻辑上的功能不同,比如takeLock 在代码中是这样使用的:

         /**
         * 唤醒等待的取对象线程 只提供给 put/offer 调用
         */
        private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            //这时候不允许取元素,防止变为空队列
            takeLock.lock();
            try {
                notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
        }
    
    

    这一段的源码注释也说明了用途,两个ReentrantLock主要用于控制当前队列是否能放入/取出对象,而 Condition用于标识的是队列满/空 这两个临界状态

    接下来就是看put和poll两个核心方法

     public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            // 创建本地变量c保存获取count
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            //使用putLock,实现串行添加对象
            putLock.lockInterruptibly();
            try {
                //队列已满,用notFull阻塞
                while (count.get() == capacity) {
                    notFull.await();
                }
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    //队列不为空,更新notFull
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            //安全地调用notEmpty.signal();通知获取元素的线程
            if (c == 0)
                signalNotEmpty();
        }
    
    
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            E x = null;
            int c = -1;
            long nanos = unit.toNanos(timeout);
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                //等待指定时长后返回null
                while (count.get() == 0) {
                    if (nanos <= 0)
                        return null;
                    nanos = notEmpty.awaitNanos(nanos);
                }
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }
    

    总结:

    可以看到put和poll方法总体流程都差不多,都是通过putLock/takeLock将获取/防止对象的操作变为串行化,并且在开始/完成操作时根据AtomicInteger的count个数,更新notFull和NotEmpty两个Condition,唤醒相对应操作的线程

  • 相关阅读:
    mybatis中一直获取xml配置文件输入流值为空的类似解决方法
    switch中能有的值都有哪些
    length,length(),size()
    Spring中IOC的基本原理
    ajax中的一些小问题
    Servlet简单业务流程
    推荐用字节流处理文件拷贝
    更有效率的数据交换
    1.7版本处理io流自动关闭流的写法
    集合中的简单知识
  • 原文地址:https://www.cnblogs.com/CodeSpike/p/13650701.html
Copyright © 2011-2022 走看看