zoukankan      html  css  js  c++  java
  • BlockingQueu 阻塞队列

    java.util.concurrent
    public interface BlockingQueue<E> extends Queue<E>

    简介

    当阻塞队列插入数据时:
        如果队列已经满了,线程则会阻塞等待队列中元素被取出后在插入。
        当从阻塞队列中取数据时,如果队列是空的,则线程会阻塞等待队列中有新元素。
    
    
    阻塞队列(BlockingQueue)是一个支持两个附加操作的队列:
    
        这两个附加的操作是:
    
            在队列为空时,获取元素的线程会等待队列变为非空。
    
            当队列满时,存储元素的线程会等待队列可用。
    
        阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。
        阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
    
    
    ArrayBlockingQueue : 一个由数组结构组成的有界阻塞队列(实现的接口是 BlockingQueue)。
    
    LinkedBlockingQueue : 一个由链表结构组成的有界阻塞队列(实现的接口是 BlockingQueue)。
    
    PriorityBlockingQueue : 一个支持优先级排序的无界阻塞队列(实现的接口是 BlockingQueue)。
    
    DelayQueue: 一个使用优先级队列实现的无界阻塞队列(实现的接口是 BlockingQueue)。
    
    SynchronousQueue: 一个不存储元素的阻塞队列(实现的接口是 BlockingQueue)。
    
    LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列(实现的接口是 BlockingQueue)。
    
    LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列(特例:实现的接口是 BlockingDeque)。
    

    处理方法

    方法描述 抛出异常 返回特殊的值 一直阻塞(中断会抛出异常) 超时退出
    插入数据 add(e) offer(e) put(e) offer(e,time,unit)
    获取并移除队列的头 remove() poll() take() poll(time,unit)
    获取但不移除队列的头 element() peek() 不可用 不可用
    抛出异常: 
        是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。
        当队列为空时,从队列里获取元素时会抛出NoSuchElementEx·ception异常 。
    
    返回特殊值: 
        插入方法会返回是否成功,成功则返回true。
        移除方法,则是从队列里拿出一个元素,如果没有则返回null
    
    一直阻塞: 
        当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。
        当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
    
    超时退出: 
        当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
    
    
    抛出异常与返回特殊值方法的实现是一样的,只不过对失败的操作的处理不一样!
    通过 AbstractQueue 的源码可以发现,add(e),remove(),element() 都是分别基于offer(),poll(),peek()实现的
    
    public interface BlockingQueue<E> extends Queue<E> {
    
        //将给定元素设置到队列中,如果设置成功返回true, 否则返回false。如果是往限定了长度的队列中设置值,推荐使用offer()方法。
        boolean add(E e);
    
        //将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
        boolean offer(E e);
    
        //将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
        void put(E e) throws InterruptedException;
    
        //将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.
        boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException;
    
        //从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
        E take() throws InterruptedException;
    
        //在给定的时间里,从队列中获取值,时间到了直接调用普通的poll方法,为null则直接返回null。
        E poll(long timeout, TimeUnit unit)
            throws InterruptedException;
    
        //获取队列中剩余的空间。
        int remainingCapacity();
    
        //从队列中移除指定的值。
        boolean remove(Object o);
    
        //判断队列中是否拥有该值。
        public boolean contains(Object o);
    
        //将队列中值,全部移除,并发设置到给定的集合中。
        int drainTo(Collection<? super E> c);
    
        //指定最多数量限制将队列中值,全部移除,并发设置到给定的集合中。
        int drainTo(Collection<? super E> c, int maxElements);
    }
    

    BlockingQueue特点

    BlockingQueue不接受 null 元素。
    
    BlockingQueue可以是限定容量的(默认:Integer.MAX_VALUE)。
    
    BlockingQueue实现是线程安全的。
    
    
    与BlockingQueue一样,BlockingDeque是线程安全的,但不允许 null 元素,并且可以有容量限制。
    

    七个阻塞队列

    ArrayBlockingQueue(需指定容量):
        ArrayBlockingQueue是一个用数组实现的有界阻塞队列。 
        此队列按照先进先出(FIFO)的原则对元素进行排序。
    
    
    LinkedBlockingQueue:
        是一个用链表实现的有界阻塞队列。
        此队列的默认和最大长度为Integer.MAX_VALUE。
        此队列按照先进先出的原则对元素进行排序。
    
    
    PriorityBlockingQueue(不会阻塞生产者):
        是一个支持优先级的无界队列。
        默认情况下元素采取自然顺序排列(每个元素都必须实现 Comparable 接口),也可以通过比较器comparator来指定元素的排序规则。元素按照升序排列。
    
        其iterator()方法中提供的迭代器并不保证以特定的顺序遍历 PriorityBlockingQueue 的元素。
        如果需要有序地进行遍历,则应考虑使用Arrays.sort(priorityBlockingQueue.toArray())。
    
    
    DelayQueue:
        无界阻塞队列,只有在延迟期满时才能从中提取元素(如果元素没有达到延时时间,就阻塞当前线程)。
        注意 DelayQueue 的所有方法只能操作“到期的元素“,例如,poll()、remove()、size()等方法,都会忽略掉未到期的元素。
    
        我们可以将DelayQueue运用在以下应用场景:
    
            缓存系统的设计:
                可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,
                一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
    
            定时任务调度:
                使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,
                从比如TimerQueue就是使用DelayQueue实现的。
    
        DelayQueue 的实现是基于 PriorityQueue,是一个优先级队列,是以延时时间的长短进行排序的:
            所以,DelayQueue 需要知道每个元素的延时时间,而这个延时时间是由 Delayed 接口的 getDelay() 方法获取的。
            所以,DelayQueue 的元素必须实现 Delay 接口。
    
    
    SynchronousQueue:
        一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作(只有一个元素)。
        SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。
    
    
    LinkedTransferQueue(队列不满时也可以阻塞):
        是一个由链表结构组成的无界阻塞TransferQueue队列 。相对于其他阻塞队列LinkedTransferQueue多了tryTransfer和transfer方法。
    
        transfer方法(阻塞): 
            如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),
            transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。
            如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并阻塞到该元素被消费者消费了才返回。
    
        tryTransfer方法: 
            则是用来试探下生产者传入的元素是否能直接传给消费者。
            如果没有消费者等待接收元素,则返回false。
            和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回。而transfer方法是必须等到消费者消费了才返回。
    
            对于带有时间限制的 tryTransfer(E e, long timeout, TimeUnit unit)方法 ,
            则是试图把生产者传入的元素直接传给消费者,
            但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。
    
    
    LinkedBlockingDeque:
        是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。
        有界的阻塞队列,默认长度以及最大长度是 Integer.MAX_VALUE。可在创建时,指定容量。
    
        LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法。
        另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。
        但是take方法却等同于takeFirst。
    

    ArrayBlockingQueue源码分析

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
        final ReentrantLock lock;
    
        private final Condition notEmpty;
    
        private final Condition notFull;
    
        /** items index for next take, poll, peek or remove */
        int takeIndex;
    
        /** items index for next put, offer, or add */
        int putIndex;
    
        /** Number of elements in the queue */
        int count;
    
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    
    
        public boolean add(E e) {
            if (offer(e))
                return true;
            else
                throw new IllegalStateException("Queue full");
        }
    
        public boolean remove(Object o) {
            if (o == null) return false;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count > 0) {
                    final Object[] items = this.items;
                    for (int i = takeIndex, end = putIndex,
                             to = (i < end) ? end : items.length;
                         ; i = 0, to = end) {
                        for (; i < to; i++)
                            if (o.equals(items[i])) {
                                removeAt(i);
                                return true;
                            }
                        if (to == end) break;
                    }
                }
                return false;
            } finally {
                lock.unlock();
            }
        }
    
    
        public boolean offer(E e) {
            Objects.requireNonNull(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == items.length)
                    return false;
                else {
                    enqueue(e);
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
    
        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return (count == 0) ? null : dequeue();
            } finally {
                lock.unlock();
            }
        }
    
    
        public void put(E e) throws InterruptedException {
            Objects.requireNonNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
    
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    
    
        private void enqueue(E e) {
            // assert lock.isHeldByCurrentThread();
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = e;
            if (++putIndex == items.length) putIndex = 0;
            count++;
            notEmpty.signal();
        }
    
        private E dequeue() {
            // assert lock.isHeldByCurrentThread();
            // assert lock.getHoldCount() == 1;
            // assert items[takeIndex] != null;
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E e = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length) takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            notFull.signal();
            return e;
        }
    }
    
  • 相关阅读:
    1. Deep Q-Learning
    Ⅶ. Policy Gradient Methods
    Ⅴ. Temporal-Difference Learning
    idea在service窗口中显示多个服务
    pycharm下运行flask框架的脚本时报错
    windows下部署 flask (win10+flask+nginx)
    pip install selenium报错解决方法
    pip 及 selenium安装命令
    动作捕捉系统用于模仿学习
    柔性微创手术机器人性能实验验证
  • 原文地址:https://www.cnblogs.com/loveer/p/11518687.html
Copyright © 2011-2022 走看看