zoukankan      html  css  js  c++  java
  • 并发编程中的阻塞队列概述

    1.简介

    阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。 1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。 2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。 阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

    在阻塞队列不可用时,插入、移除这两个附加操作提供4种处理方式:

    4种处理试的说明:

    抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。

    返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如 果是移除方法,则是从队列里取出一个元素,如果没有则返回null。

    一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。

    超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。

    注意:如果有无界阻塞队列,队列永不会满,put或offer方法永不会被阻塞,使用offer方法永远返回true.

    2.阻塞队列分类

    主要的几种阻塞队列

    1)普通阻塞队列:ArrayBlockingQueue,基于数组的有界阻塞队列 ;LinkedBlockingQueue ,基于单向链表的有界阻塞队列;LinkedBlockingDeque, 基于双向链表的有界双向阻塞队列。

    2)优先级阻塞队列:PriorityBlockingQueue, 基于优先级排序无界阻塞队列。

    3)延时阻塞队列:DelayQueue,基于优先级队列实现的无界阻塞队列。

    4)其他阻塞队列:SynchronousQueu,不存储元素的阻塞队列;LinkedTransferQueue,基于单向链表组成的无界阻塞队列。

    阻塞队列实现的基本原理:

    阻塞队列主要是利用并发编程中的“等待/通知”模型,使用显式锁ReentrantLock和条件ConditionObject实现的。如下自定义的阻塞队列BoundBlockQueue,它使用一把锁和两个条件来实现阻塞队列。一个把锁lock,可以保护所有的访问,一个条件notEmpty用来通知线程当前队列“不满”,另一个条件notFull用来通知线程当前队列“不空”。上面介绍的7种阻塞队列其实现原理大致与此类似。

    package thread;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class BoundBlockQueue<T> {
        private       int       addIndex    = 0;
        private       int       removeIndex = 0;
        private       int       count       = 0;
        private final Object[]  items;
        private final Lock      lock        = new ReentrantLock();
        private final Condition notEmpty    = lock.newCondition();
        private final Condition notFull     = lock.newCondition();
    
        public BoundBlockQueue(int size) {
            if (size < 0) throw new IllegalArgumentException("size must large than zero");
            this.items = new Object[size];
        }
    
        public void add(T o) throws InterruptedException {
            lock.lock();
            try {
                while (count == items.length) notFull.await();
                items[addIndex++] = o;
                count++;
                if (addIndex == items.length) addIndex = 0;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }
    
        public T remove() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0) notEmpty.await();
                Object o = items[removeIndex];
                items[removeIndex++] = null;
                count--;
                if (removeIndex == items.length) removeIndex = 0;
                notFull.signal();
                return (T) o;
            } finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) {
            BoundBlockQueue<String> queue = new BoundBlockQueue<>(9);
    /*        new Thread(() -> {
                try {
                    int i = 0;
                    while(i<=5){
                        queue.add(String.valueOf(i));
                        System.out.println("add-digital: " + i);
                        Thread.sleep(1000);
                        i++;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();*/
            new Thread(() -> {
                try {
                    int i = 1;
                    while (i <= 10) {
                        queue.add("0" + i);
                        System.out.println("add-digital: 0" + i);
                        Thread.sleep(500);
                        i++;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "addItemThread1").start();
            new Thread(() -> {
                try {
                    int i = 0;
                    while (i <= 8) {
                        String s = queue.remove();
                        System.out.println("remove-digital:" + s);
                        Thread.sleep(1000);
                        i++;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "removeItemThread1").start();
        }
    }
    BoundBlockQueue

    3.阻塞队列详述

    1)普通阻塞队列

    ArrayBlockingQueue和LinkedBlockingQueue都实现了Queue接口,表示先进先出的队列,头出尾进,而LinkedBlockingDeque实现了Deque接口,它表示一个双端队列,头尾均可进出。

    这三个阻塞队列内部都是使用显式锁ReentrantLock和显式条件CoditionObject实现的。

    (1)ArrayBlockingQueue

    ArrayBlockingQueue是一个用数组实现的有界阻塞队列(构造方法必须指定容量) ,创建时指定容量大小,且运行时容量也不会变化。默认情况下不保证线程公平地访问队列。此队列按照先进先出(FIFO)的原则对元素进行排序。队列的容量和公平性选择均在构造方法中设定。

    如下便创建了一个容量为50的公平阻塞队列。

    ArrayBlockingQueue e = new ArrayBlockingQueue(50,true);

    ArrayBlockingQueue的实现比较简单,其内部有一个数组存储元素,有两个索引分别表示头和尾,有一个变量表示当前元素个数,有一个锁保护所有的访问,有“不满‘和”不空“两个条件处理线程协作问题。

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
        private static final long serialVersionUID = -817911632652898426L;
        final Object[] items;
        int takeIndex;
        int putIndex;
        int count;
        final ReentrantLock lock;
        private final Condition notEmpty;
        private final Condition notFull;
        //.....省略
    }

    (2)LinkedBlockingQueue

    LinkedBlockingQueue是基于单向链表实现的,在创建是可指定最大长度,也可不指定,默认是无限大。LinkedBlockingQueue不支锁的公平性选择,只支持非公平锁。

    LinkedBlockingQueue queue=new LinkedBlockingQueue();
    LinkedBlockingQueue queue2=new LinkedBlockingQueue(20);

    LinkedBlockingQueue的实现与ArrayBlockingQueue有些不同,它是单向链表结构,尾进头出,为提高性能它将锁的粒度细化了,使用了两个锁,一个保护头部、一个保护尾部,每个锁绑定一个条件。

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
                implements BlockingQueue<E>, java.io.Serializable {
            private final int capacity;
            private final AtomicInteger count = new AtomicInteger();
            transient Node<E> head;
            private transient Node<E> last;
            private final ReentrantLock takeLock = new ReentrantLock();//保护头部(出队)的锁
            private final Condition notEmpty = takeLock.newCondition();//"不空"条件
            private final ReentrantLock putLock = new ReentrantLock();//保护尾部(入队)的锁
            private final Condition notFull = putLock.newCondition();//“不满”条件
        //......省略

    (3)LinkedBlockingDeque

    LinkedBlockingDeque,基于双向链表实现,其最大长度可也是可选的,默认无限大。LinkedBlockingDeque不支锁的公平性选择,只支持非公平锁。

    LinkedBlockingDeque deque = new LinkedBlockingDeque();
    LinkedBlockingDeque deque2 = new LinkedBlockingDeque(20);

    LinkedBlockingDeque,与ArrayBlockingQueue类似,也是使用一个锁,两个条件,使用锁保护所有操作,使用“不满‘和”不空“两个条件进行线程协作。

    public class LinkedBlockingDeque<E> extends AbstractQueue<E>
            implements BlockingDeque<E>, java.io.Serializable {
        transient Node<E> first;
        transient Node<E> last;
        private transient int count;
        private final int capacity;
        final ReentrantLock lock = new ReentrantLock(); //重入锁
        private final Condition notEmpty = lock.newCondition();//"非空"条件
        private final Condition notFull = lock.newCondition();//“非满”条件
        //......省略
    }

    LinkedBlockingDeque是双向队列,它可以在队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。因其有两个插入/移除的入口,双向阻塞队列常用在Jork/Join框架的“工作窃取”模式中。

    2)优先级阻塞队列

    ProrityBlockingQueue是一个支持优先级的无界阻塞队列,(内部元素排列不是完全有序)它是按照优先级出队的(不像普通阻塞队列那样先进先出)。其内部使用数组来存储元素(逻辑上是堆,物理上是数组),它是无界的,数组的长度会动态扩展。它要求元素类型实现了Comparable接口或在构造方法中传入一个Comparator比较器,另外它不能保证同优先级元素的顺序。

    public class PriorityBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
        private static final long serialVersionUID = 5595510919245408276L;
        private transient Object[] queue; //保存元素的数组
        private transient int size;
        private transient Comparator<? super E> comparator;
        private final ReentrantLock lock; //
        private final Condition notEmpty;//“不空”的条件
        private transient volatile int allocationSpinLock;
        private PriorityQueue<E> q;//只有序列化/反序列化会用到,主要目的是与以前版本兼容。
        //......省略
    }

    ProrityBlockingQueue不支锁的公平性选择,只支持非公平锁。ProrityBlockingQueue的实现,它有一个数组保存元素,它使用一把锁保护所有的操作,(因它是无界的,永不会“满”)只使用一个“不空”的条件进行线程协作。

    3)延时阻塞队列

    DelayQueue是一个支持延时获取元素的无界阻塞队列。它内部使用一个优先级队列来储存元素,使用一把锁来保护数据,用一个“可获取”的条件表示头部是否有元素可获取,当头部元素的延时未到时,take操作会根据延时计算需要休眠的时间,然后休眠,若此进程中有新的元素入队,且成为头部元素,则部位皮肤休眠的线程会被提前唤醒然后重新检查延时。DelayQueue不支锁的公平性选择,只支持非公平锁。

    public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
            implements BlockingQueue<E> {
    
        private final transient ReentrantLock lock = new ReentrantLock();
        private final PriorityQueue<E> q = new PriorityQueue<E>();
        private Thread leader = null;
        private final Condition available = lock.newCondition();
        //.....省略
    }

    它要求元素类型要实现Delay接口,而Delay接口双继承Comparable接口,那么DelayQueue中的每一个元素都是可比较的,额外的getDelay方法返回一个再延迟多长时间的整数(小于等于零就不再延迟)。

    public interface Delayed extends Comparable<Delayed> {
        long getDelay(TimeUnit unit);
    }

    DelayQueue常用于定时任务,因为DelayQueue是按照延时时间出队的。元素只有在过期后才能出队列中拿走,若没过期就要阻塞等待。

    4)其他阻塞队列

    SynchronousQueue和LinkedTransferQueue是两个特殊的阻塞队列。

    (1)SynchronousQueue

    SynchronousQueue,与其他队列不同,它没有存储元素的空间,它的每入队操作必须等待另一个线程的出队操作,反之亦然。

    它支持公平访问队列(通过构造方法的参数指定),默认情况下线程采用非公平性策略访问队列

    SynchronousQueue适用于两个线程之间直接传递信息、事件或任务。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。

    (2)LinkedTransferQueue

    LinkedTransferQueue实现了TransferQueue接口,TransferQueue扩展了BlockingQueue接口,增加了一此其他功能.生产者在往队列中放元素时,可以等待消费者接收后再返回,适用于消息传递类型的应用。

    public interface TransferQueue<E> extends BlockingQueue<E> {
        //如果有消费者在等待,直接转给消费者,返回true,否则返回false,不入队
        boolean tryTransfer(E e);
        //如果有消费者在等待,直接转给消费者,否则入队,阻塞等待直到被消费者接口后再返回
        void transfer(E e) throws InterruptedException;
        //如果有消费者在等待,直接转给消费者,返回true,否则入队,阻塞等待限定时间,若最后被消费者接收,返回true
        boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
        //是否有消费者在等待
        boolean hasWaitingConsumer();
        //在等待的消费者个数
        int getWaitingConsumerCount();
    }

    LinkedTransferQueue,与其他阻塞队列相比,主要是多了transfer系列方法, 且这几个方法核心逻辑都是委托给xfer方法实现的。

    public void transfer(E e) throws InterruptedException {
            if (xfer(e, true, SYNC, 0) != null) {
                Thread.interrupted(); // failure possible only due to interrupt
                throw new InterruptedException();
            }
        }
       public boolean tryTransfer(E e, long timeout, TimeUnit unit)
                throws InterruptedException {
            if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
                return true;
            if (!Thread.interrupted())
                return false;
            throw new InterruptedException();
        }
        public boolean tryTransfer(E e) {
            return xfer(e, true, NOW, 0) == null;
        }

    transfer(E)方法: 如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回.

    tryTransfer(E)方法 : 与transfer类似,只这里只是“尝试”,若没有消费者在等待,它不会阻塞,会直接返回。

    transfer(E,long,TimeUnit)方法: 与transfer类似,只是这里是个限时版本。

  • 相关阅读:
    【转】编写高质量代码改善C#程序的157个建议——建议41:实现标准的事件模型
    【转】编写高质量代码改善C#程序的157个建议——建议40:使用event关键字为委托施加保护
    【转】编写高质量代码改善C#程序的157个建议——建议39:了解委托的实质
    【转】编写高质量代码改善C#程序的157个建议——建议38:小心闭包中的陷阱
    【转】编写高质量代码改善C#程序的157个建议——建议37:使用Lambda表达式代替方法和匿名方法
    7.FactoryBean 和BeanFactory去区别
    6.2-SingletonBeanRegistry-DefaultSingletonBeanRegistry
    6.1-AliasRegistry
    ConfigurableBeanFactory
    4.AutowireCapableBeanFactory 自动装配工厂
  • 原文地址:https://www.cnblogs.com/gocode/p/introduce-blockingqueue.html
Copyright © 2011-2022 走看看