zoukankan      html  css  js  c++  java
  • 06、Java进阶--阻塞队列

    阻塞队列

    阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

    阻塞队列有两个常见的阻塞场景,它们分别是:

    (1)当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。

    (2)当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。

    支持以上两种阻塞场景的队列被称为阻塞队列。

    BlockingQueue核心方法

    offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里。

    offer(E o,long timeout,TimeUnit unit):可以设定等待的时间。

    put(anObject):将anObject加到BlockingQueue里。

    poll(time):取走 BlockingQueue 里排在首位的对象。

    poll(long timeout,TimeUnit unit):从BlockingQueue中取出一个队首的对象。

    take():取走BlockingQueue里排在首位的对象。

    drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数)。

    Java中的阻塞队列

    在Java中提供了7个阻塞队列,它们分别如下所示。

    名称 描述
    ArrayBlockingQueue 由数组结构组成的有界阻塞队列。
    LinkedBlockingQueue 由链表结构组成的有界阻塞队列。
    PriorityBlockingQueue 支持优先级排序的无界阻塞队列。
    DelayQueue 使用优先级队列实现的无界阻塞队列。
    SynchronousQueue 不存储元素的阻塞队列。
    LinkedTransferQueue 由链表结构组成的无界阻塞队列。
    LinkedBlockingDeque 由链表结构组成的双向阻塞队列。

    下面分别介绍这些阻塞队列。

    1、ArrayBlockingQueue

    它是用数组实现的有界阻塞队列,并按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证线程公平地访问队列。

    公平访问队列就是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列。即先阻塞的生产者线程,可以先往队列里插入元素;

    先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列,如下所示:

    ArrayBlockingQueue fairQueue=new ArrayBlockingQueue(2000,true);
    

    2、LinkedBlockingQueue

    它是基于链表的阻塞队列,同ArrayListBlockingQueue类似,此队列按照先进先出(FIFO)的原则对元素进行排序,其内部也维持着一个数据缓冲队列。

    如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE)。

    如果生产者速度大于消费者速度,还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽。

    3、PriorityBlockingQueue

    它是一个支持优先级的无界队列。默认情况下元素采取自然顺序升序排列。

    这里可以自定义实现compareTo()方法来指定元素进行排序规则;或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。

    但其不能保证同优先级元素的顺序。

    4、DelayQueue

    它是一个支持延时获取元素的无界阻塞队列。

    队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口。创建元素时,可以指定元素到期的时间,只有在元素到期时才能从队列中取走。

    5、SynchronousQueue

    它是一个不存储元素的阻塞队列。

    每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。因此此队列内部其实没有任何一个元素,或者说容量是0,

    严格来说它并不是一种容器。由于队列没有容量,因此不能调用peek操作。

    6、LinkedTransferQueue

    它是一个由链表结构组成的无界阻塞TransferQueue队列。

    LinkedTransferQueue实现了一个重要的接口TransferQueue。该接口含有5个方法,其中有3个重要的方法,它们分别如下所示:

    transfer(E e):若当前存在一个正在等待获取的消费者线程,则立刻将元素传递给消费者;如果没有消费者在等待接收数据,就会将元素插入到队列尾部,并且等待进入阻塞状态,直到有消费者线程 取走该元素。

    tryTransfer(E e):若当前存在一个正在等待获取的消费者线程,则立刻将元素传递给消费者;若不存在,则返回 false,并且不进入队列,这是一个不阻塞的操作。

    tryTransfer(E e,long timeout,TimeUnit unit):若当前存在一个正在等待获取的消费者线程,则立刻将元素传递给消费者;若不存在则将元素插入到队列尾部,并且等待消费者线程取走该元素。

    7、LinkedBlockingDeque

    它是一个由链表结构组成的双向阻塞队列。

    双向队列可以从队列的两端插入和移出元素,因此在多线程同时入队时,也就减少了一半的竞争。由于是双向的,因此LinkedBlockingDeque多了addFirst、addLast、offerFirst、offerLast、peekFirst、peekLast等方法。

    阻塞队列的原理

    以ArrayBlockingQueue为例,我们先来看看代码,如下所示:

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
        private static final long serialVersionUID = -817911632652898426L;
        final Object[] items;
        int takeIndex;
        int putIndex;
        int count;
        final ReentrantLock lock;
        private final Condition notEmpty;
        private final Condition notFull;
        ......
    }
    

    ArrayBlockingQueue 是维护一个 Object 类型的数组,takeIndex 和putIndex分别表示队首元素和队尾元素的下标,count表示队列中元素的个数,lock则是一个可重入锁,notEmpty和notFull是等待条件。接下来我们看看关键方法put,代码如下所示:

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    

    从 put 方法的实现可以看出,它先获取了锁,并且获取的是可中断锁,然后判断当前元素个数是否等于数组的长度,如果相等,则调用notFull.await()进行等待。

    当此线程被其他线程唤醒时,通过enqueue(e)方法插入元素,最后解锁。接下来看看enqueue(e)方法,如下所示:

    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
    

    插入成功后,通过notEmpty唤醒正在等待取元素的线程。再来看看take方法。

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    

    跟put方法实现类似,put方法等待的是notFull信号,而take方法等待的是notEmpty信号。

    在take方法中,如果可以取元素,则通过dequeue方法取得元素。下面是dequeue方法的实现。

    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }
    

    跟enqueue方法类似,在获取元素后,通过notFull的signal方法来唤醒正在等待插入元素的线程。

    阻塞队列的使用场景

    除了线程池的实现使用阻塞队列外,我们还可以在生产者-消费者模式中使用阻塞队列:首先使用Object.wait()、Object.notify()和非阻塞队列实现生产者-消费者模式,代码如下所示:

    public class Main {
        private static int queueSize = 10;
        private static PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
        public static void main(String[] args) {
            Producer producer = new Producer();
            Consumer consumer = new Consumer();
            producer.start();
            consumer.start();
        }
    
        static class Consumer extends Thread {
            @Override
            public void run() {
                while (true){
                    synchronized (queue){
                        while (queue.size() == 0){
                            try {
                                System.out.println("队列空,等待数据");
                                queue.wait();
                            }catch (InterruptedException e){
                                e.printStackTrace();
                                queue.notify();
                            }
                        }
                        // 每次移走队首元素
                        queue.poll();
                        queue.notify();
                    }
                }
            }
        }
    
        static class Producer extends Thread {
            @Override
            public void run() {
                while (true){
                    synchronized (queue){
                        while (queue.size() == queueSize){
                            try {
                                System.out.println("队列满,等待有空余空间");
                                queue.wait();
                            }catch (InterruptedException e){
                                e.printStackTrace();
                                queue.notify();
                            }
                        }
                        // 每次插入一个元素
                        queue.offer(1);
                        queue.notify();
                    }
                }
            }
        }
    }
    

    下面是使用阻塞队列实现的生产者-消费者模式:

    public class Main {
        private static int queueSize = 10;
        private static ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueSize);
    
        public static void main(String[] args) {
            Producer producer = new Producer();
            Consumer consumer = new Consumer();
            producer.start();
            consumer.start();
        }
    
        static class Consumer extends Thread {
            @Override
            public void run() {
                while(true){
                    try {
                        queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        static class Producer extends Thread {
            @Override
            public void run() {
                try {
                    queue.put(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    很显然,使用阻塞队列实现无须单独考虑同步和线程间通信的问题,其实现起来很简单。

  • 相关阅读:
    SpringBoot(五)-- 整合Spring的拦截器
    SpringBoot(四)-- 整合Servlet、Filter、Listener
    SpringBoot(三)-- 整合FreeMarker模板
    XML转JSON工具类
    SpringBoot(二)-- 支持JSP
    SpringBoot(一)-- 知识点介绍
    导出Excel工具类
    Linux CentOS6.5上搭建环境遇到的问题
    网络环境未能通过安全验证,请稍候再试
    Struts2,Spring,Hibernate框架的优缺点
  • 原文地址:https://www.cnblogs.com/pengjingya/p/14984456.html
Copyright © 2011-2022 走看看