线程不安全集合类
ArrayList
List是线程不安全的集合类,底层是Object数组实现,初始化容量是10(其实是一个空数组,第一次扩容时,将数组扩容为10),其后每次扩容大小为当前容量的一半(oldCapacity >> 1)。
初始化
/**
* Constructs an empty list with an initial capacity of ten.
*/
public ArrayList() {
this.elementData = DEFAULTCAPACITY_EMPTY_ELEMENTDATA;
}
private static final Object[] DEFAULTCAPACITY_EMPTY_ELEMENTDATA = {};
扩容
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}
private void ensureCapacityInternal(int minCapacity) {
ensureExplicitCapacity(calculateCapacity(elementData, minCapacity));
}
private static int calculateCapacity(Object[] elementData, int minCapacity) {
if (elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {
return Math.max(DEFAULT_CAPACITY, minCapacity);
}
return minCapacity;
}
初次扩容,将底层数组容量设为10。
private void ensureExplicitCapacity(int minCapacity) {
modCount++;
// overflow-conscious code
if (minCapacity - elementData.length > 0)
grow(minCapacity);
}
private void grow(int minCapacity) {
// overflow-conscious code
int oldCapacity = elementData.length;
int newCapacity = oldCapacity + (oldCapacity >> 1);
if (newCapacity - minCapacity < 0)
newCapacity = minCapacity;
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
// minCapacity is usually close to size, so this is a win:
elementData = Arrays.copyOf(elementData, newCapacity);
}
动态扩容,是将底层数组容量扩容当前容量的一半(oldCapacity >> 1)。
线程不安全示例
package com.chinda.juc.coll;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.IdUtil;
import java.util.ArrayList;
import java.util.List;
/**
* ArrayList线程不安全
* @author Wang Chinda
* @date 2020/5/10
* @see
* @since 1.0
*/
public class ListUnsafe {
public static void main(String[] args) {
List<String> list = CollUtil.newArrayList();
for (int i = 1; i <= 3; i++) {
new Thread(() -> {
list.add(IdUtil.simpleUUID());
System.out.println(list);
}, String.valueOf(i)).start();
}
}
}
本示例依赖包
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-core</artifactId>
<version>5.2.3</version>
</dependency>
控制台输出
[0b867a48ef73409885294f6e1e643ce3]
[0b867a48ef73409885294f6e1e643ce3]
[0b867a48ef73409885294f6e1e643ce3]
循环30次控制台输出异常
java.util.ConcurrentModificationException
导致原因
因是线程并发写入数据,当线程A正在写数据时,线程执行一半时,线程B抢到资源,开始执行。这就会导致线程A写入数据不正确。
比如现实中的花名册
单线程执行解释:班长将会依次的将全班所有的同学都写入花名册。
多线程执行解释:全班同学自己签自己名字,可能会出现李四刚写一个李字时,花名册被张三抢去接着写的情况。假如班长看着,必须一个人写完名字,才允许第二个人写名字,依次往复。这就是锁的概念。
解决不安全
第一种方案
添加元素方法中添加synchronized。Java已实现,Vector类。
第二种方案
集合操作工具类。
List<String> list = Collections.synchronizedList(CollUtil.newArrayList());
第三种方案
推荐使用此种方案。
List<String> list = new CopyOnWriteArrayList<>();
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
附带,HashMap初始化容量是16, 每次扩容是当前容量*2。HashSet底层数据结构就是HashMap,为什么HashMap是put存放数据而HashSet是add,是因为HashSet存放将add的元素存放为HashMap的key中,value一直是一个Object对象。
锁概念
公平锁和非公平锁
java.util.concurrent.locks
包中ReentrantLock的创建可以指定构造函数的boolean类型指定是公平锁还是非公平锁,默认是非公平锁。非公平锁的有点在于吞吐量比公平锁大。synchronized也是一种非公平锁。
公平锁
多个线程按照申请锁的顺序来获取锁,遵循先申请到先得原则。
非公平锁
多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请锁的线程比先申请锁的线程优先获取锁,在高并发的情况下,有可能造成优先级反转(后申请锁的线程总是先得到锁)或者饥饿现象(先申请锁的线程一直没有获取到锁)。
可重入锁(递归锁)
同一线程外出函数获得锁之后,内存递归函数仍然能获取该锁得代码,同一线程在外层方法获取锁的时候,在进入内层方法时会自动获取锁,即线程可以进入任何一个它已经拥有的锁同步的代码块。可重入锁最大的作用时避免死锁。
可重入锁示例
package com.chinda.juc.coll;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author Wang Chinda
* @date 2020/5/10
* @see
* @since 1.0
*/
public class ReenterLock {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(() -> {
phone.sendSMS();
}, "t1").start();
new Thread(() -> {
phone.sendSMS();
}, "t2").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println();
System.out.println();
System.out.println();
System.out.println();
Thread t3 = new Thread(phone, "t3");
Thread t4 = new Thread(phone, "t4");
t3.start();
t4.start();
}
}
class Phone implements Runnable {
public synchronized void sendSMS() {
System.out.println(Thread.currentThread().getName() + " invoked sendSMS()");
sendEmail();
}
public synchronized void sendEmail() {
System.out.println(Thread.currentThread().getName() + " *****invoked sendEmail()");
}
Lock lock = new ReentrantLock();
@Override
public void run() {
get();
}
private void get() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " invoked get()");
set();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private void set() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " invoked set()");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
控制台打印
t1 invoked sendSMS()
t1 *****invoked sendEmail()
t2 invoked sendSMS()
t2 *****invoked sendEmail()
t4 invoked get()
t4 invoked set()
t3 invoked get()
t3 invoked set()
线程执行时,进入到嵌套方法时,不需要获取锁,可直接进入。线程执行嵌套方法时,没有被其余线程加塞。
注意:若加锁与解锁个数相匹配,编译不会失败,执行不会阻塞;若加锁比解锁多,线程会进入阻塞状态;若解锁比加锁多执行会抛出IllegalMonitorStateException异常
自旋锁
尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU资源。
public final int getAndSetInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var4));
return var5;
}
自旋锁示例
package com.chinda.juc.coll;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* 优点: 循环比较获取直到成功为止, 没有类似wait的阻塞
* <p>
* 通过CAS操作完成自旋锁, A线程先进来掉哦那个myLock方法自己持有锁5秒, B随后进来后发现当前线程持有锁, 不是null, 自选等待, 直到A释放锁后B才会抢到。
*
* @author Wang Chinda
* @date 2020/5/10
* @see
* @since 1.0
*/
public class SpinDemo {
AtomicReference<Thread> atomicReference = new AtomicReference<Thread>();
public void myLock() {
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + " com in (*^_^*)");
while (!atomicReference.compareAndSet(null, thread)) {
}
}
public void myUnlock() {
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread, null);
System.out.println(Thread.currentThread().getName() + " invoked myUnlock()");
}
public static void main(String[] args) {
SpinDemo spinDemo = new SpinDemo();
new Thread(() -> {
spinDemo.myLock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
spinDemo.myUnlock();
}, "A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
spinDemo.myLock();
spinDemo.myUnlock();
}, "B").start();
}
}
控制台打印
A com in (*^_^*)
B com in (*^_^*)
A invoked myUnlock()
B invoked myUnlock()
线程A执行锁时,因为没有任何人获取锁,所以锁为null。线程B获取锁时,锁已经被线程A占用,线程B循环循环获取锁,直到线程A释放锁为止。
独占锁(写锁)/共享锁(读锁)/互斥锁
独占锁
一次只能被一个线程所持有。ReetrantLock和synchronized都是独占锁。
共享锁
可以被多个线程所持有。
ReentrantReadWriteLock其读锁时共享锁,写时独占锁。读锁的共享锁可保证高并发读时非常高效的。读写、写读、写写的过程时互斥的。
读写锁
线程不安全示例
package com.chinda.juc.coll;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.thread.ThreadUtil;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 多个线程同时读取一个资源类没有任何问题, 所以为了满足并发量, 读取共享资源应该可以同时进行。
* 但是如果有一个线程去写共享资源,就不应该再有其他线程可以对该资源进行读或者写。
* 即:
* 读-读 能共存
* 读-写 不能共存
* 写-写 不能共存
* 写操作: 原子+独占, 整个过程必须时完成的统一体,中间不允许被分割, 被打断。
* @author Wang Chinda
* @date 2020/5/11
* @see
* @since 1.0
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 1; i <= 10; i++) {
String finalI = i + "";
new Thread(() -> {
myCache.put(finalI, finalI);
}, String.valueOf(i)).start();
}
for (int i = 1; i <= 10; i++) {
String finalI = i + "";
new Thread(() -> {
myCache.get(finalI);
}, String.valueOf(i)).start();
}
}
}
class MyCache {
private volatile Map<String, Object> map = CollUtil.newHashMap();
public void put(String key, Object value) {
System.out.println(Thread.currentThread().getName() + " 正在写入: " + key);
ThreadUtil.sleep(300, TimeUnit.MILLISECONDS);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + " 写入完成");
}
public void get(String key) {
System.out.println(Thread.currentThread().getName() + " 正在读取: " + key);
ThreadUtil.sleep(300, TimeUnit.MILLISECONDS);
Object result = map.get(key);
System.out.println(Thread.currentThread().getName() + " 读取完成:" + result);
}
}
线程安全示例
package com.chinda.juc.coll;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.thread.ThreadUtil;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 多个线程同时读取一个资源类没有任何问题, 所以为了满足并发量, 读取共享资源应该可以同时进行。
* 但是如果有一个线程去写共享资源,就不应该再有其他线程可以对该资源进行读或者写。
* 即:
* 读-读 能共存
* 读-写 不能共存
* 写-写 不能共存
* 写操作: 原子+独占, 整个过程必须时完成的统一体,中间不允许被分割, 被打断。
* @author Wang Chinda
* @date 2020/5/11
* @see
* @since 1.0
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 1; i <= 10; i++) {
String finalI = i + "";
new Thread(() -> {
myCache.put(finalI, finalI);
}, String.valueOf(i)).start();
}
for (int i = 1; i <= 10; i++) {
String finalI = i + "";
new Thread(() -> {
myCache.get(finalI);
}, String.valueOf(i)).start();
}
}
}
class MyCache {
private volatile Map<String, Object> map = CollUtil.newHashMap();
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
public void put(String key, Object value) {
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " 正在写入: " + key);
ThreadUtil.sleep(300, TimeUnit.MILLISECONDS);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + " 写入完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.writeLock().unlock();
}
}
public void get(String key) {
rwLock.readLock();
try {
System.out.println(Thread.currentThread().getName() + " 正在读取: " + key);
ThreadUtil.sleep(300, TimeUnit.MILLISECONDS);
Object result = map.get(key);
System.out.println(Thread.currentThread().getName() + " 读取完成:" + result);
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.readLock().unlock();
}
}
}
CountDownLatch
CountDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复。
示例
实现效果:图书馆有6个人,等6个人离开图书馆,门卫大爷锁门。
反面示例
package com.chinda.juc.coll;
/**
* @author Wang Chinda
* @date 2020/5/11
* @see
* @since 1.0
*/
public class CountDownLatchDemo {
public static void main(String[] args) {
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 离开图书馆");
}, String.valueOf(i)).start();
}
System.out.println(Thread.currentThread().getName() + " ************* 门卫大爷锁门!");
}
}
可能会出现人没有全部离开,门就被锁。
CountDownLatch示例
package com.chinda.juc.coll;
import lombok.SneakyThrows;
import java.util.concurrent.CountDownLatch;
/**
* @author Wang Chinda
* @date 2020/5/11
* @see
* @since 1.0
*/
public class CountDownLatchDemo {
@SneakyThrows
public static void main(String[] args) {
CountDownLatch downLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 离开图书馆");
downLatch.countDown();
}, String.valueOf(i)).start();
}
downLatch.await();
System.out.println(Thread.currentThread().getName() + " ************* 门卫大爷锁门!");
}
}
天下一统
大秦帝国统一天下,前提灭六国,六国被灭顺序:韩国,赵国,魏国,楚国,燕国,齐国。
示例
package com.chinda.juc.coll;
import lombok.Getter;
/**
* @author Wang Chinda
* @date 2020/5/12
* @see
* @since 1.0
*/
public enum SengokuEnum {
ONE(1, "韩国"),
TWO(2, "赵国"),
THREE(3, "魏国"),
FOUR(4, "楚国"),
FIVE(5, "燕国"),
SIX(6, "齐国");
@Getter
private Integer code;
@Getter
private String name;
SengokuEnum(Integer code, String name) {
this.code = code;
this.name = name;
}
public static SengokuEnum eachSengKu(int index) {
SengokuEnum[] sengokus = SengokuEnum.values();
for (SengokuEnum sengoku : sengokus) {
if (index == sengoku.getCode()) {
return sengoku;
}
}
return null;
}
}
package com.chinda.juc.coll;
import lombok.SneakyThrows;
import java.util.concurrent.CountDownLatch;
/**
* @author Wang Chinda
* @date 2020/5/12
* @see
* @since 1.0
*/
public class ChinDemo {
@SneakyThrows
public static void main(String[] args) {
CountDownLatch downLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "被灭。");
downLatch.countDown();
}, SengokuEnum.eachSengKu(i).getName()).start();
}
downLatch.await();
System.out.println("天下一统, 秦国统一天下。");
}
}
CyclicBarrier
集齐7颗龙珠,召唤神龙。
示例
package com.chinda.juc.coll;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* @author Wang Chinda
* @date 2020/5/12
* @see
* @since 1.0
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("***********召唤神龙***********");
});
for (int i = 1; i <= 7; i++) {
int finalI = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 收集到第: " + finalI + "龙珠。");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}
Semaphore
信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
示例
实现效果:6辆车争抢3个车位,只允许一辆车开走,第二辆才允许进入车位。
package com.chinda.juc.coll;
import cn.hutool.core.thread.ThreadUtil;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* @author Wang Chinda
* @date 2020/5/12
* @see
* @since 1.0
*/
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 抢到车位");
ThreadUtil.sleep(3, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName() + " 离开车位");
} catch (Exception e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}