zoukankan      html  css  js  c++  java
  • Java并发包——线程安全的Collection相关类

    Java并发包——线程安全的Collection相关类

    摘要:本文主要学习了Java并发包下线程安全的Collection相关的类。

    部分内容来自以下博客:

    https://www.cnblogs.com/skywang12345/p/3498483.html

    https://www.cnblogs.com/skywang12345/p/3498652.html

    https://www.cnblogs.com/skywang12345/p/3503458.html

    https://www.cnblogs.com/skywang12345/p/3498995.html

    分类

    参照之前在学习集合时候的分类,可以将JUC下有关Collection相关的类进行分类。

    CopyOnWriteArrayList:实现了List接口,相当于线程安全的ArrayList。

    CopyOnWriteArraySet:继承于AbstractSet类,相当于线程安全的HashSet。CopyOnWriteArraySet内部包含一个CopyOnWriteArrayList对象,它是通过CopyOnWriteArrayList实现的。

    ConcurrentSkipListSet:继承于AbstractSet类,相当于线程安全的TreeSet。ConcurrentSkipListSet是通过ConcurrentSkipListMap实现的。

    ArrayBlockingQueue:继承于AbstractQueue类,是数组实现的线程安全的有界的阻塞队列。

    LinkedBlockingQueue:继承于AbstractQueue类,是单向链表实现的(指定大小)阻塞队列,该队列按FIFO(先进先出)排序元素。

    LinkedBlockingDeque:继承于AbstractQueue类,是双向链表实现的(指定大小)双向并发阻塞队列,该阻塞队列同时支持FIFO和FILO两种操作方式。

    ConcurrentLinkedQueue:继承于AbstractQueue类,是单向链表实现的无界队列,该队列按FIFO(先进先出)排序元素。

    ConcurrentLinkedDeque:继承于AbstractQueue类,是双向链表实现的无界队列,该队列同时支持FIFO和FILO两种操作方式。

    CopyOnWriteArrayList

    说明

    CopyOnWriteArrayList的内部有个“volatile数组”来保持数据。在“添加/修改/删除”数据时,都会新建一个数组,并将更新后的数据拷贝到新建的数组中,最后再将该数组赋值给“volatile数组”,这就是它叫做CopyOnWriteArrayList的原因。CopyOnWriteArrayList就是通过这种方式实现的动态数组,不过正由于它在“添加/修改/删除”数据时,都会新建数组,所以涉及到修改数据的操作,CopyOnWriteArrayList效率很低,但是单单只是进行遍历查找的话,效率比较高。

    CopyOnWriteArrayList是通过“volatile数组”来保存数据的。一个线程读取volatile数组时,总能看到其它线程对该volatile变量最后的写入,就这样,通过volatile提供了“读取到的数据总是最新的”这个机制的
    保证。

    CopyOnWriteArrayList通过互斥锁来保护数据。在“添加/修改/删除”数据时,会先“获取互斥锁”,再修改完毕之后,先将数据更新到“volatile数组”中,然后再“释放互斥锁”,这样,就达到了保护数据的目的。

    使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照。迭代器支持hasNext()、next()等不可变操作,但不支持add()、remove()等可变操作。

    构造方法:

     1 public CopyOnWriteArrayList() {
     2     setArray(new Object[0]);
     3 }
     4 
     5 public CopyOnWriteArrayList(E[] toCopyIn) {
     6     setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class));
     7 }
     8 
     9 public CopyOnWriteArrayList(Collection<? extends E> c) {
    10     Object[] elements;
    11     if (c.getClass() == CopyOnWriteArrayList.class)
    12         elements = ((CopyOnWriteArrayList<?>)c).getArray();
    13     else {
    14         elements = c.toArray();
    15         // c.toArray might (incorrectly) not return Object[] (see 6260652)
    16         if (elements.getClass() != Object[].class)
    17             elements = Arrays.copyOf(elements, elements.length, Object[].class);
    18     }
    19     setArray(elements);
    20 }

    获取和设置array的方法

    array是被volatile和transient修饰的一个数组。

    关于volatile关键字,我们知道“volatile能让变量变得可见”,即对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入。正在由于这种特性,每次更新了“volatile数组”之后,其它线程都能看到对它所做的更新。

    关于transient关键字,它是在序列化中才起作用,transient变量不会被自动序列化。

    1 private transient volatile Object[] array;
    2 
    3 final Object[] getArray() {
    4     return array;
    5 }
    6 
    7 final void setArray(Object[] a) {
    8     array = a;
    9 }

    添加元素

    因为array数组是volatile修饰的,不能保证线程安全,所以在添加元素时使用锁来保证线程安全。

    又因为array数组是volatile修饰的,所以在调用了setArray()方法后,能保证其它线程都能看到新添加的元素。

     1 public void add(int index, E element) {
     2     // 使用锁来保证线程安全。
     3     final ReentrantLock lock = this.lock;
     4     lock.lock();
     5     try {
     6         // 获得array指向的引用地址。
     7         Object[] elements = getArray();
     8         int len = elements.length;
     9         // 如果指定位置越界,则抛出异常。
    10         if (index > len || index < 0)
    11             throw new IndexOutOfBoundsException("Index: "+index+", Size: "+len);
    12         Object[] newElements;
    13         // 如果插入位置是末尾。
    14         int numMoved = len - index;
    15         if (numMoved == 0)
    16             // 将原数组进行拷贝并扩大一个容量。
    17             newElements = Arrays.copyOf(elements, len + 1);
    18         else {
    19             // 如果不是插入到末尾,则创建扩大一个容量的数组。
    20             newElements = new Object[len + 1];
    21             // 分段复制原数组,并空出指定位置。
    22             System.arraycopy(elements, 0, newElements, 0, index);
    23             System.arraycopy(elements, index, newElements, index + 1, numMoved);
    24         }
    25         // 设置指定位置的指定元素。
    26         newElements[index] = element;
    27         // 将array引用的地址指向新的数组。
    28         setArray(newElements);
    29     } finally {
    30         lock.unlock();
    31     }
    32 }

    删除元素

    删除元素就是将array数组中指定位置的元素删除。

    它的实现方式是,如果被删除的是最后一个元素,则直接通过Arrays.copyOf()进行处理,而不需要新建数组。否则,新建数组,然后将array数组中被删除元素之外的其它元素拷贝到新数组中。最后,将新数组赋值给array数组。

     1 public E remove(int index) {
     2     // 使用锁来保证线程安全。
     3     final ReentrantLock lock = this.lock;
     4     lock.lock();
     5     try {
     6         // 获得array指向的引用地址。
     7         Object[] elements = getArray();
     8         int len = elements.length;
     9         // 根据指定的位置获取元素。
    10         E oldValue = get(elements, index);
    11         // 如果指定的元素是最后一个元素。
    12         int numMoved = len - index - 1;
    13         if (numMoved == 0)
    14             // 将原数组进行拷贝截取并将array的引用地址指向新的数组。
    15             setArray(Arrays.copyOf(elements, len - 1));
    16         else {
    17             // 如果不是最后一个元素,则创建减少一个容量的数组。
    18             Object[] newElements = new Object[len - 1];
    19             // 分段复制原数组,并空出指定位置。
    20             System.arraycopy(elements, 0, newElements, 0, index);
    21             System.arraycopy(elements, index + 1, newElements, index, numMoved);
    22             // 将array的引用地址指向新的数组。
    23             setArray(newElements);
    24         }
    25         // 返回该位置上的元素。
    26         return oldValue;
    27     } finally {
    28         lock.unlock();
    29     }
    30 }

    获取元素

    获取元素很简单,就是返回array数组的指定位置的元素。

    1 public E get(int index) {
    2     return get(getArray(), index);
    3 }
    4 
    5 private E get(Object[] a, int index) {
    6     return (E) a[index];
    7 }

    设置元素

    在设置元素之前判断指定位置的旧元素是否和新元素相等,如果相等则不进行替换,但仍然要调用setArray()方法。

     1 public E set(int index, E element) {
     2     // 使用锁来保证线程安全。
     3     final ReentrantLock lock = this.lock;
     4     lock.lock();
     5     try {
     6         // 获得array指向的引用地址。
     7         Object[] elements = getArray();
     8         // 获取指定位置的旧元素。
     9         E oldValue = get(elements, index);
    10         // 如果旧元素的引用和新元素的引用不同。
    11         if (oldValue != element) {
    12             // 创建新的数组并拷贝array数组的值,替换新数组指定位置的元素。
    13             int len = elements.length;
    14             Object[] newElements = Arrays.copyOf(elements, len);
    15             newElements[index] = element;
    16             // 将array的引用地址指向新的数组
    17             setArray(newElements);
    18         } else {
    19             // 为了确保voliatile的语义,任何一个读操作都应该是写操作的结构,所以尽管写操作没有改变数据,还是调用set方法,当然这仅仅是语义的说明,去掉也是可以的。
    20             setArray(elements);
    21         }
    22         return oldValue;
    23     } finally {
    24         lock.unlock();
    25     }
    26 }

    遍历

    CopyOnWriteArrayList类的迭代方法返回的是一个COWIterator类的对象。

    1 public Iterator<E> iterator() {
    2     return new COWIterator<E>(getArray(), 0);
    3 }

    CopyOnWriteArrayList在类里维护了一个用于遍历的COWIterator类,COWIterator类实现了ListIterator接口。

     1 static final class COWIterator<E> implements ListIterator<E> {
     2     // 数组的快照。
     3     private final Object[] snapshot;
     4     // 指定下标。
     5     private int cursor;
     6 
     7     // 构造方法。
     8     private COWIterator(Object[] elements, int initialCursor) {
     9         cursor = initialCursor;
    10         snapshot = elements;
    11     }
    12 
    13     // 判断是否存在下一个元素。
    14     public boolean hasNext() {
    15         return cursor < snapshot.length;
    16     }
    17 
    18     // 判断是否存在上一个元素。
    19     public boolean hasPrevious() {
    20         return cursor > 0;
    21     }
    22 
    23     // 获取下一个元素。
    24     @SuppressWarnings("unchecked")
    25     public E next() {
    26         if (! hasNext())
    27             throw new NoSuchElementException();
    28         return (E) snapshot[cursor++];
    29     }
    30 
    31     // 获取上一个元素。
    32     @SuppressWarnings("unchecked")
    33     public E previous() {
    34         if (! hasPrevious())
    35             throw new NoSuchElementException();
    36         return (E) snapshot[--cursor];
    37     }
    38 
    39     // 获取下一个元素的位置。
    40     public int nextIndex() {
    41         return cursor;
    42     }
    43 
    44     // 获取上一个元素的位置。
    45     public int previousIndex() {
    46         return cursor-1;
    47     }
    48 
    49     // 不支持删除元素。
    50     public void remove() {
    51         throw new UnsupportedOperationException();
    52     }
    53 
    54     // 不支持修改元素。
    55     public void set(E e) {
    56         throw new UnsupportedOperationException();
    57     }
    58 
    59     // 不支持添加元素。
    60     public void add(E e) {
    61         throw new UnsupportedOperationException();
    62     }
    63 
    64     // JDK1.8新增的方法,使用迭代器Iterator的所有元素,并且第二次调用它将不会做任何事情。
    65     @Override
    66     public void forEachRemaining(Consumer<? super E> action) {
    67         Objects.requireNonNull(action);
    68         Object[] elements = snapshot;
    69         final int size = elements.length;
    70         for (int i = cursor; i < size; i++) {
    71             @SuppressWarnings("unchecked") E e = (E) elements[i];
    72             action.accept(e);
    73         }
    74         cursor = size;
    75     }
    76 }

    ArrayBlockingQueue

    说明

    ArrayBlockingQueue内部是通过Object[]数组保存数据的,也就是说ArrayBlockingQueue本质上是通过数组实现的。ArrayBlockingQueue的大小,即数组的容量是创建ArrayBlockingQueue时指定的。

    ArrayBlockingQueue与ReentrantLock是组合关系,ArrayBlockingQueue中包含一个ReentrantLock对象。ReentrantLock是可重入的互斥锁,ArrayBlockingQueue就是根据该互斥锁实现“多线程对竞争资源的互斥访问”。而且,ReentrantLock分为公平锁和非公平锁,关于具体使用公平锁还是非公平锁,在创建ArrayBlockingQueue时可以指定,ArrayBlockingQueue默认会使用非公平锁。

    ArrayBlockingQueue与Condition是组合关系,ArrayBlockingQueue中包含两个Condition对象。而且,Condition又依赖于ArrayBlockingQueue而存在,通过Condition可以实现对ArrayBlockingQueue的更精确的访问。

    构造方法

     1 public ArrayBlockingQueue(int capacity) {
     2     this(capacity, false);
     3 }
     4 
     5 public ArrayBlockingQueue(int capacity, boolean fair) {
     6     if (capacity <= 0)
     7         throw new IllegalArgumentException();
     8     this.items = new Object[capacity];
     9     lock = new ReentrantLock(fair);
    10     notEmpty = lock.newCondition();
    11     notFull =  lock.newCondition();
    12 }
    13 
    14 public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
    15     this(capacity, fair);
    16     // 加锁是为了保证可见性,因为可能存在其他线程在初始化之后修改集合。
    17     final ReentrantLock lock = this.lock;
    18     lock.lock();
    19     try {
    20         int i = 0;
    21         try {
    22             for (E e : c) {
    23                 checkNotNull(e);
    24                 items[i++] = e;
    25             }
    26         } catch (ArrayIndexOutOfBoundsException ex) {
    27             throw new IllegalArgumentException();
    28         }
    29         count = i;
    30         putIndex = (i == capacity) ? 0 : i;
    31     } finally {
    32         lock.unlock();
    33     }
    34 }

    添加元素

    ArrayBlockingQueue提供了offer()方法和put()方法两种方式添加元素。

    offer()方法添加失败会立即返回false,并且添加过程中不允许被其他线程中断。

    put()方法添加失败会等待,并且在添加过程中可以被其他线程中断,抛出InterruptedException异常。

     1 // 不允许被其他线程中断,添加失败则立即返回false。
     2 public boolean offer(E e) {
     3     checkNotNull(e);
     4     final ReentrantLock lock = this.lock;
     5     lock.lock();
     6     try {
     7         if (count == items.length)
     8             return false;
     9         else {
    10             enqueue(e);
    11             return true;
    12         }
    13     } finally {
    14         lock.unlock();
    15     }
    16 }
    17 
    18 // 允许被其他线程中断,抛出InterruptedException,并且添加失败会等待。
    19 public void put(E e) throws InterruptedException {
    20     checkNotNull(e);
    21     final ReentrantLock lock = this.lock;
    22     lock.lockInterruptibly();
    23     try {
    24         while (count == items.length)
    25             notFull.await();
    26         enqueue(e);
    27     } finally {
    28         lock.unlock();
    29     }
    30 }
    31 
    32 // 实际上的添加方法,添加成功后会唤醒一个等待删除元素的线程。
    33 private void enqueue(E x) {
    34     final Object[] items = this.items;
    35     items[putIndex] = x;
    36     if (++putIndex == items.length)
    37         putIndex = 0;
    38     count++;
    39     notEmpty.signal();
    40 }

    删除元素

    ArrayBlockingQueue提供了poll()方法和take()方法两种方式删除元素。

    poll()方法删除失败会立即返回false,并且添加过程中不允许被其他线程中断。

    take()方法删除失败会等待,并且在删除过程中可以被其他线程中断,抛出InterruptedException异常。

     1 // 不允许被其他线程中断,删除失败则立即返回null。
     2 public E poll() {
     3     final ReentrantLock lock = this.lock;
     4     lock.lock();
     5     try {
     6         return (count == 0) ? null : dequeue();
     7     } finally {
     8         lock.unlock();
     9     }
    10 }
    11 
    12 // 允许被其他线程中断,抛出InterruptedException,并且删除失败会等待。
    13 public E take() throws InterruptedException {
    14     final ReentrantLock lock = this.lock;
    15     lock.lockInterruptibly();
    16     try {
    17         while (count == 0)
    18             notEmpty.await();
    19         return dequeue();
    20     } finally {
    21         lock.unlock();
    22     }
    23 }
    24 
    25 // 实际上的删除方法,删除成功后会唤醒一个等待添加元素的线程。
    26 private E dequeue() {
    27     final Object[] items = this.items;
    28     @SuppressWarnings("unchecked")
    29     E x = (E) items[takeIndex];
    30     items[takeIndex] = null;
    31     if (++takeIndex == items.length)
    32         takeIndex = 0;
    33     count--;
    34     if (itrs != null)
    35         itrs.elementDequeued();
    36     notFull.signal();
    37     return x;
    38 }

    LinkedBlockingQueue

    说明

    LinkedBlockingQueue是一个单向链表实现的阻塞队列。该队列按FIFO(先进先出)排序元素,新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。

    LinkedBlockingQueue是可选容量的(防止过度膨胀),即可以指定队列的容量。如果不指定,默认容量大小等于Integer.MAX_VALUE。

    LinkedBlockingQueue实现了BlockingQueue接口,它支持多线程并发。当多线程竞争同一个资源时,某线程获取到该资源之后,其它线程需要阻塞等待。

    LinkedBlockingQueue在实现多线程对竞争资源的互斥访问时,对于插入和取出操作分别使用了不同的锁。此外,插入锁putLock和非满条件notFull相关联,取出锁takeLock和非空条件notEmpty相关联。通过notFull和notEmpty更细腻的控制锁。

    属性

    1 head是链表的表头。取出数据时,都是从表头head处插入。
    2 last是链表的表尾。新增数据时,都是从表尾last处插入。
    3 count是链表的实际大小,即当前链表中包含的节点个数。
    4 capacity是列表的容量,它是在创建链表时指定的。
    5 putLock是插入锁。
    6 takeLock是取出锁。
    7 notEmpty是非空条件。
    8 notFull是非满条件。

    构造方法

    1 // 创建一个容量为Integer.MAX_VALUE的LinkedBlockingQueue。
    2 LinkedBlockingQueue()
    3 // 创建一个指定容量的LinkedBlockingQueue。
    4 LinkedBlockingQueue(int capacity)
    5 // 创建一个容量是Integer.MAX_VALUE的LinkedBlockingQueue,最初包含给定collection的元素,元素按该collection迭代器的遍历顺序添加。
    6 LinkedBlockingQueue(Collection<? extends E> c)

    其他方法

     1 // 将指定元素插入到此队列的尾部,如果队列已满,则等待。
     2 void put(E e)
     3 // 将指定元素插入到此队列的尾部,如果队列已满,则返回false。
     4 boolean offer(E e)
     5 // 将指定元素插入到此队列的尾部,如果队列已满,则等待指定的时间。
     6 boolean offer(E e, long timeout, TimeUnit unit)
     7 // 获取并移除此队列的头部,如果队列为空,则等待。
     8 E take()
     9 // 获取并移除此队列的头部,如果队列为空,则返回null。
    10 E poll()
    11 // 获取并移除此队列的头部,如果队列为空,则等待指定的时间。
    12 E poll(long timeout, TimeUnit unit)
    13 // 获取但不移除此队列的头,如果此队列为空,则返回null。
    14 E peek()
    15 // 返回在队列中的元素上按适当顺序进行迭代的迭代器。
    16 Iterator<E> iterator()

    ConcurrentLinkedQueue

    说明

    ConcurrentLinkedQueue是线程安全的队列,它适用于“高并发”的场景。ConcurrentLinkedQueue使用CAS来保证更新的线程安全,是一个非阻塞队列。

    ConcurrentLinkedQueue是一个基于链表的无界线程安全队列,按照FIFO(先进先出)原则对元素进行排序。队列元素中不可以放置null元素(内部实现的特殊节点除外)。

    构造方法

    1 // 创建一个最初为空的ConcurrentLinkedQueue。
    2 ConcurrentLinkedQueue()
    3 // 创建一个最初包含给定collection元素的ConcurrentLinkedQueue,按照此collection迭代器的遍历顺序来添加元素。
    4 ConcurrentLinkedQueue(Collection<? extends E> c)

    其他方法

     1 // 将指定元素插入此队列的尾部。
     2 boolean offer(E e)
     3 // 获取并移除此队列的头,如果队列为空,则返回null。
     4 E poll()
     5 // 获取但不移除此队列的头,如果队列为空,则返回null。
     6 E peek()
     7 // 返回在此队列元素上以恰当顺序进行迭代的迭代器。
     8 Iterator<E> iterator()
     9 // 返回此队列中的元素数量。
    10 int size()
  • 相关阅读:
    vue中computed计算属性和methods区别
    java解决跨域问题
    redis服务端开启
    使用excel生成商品条形码
    MySQL主键自增时SQL写法/当前时间写法
    修改MySQL数据库密码
    MySQL5.6.42解压版安装教程
    完全卸载MySQL数据库
    IDEA快捷键及xml文件中网址报错
    IDEA导入外部包
  • 原文地址:https://www.cnblogs.com/shamao/p/11065885.html
Copyright © 2011-2022 走看看