前言:
上篇文章通过源码解析了ArrayList和LinkedList的实现逻辑,但是在多线程的情况下,这两个List实现类都是无法保证线程安全的,所以在JUC中就提供了一种线程安全的List,也就是本文将要解析的CopyOnWriteArrayList
CopyOnWriteArrayList从名字上可以看出底层的数据结构应该是和ArrayList一样采用数组来实现的,但是既然是线程安全的,所以应该是使用了同步操作,而JUC中同步的方式无非就是使用ReentrantLock来实现。另外JUC中有很多线程安全的集合类名字的前缀
都是ConConcurrent* 表示是可以并发操作,而CopyOnWriteArrayList却是以CopyOnWrite来作为前缀的,这里就涉及到了一个概念--CopyOnWrite机制(写时复制机制)
一、CopyOnWrite机制
CopyOnWrite从单词上直译意思是 通过复制来写,专业的叫法就是写时复制。
当我们使用集合容器来存储数据时,如果要保证线程安全,可以在操作的时候加锁处理,保证同一时间只有一个人操作即可。还有一种办法是每次操作的时候,都先将容器复制一份副本,然后各个线程直接在各自的容器副本中操作数据,操作完成之后再将当前的副本
直接替换掉原容器即可,这样只需要保证在替换的时候是线程安全的,就可以保证容器的所有操作都是线程安全的了。这样的好处是只有写操作才会进行复制,而不会影响到读操作,所以读操作还是会直接读原容器,而写是容器副本,从而间接的达到了读写分离的效果。当然既然是读写分离的,就需要注意脏读读问题,需要保证读的是最新的数据,否则会出现读的数据还是旧的,但是实际上已经被更新过了。
1.1、CopyOnWrite的优点
读写分离,只对写做同步处理,不对读作同步处理,既保证了写的线程安全又保证了读的高性能。
如果没有CopyOnWrite机制,如果想要实现线程安全的ArrayList,无非有几种方式
Synchronzied:如果使用Synchronzied,相当于ArrayList同一时间只可以有一个线程可以访问,很显然效率较低;
ReentrantLock:和Synchronzied基本上一致同一时间只有一个线程访问
ReentrantReadWriteLock:读写锁一定程度上实现了读写分开加锁的操作,但是如果有一个线程占有了写锁,此时有大量的读请求时还是会同样被阻塞中,所以读写锁还是会有写锁阻塞读锁的情况
而CopyOnWrite思想是复制副本来进行修改,而读线程读的还是原数据,就不会出现写线程将读线程给阻塞的情况了。
二、CopyOnWriteArrayList
2.1、CopyOnWriteArrayList的初始化
CopyOnWriteArrayList一共有三个构造方法,一个是无参,一个参数为对象数组,一个参数为Collection集合,源码分别如下:
1 /**volatile修饰的对象数组,内部存储数据的结构*/ 2 private transient volatile Object[] array; 3 4 /**无参构造函数*/ 5 public CopyOnWriteArrayList() { 6 /**新建Object数组,调用setArray方法初始化*/ 7 setArray(new Object[0]); 8 } 9 10 /**参数为对象数组*/ 11 public CopyOnWriteArrayList(E[] toCopyIn) { 12 /**直接将参数通过Arrays.copyOf方法复制一个数组*/ 13 setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class)); 14 } 15 16 /**参数为Collection集合*/ 17 public CopyOnWriteArrayList(Collection<? extends E> c) { 18 /**将集合转化成数组,在执行和上一个构造器方法相同的实现逻辑*/ 19 Object[] elements; 20 if (c.getClass() == CopyOnWriteArrayList.class) 21 elements = ((CopyOnWriteArrayList<?>)c).getArray(); 22 else { 23 elements = c.toArray(); 24 // c.toArray might (incorrectly) not return Object[] (see 6260652) 25 if (elements.getClass() != Object[].class) 26 elements = Arrays.copyOf(elements, elements.length, Object[].class); 27 } 28 setArray(elements); 29 } 30 31 /** 将参数中的数组赋值给内部对象数组属性array,该方法为final类型,不允许被重写 */ 32 final void setArray(Object[] a) { 33 array = a; 34 }
如上代码示,CopyOnWriteArrayList内部存储数据的也是一个Object数组,只不过是volatile修饰的保证了多线程修改时内存可见性,而三个构造方法实现逻辑都是创建一个Object数组,然后直接赋值给CopyOnWriteArrayList内部的Object数组
2.2、CopyOnWriteArrayList插入数据
在List尾部插入元素和在指定位置插入元素源码分别如下:
1 /** final类型重入锁 */ 2 final transient ReentrantLock lock = new ReentrantLock(); 3 4 /** 返回存储数据的数组 */ 5 final Object[] getArray() { 6 return array; 7 } 8 9 /** 尾部插入数据 */ 10 public boolean add(E e) { 11 final ReentrantLock lock = this.lock; 12 lock.lock(); 13 try { 14 Object[] elements = getArray(); 15 int len = elements.length; 16 /** 复制一个新数组,容量+1 */ 17 Object[] newElements = Arrays.copyOf(elements, len + 1); 18 newElements[len] = e; 19 /** 将新数组赋值给原数组,相当于替换 */ 20 setArray(newElements); 21 return true; 22 } finally { 23 lock.unlock(); 24 } 25 } 26 27 /** 指定位置插入数据 */ 28 public void add(int index, E element) { 29 final ReentrantLock lock = this.lock; 30 lock.lock(); 31 try { 32 Object[] elements = getArray(); 33 int len = elements.length; 34 if (index > len || index < 0) 35 throw new IndexOutOfBoundsException("Index: "+index+ 36 ", Size: "+len); 37 Object[] newElements; 38 //需要移动的元素个数 39 int numMoved = len - index; 40 if (numMoved == 0) 41 /**如果不需要移动表示尾部插入,逻辑相当于add(E e)的实现逻辑*/ 42 newElements = Arrays.copyOf(elements, len + 1); 43 else { 44 /**如果需要移动,先创建新数组,将index前后两部分的数组分别通过复制来赋值给新数组*/ 45 newElements = new Object[len + 1]; 46 System.arraycopy(elements, 0, newElements, 0, index); 47 System.arraycopy(elements, index, newElements, index + 1, 48 numMoved); 49 } 50 /** index位置插入数据并替换原数组 */ 51 newElements[index] = element; 52 setArray(newElements); 53 } finally { 54 lock.unlock(); 55 } 56 }
从源码可以看出,保证add方法线程安全的方式是通过可重入锁ReentrantLock来实现的,执行插入之前进行加锁,插入完成之后解锁,所以插入的过程肯定是线程安全的。另外和ArrayList实现逻辑不同的是,ArrayList每次插入之前都会判断是否需要扩容,如果需要扩容的话会扩容1.5倍,而CopyOnWriteArrayList每次只扩容1位,扩容之后插入数据直接将复制的副本替换掉原数组,而不是直接在原数组上进行修改的。
正如CopyOnWrite机制所说的那样,写的时候先通过复制原数组,然后写入数据,最后再直接替换掉原数组。
2.3、CopyOnWriteArrayList修改数据
1 /** 设置指定位置数据 */ 2 public E set(int index, E element) { 3 final ReentrantLock lock = this.lock; 4 lock.lock(); 5 try { 6 Object[] elements = getArray(); 7 E oldValue = get(elements, index); 8 9 /** 当旧数据不等于新数据时,直接替换 */ 10 if (oldValue != element) { 11 int len = elements.length; 12 Object[] newElements = Arrays.copyOf(elements, len); 13 newElements[index] = element; 14 setArray(newElements); 15 } else { 16 /** 当旧数据和新数据相同时,也进行替换*/ 17 // Not quite a no-op; ensures volatile write semantics 18 setArray(elements); 19 } 20 return oldValue; 21 } finally { 22 lock.unlock(); 23 } 24 }
从源码上流程上没什么大的问题,加锁->复制->修改->替换
但是有一段代码比较特殊,也就是第18行的setArray(elements), 当set的数据和旧的数据一致时,那么数组是并没有被修改的,理论上执行或不执行setArray这行代码效果是一样的,为什么还需要保留这一行代码呢?
从第17行的注释可以得知一二,注释的意思是为了确保volatile写的语义。
因为CopyOnWriteArrayList内部的数组对象array是通过volatile修饰的,而这里的setArray并不是为了保证array对于其他线程的可见性,而是为了保证外部非volatile变量的happen-before原则。
比如以下案例:
1 static volatile Object[] list = new Object[1]; 2 boolean flag = false; 3 4 public void func1(){ 5 flag = true;//步骤1 6 list[0] = new Object();//步骤2 7 } 8 9 public void func2(){ 10 if(list[0] != null){//步骤3 11 boolean result = flag;//步骤4 12 } 13 }
定义volatile类型的Object数组和非volatile类型的变量flag,这里存在的happen-before关系为:
1 before 2 (程序先后关系)、 2 before 3 (volatile规则)、 3 before 4 (程序先后关系)、 1 before 4(根据happen-before传递规则)
但是如果没有这一行setArray操作,相当于就没有了写操作,也就会丢失 2 before 3的关系了,也就是无法保证volatile变量的写语义了。
2.4、CopyOnWriteArrayList读取数据
1 /** 获取数据 */ 2 public E get(int index) { 3 return get(getArray(), index); 4 } 5 6 private E get(Object[] a, int index) { 7 return (E) a[index]; 8 }
可以看出CopyOnWriteArrayList读取数据的逻辑比较简单,就是从数组中获取指定index的数据,并且没有做同步处理,所以get操作和ArrayList的逻辑和效果是一模一样的。
但是由于get操作没有任何同步操作,所以理论上是会存在脏读的情况的,比如线程A写数据,在执行setArray之前,线程B读了数据,此时线程B读取的还是原数组中的数据,而实际上线程A已经对副本进行了修改操作了,只是还没有覆盖。同理的还有CopyOnWriteArrayList的size()、isEmpty()等读操作相关的方法都会存在类似的脏读问题。
注意:虽然CopyOnWriteArrayList底层的数组是volatile修饰的,但是这只能让array对象对其他线程可见,但是array内部存储的数据对于其他线程而言还是不可见的。
分析完CopyOnWriteArrayList的读写源码之后,实际上其他方法就可以不用再看了,删除数据的remove方法实际也是和add一样,先加锁在复制写数据然后再覆盖原数组。
2.5、CopyOnWriteArrayList总结
1、CopyOnWriteArrayList是线程安全的List,底层数据结构也是数组结构,不过通过volatile修饰,使得写操作之后立即刷新内存,使得其他线程读最新的数据。是基于CopyOnWrite机制实现的线程安全的List
2、CopyOnWriteArrayList每次插入数据都会进行一次扩容,容量加1,并且在写之前都需要通过ReentrantLock加锁处理,然后复制原数组,写完数据之后直接覆盖原数组
3、CopyOnWriteArrayList的读操作没有加锁处理,所以会存在脏读问题,可能会读到其他线程以及修改,但是还没有替换原数组的数据
4、CopyOnWriteArrayList每次插入数据都会涉及到数组的复制,所以不适合频繁写而导致频繁复制数组的场景,而读没有加锁,所以适合写少读多的场景。
5、CopyOnWriteArrayList通过迭代器循环时,只可以循环读,而不可以执行写操作,因为迭代的数据是副本数据。
6、CopyOnWriteArrayList的set方法当设置数据一直时也同样会复制数组,不是为了保证数组的可见性,而是为了保证外部非volatile变量的happen-before关系,从而实现volatile的语义。
三、CopyOnWriteArraySet
CopyOnWriteArraySet是一个线程安全的无序集合,相当于线程安全的HashSet,但是实现和HashSet完全不同,HashSet底层是通过HashMap来实现的,而CopyOnWriteArraySet底层则是通过CopyOnWriteArrayList来实现的,
CopyOnWriteArraySet 内部基本上所有的方法实现都是通过CopyOnWriteArrayList来实现的。
3.1、CopyOnWriteArraySet初始化
1 /** 内部存储数据的结果为CopyOnWriteArrayList */ 2 private final CopyOnWriteArrayList<E> al; 3 4 /** 5 * 无参构造函数实际就是初始化一个CopyOnWriteArrayList 6 */ 7 public CopyOnWriteArraySet() { 8 al = new CopyOnWriteArrayList<E>(); 9 } 10 11 /** 12 * 初始化CopyOnWriteArrayList,并将集合中的数据初始化到List中 13 */ 14 public CopyOnWriteArraySet(Collection<? extends E> c) { 15 if (c.getClass() == CopyOnWriteArraySet.class) { 16 @SuppressWarnings("unchecked") CopyOnWriteArraySet<E> cc = 17 (CopyOnWriteArraySet<E>)c; 18 al = new CopyOnWriteArrayList<E>(cc.al); 19 } 20 else { 21 al = new CopyOnWriteArrayList<E>(); 22 al.addAllAbsent(c); 23 } 24 }
CopyOnWriteArraySet的初始化过程实际就是初始化了内部的CopyOnWriteArrayList
3.2、CopyOnWriteArraySet增删数据
1 public boolean add(E e) { 2 return al.addIfAbsent(e); 3 }
1 public boolean remove(Object o) { 2 return al.remove(o); 3 }
插入数据是调用CopyOnWriteArrayList的addIfAbsent方法,该方法的作用是如果插入的数据不存在就插入,否则就插入失败,这样的操作就实现了set中不会出现重复数据的要求。源码如下:
1 /** 如果不存在就插入数据 */ 2 public boolean addIfAbsent(E e) { 3 /** 1.获取数组的快照 */ 4 Object[] snapshot = getArray(); 5 /** 2.判断元素在数组中的位置是否大于0,如果大于0表示已经存在,则直接返回false; 6 * 如果位置小于0则表示元素不存在,则调用addIfAbsent()方法插入数据 7 * */ 8 return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false : 9 addIfAbsent(e, snapshot); 10 } 11 12 /** 如果不存在就插入数据 */ 13 private boolean addIfAbsent(E e, Object[] snapshot) { 14 final ReentrantLock lock = this.lock; 15 lock.lock(); 16 try { 17 Object[] current = getArray(); 18 int len = current.length; 19 //如果传入数组和当前数组不相等,表示数组已经被修改过 20 if (snapshot != current) { 21 /** 22 * 取两个数组长度的最小值common 23 * 从0开始到common判断两个数组每一位数据是否相等,并且是否和插入的元素相等,如果数组某一位不一致或者和插入数据一直就返回false */ 24 int common = Math.min(snapshot.length, len); 25 for (int i = 0; i < common; i++) 26 if (current[i] != snapshot[i] && eq(e, current[i])) 27 return false; 28 /**判断当前数组中common到最后直接是否存在插入的数据 29 * (因为此时快照snapshot已经判断过了不存在了,且0-common之间也已经不存在了,所以只需要判断current中 30 * 的common-len之间是否包含插入数据) */ 31 if (indexOf(e, current, common, len) >= 0) 32 return false; 33 } 34 /** 如果数组没有被修改过,或者修改之后的数组中也不包含待插入的数据,则之间在尾部插入 */ 35 Object[] newElements = Arrays.copyOf(current, len + 1); 36 newElements[len] = e; 37 setArray(newElements); 38 return true; 39 } finally { 40 lock.unlock(); 41 } 42 } 43 44 /** 查找指定数据在数组中的位置index值*/ 45 private static int indexOf(Object o, Object[] elements, 46 int index, int fence) { 47 if (o == null) { 48 for (int i = index; i < fence; i++) 49 if (elements[i] == null) 50 return i; 51 } else { 52 for (int i = index; i < fence; i++) 53 if (o.equals(elements[i])) 54 return i; 55 } 56 return -1; 57 }
可以看出实际就是遍历数组,是否包含待插入的数据,如果不包含则可以插入,否则直接返回false不允许插入。
类似的CopyOnWriteArraySet的所有方法基本上全是通过CopyOnWriteArrayList来实现的。相当于CopyOnWriteArraySet就是去重版本的CopyOnWriteArrayList。
Extra
1、CopyOnWrite思想是一种写时加锁,而读是不加锁,且写操作不会阻塞读操作,而读操作可以读取到最新的数据的一种机制。
2、除了JUC中实现的CopyOnWriteArrayList和CopyOnWriteArraySet,Kafka的源码中也基于CopyOnWrite思想实现了CopyOnWriteMap,源码如下,有兴趣的可自行分析:
1 public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> { 2 3 private volatile Map<K, V> map; 4 5 public CopyOnWriteMap() { 6 this.map = Collections.emptyMap(); 7 } 8 9 public CopyOnWriteMap(Map<K, V> map) { 10 this.map = Collections.unmodifiableMap(map); 11 } 12 13 @Override 14 public boolean containsKey(Object k) { 15 return map.containsKey(k); 16 } 17 18 @Override 19 public boolean containsValue(Object v) { 20 return map.containsValue(v); 21 } 22 23 @Override 24 public Set<java.util.Map.Entry<K, V>> entrySet() { 25 return map.entrySet(); 26 } 27 28 @Override 29 public V get(Object k) { 30 return map.get(k); 31 } 32 33 @Override 34 public boolean isEmpty() { 35 return map.isEmpty(); 36 } 37 38 @Override 39 public Set<K> keySet() { 40 return map.keySet(); 41 } 42 43 @Override 44 public int size() { 45 return map.size(); 46 } 47 48 @Override 49 public Collection<V> values() { 50 return map.values(); 51 } 52 53 @Override 54 public synchronized void clear() { 55 this.map = Collections.emptyMap(); 56 } 57 58 @Override 59 public synchronized V put(K k, V v) { 60 Map<K, V> copy = new HashMap<K, V>(this.map); 61 V prev = copy.put(k, v); 62 this.map = Collections.unmodifiableMap(copy); 63 return prev; 64 } 65 66 @Override 67 public synchronized void putAll(Map<? extends K, ? extends V> entries) { 68 Map<K, V> copy = new HashMap<K, V>(this.map); 69 copy.putAll(entries); 70 this.map = Collections.unmodifiableMap(copy); 71 } 72 73 @Override 74 public synchronized V remove(Object key) { 75 Map<K, V> copy = new HashMap<K, V>(this.map); 76 V prev = copy.remove(key); 77 this.map = Collections.unmodifiableMap(copy); 78 return prev; 79 } 80 81 @Override 82 public synchronized V putIfAbsent(K k, V v) { 83 if (!containsKey(k)) 84 return put(k, v); 85 else 86 return get(k); 87 } 88 89 @Override 90 public synchronized boolean remove(Object k, Object v) { 91 if (containsKey(k) && get(k).equals(v)) { 92 remove(k); 93 return true; 94 } else { 95 return false; 96 } 97 } 98 99 @Override 100 public synchronized boolean replace(K k, V original, V replacement) { 101 if (containsKey(k) && get(k).equals(original)) { 102 put(k, replacement); 103 return true; 104 } else { 105 return false; 106 } 107 } 108 109 @Override 110 public synchronized V replace(K k, V v) { 111 if (containsKey(k)) { 112 return put(k, v); 113 } else { 114 return null; 115 } 116 } 117 }