zoukankan      html  css  js  c++  java
  • 30分钟带你了解阻塞队列所有内容,再也不怕面试官刁难你了!(上)

    30分钟带你了解阻塞队列所有内容,再也不怕面试官刁难你了!(上)

    目录

    呕心沥血,耗费一周的时间来看源码,这么优秀的博主你难道不关注一下?

    1、概述

    今天在整理线程池的相关内容时,发现许多面试官对于阻塞队列这块内容问的还是挺挺深入的。事实上也是,公司招人的时候,总不会去招那些只会用,但是不知道为什么的人吧。而且阻塞队列是线程池的核心内容,因此我们将这块内容给搞懂了,这样才能在开发中游刃有余,在面试时镇定自若。
    然而在找寻相关博客的时候,发现除了大部分博客千篇一律的copy之外,一些原创博客出现一些内容不全、不明原理、甚至内容重复的现象,这真的是对我们求知若渴的孩子的打击。因此,我从 IDEA 中的类图功能一个个找过去,给大家解读源码,所以,这里,以我为准
    先给大家看一下BlockingQueue的相关类的图。(???为什么是BlockingQueue?兄弟快去看看线程池构造函数的参数!)

    2、BlockingQueue 源码解析

    BlockingQueue是一个接口,它继承了Queue接口。然后下面是它的方法:

    // 这里我们看到 BlockingQueue 是一个泛型接口,但是我们在线程池中用的是 Runnable 接口也就是线程可执行动作。这里了解一下
    public interface BlockingQueue<E> extends Queue<E> {
    
    	/**
    	*	Q:好的,问题来了,我们看到下面三个方法都是添加元素,那么他们有什么不同嘛?
    	*	A:有的,我们看上面,BlockingQueue 实现了 Queue 接口,Queue 接口又实现了 Collection 接口。其实这三个方法分别属于不同接口中的定义。
    	*	add 方法来源于 Collection 接口,并且在 BlockingQueue 中定义当大小不足时,会抛出 IllegalStateException;
    	*	offer 方法来源于 Queue 接口,并且在 BlockingQueue 中定义当大小不足时,会返回 false 而不是抛错;
    	*	put 方法是 BlockingQueue 接口自己定义的,并且在 BlockingQueue 中定义当大小不足时会挂起,直到有足够的大小。允许被打断,打断会抛出 InterruptedException。
    	*/
    
    	// 添加元素
        boolean add(E e);
    
    	// 添加元素
        boolean offer(E e);
    
    	// 添加元素
        void put(E e) throws InterruptedException;
    
        // 添加元素,并且有一个时间。超过时间就放弃
        boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException;
    
        // 把第一个元素取走并删除,如果没有的话就挂起直到有,打断会抛出 InterruptedException
        E take() throws InterruptedException;
    
        // 也是拿元素,超时就放弃,打断会抛出 InterruptedException
        E poll(long timeout, TimeUnit unit)
            throws InterruptedException;
    
        // 查询剩余容量
        int remainingCapacity();
    
        // 删除元素。这里为什么不用 E 而用 Object 呢?我觉得有两点吧:1、限定了元素为非基础类型;2、在元素比较时需要用到 equals 方法,在注释中有用到
        boolean remove(Object o);
    
        // 查询元素,也是用 Object
        public boolean contains(Object o);
    
        // 将当前所有可用元素放到 C 中,需要子类自己实现。原注释提出,drainTo 方法比迭代调用 poll 元素要好,因为迭代过程出现异常可能会导致元素的丢失
        int drainTo(Collection<? super E> c);
    
        // 将指定大小个元素放到 C 中
        int drainTo(Collection<? super E> c, int maxElements);
    }
    

    3、ArrayBlockingQueue 源码解析

    3-1、ArrayBlockingQueue 概述

    ArrayBlockingQueue是一个有界BlockingQueue,内部存储使用数组ArrayBlockingQueue提供先进先出的机制,提供公平锁和非公平锁的获取(那么有同学可能要问了,既然保证了先进先出那不就肯定是公平的嘛?别急,这个问题我们在下面源码中进行回答)。

    3-2、ArrayBlockingQueue 源码

    我会根据使用逻辑将源码顺序进行调换,不要以出现顺序当做源码中的真实位置。

    3-2-1、成员变量

    首先是一些成员变量:

    	// 存储元素的数组。可以看到是 final 修饰的,意味着一旦 ArrayBlockingQueue 创建之后,大小不能再改变了
        final Object[] items;
    
        // 下一个获取元素的索引值。为什么要这样子呢?因为 ArrayBlockingQueue 规定了获取肯定是从存储最久的元素开始的,如果是链表的话好办,直接改节点就行了,但是数组的话我们就必须用头尾指针表示当前头在哪里,尾在哪里,否则每次拿元素都得进行数组的重构,特别浪费时间。其实这两个索引值是相当于用数组来完成链表的表示的妥协方法
        int takeIndex;
    
        // 同上
        int putIndex;
    
        // 用于记录当前元素个数
        int count;
    
        /*
         * 下面是一些并发控制组件
         */
    
        // 一个可重入锁来保证并发过程中 ArrayBlockingQueue 的正确性。final 保证了这个锁的安全性
        final ReentrantLock lock;
    
        // 一个 condition 
        private final Condition notEmpty;
    
        /** Condition for waiting puts */
        private final Condition notFull;
    
        // 一个内部类迭代器
        transient Itrs itrs = null;
    
    3-2-2、构造方法

    构造方法:

    	// 调用下面的构造方法,默认使用非公平锁
    	public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }
    
        // 懒加载。这边就可以看出上面提出的问题了。既然 ArrayBlockingQueue 保证了先进先出那不就肯定是公平的嘛,为什么要说提供了公平和非公平呢?我们虽然保证了阻塞队列中获取线程池中线程资源的时候是公平的,但是我们没有保证入阻塞队列的时候是公平的,我们还要保证获取 ArrayBlockingQueue 的可重入锁的等待队列的公平性(想要知道这个,AQS 相关内容了解一下)
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    
        // 在懒加载的同时还把 c 中的内容放到这个阻塞队列中来
        public ArrayBlockingQueue(int capacity, boolean fair,
                                  Collection<? extends E> c) {
            this(capacity, fair);
    
            final ReentrantLock lock = this.lock;
            lock.lock(); // Lock only for visibility, not mutual exclusion
            try {
                int i = 0;
                try {
                    for (E e : c) {
                        checkNotNull(e);
                        items[i++] = e;
                    }
                } catch (ArrayIndexOutOfBoundsException ex) {
                    throw new IllegalArgumentException();
                }
                count = i;
                putIndex = (i == capacity) ? 0 : i;
            } finally {
                lock.unlock();
            }
        }
    
    3-2-3、数组辅助方法

    然后是三个数组操作的辅助方法:

    	// 返回 n-1 的值。当 n==0 时,返回阻塞队列的长度-1
        final int dec(int i) {
            return ((i == 0) ? items.length : i) - 1;
        }
    
        // 返回指定下标的元素
        @SuppressWarnings("unchecked")
        final E itemAt(int i) {
            return (E) items[i];
        }
    
        // 检查元素是否为 null
        private static void checkNotNull(Object v) {
            if (v == null)
                throw new NullPointerException();
        }
    
    3-2-4、队列方法

    接下来讲队列方法->进队、出队方法的实现。由于后面的代码会用到,但是后面代码太多,因此提前讲。

    	// 入队。使用了 condition 是为了满足之前 BlockingQueue 接口中提出的”队列已满就挂起直到有空闲空间“的功能,这里因为加了一个元素,因此将 notEmpty(即等待取出)的那些线程唤醒
        private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
            // 增加元素后将尾指针的位置进行合理修改
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            // 唤醒那些准备出队的线程
            notEmpty.signal();
        }
    
        // 出队。使用 condition 的原因跟上面相同。和上面形成互补
        private E dequeue() {
            // assert lock.getHoldCount() == 1;
            // assert items[takeIndex] != null;
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            // 元素出队后将头指针的位置进行合理修改
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            // 在迭代器中将元素出队
            if (itrs != null)
                itrs.elementDequeued();
            // 唤醒那些准备入队的线程
            notFull.signal();
            return x;
        }
    
        // 删除指定下标的元素(只有持有锁的线程才能这么做)
        void removeAt(final int removeIndex) {
            final Object[] items = this.items;
            // 如果下标值和头指针相等,直接执行出队逻辑
            if (removeIndex == takeIndex) {
                // removing front item; just advance
                items[takeIndex] = null;
                if (++takeIndex == items.length)
                    takeIndex = 0;
                count--;
                if (itrs != null)
                    itrs.elementDequeued();
            // 下标值和头指针的值不同,那么需要将后面的元素往前搬
            } else {
                final int putIndex = this.putIndex;
                for (int i = removeIndex;;) {
                    int next = i + 1;
                    if (next == items.length)
                        next = 0;
                    if (next != putIndex) {
                        items[i] = items[next];
                        i = next;
                    } else {
                        items[i] = null;
                        this.putIndex = i;
                        break;
                    }
                }
                count--;
                // 在迭代器中将元素出队
                if (itrs != null)
                    itrs.removedAt(removeIndex);
            }
            // 唤醒那些准备入队的线程
            notFull.signal();
        }
    
    3-2-5、接口方法实现

    然后是 BlockingQueueQueueCollection接口中方法的实现:

    	// 添加元素。add 方法没有加锁?为什么?因为 super.add 调用的是 offer 方法。。。
    	public boolean add(E e) {
            return super.add(e);
        }
    
        // 添加元素。加锁之后判断队列是否满了,满了话返回false,否则执行入队方法
        public boolean offer(E e) {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == items.length)
                    return false;
                else {
                    enqueue(e);
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
    
        // 添加元素。不同的是,这边会根据情况挂起
        public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            // 用 lock.lockInterruptibly() 而不用 lock,表明该方法允许被其他线程中断
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
    
        // 添加元素。超时则放弃
        public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
    
            checkNotNull(e);
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length) {
                    if (nanos <= 0)
                        return false;
                    // 这个是设定等待时间,超时就会中断
                    nanos = notFull.awaitNanos(nanos);
                }
                enqueue(e);
                return true;
            } finally {
                lock.unlock();
            }
        }
    
        // 获取元素,没有元素则返回 null,有则执行出队方法
        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return (count == 0) ? null : dequeue();
            } finally {
                lock.unlock();
            }
        }
    
        // 获取元素,没有则等待。直到被入队方法唤醒
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    
        // 获取元素,没有则等待,超时则中断
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0) {
                    if (nanos <= 0)
                        return null;
                    nanos = notEmpty.awaitNanos(nanos);
                }
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    
        // 这边也是获取元素,但是元素不出队,因此不删除,所以使用 itemAt 而不是 dequeue
        public E peek() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return itemAt(takeIndex); // null when queue is empty
            } finally {
                lock.unlock();
            }
        }
    
        // 返回大小。这边也 lock 了,严谨!
        public int size() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return count;
            } finally {
                lock.unlock();
            }
        }
    
        // 返回数组剩余空间
        public int remainingCapacity() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return items.length - count;
            } finally {
                lock.unlock();
            }
        }
    
        // 删除元素
        public boolean remove(Object o) {
            if (o == null) return false;
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count > 0) {
                    final int putIndex = this.putIndex;
                    int i = takeIndex;
                    do {
                    	// 这边使用 equals 方法进行判定,所以上面使用 Object 而不是泛型
                        if (o.equals(items[i])) {
                        	// 上面获取了锁,因此可以调用 removeAt 方法
                            removeAt(i);
                            return true;
                        }
                        if (++i == items.length)
                            i = 0;
                    } while (i != putIndex);
                }
                return false;
            } finally {
                lock.unlock();
            }
        }
    
        // 检查元素是否存在
        public boolean contains(Object o) {
            if (o == null) return false;
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count > 0) {
                    final int putIndex = this.putIndex;
                    int i = takeIndex;
                    // 看的出来,其实是遍历
                    do {
                        if (o.equals(items[i]))
                            return true;
                        if (++i == items.length)
                            i = 0;
                    } while (i != putIndex);
                }
                return false;
            } finally {
                lock.unlock();
            }
        }
    
        // 这边返回 Object数组。但不是直接返回数组,而是调用 System.arraycopy 进行复制
        public Object[] toArray() {
            Object[] a;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final int count = this.count;
                a = new Object[count];
                int n = items.length - takeIndex;
                // 因为有头尾指针的关系,因此需要判断。可以看的出来,这边复制之后的依然是有序的!
                if (count <= n)
                    System.arraycopy(items, takeIndex, a, 0, count);
                else {
                    System.arraycopy(items, takeIndex, a, 0, n);
                    System.arraycopy(items, 0, a, n, count - n);
                }
            } finally {
                lock.unlock();
            }
            return a;
        }
    
        // 这边返回泛型数组,也是使用 System.arraycopy 进行复制,不同的是这边使用了反射来进行泛型数组的初始化
        @SuppressWarnings("unchecked")
        public <T> T[] toArray(T[] a) {
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final int count = this.count;
                final int len = a.length;
                if (len < count)
                	// 不像 Object 那么方便,需要使用反射来进行数组的初始化
                    a = (T[])java.lang.reflect.Array.newInstance(
                        a.getClass().getComponentType(), count);
                int n = items.length - takeIndex;
                if (count <= n)
                    System.arraycopy(items, takeIndex, a, 0, count);
                else {
                    System.arraycopy(items, takeIndex, a, 0, n);
                    System.arraycopy(items, 0, a, n, count - n);
                }
                if (len > count)
                    a[count] = null;
            } finally {
                lock.unlock();
            }
            return a;
        }
    
        // toString 方法,不多说
        public String toString() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int k = count;
                if (k == 0)
                    return "[]";
    
                final Object[] items = this.items;
                StringBuilder sb = new StringBuilder();
                sb.append('[');
                for (int i = takeIndex; ; ) {
                    Object e = items[i];
                    sb.append(e == this ? "(this Collection)" : e);
                    if (--k == 0)
                        return sb.append(']').toString();
                    sb.append(',').append(' ');
                    if (++i == items.length)
                        i = 0;
                }
            } finally {
                lock.unlock();
            }
        }
    
        // 清空队列。因为有加锁,因此可以认为是原子操作
        public void clear() {
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int k = count;
                if (k > 0) {
                    final int putIndex = this.putIndex;
                    int i = takeIndex;
                    // 循环将数组清空。因为原数组是 final 的,因此不能直接 new 一个这么简单
                    do {
                        items[i] = null;
                        if (++i == items.length)
                            i = 0;
                    } while (i != putIndex);
                    takeIndex = putIndex;
                    count = 0;
                    // 顺便将迭代器中的也清空
                    if (itrs != null)
                        itrs.queueIsEmpty();
                    // 将阻塞队列中想要添加元素的线程全部唤醒
                    for (; k > 0 && lock.hasWaiters(notFull); k--)
                        notFull.signal();
                }
            } finally {
                lock.unlock();
            }
        }
    
        // 调用下面 drainTo 方法
        public int drainTo(Collection<? super E> c) {
            return drainTo(c, Integer.MAX_VALUE);
        }
    
        // 使用锁来保证元素转移的时候不会出问题
        public int drainTo(Collection<? super E> c, int maxElements) {
            checkNotNull(c);
            if (c == this)
                throw new IllegalArgumentException();
            if (maxElements <= 0)
                return 0;
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int n = Math.min(maxElements, count);
                int take = takeIndex;
                int i = 0;
                try {
                    while (i < n) {
                        @SuppressWarnings("unchecked")
                        E x = (E) items[take];
                        c.add(x);
                        items[take] = null;
                        if (++take == items.length)
                            take = 0;
                        i++;
                    }
                    return n;
                } finally {
                    // 如果出错,那么搬过去的内容也不转回来
                    if (i > 0) {
                        count -= i;
                        takeIndex = take;
                        if (itrs != null) {
                            if (count == 0)
                                itrs.queueIsEmpty();
                            else if (i > take)
                                itrs.takeIndexWrapped();
                        }
                        for (; i > 0 && lock.hasWaiters(notFull); i--)
                            notFull.signal();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    
    3-2-6、内部类

    接下来是两个内部类,这两个内部类提供了更多复杂的方法,但是跟我们阻塞队列的主逻辑(BlockingQueue)没有关系,因此这边搁置不谈,大家有兴趣可以自己去看,以后有机会我再补上来。

    ArrayBlockingQueue 总结

    上面我们看完了 ArrayBlockingQueue 的源码,相信大家都已经看过了。在1.8中,ArrayBlockingQueue一共也只有 1444 行源码,去除注释和无关核心的内容不谈也就寥寥几百行,所以看源码 jdk 源码真的不难,只要肯坚持。看到这边大家也忘得差不多了,这边再总结一下:

    • ArrayBlockingQueue 是一个有界队列,创建之后大小不能修改;
    • ArrayBlockingQueue 使用了头尾指针来构建队列,而不是链表的方式;
    • ArrayBlockingQueue 提供公平和非公平的机制,来让大家去向线程池中的资源,这种机制取决于队列的 FIFO 和可重入锁的公平机制的配合;
    • 内部使用了 condition 来唤醒或等待。

    4、LinkedBlockingQueue 源码解析

    4-1、LinkedBlockingQueue 概述

    LinkedBlockingQueue是一个无界的阻塞队列,内部采用Node链表来实现队列,实现了FIFO的特性。不同于ArrayBlockingQueue,LinkedBlockingQueue只提供了非公平的抢锁机制,因此入队先后是不公平的(这点在代码中可以体现)。

    4-2、LinkedBlockingQueue 源码

    4-2-1、Node 类

    使用静态内部类Node来作为存储结构:

    	// 可以看到节点只有'元素'和'next'两个成员,因此形成的链表是单向的
    	static class Node<E> {
            E item;
    
            Node<E> next;
    
            Node(E x) { item = x; }
        }
    
    4-2-2、成员变量

    接下来是一些成员变量(这里为了统一称呼我把 final 类型的也成为'变量')

    	// final 修饰的容量,因此可以看出容量是固定的。那就得看初始值是啥了
        private final int capacity;
    
        // final 和原子类型修饰的元素个数。使用原子类型来保证多线程操作的安全性。注意:使用 final 并不意味着值不能修改,只是该引用指向的对象地址不能修改而已,真正的值是 AtomicInteger 里的`private volatile int value;`,可以通过`setValue()`方法修改
        private final AtomicInteger count = new AtomicInteger();
    
        // 头结点
        transient Node<E> head;
    
        // 尾结点
        private transient Node<E> last;
    
        /**
        *	注意这里,LinkedBlockingQueue 将入队锁出队锁分离,提高了队列的操作速度,因此能够提高并发量
        */
    
        // 出队锁,使用饿汉,并用 final 修饰保证锁不被替换
        private final ReentrantLock takeLock = new ReentrantLock();
    
        // 非空条件对象,用来挂起和唤醒获取元素的线程
        private final Condition notEmpty = takeLock.newCondition();
    
        // 出队锁
        private final ReentrantLock putLock = new ReentrantLock();
    
        // 没满条件对象,用来挂起和唤醒添加元素的线程
        private final Condition notFull = putLock.newCondition();
    
    4-2-3、构造方法

    LinkedBlockingQueue的构造函数跟ArrayBlockingQueue差不多吧,只有一些细节地方需要注意

    	// 调用下面的构造方法,注意默认的队列长度设置为 Integer.MAX_VALUE,因此 LinkedBlockingQueue 默认是无界队列,如果这么多的话容易导致 OOM,因此建议自己手动设置一个值,设置成有界队列。
    	public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
    
        // 这边判断了 capacity 参数范围并赋值,并且构建了哨兵节点,让头尾指针都指向了该哨兵节点
        public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);
        }
    
        // 这边赋值的时候顺便将 c 的内容入队了
        public LinkedBlockingQueue(Collection<? extends E> c) {
        	// 这边容量设置成了 Integer.MAX_VALUE
            this(Integer.MAX_VALUE);
            final ReentrantLock putLock = this.putLock;
            // 入队获取入队锁
            putLock.lock(); // Never contended, but necessary for visibility
            try {
                int n = 0;
                for (E e : c) {
                    if (e == null)
                        throw new NullPointerException();
                    if (n == capacity)
                        throw new IllegalStateException("Queue full");
                    // 执行入队操作
                    enqueue(new Node<E>(e));
                    // 已入队元素个数 ++ 
                    ++n;
                }
                // 将加进来的值赋值给了 count 里的 value
                // 这边这么写应该是为了提高效率,如果直接使用原子类型的 CAS 进行递增,还要直接跟内存打交道,显然降低了并发的效率
                // 但是有个问题,如果上面出错了,count 的值是不会赋值的,后面如果进行判断的时候会出问题的。我相信源码编写者肯定想到了这个问题,后面应该不会直接利用 count 进行判断吧。1.8里有这个bug,pick一下!!!
                count.set(n);
            } finally {
                putLock.unlock();
            }
        }
    
    4-2-4、condition 方法

    实现了两个方法,封装了 condition 的signal方法

    	// 唤醒等待获取元素的线程。只能被 offer、put 方法调用。需要获取出队锁
    	private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
        }
    
        // 唤醒等待添加元素的线程。只能被 take、poll 方法调用。需要获取入队锁
        private void signalNotFull() {
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                notFull.signal();
            } finally {
                putLock.unlock();
            }
        }
    
    4-2-5、队列(链表)方法

    实现了队列的入队、出队方法。需要用到头指针和尾指针

    	// 入队。从放到末尾
        private void enqueue(Node<E> node) {
            // assert putLock.isHeldByCurrentThread();
            // assert last.next == null;
            last = last.next = node;
        }
    
        // 出队。出队的概念比较麻烦,下面代码可以通过辅助画一张图来理解
        private E dequeue() {
            // head 节点是哨兵节点,因此出队永远从 head.next 出队
            Node<E> h = head;
           	// 将要出队的元素赋值给 first
            Node<E> first = h.next;
            // 将原来的哨兵节点的 next 指向自己,让 GC回收(这里为什么不直接将其指向 next.next,然后回收出队的节点呢?)
            h.next = h; // help GC
            // 后面三行是将要出队的元素赋值给 x,并且将这个节点设置成哨兵节点
            head = first;
            E x = first.item;
            first.item = null;
            // 返回 x
            return x;
        }
    
    4-2-6、锁方法

    这边对锁方法进行了封装,提供了全锁和全解锁两个方法

    	// 同时获取出队锁和入队锁,防止读写操作的进行(我觉得可能是用于 drainTo 方法里吧)
        void fullyLock() {
            putLock.lock();
            takeLock.lock();
        }
    
        // 同时释放出对锁、入队锁
        void fullyUnlock() {
            takeLock.unlock();
            putLock.unlock();
        }
    
    4-2-7、接口方法实现

    然后是 BlockingQueueQueueCollection接口中方法的实现:

    	// 返回当前元素个数(由于之前构造函数那边的bug,这边是不是其实还要进行处理。。。至少确认一遍吧)
        public int size() {
            return count.get();
        }
    
        // 返回剩余空闲容量
        public int remainingCapacity() {
            return capacity - count.get();
        }
    
        // 入队操作,如果满了,那就挂起
        public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();
            try {
               	// 队列满了就挂起
                while (count.get() == capacity) {
                    notFull.await();
                }
                // 执行入队操作
                enqueue(node);
                // 使用原子操作保证线程安全,因为这边只获取了入队锁,没有获出队锁。此时 c 是 count ++ 之前的值(为什么?看一下原子操作)。
                c = count.getAndIncrement();
                // 如果队列没满就唤醒那些等着入队的操作
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            // 这边表明了已经有且一个元素入队了,唤醒等待获取的线程。
            // 但是,这边是不是也有问题?这边没有获得出队锁,万一其他线程已经读取了,实际上队列是空的???其实么有!哈哈,我们看其他出队被挂起的地方都是在 take() 方法中,而被唤醒之后,那个线程是在一个 while 中的!!!此时他又要去判断此时元素个数,是原子操作因此显然没有并发问题,所以他又会 wait!!!呵,骗我到这里来,100块也不给我???
            if (c == 0)
                signalNotEmpty();
        }
    
        // 入队。如果满了就等待一段时间,时间超过就放弃操作
        public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
    
            if (e == null) throw new NullPointerException();
            long nanos = unit.toNanos(timeout);
            int c = -1;
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();
            try {
                while (count.get() == capacity) {
                    if (nanos <= 0)
                        return false;
                    nanos = notFull.awaitNanos(nanos);
                }
                enqueue(new Node<E>(e));
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
            return true;
        }
    
        // 入队,满了就返回 false,不挂起,就是刚。
        public boolean offer(E e) {
            if (e == null) throw new NullPointerException();
            final AtomicInteger count = this.count;
            if (count.get() == capacity)
                return false;
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            // 没满,获的入队锁
            putLock.lock();
            try {
                if (count.get() < capacity) {
                    enqueue(node);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                }
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
            return c >= 0;
        }
    
        // 出队,队列为空就挂起
        public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                while (count.get() == 0) {
                    notEmpty.await();
                }
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }
    
        // 出队,队列为空就等一段时间,超时就放弃
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            E x = null;
            int c = -1;
            long nanos = unit.toNanos(timeout);
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                while (count.get() == 0) {
                    if (nanos <= 0)
                        return null;
                    nanos = notEmpty.awaitNanos(nanos);
                }
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }
    
        // 出队,队列为空就返回false
        public E poll() {
            final AtomicInteger count = this.count;
            if (count.get() == 0)
                return null;
            E x = null;
            int c = -1;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                if (count.get() > 0) {
                    x = dequeue();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                }
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }
    
        // 这边只是返回第一个节点的元素,不进行队列的修改
        public E peek() {
            if (count.get() == 0)
                return null;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                Node<E> first = head.next;
                if (first == null)
                    return null;
                else
                    return first.item;
            } finally {
                takeLock.unlock();
            }
        }
    
        // 把节点 p 的链接清除,其实就是删掉 p 节点
        void unlink(Node<E> p, Node<E> trail) {
            // assert isFullyLocked();
            // p.next is not changed, to allow iterators that are
            // traversing p to maintain their weak-consistency guarantee.
            p.item = null;
            trail.next = p.next;
            if (last == p)
                last = trail;
            if (count.getAndDecrement() == capacity)
                notFull.signal();
        }
    
        // 删除元素 o。
        public boolean remove(Object o) {
            if (o == null) return false;
            fullyLock();
            try {
            	// 利用 for 循环找到元素 o 的节点
                for (Node<E> trail = head, p = trail.next;
                     p != null;
                     trail = p, p = p.next) {
                    if (o.equals(p.item)) {
                    	// 找到节点后调用 unlink 方法删除节点 p 和他的链接
                        unlink(p, trail);
                        return true;
                    }
                }
                return false;
            } finally {
                fullyUnlock();
            }
        }
    
        // 检查是否存在元素 o。
        public boolean contains(Object o) {
            if (o == null) return false;
            fullyLock();
            try {
            	// 遍历 + 比较
                for (Node<E> p = head.next; p != null; p = p.next)
                    if (o.equals(p.item))
                        return true;
                return false;
            } finally {
                fullyUnlock();
            }
        }
    
        // 将元素迁移到 c 中。调用了下面的方法
        public int drainTo(Collection<? super E> c) {
            return drainTo(c, Integer.MAX_VALUE);
        }
    
        // 将元素迁移到 c 中,最多迁移 maxElements 个
        public int drainTo(Collection<? super E> c, int ) {
            if (c == null)
                throw new NullPointerException();
            if (c == this)
                throw new IllegalArgumentException();
            if (maxElements <= 0)
                return 0;
            boolean signalNotFull = false;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                int n = Math.min(maxElements, count.get());
                // count.get provides visibility to first n Nodes
                Node<E> h = head;
                int i = 0;
                try {
                    while (i < n) {
                        Node<E> p = h.next;
                        c.add(p.item);
                        // 迁移之后需要删掉队列中的节点
                        p.item = null;
                        h.next = h;
                        h = p;
                        ++i;
                    }
                    return n;
                } finally {
                    // Restore invariants even if c.add() threw
                    if (i > 0) {
                        // assert h.item == null;
                        head = h;
                        signalNotFull = (count.getAndAdd(-i) == capacity);
                    }
                }
            } finally {
                takeLock.unlock();
                if (signalNotFull)
                    signalNotFull();
            }
        }
    
    4-2-8、其他内部类和方法

    后面是 toStringtoArray方法、序列化反序列化方法以及迭代器内部类和拆分器内部类。由于这些内容和阻塞队列核心不怎么相关,就搁置不讲了,以后有机会再补。

    4-3、LinkedBlockingQueue 总结

    看完了源码,源码有点长,过了一遍也容易忘,这边再来总结一下:

    • LinkedBlockingQueue是一个默认无界(推荐调用有界的构造方法进行构造,避免出现 OOM 的情况)的阻塞队列,一旦创建大小不能修改;
    • LinkedBlockingQueue内部使用Node存储元素实现队列,单链表,有头尾指针(头指针专注出队、尾指针专注出队);
    • LinkedBlockingQueue将入队锁和出队锁分离,极大地提高了并发的效率。但是由于这两个锁都是 final 修饰的并且使用饿汉加载,因此默认是使用ReentrantLock的非公平策略,因此虽然保证了阻塞队列的先进先出,但是入队的过程是非公平的;
    • 内部使用 Condition 机制是进程进行休眠和唤醒;
    • 构造函数中有个小bug(面试的时候可以提出来,说不定可以加分。而且在修改节点以后判断长度唤醒线程那里也可以进行优化,避免唤醒线程又让其进行睡眠浪费资源)

    5、PriorityBlockingQueue 源码解析

    5-1、PriorityBlockingQueue 概述

    PriorityBlockingQueue 是一个基于数组+堆的默认无界阻塞队列,并且内部通过的结构实现了优先队列的特性。具体如何实现,还看源码。

    5-2、PriorityBlockingQueue 源码

    5-2-1、成员属性

    首先是PriorityBlockingQueue的成员属性:

    	// 默认初始化容量,指的是入队元素的个数
    	private static final int DEFAULT_INITIAL_CAPACITY = 11;
    
        //最大数组长度指的是最大的元素容量,为了避免虚拟机可能会保留一些空间,所以用了 Integer.MAX_VALUE - 8
        private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    
        // 存储数组,虽然说是堆但是实际上是用数组存储的,这跟堆的性质有关
        private transient Object[] queue;
    
        // 元素个数
        private transient int size;
    
        // 比较器
        private transient Comparator<? super E> comparator;
    
        // 可重入锁用于线程安全,final 修饰因此不可改变。在构造方法中没有给我们选公平锁或者非公平锁,因此默认是非公平锁。因此可得入队是不公平的
        private final ReentrantLock lock;
    
        // condition 用于睡眠或唤醒线程。这里只有一个非空的条件,因此可以猜测出该队列是可以无限扩容的(内存允许)
        private final Condition notEmpty;
    
        // 自旋锁的状态,使用CAS去获取值。用于扩容
        private transient volatile int allocationSpinLock;
    
        // 一个优先队列,只用于序列化和反序列化
        private PriorityQueue<E> q;
    
    5-2-2、构造方法

    然后是4个构造方法:

    	// 构造默认初始大小的阻塞队列
    	public PriorityBlockingQueue() {
            this(DEFAULT_INITIAL_CAPACITY, null);
        }
    
        // 构造指定初始大小的阻塞队列
        public PriorityBlockingQueue(int initialCapacity) {
            this(initialCapacity, null);
        }
    
        // 构造指定初始大小的阻塞队列,并传入比较器,初始化重入锁、condition等
        public PriorityBlockingQueue(int initialCapacity,
                                     Comparator<? super E> comparator) {
            if (initialCapacity < 1)
                throw new IllegalArgumentException();
            this.lock = new ReentrantLock();
            this.notEmpty = lock.newCondition();
            this.comparator = comparator;
            this.queue = new Object[initialCapacity];
        }
    
        // 创建一个阻塞队列,并将所给的集合中的元素放到队列中,并把大小设为所给集合的大小。
        // 在此过程中没有锁的获得,因此可能会因为并发出错
        public PriorityBlockingQueue(Collection<? extends E> c) {
            this.lock = new ReentrantLock();
            this.notEmpty = lock.newCondition();
           	// 是否要将整个堆重新排
            boolean heapify = true; // true if not known to be in heap order
            // 是否要检查元素是不是为 null
            boolean screen = true;  // true if must screen for nulls
            // 如果本身就是一个有序的set
            if (c instanceof SortedSet<?>) {
                SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
                // 把 c 的比较器传给该阻塞队列
                this.comparator = (Comparator<? super E>) ss.comparator();
                // 由于 set 有序,因此不需要整个堆再排序
                heapify = false;
            }
            // 如果本身就是一个优先阻塞队列
            else if (c instanceof PriorityBlockingQueue<?>) {
                PriorityBlockingQueue<? extends E> pq =
                    (PriorityBlockingQueue<? extends E>) c;
                // 把 c 的比较器传给该阻塞队列
                this.comparator = (Comparator<? super E>) pq.comparator();
                // 不需要检查元素是否为 null
                screen = false;
                // 如果 class 完全相等,那就不需要整个堆再排。(为什么子类还需要再排呢?为了防止子类中重写的排序方法与正常的排序矛盾?)
                if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                    heapify = false;
            }
            Object[] a = c.toArray();
            // 将 c 中元素个数值赋给 n,后面再赋给 size
            int n = a.length;
            // 如果 c.toArray() 返回的格式有问题,那就使用 Arrays.copy 进行拷贝
            if (a.getClass() != Object[].class)
                a = Arrays.copyOf(a, n, Object[].class);
            // 验证元素是否为 null,'与'后面的判断条件的条件很有意思,可以思考一下为什么
            if (screen && (n == 1 || this.comparator != null)) {
                for (int i = 0; i < n; ++i)
                    if (a[i] == null)
                        throw new NullPointerException();
            }
            // 赋值
            this.queue = a;
            this.size = n;
            // 堆重排
            if (heapify)
                heapify();
        }
    
    5-2-3、扩容机制

    下面是PriorityBlockingQueue的扩容机制:

        private void tryGrow(Object[] array, int oldCap) {
        	// 先释放锁,在分配好空间后再获得锁进行复制
            lock.unlock(); // must release and then re-acquire main lock
            Object[] newArray = null;
            // 此时扩容自旋锁为 0,表明当前没有其他线程在进行扩容,那么继续执行扩容机制
            if (allocationSpinLock == 0 &&
            // 这个 allocationSpinLockOffset 是类静态变量,后面才加入的。由于使用的是 sun 公司的 unsafe,因此源码没法看,大概就是查看这个字段是不是期望的 0 然后改成1(返回true),否则返回 false
                UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,  
                                         0, 1)) {
                try {
                	// 如果原容量 < 64,那么新容量变成 2 * old + 2,否则变成 1.5 * old。可以看得出来,当原容量越小,增长越快
                    int newCap = oldCap + ((oldCap < 64) ?
                                           (oldCap + 2) : // grow faster if small
                                           (oldCap >> 1));
                    // 如果新的容量大于最大容量
                    if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    	// 如果扩容时增加一个都导致容量大于最大容量,那就抛错,没法扩了
                        int minCap = oldCap + 1;
                        if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                            throw new OutOfMemoryError();
                        // 能扩,那就直接扩到最大
                        newCap = MAX_ARRAY_SIZE;
                    }
                    // 如果扩容了,并且数组没改变(这里是什么意思呢?首先,这个扩容机制是在 offer 中用的,传的 array 就是原 queue 的地址。这里没改变说明没有其他线程进行扩容,只有扩容了才会把地址进行修改),那就创建新容量大小的数组
                    if (newCap > oldCap && queue == array)
                        newArray = new Object[newCap];
                } finally {
                	// 将扩容锁变成释放状态(虽然这边是 volatile,但是还是会有并发安全的问题啊。。。作者咋想的)
                    allocationSpinLock = 0;
                }
            }
            // 说明上面的循环没进,说明有线程在扩容,那么放弃 CPU
            if (newArray == null) // back off if another thread is allocating
                Thread.yield();
            // 获得锁并复制旧元素
            lock.lock();
            if (newArray != null && queue == array) {
                queue = newArray;
                System.arraycopy(array, 0, newArray, 0, oldCap);
            }
        }
    
    5-2-4、队列方法

    这边虽然说是队列操作,但其实由于内部变成了的关系,已经不是传统意义上的入队出队的,所以虽然有dequeue方法,但是enqueue变成了堆的'上浮'和'下沉'。

    	/**
    	*    下面的方法都是基于已经获取锁的情况下才能调用的。上浮操作用于增加元素后堆重排,下沉操作用于元素出队以后的堆重排
    	*/
    	// 出队操作
    	private E dequeue() {
    		// 获得最后一个元素的下标
            int n = size - 1;
            // 如果没有元素,就返回 null
            if (n < 0)
                return null;
            else {
                Object[] array = queue;
                // 获取堆顶元素
                E result = (E) array[0];
                // 获取堆底的元素,用于堆的重排
                E x = (E) array[n];
                array[n] = null;
                Comparator<? super E> cmp = comparator;
                if (cmp == null)
                	// 从顶部开始向下比较。如果比较器为空,那么使用默认的方法(obj.compareTo)进行比较
                    siftDownComparable(0, x, array, n);
                else
                	// 从顶部开始向下比较。如果比较器不为空,那么使用比较器的比较方法(cmp.compareTo)进行比较
                    siftDownUsingComparator(0, x, array, n, cmp);
                size = n;
                return result;
            }
        }
    
        // 使用默认的比较器,从堆底部开始向上进行比较。可以看的出来,当子节点 > 父节点时会停止(这里的 > 看比较器是怎么比的),说明堆默认是小根堆???
        private static <T> void siftUpComparable(int k, T x, Object[] array) {
            Comparable<? super T> key = (Comparable<? super T>) x;
            // 这段代码完全可以直接调用下面的方法啊。。。
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                Object e = array[parent];
                // 当元素比父元素大,停止'上浮'
                if (key.compareTo((T) e) >= 0)
                    break;
                array[k] = e;
                k = parent;
            }
            array[k] = key;
        }
    
    	// 使用给定的比较器,从堆底部开始向上进行比较。
        private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                           Comparator<? super T> cmp) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                Object e = array[parent];
                // 当元素比父元素大,停止'上浮'
                if (cmp.compare(x, (T) e) >= 0)
                    break;
                array[k] = e;
                k = parent;
            }
            array[k] = x;
        }
    
        // 使用默认的比较器,从下表为 k 的节点作为父节点开始向下比较
        private static <T> void siftDownComparable(int k, T x, Object[] array,
                                                   int n) {
            if (n > 0) {
                Comparable<? super T> key = (Comparable<? super T>)x;
                int half = n >>> 1;           // loop while a non-leaf
                while (k < half) {
                    int child = (k << 1) + 1; // assume left child is least
                    Object c = array[child];
                    int right = child + 1;
                    // 找到下标为 k 的节点的左右子节点,并将其中的最小值与堆底的节点进行比较
                    if (right < n &&
                        ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                        c = array[child = right];
                    // 如果 x 的值小于 k 的最小子节点,那么就跳出循环,把 x 赋值给下标 k 的位置(一般在整个堆有序的情况下,只有在进行叶子节点的父节点的位置才会发生。因为堆是有序的,默认 x 是大于上面的节点的)。否则将子节点中的更小值'上浮',并从子节点开始继续向下比较。(常威,还说你不是小根堆???!!!)
                    if (key.compareTo((T) c) <= 0)
                        break;
                    array[k] = c;
                    k = child;
                }
                array[k] = key;
            }
        }
    
        // 使用默认的比较器,从下表为 k 的节点作为父节点开始向下比较。逻辑跟上面的差不多,只是将比较方法使用比较器的 compare 方法替换
        private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
                                                        int n,
                                                        Comparator<? super T> cmp) {
            if (n > 0) {
                int half = n >>> 1;
                while (k < half) {
                    int child = (k << 1) + 1;
                    Object c = array[child];
                    int right = child + 1;
                    if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                        c = array[child = right];
                    if (cmp.compare(x, (T) c) <= 0)
                        break;
                    array[k] = c;
                    k = child;
                }
                array[k] = x;
            }
        }
    
        // 整个堆都进行'下沉'操作,使整个堆变得有序
        private void heapify() {
            Object[] array = queue;
            int n = size;
            int half = (n >>> 1) - 1;
            Comparator<? super E> cmp = comparator;
            if (cmp == null) {
                for (int i = half; i >= 0; i--)
                    siftDownComparable(i, (E) array[i], array, n);
            }
            else {
                for (int i = half; i >= 0; i--)
                    siftDownUsingComparator(i, (E) array[i], array, n, cmp);
            }
        }
    
    5-2-5、接口方法实现

    下面是接口方法的实现:

    	// 增加元素,调用 offer 方法
    	public boolean add(E e) {
            return offer(e);
        }
    
        // 增加元素的底层方法,其他的方法都是调用它
        public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            final ReentrantLock lock = this.lock;
            // 获取锁
            lock.lock();
            int n, cap;
            Object[] array;
            // 如果元素个数大小 >= 数组大小(???这个不应该是严格相等的嘛???惊了!!!),执行扩容操作
            while ((n = size) >= (cap = (array = queue).length))
                tryGrow(array, cap);
            try {
                Comparator<? super E> cmp = comparator;
                if (cmp == null)
                	// 使用默认的比较器进行'上浮'
                    siftUpComparable(n, e, array);
                else
                	// 使用队列中已给的比较器进行'上浮'
                    siftUpUsingComparator(n, e, array, cmp);
                // 元素个数 +1
                size = n + 1;
                // 唤醒那些因为队列空了而挂起的线程
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
            return true;
        }
    
        // 增加元素,调用 offer 方法
        public void put(E e) {
            offer(e); // never need to block
        }
    
        // 增加元素,调用 offer 方法。可以看到这个给的时间限制根本没用!!!因为这个阻塞队列是真正无界的!永远不会满!因此增加元素时不会因为满了而挂起,只要锁被释放了就会去增加
        public boolean offer(E e, long timeout, TimeUnit unit) {
            return offer(e); // never need to block
        }
    
        // 出队元素。执行出队策略。如果队列空了显然就返回 null 了也不会报错。。。
        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    
        // 出队元素。执行出队策略。如果队列空了就挂起,除非被打断
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            E result;
            try {
                while ( (result = dequeue()) == null)
                    notEmpty.await();
            } finally {
                lock.unlock();
            }
            return result;
        }
    
    	// 出队元素。执行出队策略。如果队列空了就挂起,直到超时或被打断
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            E result;
            try {
                while ( (result = dequeue()) == null && nanos > 0)
                    nanos = notEmpty.awaitNanos(nanos);
            } finally {
                lock.unlock();
            }
            return result;
        }
    
        // 直接获取堆顶元素而不修改数组
        public E peek() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return (size == 0) ? null : (E) queue[0];
            } finally {
                lock.unlock();
            }
        }
    
        // 返回比较器,如果没有那就是初始值 null
        public Comparator<? super E> comparator() {
            return comparator;
        }
    
        // 返回队列元素个数
        public int size() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return size;
            } finally {
                lock.unlock();
            }
        }
    
        // 返回剩余容量。。。要知道虽然返回了 Integer.MAX_VALUE 但其实不止,只要内存够,无限的好哇!
        public int remainingCapacity() {
            return Integer.MAX_VALUE;
        }
    
        // 获取元素 o 的下标,使用得是 equals 方法而不是 ==,因此是指同一个元素对象。(没有锁,会出问题的吧!)
        private int indexOf(Object o) {
            if (o != null) {
                Object[] array = queue;
                int n = size;
                for (int i = 0; i < n; i++)
                    if (o.equals(array[i]))
                        return i;
            }
            return -1;
        }
    
        // 删除指定下标的元素
        private void removeAt(int i) {
            Object[] array = queue;
            int n = size - 1;
            // 如果是最后一个元素,那就直接删了即可
            if (n == i) // removed last element
                array[i] = null;
            // 否则要进行堆重排操作
            else {
            	// 拿出最后一个元素来进行重排
                E moved = (E) array[n];
                array[n] = null;
                Comparator<? super E> cmp = comparator;
                // 从位置 i 下沉进行重排(为什么???此时不应该是有序的嘛?)
                if (cmp == null)
                    siftDownComparable(i, moved, array, n);
                else
                    siftDownUsingComparator(i, moved, array, n, cmp);
                // 重排后如果原来删除位置的元素 == 堆底元素,从 i 上浮(???更看不懂了?为什么?)
                if (array[i] == moved) {
                    if (cmp == null)
                        siftUpComparable(i, moved, array);
                    else
                        siftUpUsingComparator(i, moved, array, cmp);
                }
            }
            size = n;
        }
    
        // 删除元素 o,注意这边是 indexOf() 方法,用的 equals 方法,跟下面有点不同
        public boolean remove(Object o) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = indexOf(o);
                if (i == -1)
                    return false;
                removeAt(i);
                return true;
            } finally {
                lock.unlock();
            }
        }
    
        // 删除元素 o,比较的是地址
        void removeEQ(Object o) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                Object[] array = queue;
                for (int i = 0, n = size; i < n; i++) {
                    if (o == array[i]) {
                        removeAt(i);
                        break;
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    
        // 查找是否有元素 o
        public boolean contains(Object o) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return indexOf(o) != -1;
            } finally {
                lock.unlock();
            }
        }
    
        // 复制了一个数组
        public Object[] toArray() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return Arrays.copyOf(queue, size);
            } finally {
                lock.unlock();
            }
        }
    
        // toString 方法
        public String toString() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int n = size;
                if (n == 0)
                    return "[]";
                StringBuilder sb = new StringBuilder();
                sb.append('[');
                for (int i = 0; i < n; ++i) {
                    Object e = queue[i];
                    sb.append(e == this ? "(this Collection)" : e);
                    if (i != n - 1)
                        sb.append(',').append(' ');
                }
                return sb.append(']').toString();
            } finally {
                lock.unlock();
            }
        }
    
        // 将元素搬到 c 中,调用了下面的方法
        public int drainTo(Collection<? super E> c) {
            return drainTo(c, Integer.MAX_VALUE);
        }
    
        // 将元素搬到 c 中,执行将重复出队策略。。。(我靠,那每次出队还得下沉。。。为啥不直接把数组给复制过去算了???反正有序啊。。。看不懂)
        public int drainTo(Collection<? super E> c, int maxElements) {
            if (c == null)
                throw new NullPointerException();
            if (c == this)
                throw new IllegalArgumentException();
            if (maxElements <= 0)
                return 0;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int n = Math.min(size, maxElements);
                for (int i = 0; i < n; i++) {
                    c.add((E) queue[0]); // In this order, in case add() throws.
                    dequeue();
                }
                return n;
            } finally {
                lock.unlock();
            }
        }
    
        // 清楚所有元素,可以看到数组长度没有重置,还是原来的大小
        public void clear() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                Object[] array = queue;
                int n = size;
                size = 0;
                for (int i = 0; i < n; i++)
                    array[i] = null;
            } finally {
                lock.unlock();
            }
        }
    
        // 把元素放到 a 中。如果 a 不够大,那么直接返回阻塞队列存储数组的 copy
        public <T> T[] toArray(T[] a) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int n = size;
                if (a.length < n)
                    // Make a new array of a's runtime type, but my contents:
                    return (T[]) Arrays.copyOf(queue, size, a.getClass());
                System.arraycopy(queue, 0, a, 0, n);
                if (a.length > n)
                    a[n] = null;
                return a;
            } finally {
                lock.unlock();
            }
        }
    
    5-2-6、其他内部类和方法

    后面是序列化反序列化方法以及迭代器内部类和拆分器内部类。由于这些内容和阻塞队列核心不怎么相关,就搁置不讲了,以后有机会再补。

    5-2-7、unsafe 机制扩充
    	// 补充了静态代码块来帮助扩容的字段
    	private static final sun.misc.Unsafe UNSAFE;
        private static final long allocationSpinLockOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = PriorityBlockingQueue.class;
                // 通过反射将 allocationSpinLock 字段赋值给 allocationSpinLockOffset(???)
                allocationSpinLockOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("allocationSpinLock"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    

    5-1、PriorityBlockingQueue 概述

    看到这个阻塞队列,说实话三观有点崩坏。有时候程序员出问题,并不一定是程序员水平不到位,也有可能是源码出问题了喂!也有可能是网上的博客说的根本就是照搬人家的东西啊!也有可能官方说明就不对啊!!!下面来对这个阻塞队列进行总结:

    • PriorityBlockingQueue是一个无界(真正无界!)的阻塞队列,自带扩容机制,因此容易 OOM ?;
    • PriorityBlockingQueue内部使用数组存储元素实现小根堆(当然了,我们自己使用比较器,强行把比较结果调换也能实现大根堆),因此优先值越小的代表优先级越高,先执行。。。跟我们普通人理解的不一样啊喂orz;
    • PriorityBlockingQueue是使用ReentrantLock的非公平策略,但是由于堆的构造,所以入队公不公平他不care;
    • 内部使用 Condition 机制是进程进行休眠和唤醒;
    • 整个内部对于高并发的处理不够严谨,包括构造函数中没有用锁,可能会出现线程A构造玩数组还在将集合 c 中的元素搬过来的时候,线程B已经开始使用了,这样元素的个数也不对了;

    声明

    由于篇幅太长,准备将这篇分为上中下三部分,敬请期待。。。

  • 相关阅读:
    Oracle EBS 自治事务
    [转]Oracle GoldenGate安装配置
    [Oracle EBS R12]SQL Queries and Multi-Org Architecture in Release 12 (Doc ID 462383.1)
    LUN 和 LVM 知识
    [RAC] oracle rac 后台进程
    Manually connecting to the Oracle Linux Yum Server
    [转]ORACLE 绑定变量用法总结
    [转]分析函数计算移动平均的测试
    并发与多版本:update重启动问题
    WCF(二)三种通信模式
  • 原文地址:https://www.cnblogs.com/lewisyoung/p/12981102.html
Copyright © 2011-2022 走看看