zoukankan      html  css  js  c++  java
  • 阻塞队列——LinkedBlockingQueue源码分析

    一、前言

      这几天准备研究一下Java中阻塞队列的实现。Java中的阻塞队列有七种,我准备逐一研究它们的源码,然后每一个阻塞队列写一篇分析博客,这是其中的第二篇。这篇博客就来说一说阻塞队列中比较常用的一种——LinkedBlockingQueue

      之前写了一篇分析ArrayBlockingQueue源码的博客,感兴趣的可以看一看:阻塞队列——ArrayBlockingQueue源码分析


    二、正文

    2.1 什么是阻塞队列

      在正式分析前,先简单介绍一下什么是阻塞队列。在说阻塞队列前,先要了解生产者消费者模式

    生产者消费者模式:生产者生产产品,将生产好的产品放入一个缓冲区域,消费者消费产品,它从缓冲区域获取生产者生产的产品进行消费。缓冲区域有容量限制,若缓存区域已经满了,则生产者需要停止生产,等待缓冲区有空闲位置后,再恢复生产;若缓冲区为空,则消费者需要等待,直到缓冲区中有产品后,才能进行消费;

      阻塞队列就是基于这种模式实现的队列型容器。阻塞队列的一般实现是:我们创建队列时,指定队列的容量,当队列中元素的个数已经满时,向队列中添加元素的线程将被阻塞,直到队列不满才恢复运行,将元素添加进去;当队列为空时,向队列获取元素的线程将被阻塞,直到队列不空才恢复运行,从队列中拿出元素。

      以上是阻塞队列的一般实现,根据具体情况的不同,也会有所差异,比如有的是基于链表实现,有的是基于数组实现;有的是阻塞队列的没有容量限制(无界),而有的是有限制的(有界)。我们现在要分析的LinkedBlockingQueue就是一个基于链表实现的有界阻塞队列。下面我们就来从源码的角度分析一下LinkedBlockingQueue


    2.2 LinkedBlockingQueue类的成员变量

      我们先来看看LinkedBlockingQueue类的成员变量,了解它的成员变量对于我们理解它的实现原理会用很大的帮助:

    /** 记录阻塞队列允许的最大容量 */
    private final int capacity;
    
    /** 使用int的原子类记录队列中元素的个数 */
    private final AtomicInteger count = new AtomicInteger();
    
    /** LinkedBlockingQueue基于链表实现,head记录链表头节点 */
    transient Node<E> head;
    
    /** LinkedBlockingQueue基于链表实现,last记录链尾头节点 */
    private transient Node<E> last;
    
    /** ReentrantLock锁对象,用来保证获取元素时的线程同步 */
    private final ReentrantLock takeLock = new ReentrantLock();
    
    /** takeLock上创建的条件对象,在队列为空时,通过此对象来阻塞消费者线程 */
    private final Condition notEmpty = takeLock.newCondition();
    
    /** ReentrantLock锁对象,用来保证添加元素时的线程同步 */
    private final ReentrantLock putLock = new ReentrantLock();
    
    /** putLock上创建的条件对象,在队列满时,通过此对象来阻塞生产者线程 */
    private final Condition notFull = putLock.newCondition();
    

      通过以上成员变量,我们可以知道很多信息:LinkedBlockingQueue是基于链表实现的,且有capacity变量证明它是一个有界阻塞队列;成员变量中有两个lock对象,分别用来同步生产者线程和消费者线程,减小了锁的粒度,生产者和消费者可以同时运行,这两个锁对象使用默认构造函数创建,也就是说创建的是非公平锁。既然LinkedBlockingQueue是由链表实现,那我们就来看看链表的节点实现:

    static class Node<E> {
        // 节点值
        E item;
        // 下一个节点的引用
        Node<E> next;
    
        Node(E x) { item = x; }
    }
    

      可以看到,Node类的实现非常简单,一个存储值的变量,一个指向下一个节点的引用,仅此而已。接下来我们看看构造方法:

    2.3 LinkedBlockingQueue的构造方法

    /** 带参构造方法,参数为阻塞队列的容量 */
    public LinkedBlockingQueue(int capacity) {
        // 容量必须大于0
        if (capacity <= 0) throw new IllegalArgumentException();
        // 记录容量
        this.capacity = capacity;
        // 初始化链表,创建一个无值的Node,头尾指针均指向它
        last = head = new Node<E>(null);
    }
    
    // 默认构造方法
    public LinkedBlockingQueue() {
        // 调用带参构造方法,默认容量为int的最大值
        this(Integer.MAX_VALUE);
    }
    

      构造方法的逻辑也比较简单,唯一值得注意的是:若使用默认构造方法创建,则阻塞队列的默认容量为int的最大值


    2.4 入队方法的实现

      对于一个阻塞队列来说,最核心的就是它入队和出队的实现,下面我们就来分析一下LinkedBlockingQueue类中入队方法的实现。元素入队的方法有多个,我们先来分析其中最核心的方法——put方法:

    public void put(E e) throws InterruptedException {
        // 新元素不能为空
        if (e == null) 
            throw new NullPointerException();
        
        // 初始化一个c变量,后面用于记录插入新元素前,队列中元素的个数
        int c = -1;
        // 将新元素封装成一个Node
        Node<E> node = new Node<E>(e);
        // 因为是添加元素,所以这里使用put锁进行线程同步,先获取put锁
        final ReentrantLock putLock = this.putLock;
        // 获取记录元素个数的变量
        final AtomicInteger count = this.count;
        
        // 在正式操作前,先使用putLock加锁,调用的是lockInterruptibly方法
        // 这个方法在在线程被阻塞时可以响应中断,使用它是防止线程一直无法添加成功,
        // 长期被阻塞在此处
        putLock.lockInterruptibly();
        try {
            // 添加元素前线判断队列中元素是否已经满了,
            // 若满了则使用notFull对象,让当前线程阻塞,直到被另一个线程唤醒
            // 使用while而不是if,目的是防止线程被唤醒时,
            // 队列仍然是满的,所以需要重复判断
            while (count.get() == capacity) {
                notFull.await();
            }
            
            // 调用enqueue方法将新节点加入队列的尾部
            enqueue(node);
            // getAndIncrement方法返回count的旧值,然后让count+1
            c = count.getAndIncrement();
            
            /**************关键点1****************/
            // c + 1就是插入当前这个节点后,队列中元素的数量
            // 若插入这个元素后,队列依旧没有满,则唤醒一个生产者线程
            // 也就是向队列中添加元素的线程(前提是有这么一个线程)。
            // 为什么是在添加一个元素后,唤醒另外一个生产者线程,
            // 而不是在有线程获取元素后,唤醒一个生产者线程呢,这不是才正常吗?
            // 答案就是消费者线程使用的是take锁,而唤醒生产者线程需要的是put锁,
            // 为了减少额外加锁解锁的次数,我们就可以在这里唤醒生产者线程,
            // 因为这里在添加元素前,已经获取了put锁了,不需要重复获取。
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            // 解锁
            putLock.unlock();
        }
        
        /**************关键点2****************/
        // c表示插入之前,队列中元素的个数,若插入之前c == 0,
        // 表示在插入前,队列是空,意味着很有可能存在正在等待的消费者线程
        // 于是调用signalNotEmpty方法唤醒一个消费者线程。
        // 可以看到,只有当添加元素之前,队列为空,这里才会唤醒一个消费者线程。
        // 但是可能有多个消费者线程在等待,此时唤醒一个,那剩下的那些怎么办?
        // 答案就是可以在获取元素的方法中唤醒它们,原理就是上面那段很长的注释
        if (c == 0)
            signalNotEmpty();
    }
    
    /** 此方法将节点加入到链表的末尾 */
    private void enqueue(Node<E> node) {
        /*
         * 以下代码可以分解为:
         * last.next = node;
         * last = node
        */
        last = last.next = node;
    }
    
    /** 此方法用于唤醒一个消费者线程 */
    private void signalNotEmpty() {
        // 因为消费者线程是获取元素,所以使用的是take锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // notEmpty由take锁创建,所以需要先锁定take锁,再调用signal方法
            // 否则将抛出异常
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
    

      以上就是put方法的实现,逻辑还是比较简单的,它的过程概括来说就是:将需要添加的元素封装成为一个Node,然后获取put锁,在添加前先判断当前队列是否已经满了,若满了,则会被阻塞等待,直到被其他线程唤醒;队列未满时,将元素添加到链表的末尾,若添加完后,队列依旧没有满,则再唤醒一个生产者线程。添加元素完成后,若判断添加前,队列为空,则很有可能有消费者线程在等待,于是唤醒一个消费者线程put方法中,我用注释标记出了两个关键点,这两个关键点是作者对锁的一个优化,take方法中也有这两个关键点,它们相互对应,请结合take方法理解。下面我们再来看看另一个添加元素的方法offer

    /** 向队列中添加元素,但是不会阻塞 */
    public boolean offer(E e) {
        // 判空
        if (e == null) 
            throw new NullPointerException();
        // 获取记录元素数量的count变量
        final AtomicInteger count = this.count;
        // 如果当前队列已经满了,则直接返回false,不进行添加
        if (count.get() == capacity)
            return false;
        
        // 此变量的作用于put方法中一致
        int c = -1;
        // 封装成Node
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        // 因为是添加元素,所以使用put锁进行锁定,这里调用的lock()方法
        // lock方法不响应中断,这里不需要响应中断,所以选择使用lock,
        // 不需要响应中断是因为这个方法并不会阻塞线程
        putLock.lock();
        try {
            // 再次判断当前队列是否满了,为什么再次判断?
            // 因为在上一次判断后,CPU可能暂停了当前线程,转而执行其他线程
            // 在这个过程中可能有其他线程向队列中添加了元素
            if (count.get() < capacity) {
                // 调用enqueue方法将元素添加到队列的尾部
                enqueue(node);
                // 此处操作与put方法中相同,不重复描述
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            // 解锁
            putLock.unlock();
        }
        // 此处操作与put方法相同
        if (c == 0)
            signalNotEmpty();
        // c记录的是添加前,队列中元素的个数,不可能出现负数,所以此处返回的一定是true
        return c >= 0;
    }
    
    
    /**
     * 此方法向队列中添加元素,若队列已经满了,则线程被阻塞,但是参数中限制了阻塞时间
     * 若超时后,当前线程还没有添加成功,则不继续等待,直接返回
     * 参数:
     * 	1、timeout:超时时间的数量级,一个long类型的整数
     *	2、unit:timeout的单位,例如TimeUnit.SECOND表示的就是秒
    */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        
        if (e == null) 
            throw new NullPointerException();
        // 将超时时间转换为纳秒
        long nanos = unit.toNanos(timeout);
        // 以下几句与put方法相同
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // 若当前队列已经满了,则线程需要进入等待
            while (count.get() == capacity) {
                // 判断等待的剩余时间是否<=0,若满足此条件,表示等待时间已经超时
                if (nanos <= 0)
                    return false;
                // 使用notFull对象让当前线程阻塞,传入需要阻塞的时间,但是这个方法并不精确
                // 所以会返回剩余需要阻塞的时间,这也就是为什么上一句需要判断nanos <= 0
                nanos = notFull.awaitNanos(nanos);
            }
            // 以下操作和put相同
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }
    

      以上就是offer方法的实现,可以看到,offer方法被重载了两次,第一个是直接向队列中添加元素,不会被阻塞,添加成功返回true,失败则返回false;而另外一个offer方法,在无法添加时会被阻塞,但是限定了阻塞的超时时间,若超时还未添加成功,则不会继续等待。


    2.5 出队方法的实现

      看完了入队的方法实现,下面再来看看出队的方法实现。出队的方法主要有两个,poll方法,以及阻塞队列的核心方法之一——take方法。下面我们就先来看看take方法:

    public E take() throws InterruptedException {
        E x;
        // 此变量的作用与put方法中一致,用来记录插入前,队列中元素的个数
        int c = -1;
        final AtomicInteger count = this.count;
        // 由于是向队列中获取元素,所以使用的是take锁
        final ReentrantLock takeLock = this.takeLock;
        
        // 实际操作前先锁定,调用lockInterruptibly锁定,且这个方法响应中断
        // 此处需要响应中断,因为这个方法会阻塞线程
        takeLock.lockInterruptibly();
        try {
            // 若队列为空,则当前线程需要等待,直到被其他线程唤醒
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 调用dequeue获取队列的队头元素
            x = dequeue();
            // 获得count的值,然后count + 1
            c = count.getAndDecrement();
            
            /*************关键点1***************/
            // 若队列中原来的元素数量>1,则表示当前线程拿走一个元素后,队列中还有元素
            // 于是此处唤醒其他消费者线程,让他们获取元素
            // 此处很关键,对应着put方法中的关键点2(注意是put,而不是此方法take),put方法中,
            // 只有添加元素前,队列为空,才会唤醒一个消费者线程,而剩余的消费者线程在此处唤醒
            // 因为这里已经拿到了take锁,不需要为了唤醒消费者线程再次获取take锁
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        
        /*******************关键点2*******************/
        // 此处判断,若在获取元素之前,队列是满的,那说明很有可能有生产者线程在等待
        // 因为这里拿走了一个元素,所以队列有空位了,于是就唤醒一个生产者线程,添加元素
        // 值得注意的是,等待的生产者线程可能不止一个,这里只唤醒了一个,剩下的怎么办,
        // 答案就在put方法的关键点2那里,由put方法唤醒了剩下的生产者线程
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
    /** 此方法获取队列头节点的值,并将头节点从队列删除 */
    private E dequeue() {
        // 获取头节点
        Node<E> h = head;
        // 获取头节点的下一个节点
        Node<E> first = h.next;
        // 让头节点指向自己,也就是让头节点从链表上断开
        h.next = h; // help GC
        // 设置新的头节点,也就是原来头节点的下一个节点
        head = first;
        // 获取原来头节点的值
        E x = first.item;
        // 将原来头节点的值置为空,有助于垃圾回收
        first.item = null;
        // 返回结果
        return x;
    }
    

      以上方法的逻辑也是比较简单的,相信有了注释理解起来不会太难。上面的take方法中,需要中点关注的就是我用注释标记出的关键点1关键点2,它们分别对应于put方法中的关键点2和关键点1,这样编写代码的意图就是为了尽量少获取锁,减少频繁获取和释放锁导致的资源消耗,提高性能。除了take方法,还有另外一个元素出队的方法poll,他被重载了两次,下面来看一看:

    public E poll() {
        // 获取元素数量数量
        final AtomicInteger count = this.count;
        // 若队列为空,则直接返回null,表示获取失败
        if (count.get() == 0)
            return null;
        
        // 以下几句代码于take方法相同
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        
        // 调用lock()方法加锁,不响应中断,因为当前方法不会阻塞,所以可以不用响应中断
        takeLock.lock();
        try {
            // 再次判断队列是否为空,因为在上一次判断之后,CPU可能暂停了这个线程,
            // 转而执行其他线程,这个过程中可能有线程向队列中添加了元素
            if (count.get() > 0) {
                // 以下代码均与take方法中相同,不重复解释
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
    
    /**
     * 此方法向队列中获取元素,若队列为空线程将会被阻塞,但是需要指定超时时间,
     * 超时后,线程还未获取元素,直接返回;
     * 参数timeout和unit的含义与会超时的offer方法相同,
     * 分别表示超时时间的数量级,已经时间的单位
    */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        // 将超时时间转换为纳秒
        long nanos = unit.toNanos(timeout);
        // 以下方法与take方法相同,不重复解释
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // 若队列为空,则线程需要等待
            while (count.get() == 0) {
                // 判断等待的剩余时间,若剩余时间<=0,表示已经超时,直接返回null
                if (nanos <= 0)
                    return null;
                // 让当前线程在notEmpty中的等待nanos纳秒,因为awaitNanos方法不精确,
                // 所以这个方法会返回一个值,表示剩余需要等待的时间,所以才有了上一句的if
                nanos = notEmpty.awaitNanos(nanos);
            }
            // 以下代码与take相同,不重复解释
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    

      好了,关于LinkedBlockingQueue中的方法就说到这里。我们仔细观察这些方法会发现,它们的实现步骤大同小异,我写的很多注释都是重复的,因为每一个方法中都有类似的操作。


    2.6 LinkedBlockingQueue的优势与劣势

      最后,我们来分析以下LinkedBlockingQueue的优势。这里主要介绍它相对于ArrayBlockingQueue的优势和劣势:

    • LinkedBlockingQueue内部使用了两把锁进行线程同步,一把锁同步消费者线程,一把锁同步生产者线程,这也就意味着在添加元素时,不会影响获取元素,反之亦然。由于消费者操作的是队头,而生产者操作的是队尾,所以也不会发生线程安全问题,这样大大提高了队列的吞吐量。但是ArrayBlockingQueue内部只使用一把锁,生产者执行时,消费者也无法执行。
    • LinkedBlockingQueue基于链表实现,所以如果我们要对队列进行随机删除操作,将会非常高效;但是ArrayBlockingQueue基于数组实现,随机删除操作的消耗会很高,以为需要重整元素在数组中的位置。当然,队列是一个尾进头出的容器,所以在使用时还是不要进行随机删除操作。

      再说说它的劣势:

    • LinkedBlockingQueue中的lock对象使用的是非公平锁,无法根据需求切换为公平锁;而ArrayBlockingQueue可以根据实际情况,选择是否使用公平锁;

    2.7 LinkedBlockingQueue的误区

      这里再提一个很多人对LinkedBlockingQueue的误区。有许多人认为LinkedBlockingQueue既可以是一个有界队列,也可以是一个无界队列,这是一种错误的理解,LinkedBlockingQueue就是一个有界的阻塞队列。有人会这样认为的原因是创建LinkedBlockingQueue对象时可以不指定容量,但是不要忘记,如果我们不指定容量,底层实现也会指定一个默认的容量,即int的最大值。在我们向其中添加元素时,底层实现可以确保元素数量不会超过设置好的容量,这就是有界的。

      而什么是无界?举一个最简单的例子,Java中的另一个阻塞队列PriorityBlockingQueue就是无界的,它底层用数组存储元素,我们也可以指定容量,但是当元素个数到达容量时,PriorityBlockingQueue并不会阻塞,而是进行扩容,这就是无界。


    三、总结

      关于LinkedBlockingQueue就先说这么多吧,重点是要理解各种阻塞队列的结构,以及最关键的takeput方法实现的原理,只要理解了这些,就能有选择性地,更好地使用这些阻塞队列。


    四、参考

  • 相关阅读:
    CSV 文件的读写(函数)
    携带cookies请求github个人信息(类封装)
    网络和http协议理论
    思卡乐科技发布SR3系列RFID产品
    爱立信开始大规模mesh网络测试
    NB-IoT物联网,来了
    想象力是我们的局限,蓝牙5应用初探
    RFID电动车管理,智慧城市物联网建设的入口
    蓝牙模块选择经验谈
    RFID电动自行车与共享单车之物联网比较
  • 原文地址:https://www.cnblogs.com/tuyang1129/p/12683541.html
Copyright © 2011-2022 走看看