zoukankan      html  css  js  c++  java
  • LinkedBlockingQueue源码解析-Java8

    LinkedBlockingQueue基本原理介绍

      LinkedBlockingQueue是基于链表实现的阻塞队列,需要注意的是LinkedBlockingQueue是带头结点的队列(头结点不存真实数据,存的数据为null)。

      在LinkedBlockingQueue中,元素是按照先进先出的顺序(FIFO),但需要注意的是这个顺序并不是“线程入队和出队的顺序”,多线程在并发入队或者并发出队时,是非公平的,这是因为LinkedBlockingQueue中控制同步的有两把锁(takeLock和putLock),都是直接new ReentrantLock,默认的就是非公平锁。

      也就是说,线程A和线程B同时入栈元素,那么顺序是不一定的;线程A和线程B,谁能拿到队首元素,也是不一定的;

      FIFO是指,队列中x、y、z元素的顺序,出队的顺序也是x-> y -> z,但是谁拿到队首的x,这是不一定的。

      可以设置LinkedBlockingQueue的队列容量,如果不设置队列容量,那么默认的容量为Integer.MAX_VALUE。

      对于基于数组实现的阻塞队列ArrayBlockingQueue分析,可以参考https://www.cnblogs.com/-beyond/p/14407201.html;

      原文地址:https://www.cnblogs.com/-beyond/p/14407364.html

    链表节点类型

      LinkedBlockingQueue既然是基于链表的,那么就会涉及到链表的节点,在LinkedBlockingQueue中定义了Node类型,也就是链表节点类型,每个队列元素都是一个Node,item字段保存了队列元素的值。

    /**
     * 链表的节点类型
     */
    static class Node<E> {
        E item;
    
        /**
         * 指向下一个节点的指针
         */
        Node<E> next;
    
        Node(E x) {
            item = x;
        }
    }
    

      

    重要属性

    /**
     * 队列的容量,如果没有设置,则默认为Integer.MAX_VALUE
     */
    private final int capacity;
    
    /**
     * 队列中的元素个数
     */
    private final AtomicInteger count = new AtomicInteger();
    
    /**
     * 链表的头指针
     */
    transient Node<E> head;
    
    /**
     * 链表的尾指针
     */
    private transient Node<E> last;
    
    /**
     * 控制出队的锁(take、poll),使用的非公平锁
     */
    private final ReentrantLock takeLock = new ReentrantLock();
    
    /**
     * 控制入队的锁(offer、put),使用的非公平锁
     */
    private final ReentrantLock putLock = new ReentrantLock();
    
    /**
     * 出队关联的Condition
     */
    private final Condition notEmpty = takeLock.newCondition();
    
    /**
     * 入队关联的Condition
     */
    private final Condition notFull = putLock.newCondition();
    

      

    构造方法

    /**
     * 初始化LinkedBlockingQueue,设置默认的容量
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    
    /**
     * 初始化LinkedBlockingQueue,设置指定的容量
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        // 创建头结点,头尾指针都指向头结点
        last = head = new Node<E>(null);
    }
    
    /**
     * 初始化LinkedBlockingQueue,将传入的集合元素加入到队列中
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        // 初始化,设置默认容量
        this(Integer.MAX_VALUE);
    
        // 加put锁
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // 遍历集合元素,将元素依次入队(创建节点,并入队)
            int n = 0;
            for (E e : c) {
                // 如果元素为null,则抛出NPE
                if (e == null) {
                    throw new NullPointerException();
                }
    
                // 如果元素数量已经达到设置的容量,则抛出队列已满的异常
                if (n == capacity) {
                    throw new IllegalStateException("Queue full");
                }
    
                // 创建节点,并入对
                enqueue(new Node<E>(e));
    
                // 元素数量加1
                ++n;
            }
    
            // 设置元素数量
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

      

    元素入队

    非阻塞式入队

    非阻塞式入队,是指不管入队是否成功,都立即返回,不会发生阻塞。

    非阻塞式入队,是通过调用offer方法执行的。

    /**
     * 元素入队,不管是否入队成功,都立即返回
     */
    public boolean offer(E e) {
        // 如果入队元素为null,则抛出NPE
        if (e == null) {
            throw new NullPointerException();
        }
    
        // 获取元素数量
        final AtomicInteger count = this.count;
    
        // 判断队列是否已满,如果队列已满,则入队失败(返回false)
        if (count.get() == capacity) {
            return false;
        }
    
        int c = -1;
        // 创建队列元素节点
        Node<E> node = new Node<E>(e);
    
        // 获取put锁(加锁)
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // 再次判断队列是否已满,如果队列未满,则进行入队操作
            if (count.get() < capacity) {
                // 入队操作
                enqueue(node);
    
                // 元素size+1,并将队列之前的size赋给c
                c = count.getAndIncrement();
    
                // 如果入队后,队列仍旧未满,那么就唤醒notFull
                if (c + 1 < capacity) {
                    notFull.signal();
                }
            }
        } finally {
            // 释放锁
            putLock.unlock();
        }
    
        // 如果入队失败,则c的值为-1;入队成功,如果入队前的元素数量为0,那么现在入队后,队列就不为空了,则进行唤醒notEmpty
        if (c == 0) {
            // 唤醒notEmpty
            signalNotEmpty();
        }
    
        // 返回入队是否成功
        return c >= 0;
    }
    
    /**
     * 当put或者offer完成入队(当队列不为空的时候),唤醒notEmpty(take阻塞的线程获取信号后接触阻塞)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
    

      

    阻塞式入队

    阻塞式入队,就是在入队失败时,尝试入队的线程发生阻塞,知道成功入队;

    这里分了两种,一种是指定超时时间的入队(offer);另一种是阻塞直到成功的入队(put);

    /**
     * 元素入队,并设置入队超时时间
     */
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        // 如果元素为null,则抛出NPE
        if (e == null) {
            throw new NullPointerException();
        }
    
        // 超时时间转换
        long nanos = unit.toNanos(timeout);
        int c = -1;
    
        // 获取put锁
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // 判断队列是否已满
            // 如果队列已满,但是没有超时,那么线程就阻塞一段时间
            // 如果队列已满,但是已经超时,则返回false(入队失败)
            while (count.get() == capacity) {
                if (nanos <= 0) {
                    return false;
                }
    
                // 等待被唤醒,唤醒后继续循环
                nanos = notFull.awaitNanos(nanos);
            }
    
            // 队列未满,进行入队操作
            enqueue(new Node<E>(e));
    
            // 元素数量加1,并将旧的队列元素数量赋给c
            c = count.getAndIncrement();
    
            // 如果队列未满,则唤醒notFull
            if (c + 1 < capacity) {
                notFull.signal();
            }
        } finally {
            // 释放锁
            putLock.unlock();
        }
    
        // 如果入队失败,则c的值为-1;入队成功,如果入队前的元素数量为0,那么现在入队后,队列就不为空了,则进行唤醒notEmpty
        if (c == 0) {
            signalNotEmpty();
        }
        return true;
    }
    

      

    /**
     * 元素入队,如果入队失败,则会阻塞重试,直到成功
     */
    public void put(E e) throws InterruptedException {
        // 元素null判断
        if (e == null) {
            throw new NullPointerException();
        }
    
        // c用来记录旧的元素数量
        int c = -1;
    
        // 创建新节点
        Node<E> node = new Node<E>(e);
    
        // 获取锁
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // 判断队列是否已满,如果队列已经满了,那么就调用notFull的wait阻塞,等待notFull的signal
            while (count.get() == capacity) {
                notFull.await();
            }
    
            // 队列未满,有空间可以入队,则进行入队操作
            enqueue(node);
    
            // 元素个数加1,并将入队前元素数量赋给c
            c = count.getAndIncrement();
    
            // 如果队列为满,则进行唤醒notFull
            if (c + 1 < capacity) {
                notFull.signal();
            }
        } finally {
            // 释放锁
            putLock.unlock();
        }
    
        // 如果c为0,表示入队前,队列为空,此时队列有新元素,则唤醒signal notEmpty
        if (c == 0) {
            signalNotEmpty();
        }
    }
    

      

    入队操作

    前面的offer、put操作,其中enqueue才是真正执行入队的方法;而对于链表实现的队列来说,入队比较简单,只需要将新节点挂在最后即可:

    /**
     * 入队,将新的队列元素节点挂到队列尾部
     *
     * @param node 新节点
     */
    private void enqueue(Node<E> node) {
        // 将新节点挂到尾结点的后面,并将尾结点指针指向新加入的节点
        last = last.next = node;
    }
    

      

    获取队首元素

    调用peek方法,可以获取队首元素,但队列元素并不需要出队

    /**
     * 获取队首元素(不出队)
     *
     * @return 队首元素
     */
    public E peek() {
        // 如果队列为空,则返回null
        if (count.get() == 0) {
            return null;
        }
    
        // 获取take锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // 获取队首元素
            Node<E> first = head.next;
    
            // 如果队首元素为null,则返回null;否则返回队首元素的值
            if (first == null) {
                return null;
            } else {
                return first.item;
            }
        } finally {
            // 释放锁
            takeLock.unlock();
        }
    }
    

      

    元素出队

    元素出队也分为两种,阻塞式和非阻塞式的;

    非阻塞式出队

    /**
     * 元素出队,非阻塞式
     *
     * @return 队首元素
     */
    public E poll() {
        // 如果队列为空,则返回null
        final AtomicInteger count = this.count;
        if (count.get() == 0) {
            return null;
        }
    
        // x用来存放返回值
        E x = null;
    
        // c用来存放出队前的元素数量
        int c = -1;
    
        // 获取take锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // 二次验证,如果队列不为空,则进行除对操作
            if (count.get() > 0) {
                // 出队
                x = dequeue();
    
                // 队列元素数量减1
                c = count.getAndDecrement();
    
                // 如果出队前的元素数量超过1个,那么就唤醒notEmpty
                if (c > 1) {
                    notEmpty.signal();
                }
            }
        } finally {
            // 释放锁
            takeLock.unlock();
        }
    
        // 如果出队前,队列元素数量为队列容量,那么此次出队后,队列就未满,则signalNotFull,通知可以进行入队操作了
        // 如果出队失败,那么c的值是初始值-1
        if (c == capacity) {
            signalNotFull();
        }
        
        // 返回出队的元素
        return x;
    }
    

      

    阻塞式出队

    /**
     * 元素出队,并且设置出队超时时间,若超时,则立即返回null
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        // x保存出队的元素,c保存出队前的元素数量
        E x = null;
        int c = -1;
    
        // 时间转换
        long nanos = unit.toNanos(timeout);
    
        // 获取take锁
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // 如果队列未为空
            // 如果未超时,则阻塞,等待notEmpty的signal
            // 如果已超时,则返回null
            while (count.get() == 0) {
                if (nanos <= 0) {
                    return null;
                }
    
                // 阻塞等待
                nanos = notEmpty.awaitNanos(nanos);
            }
    
            // 出队
            x = dequeue();
    
            // 出队后,元素数量减1,并将之前的元素数量赋给c
            c = count.getAndDecrement();
    
            // 如果之前的元素数量大于1,那么本次出队后,队列仍不为空,则进行notEmpty的signal
            if (c > 1) {
                notEmpty.signal();
            }
        } finally {
            // 释放锁
            takeLock.unlock();
        }
    
        // 如果出队前的元素数量为队列容量,那么本次出队后,队列将是未满状态,则进行signalNotFull
        if (c == capacity) {
            signalNotFull();
        }
    
        // 返回出队的元素
        return x;
    }
    

      

    /**
     * 阻塞直到元素出队成功
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
    
        // 获取锁
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // 如果队列为空,则进行阻塞
            while (count.get() == 0) {
                notEmpty.await();
            }
    
            // 队列不为空,则进行出队
            x = dequeue();
    
            // 出队后,元素数量减1,并将出队前的元素数量赋值给c
            c = count.getAndDecrement();
    
            // 如果出队前的元素数量大于1,则本次出队后,队列肯定不为空,那么就唤醒notEmpty
            if (c > 1) {
                notEmpty.signal();
            }
        } finally {
            // 释放锁
            takeLock.unlock();
        }
    
        // 如果出队前的队列元素已经达到了队列容量,那么本次出队后,队列就有空余来存放新元素了,于是唤醒signalNotFull
        // 如果出队失败,那么c为初始值-1
        if (c == capacity) {
            signalNotFull();
        }
    
        // 返回出队元素
        return x;
    }
    

      

    出队操作

    上面的poll和take操作,执行出队操作的其实是dequeue方法,如下:

    /**
     * 元素出队(将首元素元素删除)
     *
     * @return 队首元素
     */
    private E dequeue() {
        // 头结点指针
        Node<E> h = head;
    
        // 头结点的next为队首元素(first)
        Node<E> first = h.next;
    
        // 头结点的下个节点指向自己,解除对队首元素的指向(帮助GC)
        h.next = h; // help GC
    
        // 将队首节点赋值给头结点指针
        head = first;
    
        // 获取队首节点的值
        E x = first.item;
    
        // 清空队首元素的值(置为null)
        first.item = null;
    
        // 返回队首元素的值
        return x;
    }
    

      

      原文地址:https://www.cnblogs.com/-beyond/p/14407364.html

    如需转载,请注明文章出处,谢谢!!!
  • 相关阅读:
    shell脚本简单调试
    计算机揭秘之:网络分类和性能分析
    centos 7 虚拟机忘记密码
    算法时间复杂度计算
    strace命令 linux下调试神器
    Linux下core dump (段错误)
    dmesg + addr2line查看堆栈错误位置
    镜像 开源网站
    C语言---链表(包括学习过程中的思想活动)
    The C compiler "/usr/bin/cc" is not able to compile a simple test program. 解决方法
  • 原文地址:https://www.cnblogs.com/-beyond/p/14407364.html
Copyright © 2011-2022 走看看