zoukankan      html  css  js  c++  java
  • JDK源码那些事儿之LinkedBlockingQueue

    今天继续讲解阻塞队列,涉及到了常用线程池的其中一个队列LinkedBlockingQueue,从类命名部分我们就可以看出其用意,队列中很多方法名是通用的,只是每个队列内部实现不同,毕竟实现的都是同一个接口BlockingQueue,可以自行查看接口源码,下面我们一起看下LinkedBlockingQueue实现的源码部分

    前言

    JDK版本号:1.8.0_171

    LinkedBlockingQueue是链表实现的线程安全的无界的阻塞队列

    • 内部是通过Node节点组成的链表来实现的
    • 线程安全说明的是内部通过两个ReentrantLock锁保护竞争资源,实现了多线程对竞争资源的互斥访问,这里入队和出队互不影响
    • 无界,默认链表长度为Integer.MAX_VALUE,本质上还是有界
    • 阻塞队列,是指多线程访问竞争资源时,当竞争资源已被某线程获取时,其它要获取该资源的线程需要阻塞等待

    队列通过Node对象组成的链表实现,与ArrayBlockingQueue不同的地方在于,ArrayBlockingQueue是有界的,初始化需指定长度,LinkedBlockingQueue不定义长度时,默认Integer.MAX_VALUE,相当于“无界”了,但是这样会造成一些问题,这部分后边说,同时保证并发和阻塞部分使用了2个互斥锁分别对入队和出队互斥操作,这样来看,独立开来提升了队列的吞吐量,入队和出队操作可同时进行

    类定义

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable
    

    继承关系图

    常量/变量

        /**
         * Linked list node class
         * 
         * 链表Node 
         * next引用指向后一个Node
         */
        static class Node<E> {
            E item;
    
            /**
             * One of:
             * - the real successor Node
             * - this Node, meaning the successor is head.next
             * - null, meaning there is no successor (this is the last node)
             */
            Node<E> next;
    
            Node(E x) { item = x; }
        }
    
        /** The capacity bound, or Integer.MAX_VALUE if none */
        // 链表容量大小,不传参数,默认Integer.MAX_VALUE
        // 这里final说明一旦确定链表容量就不能再改变了
        private final int capacity;
    
        /** Current number of elements */
        // 队列实际包含元素的长度,这里使用了原子类保证数据的准确性
        private final AtomicInteger count = new AtomicInteger();
    
        /**
         * Head of linked list.
         * Invariant: head.item == null
         */
        // 链表头节点,head.item == null
        transient Node<E> head;
    
        /**
         * Tail of linked list.
         * Invariant: last.next == null
         */
        // 链表尾节点,last.next == null
        private transient Node<E> last;
    
        /** Lock held by take, poll, etc */
        // 出队操作互斥锁
        private final ReentrantLock takeLock = new ReentrantLock();
        /** Wait queue for waiting takes */
        // 非空信号量,当无元素时阻塞等待入队操作
        private final Condition notEmpty = takeLock.newCondition();
        // 入队操作互斥锁
        /** Lock held by put, offer, etc */
        private final ReentrantLock putLock = new ReentrantLock();
        /** Wait queue for waiting puts */
        // 非满信号量,当队列已满时阻塞等待出队操作
        private final Condition notFull = putLock.newCondition();
    

    构造方法

    无参构造方法中默认取Integer.MAX_VALUE,使得链表容量限制为最大值,同时初始化头尾节点,值置为null

        public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
        
        public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);
        }
    
        public LinkedBlockingQueue(Collection<? extends E> c) {
            this(Integer.MAX_VALUE);
            final ReentrantLock putLock = this.putLock;
            // 这里使用锁的原因和ArrayBlockingQueue相同,确保可见性,因为链表本身并不保证可见性,防止并发操作下链表不一致的情况出现
            putLock.lock(); // Never contended, but necessary for visibility
            try {
                int n = 0;
                for (E e : c) {
                    if (e == null)
                        throw new NullPointerException();
                    if (n == capacity)
                        throw new IllegalStateException("Queue full");
                    enqueue(new Node<E>(e));
                    ++n;
                }
                // 设置队列长度
                count.set(n);
            } finally {
                putLock.unlock();
            }
        }
    

    重要方法

    signalNotEmpty/signalNotFull

    在每次唤醒非空信号量的等待线程时,需要先获取出队互斥锁,简单说,就是当队列为空时,有线程在执行出队操作,通过notEmpty.await()阻塞等待,这时有线程入队操作,调用signalNotEmpty()唤醒执行notEmpty.await()的阻塞线程,在唤醒这个线程之前必须拿到takeLock互斥锁,为什么?因为执行唤醒操作的时候要获取到该signal对应的Condition对象的锁才行,在ArrayBlockingQueue中是同样的操作

        /**
         * Signals a waiting take. Called only from put/offer (which do not
         * otherwise ordinarily lock takeLock.)
         */
        private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
        }
    
        /**
         * Signals a waiting put. Called only from take/poll.
         */
        private void signalNotFull() {
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                notFull.signal();
            } finally {
                putLock.unlock();
            }
        }
    

    enqueue/dequeue

    入队出队最终调用的方法,enqueue方法首先将原尾节点的next引用指向新节点,然后将尾节点更新为新节点。dequeue方法移除头节点,更新头节点,注意这里实际上返回的节点是第二个节点,因为头节点head.item == null

        /**
         * Links node at end of queue.
         *
         * @param node the node
         */
        private void enqueue(Node<E> node) {
            // assert putLock.isHeldByCurrentThread();
            // assert last.next == null;
            last = last.next = node;
        }
    
        /**
         * Removes a node from head of queue.
         *
         * @return the node
         */
        private E dequeue() {
            // assert takeLock.isHeldByCurrentThread();
            // assert head.item == null;
            // 使用h保存原头节点,头节点这里head.item == null
            Node<E> h = head;
            // 使用first保存原头节点之后的节点,实际上的第一个节点,我们需要的出队节点也是这个节点
            Node<E> first = h.next;
            // 原头节点next指向自己,这里指向自己在后边迭代器中nextNode方法中有用到,通过这种方式判断节点类型
            h.next = h; // help GC
            // 头节点更新为第二个节点
            head = first;
            // 保存第二个节点的值
            E x = first.item;
            // 更新头节点head.item == null,之后这个节点将作为头节点
            first.item = null;
            return x;
        }
    

    fullyLock/fullyUnlock

    两个操作全部加锁,在删除,验证是否包含某个元素,迭代等操作中使用

        /**
         * Locks to prevent both puts and takes.
         */
        void fullyLock() {
            putLock.lock();
            takeLock.lock();
        }
    
        /**
         * Unlocks to allow both puts and takes.
         */
        void fullyUnlock() {
            takeLock.unlock();
            putLock.unlock();
        }
    

    put

    put入队操作,队列已满时阻塞等待,队列未满则插入队列同时判断是否唤醒其他入队线程和出队线程,几种入队操作区别同ArrayBlockingQueue中的说明,这里不再一一说明了

    
        public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            // Note: convention in all put/take/etc is to preset local var
            // holding count negative to indicate failure unless set.
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            // 可中断putLock锁
            putLock.lockInterruptibly();
            try {
                // 队列已满则入队操作线程阻塞等待
                while (count.get() == capacity) {
                    notFull.await();
                }
                // 队列未满则入队操作
                enqueue(node);
                // 原子类更新队列长度值,返回值为原count的值
                c = count.getAndIncrement();
                // 再次判断队列是否有可用空间,如果有唤醒下一个线程进行添加操作
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            // 队列原本有0条数据,现在有了1条数据,则唤醒消费线程进行消费
            // 因为原本队列无元素,消费线程都被阻塞,只需要判断有一条数据的时候就可以
            if (c == 0)
                signalNotEmpty();
        }
    

    take

    take出队操作,队列为空时阻塞等待,队列非空时则出队同时队列长度减1,同时判断是否唤醒其他出队操作,方法区别同样可参考前面的ArrayBlockingQueue文章

        public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                // 队列为空则等待
                while (count.get() == 0) {
                    notEmpty.await();
                }
                // 出队操作
                x = dequeue();
                // 减1
                c = count.getAndDecrement();
                // 队列是否有可用空间
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            // 队列原本是满的状态,现在一条数据出队,则可以唤醒非满信号量,进行入队操作
            if (c == capacity)
                signalNotFull();
            return x;
        }
    

    remove操作调用,删除p与trail节点之间的关系,重新构建p.next节点和trail节点关系,相当于删除p节点后对引用的处理

        /**
         * Unlinks interior Node p with predecessor trail.
         */
        void unlink(Node<E> p, Node<E> trail) {
            // assert isFullyLocked();
            // p.next is not changed, to allow iterators that are
            // traversing p to maintain their weak-consistency guarantee.
            // 删除节点置空
            p.item = null;
            // 删除节点前一个节点指向删除节点的后一个节点
            trail.next = p.next;
            // p是最后一个节点,则删除p后最后一个节点为trail
            if (last == p)
                last = trail;
            // 删除p之前队列是已满状态则删除p后调用notFull.signal()唤醒入队线程操作
            if (count.getAndDecrement() == capacity)
                notFull.signal();
        }
    

    contains

    判断是否包含某个对象,在执行时需同时获得入队锁和出队锁,保证在判断过程中不会有数据的变更。在toArray,toString,clear方法中都是如此

        /**
         * Returns {@code true} if this queue contains the specified element.
         * More formally, returns {@code true} if and only if this queue contains
         * at least one element {@code e} such that {@code o.equals(e)}.
         *
         * @param o object to be checked for containment in this queue
         * @return {@code true} if this queue contains the specified element
         */
        public boolean contains(Object o) {
            if (o == null) return false;
            // 获取两个锁
            fullyLock();
            try {
                // 正常循环判断
                for (Node<E> p = head.next; p != null; p = p.next)
                    if (o.equals(p.item))
                        return true;
                return false;
            } finally {
                fullyUnlock();
            }
        }
    

    drainTo

    一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁,在接口中已经声明这个方法,在ArrayBlockingQueue中同样有这个方法

        public int drainTo(Collection<? super E> c, int maxElements) {
            // 检查
            if (c == null)
                throw new NullPointerException();
            if (c == this)
                throw new IllegalArgumentException();
            if (maxElements <= 0)
                return 0;
            // 唤醒非满信号量为false
            boolean signalNotFull = false;
            final ReentrantLock takeLock = this.takeLock;
            // 获取takeLock互斥锁
            takeLock.lock();
            try {
                // 获取能获取队列的正常长度
                int n = Math.min(maxElements, count.get());
                // count.get provides visibility to first n Nodes
                Node<E> h = head;
                int i = 0;
                try {
                    // 将值放入c
                    while (i < n) {
                        Node<E> p = h.next;
                        c.add(p.item);
                        p.item = null;
                        h.next = h;
                        h = p;
                        ++i;
                    }
                    // 返回获取的队列长度n
                    return n;
                } finally {
                    // Restore invariants even if c.add() threw
                    // 重新保存常量
                    if (i > 0) {
                        // assert h.item == null;
                        head = h;
                        // getAndAdd返回未执行前的值,与队列容量相等,则说明之前队列是已满状态,入队线程全部阻塞,
                        // 而这里i > 0 则表明此时操作完队列未满可以唤醒入队线程
                        signalNotFull = (count.getAndAdd(-i) == capacity);
                    }
                }
            } finally {
                takeLock.unlock();
                // 唤醒入队线程
                if (signalNotFull)
                    signalNotFull();
            }
        }
    

    迭代器及内部类

    每次调用iterator创建Itr内部类,与ArrayBlockingQueue不同,LinkedBlockingQueue,内部类Itr没有那么复杂,通过fullyLock和fullyUnlock方法在每次迭代时需要获取锁才能操作,保证不会数据错乱

        public Iterator<E> iterator() {
            return new Itr();
        }
    
        private class Itr implements Iterator<E> {
            /*
             * Basic weakly-consistent iterator.  At all times hold the next
             * item to hand out so that if hasNext() reports true, we will
             * still have it to return even if lost race with a take etc.
             */
    
            private Node<E> current;
            private Node<E> lastRet;
            private E currentElement;
    
            Itr() {
                fullyLock();
                try {
                    // 保存头节点的下一个节点,头节点是无值的
                    current = head.next;
                    if (current != null)
                        // 获取第一个有值的数据
                        currentElement = current.item;
                } finally {
                    fullyUnlock();
                }
            }
    
            public boolean hasNext() {
                return current != null;
            }
    
            /**
             * Returns the next live successor of p, or null if no such.
             *
             * Unlike other traversal methods, iterators need to handle both:
             * - dequeued nodes (p.next == p)
             * - (possibly multiple) interior removed nodes (p.item == null)
             */
            // 找到迭代的下一个节点
            private Node<E> nextNode(Node<E> p) {
                for (;;) {
                    Node<E> s = p.next;
                    // p.next == p 说明已经出队,上边方法dequeue中有提到
                    // 这里直接使用head.next即可
                    if (s == p)
                        return head.next;
                    // 节点为空或者节点值不为空则返回这个节点
                    // 节点为空说明是队列尾,直接返回即可
                    // 节点值不为空说明已找到下一个节点,同样返回
                    if (s == null || s.item != null)
                        return s;
                    // s.item == null 可能节点被删除了,则继续判断下一个节点
                    p = s;
                }
            }
    
            public E next() {
                fullyLock();
                try {
                    if (current == null)
                        throw new NoSuchElementException();
                    E x = currentElement;
                    // 保存上次迭代的值
                    lastRet = current;
                    // 计算保存下次迭代值
                    current = nextNode(current);
                    currentElement = (current == null) ? null : current.item;
                    return x;
                } finally {
                    fullyUnlock();
                }
            }
    
            // 调用迭代的remove删除的是lastRet对应的值
            public void remove() {
                if (lastRet == null)
                    throw new IllegalStateException();
                fullyLock();
                try {
                    Node<E> node = lastRet;
                    lastRet = null;
                    for (Node<E> trail = head, p = trail.next;
                         p != null;
                         trail = p, p = p.next) {
                        if (p == node) {
                            // 删除p,调整链表
                            unlink(p, trail);
                            break;
                        }
                    }
                } finally {
                    fullyUnlock();
                }
            }
        }
    

    使用说明

    对于新手而言,LinkedBlockingQueue队列在线程池的使用中可能会出现一些问题,主要问题在于其创建的方式,新手使用封装类提供的方法,比如下面示例代码:

        ExecutorService pool = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            pool.submit(() -> System.out.println(Thread.currentThread().getName()));
        }
    

    从源码中我们可以看到其创建线程池时使用的队列方式如下:

    new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
    

    这会造成什么问题呢?

    默认无参构造,从LinkedBlockingQueue源码部分我们也能看到无参时默认链表最大容量为Integer.MAX_VALUE,假如我们设置了核心线程数和最大线程数都为5之后,如果线程一直被占用而没有释放,同时又有很多任务向线程池申请线程使用,这时我们会将任务放入队列中保存,生产者的速度远大于消费者,堆积的请求处理队列可能会耗费非常大的内存,甚至OOM

    所以阿里规范中提及了这部分内容,指出了其中存在的隐患,需要规避资源耗尽的风险,开发人员应直接使用ThreadPoolExecutor来创建线程池,每个参数需要根据自己的需求进行设置

    总结

    通过对LinkedBlockingQueue源码的解读我们可以了解到如下与ArrayBlockingQueue不同的内容:

    1. 内部通过Node对象链表实现
    2. 内部有2个Lock,分别对应入队和出队操作,两者可同时进行,提高了吞吐量
    3. 线程池使用LinkedBlockingQueue时,最好根据需要自行创建,设置大小,否则有可能引起OOM问题

    以上内容如有问题欢迎指出,笔者验证后将及时修正,谢谢

    作者:freeorange
    个人博客网站:https://www.gclearning.cn/
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须在文章页面给出原文连接,否则保留追究法律责任的权利。
  • 相关阅读:
    iOS
    iOS
    iOS
    CS页面-Asp.net+Spring.Net.Framework--SNF快速开发平台3.0
    SNF快速开发平台3.0之-界面个性化配置+10种皮肤+7种菜单-Asp.net+MVC4.0+WebAPI+EasyUI+Knockout
    SNF快速开发平台3.0之BS页面展示和九大优点-部分页面显示效果-Asp.net+MVC4.0+WebAPI+EasyUI+Knockout
    MVC通用控件库展示-MVC4.0+WebAPI+EasyUI+Knockout--SNF快速开发平台3.0
    已有打开的与此 Command 相关联的 DataReader,必须首先将它关闭。
    转:ECharts图表组件之简单关系图:如何轻松实现另类站点地图且扩展节点属性实现点击节点页面跳转
    转:zTree树控件入门之checkbox:如何动态设置节点的checkbox选择框启用与禁用状态(chkDisabled)
  • 原文地址:https://www.cnblogs.com/freeorange/p/11331917.html
Copyright © 2011-2022 走看看