zoukankan      html  css  js  c++  java
  • java源码阅读LinkedBlockingQueue

    1类签名与简介

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable

    LinkedBlockingQueue是Java并发包的成员,该类基于链表实现了阻塞队列。

    基于链表的队列通常比基于数组的队列有更高的吞吐量,但是在大多数并发程序中可预测性能较低。(本质是理解链表和数组各自的性能优势)

    LinkedBlockingQueue的容量在未指定的情况下是Integer.MAX_VALUE,也可以自己指定容量,且在指定后不可更改,这样做是防止队列过多的容量扩展。

    其构造方法有三个

    //创建一个 LinkedBlockingQueue ,容量为 Integer.MAX_VALUE 。  
    LinkedBlockingQueue() 
    
    //创建一个 LinkedBlockingQueue ,容量为 Integer.MAX_VALUE ,
    //最初包含给定集合的元素,以集合的迭代器的遍历顺序添加。 
    LinkedBlockingQueue(Collection<? extends E> c) 
    
    //创建一个具有给定(固定)容量的 LinkedBlockingQueue 。  
    LinkedBlockingQueue(int capacity) 

    2数据结构

    static class Node<E> {
            E item;
    
            /**
             * One of:
             * - the real successor Node
             * - this Node, meaning the successor is head.next
             * - null, meaning there is no successor (this is the last node)
             */
            Node<E> next;
    
            Node(E x) { item = x; }
        }
    
        /** The capacity bound, or Integer.MAX_VALUE if none */
        private final int capacity;
    
        /** Current number of elements */
        private final AtomicInteger count = new AtomicInteger();
    
        /**
         * Head of linked list.
         * Invariant: head.item == null
         */
        transient Node<E> head;
    
        /**
         * Tail of linked list.
         * Invariant: last.next == null
         */
        private transient Node<E> last;
    
        /** Lock held by take, poll, etc */
        private final ReentrantLock takeLock = new ReentrantLock();
    
        /** Wait queue for waiting takes */
        private final Condition notEmpty = takeLock.newCondition();
    
        /** Lock held by put, offer, etc */
        private final ReentrantLock putLock = new ReentrantLock();
    
        /** Wait queue for waiting puts */
        private final Condition notFull = putLock.newCondition();

    这里把链表的每个节点抽象成了Node内部类,其他的都很好理解。

    注意:head节点的item是null,head节点的next才是队列的队头,而last节点就是指向队尾。

    但是如果从上一篇ArrayBlockingQueue过来的读者(包括我自己)也许会有几个疑问:

    (1)LinkedBlockingQueue的元素计数器count会什么要声明成并发支持的AtomicInteger?而ArrayBlockingQueue的count是int的。

    (2)LinkedBlockingQueue为什么需要两个锁(takeLock与putLock)?而ArrayBlockingQueue只要1个就ok了。

    下面我们希望能从源码中找到这些答案。

    3入队/出队

    (1)入队 enqueue

    private void enqueue(Node<E> node) {
            last = last.next = node;
        }

    将队尾指向新加入的节点。上面可以拆分为last.next = nodelast = last.next

    (2)出队 dequeue

     1 private E dequeue() {
     2         // assert takeLock.isHeldByCurrentThread();
     3         // assert head.item == null;
     4         Node<E> h = head;
     5         Node<E> first = h.next;
     6         h.next = h; // help GC
     7         head = first;
     8         E x = first.item;
     9         first.item = null;
    10         return x;
    11     }

    这里先是删除head指向的节点(item为null),然后将队头的节点变成head节点(将item置null)。dequeue操作的前提是队列不为null。

    4常用方法

    (1)添加元素(put、offer)

    put方法

     1 public void put(E e) throws InterruptedException {
     2         if (e == null) throw new NullPointerException();
     3         
     4         int c = -1;
     5         Node<E> node = new Node<E>(e);
     6         final ReentrantLock putLock = this.putLock;
     7         final AtomicInteger count = this.count;
     8         putLock.lockInterruptibly();
     9         try {
    10             
    11             while (count.get() == capacity) {
    12                 notFull.await();
    13             }
    14             enqueue(node);
    15             c = count.getAndIncrement();
    16             if (c + 1 < capacity)
    17                 notFull.signal();
    18         } finally {
    19             putLock.unlock();
    20         }
    21         if (c == 0)
    22             signalNotEmpty();
    23     }

    put是一个阻塞的入队实现,在队尾插入新元素。这里加了一个putLock锁,所以对于多线程而言put之间是互斥的,也就是说同时只有一个线程执行line8-20。

    在添加前判断队列是否满了,若是进入阻塞状态。没满则入队,注意line15 count是先赋值给c然后再增1的,所以line16才会判断c+1,若队列没有满则唤醒1个被notFull.await()阻塞的线程。line21若c==0则表示之前入队的是该队列的仅有的一个元素,在入队一个元素的时候,会调用signalNotEmpty,唤醒一个阻塞的出队线程。(因为刚刚加入元素了,队列不为null了,这是一个临界状态队列由null到非空)

    put方法是没有返回值的,offer提供了一个有返回值的实现,插入成功返回true,失败返回false。

    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
    
            if (e == null) throw new NullPointerException();
            long nanos = unit.toNanos(timeout);
            int c = -1;
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();
            try {
                while (count.get() == capacity) {
                    if (nanos <= 0)
                        return false;
                    nanos = notFull.awaitNanos(nanos);
                }
                enqueue(new Node<E>(e));
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
            return true;
        }

    除了有返回值,阻塞时间限制,这里的offer与前面的put没什么两样。下面还有一个offer的重载版本

    public boolean offer(E e) {
            if (e == null) throw new NullPointerException();
            final AtomicInteger count = this.count;
            if (count.get() == capacity)
                return false;
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                if (count.get() < capacity) {
                    enqueue(node);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                }
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
            return c >= 0;
        }

    这里的offer不会阻塞,当队列为满时,会直接返回false。

    (2)删除元素(take、poll)

    take方法

    public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                while (count.get() == 0) {
                    notEmpty.await();
                }
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }

    take方法提供了一个阻塞删除(出队)操作。当队列为空时,会阻塞。若不为空则执行出队操作。每次出队后都会判断此时的队列是否为空,若不为空就会唤醒另一个阻塞的出队线程,另一个又会唤醒下一个直到队列为空。同理,c == capacity是一个临界状态,队列刚从满变成不满,然后去唤醒1个阻塞的入队线程。

    看到这里可能就明白了为什么只在临界状态的时候唤醒1个就行了。例如,从满队列出队1个元素,队列就不满了,这时唤醒1个阻塞的入队线程,他执行完入队操作后若不满则会继续唤醒其他阻塞的入队线程,直到下一个临界值。

    注意这里的锁是takeLock,入队的锁(putLock)和出队的锁(takeLock)不一样。也就是说入队操做之间是互斥的,出队操作之间也是互斥的,但是入队和出队之间是可以同步的。一个是操作表头,一个是操作表尾,同步似乎是可以理解的。

    这里我们可以解决上面预留的第一个问题:count为什么是AtomicInteger的?

    因为入队和出队可以同步执行,都可以修改count值,要保证count的自增和自减都是原子的,所以使用AtomicInteger类型。

    LinkedBlockingQueue用了两个锁似乎是符合逻辑的,那么ArrayBlockingQueue为什么不这么做?

    因为ArrayBlockingQueue是基于数组的循环队列。链表队列删除头部和添加尾部都只和1个节点相关,前面也分析了为了防止添加和移除互相影响,LinkedBlockingQueue维护了一个原子计数器count。而数组位置会循环,最后一个位置的下一个位置是第一个位置,这样的操作无法直接原子话,需要加锁。所以就没有必要了。(也不知道对不对- -!)

    同理,删除操作除了take还有poll,poll方法提供了重载版本,一个是阻塞的poll如下

    public E poll(long timeout, TimeUnit unit) throws InterruptedException

    其原理与take类似,另一个是非阻塞的poll,如下

    public E poll() {
            final AtomicInteger count = this.count;
            if (count.get() == 0)
                return null;
            E x = null;
            int c = -1;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                if (count.get() > 0) {
                    x = dequeue();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                }
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }

    该poll不会阻塞,若队列为空,直接返回null,否则出队。

    (3)其他

    没有其他了!博主很懒,其他的大家自行阅读源码~~

  • 相关阅读:
    linux 部署项目命令
    List remove方法小坑
    centos7 安装mongoDB
    windows git 清除已保存的密码
    windows平台安装配置Gitblit
    oracle
    Mac 配置多jdk 随意切换
    idea远程调试jar包
    centos7 安装elasticsearch
    正则校验
  • 原文地址:https://www.cnblogs.com/ouym/p/9037036.html
Copyright © 2011-2022 走看看