LinkedBlockingQueue基本原理介绍
LinkedBlockingQueue是基于链表实现的阻塞队列,需要注意的是LinkedBlockingQueue是带头结点的队列(头结点不存真实数据,存的数据为null)。
在LinkedBlockingQueue中,元素是按照先进先出的顺序(FIFO),但需要注意的是这个顺序并不是“线程入队和出队的顺序”,多线程在并发入队或者并发出队时,是非公平的,这是因为LinkedBlockingQueue中控制同步的有两把锁(takeLock和putLock),都是直接new ReentrantLock,默认的就是非公平锁。
也就是说,线程A和线程B同时入栈元素,那么顺序是不一定的;线程A和线程B,谁能拿到队首元素,也是不一定的;
FIFO是指,队列中x、y、z元素的顺序,出队的顺序也是x-> y -> z,但是谁拿到队首的x,这是不一定的。
可以设置LinkedBlockingQueue的队列容量,如果不设置队列容量,那么默认的容量为Integer.MAX_VALUE。
对于基于数组实现的阻塞队列ArrayBlockingQueue分析,可以参考https://www.cnblogs.com/-beyond/p/14407201.html;
原文地址:https://www.cnblogs.com/-beyond/p/14407364.html
链表节点类型
LinkedBlockingQueue既然是基于链表的,那么就会涉及到链表的节点,在LinkedBlockingQueue中定义了Node类型,也就是链表节点类型,每个队列元素都是一个Node,item字段保存了队列元素的值。
/**
* 链表的节点类型
*/
static class Node<E> {
E item;
/**
* 指向下一个节点的指针
*/
Node<E> next;
Node(E x) {
item = x;
}
}
重要属性
/** * 队列的容量,如果没有设置,则默认为Integer.MAX_VALUE */ private final int capacity; /** * 队列中的元素个数 */ private final AtomicInteger count = new AtomicInteger(); /** * 链表的头指针 */ transient Node<E> head; /** * 链表的尾指针 */ private transient Node<E> last; /** * 控制出队的锁(take、poll),使用的非公平锁 */ private final ReentrantLock takeLock = new ReentrantLock(); /** * 控制入队的锁(offer、put),使用的非公平锁 */ private final ReentrantLock putLock = new ReentrantLock(); /** * 出队关联的Condition */ private final Condition notEmpty = takeLock.newCondition(); /** * 入队关联的Condition */ private final Condition notFull = putLock.newCondition();
构造方法
/**
* 初始化LinkedBlockingQueue,设置默认的容量
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* 初始化LinkedBlockingQueue,设置指定的容量
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 创建头结点,头尾指针都指向头结点
last = head = new Node<E>(null);
}
/**
* 初始化LinkedBlockingQueue,将传入的集合元素加入到队列中
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
// 初始化,设置默认容量
this(Integer.MAX_VALUE);
// 加put锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 遍历集合元素,将元素依次入队(创建节点,并入队)
int n = 0;
for (E e : c) {
// 如果元素为null,则抛出NPE
if (e == null) {
throw new NullPointerException();
}
// 如果元素数量已经达到设置的容量,则抛出队列已满的异常
if (n == capacity) {
throw new IllegalStateException("Queue full");
}
// 创建节点,并入对
enqueue(new Node<E>(e));
// 元素数量加1
++n;
}
// 设置元素数量
count.set(n);
} finally {
putLock.unlock();
}
}
元素入队
非阻塞式入队
非阻塞式入队,是指不管入队是否成功,都立即返回,不会发生阻塞。
非阻塞式入队,是通过调用offer方法执行的。
/**
* 元素入队,不管是否入队成功,都立即返回
*/
public boolean offer(E e) {
// 如果入队元素为null,则抛出NPE
if (e == null) {
throw new NullPointerException();
}
// 获取元素数量
final AtomicInteger count = this.count;
// 判断队列是否已满,如果队列已满,则入队失败(返回false)
if (count.get() == capacity) {
return false;
}
int c = -1;
// 创建队列元素节点
Node<E> node = new Node<E>(e);
// 获取put锁(加锁)
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 再次判断队列是否已满,如果队列未满,则进行入队操作
if (count.get() < capacity) {
// 入队操作
enqueue(node);
// 元素size+1,并将队列之前的size赋给c
c = count.getAndIncrement();
// 如果入队后,队列仍旧未满,那么就唤醒notFull
if (c + 1 < capacity) {
notFull.signal();
}
}
} finally {
// 释放锁
putLock.unlock();
}
// 如果入队失败,则c的值为-1;入队成功,如果入队前的元素数量为0,那么现在入队后,队列就不为空了,则进行唤醒notEmpty
if (c == 0) {
// 唤醒notEmpty
signalNotEmpty();
}
// 返回入队是否成功
return c >= 0;
}
/**
* 当put或者offer完成入队(当队列不为空的时候),唤醒notEmpty(take阻塞的线程获取信号后接触阻塞)
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
阻塞式入队
阻塞式入队,就是在入队失败时,尝试入队的线程发生阻塞,知道成功入队;
这里分了两种,一种是指定超时时间的入队(offer);另一种是阻塞直到成功的入队(put);
/**
* 元素入队,并设置入队超时时间
*/
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 如果元素为null,则抛出NPE
if (e == null) {
throw new NullPointerException();
}
// 超时时间转换
long nanos = unit.toNanos(timeout);
int c = -1;
// 获取put锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 判断队列是否已满
// 如果队列已满,但是没有超时,那么线程就阻塞一段时间
// 如果队列已满,但是已经超时,则返回false(入队失败)
while (count.get() == capacity) {
if (nanos <= 0) {
return false;
}
// 等待被唤醒,唤醒后继续循环
nanos = notFull.awaitNanos(nanos);
}
// 队列未满,进行入队操作
enqueue(new Node<E>(e));
// 元素数量加1,并将旧的队列元素数量赋给c
c = count.getAndIncrement();
// 如果队列未满,则唤醒notFull
if (c + 1 < capacity) {
notFull.signal();
}
} finally {
// 释放锁
putLock.unlock();
}
// 如果入队失败,则c的值为-1;入队成功,如果入队前的元素数量为0,那么现在入队后,队列就不为空了,则进行唤醒notEmpty
if (c == 0) {
signalNotEmpty();
}
return true;
}
/**
* 元素入队,如果入队失败,则会阻塞重试,直到成功
*/
public void put(E e) throws InterruptedException {
// 元素null判断
if (e == null) {
throw new NullPointerException();
}
// c用来记录旧的元素数量
int c = -1;
// 创建新节点
Node<E> node = new Node<E>(e);
// 获取锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 判断队列是否已满,如果队列已经满了,那么就调用notFull的wait阻塞,等待notFull的signal
while (count.get() == capacity) {
notFull.await();
}
// 队列未满,有空间可以入队,则进行入队操作
enqueue(node);
// 元素个数加1,并将入队前元素数量赋给c
c = count.getAndIncrement();
// 如果队列为满,则进行唤醒notFull
if (c + 1 < capacity) {
notFull.signal();
}
} finally {
// 释放锁
putLock.unlock();
}
// 如果c为0,表示入队前,队列为空,此时队列有新元素,则唤醒signal notEmpty
if (c == 0) {
signalNotEmpty();
}
}
入队操作
前面的offer、put操作,其中enqueue才是真正执行入队的方法;而对于链表实现的队列来说,入队比较简单,只需要将新节点挂在最后即可:
/**
* 入队,将新的队列元素节点挂到队列尾部
*
* @param node 新节点
*/
private void enqueue(Node<E> node) {
// 将新节点挂到尾结点的后面,并将尾结点指针指向新加入的节点
last = last.next = node;
}
获取队首元素
调用peek方法,可以获取队首元素,但队列元素并不需要出队
/**
* 获取队首元素(不出队)
*
* @return 队首元素
*/
public E peek() {
// 如果队列为空,则返回null
if (count.get() == 0) {
return null;
}
// 获取take锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 获取队首元素
Node<E> first = head.next;
// 如果队首元素为null,则返回null;否则返回队首元素的值
if (first == null) {
return null;
} else {
return first.item;
}
} finally {
// 释放锁
takeLock.unlock();
}
}
元素出队
元素出队也分为两种,阻塞式和非阻塞式的;
非阻塞式出队
/**
* 元素出队,非阻塞式
*
* @return 队首元素
*/
public E poll() {
// 如果队列为空,则返回null
final AtomicInteger count = this.count;
if (count.get() == 0) {
return null;
}
// x用来存放返回值
E x = null;
// c用来存放出队前的元素数量
int c = -1;
// 获取take锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 二次验证,如果队列不为空,则进行除对操作
if (count.get() > 0) {
// 出队
x = dequeue();
// 队列元素数量减1
c = count.getAndDecrement();
// 如果出队前的元素数量超过1个,那么就唤醒notEmpty
if (c > 1) {
notEmpty.signal();
}
}
} finally {
// 释放锁
takeLock.unlock();
}
// 如果出队前,队列元素数量为队列容量,那么此次出队后,队列就未满,则signalNotFull,通知可以进行入队操作了
// 如果出队失败,那么c的值是初始值-1
if (c == capacity) {
signalNotFull();
}
// 返回出队的元素
return x;
}
阻塞式出队
/**
* 元素出队,并且设置出队超时时间,若超时,则立即返回null
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// x保存出队的元素,c保存出队前的元素数量
E x = null;
int c = -1;
// 时间转换
long nanos = unit.toNanos(timeout);
// 获取take锁
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 如果队列未为空
// 如果未超时,则阻塞,等待notEmpty的signal
// 如果已超时,则返回null
while (count.get() == 0) {
if (nanos <= 0) {
return null;
}
// 阻塞等待
nanos = notEmpty.awaitNanos(nanos);
}
// 出队
x = dequeue();
// 出队后,元素数量减1,并将之前的元素数量赋给c
c = count.getAndDecrement();
// 如果之前的元素数量大于1,那么本次出队后,队列仍不为空,则进行notEmpty的signal
if (c > 1) {
notEmpty.signal();
}
} finally {
// 释放锁
takeLock.unlock();
}
// 如果出队前的元素数量为队列容量,那么本次出队后,队列将是未满状态,则进行signalNotFull
if (c == capacity) {
signalNotFull();
}
// 返回出队的元素
return x;
}
/**
* 阻塞直到元素出队成功
*/
public E take() throws InterruptedException {
E x;
int c = -1;
// 获取锁
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 如果队列为空,则进行阻塞
while (count.get() == 0) {
notEmpty.await();
}
// 队列不为空,则进行出队
x = dequeue();
// 出队后,元素数量减1,并将出队前的元素数量赋值给c
c = count.getAndDecrement();
// 如果出队前的元素数量大于1,则本次出队后,队列肯定不为空,那么就唤醒notEmpty
if (c > 1) {
notEmpty.signal();
}
} finally {
// 释放锁
takeLock.unlock();
}
// 如果出队前的队列元素已经达到了队列容量,那么本次出队后,队列就有空余来存放新元素了,于是唤醒signalNotFull
// 如果出队失败,那么c为初始值-1
if (c == capacity) {
signalNotFull();
}
// 返回出队元素
return x;
}
出队操作
上面的poll和take操作,执行出队操作的其实是dequeue方法,如下:
/**
* 元素出队(将首元素元素删除)
*
* @return 队首元素
*/
private E dequeue() {
// 头结点指针
Node<E> h = head;
// 头结点的next为队首元素(first)
Node<E> first = h.next;
// 头结点的下个节点指向自己,解除对队首元素的指向(帮助GC)
h.next = h; // help GC
// 将队首节点赋值给头结点指针
head = first;
// 获取队首节点的值
E x = first.item;
// 清空队首元素的值(置为null)
first.item = null;
// 返回队首元素的值
return x;
}
原文地址:https://www.cnblogs.com/-beyond/p/14407364.html