在多线程开发中,线程池是个利器,可以帮助我们管理线程和复用线程。而在线程池中,用来保存线程和任务的数据结构就是队列,如newFixedThreadPool
和newSingleThreadExecutor
这两个线程池使用的LinkedBlockingQueue
队列,newCachedThreadPool
使用的是SynchronousQueue
。本文重点讲解一下LinkedBlockingQueue
。
LinkedBlocingQueue
的继承关系如下所示:
特点
- 基于链表的阻塞队列,底层数据结构为链表;
FIFO
,新元素被放在队尾,获取元素从队首拿;- 链表大小在初始化时可以设置,不设置的话默认为
Integer.MAX_VALUE
; - 既支持集合的增删改查,又支持队列的增删改查,同时兼具阻塞的特性;
成员变量
//容量
private final int capacity;
//元素数量 AtomicInteger类型 线程安全
private final AtomicInteger count = new AtomicInteger();
//头节点
transient Node<E> head;
//尾节点
private transient Node<E> last;
// take poll 操作的锁
private final ReentrantLock takeLock = new ReentrantLock();
// take 的条件队列,condition 可以简单理解为基于 ASQ 同步机制建立的条件队列
private final Condition notEmpty = takeLock.newCondition();
// put 时的锁,设计两把锁的目的,主要为了 take 和 put 可以同时进行
private final ReentrantLock putLock = new ReentrantLock();
// put 的条件队列
private final Condition notFull = putLock.newCondition();
//节点数据结构
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
这里需要注意的是,有两把锁,takeLock
和putLock
,主要是为了可以同时支持两种操作,互不影响,实现线程安全。
构造方法
//无参构造时,默认 Integer 的最大值
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//这里可以看出,初始化时 首尾两个节点为 null
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
// 已有集合数据进行初始化
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // 加锁
try {
int n = 0;
for (E e : c) {
// 传入的集合c中 元素不能为null 否则报错
if (e == null)
throw new NullPointerException();
// capacity 代表链表的大小,在这里是 Integer 的最大值
// 如果集合类的大小大于 Integer 的最大值,就会报错
// 其实这个判断完全可以放在 for 循环外面,这样可以减少 Integer 的最大值次循环(最坏情况)
if (n == capacity)
throw new IllegalStateException("Queue full");
//放入队列
enqueue(new Node<E>(e));
//更新 n
++n;
}
//设置元素个数
count.set(n);
} finally {
putLock.unlock();//解锁
}
}
//加入队列中
private void enqueue(Node<E> node) {
//last.next=node
//last=last.next
last = last.next = node;
}
常用方法
put(E e)
// 阻塞放数据
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//空元素直接抛出异常
if (e == null) throw new NullPointerException();
// 预先设置 c 为 -1,约定负数为新增失败
int c = -1;
//put锁
final ReentrantLock putLock = this.putLock;
//计数器
final AtomicInteger count = this.count;
// 设置可中断锁
putLock.lockInterruptibly();
try {
// 队列满了
// 当前线程阻塞,等待其他线程的唤醒(其他线程 take 成功后就会唤醒此处被阻塞的线程)
while (count.get() == capacity) {
//调用notFull的await方法,等待唤醒
notFull.awaitNanos(nanos);
}
// 队列没有满,直接新增到队列的尾部
enqueue(new Node<E>(e));
// 新增计数赋值,注意这里 getAndIncrement 返回的是旧值
// 这里的 c 是比真实的 count 小 1
c = count.getAndIncrement();
// 如果链表现在的大小 小于链表的容量,说明队列未满 此时可以继续放入数据
// 可以尝试唤醒一个 put 的等待线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();//释放锁
}
// c==0,代表队列里面有一个元素
// 一开始设定c=-1 如果有一个元素 count.getAndIncrement()=1,c+1=0
// 会尝试唤醒一个take的等待线程
if (c == 0)
signalNotEmpty();
return true;
}
//唤醒等待队列
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
总结一下流程如下:
- 判断元素是否为空,空直接抛出异常,否则继续;
- 加锁,保证线程安全;
- 新增时,如果队列满了,当前线程被阻塞,阻塞使用了锁来完成;
- 新增成功之后,在适当时机,会唤起
put
的等待线程(队列不满时),或者take
的等待线程(队列不为空时),这样保证队列一旦满足put
或者take
条件时,立马就能唤起阻塞线程,继续运行,保证了唤起的时机不被浪费;
offer(E e) && offer(E e, TimeUnit unit)
这两个方法与put
方法非常相似,下面附上源码,自行分析一下即可。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
add(E e)
public boolean add(E e) {
//添加成功
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
take()
// 阻塞拿数据
public E take() throws InterruptedException {
E x;
// 默认负数,代表失败
int c = -1;
// count 代表当前链表数据的真实大小
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//可中断锁
takeLock.lockInterruptibly();
try {
// 空队列时,阻塞,等待其他线程唤醒
while (count.get() == 0) {
notEmpty.await();
}
// 非空队列,从队列的头部拿一个出来
x = dequeue();
// 减一计算,注意 getAndDecrement 返回的值是旧值
// c 比真实的 count 大1
c = count.getAndDecrement();
// 如果队列里面有值,从 take 的等待线程里面唤醒一个 此时可以继续弹出元素
// 意思是队列里面有值啦,唤醒之前被阻塞的线程
// c>1 又因为c-1=count 即 count>0
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();//解锁
}
// 如果队列空闲还剩下一个,尝试从 put 的等待线程中唤醒一个
//count-1=c==capacity
if (c == capacity)
signalNotFull();
//返回值
return x;
}
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // 就是 he.next=null
head = first;
E x = first.item;
first.item = null;
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
整体和put
的流程相似,都是先加锁,然后从队列头部拿数据,如果队列为空,会一直阻塞到队里有值为止。
poll() && poll(TimeUnit unit)
这两个方法跟take
很像,自行分析一下即可。
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
remove
public E remove() {
//获取队尾元素
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
peek()
//获取队首元素
public E peek() {
//队列为空的话直接返回null
if (count.get() == 0)
return null;
//take锁
final ReentrantLock takeLock = this.takeLock;
//加锁
takeLock.lock();
try {
//获取链表的第一个元素
Node<E> first = head.next;
//为空的话直接返回null
if (first == null)
return null;
else//否则返回该节点的值
return first.item;
} finally {
takeLock.unlock();
}
}
element
public E element() {
//获取队首元素
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
总结
LinkedBlockingQueue
的操作总结如下所示,在开发过程根据具体情况选择合适的方法。