LinkedBlockingQueue基本原理介绍
LinkedBlockingQueue是基于链表实现的阻塞队列,需要注意的是LinkedBlockingQueue是带头结点的队列(头结点不存真实数据,存的数据为null)。
在LinkedBlockingQueue中,元素是按照先进先出的顺序(FIFO),但需要注意的是这个顺序并不是“线程入队和出队的顺序”,多线程在并发入队或者并发出队时,是非公平的,这是因为LinkedBlockingQueue中控制同步的有两把锁(takeLock和putLock),都是直接new ReentrantLock,默认的就是非公平锁。
也就是说,线程A和线程B同时入栈元素,那么顺序是不一定的;线程A和线程B,谁能拿到队首元素,也是不一定的;
FIFO是指,队列中x、y、z元素的顺序,出队的顺序也是x-> y -> z,但是谁拿到队首的x,这是不一定的。
可以设置LinkedBlockingQueue的队列容量,如果不设置队列容量,那么默认的容量为Integer.MAX_VALUE。
对于基于数组实现的阻塞队列ArrayBlockingQueue分析,可以参考https://www.cnblogs.com/-beyond/p/14407201.html;
原文地址:https://www.cnblogs.com/-beyond/p/14407364.html
链表节点类型
LinkedBlockingQueue既然是基于链表的,那么就会涉及到链表的节点,在LinkedBlockingQueue中定义了Node类型,也就是链表节点类型,每个队列元素都是一个Node,item字段保存了队列元素的值。
/** * 链表的节点类型 */ static class Node<E> { E item; /** * 指向下一个节点的指针 */ Node<E> next; Node(E x) { item = x; } }
重要属性
/** * 队列的容量,如果没有设置,则默认为Integer.MAX_VALUE */ private final int capacity; /** * 队列中的元素个数 */ private final AtomicInteger count = new AtomicInteger(); /** * 链表的头指针 */ transient Node<E> head; /** * 链表的尾指针 */ private transient Node<E> last; /** * 控制出队的锁(take、poll),使用的非公平锁 */ private final ReentrantLock takeLock = new ReentrantLock(); /** * 控制入队的锁(offer、put),使用的非公平锁 */ private final ReentrantLock putLock = new ReentrantLock(); /** * 出队关联的Condition */ private final Condition notEmpty = takeLock.newCondition(); /** * 入队关联的Condition */ private final Condition notFull = putLock.newCondition();
构造方法
/** * 初始化LinkedBlockingQueue,设置默认的容量 */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } /** * 初始化LinkedBlockingQueue,设置指定的容量 */ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; // 创建头结点,头尾指针都指向头结点 last = head = new Node<E>(null); } /** * 初始化LinkedBlockingQueue,将传入的集合元素加入到队列中 */ public LinkedBlockingQueue(Collection<? extends E> c) { // 初始化,设置默认容量 this(Integer.MAX_VALUE); // 加put锁 final ReentrantLock putLock = this.putLock; putLock.lock(); try { // 遍历集合元素,将元素依次入队(创建节点,并入队) int n = 0; for (E e : c) { // 如果元素为null,则抛出NPE if (e == null) { throw new NullPointerException(); } // 如果元素数量已经达到设置的容量,则抛出队列已满的异常 if (n == capacity) { throw new IllegalStateException("Queue full"); } // 创建节点,并入对 enqueue(new Node<E>(e)); // 元素数量加1 ++n; } // 设置元素数量 count.set(n); } finally { putLock.unlock(); } }
元素入队
非阻塞式入队
非阻塞式入队,是指不管入队是否成功,都立即返回,不会发生阻塞。
非阻塞式入队,是通过调用offer方法执行的。
/** * 元素入队,不管是否入队成功,都立即返回 */ public boolean offer(E e) { // 如果入队元素为null,则抛出NPE if (e == null) { throw new NullPointerException(); } // 获取元素数量 final AtomicInteger count = this.count; // 判断队列是否已满,如果队列已满,则入队失败(返回false) if (count.get() == capacity) { return false; } int c = -1; // 创建队列元素节点 Node<E> node = new Node<E>(e); // 获取put锁(加锁) final ReentrantLock putLock = this.putLock; putLock.lock(); try { // 再次判断队列是否已满,如果队列未满,则进行入队操作 if (count.get() < capacity) { // 入队操作 enqueue(node); // 元素size+1,并将队列之前的size赋给c c = count.getAndIncrement(); // 如果入队后,队列仍旧未满,那么就唤醒notFull if (c + 1 < capacity) { notFull.signal(); } } } finally { // 释放锁 putLock.unlock(); } // 如果入队失败,则c的值为-1;入队成功,如果入队前的元素数量为0,那么现在入队后,队列就不为空了,则进行唤醒notEmpty if (c == 0) { // 唤醒notEmpty signalNotEmpty(); } // 返回入队是否成功 return c >= 0; } /** * 当put或者offer完成入队(当队列不为空的时候),唤醒notEmpty(take阻塞的线程获取信号后接触阻塞) */ private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
阻塞式入队
阻塞式入队,就是在入队失败时,尝试入队的线程发生阻塞,知道成功入队;
这里分了两种,一种是指定超时时间的入队(offer);另一种是阻塞直到成功的入队(put);
/** * 元素入队,并设置入队超时时间 */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { // 如果元素为null,则抛出NPE if (e == null) { throw new NullPointerException(); } // 超时时间转换 long nanos = unit.toNanos(timeout); int c = -1; // 获取put锁 final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { // 判断队列是否已满 // 如果队列已满,但是没有超时,那么线程就阻塞一段时间 // 如果队列已满,但是已经超时,则返回false(入队失败) while (count.get() == capacity) { if (nanos <= 0) { return false; } // 等待被唤醒,唤醒后继续循环 nanos = notFull.awaitNanos(nanos); } // 队列未满,进行入队操作 enqueue(new Node<E>(e)); // 元素数量加1,并将旧的队列元素数量赋给c c = count.getAndIncrement(); // 如果队列未满,则唤醒notFull if (c + 1 < capacity) { notFull.signal(); } } finally { // 释放锁 putLock.unlock(); } // 如果入队失败,则c的值为-1;入队成功,如果入队前的元素数量为0,那么现在入队后,队列就不为空了,则进行唤醒notEmpty if (c == 0) { signalNotEmpty(); } return true; }
/** * 元素入队,如果入队失败,则会阻塞重试,直到成功 */ public void put(E e) throws InterruptedException { // 元素null判断 if (e == null) { throw new NullPointerException(); } // c用来记录旧的元素数量 int c = -1; // 创建新节点 Node<E> node = new Node<E>(e); // 获取锁 final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { // 判断队列是否已满,如果队列已经满了,那么就调用notFull的wait阻塞,等待notFull的signal while (count.get() == capacity) { notFull.await(); } // 队列未满,有空间可以入队,则进行入队操作 enqueue(node); // 元素个数加1,并将入队前元素数量赋给c c = count.getAndIncrement(); // 如果队列为满,则进行唤醒notFull if (c + 1 < capacity) { notFull.signal(); } } finally { // 释放锁 putLock.unlock(); } // 如果c为0,表示入队前,队列为空,此时队列有新元素,则唤醒signal notEmpty if (c == 0) { signalNotEmpty(); } }
入队操作
前面的offer、put操作,其中enqueue才是真正执行入队的方法;而对于链表实现的队列来说,入队比较简单,只需要将新节点挂在最后即可:
/** * 入队,将新的队列元素节点挂到队列尾部 * * @param node 新节点 */ private void enqueue(Node<E> node) { // 将新节点挂到尾结点的后面,并将尾结点指针指向新加入的节点 last = last.next = node; }
获取队首元素
调用peek方法,可以获取队首元素,但队列元素并不需要出队
/** * 获取队首元素(不出队) * * @return 队首元素 */ public E peek() { // 如果队列为空,则返回null if (count.get() == 0) { return null; } // 获取take锁 final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { // 获取队首元素 Node<E> first = head.next; // 如果队首元素为null,则返回null;否则返回队首元素的值 if (first == null) { return null; } else { return first.item; } } finally { // 释放锁 takeLock.unlock(); } }
元素出队
元素出队也分为两种,阻塞式和非阻塞式的;
非阻塞式出队
/** * 元素出队,非阻塞式 * * @return 队首元素 */ public E poll() { // 如果队列为空,则返回null final AtomicInteger count = this.count; if (count.get() == 0) { return null; } // x用来存放返回值 E x = null; // c用来存放出队前的元素数量 int c = -1; // 获取take锁 final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { // 二次验证,如果队列不为空,则进行除对操作 if (count.get() > 0) { // 出队 x = dequeue(); // 队列元素数量减1 c = count.getAndDecrement(); // 如果出队前的元素数量超过1个,那么就唤醒notEmpty if (c > 1) { notEmpty.signal(); } } } finally { // 释放锁 takeLock.unlock(); } // 如果出队前,队列元素数量为队列容量,那么此次出队后,队列就未满,则signalNotFull,通知可以进行入队操作了 // 如果出队失败,那么c的值是初始值-1 if (c == capacity) { signalNotFull(); } // 返回出队的元素 return x; }
阻塞式出队
/** * 元素出队,并且设置出队超时时间,若超时,则立即返回null */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { // x保存出队的元素,c保存出队前的元素数量 E x = null; int c = -1; // 时间转换 long nanos = unit.toNanos(timeout); // 获取take锁 final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 如果队列未为空 // 如果未超时,则阻塞,等待notEmpty的signal // 如果已超时,则返回null while (count.get() == 0) { if (nanos <= 0) { return null; } // 阻塞等待 nanos = notEmpty.awaitNanos(nanos); } // 出队 x = dequeue(); // 出队后,元素数量减1,并将之前的元素数量赋给c c = count.getAndDecrement(); // 如果之前的元素数量大于1,那么本次出队后,队列仍不为空,则进行notEmpty的signal if (c > 1) { notEmpty.signal(); } } finally { // 释放锁 takeLock.unlock(); } // 如果出队前的元素数量为队列容量,那么本次出队后,队列将是未满状态,则进行signalNotFull if (c == capacity) { signalNotFull(); } // 返回出队的元素 return x; }
/** * 阻塞直到元素出队成功 */ public E take() throws InterruptedException { E x; int c = -1; // 获取锁 final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 如果队列为空,则进行阻塞 while (count.get() == 0) { notEmpty.await(); } // 队列不为空,则进行出队 x = dequeue(); // 出队后,元素数量减1,并将出队前的元素数量赋值给c c = count.getAndDecrement(); // 如果出队前的元素数量大于1,则本次出队后,队列肯定不为空,那么就唤醒notEmpty if (c > 1) { notEmpty.signal(); } } finally { // 释放锁 takeLock.unlock(); } // 如果出队前的队列元素已经达到了队列容量,那么本次出队后,队列就有空余来存放新元素了,于是唤醒signalNotFull // 如果出队失败,那么c为初始值-1 if (c == capacity) { signalNotFull(); } // 返回出队元素 return x; }
出队操作
上面的poll和take操作,执行出队操作的其实是dequeue方法,如下:
/** * 元素出队(将首元素元素删除) * * @return 队首元素 */ private E dequeue() { // 头结点指针 Node<E> h = head; // 头结点的next为队首元素(first) Node<E> first = h.next; // 头结点的下个节点指向自己,解除对队首元素的指向(帮助GC) h.next = h; // help GC // 将队首节点赋值给头结点指针 head = first; // 获取队首节点的值 E x = first.item; // 清空队首元素的值(置为null) first.item = null; // 返回队首元素的值 return x; }
原文地址:https://www.cnblogs.com/-beyond/p/14407364.html