一、volatile关键字与内存可见性
首先看一段代码
public class VolatileTest {
public static void main(String[] args) {
ThreadDemp td = new ThreadDemp();
new Thread(td).start();
while(true){
if(td.isFlag()){
System.out.println("--------------");
break;
}
}
}
}
class ThreadDemp implements Runnable{
private boolean flag = false;
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = true;
System.out.println("flag="+flag);
}
public boolean isFlag(){
return flag;
}
}
正常来说,在td线程运行后将flag变为true后,程序结束。然而运行后会发现,程序确实打印出来flag=true这条语句,但是却不能结束。
这是因为JVM会为每一线程都分配一个独立的缓存。flag是主存当中的线程共享数据,在它分别被加载到td线程和main线程中后,td线程会对其进行更改再把结果写回主存。但是,对于main线程来讲,因为td线程进行了延迟,所以main线程获取的flag一定是false,又因为main线程中while(true)循环运行效率非常快,以至于main线程无暇再从主存读取flag,因此flag的更改也对其无影响,因此程序无法退出。
内存可见性问题是,当多个线程操作共享数据时,彼此不可见。
要解决这个问题,一个方法就是加锁,但是加锁会导致效率降低。另一个方法就是使用volatile关键字,使得数据是可见的。也可以理解为线程对volatile修饰的数据的读与写都是直接在主存中进行的。
相较于synchronized,volatile是一种轻量级的同步策略。但:
- volatile不具有“互斥性”
- volatile不能保证变量的“原子性”
二、原子变量与CAS算法
i++的原子性问题,在底层i++其实相当于
int temp = i;
i = i + 1;
i = temp;
先阅读下面代码
public class AtomicTest {
public static void main(String[] args) {
AtomicDemo ad = new AtomicDemo();
for (int i = 0; i < 10; ++i) {
new Thread(ad).start();
}
}
}
class AtomicDemo implements Runnable {
private volatile int serialNumber = 0;
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getSerialNumber());
}
public int getSerialNumber() {
return serialNumber++;
}
}
运行的结果为
2
0
0
1
3
4
5
6
7
8
因为i++相当于三步操作,所以volatile只能保证这三步操作都在主存中进行,但是每个线程可以执行不同的步骤,所以volatile不能保证原子性。
原子变量
为了解决上述问题,使用原子变量。
在jdk1.5后java.util.concurrent.atomic包下提供了常用的原子变量:
- 使用volatile保证内存可见性
- CAS(Compare-And-Swap)算法保证数据的原子性。CAS算法是硬件对并发操作共享数据的支持。CAS包括了三个操作数:内存值V,预估值A,更新值B,当且仅当V==A时,V=B,否则,不进行任何操作。
三、同步容器类ConcurrentHashMap
ConcurrentHashMap采用“锁分段机制”。
我们知道HashMap与HashTable的不同就在于HashTable是线程安全的,原因就是HashTable使用了锁,但是HashTable是全表锁,一旦对其中一个内容进行操作,整张表全被都被锁,效率非常低。
而ConcurrentHashMap默认将表分为16段,每段都有独立的锁。不同线程访问不同段是不会相互干扰的。
java.util.concurrent包中提供了用于多线程上下文中的Collection实现:ConcurrentHashMap、ConcurrentSkipListMap、ConCurrentSkipSet、CopyOnWriteArrayList和CopyOnWriteArraySet。当期望许多线程访问一个给定collection时,ConcurrentHashMap通常优于同步的HashMap,ConcurentSkipListMap通常优于同步的TreeMap。当期望的读数和遍历源源大于列表的更新数时,copyOnWriteArrayList优于同步的ArrayList。
四、CountDownLatch闭锁
CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
闭锁可以延迟线程的进度知道其达到终止状态,闭锁可以用来确保某些活动知道其他活动都完成才继续执行:
- 确保某个计算在其需要的所有资源都被初始化之后才继续执行;
- 确保某个服务在其依赖的所有其他服务器都已经启动之后才启动;
- 等待直到某个操作所有参与者都准备就绪在继续执行。
public class CountDownLatchTest {
public static void main(String[] args) {
CountDownLatch count = new CountDownLatch(5);
LatchDemo ld = new LatchDemo(count);
long start = System.currentTimeMillis();
for(int i = 0; i < 5; i++){
new Thread(ld).start();
}
try {
//阻塞等待,直到count的数目为0
count.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
//得出所有线程执行完毕的总时间
long end = System.currentTimeMillis();
System.out.println("time="+(end-start));
}
}
class LatchDemo implements Runnable{
private CountDownLatch count;
public LatchDemo(CountDownLatch count) {
this.count = count;
}
@Override
public void run() {
synchronized(this){
for(int i = 0; i < 50000; ++i){
if(i % 2 == 0){
System.out.println(i);
}
}
//每执行一次,数量减一
count.countDown();
}
}
}
五、实现Callable接口
创建线程一共有四种方式,继承Thread类,实现Runnable接口、实现Callable接口、通过线程池。
Callable相较于Runnable接口,方法可以有返回值,并且可以抛出异常。
执行Callable方式,需要FutureTask实现类的支持,用于接收运算结果。FutureTask是Future接口的实现类。
public class CallableTest {
public static void main(String[] args) {
CallableDemo callableDemo = new CallableDemo();
FutureTask<Integer> result = new FutureTask<>(callableDemo);
new Thread(result).start();
try {
Integer sum = result.get();
System.out.println(sum);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
class CallableDemo implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i < 100; i++) {
sum += i;
}
return sum;
}
}
FutureTask的get方法可以获取到call()方法的返回值,并且此方法要等线程结束才能获取返回值,故也可实现闭锁。
六、同步锁Lock
用于解决多线程安全问题,jdk1.5以后提供的一种方法。synchronized是隐式锁,而Lock是显示锁,需要通过lock()方法上锁,通过unlock()方法解锁
使用condition进行线程间的通信
Condition condition = lock.newCondition();
condition有方法await()、signal()和signalAll()作用与wait、notify和notifyAll相同。
线程按序交替执行
三个线程按照指定的顺序执行
public class ABCAternateTest {
public static void main(String[] args) {
AlternateDemp ad = new AlternateDemp();
new Thread(new Runnable() {
@Override
public void run() {
for(int i=0;i<20;++i){
ad.loopA(i);
}
}
},"A").start();
new Thread(new Runnable() {
@Override
public void run() {
for(int i=0;i<20;++i){
ad.loopB(i);
}
}
},"B").start();
new Thread(new Runnable() {
@Override
public void run() {
for(int i=0;i<20;++i){
ad.loopC(i);
}
}
},"C").start();
}
}
class AlternateDemp{
//标识
private int num = 1;
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void loopA(int totalLoop){
lock.lock();
try {
if (num != 1){
condition1.await();
}
for (int i = 0; i < 1; ++i){
System.out.println(Thread.currentThread().getName()+" "+i+" "+totalLoop);
}
num = 2;
condition2.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void loopB(int totalLoop){
lock.lock();
try {
if (num != 2){
condition2.await();
}
for (int i = 0; i < 1; ++i){
System.out.println(Thread.currentThread().getName()+" "+i+" "+totalLoop);
}
num = 3;
condition3.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void loopC(int totalLoop){
lock.lock();
try {
if (num != 3){
condition3.await();
}
for (int i = 0; i < 1; ++i){
System.out.println(Thread.currentThread().getName()+" "+i+" "+totalLoop);
}
num = 1;
condition1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
七、读写锁ReadWriteLock
ReadWriteLock内部维护两个锁,一个读锁,一个写锁。读锁可以同时被多个读线程持有,而写锁一次只能由一个线程持有。
写与写互斥,读与写也互斥,只有读操作可以同时进行
public class ReadWriteLockTest {
public static void main(String[] args) {
ReadWriteLockDemo demo = new ReadWriteLockDemo();
new Thread(new Runnable() {
@Override
public void run() {
demo.set((int) (Math.random()*101));
}
},"Write").start();
for(int i=0;i<100;++i){
new Thread(new Runnable() {
@Override
public void run() {
demo.get();
}
}).start();
}
}
}
class ReadWriteLockDemo {
private int num = 0;
private ReadWriteLock lock = new ReentrantReadWriteLock();
public void get() {
lock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() +" "+ num);
} finally {
lock.readLock().unlock();
}
}
public void set(int num){
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName());
this.num = num;
} finally {
lock.writeLock().unlock();
}
}
}
八、线程八锁
- 非静态方法的锁默认为this,静态方法的锁为对应的Class实例
- 某一个时刻内,只能有一个线程持有锁,无论几个方法。
九、线程池
提供了一个线程队列,队列中保存着所有等待状态的线程,避免了创建与销毁的额外开销,提高了响应速度。
线程池的体系结构
java.util.concurrent.Executor:负责线程的使用与调度的根接口
- ExecutorService子接口:线程池的主要接口
- ThreadPoolExecutor实现类。
- ScheduledExecutorService子接口:负责线程的调度。
- ScheduledThreadPoolExecutor:继承了ThreadPoolExecutor,实现了ScheduledExecutorService接口
工具类:Executors
- ExecutorService newFixedThreadPool():创建固定大小的线程池
- ExecutorService newCacheThreadPool():缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量
- ExecutorService newSingleThreadExecutor():创建单个线程池。线程池中只有一个线程。
ScheduledExecutorService newScheduledThreadPool():创建固定大小的线程,可以延迟或定时的执行任务。
public class ThreadPoolTest {
public static void main(String[] args) {
//1.创建线程池
ExecutorService pool = Executors.newFixedThreadPool(5);
ThreadDemo td = new ThreadDemo();
//2.分配任务
for (int i=0;i<10;++i){
pool.submit(td);
}
//3.关闭线程池
pool.shutdown();
}
}
class ThreadDemo implements Runnable{
private int num = 0;
@Override
public void run() {
while(num < 100) {
System.out.println(Thread.currentThread().getName()+" "+ ++num);
}
}
}
十、ForkJoinPool分支合并框架
Fork/Join框架:就是在必要的情况下,将一个大人物,进行拆分(fork)成若干个小任务,再将一个个的小任务运算的结果进行join汇总。
public class ForkJoinTest {
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinSumCalculate(0L,100000000L);
Long sum = pool.invoke(task);
System.out.println(sum);
}
}
class ForkJoinSumCalculate extends RecursiveTask<Long> {
private long start;
private long end;
private static final long THURSHOLD = 100000L;
public ForkJoinSumCalculate(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long length = end - start;
//一次只计算10000个数
if (length <= THURSHOLD) {
long sum = 0;
for (long i = 0; i <= end; ++i) {
sum += i;
}
return sum;
} else {
//拆分
long middle = (start + end) / 2;
ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle);
left.fork();//进行拆分,同时压入线程队列
ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle + 1, end);
right.fork();
//合并
return left.join() + right.join();
}
}
}