zoukankan      html  css  js  c++  java
  • Java并发包分析——BlockingQueue

    之前因为找实习的缘故,博客1个多月没有写了。找实习的经历总算告一段落,现在重新更新博客,这次的内容是分析Java并发包中的阻塞队列
    关于阻塞队列,我之前是一直充满好奇,很好奇这个阻塞是怎么实现。现在我们先看一个该抽象类的实现类ArrayBlockingQueue。下面全部的代码均在github

    ArrayBlockingQueue

    ArrayBlockingQueue顾名思义是一种数组形式的阻塞队列,其自然就有数组的特点,即队列的长度不可改变,只有初始化的时候指定。
    下面,我们看一下例子。

    public class ArrayBlock {
    
        private BlockingQueue<String> blockingQueue;
    
        public ArrayBlock(){
            blockingQueue = new ArrayBlockingQueue<String>(3);
        }
    
        public BlockingQueue<String> getBlockingQueue() {
            return blockingQueue;
        }
    }
    
    

    创建一个大小为3的ArrayBlockingQueue,下面是一个生产者和消费者,通过ArrayBlockingQueue实现生产者/消费者模型。

    public class Producer extends Thread {
    
        private BlockingQueue<String> blockingQueue;
        @Override
        public void run() {
            super.run();
            for (int i = 0 ; i < 5;i++) {
                try {
                    blockingQueue.put(i + "");
                    System.out.println(getName() + " 生产数据");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public Producer(ArrayBlock arrayBlock){
            this.setName("Producer");
            blockingQueue = arrayBlock.getBlockingQueue();
        }
    }
    
    public class Costumer extends Thread{
    
        private BlockingQueue<String> blockingQueue;
    
        public Costumer(ArrayBlock arrayBlock) {
            blockingQueue = arrayBlock.getBlockingQueue();
            this.setName("Costumer");
        }
    
        @Override
        public void run() {
            super.run();
            while (true) {
                try {
                    Thread.sleep(6000);
                    String str = blockingQueue.take();
                    System.out.println(getName() + " 取出数据 " + str);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    测试过程就不放了,直接放出结果:

    Producer 生产数据
    Producer 生产数据
    Producer 生产数据
    Costumer 取出数据 0
    Producer 生产数据
    Costumer 取出数据 1
    Producer 生产数据
    Costumer 取出数据 2
    Costumer 取出数据 3
    Costumer 取出数据 4
    

    这可以看出put方法与take方法均是阻塞的方法。当队列已经满的时候,就会阻塞放入方法,当队列为空的时候,就会阻塞取出方法。
    下面,我们主要看这个两个方法,究竟是如何实现阻塞的。

    ** put方法 **

      public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
    

    put方法是将元素放入到队列中,这里面可以看出是用过Lock类与Condition类来实现的,即通过等待/通知机制实现的阻塞队列。这里notFull是一个条件,当队列已经满的时候,就会执行await方法,如果没有满就执行入队(enqueue)方法。这里,判断队列已满用的是count == items.length。接下来,我们看一下take方法,来看看取数据的阻塞。

    ** take方法**

     public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
     }
    

    这里,与put方法类似,当元素为0时,就会执行await方法,上面方法中都没有直接说明signal方法的执行。其实该方法是入队与出队的方法中实现的。也就是当执行notFull.await()时,是通过dequeue()方法来通知停止等待的,可以放入元素。当执行到notEmpty.await()时,是通过enqueue来通知结束阻塞,可以取出元素。

    LinkedBlockingQueue

    LinkedBlockingQueue顾名思义是一个链表形式的阻塞队列,不同于ArrayBlockingQueue。如果不指定容量,则默认是Integer.MAX_VALUE。也就是说他是一个无界阻塞队列。他的例子与上面的类似,但是其put与take方法实现不同于ArrayBlockingQueue,但两者大致思路一致。我们只看一下put实现:

    ** put方法**

    public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            // Note: convention in all put/take/etc is to preset local var
            // holding count negative to indicate failure unless set.
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();
            try {
                /*
                 * Note that count is used in wait guard even though it is
                 * not protected by lock. This works because count can
                 * only decrease at this point (all other puts are shut
                 * out by lock), and we (or some other waiting put) are
                 * signalled if it ever changes from capacity. Similarly
                 * for all other uses of count in other wait guards.
                 */
                while (count.get() == capacity) {
                    notFull.await();
                }
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
        }
    

    这里阻塞的本质实现也是通过Condition类的等待/通知机制。但是有几点不同:

    第一 这里用了一个原子类的count计数,官方的给的注释是即使没有锁来提供保护,也能保证线程安全,实现wait guard。
    第二 ArrayBlockingQueue的通知是在入队与出队的方法中,LinkedBlockingQueue则不是,并且插入之后不满的时候,还有通知其他await的线程。
    第三 ArrayBlockingQueue的lock一直是一个,也就是put/take是用的一个锁,放与取无法实现并行。但是LinkedBlockingQueue是两个锁,放一个锁,取一个锁,可以实现put/take的并行,要高效一些。

    SynchronousQueue

    SynchronousQueue顾名思义是同步队列,特点不同于上面的阻塞队列,他是一个无界非缓存的队列,准确说他不存储元素,放入的元素,只有等待取走元素之后才能放入。也就是说任意时刻:

    • isEmpty()法永远返回是true
    • remainingCapacity() 方法永远返回是0
    • remove()和removeAll() 方法永远返回是false
    • iterator()方法永远返回空
    • peek()方法永远返回null

    元素并不会被生产者存在队列中,而是直接生产者与消费者进行交互。
    其实现是利用无锁算法,可以参考SynchronousQueue实现

    还有一点需要注意,同步队列支持公平性与非公平性。公平性是利用队列来管理多余生产者与消费者,非公平性是利用栈来管理多余生产者与消费者。

  • 相关阅读:
    Cookie 干货
    element-ui 框架中使用 NavMenu 导航菜单组件时,点击一个子菜单会出现多个子菜单同时展开或折叠?
    数组遍历的方法
    前端网页字体
    样式小收藏:完成、错误、提示动态图标样式
    多语言网站利器 rel="alternate" hreflang="x"
    网页中文章显示一部分,然后“查看全文”
    仿水滴筹中快捷留言祝福、随机生成祝福
    TypeScript知识点
    前端项目经验
  • 原文地址:https://www.cnblogs.com/qifengshi/p/6808065.html
Copyright © 2011-2022 走看看