zoukankan      html  css  js  c++  java
  • 源码分析之Queue(二)ArrayBlockingQueue

      ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列。线程安全是指,ArrayBlockingQueue内部通过“互斥锁”保护竞争资源,实现了多线程对竞争资源的互斥访问;有界,则是指ArrayBlockingQueue对应的数组是有界限的。 阻塞队列,是指多线程访问竞争资源时,当竞争资源已被某线程获取时,其它要获取该资源的线程需要阻塞等待;而且,ArrayBlockingQueue是按 FIFO(先进先出)原则对元素进行排序,元素都是从尾部插入到队列,从头部开始返回。

    原理及数据结构

    1. ArrayBlockingQueue继承于AbstractQueue,并且它实现了BlockingQueue接口。
    2. ArrayBlockingQueue内部是通过Object[]数组保存数据的,也就是说ArrayBlockingQueue本质上是通过数组实现的。ArrayBlockingQueue的大小,即数组的容量是创建ArrayBlockingQueue时指定的。
    3. ArrayBlockingQueue与ReentrantLock(一把锁)是组合关系,ArrayBlockingQueue中包含一个ReentrantLock对象(lock)。ReentrantLock是可重入的互斥锁,ArrayBlockingQueue就是根据该互斥锁实现“多线程对竞争资源的互斥访问”。而且,ReentrantLock分为公平锁和非公平锁,关于具体使用公平锁还是非公平锁,在创建ArrayBlockingQueue时可以指定;而且,ArrayBlockingQueue默认会使用非公平锁。
    4.  ArrayBlockingQueue与Condition是组合关系,ArrayBlockingQueue中包含两个Condition对象(notEmpty和notFull)。而且,Condition又依赖于ArrayBlockingQueue而存在,通过Condition可以实现对ArrayBlockingQueue的更精确的访问 -- (01)若某线程(线程A)要取数据时,数组正好为空,则该线程会执行notEmpty.await()进行等待;当其它某个线程(线程B)向数组中插入了数据之后,会调用notEmpty.signal()唤醒“notEmpty上的等待线程”。此时,线程A会被唤醒从而得以继续运行。(02)若某线程(线程H)要插入数据时,数组已满,则该线程会它执行notFull.await()进行等待;当其它某个线程(线程I)取出数据之后,会调用notFull.signal()唤醒“notFull上的等待线程”。此时,线程H就会被唤醒从而得以继续运行。

    源码解析:

    public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    
        /**
         * Serialization ID
         */
        private static final long serialVersionUID = -817911632652898426L;
    
        /** The queued items 使用数组存储队列元素*/
        final Object[] items;
    
        /** items index for next take, poll, peek or remove 取元素的index*/
        int takeIndex;
    
        /** items index for next put, offer, or add 存元素的index*/
        int putIndex;
    
        /** Number of elements in the queue 元素数量*/
        int count;
    
        /** Main lock guarding all access 保证并发访问的锁,俗称的“两把锁”*/
        final ReentrantLock lock;
    
        /** Condition for waiting takes 取元素的非空条件*/
        private final Condition notEmpty;
    
        /** Condition for waiting puts 存元素的非满条件*/
        private final Condition notFull;
    
        /**
         * Shared state for currently active iterators, or null if there are known not to be any.  Allows queue operations to update iterator state.
         */
        transient Itrs itrs = null;
    
        // Internal helper methods
    
        /**
         * Circularly decrement i.
         */
        final int dec(int i) {
            return ((i == 0) ? items.length : i) - 1;
        }
    
        /**
         * Returns item at index i.
         */
        @SuppressWarnings("unchecked")
        final E itemAt(int i) {
            return (E) items[i];
        }
    
        /**
         * Throws NullPointerException if argument is null.
         * @param v the element
         */
        private static void checkNotNull(Object v) {
            if (v == null)
                throw new NullPointerException();
        }
    
        /**
         * Inserts element at current put position, advances, and signals.Call only when holding lock. 元素入队
         */
        private void enqueue(E x) {
            final Object[] items = this.items;
            items[putIndex] = x;     // 如果数组空间满了,又从头开始put
            if (++putIndex == items.length)
                putIndex = 0;
            count++;     // 线程间通信。发出一个信号通知,说明队列不为空。还能取元素
            notEmpty.signal();
        }
    
        /**
         * Extracts element at current take position, advances, and signals.Call only when holding lock. 元素出队
         */
        private E dequeue() {
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            items[takeIndex] = null;     //如果taskIndex等于了数组空间的大小,说明队列元素个数已经取完,需要重置为0
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();     // 线程间通信,发出一个信号通知,说明队列不满,还能put元素
            notFull.signal();
            return x;
        }
    
        /**
         * Deletes item at array index removeIndex.Utility for remove(Object) and iterator.remove.
         * Call only when holding lock.
         */
        void removeAt(final int removeIndex) {
            final Object[] items = this.items;
            if (removeIndex == takeIndex) {// 同dequeue()方法
                items[takeIndex] = null;
                if (++takeIndex == items.length)
                    takeIndex = 0;
                count--;
                if (itrs != null)
                    itrs.elementDequeued();
            } else {
                
                final int putIndex = this.putIndex;
                for (int i = removeIndex;;) {
                    int next = i + 1;
                    if (next == items.length)
                        next = 0;
                    if (next != putIndex) { //循环用后一个元素的值赋给前一个元素
                        items[i] = items[next];
                        i = next;
                    } else {
                        items[i] = null;
                        this.putIndex = i;
                        break;
                    }
                }
                count--;
                if (itrs != null)
                    itrs.removedAt(removeIndex);
            }
            notFull.signal();
        }
    
        /**
         * Creates an {@code ArrayBlockingQueue} with the given (fixed) capacity and default access policy.
         * @param capacity the capacity of this queue
         * @throws IllegalArgumentException if {@code capacity < 1}
         */
        public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }
    
        /**
         * Creates an {@code ArrayBlockingQueue} with the given (fixed) capacity and the specified access policy.
         *
         * @param capacity the capacity of this queue
         * @param fair if {@code true} then queue accesses for threads blocked on insertion or removal, are processed in FIFO order;
         *        if {@code false} the access order is unspecified. fair设置是否是公平锁
         * @throws IllegalArgumentException if {@code capacity < 1}
         */
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);//如果是公平锁,先到的线程会获得锁对象
            notEmpty = lock.newCondition();//为线程间通信做准备
            notFull =  lock.newCondition();
        }
    
        /**
         * Creates an {@code ArrayBlockingQueue} with the given (fixed) capacity, the specified access policy and initially containing the
         * elements of the given collection,added in traversal order of the collection's iterator.*/
        public ArrayBlockingQueue(int capacity, boolean fair,
                                  Collection<? extends E> c) {
            this(capacity, fair);
    
            final ReentrantLock lock = this.lock;        //保证可见性 不是为了互斥 防止指令重排 保证item的安全
            lock.lock(); // Lock only for visibility, not mutual exclusion
            try {
                int i = 0;
                try {
                    for (E e : c) {
                        checkNotNull(e);
                        items[i++] = e;
                    }
                } catch (ArrayIndexOutOfBoundsException ex) {
                    throw new IllegalArgumentException();
                }
                count = i;
                putIndex = (i == capacity) ? 0 : i;
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Inserts the specified element at the tail of this queue
         * 调用offer方法,增加了异常处理
         * @param e the element to add
         * @return {@code true} (as specified by {@link Collection#add})
         * @throws IllegalStateException if this queue is full
         * @throws NullPointerException if the specified element is null
         */
        public boolean add(E e) {
            return super.add(e);
        }
    
        /**
         * Inserts the specified element at the tail of this queue 如果当前队列已满,则返回false
         * @throws NullPointerException if the specified element is null
         */
        public boolean offer(E e) {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == items.length)
                    return false;
                else {
                    enqueue(e);
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Inserts the specified element at the tail of this queue, waiting for space to become available if the queue is full.
         * 增加元素,队列满则阻塞等待
         * @throws InterruptedException {@inheritDoc}
         * @throws NullPointerException {@inheritDoc}
         */
        public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();//和lock()的区别是在阻塞时也可抛异常跳出
            try {       //使用while而不是if,是因为可能多个线程阻塞在lock上,即使唤醒了可能其他线程先一步修改了队列又变成满的,因此必须重新判断
               while (count == items.length)
                    notFull.await();//notFull等待表示现在队列满了,等待被唤醒
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Inserts the specified element at the tail of this queue, waiting up to the specified wait time for space to become available if
         * the queue is full. 如果在指定的等待时间内,还是等不到队列有空的位置,则返回false
         *
         * @throws InterruptedException {@inheritDoc}
         * @throws NullPointerException {@inheritDoc}
         */
        public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
    
            checkNotNull(e);
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length) {
                    if (nanos <= 0)
                        return false;
                    nanos = notFull.awaitNanos(nanos);
                }
                enqueue(e);
                return true;
            } finally {
                lock.unlock();
            }
        }
    
        public E poll() { 
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return (count == 0) ? null : dequeue();
            } finally {
                lock.unlock();
            }
        }
    
        public E take() throws InterruptedException {//取元素,队列为空时阻塞等待
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0) {
                    if (nanos <= 0)
                        return null;
                    nanos = notEmpty.awaitNanos(nanos);
                }
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    
        public E peek() { //返回元素,但不删除
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return itemAt(takeIndex); // null when queue is empty
            } finally {
                lock.unlock();
            }
        }
    
        // this doc comment is overridden to remove the reference to collections
        // greater in size than Integer.MAX_VALUE
        /**
         * @return the number of elements in this queue
         */
        public int size() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return count;
            } finally {
                lock.unlock();
            }
        }
    
        // this doc comment is a modified copy of the inherited doc comment,without the reference to unlimited queues.
        /**
         * 剩余空间大小*/
        public int remainingCapacity() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return items.length - count;
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * 将队列转移到集合中*/
        public int drainTo(Collection<? super E> c) {
            return drainTo(c, Integer.MAX_VALUE);
        }
    
        /**
         * @throws UnsupportedOperationException {@inheritDoc}
         * @throws ClassCastException            {@inheritDoc}
         * @throws NullPointerException          {@inheritDoc}
         * @throws IllegalArgumentException      {@inheritDoc}
         */
        public int drainTo(Collection<? super E> c, int maxElements) {
            checkNotNull(c);
            if (c == this)
                throw new IllegalArgumentException();
            if (maxElements <= 0)
                return 0;
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int n = Math.min(maxElements, count);
                int take = takeIndex;
                int i = 0;
                try {
                    while (i < n) {
                        @SuppressWarnings("unchecked")
                        E x = (E) items[take];
                        c.add(x);
                        items[take] = null;
                        if (++take == items.length)
                            take = 0;
                        i++;
                    }
                    return n;
                } finally {
                    // Restore invariants even if c.add() threw
                    if (i > 0) {
                        count -= i;
                        takeIndex = take;
                        if (itrs != null) {
                            if (count == 0)
                                itrs.queueIsEmpty();
                            else if (i > take)
                                itrs.takeIndexWrapped();
                        }
                        for (; i > 0 && lock.hasWaiters(notFull); i--)
                            notFull.signal();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    }
    View Code

    总结:

      1.使用数组来实现队列,我们需要四个变量:Object[] array来存储队列中元素,headIndex和tailIndex分别记录队列头和队列尾,count记录队列的个数。

    • 因为数组的长度是固定,所以当count==array.length时,表示队列已经满了,当count==0的时候,表示队列是空的。
    • 当添加元素的时候,将array[tailIndex] = e将tailIndex位置设置成新元素,之后将tailIndex++自增,然后将count++自增。但是有两点需要注意,在添加之前必须先判断队列是否已满,不然会出现覆盖已有元素。当tailIndex的值等于数组最后一个位置的时候,需要将tailIndex=0,循环利用数组
    • 当删除元素的时候,将先记录下array[headIndex] 元素,之后将headIndex++自增,然后将count--自减。但是有两点需要注意要注意,在删除之前,必须先判断队列是否为空,不然可能会删除已删除的元素。

      2.线程间通信,基于Condition的await()和singal()方法来实现。

      3.ArrayBlockingQueue 是有界的,所以我们在初始化是容量要设计好,因为它是不可以扩容的,

  • 相关阅读:
    eclipse c++
    smb
    osx mount nfs/smb
    0927用的
    0926 隐藏地址栏
    0921笔记
    生成文件并下载
    在线图标
    react redux
    electron
  • 原文地址:https://www.cnblogs.com/ryjJava/p/14328059.html
Copyright © 2011-2022 走看看