zoukankan      html  css  js  c++  java
  • 【Java】Java Queue的简介

    阻塞队列

    阻塞队列有几个实现:

    • ArrayBlockingQueue
    • LinkedBlockingQueue
    • PriorityBlockingQueue
    • DelayQueue
    • SynchronousQueue
    • LinkedTransferQueue
    • LinkedBlockingDeque

    他们的共同父类是AbstractQueue。我们一起看ArrayBlockingQueue的实现。

    ArrayBlockingQueue,数组、有界、出入队阻塞

    数据存储

    数据存储在数组中,用几个变量标记下一个获取或存储的元素:

        /** The queued items */
        final Object[] items; // 用数组存储元素
    
        /** items index for next take, poll, peek or remove */
        int takeIndex; // 返回元素的下标
    
        /** items index for next put, offer, or add */
        int putIndex; // 插入元素的下标
    
        /** Number of elements in the queue */
        int count; // 数量
    

    阻塞逻辑

    添加、删除元素需要使用ReentrantLock加锁,满队列、空队列情况的等待与唤醒使用各自的Condition:

        public ArrayBlockingQueue(int capacity, boolean fair) {
            ...
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    

    插入元素,返回是否成功

        public boolean offer(E e) {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == items.length) // 队列满,插入不成,返回false
                    return false;
                else {
                    enqueue(e);
                    return true; // 插入成功,返回true
                }
            } finally {
                lock.unlock();
            }
        }
    

    插入元素,成功返回true,失败抛出异常

    它调用offer方法,插入成功返回true,失败抛出异常:

        public boolean add(E e) {
            if (offer(e))
                return true;
            else
                throw new IllegalStateException("Queue full");
        }
    

    插入元素,队列满了则阻塞

        public void put(E e) throws InterruptedException {
            checkNotNull(e); // e为空抛出异常
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length) // 当队列满了
                    notFull.await(); // notFull的Condition等待条件成熟
                enqueue(e); // 条件成熟了才插入元素
            } finally {
                lock.unlock();
            }
        }
    

    插入元素,队列满了则阻塞指定超时时间

    主体逻辑与put(E e)一致,只是加了超时逻辑:

        public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
    
            checkNotNull(e);
            long nanos = unit.toNanos(timeout); // 将超时时间转换为Nano单位
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length) {
                    if (nanos <= 0) // 超时了,返回false
                        return false;
                    nanos = notFull.awaitNanos(nanos); // Condition等待指定时间
                }
                enqueue(e);
                return true; // 超时时间内插入成功,返回true
            } finally {
                lock.unlock();
            }
        }
    

    删除元素,返回是否删除成功

        public boolean remove(Object o) {
            if (o == null) return false;
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count > 0) {
                    final int putIndex = this.putIndex;
                    int i = takeIndex;
                    do {
                        if (o.equals(items[i])) { // 遍历到要删除的元素,删除并返回true
                            removeAt(i);
                            return true;
                        }
                        if (++i == items.length)
                            i = 0;
                    } while (i != putIndex);
                }
                return false; // 遍历完毕,没有找到,返回false
            } finally {
                lock.unlock();
            }
        }
    

    删除元素,返回删除的元素,没匹配到返回null

        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return (count == 0) ? null : dequeue();
            } finally {
                lock.unlock();
            }
        }
    

    删除元素,队列为空则阻塞

        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0) // 队列为空
                    notEmpty.await(); // 元素出队,队列元素为空,等待非空条件成熟
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    

    入队逻辑,入队成功后同时非空条件成熟:

        private void enqueue(E x) { // 入队
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            notEmpty.signal(); // 元素入队后,通知非空条件已成熟
        }
    

    删除元素,队列为空阻塞指定超时时间

    主体逻辑与take()一直,但有等待超时逻辑:

        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout); // 转化为nano单位
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0) {
                    if (nanos <= 0)
                        return null;
                    nanos = notEmpty.awaitNanos(nanos); // 等待指定超时时间
                }
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    

    LinkedBlockingQueue,链表、有界、出入队阻塞

    存储结构

    用链表作为存储,Node是链表的节点:

        static class Node<E> {
            E item; // 元素值
    
            /**
             * One of:
             * - the real successor Node
             * - this Node, meaning the successor is head.next
             * - null, meaning there is no successor (this is the last node)
             */
            Node<E> next; // 下一节点
    
            Node(E x) { item = x; } // 构造方法
        }
    

    PriorityBlockingQueue,无界,出队阻塞

    出队阻塞

    它是无界的,所以只有出队时队列无元素才会堵塞,依赖notEmpty的Condition:

        /**
         * Condition for blocking when empty
         */
        private final Condition notEmpty;
    

    优先级顺序

    它的优先级依赖比较器:

        /**
         * The comparator, or null if priority queue uses elements'
         * natural ordering.
         */
        private transient Comparator<? super E> comparator;
    

    DelayQueue,无界、出队阻塞、等待指定时间才能出队

    数据存储

    它的数据实现依赖于PriorityQueue,所以队列的元素需实现Comparable:

        private final PriorityQueue<E> q = new PriorityQueue<E>();
    

    出队

        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                E first = q.peek(); // 获取下一个即将的出队元素
                if (first == null || first.getDelay(NANOSECONDS) > 0) // 如果无出队元素,或出队元素的时间未到
                    return null;
                else
                    return q.poll(); // 实际的出队
            } finally {
                lock.unlock();
            }
        }
    

    阻塞出队

        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    E first = q.peek(); // 获取将要出队的元素
                    if (first == null) // 为空,则等待
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0) // 时间已到,出队,跳出方法
                            return q.poll();
                        first = null; // don't retain ref while waiting // 等待期间取消引用
                        if (leader != null) # TODO,未理解透彻
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread; // 当前线程赋予leader
                            try {
                                available.awaitNanos(delay); // 等待剩余时间
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && q.peek() != null)
                    available.signal();
                lock.unlock();
            }
        }
    

    SynchronousQueue,阻塞队列,不存储元素

    依赖于TransferQueue和TransferStack

    它可设置是否公平,分别依赖于TransferQueue和TransferStack,默认非公平

        public SynchronousQueue(boolean fair) {
            transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
        }
    

    阻塞入队和出队

        public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            if (transferer.transfer(e, false, 0) == null) {
                Thread.interrupted();
                throw new InterruptedException();
            }
        }
    
        public E take() throws InterruptedException {
            E e = transferer.transfer(null, false, 0);
            if (e != null)
                return e;
            Thread.interrupted();
            throw new InterruptedException();
        }
    
  • 相关阅读:
    每周进度条07
    软件需求模式阅读笔记06
    每周进度条06
    软件需求模式阅读笔记05
    Django之ModelForm组件
    Django的性能优化
    分页,缓存,序列化,信号
    Django补充——中间件、请求的生命周期等
    Git基础介绍和使用
    Django基础之三
  • 原文地址:https://www.cnblogs.com/nick-huang/p/7392704.html
Copyright © 2011-2022 走看看