zoukankan      html  css  js  c++  java
  • Java集合框架整理4--基于写时复制机制实现的CopyOnWriteArrayList和CopyOnWriteArraySet实现原理

    前言:

    上篇文章通过源码解析了ArrayList和LinkedList的实现逻辑,但是在多线程的情况下,这两个List实现类都是无法保证线程安全的,所以在JUC中就提供了一种线程安全的List,也就是本文将要解析的CopyOnWriteArrayList

    CopyOnWriteArrayList从名字上可以看出底层的数据结构应该是和ArrayList一样采用数组来实现的,但是既然是线程安全的,所以应该是使用了同步操作,而JUC中同步的方式无非就是使用ReentrantLock来实现。另外JUC中有很多线程安全的集合类名字的前缀

    都是ConConcurrent* 表示是可以并发操作,而CopyOnWriteArrayList却是以CopyOnWrite来作为前缀的,这里就涉及到了一个概念--CopyOnWrite机制(写时复制机制)

    一、CopyOnWrite机制

    CopyOnWrite从单词上直译意思是 通过复制来写,专业的叫法就是写时复制。

    当我们使用集合容器来存储数据时,如果要保证线程安全,可以在操作的时候加锁处理,保证同一时间只有一个人操作即可。还有一种办法是每次操作的时候,都先将容器复制一份副本,然后各个线程直接在各自的容器副本中操作数据,操作完成之后再将当前的副本

    直接替换掉原容器即可,这样只需要保证在替换的时候是线程安全的,就可以保证容器的所有操作都是线程安全的了。这样的好处是只有写操作才会进行复制,而不会影响到读操作,所以读操作还是会直接读原容器,而写是容器副本,从而间接的达到了读写分离的效果。当然既然是读写分离的,就需要注意脏读读问题,需要保证读的是最新的数据,否则会出现读的数据还是旧的,但是实际上已经被更新过了。

    1.1、CopyOnWrite的优点

    读写分离,只对写做同步处理,不对读作同步处理,既保证了写的线程安全又保证了读的高性能。

    如果没有CopyOnWrite机制,如果想要实现线程安全的ArrayList,无非有几种方式

    Synchronzied:如果使用Synchronzied,相当于ArrayList同一时间只可以有一个线程可以访问,很显然效率较低;

    ReentrantLock:和Synchronzied基本上一致同一时间只有一个线程访问

    ReentrantReadWriteLock:读写锁一定程度上实现了读写分开加锁的操作,但是如果有一个线程占有了写锁,此时有大量的读请求时还是会同样被阻塞中,所以读写锁还是会有写锁阻塞读锁的情况

    而CopyOnWrite思想是复制副本来进行修改,而读线程读的还是原数据,就不会出现写线程将读线程给阻塞的情况了。

    二、CopyOnWriteArrayList 

    2.1、CopyOnWriteArrayList的初始化

    CopyOnWriteArrayList一共有三个构造方法,一个是无参,一个参数为对象数组,一个参数为Collection集合,源码分别如下:

     1 /**volatile修饰的对象数组,内部存储数据的结构*/
     2     private transient volatile Object[] array;
     3 
     4     /**无参构造函数*/
     5     public CopyOnWriteArrayList() {
     6         /**新建Object数组,调用setArray方法初始化*/
     7         setArray(new Object[0]);
     8     }
     9 
    10     /**参数为对象数组*/
    11     public CopyOnWriteArrayList(E[] toCopyIn) {
    12         /**直接将参数通过Arrays.copyOf方法复制一个数组*/
    13         setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class));
    14     }
    15 
    16     /**参数为Collection集合*/
    17     public CopyOnWriteArrayList(Collection<? extends E> c) {
    18         /**将集合转化成数组,在执行和上一个构造器方法相同的实现逻辑*/
    19         Object[] elements;
    20         if (c.getClass() == CopyOnWriteArrayList.class)
    21             elements = ((CopyOnWriteArrayList<?>)c).getArray();
    22         else {
    23             elements = c.toArray();
    24             // c.toArray might (incorrectly) not return Object[] (see 6260652)
    25             if (elements.getClass() != Object[].class)
    26                 elements = Arrays.copyOf(elements, elements.length, Object[].class);
    27         }
    28         setArray(elements);
    29     }
    30 
    31     /** 将参数中的数组赋值给内部对象数组属性array,该方法为final类型,不允许被重写 */
    32     final void setArray(Object[] a) {
    33         array = a;
    34     }

    如上代码示,CopyOnWriteArrayList内部存储数据的也是一个Object数组,只不过是volatile修饰的保证了多线程修改时内存可见性,而三个构造方法实现逻辑都是创建一个Object数组,然后直接赋值给CopyOnWriteArrayList内部的Object数组

    2.2、CopyOnWriteArrayList插入数据

    在List尾部插入元素和在指定位置插入元素源码分别如下:

     1 /** final类型重入锁 */
     2     final transient ReentrantLock lock = new ReentrantLock();
     3 
     4     /** 返回存储数据的数组 */
     5     final Object[] getArray() {
     6         return array;
     7     }
     8 
     9     /** 尾部插入数据 */
    10     public boolean add(E e) {
    11         final ReentrantLock lock = this.lock;
    12         lock.lock();
    13         try {
    14             Object[] elements = getArray();
    15             int len = elements.length;
    16             /** 复制一个新数组,容量+1 */
    17             Object[] newElements = Arrays.copyOf(elements, len + 1);
    18             newElements[len] = e;
    19             /** 将新数组赋值给原数组,相当于替换 */
    20             setArray(newElements);
    21             return true;
    22         } finally {
    23             lock.unlock();
    24         }
    25     }
    26 
    27     /** 指定位置插入数据 */
    28     public void add(int index, E element) {
    29         final ReentrantLock lock = this.lock;
    30         lock.lock();
    31         try {
    32             Object[] elements = getArray();
    33             int len = elements.length;
    34             if (index > len || index < 0)
    35                 throw new IndexOutOfBoundsException("Index: "+index+
    36                         ", Size: "+len);
    37             Object[] newElements;
    38             //需要移动的元素个数
    39             int numMoved = len - index;
    40             if (numMoved == 0)
    41                 /**如果不需要移动表示尾部插入,逻辑相当于add(E e)的实现逻辑*/
    42                 newElements = Arrays.copyOf(elements, len + 1);
    43             else {
    44                 /**如果需要移动,先创建新数组,将index前后两部分的数组分别通过复制来赋值给新数组*/
    45                 newElements = new Object[len + 1];
    46                 System.arraycopy(elements, 0, newElements, 0, index);
    47                 System.arraycopy(elements, index, newElements, index + 1,
    48                         numMoved);
    49             }
    50             /** index位置插入数据并替换原数组 */
    51             newElements[index] = element;
    52             setArray(newElements);
    53         } finally {
    54             lock.unlock();
    55         }
    56     }

    从源码可以看出,保证add方法线程安全的方式是通过可重入锁ReentrantLock来实现的,执行插入之前进行加锁,插入完成之后解锁,所以插入的过程肯定是线程安全的。另外和ArrayList实现逻辑不同的是,ArrayList每次插入之前都会判断是否需要扩容,如果需要扩容的话会扩容1.5倍,而CopyOnWriteArrayList每次只扩容1位,扩容之后插入数据直接将复制的副本替换掉原数组,而不是直接在原数组上进行修改的。

    正如CopyOnWrite机制所说的那样,写的时候先通过复制原数组,然后写入数据,最后再直接替换掉原数组。

    2.3、CopyOnWriteArrayList修改数据

     1 /** 设置指定位置数据 */
     2     public E set(int index, E element) {
     3         final ReentrantLock lock = this.lock;
     4         lock.lock();
     5         try {
     6             Object[] elements = getArray();
     7             E oldValue = get(elements, index);
     8 
     9             /** 当旧数据不等于新数据时,直接替换 */
    10             if (oldValue != element) {
    11                 int len = elements.length;
    12                 Object[] newElements = Arrays.copyOf(elements, len);
    13                 newElements[index] = element;
    14                 setArray(newElements);
    15             } else {
    16                 /** 当旧数据和新数据相同时,也进行替换*/
    17                 // Not quite a no-op; ensures volatile write semantics
    18                 setArray(elements);
    19             }
    20             return oldValue;
    21         } finally {
    22             lock.unlock();
    23         }
    24     }

    从源码上流程上没什么大的问题,加锁->复制->修改->替换

    但是有一段代码比较特殊,也就是第18行的setArray(elements), 当set的数据和旧的数据一致时,那么数组是并没有被修改的,理论上执行或不执行setArray这行代码效果是一样的,为什么还需要保留这一行代码呢?

    从第17行的注释可以得知一二,注释的意思是为了确保volatile写的语义。

    因为CopyOnWriteArrayList内部的数组对象array是通过volatile修饰的,而这里的setArray并不是为了保证array对于其他线程的可见性,而是为了保证外部非volatile变量的happen-before原则。

    比如以下案例:

     1 static volatile Object[] list = new Object[1];
     2     boolean flag = false;
     3 
     4     public void func1(){
     5         flag = true;//步骤1
     6         list[0] = new Object();//步骤2
     7     }
     8 
     9     public void func2(){
    10         if(list[0] != null){//步骤3
    11             boolean result = flag;//步骤4
    12         }
    13     }

    定义volatile类型的Object数组和非volatile类型的变量flag,这里存在的happen-before关系为:

    1 before 2 (程序先后关系)、 2 before 3 (volatile规则)、 3 before 4 (程序先后关系)、 1 before 4(根据happen-before传递规则)

    但是如果没有这一行setArray操作,相当于就没有了写操作,也就会丢失 2 before 3的关系了,也就是无法保证volatile变量的写语义了。

    2.4、CopyOnWriteArrayList读取数据

    1 /** 获取数据 */
    2     public E get(int index) {
    3         return get(getArray(), index);
    4     }
    5 
    6     private E get(Object[] a, int index) {
    7         return (E) a[index];
    8     }

    可以看出CopyOnWriteArrayList读取数据的逻辑比较简单,就是从数组中获取指定index的数据,并且没有做同步处理,所以get操作和ArrayList的逻辑和效果是一模一样的。

    但是由于get操作没有任何同步操作,所以理论上是会存在脏读的情况的,比如线程A写数据,在执行setArray之前,线程B读了数据,此时线程B读取的还是原数组中的数据,而实际上线程A已经对副本进行了修改操作了,只是还没有覆盖。同理的还有CopyOnWriteArrayList的size()、isEmpty()等读操作相关的方法都会存在类似的脏读问题。

    注意:虽然CopyOnWriteArrayList底层的数组是volatile修饰的,但是这只能让array对象对其他线程可见,但是array内部存储的数据对于其他线程而言还是不可见的。

    分析完CopyOnWriteArrayList的读写源码之后,实际上其他方法就可以不用再看了,删除数据的remove方法实际也是和add一样,先加锁在复制写数据然后再覆盖原数组。

    2.5、CopyOnWriteArrayList总结

    1、CopyOnWriteArrayList是线程安全的List,底层数据结构也是数组结构,不过通过volatile修饰,使得写操作之后立即刷新内存,使得其他线程读最新的数据。是基于CopyOnWrite机制实现的线程安全的List

    2、CopyOnWriteArrayList每次插入数据都会进行一次扩容,容量加1,并且在写之前都需要通过ReentrantLock加锁处理,然后复制原数组,写完数据之后直接覆盖原数组

    3、CopyOnWriteArrayList的读操作没有加锁处理,所以会存在脏读问题,可能会读到其他线程以及修改,但是还没有替换原数组的数据

    4、CopyOnWriteArrayList每次插入数据都会涉及到数组的复制,所以不适合频繁写而导致频繁复制数组的场景,而读没有加锁,所以适合写少读多的场景。

    5、CopyOnWriteArrayList通过迭代器循环时,只可以循环读,而不可以执行写操作,因为迭代的数据是副本数据。

    6、CopyOnWriteArrayList的set方法当设置数据一直时也同样会复制数组,不是为了保证数组的可见性,而是为了保证外部非volatile变量的happen-before关系,从而实现volatile的语义。

    三、CopyOnWriteArraySet

    CopyOnWriteArraySet是一个线程安全的无序集合,相当于线程安全的HashSet,但是实现和HashSet完全不同,HashSet底层是通过HashMap来实现的,而CopyOnWriteArraySet底层则是通过CopyOnWriteArrayList来实现的,

    CopyOnWriteArraySet 内部基本上所有的方法实现都是通过CopyOnWriteArrayList来实现的。

    3.1、CopyOnWriteArraySet初始化

     1 /** 内部存储数据的结果为CopyOnWriteArrayList */
     2     private final CopyOnWriteArrayList<E> al;
     3 
     4     /**
     5      * 无参构造函数实际就是初始化一个CopyOnWriteArrayList
     6      */
     7     public CopyOnWriteArraySet() {
     8         al = new CopyOnWriteArrayList<E>();
     9     }
    10 
    11     /**
    12      * 初始化CopyOnWriteArrayList,并将集合中的数据初始化到List中
    13      */
    14     public CopyOnWriteArraySet(Collection<? extends E> c) {
    15         if (c.getClass() == CopyOnWriteArraySet.class) {
    16             @SuppressWarnings("unchecked") CopyOnWriteArraySet<E> cc =
    17                     (CopyOnWriteArraySet<E>)c;
    18             al = new CopyOnWriteArrayList<E>(cc.al);
    19         }
    20         else {
    21             al = new CopyOnWriteArrayList<E>();
    22             al.addAllAbsent(c);
    23         }
    24     }

    CopyOnWriteArraySet的初始化过程实际就是初始化了内部的CopyOnWriteArrayList

    3.2、CopyOnWriteArraySet增删数据

    1 public boolean add(E e) {
    2         return al.addIfAbsent(e);
    3     }
    1 public boolean remove(Object o) {
    2         return al.remove(o);
    3     }

    插入数据是调用CopyOnWriteArrayList的addIfAbsent方法,该方法的作用是如果插入的数据不存在就插入,否则就插入失败,这样的操作就实现了set中不会出现重复数据的要求。源码如下:

     1 /** 如果不存在就插入数据 */
     2     public boolean addIfAbsent(E e) {
     3         /** 1.获取数组的快照 */
     4         Object[] snapshot = getArray();
     5         /** 2.判断元素在数组中的位置是否大于0,如果大于0表示已经存在,则直接返回false;
     6          *    如果位置小于0则表示元素不存在,则调用addIfAbsent()方法插入数据
     7          *    */
     8         return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
     9                 addIfAbsent(e, snapshot);
    10     }
    11 
    12     /** 如果不存在就插入数据 */
    13     private boolean addIfAbsent(E e, Object[] snapshot) {
    14         final ReentrantLock lock = this.lock;
    15         lock.lock();
    16         try {
    17             Object[] current = getArray();
    18             int len = current.length;
    19             //如果传入数组和当前数组不相等,表示数组已经被修改过
    20             if (snapshot != current) {
    21                 /**
    22                  * 取两个数组长度的最小值common
    23                  * 从0开始到common判断两个数组每一位数据是否相等,并且是否和插入的元素相等,如果数组某一位不一致或者和插入数据一直就返回false */
    24                 int common = Math.min(snapshot.length, len);
    25                 for (int i = 0; i < common; i++)
    26                     if (current[i] != snapshot[i] && eq(e, current[i]))
    27                         return false;
    28                 /**判断当前数组中common到最后直接是否存在插入的数据
    29                  * (因为此时快照snapshot已经判断过了不存在了,且0-common之间也已经不存在了,所以只需要判断current中
    30                  * 的common-len之间是否包含插入数据)  */
    31                 if (indexOf(e, current, common, len) >= 0)
    32                     return false;
    33             }
    34             /** 如果数组没有被修改过,或者修改之后的数组中也不包含待插入的数据,则之间在尾部插入 */
    35             Object[] newElements = Arrays.copyOf(current, len + 1);
    36             newElements[len] = e;
    37             setArray(newElements);
    38             return true;
    39         } finally {
    40             lock.unlock();
    41         }
    42     }
    43 
    44     /** 查找指定数据在数组中的位置index值*/
    45     private static int indexOf(Object o, Object[] elements,
    46                                int index, int fence) {
    47         if (o == null) {
    48             for (int i = index; i < fence; i++)
    49                 if (elements[i] == null)
    50                     return i;
    51         } else {
    52             for (int i = index; i < fence; i++)
    53                 if (o.equals(elements[i]))
    54                     return i;
    55         }
    56         return -1;
    57     }

    可以看出实际就是遍历数组,是否包含待插入的数据,如果不包含则可以插入,否则直接返回false不允许插入。

    类似的CopyOnWriteArraySet的所有方法基本上全是通过CopyOnWriteArrayList来实现的。相当于CopyOnWriteArraySet就是去重版本的CopyOnWriteArrayList。

    Extra

    1、CopyOnWrite思想是一种写时加锁,而读是不加锁,且写操作不会阻塞读操作,而读操作可以读取到最新的数据的一种机制。

    2、除了JUC中实现的CopyOnWriteArrayList和CopyOnWriteArraySet,Kafka的源码中也基于CopyOnWrite思想实现了CopyOnWriteMap,源码如下,有兴趣的可自行分析:

      1 public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
      2 
      3 private volatile Map<K, V> map;
      4 
      5 public CopyOnWriteMap() {
      6     this.map = Collections.emptyMap();
      7 }
      8 
      9 public CopyOnWriteMap(Map<K, V> map) {
     10     this.map = Collections.unmodifiableMap(map);
     11 }
     12 
     13 @Override
     14 public boolean containsKey(Object k) {
     15     return map.containsKey(k);
     16 }
     17 
     18 @Override
     19 public boolean containsValue(Object v) {
     20     return map.containsValue(v);
     21 }
     22 
     23 @Override
     24 public Set<java.util.Map.Entry<K, V>> entrySet() {
     25     return map.entrySet();
     26 }
     27 
     28 @Override
     29 public V get(Object k) {
     30     return map.get(k);
     31 }
     32 
     33 @Override
     34 public boolean isEmpty() {
     35     return map.isEmpty();
     36 }
     37 
     38 @Override
     39 public Set<K> keySet() {
     40     return map.keySet();
     41 }
     42 
     43 @Override
     44 public int size() {
     45     return map.size();
     46 }
     47 
     48 @Override
     49 public Collection<V> values() {
     50     return map.values();
     51 }
     52 
     53 @Override
     54 public synchronized void clear() {
     55     this.map = Collections.emptyMap();
     56 }
     57 
     58 @Override
     59 public synchronized V put(K k, V v) {
     60     Map<K, V> copy = new HashMap<K, V>(this.map);
     61     V prev = copy.put(k, v);
     62     this.map = Collections.unmodifiableMap(copy);
     63     return prev;
     64 }
     65 
     66 @Override
     67 public synchronized void putAll(Map<? extends K, ? extends V> entries) {
     68     Map<K, V> copy = new HashMap<K, V>(this.map);
     69     copy.putAll(entries);
     70     this.map = Collections.unmodifiableMap(copy);
     71 }
     72 
     73 @Override
     74 public synchronized V remove(Object key) {
     75     Map<K, V> copy = new HashMap<K, V>(this.map);
     76     V prev = copy.remove(key);
     77     this.map = Collections.unmodifiableMap(copy);
     78     return prev;
     79 }
     80 
     81 @Override
     82 public synchronized V putIfAbsent(K k, V v) {
     83     if (!containsKey(k))
     84         return put(k, v);
     85     else
     86         return get(k);
     87 }
     88 
     89 @Override
     90 public synchronized boolean remove(Object k, Object v) {
     91     if (containsKey(k) && get(k).equals(v)) {
     92         remove(k);
     93         return true;
     94     } else {
     95         return false;
     96     }
     97 }
     98 
     99 @Override
    100 public synchronized boolean replace(K k, V original, V replacement) {
    101     if (containsKey(k) && get(k).equals(original)) {
    102         put(k, replacement);
    103         return true;
    104     } else {
    105         return false;
    106     }
    107 }
    108 
    109 @Override
    110 public synchronized V replace(K k, V v) {
    111     if (containsKey(k)) {
    112         return put(k, v);
    113     } else {
    114         return null;
    115     }
    116 }
    117 }
  • 相关阅读:
    Windows2008R2安装DNS和SQLServer200r2服务 (9.18第七天)
    Windows2008R2安装iis和iis下搭建web服务器(9.18 第七天)
    Ubuntu 安装phpmyadmin (9.17第六天)
    Ubuntu Navicat链接mysql (9.17第六天)
    Spring之AOP由浅入深
    oracle并行模式(Parallel)
    转:Java后端面试自我学习
    Spring Security 简介
    spring boot入门
    git--分布式版本管理系统
  • 原文地址:https://www.cnblogs.com/jackion5/p/13033766.html
Copyright © 2011-2022 走看看