zoukankan      html  css  js  c++  java
  • java阻塞队列之LinkedBlockingQueue

    LinkedBlockingQueue是BlockingQueue中的其中一个,其实现方式为单向链表,下面看其具体实现。(均为JDK8)

    一、构造函数

    在LinkedBlockingQueue中有三个构造函数,如下图,

    1、LinkedBlockingQueue()

    这是一个无参的构造函数,其定义如下,

    public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }

    在这个构造函数中调用了有参的构造函数,传入的整型值为Integer所能表示的最大值(0x7fffffff)。下面看这个带参数的构造方法。

    2、LinkedBlockingQueue(int capacity)

    这是一个整型参数的构造方法其,定义如下,

    public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);
        }

    从其实现上来看,其整型参数代表此队列的容量,即元素的最大个数;然后初始化了last和head两个变量,我们猜last应该是此队列的尾元素、head为此队列的头元素。这里有一个Node类,下面看Node类的实现,

    static class Node<E> {
            E item;
    
            /**
             * One of:
             * - the real successor Node
             * - this Node, meaning the successor is head.next
             * - null, meaning there is no successor (this is the last node)
             */
            Node<E> next;
    
            Node(E x) { item = x; }
        }

    Node是LinkedBlockingQueue的静态内部类,也是组成此队列的节点元素,其内部有两个属性,一个Item代表节点元素,next指向下一个Node节点,这样就可以得出LinkedBlockingQueue的结构是使用Node连接起来,头节点为head,尾节点为last。

    3、LinkedBlockingQueue(Collection<? extends E> c)

    此构造方法是使用一个集合类进行构造,其定义如下

    public LinkedBlockingQueue(Collection<? extends E> c) {
            this(Integer.MAX_VALUE);
            final ReentrantLock putLock = this.putLock;
            putLock.lock(); // Never contended, but necessary for visibility
            try {
                int n = 0;
                for (E e : c) {
                    if (e == null)
                        throw new NullPointerException();
                    if (n == capacity)
                        throw new IllegalStateException("Queue full");
                    enqueue(new Node<E>(e));
                    ++n;
                }
                count.set(n);
            } finally {
                putLock.unlock();
            }
        }

    4、LinkedBlockingQueue的属性

    从上面的构造函数中可以大体了解其属性有容量(capacity),队列中的元素数量(count)、头节点(head)、尾节点(last)等,如下

    /** The capacity bound, or Integer.MAX_VALUE if none */
        private final int capacity;
    
        /** Current number of elements */
        private final AtomicInteger count = new AtomicInteger();
    
        /**
         * Head of linked list.
         * Invariant: head.item == null
         */
        transient Node<E> head;
    
        /**
         * Tail of linked list.
         * Invariant: last.next == null
         */
        private transient Node<E> last;
    
        /** Lock held by take, poll, etc */
        private final ReentrantLock takeLock = new ReentrantLock();
    
        /** Wait queue for waiting takes */
        private final Condition notEmpty = takeLock.newCondition();
    
        /** Lock held by put, offer, etc */
        private final ReentrantLock putLock = new ReentrantLock();
    
        /** Wait queue for waiting puts */
        private final Condition notFull = putLock.newCondition();

    使用原子类AtomicInteger的count表示队列中元素的个数,可以很好的处理并发,AtomicInteger底层大都是使用乐观锁进行操作,多线程下是安全的。

    关注下takeLock、putLock这两个属性,这里定义了两把锁,一把take锁,另一把put锁。通过两把可重入锁实现并发,与ArrayBlockingQueue的一把锁相比。

    二、队列的操作

    需要使用阻塞队列,那么就需要向队列中添加或取出元素,在LinkedBlockingQueue中已经实现了相关操作,对于添加/取出均是成对出现,提供的方法中有抛出异常、返回false、线程阻塞等几种情形。

    1、put/take

    put/take是一对互斥操作,put向队列中添加元素,take从队列中取出元素。

    1.1、put

    put方法定义如下,

    public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            // Note: convention in all put/take/etc is to preset local var
            // holding count negative to indicate failure unless set.
            int c = -1;
            Node<E> node = new Node<E>(e);
            //获得添加锁,
            final ReentrantLock putLock = this.putLock;
            //获得当前队列中的元素数量
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();
            try {
                /*
                 * Note that count is used in wait guard even though it is
                 * not protected by lock. This works because count can
                 * only decrease at this point (all other puts are shut
                 * out by lock), and we (or some other waiting put) are
                 * signalled if it ever changes from capacity. Similarly
                 * for all other uses of count in other wait guards.
                 */
                //如果当前队列中元素的数量等于队列的容量,则阻塞当前线程,
                while (count.get() == capacity) {
                    notFull.await();
                }
                enqueue(node);
                //当前线程中元素数量增1,返回操作前的数量
                c = count.getAndIncrement();
                //c+1其实是当前队列中元素的数量,如果比容量小,则唤醒notFull的操作,即可以进行继续添加,执行put等添加操作。
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
    //说明在执行enqueue前的数量为0,执行完enqueue后数量为1,则需要唤醒取进程。
    if (c == 0) signalNotEmpty(); }

    此方法的执行步骤大体如下,

    • 判断要put的元素e是否为null,如果为null直接抛出空指针异常;
    • e不为null,则使用e创建一个Node节点,获得put锁;
    • 判断当前队列中的元素数量和队列的容量,如果相等,则阻塞当前线程;
    • 如果不相等,把生成的node节点插入队列,enqueue方法定义如下,
    private void enqueue(Node<E> node) {
            // assert putLock.isHeldByCurrentThread();
            // assert last.next == null;
            last = last.next = node;
        }
    • 使用原子操作类把当前队列中的元素数量增1;如果添加后的队列中的元素数量比容量小,则表示可以继续执行put类的操作,唤醒notFull.singal();
    • 如果c=0,即在enqueue前为空,数量为0(此时会阻塞take进程),enqueue后为1,则需要唤醒take进程,如下
    private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
        }

    1.2、take

    take方法和put刚好相反,其定义如下,

    public E take() throws InterruptedException {
            E x;
            int c = -1;
            //获得当前队列的元素数量
            final AtomicInteger count = this.count;
            //获得take锁
            final ReentrantLock takeLock = this.takeLock;
            //执行take操作
            takeLock.lockInterruptibly();
            try {
                while (count.get() == 0) {
                    notEmpty.await();//阻塞当前线程
                }
                x = dequeue();
                //当前队列的数量减1,返回操作前的数量
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
    //当前队列中元素数量为capacity-1,即未满,可以调用put方法,需要唤醒阻塞在put锁上的线程
    if (c == capacity) signalNotFull(); return x; }

     此方法的执行步骤大体如下,

    • 获得take锁,表示执行take操作;
    • 获得当前队列的元素数量,如果数量为0,则阻塞当前线程,直到被中断或者被唤醒;
    • 如果当前队列的元素数量不额外i0,则执行出队操作;
    private E dequeue() {
            // assert takeLock.isHeldByCurrentThread();
            // assert head.item == null;
            Node<E> h = head;//head赋值给h
            Node<E> first = h.next;//相当于第二个节点赋值给first
            h.next = h; // help GC
            head = first;//头节点指向第二个节点
            E x = first.item;
            first.item = null;
            return x;
        }

    从上面的代码可以看出把头节点进行出队,即head指向下一个节点

    • 当前队列的元素数量减一,并返回操作前的数量;
    • 如果之前大于1(c最小为2),指向dequeue后数量最小为1,证明队列中仍有元素,需要唤醒获得take锁的其他阻塞线程,take.singal();
    • 如果c等于当前队列的容量(执行完dequeue后,当前队列中元素的数量等于capacity-1,则未满),则需要唤醒获得put锁的其他put线程;
    private void signalNotFull() {
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
    //唤醒阻塞在put锁的其他线程 notFull.signal(); }
    finally { putLock.unlock(); } }

    2、offer/poll

    2.1、offer方法

    offer方法的定义如下,

    public boolean offer(E e) {
            if (e == null) throw new NullPointerException();
            final AtomicInteger count = this.count;
            if (count.get() == capacity)
                return false;
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                if (count.get() < capacity) {
                    enqueue(node);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                }
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
            return c >= 0;
        }

    此方法的执行步骤大体如下,

    • 判断当前队列中的元素数量和队列容量,如果相等,直接返回false;
    • 如果当前队列中元素数量小于队列容量,执行入队操作;
    • 入队操作之后,判断队列中元素数量如果仍小于队列容量,唤醒其他的阻塞线程;
    • 如果c==0(即入队成功,队列中元素的数量为1),则需要唤醒阻塞在put锁的线程;

    2.2、poll

    poll方法定义如下,

    public E poll() {
            final AtomicInteger count = this.count;
            if (count.get() == 0)
                return null;
            E x = null;
            int c = -1;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                if (count.get() > 0) {
                    x = dequeue();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                }
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }

    此方法的执行步骤大体如下,

    • 如果当前队列元素数量为0,直接返回null;
    • 如果当前队列元素数量大于0,执行出队操作;
    • 如果c>1,即c最小为2,则出队成功后,仍有1个元素,可以唤醒阻塞在take锁的线程;
    • 如果c=capacity,则出队成功后,队列中的元素为capacity-1,这时队列为满,可以唤醒阻塞在put锁上的其他线程,即可以添加元素;

    3、offer(E e, long timeout, TimeUnit unit)/poll(long timeout, TimeUnit unit)

    3.1、offer(E e, long timeout, TimeUnit unit)

    在规定的时间内阻塞线程,其定义如下

    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
    
            if (e == null) throw new NullPointerException();
            long nanos = unit.toNanos(timeout);
            int c = -1;
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();
            try {
                while (count.get() == capacity) {
                    if (nanos <= 0)
                        return false;
                    nanos = notFull.awaitNanos(nanos);
                }
                enqueue(new Node<E>(e));
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
            return true;
        }

    此方法的执行步骤大体如下,

    • 如果当前队列中元素的数量等于队列的容量,则在规定时间内会一直阻塞,如果超过了规定时间则直接返回false;
    • 如果在规定时间内当前队列中元素的数量不等于队列容量,则跳出了while循环,则执行入队操作;
    • 判断入队前队列中元素的数量,如果小于队列容量,则唤醒其他put锁的阻塞线程;如果等于0,则入队后元素数量大于0,则唤醒take锁阻塞的线程;

    3.2、poll(long timeout, TimeUnit unit)

    其方法定义如下,

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            E x = null;
            int c = -1;
            long nanos = unit.toNanos(timeout);
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                while (count.get() == 0) {
                    if (nanos <= 0)
                        return null;
                    nanos = notEmpty.awaitNanos(nanos);
                }
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }

    此方法的执行步骤大体如下,

    • 如果当前队列中的元素数量为0,则在规定的时间内阻塞线程;如果超过了规定时间直接返回null;
    • 执行出队操作,出队成功后,判断出队前队列中元素的数量,如果大于1(最小为2),则唤醒其他阻塞在take锁上的线程;
    • 如果出队前队列中元素数量等于队列容量,则出队后队列中元素数量为capacity-1,则唤醒阻塞在put锁上的线程;

    三、方法比较

    3.1、添加方法比较

    序号 方法名 队列满时处理方式 方法返回值
    1 offer(E e) 返回false boolean
    2 put(E e) 线程阻塞,直到中断或被唤醒 void
    3 offer(E e, long timeout, TimeUnit unit) 在规定时间内重试,超过规定时间返回false boolean

    3.2、取出方法比较

    序号 方法名 队列空时处理方式 方法返回值
    1 poll() 返回null E
    2 take() 线程阻塞,直到中断或被唤醒 E
    3 poll(long timeout, TimeUnit unit) 在规定时间内重试,超过规定时间返回null E

    四、总结

    以上是关于LinkedBlockingQueue队列的相关实现及方法介绍,此队列使用单向链表为载体,配合put/take锁实现生产线程和消费线程共享数据。LinkedBlockingQueue作为共享的数据池,实现并发环境下的添加及取出方法。

    有不正之处欢迎指正,感谢!

  • 相关阅读:
    创建Android项目时出错——No resource found that matches the given name 'Theme.AppCompat.Light'
    Java、Android 开发环境搭建
    tomcat7的安装与配置、及Servlet部署
    让实体对象自行决定留存操作类型(增删改)
    聚合体
    PowerDesigner15中定义varbinary(max)列
    Pig limit用法举例
    Pig join用法举例
    Pig distinct用法举例
    Pig group用法举例
  • 原文地址:https://www.cnblogs.com/teach/p/10665947.html
Copyright © 2011-2022 走看看