zoukankan      html  css  js  c++  java
  • java阻塞队列之LinkedBlockingQueue

    LinkedBlockingQueue是BlockingQueue中的其中一个,其实现方式为单向链表,下面看其具体实现。(均为JDK8)

    一、构造函数

    在LinkedBlockingQueue中有三个构造函数,如下图,

    1、LinkedBlockingQueue()

    这是一个无参的构造函数,其定义如下,

    public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }

    在这个构造函数中调用了有参的构造函数,传入的整型值为Integer所能表示的最大值(0x7fffffff)。下面看这个带参数的构造方法。

    2、LinkedBlockingQueue(int capacity)

    这是一个整型参数的构造方法其,定义如下,

    public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);
        }

    从其实现上来看,其整型参数代表此队列的容量,即元素的最大个数;然后初始化了last和head两个变量,我们猜last应该是此队列的尾元素、head为此队列的头元素。这里有一个Node类,下面看Node类的实现,

    static class Node<E> {
            E item;
    
            /**
             * One of:
             * - the real successor Node
             * - this Node, meaning the successor is head.next
             * - null, meaning there is no successor (this is the last node)
             */
            Node<E> next;
    
            Node(E x) { item = x; }
        }

    Node是LinkedBlockingQueue的静态内部类,也是组成此队列的节点元素,其内部有两个属性,一个Item代表节点元素,next指向下一个Node节点,这样就可以得出LinkedBlockingQueue的结构是使用Node连接起来,头节点为head,尾节点为last。

    3、LinkedBlockingQueue(Collection<? extends E> c)

    此构造方法是使用一个集合类进行构造,其定义如下

    public LinkedBlockingQueue(Collection<? extends E> c) {
            this(Integer.MAX_VALUE);
            final ReentrantLock putLock = this.putLock;
            putLock.lock(); // Never contended, but necessary for visibility
            try {
                int n = 0;
                for (E e : c) {
                    if (e == null)
                        throw new NullPointerException();
                    if (n == capacity)
                        throw new IllegalStateException("Queue full");
                    enqueue(new Node<E>(e));
                    ++n;
                }
                count.set(n);
            } finally {
                putLock.unlock();
            }
        }

    4、LinkedBlockingQueue的属性

    从上面的构造函数中可以大体了解其属性有容量(capacity),队列中的元素数量(count)、头节点(head)、尾节点(last)等,如下

    /** The capacity bound, or Integer.MAX_VALUE if none */
        private final int capacity;
    
        /** Current number of elements */
        private final AtomicInteger count = new AtomicInteger();
    
        /**
         * Head of linked list.
         * Invariant: head.item == null
         */
        transient Node<E> head;
    
        /**
         * Tail of linked list.
         * Invariant: last.next == null
         */
        private transient Node<E> last;
    
        /** Lock held by take, poll, etc */
        private final ReentrantLock takeLock = new ReentrantLock();
    
        /** Wait queue for waiting takes */
        private final Condition notEmpty = takeLock.newCondition();
    
        /** Lock held by put, offer, etc */
        private final ReentrantLock putLock = new ReentrantLock();
    
        /** Wait queue for waiting puts */
        private final Condition notFull = putLock.newCondition();

    使用原子类AtomicInteger的count表示队列中元素的个数,可以很好的处理并发,AtomicInteger底层大都是使用乐观锁进行操作,多线程下是安全的。

    关注下takeLock、putLock这两个属性,这里定义了两把锁,一把take锁,另一把put锁。通过两把可重入锁实现并发,与ArrayBlockingQueue的一把锁相比。

    二、队列的操作

    需要使用阻塞队列,那么就需要向队列中添加或取出元素,在LinkedBlockingQueue中已经实现了相关操作,对于添加/取出均是成对出现,提供的方法中有抛出异常、返回false、线程阻塞等几种情形。

    1、put/take

    put/take是一对互斥操作,put向队列中添加元素,take从队列中取出元素。

    1.1、put

    put方法定义如下,

    public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            // Note: convention in all put/take/etc is to preset local var
            // holding count negative to indicate failure unless set.
            int c = -1;
            Node<E> node = new Node<E>(e);
            //获得添加锁,
            final ReentrantLock putLock = this.putLock;
            //获得当前队列中的元素数量
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();
            try {
                /*
                 * Note that count is used in wait guard even though it is
                 * not protected by lock. This works because count can
                 * only decrease at this point (all other puts are shut
                 * out by lock), and we (or some other waiting put) are
                 * signalled if it ever changes from capacity. Similarly
                 * for all other uses of count in other wait guards.
                 */
                //如果当前队列中元素的数量等于队列的容量,则阻塞当前线程,
                while (count.get() == capacity) {
                    notFull.await();
                }
                enqueue(node);
                //当前线程中元素数量增1,返回操作前的数量
                c = count.getAndIncrement();
                //c+1其实是当前队列中元素的数量,如果比容量小,则唤醒notFull的操作,即可以进行继续添加,执行put等添加操作。
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
    //说明在执行enqueue前的数量为0,执行完enqueue后数量为1,则需要唤醒取进程。
    if (c == 0) signalNotEmpty(); }

    此方法的执行步骤大体如下,

    • 判断要put的元素e是否为null,如果为null直接抛出空指针异常;
    • e不为null,则使用e创建一个Node节点,获得put锁;
    • 判断当前队列中的元素数量和队列的容量,如果相等,则阻塞当前线程;
    • 如果不相等,把生成的node节点插入队列,enqueue方法定义如下,
    private void enqueue(Node<E> node) {
            // assert putLock.isHeldByCurrentThread();
            // assert last.next == null;
            last = last.next = node;
        }
    • 使用原子操作类把当前队列中的元素数量增1;如果添加后的队列中的元素数量比容量小,则表示可以继续执行put类的操作,唤醒notFull.singal();
    • 如果c=0,即在enqueue前为空,数量为0(此时会阻塞take进程),enqueue后为1,则需要唤醒take进程,如下
    private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
        }

    1.2、take

    take方法和put刚好相反,其定义如下,

    public E take() throws InterruptedException {
            E x;
            int c = -1;
            //获得当前队列的元素数量
            final AtomicInteger count = this.count;
            //获得take锁
            final ReentrantLock takeLock = this.takeLock;
            //执行take操作
            takeLock.lockInterruptibly();
            try {
                while (count.get() == 0) {
                    notEmpty.await();//阻塞当前线程
                }
                x = dequeue();
                //当前队列的数量减1,返回操作前的数量
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
    //当前队列中元素数量为capacity-1,即未满,可以调用put方法,需要唤醒阻塞在put锁上的线程
    if (c == capacity) signalNotFull(); return x; }

     此方法的执行步骤大体如下,

    • 获得take锁,表示执行take操作;
    • 获得当前队列的元素数量,如果数量为0,则阻塞当前线程,直到被中断或者被唤醒;
    • 如果当前队列的元素数量不额外i0,则执行出队操作;
    private E dequeue() {
            // assert takeLock.isHeldByCurrentThread();
            // assert head.item == null;
            Node<E> h = head;//head赋值给h
            Node<E> first = h.next;//相当于第二个节点赋值给first
            h.next = h; // help GC
            head = first;//头节点指向第二个节点
            E x = first.item;
            first.item = null;
            return x;
        }

    从上面的代码可以看出把头节点进行出队,即head指向下一个节点

    • 当前队列的元素数量减一,并返回操作前的数量;
    • 如果之前大于1(c最小为2),指向dequeue后数量最小为1,证明队列中仍有元素,需要唤醒获得take锁的其他阻塞线程,take.singal();
    • 如果c等于当前队列的容量(执行完dequeue后,当前队列中元素的数量等于capacity-1,则未满),则需要唤醒获得put锁的其他put线程;
    private void signalNotFull() {
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
    //唤醒阻塞在put锁的其他线程 notFull.signal(); }
    finally { putLock.unlock(); } }

    2、offer/poll

    2.1、offer方法

    offer方法的定义如下,

    public boolean offer(E e) {
            if (e == null) throw new NullPointerException();
            final AtomicInteger count = this.count;
            if (count.get() == capacity)
                return false;
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                if (count.get() < capacity) {
                    enqueue(node);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                }
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
            return c >= 0;
        }

    此方法的执行步骤大体如下,

    • 判断当前队列中的元素数量和队列容量,如果相等,直接返回false;
    • 如果当前队列中元素数量小于队列容量,执行入队操作;
    • 入队操作之后,判断队列中元素数量如果仍小于队列容量,唤醒其他的阻塞线程;
    • 如果c==0(即入队成功,队列中元素的数量为1),则需要唤醒阻塞在put锁的线程;

    2.2、poll

    poll方法定义如下,

    public E poll() {
            final AtomicInteger count = this.count;
            if (count.get() == 0)
                return null;
            E x = null;
            int c = -1;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                if (count.get() > 0) {
                    x = dequeue();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                }
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }

    此方法的执行步骤大体如下,

    • 如果当前队列元素数量为0,直接返回null;
    • 如果当前队列元素数量大于0,执行出队操作;
    • 如果c>1,即c最小为2,则出队成功后,仍有1个元素,可以唤醒阻塞在take锁的线程;
    • 如果c=capacity,则出队成功后,队列中的元素为capacity-1,这时队列为满,可以唤醒阻塞在put锁上的其他线程,即可以添加元素;

    3、offer(E e, long timeout, TimeUnit unit)/poll(long timeout, TimeUnit unit)

    3.1、offer(E e, long timeout, TimeUnit unit)

    在规定的时间内阻塞线程,其定义如下

    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
    
            if (e == null) throw new NullPointerException();
            long nanos = unit.toNanos(timeout);
            int c = -1;
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();
            try {
                while (count.get() == capacity) {
                    if (nanos <= 0)
                        return false;
                    nanos = notFull.awaitNanos(nanos);
                }
                enqueue(new Node<E>(e));
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
            return true;
        }

    此方法的执行步骤大体如下,

    • 如果当前队列中元素的数量等于队列的容量,则在规定时间内会一直阻塞,如果超过了规定时间则直接返回false;
    • 如果在规定时间内当前队列中元素的数量不等于队列容量,则跳出了while循环,则执行入队操作;
    • 判断入队前队列中元素的数量,如果小于队列容量,则唤醒其他put锁的阻塞线程;如果等于0,则入队后元素数量大于0,则唤醒take锁阻塞的线程;

    3.2、poll(long timeout, TimeUnit unit)

    其方法定义如下,

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            E x = null;
            int c = -1;
            long nanos = unit.toNanos(timeout);
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                while (count.get() == 0) {
                    if (nanos <= 0)
                        return null;
                    nanos = notEmpty.awaitNanos(nanos);
                }
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }

    此方法的执行步骤大体如下,

    • 如果当前队列中的元素数量为0,则在规定的时间内阻塞线程;如果超过了规定时间直接返回null;
    • 执行出队操作,出队成功后,判断出队前队列中元素的数量,如果大于1(最小为2),则唤醒其他阻塞在take锁上的线程;
    • 如果出队前队列中元素数量等于队列容量,则出队后队列中元素数量为capacity-1,则唤醒阻塞在put锁上的线程;

    三、方法比较

    3.1、添加方法比较

    序号 方法名 队列满时处理方式 方法返回值
    1 offer(E e) 返回false boolean
    2 put(E e) 线程阻塞,直到中断或被唤醒 void
    3 offer(E e, long timeout, TimeUnit unit) 在规定时间内重试,超过规定时间返回false boolean

    3.2、取出方法比较

    序号 方法名 队列空时处理方式 方法返回值
    1 poll() 返回null E
    2 take() 线程阻塞,直到中断或被唤醒 E
    3 poll(long timeout, TimeUnit unit) 在规定时间内重试,超过规定时间返回null E

    四、总结

    以上是关于LinkedBlockingQueue队列的相关实现及方法介绍,此队列使用单向链表为载体,配合put/take锁实现生产线程和消费线程共享数据。LinkedBlockingQueue作为共享的数据池,实现并发环境下的添加及取出方法。

    有不正之处欢迎指正,感谢!

  • 相关阅读:
    warning: ISO C++ forbids converting a string constant to 'char*' [-Wwrite-strings]
    Windows10+CLion+OpenCV4.5.2开发环境搭建
    Android解决部分机型WebView播放视频全屏按钮灰色无法点击、点击全屏白屏无法播放等问题
    MediaCodec.configure Picture Width(1080) or Height(2163) invalid, should N*2
    tesseract
    Caer -- a friendly API wrapper for OpenCV
    Integrating OpenCV python tool into one SKlearn MNIST example for supporting prediction
    Integrating Hub with one sklearn mnist example
    What is WSGI (Web Server Gateway Interface)?
    Hub --- 机器学习燃料(数据)的仓库
  • 原文地址:https://www.cnblogs.com/teach/p/10665947.html
Copyright © 2011-2022 走看看