Java并发包——线程安全的Collection相关类
摘要:本文主要学习了Java并发包下线程安全的Collection相关的类。
部分内容来自以下博客:
https://www.cnblogs.com/skywang12345/p/3498483.html
https://www.cnblogs.com/skywang12345/p/3498652.html
https://www.cnblogs.com/skywang12345/p/3503458.html
https://www.cnblogs.com/skywang12345/p/3498995.html
分类
参照之前在学习集合时候的分类,可以将JUC下有关Collection相关的类进行分类。
CopyOnWriteArrayList:实现了List接口,相当于线程安全的ArrayList。
CopyOnWriteArraySet:继承于AbstractSet类,相当于线程安全的HashSet。CopyOnWriteArraySet内部包含一个CopyOnWriteArrayList对象,它是通过CopyOnWriteArrayList实现的。
ConcurrentSkipListSet:继承于AbstractSet类,相当于线程安全的TreeSet。ConcurrentSkipListSet是通过ConcurrentSkipListMap实现的。
ArrayBlockingQueue:继承于AbstractQueue类,是数组实现的线程安全的有界的阻塞队列。
LinkedBlockingQueue:继承于AbstractQueue类,是单向链表实现的(指定大小)阻塞队列,该队列按FIFO(先进先出)排序元素。
LinkedBlockingDeque:继承于AbstractQueue类,是双向链表实现的(指定大小)双向并发阻塞队列,该阻塞队列同时支持FIFO和FILO两种操作方式。
ConcurrentLinkedQueue:继承于AbstractQueue类,是单向链表实现的无界队列,该队列按FIFO(先进先出)排序元素。
ConcurrentLinkedDeque:继承于AbstractQueue类,是双向链表实现的无界队列,该队列同时支持FIFO和FILO两种操作方式。
CopyOnWriteArrayList
说明
CopyOnWriteArrayList的内部有个“volatile数组”来保持数据。在“添加/修改/删除”数据时,都会新建一个数组,并将更新后的数据拷贝到新建的数组中,最后再将该数组赋值给“volatile数组”,这就是它叫做CopyOnWriteArrayList的原因。CopyOnWriteArrayList就是通过这种方式实现的动态数组,不过正由于它在“添加/修改/删除”数据时,都会新建数组,所以涉及到修改数据的操作,CopyOnWriteArrayList效率很低,但是单单只是进行遍历查找的话,效率比较高。
CopyOnWriteArrayList是通过“volatile数组”来保存数据的。一个线程读取volatile数组时,总能看到其它线程对该volatile变量最后的写入,就这样,通过volatile提供了“读取到的数据总是最新的”这个机制的
保证。
CopyOnWriteArrayList通过互斥锁来保护数据。在“添加/修改/删除”数据时,会先“获取互斥锁”,再修改完毕之后,先将数据更新到“volatile数组”中,然后再“释放互斥锁”,这样,就达到了保护数据的目的。
使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照。迭代器支持hasNext()、next()等不可变操作,但不支持add()、remove()等可变操作。
构造方法:
1 public CopyOnWriteArrayList() { 2 setArray(new Object[0]); 3 } 4 5 public CopyOnWriteArrayList(E[] toCopyIn) { 6 setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class)); 7 } 8 9 public CopyOnWriteArrayList(Collection<? extends E> c) { 10 Object[] elements; 11 if (c.getClass() == CopyOnWriteArrayList.class) 12 elements = ((CopyOnWriteArrayList<?>)c).getArray(); 13 else { 14 elements = c.toArray(); 15 // c.toArray might (incorrectly) not return Object[] (see 6260652) 16 if (elements.getClass() != Object[].class) 17 elements = Arrays.copyOf(elements, elements.length, Object[].class); 18 } 19 setArray(elements); 20 }
获取和设置array的方法
array是被volatile和transient修饰的一个数组。
关于volatile关键字,我们知道“volatile能让变量变得可见”,即对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入。正在由于这种特性,每次更新了“volatile数组”之后,其它线程都能看到对它所做的更新。
关于transient关键字,它是在序列化中才起作用,transient变量不会被自动序列化。
1 private transient volatile Object[] array; 2 3 final Object[] getArray() { 4 return array; 5 } 6 7 final void setArray(Object[] a) { 8 array = a; 9 }
添加元素
因为array数组是volatile修饰的,不能保证线程安全,所以在添加元素时使用锁来保证线程安全。
又因为array数组是volatile修饰的,所以在调用了setArray()方法后,能保证其它线程都能看到新添加的元素。
1 public void add(int index, E element) { 2 // 使用锁来保证线程安全。 3 final ReentrantLock lock = this.lock; 4 lock.lock(); 5 try { 6 // 获得array指向的引用地址。 7 Object[] elements = getArray(); 8 int len = elements.length; 9 // 如果指定位置越界,则抛出异常。 10 if (index > len || index < 0) 11 throw new IndexOutOfBoundsException("Index: "+index+", Size: "+len); 12 Object[] newElements; 13 // 如果插入位置是末尾。 14 int numMoved = len - index; 15 if (numMoved == 0) 16 // 将原数组进行拷贝并扩大一个容量。 17 newElements = Arrays.copyOf(elements, len + 1); 18 else { 19 // 如果不是插入到末尾,则创建扩大一个容量的数组。 20 newElements = new Object[len + 1]; 21 // 分段复制原数组,并空出指定位置。 22 System.arraycopy(elements, 0, newElements, 0, index); 23 System.arraycopy(elements, index, newElements, index + 1, numMoved); 24 } 25 // 设置指定位置的指定元素。 26 newElements[index] = element; 27 // 将array引用的地址指向新的数组。 28 setArray(newElements); 29 } finally { 30 lock.unlock(); 31 } 32 }
删除元素
删除元素就是将array数组中指定位置的元素删除。
它的实现方式是,如果被删除的是最后一个元素,则直接通过Arrays.copyOf()进行处理,而不需要新建数组。否则,新建数组,然后将array数组中被删除元素之外的其它元素拷贝到新数组中。最后,将新数组赋值给array数组。
1 public E remove(int index) { 2 // 使用锁来保证线程安全。 3 final ReentrantLock lock = this.lock; 4 lock.lock(); 5 try { 6 // 获得array指向的引用地址。 7 Object[] elements = getArray(); 8 int len = elements.length; 9 // 根据指定的位置获取元素。 10 E oldValue = get(elements, index); 11 // 如果指定的元素是最后一个元素。 12 int numMoved = len - index - 1; 13 if (numMoved == 0) 14 // 将原数组进行拷贝截取并将array的引用地址指向新的数组。 15 setArray(Arrays.copyOf(elements, len - 1)); 16 else { 17 // 如果不是最后一个元素,则创建减少一个容量的数组。 18 Object[] newElements = new Object[len - 1]; 19 // 分段复制原数组,并空出指定位置。 20 System.arraycopy(elements, 0, newElements, 0, index); 21 System.arraycopy(elements, index + 1, newElements, index, numMoved); 22 // 将array的引用地址指向新的数组。 23 setArray(newElements); 24 } 25 // 返回该位置上的元素。 26 return oldValue; 27 } finally { 28 lock.unlock(); 29 } 30 }
获取元素
获取元素很简单,就是返回array数组的指定位置的元素。
1 public E get(int index) { 2 return get(getArray(), index); 3 } 4 5 private E get(Object[] a, int index) { 6 return (E) a[index]; 7 }
设置元素
在设置元素之前判断指定位置的旧元素是否和新元素相等,如果相等则不进行替换,但仍然要调用setArray()方法。
1 public E set(int index, E element) { 2 // 使用锁来保证线程安全。 3 final ReentrantLock lock = this.lock; 4 lock.lock(); 5 try { 6 // 获得array指向的引用地址。 7 Object[] elements = getArray(); 8 // 获取指定位置的旧元素。 9 E oldValue = get(elements, index); 10 // 如果旧元素的引用和新元素的引用不同。 11 if (oldValue != element) { 12 // 创建新的数组并拷贝array数组的值,替换新数组指定位置的元素。 13 int len = elements.length; 14 Object[] newElements = Arrays.copyOf(elements, len); 15 newElements[index] = element; 16 // 将array的引用地址指向新的数组 17 setArray(newElements); 18 } else { 19 // 为了确保voliatile的语义,任何一个读操作都应该是写操作的结构,所以尽管写操作没有改变数据,还是调用set方法,当然这仅仅是语义的说明,去掉也是可以的。 20 setArray(elements); 21 } 22 return oldValue; 23 } finally { 24 lock.unlock(); 25 } 26 }
遍历
CopyOnWriteArrayList类的迭代方法返回的是一个COWIterator类的对象。
1 public Iterator<E> iterator() { 2 return new COWIterator<E>(getArray(), 0); 3 }
CopyOnWriteArrayList在类里维护了一个用于遍历的COWIterator类,COWIterator类实现了ListIterator接口。
1 static final class COWIterator<E> implements ListIterator<E> { 2 // 数组的快照。 3 private final Object[] snapshot; 4 // 指定下标。 5 private int cursor; 6 7 // 构造方法。 8 private COWIterator(Object[] elements, int initialCursor) { 9 cursor = initialCursor; 10 snapshot = elements; 11 } 12 13 // 判断是否存在下一个元素。 14 public boolean hasNext() { 15 return cursor < snapshot.length; 16 } 17 18 // 判断是否存在上一个元素。 19 public boolean hasPrevious() { 20 return cursor > 0; 21 } 22 23 // 获取下一个元素。 24 @SuppressWarnings("unchecked") 25 public E next() { 26 if (! hasNext()) 27 throw new NoSuchElementException(); 28 return (E) snapshot[cursor++]; 29 } 30 31 // 获取上一个元素。 32 @SuppressWarnings("unchecked") 33 public E previous() { 34 if (! hasPrevious()) 35 throw new NoSuchElementException(); 36 return (E) snapshot[--cursor]; 37 } 38 39 // 获取下一个元素的位置。 40 public int nextIndex() { 41 return cursor; 42 } 43 44 // 获取上一个元素的位置。 45 public int previousIndex() { 46 return cursor-1; 47 } 48 49 // 不支持删除元素。 50 public void remove() { 51 throw new UnsupportedOperationException(); 52 } 53 54 // 不支持修改元素。 55 public void set(E e) { 56 throw new UnsupportedOperationException(); 57 } 58 59 // 不支持添加元素。 60 public void add(E e) { 61 throw new UnsupportedOperationException(); 62 } 63 64 // JDK1.8新增的方法,使用迭代器Iterator的所有元素,并且第二次调用它将不会做任何事情。 65 @Override 66 public void forEachRemaining(Consumer<? super E> action) { 67 Objects.requireNonNull(action); 68 Object[] elements = snapshot; 69 final int size = elements.length; 70 for (int i = cursor; i < size; i++) { 71 @SuppressWarnings("unchecked") E e = (E) elements[i]; 72 action.accept(e); 73 } 74 cursor = size; 75 } 76 }
ArrayBlockingQueue
说明
ArrayBlockingQueue内部是通过Object[]数组保存数据的,也就是说ArrayBlockingQueue本质上是通过数组实现的。ArrayBlockingQueue的大小,即数组的容量是创建ArrayBlockingQueue时指定的。
ArrayBlockingQueue与ReentrantLock是组合关系,ArrayBlockingQueue中包含一个ReentrantLock对象。ReentrantLock是可重入的互斥锁,ArrayBlockingQueue就是根据该互斥锁实现“多线程对竞争资源的互斥访问”。而且,ReentrantLock分为公平锁和非公平锁,关于具体使用公平锁还是非公平锁,在创建ArrayBlockingQueue时可以指定,ArrayBlockingQueue默认会使用非公平锁。
ArrayBlockingQueue与Condition是组合关系,ArrayBlockingQueue中包含两个Condition对象。而且,Condition又依赖于ArrayBlockingQueue而存在,通过Condition可以实现对ArrayBlockingQueue的更精确的访问。
构造方法
1 public ArrayBlockingQueue(int capacity) { 2 this(capacity, false); 3 } 4 5 public ArrayBlockingQueue(int capacity, boolean fair) { 6 if (capacity <= 0) 7 throw new IllegalArgumentException(); 8 this.items = new Object[capacity]; 9 lock = new ReentrantLock(fair); 10 notEmpty = lock.newCondition(); 11 notFull = lock.newCondition(); 12 } 13 14 public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { 15 this(capacity, fair); 16 // 加锁是为了保证可见性,因为可能存在其他线程在初始化之后修改集合。 17 final ReentrantLock lock = this.lock; 18 lock.lock(); 19 try { 20 int i = 0; 21 try { 22 for (E e : c) { 23 checkNotNull(e); 24 items[i++] = e; 25 } 26 } catch (ArrayIndexOutOfBoundsException ex) { 27 throw new IllegalArgumentException(); 28 } 29 count = i; 30 putIndex = (i == capacity) ? 0 : i; 31 } finally { 32 lock.unlock(); 33 } 34 }
添加元素
ArrayBlockingQueue提供了offer()方法和put()方法两种方式添加元素。
offer()方法添加失败会立即返回false,并且添加过程中不允许被其他线程中断。
put()方法添加失败会等待,并且在添加过程中可以被其他线程中断,抛出InterruptedException异常。
1 // 不允许被其他线程中断,添加失败则立即返回false。 2 public boolean offer(E e) { 3 checkNotNull(e); 4 final ReentrantLock lock = this.lock; 5 lock.lock(); 6 try { 7 if (count == items.length) 8 return false; 9 else { 10 enqueue(e); 11 return true; 12 } 13 } finally { 14 lock.unlock(); 15 } 16 } 17 18 // 允许被其他线程中断,抛出InterruptedException,并且添加失败会等待。 19 public void put(E e) throws InterruptedException { 20 checkNotNull(e); 21 final ReentrantLock lock = this.lock; 22 lock.lockInterruptibly(); 23 try { 24 while (count == items.length) 25 notFull.await(); 26 enqueue(e); 27 } finally { 28 lock.unlock(); 29 } 30 } 31 32 // 实际上的添加方法,添加成功后会唤醒一个等待删除元素的线程。 33 private void enqueue(E x) { 34 final Object[] items = this.items; 35 items[putIndex] = x; 36 if (++putIndex == items.length) 37 putIndex = 0; 38 count++; 39 notEmpty.signal(); 40 }
删除元素
ArrayBlockingQueue提供了poll()方法和take()方法两种方式删除元素。
poll()方法删除失败会立即返回false,并且添加过程中不允许被其他线程中断。
take()方法删除失败会等待,并且在删除过程中可以被其他线程中断,抛出InterruptedException异常。
1 // 不允许被其他线程中断,删除失败则立即返回null。 2 public E poll() { 3 final ReentrantLock lock = this.lock; 4 lock.lock(); 5 try { 6 return (count == 0) ? null : dequeue(); 7 } finally { 8 lock.unlock(); 9 } 10 } 11 12 // 允许被其他线程中断,抛出InterruptedException,并且删除失败会等待。 13 public E take() throws InterruptedException { 14 final ReentrantLock lock = this.lock; 15 lock.lockInterruptibly(); 16 try { 17 while (count == 0) 18 notEmpty.await(); 19 return dequeue(); 20 } finally { 21 lock.unlock(); 22 } 23 } 24 25 // 实际上的删除方法,删除成功后会唤醒一个等待添加元素的线程。 26 private E dequeue() { 27 final Object[] items = this.items; 28 @SuppressWarnings("unchecked") 29 E x = (E) items[takeIndex]; 30 items[takeIndex] = null; 31 if (++takeIndex == items.length) 32 takeIndex = 0; 33 count--; 34 if (itrs != null) 35 itrs.elementDequeued(); 36 notFull.signal(); 37 return x; 38 }
LinkedBlockingQueue
说明
LinkedBlockingQueue是一个单向链表实现的阻塞队列。该队列按FIFO(先进先出)排序元素,新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。
LinkedBlockingQueue是可选容量的(防止过度膨胀),即可以指定队列的容量。如果不指定,默认容量大小等于Integer.MAX_VALUE。
LinkedBlockingQueue实现了BlockingQueue接口,它支持多线程并发。当多线程竞争同一个资源时,某线程获取到该资源之后,其它线程需要阻塞等待。
LinkedBlockingQueue在实现多线程对竞争资源的互斥访问时,对于插入和取出操作分别使用了不同的锁。此外,插入锁putLock和非满条件notFull相关联,取出锁takeLock和非空条件notEmpty相关联。通过notFull和notEmpty更细腻的控制锁。
属性
1 head是链表的表头。取出数据时,都是从表头head处插入。 2 last是链表的表尾。新增数据时,都是从表尾last处插入。 3 count是链表的实际大小,即当前链表中包含的节点个数。 4 capacity是列表的容量,它是在创建链表时指定的。 5 putLock是插入锁。 6 takeLock是取出锁。 7 notEmpty是非空条件。 8 notFull是非满条件。
构造方法
1 // 创建一个容量为Integer.MAX_VALUE的LinkedBlockingQueue。 2 LinkedBlockingQueue() 3 // 创建一个指定容量的LinkedBlockingQueue。 4 LinkedBlockingQueue(int capacity) 5 // 创建一个容量是Integer.MAX_VALUE的LinkedBlockingQueue,最初包含给定collection的元素,元素按该collection迭代器的遍历顺序添加。 6 LinkedBlockingQueue(Collection<? extends E> c)
其他方法
1 // 将指定元素插入到此队列的尾部,如果队列已满,则等待。 2 void put(E e) 3 // 将指定元素插入到此队列的尾部,如果队列已满,则返回false。 4 boolean offer(E e) 5 // 将指定元素插入到此队列的尾部,如果队列已满,则等待指定的时间。 6 boolean offer(E e, long timeout, TimeUnit unit) 7 // 获取并移除此队列的头部,如果队列为空,则等待。 8 E take() 9 // 获取并移除此队列的头部,如果队列为空,则返回null。 10 E poll() 11 // 获取并移除此队列的头部,如果队列为空,则等待指定的时间。 12 E poll(long timeout, TimeUnit unit) 13 // 获取但不移除此队列的头,如果此队列为空,则返回null。 14 E peek() 15 // 返回在队列中的元素上按适当顺序进行迭代的迭代器。 16 Iterator<E> iterator()
ConcurrentLinkedQueue
说明
ConcurrentLinkedQueue是线程安全的队列,它适用于“高并发”的场景。ConcurrentLinkedQueue使用CAS来保证更新的线程安全,是一个非阻塞队列。
ConcurrentLinkedQueue是一个基于链表的无界线程安全队列,按照FIFO(先进先出)原则对元素进行排序。队列元素中不可以放置null元素(内部实现的特殊节点除外)。
构造方法
1 // 创建一个最初为空的ConcurrentLinkedQueue。 2 ConcurrentLinkedQueue() 3 // 创建一个最初包含给定collection元素的ConcurrentLinkedQueue,按照此collection迭代器的遍历顺序来添加元素。 4 ConcurrentLinkedQueue(Collection<? extends E> c)
其他方法
1 // 将指定元素插入此队列的尾部。 2 boolean offer(E e) 3 // 获取并移除此队列的头,如果队列为空,则返回null。 4 E poll() 5 // 获取但不移除此队列的头,如果队列为空,则返回null。 6 E peek() 7 // 返回在此队列元素上以恰当顺序进行迭代的迭代器。 8 Iterator<E> iterator() 9 // 返回此队列中的元素数量。 10 int size()