zoukankan      html  css  js  c++  java
  • 基于链表的有界阻塞队列 —— LinkedBlockingQueue

    前言

    上一节看了基于数据的有界阻塞队列 ArrayBlockingQueue 的源码,通过阅读源码了解到在 ArrayBlockingQueue 中入队列和出队列操作都是用了 ReentrantLock 来保证线程安全。下面咱们看另一种有界阻塞队列:LinkedBlockingQueue。

    介绍

    一个基于链接节点的,可选绑定的 BlockingQueue 阻塞队列。

    对元素 FIFO(先进先出)进行排序。队列的头部是已在队列中停留最长时间的元素。队列的尾部是最短时间出现在队列中的元素。将新元素插入队列的尾部,并检索队列操作获取队列开头的元素。

    基于连表的队列通常具有比基于数组的队列有更高的吞吐量,但是大多数并发应用程序中的可预测性较差。

    基本使用

    public class LinkedBlockingQueueTest {
    
        private static final LinkedBlockingQueue<String> QUEUE = new LinkedBlockingQueue<>(10);
    
        public static void main(String[] args) throws InterruptedException {
    
            // 入队列
            QUEUE.put("put 入队列, 队列满则会阻塞等待");
    
            QUEUE.add("add 入队列, 队列满则会抛出异常");
    
            QUEUE.offer("offer 入队列, 队列满会返回 false");
    
            // 出队列
            // 队列空返回 null
            String poll = QUEUE.poll();
    
            // 队列空会阻塞等待
            String take = QUEUE.take();
    
            // 仅仅看一下最早入队列的元素
            String peek = QUEUE.peek();
    
        }
    
    }
    

    问题疑问

    1. LinkedBlockingQueue 的实现原理是什么?
    2. LinkedBlockingQueue 和 ArrayBlockingQueue 的区别是什么?

    源码分析

    基本结构

    LinkedBlockingQueue-uml-Ma14n3

    参数介绍

    static class Node<E> {
        
        E item;
        /**
        * One of:
        * - 真正的后继节点
        * - 有值,表示后继者是head.next
        * - null,表示没有后继(这是最后一个节点)
        */
        Node<E> next;
    
        Node(E x) { item = x; }
    }
    

    首先在 LinkedBlockingQueue 中有一个静态内部类 Node 支持泛型,下面看下其他字段:

    /** 初始容量,如果没有,则为Integer.MAX_VALUE */
    private final int capacity;
    
    /** 当前元素数 */
    private final AtomicInteger count = new AtomicInteger();
    
    /**
    * 链表头
    * 不变的是: head.item == null
    */
    transient Node<E> head;
    
    /**
    * 链表尾
    * 不变的是: last.next == null
    */
    private transient Node<E> last;
    
    /** 执行 take, poll 等操作需要获取到 takeLock */
    private final ReentrantLock takeLock = new ReentrantLock();
    
    /** 等待执行 take 操作的线程,会放入这个条件队列 */
    private final Condition notEmpty = takeLock.newCondition();
    
    /** 执行 put, offer 等操作需要获取到 putLock */
    private final ReentrantLock putLock = new ReentrantLock();
    
    /** 等待执行 put 操作的线程,会被放入这个条件队列 */
    private final Condition notFull = putLock.newCondition();
    
    

    构造函数

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    
    // 创建时指定容量
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
    
    

    通过构造函数可以看出,在初始化 LinkedBlockingQueue 时,如果不传入容量则会默认指定 Integer.MAX_VALUE。

    添加元素

    add 方法是直接调用的父类 AbstractQueue 的方法,内部调用的 LinkedBlockingQueue 自己实现的 offer 方法

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
    

    主要阅读的还是 LinkedBlockingQueue 的 put 和 offer 方法:

    public void put(E e) throws InterruptedException {
        // 插入元
        if (e == null) throw new NullPointerException();
        // Note: 所有put / take / etc中的约定是预设本地变量
        // 保持计数为负表示失败,除非置位。
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
           
            // 如果已经到最大容量,则等待 
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            // 总数进行增加, 返回的是先前的容量
            c = count.getAndIncrement();
            // 判断是否需要唤醒入队列阻塞的线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            // 唤醒因调用 notEmpty 的 await 方法而被阻塞的线程
            signalNotEmpty();
    }
    
    public boolean offer(E e) {
        // 为空抛出异常
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // 如果已经到最大容量,返回 false
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
    

    通过上面两段代码可以看出 put 和 offer 的最大区别在于是否阻塞。 put 方法当队列达到指定容量时,会阻塞,等待有元素出队列。而 offer 方法会直接返回 false。

    同时两个方法操作元素入队列都是调用的 enqueue(node) 方法,下面一起看下 enqueue 方法。

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }
    

    在 enqueue 方法中,直接指定当前尾节点的 next 为传入的元素即可。

    获取元素

    public E poll() {
        final AtomicInteger count = this.count;
        // 队列为空返回 null
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        // 加锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                // 减少队列元素计数,返回的是旧值
                c = count.getAndDecrement();
                if (c > 1)
                // 旧值大于 1 ,就是当前大于 0
                // 唤醒调用 notEmpty.await 等待的线程
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            // 如果旧值等于 capacity 说明当前空了一个位置
            signalNotFull();
        return x;
    }
    
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // 阻塞等待
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    

    通过上面代码可以看出 poll 和 take 方法逻辑大致相同。区别就是在当前队列为空时的处理逻辑。poll 在当前队列为空时返回 null,take 会阻塞等待,知道当前队列中有元素。

    poll 和 take 都试用 dequeue() 方法从队列中获取元素。

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
    

    dequeue() 方法逻辑就是获取头节点,并将 head 指向下一个节点。

    查看元素

    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }
    

    peek() 方法比较简单,直接获取 head 的元素值即可。

    总结

    Q&A

    Q: LinkedBlockingQueue 的实现原理?

    A: LinkedBlockingQueue 是基于链表实现的,内部使用 ReentrantLock 互斥锁,防止并发放置元素或者取出元素的冲突问题。

    1. take、poll、peek 等从队列中获取元素的操作共用 takeLock 锁。
    2. add、put、offer 等向队列中添加元素的操作共同 putLock 锁。
    3. notEmpty 和 notFull 是 Condition 类型,在 take 和 put 操作时,如果如果队列为空或者队列已满,会调用相应的 await 将线程放入条件队列。

    Q: 入队列和出队列方法之间的区别是什么?

    方法 作用
    add 添加元素,队列满了,添加失败抛出异常
    offer 添加元素, 队列满了,添加失败,返回 false
    put 添加元素,队列满了,阻塞等待
    poll 弹出元素,队列为空则返回 null
    take 弹出元素,队列为空则等待队列中有元素
    peek 查看队列中放入最早的一个元素

    结束语

    LinkedBlockingQueue 使用和 ArrayBlockingQueue 并没有什么区别,内部实现都是使用的 ReentrantLock,可以对照着阅读。同时 Condition 这块也需要着重了解一下。

  • 相关阅读:
    api服务器思路
    利用express写api接口
    sql基础语句
    安装npm后,nrm ls 报错internal/validators.js:124 throw new ERR_INVALID_ARG_TYPE(name, ‘string‘, value)
    JavaScript垃圾回收机制和闭包
    用git clone 远程的所有分支
    面试题重点
    Linux 常用命令
    防抖函数和节流函数
    GIT PUSH 出现EVERYTHING UP-TO-DATE 解决方法
  • 原文地址:https://www.cnblogs.com/liuzhihang/p/LinkedBlockingQueue.html
Copyright © 2011-2022 走看看