zoukankan      html  css  js  c++  java
  • LinkedBlockingQueue 源码分析

    LinkedBlockingQueue

    LinkedBlockingQueue 是基于链表实现的,可以选择有界或无界的阻塞队列。
    队列的元素按照 FIFO 的顺序访问,新增元素添加到队列尾部,移除元素从队列头部开始。
    队列头部通过 takeLock 进行并发控制,队列尾部通过 putLock 进行并发控制,
    该队列最多可以有两个线程同时操作,其吞吐量要高于 ArrayBlockQueue。
    

    创建实例

        /**
         *  单向链表节点
         */
        static class Node<E> {
            /**
             *  存储的元素
             */
            E item;
    
            /**
             * One of:
             * - the real successor Node
             * - this Node, meaning the successor is head.next
             * - null, meaning there is no successor (this is the last node)
             */
            Node<E> next;
    
            Node(E x) { item = x; }
        }
    
        /** 阻塞队列的容量,如果不指定,则为 Integer.MAX_VALUE */
        private final int capacity;
    
        /** 当前的元素总数 */
        private final AtomicInteger count = new AtomicInteger();
    
        /**
         *  链表头结点
         * Invariant: head.item == null
         */
        transient Node<E> head;
    
        /**
         *  链表尾节点
         * Invariant: last.next == null
         */
        private transient Node<E> last;
    
        /** 执行读取操作时必须持有的锁 */
        private final ReentrantLock takeLock = new ReentrantLock();
    
        /** 队列为空时执行的 take 操作,当前线程将在此条件阻塞等待  */
        private final Condition notEmpty = takeLock.newCondition();
    
        /** 执行写入操作时必须持有的锁 */
        private final ReentrantLock putLock = new ReentrantLock();
    
        /** 队列已满时执行的 put 操作,当前线程将在此条件阻塞等待 */
        private final Condition notFull = putLock.newCondition();
    
        /**
         *  创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue 实例
         */
        public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
    
        /**
         *  创建一个容量为 capacity 的 LinkedBlockingQueue 实例
         */
        public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) {
                throw new IllegalArgumentException();
            }
            this.capacity = capacity;
            last = head = new Node<>(null);
        }
    

    写入元素

    • 在队列尾部插入元素,如果队列已满,则阻塞等待有可用空间之后,再尝试插入
        /**
         *  在队列尾部插入元素,如果队列已满,则阻塞等待有可用空间之后,再尝试插入。
         */
        @Override
        public void put(E e) throws InterruptedException {
            if (e == null) {
                throw new NullPointerException();
            }
            // Note: convention in all put/take/etc is to preset local var
            // holding count negative to indicate failure unless set.
            int c = -1;
            // 创建新节点
            final Node<E> node = new Node<>(e);
            // 读取写锁
            final ReentrantLock putLock = this.putLock;
            // 读取元素计数值
            final AtomicInteger count = this.count;
            // 可中断地锁定
            putLock.lockInterruptibly();
            try {
                // 如果当前队列已满
                while (count.get() == capacity) {
                    // 则在非满条件上阻塞等待,线程被唤醒后进行重试
                    notFull.await();
                }
                // 加入队列尾部
                enqueue(node);
                // 递增元素总数
                c = count.getAndIncrement();
                // 如果添加元素之后还有可用空间
                if (c + 1 < capacity) {
                    // 唤醒在非满条件上阻塞等待的线程
                    notFull.signal();
                }
            } finally {
                // 释放写锁
                putLock.unlock();
            }
            // 如果是添加的第一个元素
            if (c == 0) {
                // 唤醒在非空条件上阻塞等待的线程来读取元素
                signalNotEmpty();
            }
        }
    
        /**
         *  将节点插入队列尾部
         */
        private void enqueue(Node<E> node) {
            last = last.next = node;
        }
    
        /**
         * Signals a waiting take. Called only from put/offer
         * 唤醒一个等待读取元素的线程
         */
        private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
        }
    
    • 如果队列有可用空间,则将元素添加到队列尾部,并返回 true;如果队列已满,则立即返回 false,元素被丢弃。
        /**
         *  如果队列有可用空间,则将元素添加到队列尾部,并返回 true;
         *  如果队列已满,则立即返回 false,元素被丢弃。
         */
        @Override
        public boolean offer(E e) {
            if (e == null) {
                throw new NullPointerException();
            }
            final AtomicInteger count = this.count;
            // 队列已满
            if (count.get() == capacity) {
                // 返回 false
                return false;
            }
            int c = -1;
            // 新建节点
            final Node<E> node = new Node<>(e);
            // 读取写锁
            final ReentrantLock putLock = this.putLock;
            // 获取锁
            putLock.lock();
            try {
                // 进行二次判断
                if (count.get() < capacity) {
                    // 将节点加入到队列尾部
                    enqueue(node);
                    // 递增元素总个数
                    c = count.getAndIncrement();
                    if (c + 1 < capacity) {
                        notFull.signal();
                    }
                }
            } finally {
                putLock.unlock();
            }
            if (c == 0) {
                signalNotEmpty();
            }
            return c >= 0;
        }
    
    • 在指定的超时时间内,尝试将元素插入到队列尾部,插入成功返回 true
        /**
         *  在指定的超时时间内,尝试将元素插入到队列尾部,插入成功返回 true
         */
        @Override
        public boolean offer(E e, long timeout, TimeUnit unit)
                throws InterruptedException {
            if (e == null) {
                throw new NullPointerException();
            }
            // 转换为纳秒
            long nanos = unit.toNanos(timeout);
            int c = -1;
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();
            try {
                // 队列已满
                while (count.get() == capacity) {
                    // 已经超时则直接返回 false
                    if (nanos <= 0L) {
                        return false;
                    }
                    // 最多阻塞等待指定的超时时间后再次尝试添加元素
                    nanos = notFull.awaitNanos(nanos);
                }
                enqueue(new Node<>(e));
                c = count.getAndIncrement();
                if (c + 1 < capacity) {
                    notFull.signal();
                }
            } finally {
                putLock.unlock();
            }
            if (c == 0) {
                signalNotEmpty();
            }
            return true;
        }
    

    读取元素

    • 如果队列为空,则阻塞等待有可用元素,否则移除并获取队列头部元素
        /**
         *  如果队列为空,则阻塞等待有可用元素,否则移除并获取队列头部元素
         * created by ZXD at 6 Dec 2018 T 20:20:15
         * @return
         * @throws InterruptedException
         */
        @Override
        public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                // 队列为空
                while (count.get() == 0) {
                    // 在非空条件上阻塞等待,线程被唤醒后再次尝试读取
                    notEmpty.await();
                }
                // 移除队列头部元素
                x = dequeue();
                // 递减总数
                c = count.getAndDecrement();
                // 如果还有元素可用
                if (c > 1) {
                    // 唤醒在非空条件上阻塞等待的线程
                    notEmpty.signal();
                }
            } finally {
                takeLock.unlock();
            }
            // 如果有可用空间
            if (c == capacity) {
                // 唤醒在非满条件上阻塞等待的线程来插入元素
                signalNotFull();
            }
            return x;
        }
    
        /**
         *  移除并返回头部节点元素
         */
        private E dequeue() {
            // 读取头节点
            final Node<E> h = head;
            // 读取后继节点
            final Node<E> first = h.next;
            // 清除旧头结点
            h.next = h; // help GC
            // 写入新头节点
            head = first;
            // 读取元素值
            final E x = first.item;
            // 清除头结点的元素值,它只作为一个标记节点
            first.item = null;
            // 返回元素值
            return x;
        }
    
    
    
    • 如果队列为空,则直接返回 null,否则尝试移除并返回头部元素
        /**
         *  如果队列为空,则直接返回 null,否则尝试移除并返回头部元素
         * created by ZXD at 6 Dec 2018 T 20:23:51
         * @return
         */
        @Override
        public E poll() {
            final AtomicInteger count = this.count;
            // 队列为空时直接返回 null
            if (count.get() == 0) {
                return null;
            }
            E x = null;
            int c = -1;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                // 有元素可用
                if (count.get() > 0) {
                    // 移除并返回头部元素
                    x = dequeue();
                    c = count.getAndDecrement();
                    if (c > 1) {
                        notEmpty.signal();
                    }
                }
            } finally {
                takeLock.unlock();
            }
            if (c == capacity) {
                signalNotFull();
            }
            return x;
        }
    
    • 尝试在指定的超时时间内移除并返回头部元素,如果已经超时,则返回 null
        /**
         *  尝试在指定的超时时间内移除并返回头部元素,如果已经超时,则返回 null
         * created by ZXD at 6 Dec 2018 T 20:27:27
         * @param timeout
         * @param unit
         * @return
         * @throws InterruptedException
         */
        @Override
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            E x = null;
            int c = -1;
            long nanos = unit.toNanos(timeout);
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                // 队列为空
                while (count.get() == 0) {
                    // 已经超时则直接返回 null
                    if (nanos <= 0L) {
                        return null;
                    }
                    // 在非空条件上阻塞等待指定的纳秒数,被唤醒后再次进行读取
                    nanos = notEmpty.awaitNanos(nanos);
                }
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1) {
                    notEmpty.signal();
                }
            } finally {
                takeLock.unlock();
            }
            if (c == capacity) {
                signalNotFull();
            }
            return x;
        }
    
  • 相关阅读:
    BZOJ_2588_Spoj 10628. Count on a tree_树剖+主席树
    BZOJ_1901_Zju2112 Dynamic Rankings_树状数组+主席树
    单例模式
    JDBC连接数据库查询信息的步骤(提取成配置文件方式)
    JDBC访问数据库查询信息的步骤(硬编码格式)
    大数据
    accp
    递归
    struts2中Action到底是什么,怎么理解
    转发和重定向的区别(简单解释)
  • 原文地址:https://www.cnblogs.com/zhuxudong/p/10079026.html
Copyright © 2011-2022 走看看