zoukankan      html  css  js  c++  java
  • Java中的阻塞队列

    1. 什么是阻塞队列

    阻塞队列(BlockingQueue)是 Java 5 并发新特性中的内容,阻塞队列的接口是 java.util.concurrent.BlockingQueue,它提供了两个附加操作:当队列中为空时,从队列中获取元素的操作将被阻塞;当队列满时,向队列中添加元素的操作将被阻塞。

    阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器。

    阻塞队列提供了四种操作方法

    • 抛出异常:当队列满时,再向队列中插入元素,则会抛出IllegalStateException异常。当队列空时,再向队列中获取元素,则会抛出NoSuchElementException异常。
    • 返回特殊值:当队列满时,向队列中添加元素,则返回false,否则返回true。当队列为空时,向队列中获取元素,则返回null,否则返回元素。
    • 一直阻塞:当阻塞队列满时,如果生产者向队列中插入元素,则队列会一直阻塞生产线程,直到队列可用或响应中断退出。当阻塞队列为空时,如果消费者线程向阻塞队列中获取数据,则队列会一直阻塞消费线程,直到队列可用或响应中断退出。
    • 超时退出:当队列满时,如果生产线程向队列中添加元素,则队列会阻塞生产线程一段时间,超过指定的时间则退出返回false。当队列为空时,消费线程从队列中移除元素,则队列会阻塞一段时间,如果超过指定时间退出返回null。

    2. Java中的阻塞队列

     

    下面分别简单介绍一下:

    1. ArrayBlockingQueue:是一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。【注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程的请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁】

    2. LinkedBlockingQueue:一个由链表结构组成的有界队列,此队列的长度为Integer.MAX_VALUE。此队列按照先进先出的顺序进行排序。
    3. PriorityBlockingQueue: 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。
    4. DelayQueue: 一个实现PriorityBlockingQueue且实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。
    5. SynchronousQueue: 没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。支持公平锁和非公平锁。
    6. LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。
    7. LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。

    3. 阻塞队列的实现原理

    这里分析下ArrayBlockingQueue的实现原理。构造方法:

    ArrayBlockingQueue(int capacity);
    ArrayBlockingQueue(int capacity, boolean fair);
    ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)

    ArrayBlockingQueue提供了三种构造方法,参数含义如下:

    • capacity:容量,即队列大小。
    • fair:是否公平锁。
    • c:队列初始化元素,顺序按照Collection遍历顺序。

    插入元素

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    从源码可以看出,生产者首先获得锁lock,然后判断队列是否已经满了,如果满了,则等待,直到被唤醒,然后调用enqueue插入元素。

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

    以上是enqueue的实现,实现的操作是插入元素到一个环形数组,然后唤醒notEmpty上阻塞的线程。

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    从源码可以看出,消费者首先获得锁,然后判断队列是否为空,为空,则等待,直到被唤醒,然后调用dequeue获取元素。

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

    以上是dequeue的实现,获取环形数组当前takeIndex的元素,并及时将当前元素置为null,设置下一次takeIndex的值takeIndex++,然后唤醒notFull上阻塞的线程。

    4. 阻塞队列的基本使用

    使用阻塞队列实现生产者-消费者模式

    /**
     * Created by noly on 2017/5/19.
     */
    public class BlockingQueueTest {
    
        public static void main (String[] args) {
            ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);
    
            Consumer consumer = new Consumer(queue);
            Producer producer = new Producer(queue);
    
            producer.start();
            consumer.start();
        }
    }
    
    class Consumer extends Thread {
        private ArrayBlockingQueue<Integer> queue;
        public Consumer(ArrayBlockingQueue<Integer> queue){
            this.queue = queue;
        }
        @Override
        public void run() {
            while(true) {
                try {
                    Integer i = queue.take();
                    System.out.println("消费者从队列取出元素:" + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    class Producer extends Thread {
        private ArrayBlockingQueue<Integer> queue;
        public Producer(ArrayBlockingQueue<Integer> queue){
            this.queue = queue;
        }
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    queue.put(i);
                    System.out.println("生产者向队列插入元素:" + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
  • 相关阅读:
    Android Studio 开发
    Jsp编写的页面如何适应手机浏览器页面
    电影
    Oracle 拆分列为多行 Splitting string into multiple rows in Oracle
    sql server 2008 自动备份
    WINGIDE 激活失败
    python安装 错误 “User installations are disabled via policy on the machine”
    ble编程-外设发送数据到中心
    iOS开发-NSString去掉所有换行及空格
    ios9 字符串与UTF-8 互相转换
  • 原文地址:https://www.cnblogs.com/deityjian/p/11087177.html
Copyright © 2011-2022 走看看