zoukankan      html  css  js  c++  java
  • ArrayBlockingQueue和LinkedBlockingQueue队列

              在集合框架里,想必都用过ArrayList和LinkedList,ArrayList和ArrayBlockingQueue一样,内部基于数组来存放元素,而LinkedBlockingQueue则和LinkedList一样,内部基于链表来存放元素。

              队列常见的出队和入队方法


    根据下面代码看下 [ArrayBlockingQueue] 的源码
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(20,true);
            queue.put("element");//阻塞方法
            String element = queue.take();//阻塞方法

          [ArrayBlockingQueue]  构造方法

        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }

            元素入队列  [put] 方法

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//获取锁,如果interrupt,直接抛出异常
        try {
            while (count == items.length)
                notFull.await();//队列满了,阻塞等待被唤醒
            enqueue(e);//入队
        } finally {
            lock.unlock();//释放锁
        }
    }
    
    private void enqueue(E x) {
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)//如果入队下标已满,重置putIndex
                putIndex = 0;
            count++;//队列总数+1
            notEmpty.signal();//唤醒取队列阻塞的线程
    }

         元素入队列  [offer] 方法

        public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
    
            checkNotNull(e);
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length) {
                    if (nanos <= 0)//只要时间没有超过给定值,一直自旋
                        return false;
                   //自己释放锁后重新竞争锁
                    nanos = notFull.awaitNanos(nanos);
                }
                enqueue(e);
                return true;
            } finally {
                lock.unlock();
            }
        }

         取队列元素  [take]  方法

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//获取锁,如果interrupt,直接抛出异常
        try {
            while (count == 0)
                notEmpty.await();//队列空了,阻塞等待被唤醒
            return dequeue();//出队
        } finally {
            lock.unlock();//释放锁
        }
    }
    
    private E dequeue() {
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length)//如果出队下标已满,重置takeIndex
                takeIndex = 0;
            count--;//总数减1
            //迭代器有关
            if (itrs != null)
                itrs.elementDequeued();
            notFull.signal();//唤醒入队列阻塞的线程
            return x;
    }

          取队列的非阻塞方法   [poll]  方法

        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return (count == 0) ? null : dequeue();
            } finally {
                lock.unlock();
            }
        }

            其他方法类似。

     根据下面代码看下  [LinkedBlockingQueue]  的源码
        //如果没有指定大小,默认为Integer.MAX_VALUE,也就是无界队列
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(4);
        queue.put("element");//阻塞方法
        queue.take();//阻塞方法

       元素入队列  [put] 方法

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //获取putLock锁,如果interrupt,直接抛出异常
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();//队列满了,阻塞等待被唤醒
            }
            enqueue(node);//入队
            c = count.getAndIncrement();
            // 再次判断队列是否有可用空间,如果有唤醒下一个线程进行添加操作
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();//释放putLock锁
        }
        if (c == 0)//唤醒取队列阻塞的线程
            signalNotEmpty();
    }
    
    private void enqueue(Node<E> node) {
             //新增节点变为最后一个节点
            last = last.next = node;
    }

       元素出队列  [take] 方法

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;//总数
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly(); //获取takeLock锁,如果interrupt,直接抛出异常
        try {
            while (count.get() == 0) {
                notEmpty.await();//队列空了,阻塞等待被唤醒
            }
            x = dequeue();//出队
            c = count.getAndDecrement();
            //c > 1说明c至少为2, 队列中还有元素,唤醒下一个消费线程进行消费
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();//释放takeLock锁
        }
        if (c == capacity)//唤醒入队列阻塞的线程
            signalNotFull();
        return x;
    }
    
    private E dequeue() {
            // 获取到head节点
            Node<E> h = head;
            // 获取到head节点指向的下一个节点
            Node<E> first = h.next;
            // head节点原来指向的节点的next指向自己,等待下次gc回收
            h.next = h; // help GC
            // head节点指向新的节点
            head = first;
            // 获取到新的head节点的item值,即队列中的元素
            E x = first.item;
            first.item = null;
            return x;
        }

        元素出队列  [poll]  方法

    public E poll() {
        final AtomicInteger count = this.count;
        //poll方法和take方法不一样的地方,take是阻塞,poll直接返回null
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

         删除队列元素  [remove]  方法

    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();//两个lock全部上锁
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {//循环查找
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();//释放锁
        }
    }
    
    void unlink(Node<E> p, Node<E> trail) {
           //删除节点后,node之间重新指向
            p.item = null;
            trail.next = p.next;
            if (last == p)
                last = trail;
            if (count.getAndDecrement() == capacity)
                notFull.signal();
    }

          总结两者的区别:

                 1.ArrayBlockingQueue是有界的,而LinkedBlockingQueue默认是无界的(可以通过指定大小来变为有界)。ArrayBlockingQueue有界就意味着我们使用ArrayBlockingQueue必须指定capacity大小。这样的话,内存空间会直接预先分配好,所以在使用LinkedBlockingQueue无界情况下时要考虑到内存实际使用问题,防止内存溢出问题的发生。

                 2.锁使用的比较。ArrayBlockingQueue内部使用1个锁来控制队列项的插入、取出操作,而LinkedBlockingQueue则是使用了2个锁来控制,一个名为putLock,另一个是takeLock,但是锁的本质都是ReentrantLock。因为LinkedBlockingQueue使用了2个锁的情况下,所以在一定程度上提升了LinkedBlockingQueue支持高并发的场景操作。

                 3.在ArrayBlockingQueue内部,因为是直接使用数组空间的,而且都是预先分配好的,所以操作没有那么复杂,而在LinkedBlockingQueue中,是通过链表进行维护的,而且每次插入的对象还要转为Node<>(e)对象,相当于多做了一步操作,但是根据LinkedBlockingQueue的官方描述,它是具有更好吞吐性能的。

                   官方解释:

                   

  • 相关阅读:
    PAT 甲级 1027 Colors in Mars
    PAT 甲级 1026 Table Tennis(模拟)
    PAT 甲级 1025 PAT Ranking
    PAT 甲级 1024 Palindromic Number
    PAT 甲级 1023 Have Fun with Numbers
    PAT 甲级 1021 Deepest Root (并查集,树的遍历)
    Java实现 蓝桥杯VIP 算法训练 无权最长链
    Java实现 蓝桥杯VIP 算法训练 无权最长链
    Java实现 蓝桥杯 算法提高 抽卡游戏
    Java实现 蓝桥杯 算法提高 抽卡游戏
  • 原文地址:https://www.cnblogs.com/dyg0826/p/11276493.html
Copyright © 2011-2022 走看看