zoukankan      html  css  js  c++  java
  • 阻塞队列LinkedBlockingQueue源码深入剖析

     

    1 前言

    与普通队列相比,阻塞队列另外支持两个附加操作,这两个附加的操作支持阻塞的插入和移除方法。

    • ①支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满。

    • ②支持阻塞的移除方法:在队列为空时,获取元素的线程会等待队列变为非空。

    LinkedBlockingQueue继承于抽象类AbstractQueue,并实现了BlockingQueue接口。它内部主要使用两个”条件“来实现”阻塞插入“、”阻塞移出“,这两个条件分别是”未满“、”非空“。LinkedBlockingQueue是一个有界的阻塞队列,如果构造方法没有指定容量,其默认容量是无限大。LinkedBlockingQueue是一个基于单向链表的阻塞队列,内部使用静态内部类Node表示一个链表节点。

    2 静态内部类Node与成员变量

    Node类

    static class Node<E> {
        E item;
    ​
        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;
    ​
        Node(E x) { item = x; }
    }

     一个Node对象代表一个链表节点,其属性item表示当前节点保存的元素,next属性表示当前节点的后继节点。

    成员变量

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;
    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();
    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;
    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;
    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();
    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    capacity:队列的容量。在没有显式指定时,使用Integer.MAX_VALUE作为其容量的边界。

    count:队列的元素个数

    head:链表的头节点, head.item始终为null,所以head.next才是第一个存元素的节点

    last: 链表的尾节点,last.next始终为null.

    takeLock: 出队时使用的锁

    notEmpty:出队时的“非空”条件

    putLock:入队时使用的锁。

    notFull:入队时的“未满”条件。

     

    3 构造方法

    LinkedBlockingQueue(int)是其主要的构造方法,构造方法主要涉及对容量、头尾节点的初始化。

    在使用无参构造方法时,阻塞队列的容量是Integer.MAX_VALUE,即无限大。

    在初始化后,队列中没有任何元素时,头节点 、尾节点指向同一个节点。

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility 保证可见性
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

    4 主要方法

    put

    put 方法将一个给定的元素入队 ,若队列已满就阻塞等待。

    put方法的主要逻辑:

    ①先获取入队的锁putLock,然后检查队列是否已满。②若当前队列已满,则(notFull.await())休眠等待直到队列未满时才被唤醒。

    ③若当前队列未满,执行enqueue,将元素入队,④若此元素入队后队列仍未满,就就唤醒一个等待“未满“条件的线程。

    ⑤最后若入队前队列为空,就就唤醒一个等待“非空”条件的线程

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /**
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                //容量已满就休眠等待
                notFull.await();
            }
            enqueue(node);//在容量未满时入队
            c = count.getAndIncrement();//当前元素入队前的元素个数
            //(count-1) +1 <capacity,即count<capacity, 未满就尝试唤醒一个等待“未满条件”的线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        //count-1=0  若此时队列中只有一个元素(即入队前队列为空),就唤醒一个等待“非空”条件的线程
        if (c == 0)  
            signalNotEmpty();
    }

    enqueue将一新元素入队,它的主要逻辑是:将原尾节点的后继节点设为新入队的node,同时将node节点作为新的尾节点。

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null; 
        last = last.next = node;//即 last.next=node; last=node
    }

    signalNotEmpty唤醒一个等待“非空”条件的线程

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

     

    offer

    offer(E)尝试将一个给定的元素入队 ,成功入队返回true ,若队列已满则返回false.

    此方法与put方法的逻辑大致相似,只在此方法在队列已满时会直接返回,而不会阻塞等待。

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {//队列未满才入队
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;//队列已满是,此时c=-1
    }

    offer(E e, long , TimeUnit )方法可以看成put方法的超时版本,在队列已满时,若阻塞超时后仍无法入队就返回false.

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    ​
        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)//已经超时
                    return false;
                nanos = notFull.awaitNanos(nanos);//休眠给定的时长
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    add

    LinkedBlockingQueue没有自己定义add方法,它直接使用父类AbstractQueue的add方法。

    它将核心逻辑委托给offer方法,若入队失败则抛出异常。

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

    take

    take() 将队首元素出队,若此时队列为空则阻塞等待。

    ①先获取出队的锁takeLock,然后检查队列是已空。 ②若当前队列已空,则(notEmpty.await())休眠等待直到队列非空时才被唤醒。

    ③若当前队列非空,执行dequeue,将队首元素出队,④若出队后队列仍非空,就就尝试唤醒一个等待“非空”条件的线程。

    ⑤最后若在出队前队列已满,就就唤醒一个等待“未满”条件的线程

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) { //队列为空,就休眠等待
                notEmpty.await();
            }
            x = dequeue();//出队
            c = count.getAndDecrement();//出队前的元素个数
            if (c > 1) //count -1>1,即count>0 ,非空时唤醒一个等待“非空”条件的线程
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        //count-1==capacity  还差一个元素队列就满时(即出队前是满),唤醒一个等待“未满”条件的线程
        if (c == capacity)
            signalNotFull();
        return x;
    }

    dequeue将队首元素出队。其主要逻辑是:将原头节点的后继节点作为新的头节点,将首元素的引用从队列中清空,再返回这个首元素。

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next; //首元素是头节点的后继节点,因为头节点不存储元素值 head.item一直是null
        h.next = h; // help GC  next属性自指  迭代器能根据next自指检测此节点已被删除
        head = first; //新的头节点是原头节点的后继节点
        E x = first.item; //获取首元素值
        first.item = null; //将原首元素从队列中删除
        return x;
    }

    signalNotFull 尝试唤醒一个等待”未满“条件的线程。

    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    poll

    poll(E) 尝试出队,若成功出队就返回此元素 ,若队列为空则返回null.

    此方法与take方法的逻辑大致相似,只是在队列已空时会直接返回null,而不会阻塞等待。

        public E poll() {
            final AtomicInteger count = this.count;
            if (count.get() == 0)
                return null;
            E x = null;
            int c = -1;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                if (count.get() > 0) {//队列非空时出队
                    x = dequeue();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                }
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }

    poll(long , TimeUnit)可以看作take方法的超时版本,若超时后仍未能出队,就返回null.

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)//已经超时
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

     

    peek

    peek方法返回队列的首元素但不删除它,若队列为空,则返回null

    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    remove

    remove移除指定元素。

    其主要逻辑:先同时获取putLock 、takeLock锁,然后从头节点遍历链表,若链表的某个节点存储了这个元素,就将此节点删除返回true,反之就返回false.

    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();//同时获取putLock和takeLock
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {//找到对就的元素,就将它从链表中删除
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    fullyLock同时获取putLock 、takeLock锁,而fullyUnlock同时释放putLock 、takeLock锁,两都恰好对应。这里同时要获取两把锁的原因在于:putLock锁保证在队列尾部添加元素的线程安全,takeLock锁保证在队列头部删除元素的线程安全,而remove方法需要从头到尾遍历所有元素且无法确定是在队列的头部 、尾部或中间位置删除元素,所以这里要同时获取这两把锁,阻止任何添加、删除元素的操作。

    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }
    ​
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    unlink方法从链表中删除指定节点p. 其主要逻辑是:①将p节点从链表中删除;②若被删除节点p是尾节点,就将p的前驱节点作为新的尾节点;③若此节点被删除前,队列已满,就唤醒一个等待”未满“条件的线程

    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;//属性赋空,便于GC
        //trail是p的前驱节点
        trail.next = p.next; //p的前驱节点和后继节点直接链接起来,p被排除在外,p节点已从链表中删除
        if (last == p)//若p是尾节点,就将p的前驱节点作为新的尾节点
            last = trail;
        //若此元素被删除前,队列已满,就唤醒一个等待”未满“条件的线程
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

    drainTo

    drainTo将元素批量出队。其主要逻辑:

    先获取takeLock锁,然后从头节点开始将maxElements个元素从队列中移除并将之添加至集合c中,再更新链表的头节点、队列元素个数。最后若在批量出队前队列已满,就唤醒一个等待”未满“条件的线程。

    public int drainTo(Collection<? super E> c) {//将所有元素出队
        return drainTo(c, Integer.MAX_VALUE);
    }
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        boolean signalNotFull = false;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            int n = Math.min(maxElements, count.get());
            // count.get provides visibility to first n Nodes
            Node<E> h = head;
            int i = 0;
            try {
                while (i < n) {  
                    Node<E> p = h.next;
                    c.add(p.item);//添加到目标集合中
                    p.item = null;
                    h.next = h;
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    // assert h.item == null;
                    head = h;//更新头节点
                    signalNotFull = (count.getAndAdd(-i) == capacity); //将count减少i
                }
            }
        } finally {
            takeLock.unlock();
            if (signalNotFull)//若批量出队前,队列已满,就唤醒一个等待”未满“条件的线程
                signalNotFull();
        }
    }

    其它方法

    contains 和clear 方法都需要同时获取putLock 、takeLock锁 ,这两个方法都需要遍历链表的所有节点,必须保证此时没有其他任何添加、删除节点的操作。

    LinkedBlockingQueue使用AtomicInteger类型原子变量来对元素个数计数,所以size方法不需要锁来保证线程安全。

    public boolean contains(Object o) {
        //遍历所有节点,查看某个节点是否存储了这个元素
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> p = head.next; p != null; p = p.next)
                if (o.equals(p.item))
                    return true;
            return false;
        } finally {
            fullyUnlock();
        }
    }
    public void clear() {
        fullyLock();
        try {
            //将所有元素清空
            for (Node<E> p, h = head; (p = h.next) != null; h = p) {
                h.next = h;
                p.item = null;
            }
            head = last;//更新头节点,头尾节点指向同一节点,队列中没有任何元素了
            // assert head.item == null && head.next == null;
            if (count.getAndSet(0) == capacity)//清空所有元素前,队列已满,就唤醒一个等待"未满"条件的线程
                notFull.signal();
        } finally {
            fullyUnlock();
        }
    }
    ​
     public int size() {
            return count.get();
     }

    5 迭代器Itr

    LinkedBlockingQueue的iterator方法返回一个迭代器对象 ,Itr实现了Iterator接口。Itr是LinkedBlockingQueue的一个成员内部类,也就是说它与外部类LinkedBlockingQueue相绑定,Itr可以直接访问LinkedBlockingQueue的实例方法、实例变量。

    public Iterator<E> iterator() {
        return new Itr();
    }

    Itr有3个属性,current表示当前的遍历到的链表节点,lastRet表示上一个链表节点,currentElement表示当前链表节点中保存的元素值。

    private Node<E> current;
    private Node<E> lastRet;
    private E currentElement;

    构造方法 初始化current和currentElement属性

    Itr() {
        fullyLock();
        try {
            current = head.next;//首元素对应的节点(直接访问外部类的属性)
            if (current != null) 
                currentElement = current.item;//首元素
        } finally {
            fullyUnlock();
        }
    }

    next返回当前迭代应返回的元素

    public E next() {
        fullyLock();
        try {
            if (current == null)
                throw new NoSuchElementException();
            E x = currentElement;
            lastRet = current;//将current设为上次迭代的节点
            current = nextNode(current); //将current重设为下次迭代的节点
            currentElement = (current == null) ? null : current.item;//同时更新下次迭代的元素引用
            return x;
        } finally {
            fullyUnlock();
        }
    }

    nextNode() 返回指定节点p的后继节点

    private Node<E> nextNode(Node<E> p) {
        for (;;) {
            Node<E> s = p.next;
            if (s == p)  //next属性自指
               //  之前分析dequeue方法时,在出队时会将原头节点next属性自指。所以在逻辑上原头节点p已被删除
                //需要重新获取头节点,因此应返回新头节点的后继节点,即首元素所在节点
                return head.next;
            
            //s==null表明p是尾节点,p没有后继节点,所以返回null
            //s.item!=null 表明s是一个有效的后继节点,所以返回后继节点s
            if (s == null || s.item != null)
                return s;
            
                 //s != p  && s.item=null,
            //item为空但next属性不自指,表示节点s在链表(非头尾)中间位置,且在逻辑上s已被删除,
            //(可能是remove(Object)方法在队列中部删除了元素) 需要继续向后查找有效节点
            p = s;
        }
    }

    remove方法移除当前迭代的元素,此方法与外部类的remove方法类似。

    public void remove() {
        if (lastRet == null)
            throw new IllegalStateException();
        fullyLock();
        try {
            Node<E> node = lastRet;
            lastRet = null;
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (p == node) {
                    unlink(p, trail);//调用外部类的方法
                    break;
                }
            }
        } finally {
            fullyUnlock();
        }
    }

    迭代器Itr的所有公共方法需要同时获取putLock 、takeLock这两把锁,这是因为在迭代过程中必须阻止其他外部添加、删除元素的操作,否则无法保证数据的一致性。

    6 总结

    ①与ArrayBlockingQueue相比, LinkedBlockingQueue使用了putLock 、takeLock两把锁,它们分别保护入队、出队操作的线程安全,在出队的同时还可以入队,反之亦是。而ArrayBlockingQueue只有一把锁,在出队时不能入队,可以看出LinkedBlockingQueue使用了更细粒度的锁,提高了并发效率。

    ②某些操作需要遍历整个链表或在链表的(非头尾)中部删除添加节点时,必须要先同时获取putLock 、takeLock这两把锁,这时可能面临激烈的线程竞争,线程被阻塞,一定程度上会降低并发效率。

    ③LinkedBlockingQueue会在入队后检查当前队列是否未满,LinkedBlockingQueue若检测到入队后队列仍未满,(若存在这样的线程)它将进行“未满”通知(notFull.signal(),唤醒一个等待“未满”条件的线程 )。而ArrayBlockingQueue在入队后始终不会进行“未满”通知。对于出队操作面言,亦是如此. 总之,尽可能唤醒更多的线程。

    ④ArrayBlockingQueue在入队一个元素后会进行无条件"非空"通知(notFull.signal()),而LinkedBlockingQueue 在成功一个元素后会检测入队前队列是否为空。只有在入队前队列为空,LinkedBlockingQueue才会进行""非空""通知(notFull.signal()) 。对于出队操作面言,亦是如此. 简单来说,减少不必要的通知。

     

     

  • 相关阅读:
    OAccflow集成sql
    集成备注
    CCflow与基础框架组织机构整合
    Jeecg_Jflow整合记录
    Problem opening .cshtml files
    The document cannot be opened. It has been renamed, deleted or moved.
    js中!和!!的区别及用法
    hdu 4757 Tree(可持久化字典树)
    Tomcat 学习进阶历程之Tomcat架构与核心类分析
    BZOJ 3000(Big Number-Stirling公式求n!近似值)
  • 原文地址:https://www.cnblogs.com/gocode/p/analysis-source-code-of-LinkedBlockingQueue.html
Copyright © 2011-2022 走看看