zoukankan      html  css  js  c++  java
  • ArrayBlockingQueue和LinkedBlockingQueue队列

              在集合框架里,想必都用过ArrayList和LinkedList,ArrayList和ArrayBlockingQueue一样,内部基于数组来存放元素,而LinkedBlockingQueue则和LinkedList一样,内部基于链表来存放元素。

              队列常见的出队和入队方法


    根据下面代码看下 [ArrayBlockingQueue] 的源码
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(20,true);
            queue.put("element");//阻塞方法
            String element = queue.take();//阻塞方法

          [ArrayBlockingQueue]  构造方法

        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }

            元素入队列  [put] 方法

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//获取锁,如果interrupt,直接抛出异常
        try {
            while (count == items.length)
                notFull.await();//队列满了,阻塞等待被唤醒
            enqueue(e);//入队
        } finally {
            lock.unlock();//释放锁
        }
    }
    
    private void enqueue(E x) {
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)//如果入队下标已满,重置putIndex
                putIndex = 0;
            count++;//队列总数+1
            notEmpty.signal();//唤醒取队列阻塞的线程
    }

         元素入队列  [offer] 方法

        public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
    
            checkNotNull(e);
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length) {
                    if (nanos <= 0)//只要时间没有超过给定值,一直自旋
                        return false;
                   //自己释放锁后重新竞争锁
                    nanos = notFull.awaitNanos(nanos);
                }
                enqueue(e);
                return true;
            } finally {
                lock.unlock();
            }
        }

         取队列元素  [take]  方法

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//获取锁,如果interrupt,直接抛出异常
        try {
            while (count == 0)
                notEmpty.await();//队列空了,阻塞等待被唤醒
            return dequeue();//出队
        } finally {
            lock.unlock();//释放锁
        }
    }
    
    private E dequeue() {
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length)//如果出队下标已满,重置takeIndex
                takeIndex = 0;
            count--;//总数减1
            //迭代器有关
            if (itrs != null)
                itrs.elementDequeued();
            notFull.signal();//唤醒入队列阻塞的线程
            return x;
    }

          取队列的非阻塞方法   [poll]  方法

        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return (count == 0) ? null : dequeue();
            } finally {
                lock.unlock();
            }
        }

            其他方法类似。

     根据下面代码看下  [LinkedBlockingQueue]  的源码
        //如果没有指定大小,默认为Integer.MAX_VALUE,也就是无界队列
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(4);
        queue.put("element");//阻塞方法
        queue.take();//阻塞方法

       元素入队列  [put] 方法

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //获取putLock锁,如果interrupt,直接抛出异常
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();//队列满了,阻塞等待被唤醒
            }
            enqueue(node);//入队
            c = count.getAndIncrement();
            // 再次判断队列是否有可用空间,如果有唤醒下一个线程进行添加操作
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();//释放putLock锁
        }
        if (c == 0)//唤醒取队列阻塞的线程
            signalNotEmpty();
    }
    
    private void enqueue(Node<E> node) {
             //新增节点变为最后一个节点
            last = last.next = node;
    }

       元素出队列  [take] 方法

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;//总数
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly(); //获取takeLock锁,如果interrupt,直接抛出异常
        try {
            while (count.get() == 0) {
                notEmpty.await();//队列空了,阻塞等待被唤醒
            }
            x = dequeue();//出队
            c = count.getAndDecrement();
            //c > 1说明c至少为2, 队列中还有元素,唤醒下一个消费线程进行消费
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();//释放takeLock锁
        }
        if (c == capacity)//唤醒入队列阻塞的线程
            signalNotFull();
        return x;
    }
    
    private E dequeue() {
            // 获取到head节点
            Node<E> h = head;
            // 获取到head节点指向的下一个节点
            Node<E> first = h.next;
            // head节点原来指向的节点的next指向自己,等待下次gc回收
            h.next = h; // help GC
            // head节点指向新的节点
            head = first;
            // 获取到新的head节点的item值,即队列中的元素
            E x = first.item;
            first.item = null;
            return x;
        }

        元素出队列  [poll]  方法

    public E poll() {
        final AtomicInteger count = this.count;
        //poll方法和take方法不一样的地方,take是阻塞,poll直接返回null
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

         删除队列元素  [remove]  方法

    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();//两个lock全部上锁
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {//循环查找
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();//释放锁
        }
    }
    
    void unlink(Node<E> p, Node<E> trail) {
           //删除节点后,node之间重新指向
            p.item = null;
            trail.next = p.next;
            if (last == p)
                last = trail;
            if (count.getAndDecrement() == capacity)
                notFull.signal();
    }

          总结两者的区别:

                 1.ArrayBlockingQueue是有界的,而LinkedBlockingQueue默认是无界的(可以通过指定大小来变为有界)。ArrayBlockingQueue有界就意味着我们使用ArrayBlockingQueue必须指定capacity大小。这样的话,内存空间会直接预先分配好,所以在使用LinkedBlockingQueue无界情况下时要考虑到内存实际使用问题,防止内存溢出问题的发生。

                 2.锁使用的比较。ArrayBlockingQueue内部使用1个锁来控制队列项的插入、取出操作,而LinkedBlockingQueue则是使用了2个锁来控制,一个名为putLock,另一个是takeLock,但是锁的本质都是ReentrantLock。因为LinkedBlockingQueue使用了2个锁的情况下,所以在一定程度上提升了LinkedBlockingQueue支持高并发的场景操作。

                 3.在ArrayBlockingQueue内部,因为是直接使用数组空间的,而且都是预先分配好的,所以操作没有那么复杂,而在LinkedBlockingQueue中,是通过链表进行维护的,而且每次插入的对象还要转为Node<>(e)对象,相当于多做了一步操作,但是根据LinkedBlockingQueue的官方描述,它是具有更好吞吐性能的。

                   官方解释:

                   

  • 相关阅读:
    解决winfrom下TextBox不支持透明背景色
    C# Winform 怎么让按钮在Panel里居中显示
    DevExpress Cpicturebox或者Dev控件 PictureEdit 按比例的缩放加载图片
    Alluxio : 开源分布式内存文件系统
    yarn cluster和yarn client模式区别——yarn-cluster适用于生产环境,结果存HDFS;而yarn-client适用于交互和调试,也就是希望快速地看到application的输出
    Linux 反弹shell(二)反弹shell的本质
    浅谈摄像头有关的安全问题
    pyspark AttributeError: 'NoneType' object has no attribute 'setCallSite'
    大规模异常滥用检测:基于局部敏感哈希算法——来自Uber Engineering的实践
    pyspark minHash LSH 查找相似度
  • 原文地址:https://www.cnblogs.com/dyg0826/p/11276493.html
Copyright © 2011-2022 走看看