一、ArrayBlockingQueue类图结构
如图ArrayBlockingQueue内部有个数组items用来存放队列元素,putindex下标标示入队元素下标,takeIndex是出队下标,count统计队列元素个数,
从定义可知道并没有使用volatile修饰,这是因为访问这些变量使用都是在锁块内,并不存在可见性问题。
另外有个独占锁lock用来对出入队操作加锁,这导致同时只有一个线程可以访问入队出队,另外notEmpty,notFull条件变量用来进行出入队的同步
另外构造函数必须传入队列大小参数,所以为有界队列,默认是Lock为非公平锁。
public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
二、offer操作
在队尾插入元素,如果队列满则返回false,否者入队返回true。
public boolean offer(E e) { //e为null,则抛出NullPointerException异常 checkNotNull(e); //获取独占锁 final ReentrantLock lock = this.lock; lock.lock(); try { //如果队列满则返回false if (count == items.length) return false; else { //否者插入元素 insert(e); return true; } } finally { //释放锁 lock.unlock(); } } private void insert(E x) { //元素入队 items[putIndex] = x; //计算下一个元素应该存放的下标 putIndex = inc(putIndex); ++count; notEmpty.signal(); } //循环队列,计算下标 final int inc(int i) { return (++i == items.length) ? 0 : i; }
这里由于在操作共享变量前加了锁,所以不存在内存不可见问题,加过锁后获取的共享变量都是从主内存获取的,
而不是在CPU缓存或者寄存器里面的值,释放锁后修改的共享变量值会刷新会主内存中。
另外这个队列是使用循环数组实现,所以计算下一个元素存放下标时候有些特殊。
另外insert后调用 notEmpty.signal();是为了激活调用notEmpty.await()阻塞后放入notEmpty条件队列中的线程。
三、put操作
在队列尾部添加元素,如果队列满则等待队列有空位置插入后返回
checkNotNull(e); final ReentrantLock lock = this.lock; //获取可被中断锁 lock.lockInterruptibly(); try { //如果队列满,则把当前线程放入notFull管理的条件队列 while (count == items.length) notFull.await(); //插入元素 insert(e); } finally { lock.unlock(); } }
需要注意的是如果队列满了那么当前线程会阻塞,知道出队操作调用了notFull.signal方法激活该线程。
代码逻辑很简单,但是这里需要思考一个问题为啥调用lockInterruptibly方法而不是Lock方法。
我的理解是因为调用了条件变量的await()方法,而await()方法会在中断标志设置后抛出InterruptedException异常后退出,
所以还不如在加锁时候先看中断标志是不是被设置了,如果设置了直接抛出InterruptedException异常,就不用再去获取锁了。
然后看了其他并发类里面凡是调用了await的方法获取锁时候都是使用的lockInterruptibly方法而不是Lock也验证了这个想法。
四、poll操作
从队头获取并移除元素,队列为空,则返回null。
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { //当前队列为空则返回null,否者 return (count == 0) ? null : extract(); } finally { lock.unlock(); } } private E extract() { final Object[] items = this.items; //获取元素值 E x = this.<E>cast(items[takeIndex]); //数组中值值为null; items[takeIndex] = null; //队头指针计算,队列元素个数减一 takeIndex = inc(takeIndex); --count; //发送信号激活notFull条件队列里面的线程 notFull.signal(); return x; }
五、take操作
从队头获取元素,如果队列为空则阻塞直到队列有元素。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //队列为空,则等待,直到队列有元素 while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } }
需要注意的是如果队列为空,当前线程会被挂起放到notEmpty的条件队列里面,直到入队操作执行调用notEmpty.signal后当前线程才会被激活,await才会返回。
六、peek操作
返回队列头元素但不移除该元素,队列为空,返回null
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { //队列为空返回null,否者返回头元素 return (count == 0) ? null : itemAt(takeIndex); } finally { lock.unlock(); } } final E itemAt(int i) { return this.<E>cast(items[i]); }
七、size操作
获取队列元素个数,非常精确因为计算size时候加了独占锁,其他线程不能入队或者出队或者删除元素
public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; } finally { lock.unlock(); } }
八、 总结
ArrayBlockingQueue通过使用全局独占锁实现同时只能有一个线程进行入队或者出队操作,这个锁的粒度比较大,有点类似在方法上添加synchronized的意味。
其中offer,poll操作通过简单的加锁进行入队出队操作,而put,take则使用了条件变量实现如果队列满则等待,
如果队列空则等待,然后分别在出队和入队操作中发送信号激活等待线程实现同步。
另外相比LinkedBlockingQueue,ArrayBlockingQueue的size操作的结果是精确的,因为计算前加了全局锁。