多线程环境下Java提供的一些简单容器都无法使用了要用到JUC中的大部分容器,由于ConcurrentHashMap是高频考点用到也不较多因此着重写了下,其余的容器就先Mark下。奥利给
跳表
简而言之跳表就是多层链表的结合体,跳表分为许多层(level),每一层都可以看作是数据的索引,这些索引的意义就是加快跳表查找数据速度。每一层的数据都是有序的,上一层数据是下一层数据的子集,并且第一层(level 1)包含了全部的数据;层次越高,跳跃性越大,包含的数据越少。并且随便插入一个数据该数据是否会是跳表索引完全随机的跟玩骰子一样,redis中的zset
底层就是跳表数据结构。并且跳表的速度几乎接近红黑树了。
跳表包含一个表头,它查找数据时,是从上往下,从左往右
进行查找。现在“需要找出值为37的节点”为例,来对比说明跳表和普遍的链表。
- 没有跳表查询
比如我查询数据37,如果没有上面的索引时候路线如下图:
- 有跳表查询
有跳表查询37的时候路线如下图:
延伸思考:
既然跳表实现简单速度挺好ConcurrentHashMap
为什么不直接用跳表用红黑树?
先说下在HashMap
中一般空间利用率就在40%作用,而ConcurrentHashMap
空间利用率只能达到10%~20%。如果这个时候在不节省空间还用跳表替换红黑树。那么就凉凉了。
ConcurrentSkipListMap
我们在存储KV的时候一般有三种容器可以使用,TreeMap、ConcurrentSkipListMap、HashMap三种容器。其中TreeMap
可以理解为红黑树在Java中的具体实现,细节部分以前写过这里不再重复。ConcurrentSkipListMap主要就是利用跳表的思维来实现速度的提升,他们区别跟性能对比如下:
- TreeMap基于红黑树(平衡二叉查找树)实现的,时间复杂度平均能达到O(log n),多线程不安全。
- HashMap是基于散列表实现的,时间复杂度平均能达到O(1),多线程不安全。
- ConcurrentSkipListMap是基于跳表实现的,时间复杂度平均能达到O(log n),多线程安全。
- 红黑树涉及各种旋转操作比较复杂,HashMap底层数组+ 链表+ 红黑树,跳表实现起来就很简单了。
结论:
- 当数据量增加时,HashMap会引起散列冲突,解决冲突需要多花费一些时间代价,故在f(n)=1向上浮动。随着数据量的增加,HashMap的时间花费小且稳定,充分秉承着空间换时间的思想,在单线程的环境下比TreeMap和ConcurrentSkipListMap在插入和查找上有很大的优势。
- 如果必须有序且多线程就用ConcurrentSkipListMap,如果单线程不需要考虑是否有序就用HashMap。
其中ConcurrentSkipListMap
基础结构图如下:
ConcurrentSkipListSet
Set
是一个无序的数据集合,TreeSet
的底层是通过TreeMap
实现的,思想其实跟HashMap
跟HashSet
类似,TreeSet
就是只有Key的TreeMap
。TreeSet
是通过红黑树来实现的速度可达到O(log n)
但是线程也是不安全的。ConcurrentSkipListSet
是基于跳表实现的线程安全的ListSet。
ConcurrentLinkedQueue
可以认为是LinkedList
的多线程安全升级版。一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。
新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue
是一个恰当的选择,底层用了很多sun.misc.Unsafe UNSAFE
硬件级别的原子操作。此队列不允许使用 null 元素。
- offer(E e) :将指定元素插入此队列的尾部。
- add(E e): 跟offer 功能一样将指定元素插入此队列的尾部, add方法体调用的就是offer.
- poll() : 获取并移除此队列的头,如果此队列为空,则返回 null
- peek() : 获取但不移除此队列的头,如果此队列为空,则返回 null
- remove(Object o) : 从队列中移除指定元素的单个实例(如果存在)
CopyOnWriteArrayList
CopyOnWrite
写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy
,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以写时复制容器也是一种读写分离的思想,读和写不同的容器。如果读的时候有多个线程正在向容器添加数据,读还是会读到旧的数据,因为写的时候不会锁住旧的,只能保证最终一致性。Redis
中执行bgsave
时候就是用的此机制。这种机制写一次就要copy一份。多个线程要执行此操作必须等上一个线程执行完毕。如果用读写锁我在写的时候你是无法读的,锁锁无法降级的。
CopyOnWriteArrayList
底层用的ReentrantLock()
来实现加锁,这又印证来AQS占据JUC半壁江山。
优点
对于一些读多写少的数据,这种做法的确很不错,例如配置、黑名单、物流地址等变化非常少的数据,这是一种无锁的实现。可以帮我们实现程序更高的并发,只能保证抢一致性。
缺点
这种实现只是保证数据的最终一致性,在添加到拷贝数据而还没进行替换的时候,读到的仍然是旧数据。如果对象比较大,频繁地进行替换会消耗内存,从而引发Java的GC问题,这个时候,我们应该考虑其他的容器,例如ConcurrentHashMap。
CopyOnWriteArraySet
CopyOnWriteArraySet
是基于CopyOnWriteArrayList
实现的,只有add
的方法稍微有些不同,因为CopyOnWriteArraySet
是Set
也就是不能有重复的元素,故在CopyOnWriteArraySet
中用了addIfAbsent(e)
这样的方法。
BlockingQueue
在JUC包中BlockingQueue
很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。BlockingQueue
即阻塞队列,它是基于ReentrantLock
实现的,BlockingQueue
阻塞队列的概念:
- 当队列满的时候,插入元素的线程被阻塞,直达队列不满。
- 队列为空的时候,获取元素的线程被阻塞,直到队列不空。
生产者和消费者模式概念:
生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。生产者和消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而是通过阻塞队列来进行通信,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
BlockingQueue
是个接口,主要又有若干方法。
常用方法:
方法 | 抛出异常 | 返回值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add | offer | put | offer(time) |
移除方法 | remove | poll | take | poll(time) |
检查 方法 | element | peek | N/A | N/A |
1.add(E e):在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回IllegalStateException异常。
2.offer(E e): 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回false。
3.put(E e): 直接在队列中插入元素,当无可用空间时候,阻塞等待。
4.offer(E e, long time, timeunit unit):将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false。
5. E take():获取并移除队列头部的元素,无元素时候阻塞等待。
6. E poll( long time, timeunit unit):获取并移除队列头部的元素,无元素时候阻塞等待指定时间。
7. remove(Object o) :若队列为空,抛出NoSuchElementException异常
8. E poll():若队列为空,返回null
BlockingQueue
是一个接口,它的实现类有ArrayBlockingQueue
、LinkedBlockingDeque
、PriorityBlockingQueue
、DelayQueue
、SynchronousQueue
、LinkedTransferQueue
、LinkedBlockingQueue
等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。
ArrayBlockingQueue
一个由数组结构组成的有界阻塞队列,按照先进先出原则,其中有界也就意味着,它不能够存储无限多数量的对象,要求设定初始大小。
数组类型:
/** The queued items */
final Object[] items;
唯一全局锁
// 这是一个掌管所有访问操作的锁。全局共享。都会使用这个锁。
final ReentrantLock lock;
两个等待队列
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
put 方法
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock; // 唯一锁
lock.lockInterruptibly();// 加锁
try {
while (count == items.length)
notFull.await();//await 让出操作权
enqueue(e);// 被唤醒就加入队列。
} finally {
lock.unlock();// 解锁
}
}
take方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock; // 加锁
lock.lockInterruptibly();
try {
while (count == 0)//为空则释放当前锁
notEmpty.await();
return dequeue();// 获得锁被唤醒了则返回数据
} finally {
lock.unlock();// 释放锁
}
}
LinkedBlockingQueue
LinkedBlockingQueue是一个由链表结构组成的有界阻塞队列,按照先进先出原则,可以不设定初始大小,默认Integer.MAX_VALUE
,为了避免队列过大造成机器负载或者内存爆满的情况出现,在使用的时候一般建议手动传一个队列的大小。
//节点类,用于存储数据
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
// 阻塞队列的大小,默认为Integer.MAX_VALUE
private final int capacity;
//当前阻塞队列中的元素个数
private final AtomicInteger count = new AtomicInteger();
// 阻塞队列的头结点
transient Node<E> head;
// 阻塞队列的尾节点
private transient Node<E> last;
// 获取并移除元素时使用的锁,如take, poll
private final ReentrantLock takeLock = new ReentrantLock();
// notEmpty条件对象,当队列没有数据时用于 挂起 执行删除的线程
private final Condition notEmpty = takeLock.newCondition();
// 添加元素时使用的锁如 put, offer
private final ReentrantLock putLock = new ReentrantLock();
// notFull条件对象,当队列数据已满时用于 挂起 执行添加的线程
private final Condition notFull = putLock.newCondition();
添加到LinkedBlockingQueue
队列中的数据都将被封装成Node节点,添加的链表队列中,其中head和last分别指向队列的头结点和尾结点。与ArrayBlockingQueue不同的是,LinkedBlockingQueue内部分别使用了takeLock
和putLock
对并发进行控制,也就是说,添加和删除操作并不是互斥操作,可以同时进行,这样也就可以大大提高吞吐量。
ArrayBlockingQueue和LinkedBlockingQueue对比:
- 实现:ArrayBlockingQueue底层上数组,ArrayBlockingQueue用Node包装后的链表(包含包装导致更大更冗余易触发GC)
- 初始化:ArrayBlockingQueue必须要求有初始值,ArrayBlockingQueue没有强制要求。
- 锁上:ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而ArrayBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量。
SynchronousQueue
一个不存储元素的阻塞队列。每一个put操作都要等待一个take操作请求才会put数据。
PriorityBlockingQueue
PriorityBlockingQueue是一个支持优先级的无界阻塞队列,直到系统资源耗尽。默认情况下元素采用自然顺序升序排列。也可以自定义类继承comparable<E>
实现compareTo()
方法来指定元素排序规则,或者初始化PriorityBlockingQueue
时,指定构造参数Comparator
来对元素进行排序。但需要注意的是不能保证同优先级元素的顺序。PriorityBlockingQueue
也是基最小二叉堆实现,使用基于CAS实现的自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞take操作的执行
LinkedTransferQueue
LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue
队列。该类实现了一个TransferQueue
接口,相对于其他阻塞队列,LinkedTransferQueue
多了tryTransfer
和transfer
方法。
public interface TransferQueue<E> extends BlockingQueue<E> {
// 如果可能,立即将元素转移给等待的消费者。
// 更确切地说,如果存在消费者已经等待接收它(在 take 或 timed poll(long,TimeUnit)poll)中,则 立即传送指定 的元素,否则返回 false。
boolean tryTransfer(E e);
// 将元素转移给消费者,如果需要的话等待。
// 更准确地说,如果存在一个消费者已经等待接收它(在 take 或timed poll(long,TimeUnit)poll)中,则立即传送指定的元素,否则 等待 直到 元素由消费者接收。
void transfer(E e) throws InterruptedException;
// 上面方法的基础上设置超时时间
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
// 如果至少有一位消费者在等待,则返回 true
boolean hasWaitingConsumer();
// 返回等待消费者人数的估计值
int getWaitingConsumerCount();
}
LinkedBlockingDeque
LinkedBlockingDeque
一个个由链表结构组成的双向阻塞队列,注意Deque
的存在。
可以从队列的头和尾都可以插入和移除元素,可以实现工作密取,比如ForkJoin底层任务队列。方法名带了First对头部操作,带了last从尾部操作。
另外方法调用的时候默认
add=addLast; remove=removeFirst; take=takeFirst
public boolean add(E e) {
addLast(e); // 等价
return true;
}
public E remove() {
return removeFirst();
}
public E take() throws InterruptedException {
return takeFirst();
}
DelayQueue
DelayQueue 一个使用优先级队列实现的无界阻塞队列。支持延时获取的元素的阻塞队列,元素必须要实现Delayed接口。放入队列中的元素只有在指定的timeout后才可以取出,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚。
适用场景:实现自己的缓存系统,订单到期,限时支付等。
demo
简单选择最后一个DelayQueue
做个demo演示加深理解。
任务:我有一个订单系统,需要通过阻塞队列延时功能实现。
需要(订单类,包装订单类,生产者,消费者,测试)
- 订单类
public class Order {
private final String orderNo;//订单的编号
private final double orderMoney;//订单的金额
public Order(String orderNo, double orderMoney) {
super();
this.orderNo = orderNo;
this.orderMoney = orderMoney;
}
public String getOrderNo() {
return orderNo;
}
public double getOrderMoney() {
return orderMoney;
}
}
- 包装类
// 类说明:存放到队列的元素
public class ItemVo<T> implements Delayed {
private long activeTime;//到期时间,单位毫秒
private T object;
//activeTime是个过期时长
public ItemVo(long activeTime, T object) {
super();
this.activeTime = TimeUnit.NANOSECONDS.convert(activeTime, TimeUnit.MILLISECONDS) + System.nanoTime();
// 将传入的时长转换为超时的时刻
this.object = object;
}
public T getObject() {
return object;
}
//按照剩余时间排序
@Override
public int compareTo(Delayed o) {
long d = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
return (d == 0) ? 0 : ((d > 0) ? 1 : -1);
}
//返回元素的剩余时间
@Override
public long getDelay(TimeUnit unit) {
long d = unit.convert(this.activeTime - System.nanoTime(), TimeUnit.NANOSECONDS);
return d;
}
}
- 生产者
public class PutOrder implements Runnable {
private DelayQueue<ItemVo<Order>> queue;
public PutOrder(DelayQueue<ItemVo<Order>> queue) {
super();
this.queue = queue;
}
@Override
public void run() {
//5秒到期
Order ordeTb = new Order("TBSoWhat", 14);
ItemVo<Order> itemTb = new ItemVo<Order>(5000, ordeTb);
queue.offer(itemTb); //插入
System.out.println("订单5秒后到期:" + ordeTb.getOrderNo());
//8秒到期
Order ordeJd = new Order("JDSoWhat", 12);
ItemVo<Order> itemJd = new ItemVo<Order>(8000, ordeJd);
queue.offer(itemJd);// 插入
System.out.println("订单8秒后到期:" + ordeJd.getOrderNo());
}
}
- 消费者
public class FetchOrder implements Runnable {
private DelayQueue<ItemVo<Order>> queue;
public FetchOrder(DelayQueue<ItemVo<Order>> queue) {
super();
this.queue = queue;
}
@Override
public void run() {
while(true) {
try {
ItemVo<Order> item = queue.take();
Order order = (Order)item.getObject();
System.out.println("get from queue:"+order.getOrderNo());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
- 测试延时功能
public class Test {
public static void main(String[] args) throws InterruptedException {
DelayQueue<ItemVo<Order>> queue = new DelayQueue<>();
new Thread(new PutOrder(queue)).start();
new Thread(new FetchOrder(queue)).start();
//每隔1秒,打印个数字
for(int i=1;i<19;i++){
Thread.sleep(1000);
System.out.println(i*1000);
}
}
}