说明
本篇是继上一篇并发编程未讨论完的内容的续篇。上一篇传送门:
活跃性问题
在上一篇我们讨论并发编程带来的风险的时候,说到其中 一个风险就是活跃性问题。活跃性问题其实就是我们的程序在某些场景或条件下执行不下去了。在这个话题下我们会去了解什么是死锁、活锁以及饥饿,该如何避免这些情况的发生。
死锁
我们一般使用加锁来保证线程安全,但是过度地使用加锁,可能导致死锁发生。
哲学家进餐问题
“哲学家进餐”问题能很好地描述死锁的场景。5个哲学家去吃火锅,坐在一张圆桌上。它们有5根筷子(不是5双),这5根筷子放在每个人的中间。哲学家时而思考,时而进餐。每个人都要取到一双筷子才能吃到东西,并且在吃完后将筷子放回原处。
可以考虑一下这种情况,如果每个人都立即抓住自己左边的筷子,然后等待自己右边的筷子空出来,但同时都不放手自己已经拿到的筷子。会出现什么情况。可以想到,每个人都吃不上火锅了,只等凉凉了。
什么是死锁
每个人都拥有其他人需要的资源,同时又等待其他人已经拥有的资源,并且每个人在获得所需资源之前都不会放弃已经拥有的资源。这就是一种死锁。
再使用线程的术语描述一下。在线程A持有锁L并想获得锁M的同时,线程B持有锁M并尝试获取锁L,那么这两个线程将永远地等待下去。
简单死锁代码示例
public class LeftRightDeadLock {
private final Object left = new Object();
private final Object right = new Object();
public void leftRight(){
synchronized (left){
synchronized (right){
doSomething();
}
}
}
public void rightLeft(){
synchronized (right){
synchronized (left){
doSomething();
}
}
}
}
上面的代码中,如果一个线程执行leftRight()方法,另一个线程调用rightLeft()方法,则会发生死锁。
上面生产死锁的原因是,两个线程视图以不同的顺序来获得相同的锁。如果按照相同的顺序请求锁,那么就不会出现循环的加锁依赖性,因此就不会产生死锁。
产生死锁的四个条件
有个叫Coffman的牛人帮我们总结了产生死锁的四个条件:
- 互斥,共享资源X和Y只能被一个线程占用
- 占用且等待,线程T1已经获得了共享资源X,在等待共享资源Y的时候,不释放共享资源X;
- 不可抢占,其他线程不能强行抢占线程T1占用的资源;
- 循环等待,线程T1等待线程T2占有的资源,线程T2等待线程T1占有的资源,就是循环等待。
反过来说,我们只要破坏掉四个条件中的一个,就可以避免死锁的发生。
首先第一个互斥条件没法破坏,因为加锁就是互斥的语义。
- 对于“占用且等待”的条件,我们可以一次性申请所有资源;
- 对于“不可抢占”这个条件,占用部分资源的线程在申请其他资源时,如果申请不到,可以主动释放它占有的资源。
- 对于“循环等待”这个条件,可以按照固定的顺序申请资源,所有线程都按照规定的顺序获得锁,这样就不存在循环等待了。
活锁
活锁是另一种形式的活跃性问题,该问题尽管不会阻塞线程,但也不能继续执行下去,因为线程将不断重复执行相同的操作,而且总会失败。
当多个相互协作的线程都对彼此进行响应从而修改各自的状态,并使得任何一个线程都无法继续执行时,就发生了活锁。就比如路上两个人相遇,出于礼貌,都给对方让路,结果每次都碰到一起。
要解决这种活锁问题,需要在重试机制中加入随机性。比如,在网络上,两台机器使用相同的载波来发送数据包,那么这些数据包就会发生冲突。这两台机器都检查到了冲突,并都在稍后再次重发。如果二者都选择了在1秒后重试,那么又会发生冲突,并且不断地冲突下去,因而即使有大量闲置的带宽,也无法将数据包发送出去。为避免这种情况的发生,需要让他们分别等待一段随机的时间,这样就能避免活锁的发生了。
饥饿
“饥饿”就是当线程由于无法访问它所需要的资源而不能继续执行时的场景。所谓“不患寡而患不均”。当某些线程一直获取不到CPU执行资源的时候,就发生了“饥饿”。
一些容易导致饥饿的场景:
- 在应用中对Java线程优先级的使用不当。(因为JVM会将Thread API中的10个优先级映射到操作系统的调度优先级上,这就可能存在两个不同的优先级被映射到了操作系统层的同一个优先级,因此尽量不要改变线程优先级)
- 持有锁的线程,如果执行的时间过程或者存在无限循环,也可能导致“饥饿”问题。
解决“饥饿”问题的一般方案就是使用公平锁(注意synchronized术语非公平锁)。
JUC工具类库
Java并发包给我们提供了非常丰富的构建并发程序的基础模块,例如线程安全容器类、同步工具类,阻塞队列等。
这些工具类都在java.util.concurrent包下面,所以简称J.U.C工具包。
同步容器和并发容器
同步容器类有哪些
主要有Vector和Hashtable,这两个都是早期JDK的一部分,还有一些封装器类是由Collections.sychronizedXxx等工厂方法创建的。
这些类实现线程安全的方式都是:将他们的状态封装起来,并对每个公有方法都进行同步,也就是使用synchronized内置锁的方式,使得每次只有一个线程能访问容器的状态。
同步容器类的问题
同步容器类虽然是线程安全的类,但是在某些场景下可能需要额外的客户端加锁来保护复合操作的线程安全性。比如迭代(反复访问元素,直到遍历完容器中的所有元素)、条件运算(如果没有则添加)。下面给出一个示例说明下:
public class GetLastElement implements Runnable{
private List<Object> list;
public GetLastElement(List<Object> list) {
this.list = list;
}
public Object getLast(){
int index = list.size() - 1;
return list.get(index);
}
public void run() {
getLast();
}
}
public class RemoveLastElement implements Runnable{
private List<Object> list;
public RemoveLastElement(List<Object> list) {
this.list = list;
}
public void deleteLast(){
int index = list.size() - 1;
list.remove(index);
}
public void run() {
deleteLast();
System.out.println(list);
}
public static void main(String[] args) {
List<Object> list = new Vector<Object>();
list.add(123);
list.add("hello");
Thread thread1 = new Thread(new GetLastElement(list));
Thread thread2 = new Thread(new RemoveLastElement(list));
thread1.start();
thread2.start();
}
}
上面的代码中,可能会出现,线程1在获取get(2)的时候,2位置上的元素已经被线程2给remove(2)掉了。此时线程1会抛出异常,但这并不是我们期望得到的结果。
因此为了保证复合操作的原子性,就需要在get和remove方法上加锁。像下面这样:
public Object getLast(){
synchronized (list){
int index = list.size() - 1;
return list.get(index);
}
}
public void deleteLast(){
synchronized (list){
int index = list.size() - 1;
list.remove(index);
}
}
这样就保证了在调用size()和get()方法之间,不会有机会让另外一个线程进来remove掉元素了。
迭代器和ConcurrentModificationException
Java中的集合类库都是实现了Iterator迭代器接口。无论是使用标准的迭代还是for-each循还语法中,对容器类进行迭代遍历的方式都是Iterator。然而,如果有其他线程并发地修改容器,那么即使是使用迭代器也无法避免在迭代期间对容器加锁。
快速失败(fail-fast)机制
所谓fail-fast,其实就是一种警示机制,就是当它们发现容器在迭代过程中被修改是,就会抛出一个ConcurrentModificationException异常。
比如下面这段代码:
List<String> list = new Vector<String>();
list.add("java");
for(String str : list){
list.remove(str);
}
运行时就会抛出ConcurrentModificationException异常,事实上我们换成ArrayLlist也一样存在这种fail-fast机制。
Exception in thread "main" java.util.ConcurrentModificationException
at java.util.Vector$Itr.checkForComodification(Vector.java:1184)
at java.util.Vector$Itr.next(Vector.java:1137)
at thread.syncontainer.FailFast.main(FailFast.java:10)
另外一点,同步容器类将所有对容器状态的访问都串行化,以此来实现线程安全性,但这种方法严重降低了并发性,当多个线程竞争容器的锁时,吞吐量将严重降低。
并发容器
Java5提供了多种并发容器用来改进同步容器的性能问题。
- 增加了ConcurrentHashMap,用来代替同步且基于散列的Map;
- CopyOnWriteArrayList,用于在遍历操作为主要操作的情况下代替同步的List。
- 新增了Queue和BlockingQueue;
- Java6又引入了ConcurrentSkipListMap和ConcurrentSkipListSet,分别作为同步的SortedMap和SortedSet的并发替代品。
ConcurrentHashMap
与HashMap一样,ConcurrentHashMap也是一个基于散列的Map,但是它使用了一种完全不同的加锁策略来提供更高的并发性和伸缩性。ConcurrentHashMap并不是将每个方法都在同一把锁上同步并使得每次只能有一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制成为分段锁。在这种机制中,多个读线程可以并发访问Map,读线程和写线程也可以并发地访问Map。并且一定数量的写入线程可以并发地修改Map。ConcurrentHashMap带来的结果就是,在并发访问环境下实现更高的吞吐量。
同时ConcurrentHashMap提供的迭代器不会抛出ConcurrentModificationException异常。另外一点是实现了ConcurrentMap,该接口提供了一些诸如“如没有则添加”,“若相等则移除”,“若相等则替换”等复合操作,并且能保证是原子操作。
CopyOnWriteArrayList
“写入时复制(copy-on-write)”容器的线程安全性在于,只要正确地发布一个事实不可变的对象,那么在访问该对象那个时就不再需要进一步的同步。在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。
阻塞队列
阻塞队列提供了可阻塞的put和take方法,以及支持定时的offer和poll方法。如果队列已经满了,那么put方法将阻塞知道有空间可用;如果队列为空,那么take方法将会阻塞直到有元素可用。队列可以使有界的,也可以是无界的,无界队列永远都不会充满,因此无界队列上的put方法也永远不会阻塞。
Java类库中阻塞队列BlockingQueue是一个接口,它有多种实现,其中LinkedBlockingQueue和ArrayBlockingQueue是FIFO(先进先出)队列,二者分别和LinkedList和ArrayList类似,但比同步List拥有更好的并发性能。PriorityBlockingQueue是一个按优先级排序的队列,当你希望按照某种顺序而不是FIFO来处理元素时,可以使用这个队列。
基于阻塞队列,可以非常容易地实现生产者-消费者模型。当数据生成时,生产者只需要把数据放入阻塞队列中,而消费者从队列中拿数据消费就可以了。相比wait/notify实现起来要简单许多。
CountDownLatch
CountDownLatch是一种闭锁的实现。那什么是闭锁呢?闭锁就是一个同步工具类,可以延迟线程的进度直到其到达终止状态。闭锁就相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时(一般就是计数器为0),这扇门会打开并允许所有线程通过。
你可以向CountDownLatch对象设置一个初始的计数值,任何在这个对象上调用await()方法都将阻塞,知道这个计数值为0。而其他任务(或者说线程)在结束其工作时,可以在该对象上调用countDown()方法来减小这个计数器值。
另外需要注意的一点是,CountDownLatch被设计为只触发一次,意思就是计数器减到0之后不会再重置了。
看个图能更清楚点:
可以看到Thread-0在CountDownLatch实体对象上调用了await()方法后阻塞,直到其他两个线程的运行将计数器count的值减为0,Thread-0线程才继续执行。
CountDownLatch的典型用法是将一个任务分成N个独立的子任务,N个子任务分别由N个线程执行,创建一个初始值为N的CountDownLatch闭锁,主线程调用await()方法阻塞等待子任务的执行结果,子线程在执行完任务是调用countDown递减计数器值。
public class TaskPortion implements Runnable{
private static int counter = 0;
private final int id = counter++;
private final CountDownLatch countDownLatch;
public TaskPortion(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
public void run() {
doWork();
countDownLatch.countDown();
}
public void doWork(){
System.out.println(this + "完成");
}
@Override
public String toString() {
return "TaskPortion{" +
"id=" + id +
'}';
}
}
public class MainTask implements Runnable{
private CountDownLatch countDownLatch;
public MainTask(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
public void run() {
try {
countDownLatch.await();
System.out.println("主任务完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
CountDownLatch countDownLatch = new CountDownLatch(10);
for(int i = 0; i < 10; i++){
exec.execute(new TaskPortion(countDownLatch));
}
exec.execute(new MainTask(countDownLatch));
exec.shutdown();
}
}
运行结果:
TaskPortion{id=1}完成
TaskPortion{id=3}完成
TaskPortion{id=0}完成
TaskPortion{id=5}完成
TaskPortion{id=9}完成
TaskPortion{id=4}完成
TaskPortion{id=8}完成
TaskPortion{id=6}完成
TaskPortion{id=2}完成
TaskPortion{id=7}完成
主任务完成
可以看到当所有子任务都完成了,主任务才能完成。
CyclicBarrier
栅栏(Barrier)类似于闭锁,它能阻塞一组线程直到某个事件的发生。栅栏和闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。
CyclicBarrier是栅栏的一种实现,而且从名字中Cycilc可以判断是可以循环利用的栅栏(Barrier)。它适合这种一种场景:你希望创建一组任务,它们并行地执行工作,然后在进行下一个步骤前等待,直至所有任务都完成。它使得所有的并行任务都将在栅栏处列队,因此可以一致地向前移动。
下面引用《Java编程思想》中的赛马游戏中的例子做为示例:
public class Horse implements Runnable{
private static int counter = 0;
private final int id = counter++;
private int strides = 0;
private static Random rand = new Random(7);
private static CyclicBarrier barrier;
public Horse(CyclicBarrier b){ barrier = b; }
public synchronized int getStrides(){ return strides; }
public void run() {
try {
while (!Thread.interrupted()){
synchronized (this){
strides += rand.nextInt(3);
}
barrier.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
public String toString(){ return "Horse " + id + " "; }
public String tracks(){
StringBuilder s = new StringBuilder();
for(int i = 0; i < getStrides(); i++ ){
s.append("*");
}
s.append(id);
return s.toString();
}
}
public class HorseRace {
static final int FINISH_LINE = 15;
private List<Horse> horses = new ArrayList<Horse>();
private ExecutorService exec = Executors.newCachedThreadPool();
private CyclicBarrier barrier;
public HorseRace(int nHorses, final int pause){
barrier = new CyclicBarrier(nHorses, new Runnable() {
public void run() {
StringBuilder s = new StringBuilder();
for(int i = 0; i< FINISH_LINE; i++){
s.append("=");
}
System.out.println(s);
for(Horse horse : horses){
System.out.println(horse.tracks());
}
for(Horse horse : horses){
if(horse.getStrides() >= FINISH_LINE){
System.out.println(horse + "won!");
exec.shutdownNow();
return;
}
}
try {
TimeUnit.MILLISECONDS.sleep(pause);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
for (int i = 0; i < nHorses; i++){
Horse horse = new Horse(barrier);
horses.add(horse);
exec.execute(horse);
}
}
public static void main(String[] args) {
int nHorses = 7;
int pause = 200;
new HorseRace(nHorses,pause);
}
}
这个程序挺有意思,这里我把马的匹数调小到3匹,FINISH_LINE 的值调小一些,运行结果如下:
===============
*0
1
**2
===============
**0
*1
***2
===============
****0
*1
****2
===============
****0
*1
****2
===============
****0
**1
*****2
===============
****0
**1
******2
===============
*****0
**1
******2
===============
*******0
***1
******2
===============
*********0
*****1
******2
===============
***********0
*****1
*******2
===============
***********0
******1
*******2
===============
***********0
******1
*********2
===============
***********0
******1
**********2
===============
*************0
********1
************2
===============
*************0
********1
*************2
===============
*************0
*********1
**************2
===============
*************0
*********1
***************2
Horse 2 won!
CyclicBarrier的构造器中有两个参数,第一个是并行任务数量,第二个是一个Runnable类型的栅栏任务,就是当我们的计数器减为0的时候,会自动执行这个栅栏任务。
这里的计数器递减的动作不是我们做的(代码中我们只调用了CyclicBarrier的await()方法),而是CyclicBarrier自动帮我们做了,在CyclicBarrier中有个count域,通过看下面注释我们知道当所有并行的线程都到达栅栏处了,就会减为0,然后在新一轮开始时又会被重置。
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private int count;
Semaphore
正常的锁(Lock显示锁或Jvm内建锁synchronized)在任何时刻都只允许一个任务访问一项资源,而计数信号量(Semaphore)允许n个任务同时访问这个资源。
Semaphore中管理着一组虚拟的许可(permit),许可的初始数量可以通过构造函数指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么acquire()方法将阻塞直到有许可(或者直到被中断或操作超时)。release()方法将返回一个许可给信号量。
Semaphore一般可以用来实现“对象池”,它管理着数量有限的对象,当要使用对象时可以签出它们,而在用户使用完毕时,可以将它们签回。
public class Pool<T> {
private int size;
private List<T> items = new ArrayList<T>();
private volatile boolean [] checkedOut;
private Semaphore available;
public Pool(Class<T> classObject, int size){
this.size = size;
checkedOut = new boolean[size];
available = new Semaphore(size,true);
for (int i = 0; i < size; i++){
try {
items.add(classObject.newInstance());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
public T checkOut() throws InterruptedException {
available.acquire();
return getItem();
}
public void checkIn(T x){
if(releaseItem(x)){
available.release();
}
}
private synchronized T getItem(){
for (int i = 0; i < size; i++){
if(!checkedOut[i]){
checkedOut[i] = true;
return items.get(i);
}
}
return null;
}
private synchronized boolean releaseItem(T item){
int index = items.indexOf(item);
if(index == -1) return false;
if(checkedOut[index]){
checkedOut[index] = false;
return true;
}
return false;
}
}
线程池
不要在创建野生线程了
像下面这样创建的线程,我们叫做“野生线程”,作为测试demo没有问题,如果要上生产的系统,最好不要这么做。
Thread t = new Thread(r)//r是runnable对象
因为这样去创建线程,很容易造成无限制的线程创建,而且线程管理起来非常不便。
我们知道创建线程的开销是比较高的,因此线程需要复用,需要池化管理。
Executor框架
Executor框架其实是Java类库提供给我们的任务管理框架,线程池是其中的一部分。
在Java类库中,任务执行的主要抽象不是Thread,而是Executor。Executor是一个接口,如下:
public interface Executor {
void execute(Runnable command);
}
Executor它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。Executor的实现类还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视机制。
将任务的提交和执行解耦开来之后,就可以灵活地为某种类型的任务指定和修改执行策略。在执行策略中定义了任务执行的方方面面,这包括:
- 在什么线程中执行任务?
- 任务按照什么顺序执行(FIFO、LIFO、优先级)?
- 有多少个任务能并发执行?
- 在队列中有多少个任务在等待执行?
- 如果系统由于过载而需要拒绝一个任务,那么该选择哪一个任务?另外,如何通知应用程序有任务被拒绝?
- 在执行一个任务之前或之后,应该进行哪些动作?
通过Executors工具类中的静态工厂方法创建线程池
首先要知道Executors是一个工具类,是帮助我们创建一些常用的线程池的,这些线程池提供了一些默认的配置。
比如我们可以通过Executors的静态工厂方法创建下面几种常用线程池:
newFixedThreadPool:
创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化。
newCachedThreadPool:
创建一个可以缓存的线程池,如果线程池的当前规模超过了吹需求时,那么将会回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
newSingleThreadExecutor:
是一个单线程的Executor,它创建单个工作线程来执行任务,如果这个线程异常结束,会创建另一个线程来代替。
ExecutorService接口提供生命周期管理方法
ExecutorService是一个接口,继承自Executor,扩展了Executor接口的功能,提供了服务的生命周期管理的一些方法:
public interface ExecutorService extends Executor {
/**
* 关闭执行器, 主要有以下特点:
* 1. 已经提交给该执行器的任务将会继续执行, 但是不再接受新任务的提交;
* 2. 如果执行器已经关闭了, 则再次调用没有副作用.
*/
void shutdown();
/**
* 立即关闭执行器, 主要有以下特点:
* 1. 尝试停止所有正在执行的任务, 无法保证能够停止成功, 但会尽力尝试(例如, 通过 Thread.interrupt中断任务, 但是不响应中断的任务可能无法终止);
* 2. 暂停处理已经提交但未执行的任务;
*
* @return 返回已经提交但未执行的任务列表
*/
List<Runnable> shutdownNow();
/**
* 如果该执行器已经关闭, 则返回true.
*/
boolean isShutdown();
/**
* 判断执行器是否已经【终止】.
* <p>
* 仅当执行器已关闭且所有任务都已经执行完成, 才返回true.
* 注意: 除非首先调用 shutdown 或 shutdownNow, 否则该方法永远返回false.
*/
boolean isTerminated();
/**
* 阻塞调用线程, 等待执行器到达【终止】状态.
*
* @return {@code true} 如果执行器最终到达终止状态, 则返回true; 否则返回false
* @throws InterruptedException if interrupted while waiting
*/
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
/**
* 提交一个具有返回值的任务用于执行.
* 注意: Future的get方法在成功完成时将会返回task的返回值.
*
* @param task 待提交的任务
* @param <T> 任务的返回值类型
* @return 返回该任务的Future对象
* @throws RejectedExecutionException 如果任务无法安排执行
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Callable<T> task);
/**
* 提交一个 Runnable 任务用于执行.
* 注意: Future的get方法在成功完成时将会返回给定的结果(入参时指定).
*
* @param task 待提交的任务
* @param result 返回的结果
* @param <T> 返回的结果类型
* @return 返回该任务的Future对象
* @throws RejectedExecutionException 如果任务无法安排执行
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Runnable task, T result);
/**
* 提交一个 Runnable 任务用于执行.
* 注意: Future的get方法在成功完成时将会返回null.
*
* @param task 待提交的任务
* @return 返回该任务的Future对象
* @throws RejectedExecutionException 如果任务无法安排执行
* @throws NullPointerException if the task is null
*/
Future<?> submit(Runnable task);
/**
* 执行给定集合中的所有任务, 当所有任务都执行完成后, 返回保持任务状态和结果的 Future 列表.
* <p>
* 注意: 该方法为同步方法. 返回列表中的所有元素的Future.isDone() 为 true.
*
* @param tasks 任务集合
* @param <T> 任务的返回结果类型
* @return 任务的Future对象列表,列表顺序与集合中的迭代器所生成的顺序相同,
* @throws InterruptedException 如果等待时发生中断, 会将所有未完成的任务取消.
* @throws NullPointerException 任一任务为 null
* @throws RejectedExecutionException 如果任一任务无法安排执行
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
/**
* 执行给定集合中的所有任务, 当所有任务都执行完成后或超时期满时(无论哪个首先发生), 返回保持任务状态和结果的 Future 列表.
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
/**
* 执行给定集合中的任务, 只有其中某个任务率先成功完成(未抛出异常), 则返回其结果.
* 一旦正常或异常返回后, 则取消尚未完成的任务.
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
/**
* 执行给定集合中的任务, 如果在给定的超时期满前, 某个任务已成功完成(未抛出异常), 则返回其结果.
* 一旦正常或异常返回后, 则取消尚未完成的任务.
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
详解线程池ThreadPoolExecutor
计算线程池的大小
线程池的理想大小取决于被提交的任务类型以及所部署系统的特性。在代码中通常不会硬编码写死一个值,而应该是通过某种配置机制提供。
要避免过大或者过小。设置过大,那么大量的线程将在相对很少的CPU和内存资源上发生竞争,这不仅会导致更高的内存使用量,而且还可能耗尽资源。如果设置过小,那么将导致许多空闲的处理器无法执行工作,造成资源浪费,从而降低吞吐率。
《并发编程实践》中给我提供了一个经验公式:
要是处理器达到期望的使用率,线程池的最优大小等于:
Nthreads = Ncpu * Ucpu * (1 + W/C),其中
Ncpu = CPU核心数
Ucpu = CPU使用率,0~1
W/C = 等待时间与计算时间的比率
可以通过Runtime来获得CPU的数目:
int N_CPUS = Runtime.getRuntime().availableProcessors();
配置ThreadPoolExecutor
前面我们通过Executors工具类中的工厂方法创建的newFixedThreadPool、newCachedThreadPool这些线程池,其池返回的就是ThreadPoolExecutor对象。
当默认的执行策略不能满足需求时,我们就可以通过ThreadPoolExecutor来定制自己的线程池。
我们来看下ThreadPoolExecutor的构造函数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
总共有4个重载的构造函数,我们选择参数最多的进行分析,其他几个都调用了上面这个。
先看下参数:
corePoolSize:表示线程池的基本大小,也叫核心线程数,并且当工作队列满了的时候,如果在有任务提交上来,会创建超过这个数量的线程。
maximumPoolSize:最大线程数,表示该线程池最多可以创建这么多线程。
keepAliveTime:空闲线程的存活时间,如果某个线程的空闲时间超过了存活时间,将会被回收。
unit:存活时间的时间单位。
workQueue:阻塞队列,提交上来的任务如果没有可用的线程执行时,会被放入该队列中等待执行。
threadFactory:线程工厂,用于指定如何创建一个线程,比如可以指定一个有意义的名字,指定一个UncaughtExceptionHandler等。
RejectedExecutionHandler :拒绝策略或者饱和策略处理器。当有界队列被填满后,该如何处理提交上来的任务。JDK提供了四种RejectedExecutionHandler的实现。AbortPolicy、DiscardPolicy、DiscardOldestPolicy和CallerRunsPolicy。
AbortPolicy策略是默认的饱和策略,该策略会抛出未检查的RejectedExecutionExcetion异常。调用者可以捕获异常进行相应地处理。
DiscardPolicy策略会抛弃任务,DiscardOldestPolicy会抛弃下一个将要被回字形的任务,然后尝试重新提交新的任务。
CallerRunsPolicy不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
下图描述了线程池的总体工作流程:
JAVA内存模型
Java内存模型是个很复杂的规范,这里简单介绍下。
Java内存模型规定了所有的变量都存储在主内存中,每条线程还有自己的工作内存,线程的工作内存中保存了该线程中是用到的变量的主内存副本拷贝,线程对变量的所有操作都必须在工作内存中进行,而不能直接读写主内存。不同的线程之间也无法直接访问对方工作内存中的变量,线程间变量的传递均需要自己的工作内存和主存之间进行数据同步进行。
JMM是一种规范,目的是解决由于多线程通过共享内存进行通信时,存在的本地内存数据不一致、编译器会对代码指令重排序、处理器会对代码乱序执行等带来的问题。目的是保证并发编程场景中的原子性、可见性和有序性。
happen-before规则
happen-before规则是JMM中限制指令重排序的一些规则。意思是前面一个操作的结果对后续操作是可见的。这里总结6条happen-before规则:
1、程序的顺序性规则。
意思就是程序前面对某个变量的修改一定是对后续操作可见的。
public void read(){
int x = 20;
boolean b = true;
}
x = 20 happen-before b = true,这也符合我们的直觉。
2、volatile变量规则
对volatile白你两的写操作 happen-before 于对该变量的读操作。这也是volatile类型的变量能保证可见性的原因。
3、传递性规则
如果A happen-before B,B happen-before C。则A happen-before C。
4、监视器锁规则
监视器锁指的就是synchronized。对一个锁的解锁 happen-before 于后续对这个锁的加锁。
public void read(){
int x = 20;
boolean b = true;
synchronized (this){// 这里会自动加锁
x = 10;
}//执行结束,自动释放锁
}
这条规则说的就是,如果线程A执行完释放锁后将x改为10了,线程B在获得锁进入临界区后是可以看到线程A对x的写操作的,也就是B能看到x=10。
5、线程启动规则
它是指主线程A启动子线程B后,子线程B能够看到主线程在启动子线程B前的操作。
int x = 10;
Thread B = new Thread(()->{
// 此处是可以看到主线程修改的x=30的
});
x = 30;
B.start();
6、线程join规则
这条是关于线程等待的。它是指主线程A等待子线程B完成(主线程A通过调用子线程B的join()方法实现),当子线程B完成后(主线程A中join()方法返回),主线程能够看到子线程的操作。
int x = 10;
Thread B = new Thread(()->{
// 对共享变量的修改
x = 15;
});
B.start();
B.join();
// 主线程在调用了join之后是能
// 看到x = 15的
后记
CAS原理,AQS和显式锁ReentrantLock原理性的东西后期单独总结。