zoukankan      html  css  js  c++  java
  • Java并发之阻塞队列浅析

    背景

    因为在工作中经常会用到阻塞队列,有的时候还要根据业务场景获取重写阻塞队列中的方法,所以学习一下阻塞队列的实现原理还是很有必要的。(PS:不深入了解的话,很容易使用出错,造成没有技术深度的样子)

    阻塞队列是什么?

    要想了解阻塞队列,先了解一下队列是啥,简单的说队列就是一种先进先出的数据结构。(具体的内容去数据结构里学习一下)所以阻塞队列就是一种可阻塞的队列。和普通的队列的不同就体现在 ”阻塞“两个字上。阻塞是啥意思?

      百度看一下

    在软件工程里阻塞一般指的是阻塞调用,即调用结果返回之前,当前线程会被挂起。函数只有在得到结果之后才会返回。

    阻塞队列其实就是普通的队列根据需要将某些方法改为阻塞调用。所以阻塞队里和普通队里的不同主要体现在两个方面

    • 当队列是空的时,从队列中获取元素的操作将会被阻塞 。直到其他的线程往空的队列插入新的元素
    • 当队列是满时,往队列里添加元素的操作会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列

    为什么要使用阻塞队列?

        那么为什么要使用阻塞队列?阻塞队列又能完成什么特殊的任务吗?

        阻塞队列的经典使用 场景就是“生产者”和“消费者”模型,生产者生产数据,放入队列,然后消费从队列中获取数据,这个在一般情况下自然没有问题,但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?

            在出现消费者速度远大于生产者速度,消费者在数据消费至一定程度的情况下,暂停等待一下(阻塞消费者)来等待生产者,以保证生产者能够生产出新的数据;反之亦然。

       阻塞队列在java中的一种典型使用场景是线程池,在线程池中,当提交的任务不能被立即得到执行的时候,线程池就会将提交的任务放到一个阻塞的任务队列中来(线程池的具体使用参见之前写的一篇文章《java并发之线程池的浅析》)

       然而,在阻塞队列发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。在这里要感谢一下concurrent包,减轻了我们很多工作

    阻塞队列的成员有哪些

                                       

    下面分别简单介绍一下:

    • ArrayBlockingQueue:是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。构造时必须传入的参数是数组大小此外还可以指定是否公平性。【注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程的请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁】;在插入或删除元素时不会产生或销毁任何额外的对象实例

    • LinkedBlockingQueue:一个由链表结构组成的有界队列,照先进先出的顺序进行排序 ,未指定长度的话,默认 此队列的长度为Integer.MAX_VALUE。。【PS:如果生产者的速度远远大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已经被消耗殆尽了。】PriorityBlockingQueue: 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。
      • LinkedBlockingQueue之所以能够高效的处理并发数据,是因为take()方法和put(E param)方法使用了不同的可重入锁,分别为private final ReentrantLock putLock和private final ReentrantLock takeLock,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能
      • LinkedBlockingQueue在插入元素是会创建一个额外的Node对象,所以它这在长时间内需要高效并发地处理大批量数据的系统中,对于GC的还是存在一定的影响。
    • DelayQueue: 一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。(DelayQueue可以运用在以下应用场景:1.缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。2.定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。)
    • SynchronousQueue: 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
    • LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。
    • LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。

    阻塞队列的核心方法 

    阻塞对队列的核心方法主要是插入操作操作和取出操作,如下

    • Throws Exception 类型的插入和取出在不能立即被执行的时候就会抛出异常。
    • Special Value 类型的插入和取出在不能被立即执行的情况下会返回一个特殊的值(true 或者 false 或者null)
    • Blocked 类型的插入和取出操作在不能被立即执行的时候会阻塞线程直到可以操作的时候会被其他线程唤醒
    • Timed out 类型的插入和取出操作在不能立即执行的时候会被阻塞一定的时候,如果在指定的时间内没有被执行,那么会返回一个特殊值

     插入操作

    • boolean offer(E e):将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则返回 false。(本方法不阻塞当前执行方法的线程)。       
    • boolean  offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在设置的指定的时间内,还不能往队列中加入BlockingQueue,则返回false。
    • void put(E paramE) throws InterruptedException:将指定元素插入到此队列中里,如果队列没有空间,则调用此方法的线程被阻断直到队列里里面有空间再继续执行插入操作。
    • public boolean add(E e): 将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则抛出 IllegalStateException(其实就是调用了offer方法)
    public boolean add(E e) {
            if (offer(e))
                return true;
            else
                throw new IllegalStateException("Queue full");
    }

     获取操作

    • poll():取走BlockingQueue里排在首位的对象,,取不到时返回null;
    • poll(long timeout, TimeUnit unit):在指定时间内从BlockingQueue取出一个队首的对象,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回null。
    • take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入; 
    • drainTo(Collection<? super E> c, int maxElements):一次性从BlockingQueue获取所有可用的数据对象,将数据对象加入传递的集合中(还可以通过maxElements指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁

    阻塞队列的实现原理

        前面介绍了非阻塞队列和阻塞队列中常用的方法,下面来探讨阻塞队列的实现原理,本文以比较常用的ArrayBlockingQueue为例,其他阻塞队列实现原理根据特性会和ArrayBlockingQueue有一些差别,但是大体思路应该类似,有兴趣的朋友可自行查看其他阻塞队列的实现源码。

       首先看一下ArrayBlockingQueue的几个关键成员变量

    public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
        /** The queued items */
        final Object[] items;
    
        /** items index for next take, poll, peek or remove */
        int takeIndex;
    
        /** items index for next put, offer, or add */
        int putIndex;
    
        /** Number of elements in the queue */
        int count;
        /*
         * Concurrency control uses the classic two-condition algorithm
         * found in any textbook.
         */
    
        /** Main lock guarding all access */
        final ReentrantLock lock;
    
        /** Condition for waiting takes */
        private final Condition notEmpty;
    
        /** Condition for waiting puts */
        private final Condition notFull;
    }

     从上边可以明显的看出ArrayBlockingQueue用一个数组来存储数据,takeIndex和putIndex分别表示队首元素和队尾元素的下标,count表示队列中元素的个数。 lock是一个可重入锁,notEmpty和notFull是等待条件。

     然后看它的一个关键方法的实现:put()

    public void put(E e) throws InterruptedException {
      checkNotNull(e);
       final ReentrantLock lock = this.lock;
       lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
          lock.unlock();
        }
    }
    1. 首选检查元素是否为空,为空则抛出异常
    2. 接着实例化可重入锁
    3. 然后localReentrantLock.lockInterruptibly();这里特别强调一下 (lockInterruptibly()允许在等待时由其他线程的Thread.interrupt()方法来中断等待线程而直接返回,这时是不用获取锁的,而会抛出一个InterruptException。  而ReentrantLock.lock()方法则不允许Thread.interrupt()中断,即使检测到了Thread.interruptted一样会继续尝试获取锁,失败则继续休眠。只是在最后获取锁成功之后在把当前线程置为中断状态)
    4. 判断当前元素个数是否等于数组的长度,如果相等,则调用notFull.await()进行等待,即当队列满的时候,将会等待
    5. 将元素插入到队列中
    6. 解锁(这里一定要在finally中解锁啊!!!)

    enqueue(E x)将元素插入到数组啊item中

    /**
         * Inserts element at current put position, advances, and signals.
         * Call only when holding lock.
         */
        private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            notEmpty.signal();
        }

    该方法内部通过putIndex索引直接将元素添加到数组items中

        这里思考一个问题 为什么当putIndex索引大小等于数组长度时,需要将putIndex重新设置为0?

       这是因为当队列是先进先出的 所以获取元素总是从队列头部获取,而添加元素从中从队列尾部获取。所以当队列索引(从0开始)与数组长度相等时,所以下次我们就需要从数组头部开始添加了;

    最后当插入成功后,通过notEmpty唤醒正在等待取元素的线程

    阻塞队列中和put对应的就是take了

    下边是take方法的实现

     public E take() throws InterruptedException {
       final ReentrantLock lock = this.lock;
       lock.lockInterruptibly();
        try {
              while (count == 0)
               notEmpty.await();
                return dequeue();
         finally {
            lock.unlock();
         }
     }

    take方法其实很简单,队列中有数据就删除没有就阻塞,注意这个阻塞是可以中断的,如果队列没有数据那么就加入notEmpty条件队列等待(有数据就直接取走,方法结束),如果有新的put线程添加了数据,那么put操作将会唤醒take线程; 

    可以看到take的实现跟put方法实现很类似,只不过put方法等待的是notFull信号,而take方法等待的是notEmpty信号。(等的就是上文的put中的信号)当数组的数量为空时,也就是无任何数据可以被取出来的时候,notEmpty这个Condition就会进行阻塞,直到被notEmpty唤醒

    dequeue的实现如下

    private E dequeue() {
            final Object[] items = this.items;
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            notFull.signal();
            return x;
        }

    take方法主要是从队列头部取元素,可以看到takeIndex是取元素的时候的偏移值,而put中是putIndex控制添加元素的偏移量,由此可见,put和take操作的偏移量分别是由putIndex和takeIndex控制的。其实仔细观察put和take的实现思路是有很多相似之处。

    • offer(E o, long timeout, TimeUnit unit)的实现方式其实和put的思想是差不多的区别是 offer在阻塞的时候调用的不是await()方法而是awaitNanos(long nanosTimeout) 带超时响应的等待(PS:具体区别可以参考我之前写的关于锁的博客《JAVA并发之锁的使用浅析》)
    • poll(long timeout, TimeUnit unit)的实现也是这样在take的基础上加了超时响应。感兴趣的朋友可以自行去看一下

    案例分析

    模拟食堂的经历,食堂窗口端出一道菜放在台面,然后等待顾客消费。写到代码里就是食堂窗口就是一个生产者线程,顾客就是消费者线程,台面就是阻塞队列。

    public class TestBlockingQueue {
      /**
       * 生产和消费业务操作
       *
       *
       */
      protected class WorkDesk {
    
        BlockingQueue<String> desk = new LinkedBlockingQueue<String>(8);
    
        public void work() throws InterruptedException {
          Thread.sleep(1000);
          desk.put("端出一道菜");
        }
    
        public String eat() throws InterruptedException {
          Thread.sleep(4000);
          return desk.take();
        }
    
      }
    
      /**
       * 生产者类
       *
       *
       */
      class Producer implements Runnable {
    
        private String producerName;
        private WorkDesk workDesk;
    
        public Producer(String producerName, WorkDesk workDesk) {
          this.producerName = producerName;
          this.workDesk = workDesk;
        }
    
        @Override
        public void run() {
          try {
            for (;;) {
    
              workDesk.work();
              System.out.println(producerName + "端出一道菜" +",Data:"+System.currentTimeMillis());
    
            }
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      }
    
      /**
       * 消费者类
       *
       *
       */
      class Consumer implements Runnable {
        private String consumerName;
        private WorkDesk workDesk;
    
        public Consumer(String consumerName, WorkDesk workDesk) {
          this.consumerName = consumerName;
          this.workDesk = workDesk;
        }
    
        @Override
        public void run() {
          try {
            for (;;) {
              workDesk.eat();
              System.out.println(consumerName + "端走了一个菜"+",Data:"+System.currentTimeMillis());
    
            }
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      }
    
      public static void main(String args[]) throws InterruptedException {
        TestBlockingQueue testQueue = new TestBlockingQueue();
        WorkDesk workDesk = testQueue.new WorkDesk();
    
        ExecutorService service = Executors.newFixedThreadPool(6);
        //四个生产者线程
        for (int i=1;i<=4;++i) {
          service.submit(testQueue.new Producer("食堂窗口-"+ i+"-", workDesk));
        }
    
        //两个消费者线程
        Consumer consumer1 = testQueue.new Consumer("顾客-1-", workDesk);
        Consumer consumer2 = testQueue.new Consumer("顾客-2-", workDesk);
    
        service.submit(consumer1);
        service.submit(consumer2);
        service.shutdown();
      }
    
    }

    结果部分如下

      

         可以看到当生产者产生的数据达到阻塞队列的容量时,生成者线程会阻塞,等待消费者线程进行消费,上述案例中最大容量为8个盘子,所以当食堂做好了8个菜后了8会等待顾客进行消费,消费后继续生产。上述案例使用阻塞队列,看起来代码要简单得多,不需要再单独考虑同步和线程间通信的问题。

     在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。

     阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,如线程池中就使用了阻塞队列,其实只要符合生产者-消费者模型的都可以使用阻塞队列。

    参考资料:

    《Java编程思想》

      https://www.cnblogs.com/dolphin0520/p/3932906.html

      https://www.cnblogs.com/superfj/p/7757876.html

  • 相关阅读:
    POJ 1751 Highways (kruskal)
    POJ 2031 Building a Space Station
    UVA 624
    POJ 1502 MPI Maelstrom (Dijkstra)
    POJ 3259 Wormholes(SPFA判负环)
    HZAU 1199 Little Red Riding Hood(水DP)
    HZAU 1205 Sequence Number(最大值前后缀 +双指针 + 二分)
    HZAU 1209 Deadline (hash 贪心 水题不水)
    STL完整版整理
    set集合完整版整理
  • 原文地址:https://www.cnblogs.com/NathanYang/p/11276428.html
Copyright © 2011-2022 走看看