zoukankan      html  css  js  c++  java
  • ArrayBlockingQueue 源码分析

    ArrayBlockingQueue

    ArrayBlockingQueue 能解决什么问题?什么时候使用 ArrayBlockingQueue?

    1)ArrayBlockingQueue 是底层由数组支持的有界阻塞队列,队列按照 FIFO 顺序对元素进行排序,读取元素从头部开始,写入元素追加到队列尾部。
    2)当容量超出限制时,put 写入操作将被阻塞;当队列为空时,take 读取操作将被阻塞。
    3)offer 操作支持 fast-fail 写入和超时写入,poll 支持 fast-fail 读取和超时读取。
    4)ArrayBlockingQueue 支持对等待的生产者线程和消费者线程进行排序的可选公平策略,默认情况下是非公平的,公平模式下会降低其吞吐量。
    5)ArrayBlockingQueue 使用 ReentrantLock 来保证线程安全。
    

    如何使用 ArrayBlockingQueue?

    1)生成者消费者并发读写的场景下,并且生产者和消费者基本平衡。
    

    使用 ArrayBlockingQueue 有什么风险?

    1)读写操作使用相同的互斥锁,不支持并发读写,相对于 LinkedBlockingQueue 而言性能不是特别高。
    2)消费者和生产者不平衡时,高并发读写会阻塞操作线程导致大量线程等待,浪费资源。
    

    ArrayBlockingQueue 核心操作的实现原理?

    • 创建实例
        /** 底层存储元素的对象数组 */
        final Object[] items;
    
        /** 下一次 take, poll, peek or remove 操作的目标元素索引 */
        int takeIndex;
    
        /** 下一次 put, offer, or add 操作的目标元素索引*/
        int putIndex;
    
        /** 队列中的已有元素个数 */
        int count;
    
        /*
         * Concurrency control uses the classic two-condition algorithm
         * found in any textbook.
         */
    
        /** 保证线程安全访问的可重入互斥锁 */
        final ReentrantLock lock;
    
        /** 读取元素时,队列为空,则在该条件上阻塞 */
        private final Condition notEmpty;
    
        /** 写入元素时,队列已满,则在该条件上阻塞 */
        private final Condition notFull;
    
        /**
         * 创建最大容量为 capacity 的非公平有界阻塞队列。
         */
        public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }
    
        /**
         * 1)fair=true,创建最大容量为 capacity 的公平有界阻塞队列。
         * 2)fair=true,创建最大容量为 capacity 的非公平有界阻塞队列。
         */
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0) {
                throw new IllegalArgumentException();
            }
            this.items = new Object[capacity];
            // 通过 ReentrantLock 来保证多线程并发安全性
            lock = new ReentrantLock(fair);
            // 读取元素时队列为空,则在非空添加上阻塞等待
            notEmpty = lock.newCondition();
            // 写入元素时队列已满,则在非满条件上阻塞等待
            notFull =  lock.newCondition();
        }
    
    • 写入元素:获取互斥锁,进入条件队列后释放互斥锁,节点被转移到同步队列中并获取互斥锁,put 操作完毕释放互斥锁。
        /**
         * 将元素添加到队列尾部,如果队列已满,则阻塞当前线程。
         * 可响应线程中断
         */
        @Override
        public void put(E e) throws InterruptedException {
            Objects.requireNonNull(e);
            // 读取互斥锁
            final ReentrantLock lock = this.lock;
            // 可中断地获取互斥锁
            lock.lockInterruptibly();
            try {
                // 如果队列已满
                while (count == items.length) {
                    /**
                     * 则将当前线程加入非满的条件队列,并阻塞等待唤醒,
                     * 当前线程被唤醒后会再次尝试将目标元素加入到队列中,可以被重复阻塞。
                     */
                    notFull.await();
                }
                // 将元素添加到队列尾部
                enqueue(e);
            } finally {
                // 释放锁
                lock.unlock();
            }
        }
    
    AbstractQueuedSynchronizer#
            /**
             * 当前线程在指定条件上阻塞等待
             */
            public final void await() throws InterruptedException {
                // 线程被设置了中断标识
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                // 创建一个新的节点并将其加入到条件队列尾部
                final Node node = addConditionWaiter();
                // 尝试释放互斥锁,并返回同步状态
                final int savedState = fullyRelease(node);
                int interruptMode = 0;
                // 如果新建节点在条件队列中
                while (!isOnSyncQueue(node)) {
                    // 阻塞当前线程,等待被 signal 唤醒或被其他线程中断
                    LockSupport.park(this);
                    // 读取线程的中断状态,如果未被中断,则退出循环
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
                        break;
                    }
                }
                /**
                 * 1)在独占、线程不可中断的模式下获取锁,如果线程被中断
                 * && 是在被唤醒后中断,则更新中断模式。
                 */
                if (acquireQueued(node, savedState) && interruptMode != ConditionObject.THROW_IE) {
                    interruptMode = ConditionObject.REINTERRUPT;
                }
                // 当前节点存在后置节点
                if (node.nextWaiter != null) {
                    // 踢除无效节点
                    unlinkCancelledWaiters();
                }
                // 如果中断模式不是 0
                if (interruptMode != 0) {
                    reportInterruptAfterWait(interruptMode);
                }
            }
    
            private Node addConditionWaiter() {
                // 互斥锁没有被当前线程持有
                if (!isHeldExclusively()) {
                    // 则抛出 IllegalMonitorStateException 异常
                    throw new IllegalMonitorStateException();
                }
                // 读取最后一个等待节点
                Node t = lastWaiter;
                // 条件队列中的最后一个节点已经被取消
                if (t != null && t.waitStatus != Node.CONDITION) {
                    // 踢除所有被取消的节点
                    unlinkCancelledWaiters();
                    // 重新读取条件队列尾节点
                    t = lastWaiter;
                }
                // 创建一个条件节点,waitStatus=-2
                final Node node = new Node(Node.CONDITION);
                // 尾节点为空,表示当前节点是第一个有效的节点
                if (t == null) {
                    // 设置头节点
                    firstWaiter = node;
                } else {
                    // 更新旧尾节点的后置节点,即将当前节点链接到条件队列尾部
                    t.nextWaiter = node;
                }
                // 更新尾节点
                lastWaiter = node;
                return node;
            }
    
            private void unlinkCancelledWaiters() {
                // 读取条件队列头节点
                Node t = firstWaiter;
                Node trail = null;
                while (t != null) {
                    // 读取后置节点
                    final Node next = t.nextWaiter;
                    // 当前节点的同步状态不是 Node.CONDITION,则需要踢除
                    if (t.waitStatus != Node.CONDITION) {
                        t.nextWaiter = null;
                        // 还没有有效的节点
                        if (trail == null) {
                            // 更新头节点
                            firstWaiter = next;
                        } else {
                            // 更新最近一个有效节点的 nextWaiter
                            trail.nextWaiter = next;
                        }
                        // 已经遍历到最后一个节点,并且该节点被取消
                        if (next == null) {
                            // 更新尾节点为最近一个有效节点
                            lastWaiter = trail;
                        }
                    } else {
                        // 写入最近一个有效节点
                        trail = t;
                    }
                    // 递归处理下一个节点
                    t = next;
                }
            }
    
            /**
             * 释放互斥锁
             */
            final int fullyRelease(Node node) {
                try {
                    // 读取同步状态
                    final int savedState = getState();
                    // 在独占模式下释放锁
                    if (release(savedState)) {
                        return savedState;
                    }
                    throw new IllegalMonitorStateException();
                } catch (final Throwable t) {
                    node.waitStatus = Node.CANCELLED;
                    throw t;
                }
            }
    
            /**
             * 在独占模式下释放锁
             */
            public final boolean release(int arg) {
                // 尝试释放锁
                if (tryRelease(arg)) {
                    // 读取头节点
                    final Node h = head;
                    // 头结点的同步状态不为 0,则唤醒其后置节点
                    if (h != null && h.waitStatus != 0) {
                        // 唤醒目标节点的后继节点
                        unparkSuccessor(h);
                    }
                    return true;
                }
                return false;
            }    
    
            /**
             * 形参节点是否在条件队列中
             */
            final boolean isOnSyncQueue(Node node) {
                // 节点等待状态为 Node.CONDITION,或节点无前置节点,都说明其在同步队列中
                if (node.waitStatus == Node.CONDITION || node.prev == null) {
                    return false;
                }
                // 节点的 next 不为 null,则其在条件队列中
                if (node.next != null) {
                    return true;
                }
                // 从尾部开始查找节点
                return findNodeFromTail(node);
            }
    
            /**
             * 从尾部向前查找形参节点,如果其在条件队列中,则返回 true
             */
            private boolean findNodeFromTail(Node node) {
                for (Node p = tail;;) {
                    // 当前节点就是目标节点
                    if (p == node) {
                        return true;
                    }
                    // 已经无前置节点
                    if (p == null) {
                        return false;
                    }
                    // 迭代前一个节点
                    p = p.prev;
                }
            }
    
            /**
             * 检查线程中断状态
             * 1)ConditionObject.THROW_IE 线程在被唤醒前,被其他线程中断
             * 2)ConditionObject.REINTERRUPT 线程在被唤醒后,被其他线程中断
             * 3)0 线程未被中断
             */
            private int checkInterruptWhileWaiting(Node node) {
                return Thread.interrupted() ?
                        transferAfterCancelledWait(node) ? ConditionObject.THROW_IE : ConditionObject.REINTERRUPT :
                            0;
            }
    
            /**
             * 等待条件已经取消,则尝试将当前节点转移到同步队列
             */
            final boolean transferAfterCancelledWait(Node node) {
                // 更新同步状态为 0
                if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
                    // 将当前节点加入到同步队列尾部,等待获取锁
                    enq(node);
                    return true;
                }
                /**
                 * 在节点被转移到同步队列时,线程被唤醒,则自旋等待其完全进入同步队列为止
                 */
                while (!isOnSyncQueue(node)) {
                    Thread.yield();
                }
                return false;
            }
            
            /**
             * 将节点加入到同步队列尾部
             */
            private Node enq(Node node) {
                for (;;) {
                    final Node oldTail = tail;
                    if (oldTail != null) {
                        node.setPrevRelaxed(oldTail);
                        if (compareAndSetTail(oldTail, node)) {
                            oldTail.next = node;
                            return oldTail;
                        }
                    } else {
                        initializeSyncQueue();
                    }
                }
            }
            /**
             * 在独占、线程不可中断的模式下获取锁,当前线程已经在同步队列中。
             */
            final boolean acquireQueued(final Node node, int arg) {
                boolean interrupted = false;
                try {
                    for (;;) {
                        // 读取当前节点的前置节点
                        final Node p = node.predecessor();
                        // 如果前置节点是 head,则尝试获取锁
                        if (p == head && tryAcquire(arg)) {
                            // 获取成功,则设置当前节点为 head
                            setHead(node);
                            p.next = null; // help GC
                            // 返回线程中断标识
                            return interrupted;
                        }
                        // 当前线程是否需要被阻塞
                        if (AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(p, node)) {
                            // 阻塞当前线程,并等待唤醒,唤醒后返回其中断状态
                            interrupted |= parkAndCheckInterrupt();
                        }
                    }
                    // 线程运行过程中出现异常
                } catch (final Throwable t) {
                    // 取消当前节点
                    cancelAcquire(node);
                    // 如果线程被设置中断标识
                    if (interrupted) {
                        // 则线程自我中断
                        AbstractQueuedSynchronizer.selfInterrupt();
                    }
                    throw t;
                }
            }
        
            private void reportInterruptAfterWait(int interruptMode)
                    throws InterruptedException {
                // 1)线程在被唤醒前,被其他线程设置了中断标识
                if (interruptMode == ConditionObject.THROW_IE) {
                    // 则抛出 InterruptedException 异常
                    throw new InterruptedException();
                    // 2)线程在被唤醒后,被其他线程设置了中断标识
                } else if (interruptMode == ConditionObject.REINTERRUPT) {
                    // 中断当前线程
                    AbstractQueuedSynchronizer.selfInterrupt();
                }
            }
    
        private void enqueue(E e) {
            // 读取底层对象数组
            final Object[] items = this.items;
            // 插入元素
            items[putIndex] = e;
            // 递增下一个插入索引,如果已经到达数组长度
            if (++putIndex == items.length) {
                // 循环更新到数组头部
                putIndex = 0;
            }
            // 递增元素总数
            count++;
            // 唤醒在非空条件上阻塞等待的单个线程
            notEmpty.signal();
        }
    
    AbstractQueuedSynchronizer#
            /**
             * 唤醒在当前条件上阻塞等待的单个线程
             */
            public final void signal() {
                // 互斥锁没有被当前线程持有
                if (!isHeldExclusively()) {
                    // 则抛出 IllegalMonitorStateException异常 
                    throw new IllegalMonitorStateException();
                }
                // 读取条件队列中第一个等待的节点
                final Node first = firstWaiter;
                if (first != null) {
                    // 将该节点从条件队列转移到同步队列
                    doSignal(first);
                }
            }
    
            /**
             * 将目标节点从条件队列转移到同步队列中
             */
            private void doSignal(Node first) {
                do {
                    // 更新 firstWaiter 为当前节点的后置节点
                    if ( (firstWaiter = first.nextWaiter) == null) {
                        // 如果无后置节点
                        lastWaiter = null;
                    }
                    first.nextWaiter = null;
                // 转移成功则退出循环
                } while (!transferForSignal(first) &&
                        (first = firstWaiter) != null);
            }
    
            /**
             * 将形参节点从条件队列转移到同步队列,转移成功返回 true
             */
            final boolean transferForSignal(Node node) {
                /**
                 * If cannot change waitStatus, the node has been cancelled.
                 * 节点在转移前被取消
                 */
                if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
                    return false;
                }
    
                // 将节点加入到同步队列尾部
                final Node p = enq(node);
                // 读取其同步状态
                final int ws = p.waitStatus;
                /**
                 * 1)转移成功后线程被中断
                 * 2)原子更新状态失败,即节点被取消
                 */
                if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) {
                    // 唤醒驻留其上的线程
                    LockSupport.unpark(node.thread);
                }
                return true;
            }
    
    • 读取元素
        @Override
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                // 队列为空
                while (count == 0) {
                    // 则在非空条件上阻塞等待
                    notEmpty.await();
                }
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * 移除并返回队列头部元素
         */
        private E dequeue() {
            final Object[] items = this.items;
            final
            // 读取目标元素
            E e = (E) items[takeIndex];
            // 将目标索引处的元素置空
            items[takeIndex] = null;
            // 循环递增读取索引
            if (++takeIndex == items.length) {
                takeIndex = 0;
            }
            count--;
            if (itrs != null) {
                itrs.elementDequeued();
            }
            // 唤醒在非满条件上等待的线程
            notFull.signal();
            return e;
        }
    
    • 非阻塞添加元素
        /**
         * 1)如果队列已满,则返回 false,元素添加失败。
         * 2)如果队列有可用空间,则添加元素,并返回 true。
         */
        @Override
        public boolean offer(E e) {
            Objects.requireNonNull(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                // 1)队列已满
                if (count == items.length) {
                    return false;
                // 2)队列中有可用位置   
                } else {
                    enqueue(e);
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * 1)如果队列有可用位置,则添加元素到队列尾部,并返回 true。
         * 2)如果队列已满,在指定的超时时间内队列可用,则添加元素;否则,返回 false,添加失败。
         */
        @Override
        public boolean offer(E e, long timeout, TimeUnit unit)
                throws InterruptedException {
    
            Objects.requireNonNull(e);
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length) {
                    if (nanos <= 0L) {
                        return false;
                    }
                    // 在非满条件上等待指定的超时时间
                    nanos = notFull.awaitNanos(nanos);
                }
                enqueue(e);
                return true;
            } finally {
                lock.unlock();
            }
        }
    
    AbstractQueuedSynchronizer#awaitNanos
            /**
             * 1)当前线程被设置了中断标识,则抛出 InterruptedException 异常。
             * 2)阻塞直到当前线程被唤醒、中断、或超时。
             */
            public final long awaitNanos(long nanosTimeout)
                    throws InterruptedException {
                // 如果线程已经被设置了中断标识,则抛出 InterruptedException 异常
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                // 计算截止时间
                final long deadline = System.nanoTime() + nanosTimeout;
                final long initialNanos = nanosTimeout;
                // 往条件队列中添加节点
                final Node node = addConditionWaiter();
                // 尝试释放互斥锁
                final int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    // 如果已经超时
                    if (nanosTimeout <= 0L) {
                        // 将节点从条件队列转移到同步队列中
                        transferAfterCancelledWait(node);
                        break;
                    }
                    // 如果超时时间 > 1000 纳秒
                    if (nanosTimeout > AbstractQueuedSynchronizer.SPIN_FOR_TIMEOUT_THRESHOLD) {
                        // 最多阻塞当期线程指定的超时时间
                        LockSupport.parkNanos(this, nanosTimeout);
                    }
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
                        break;
                    }
                    // 计算剩余时间
                    nanosTimeout = deadline - System.nanoTime();
                }
                // 尝试获取互斥锁
                if (acquireQueued(node, savedState) && interruptMode != ConditionObject.THROW_IE) {
                    interruptMode = ConditionObject.REINTERRUPT;
                }
                if (node.nextWaiter != null) {
                    unlinkCancelledWaiters();
                }
                if (interruptMode != 0) {
                    reportInterruptAfterWait(interruptMode);
                }
                // 计算剩余时间
                final long remaining = deadline - System.nanoTime(); // avoid overflow
                return remaining <= initialNanos ? remaining : Long.MIN_VALUE;
            }
    
    • 非阻塞读取元素
        /**
         * 1)如果队列为空,则返回 null
         * 2)如果队列非空,则移除并返回队列头部元素
         * created by ZXD at 2 Dec 2018 T 09:41:30
         * @return
         */
        @Override
        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return count == 0 ? null : dequeue();
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * 1)如果队列非空,则移除并返回头部元素。
         * 2)如果队列为空,则最多在指定的超时时间内等待队列有元素可用,
         *  如果有则移除并返回队列头部;如果已经超时,则返回 null.
         */
        @Override
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            // 计算以纳秒为单位的超时时间
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0) {
                    if (nanos <= 0L) {
                        return null;
                    }
                    // 在非空条件上最多阻塞 nanos 纳秒
                    nanos = notEmpty.awaitNanos(nanos);
                }
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    
  • 相关阅读:
    A Complete Guide to the <Picture> Element
    html5 在移动端的缩放控制
    新版itunes添加铃声
    html5 背景音乐 js控制播放 暂停
    thinkphp mysql 坐标按距离排序
    jquery ajax跨域 thinkphp getjson
    webkit-box
    Javascript 获取页面高度(多种浏览器)
    怎样实现iMessage群发
    css3背景透明文字不透明
  • 原文地址:https://www.cnblogs.com/zhuxudong/p/10053181.html
Copyright © 2011-2022 走看看