转载:http://www.cnblogs.com/itdragon/p/8319183.html
线程池 BlockingQueue synchronized volatile
前段时间看了一篇关于"一名3年工作经验的程序员应该具备的技能"文章,倍受打击。很多熟悉而又陌生的知识让我怀疑自己是一个假的程序员。本章从线程池,阻塞队列,synchronized 和 volatile关键字,wait,notify方法实现线程之间的通讯,死锁,常考面试题。将这些零碎的知识整合在一起。如下图所示。
学习流程图:
技术:Executors,BlockingQueue,synchronized,volatile,wait,notify
说明:文章学习思路:线程池---->队列---->关键字---->死锁---->线程池实战
源码:https://github.com/ITDragonBlog/daydayup/tree/master/ThreadBase
线程池
线程池,顾名思义存放线程的池子,可以类比数据库的连接池。因为频繁地创建和销毁线程会给服务器带来很大的压力。若能将创建的线程不再销毁而是存放在池中等待下一个任务使用,可以不仅减少了创建和销毁线程所用的时间,提高了性能,同时还减轻了服务器的压力。
线程池的使用
初始化线程池有五个核心参数,分别是 corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue。还有两个默认参数 threadFactory, handler
corePoolSize:线程池初始核心线程数。初始化线程池的时候,池内是没有线程,只有在执行任务的时会创建线程。
maximumPoolSize:线程池允许存在的最大线程数。若超过该数字,默认提示RejectedExecutionException
异常
keepAliveTime:当前线程数大于核心线程时,该参数生效,其目的是终止多余的空闲线程等待新任务的最长时间。即指定时间内将还未接收任务的线程销毁。
unit:keepAliveTime 的时间单位
workQueue:缓存任务的的队列,一般采用LinkedBlockingQueue。
threadFactory:执行程序创建新线程时使用的工厂,一般采用默认值。
handler:超出线程范围和队列容量而使执行被阻塞时所使用的处理程序,一般采用默认值。
线程池工作流程
开始,游泳馆来了一名学员,于是馆主安排一个教练负责培训这名学员;
然后,游泳馆来了六名学员,可馆主只招了五名教练,于是有一名学员被安排到休息室等待;
后来,游泳馆来了十六名学员,休息室已经满了,馆主核算了开支,预计最多可招十名教练;
最后,游泳馆只来了十名学员,馆主对教练说,如果半天内接不到学员的教练就可以走了;
结果,游泳馆没有学员,关闭了。
在接收任务前,线程池内是没有线程。只有当任务来了才开始新建线程。当任务数大于核心线程数时,任务进入队列中等待。若队列满了,则线程池新增线程直到最大线程数。再超过则会执行拒绝策略。
线程池的三种关闭
shutdown: 线程池不再接收任务,等待线程池中所有任务完成后,关闭线程池。常用
shutdownNow: 线程池不再接收任务,忽略队列中的任务,尝试中断正在执行的任务,返回未执行任务列表,关闭线程池。慎用
awaitTermination: 线程池可以继续接收任务,当任务都完成后,或者超过设置的时间后,关闭线程池。方法是阻塞的,考虑使用
线程池的种类
1 newSingleThreadExecutor() 单线程线程池
初始线程数和允许最大线程数都是一,keepAliveTime 也就失效了,队列是无界阻塞队列。该线程池的主要作用是负责缓存任务。
2 newFixedThreadPool(n) 固定大小线程池
初始线程数和允许最大线程数相同,且大小自定义,keepAliveTime 也就失效了,队列是无界阻塞队列。符合大部分业务要求,常用。
3 newCachedThreadPool() 缓存无界线程池
初始线程数为零,最大线程数为无穷大,keepAliveTime 60秒类终止空闲线程,队列是无缓冲无界队列。适合任务数不多的场景,慎用。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 线程池
* 优势,类比数据库的连接池
* 1. 频繁的创建和销毁线程会给服务器带来很大的压力
* 2. 若创建的线程不销毁而是留在线程池中等待下次使用,则会很大地提高效率也减轻了服务器的压力
*
* 三种workQueue策略
* 直接提交 SynchronousQueue
* 无界队列 LinkedBlockingQueue
* 有界队列 ArrayBlockingQueue
*
* 四种拒绝策略
* AbortPolicy : JDK默认,超出 MAXIMUM_POOL_SIZE 放弃任务抛异常 RejectedExecutionException
* CallerRunsPolicy : 尝试直接调用被拒绝的任务,若线程池被关闭,则丢弃任务
* DiscardOldestPolicy : 放弃队列最前面的任务,然后重新尝试执被拒绝的任务。若线程池被关闭,则丢弃任务
* DiscardPolicy : 放弃不能执行的任务但不抛异常
*/
public class ThreadPoolExecutorStu {
// 线程池中初始线程个数
private final static Integer CORE_POOL_SIZE = 3;
// 线程池中允许的最大线程数
private final static Integer MAXIMUM_POOL_SIZE = 8;
// 当线程数大于初始线程时。终止多余的空闲线程等待新任务的最长时间
private final static Long KEEP_ALIVE_TIME = 10L;
// 任务缓存队列 ,即线程数大于初始线程数时先进入队列中等待,此数字可以稍微设置大点,避免线程数超过最大线程数时报错。或者直接用无界队列
private final static ArrayBlockingQueue<Runnable> WORK_QUEUE = new ArrayBlockingQueue<Runnable>(5);
public static void main(String[] args) {
Long start = System.currentTimeMillis();
/**
* ITDragonThreadPoolExecutor 耗时 1503
* ITDragonFixedThreadPool 耗时 505
* ITDragonSingleThreadExecutor 语法问题报错,
* ITDragonCachedThreadPool 耗时506
* 推荐使用自定义线程池,或newFixedThreadPool(n)
*/
ThreadPoolExecutor threadPoolExecutor = ITDragonThreadPoolExecutor();
for (int i = 0; i < 8; i++) { // 执行8个任务,若超过MAXIMUM_POOL_SIZE则会报错 RejectedExecutionException
MyRunnableTest myRunnable = new MyRunnableTest(i);
threadPoolExecutor.execute(myRunnable);
System.out.println("线程池中现在的线程数目是:"+threadPoolExecutor.getPoolSize()+", 队列中正在等待执行的任务数量为:"+
threadPoolExecutor.getQueue().size());
}
// 关掉线程池 ,并不会立即停止(停止接收外部的submit任务,等待内部任务完成后才停止),推荐使用。 与之对应的是shutdownNow,不推荐使用
threadPoolExecutor.shutdown();
try {
// 阻塞等待30秒关掉线程池,返回true表示已经关闭。和shutdown不同,它可以接收外部任务,并且还阻塞。这里为了方便统计时间,所以选择阻塞等待关闭。
threadPoolExecutor.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("耗时 : " + (System.currentTimeMillis() - start));
}
// 自定义线程池,开发推荐使用
public static ThreadPoolExecutor ITDragonThreadPoolExecutor() {
// 构建一个,初始线程数量为3,最大线程数据为8,等待时间10分钟 ,队列长度为5 的线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.MINUTES, WORK_QUEUE);
return threadPoolExecutor;
}
/**
* 固定大小线程池
* corePoolSize初始线程数和maximumPoolSize最大线程数一样,keepAliveTime参数不起作用,workQueue用的是无界阻塞队列
*/
public static ThreadPoolExecutor ITDragonFixedThreadPool() {
ExecutorService executor = Executors.newFixedThreadPool(8);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
return threadPoolExecutor;
}
/**
* 单线程线程池
* 等价与Executors.newFixedThreadPool(1);
*/
public static ThreadPoolExecutor ITDragonSingleThreadExecutor() {
ExecutorService executor = Executors.newSingleThreadExecutor();
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
return threadPoolExecutor;
}
/**
* 无界线程池
* corePoolSize 初始线程数为零
* maximumPoolSize 最大线程数无穷大
* keepAliveTime 60秒类将没有被用到的线程终止
* workQueue SynchronousQueue 队列,无容量,来任务就直接新增线程
* 不推荐使用
*/
public static ThreadPoolExecutor ITDragonCachedThreadPool() {
ExecutorService executor = Executors.newCachedThreadPool();
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
return threadPoolExecutor;
}
}
class MyRunnableTest implements Runnable {
private Integer num; // 正在执行的任务数
public MyRunnableTest(Integer num) {
this.num = num;
}
public void run() {
System.out.println("正在执行的MyRunnable " + num);
try {
Thread.sleep(500);// 模拟执行事务需要耗时
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("MyRunnable " + num + "执行完毕");
}
}
队列
队列,是一种数据结构。大部分的队列都是以FIFO(先进先出)的方式对各个元素进行排序的(PriorityBlockingQueue是根据优先级排序的)。队列的头移除元素,队列的末尾插入元素。插入的元素建议不能为null。Queue主要分两类,一类是高性能队列 ConcurrentLinkedQueue;一类是阻塞队列 BlockingQueue。本章重点介绍BlockingQueue
ConcurrentLinkedQueue
ConcurrentLinkedQueue性能好于BlockingQueue。是基于链接节点的无界限线程安全队列。该队列的元素遵循先进先出的原则。不允许null元素。
BlockingQueue
ArrayBlockingQueue: 基于数组的阻塞队列,在内部维护了一个定长数组,以便缓存队列中的数据对象。并没有实现读写分离,也就意味着生产和消费不能完全并行。是一个有界队列
LinkedBlockingQueue:基于列表的阻塞队列,在内部维护了一个数据缓冲队列(由一个链表构成),实现采用分离锁(读写分离两个锁),从而实现生产者和消费者操作的完全并行运行。是一个无界队列,也可以指定队列大小
SynchronousQueue: 没有缓存的队列,生存者生产的数据直接会被消费者获取并消费。若没有数据就直接调用出栈方法则会报错。
三种队列使用场景
newFixedThreadPool 线程池采用的队列是LinkedBlockingQueue。其优点是无界可缓存,内部实现读写分离,并发的处理能力高于ArrayBlockingQueue
newCachedThreadPool 线程池采用的队列是SynchronousQueue。其优点就是无缓存,接收到的任务均可直接处理,再次强调,慎用!
并发量不大,服务器性能较好,可以考虑使用SynchronousQueue。
并发量较大,服务器性能较好,可以考虑使用LinkedBlockingQueue。
并发量很大,服务器性能无法满足,可以考虑使用ArrayBlockingQueue。系统的稳定最重要。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
/**
* 阻塞队列
* ArrayBlockingQueue :有界
* LinkedBlockingQueue :无界
* SynchronousQueue :无缓冲直接用
* 非阻塞队列
* ConcurrentLinkedQueue :高性能
*/
public class ITDragonQueue {
/**
* ArrayBlockingQueue : 基于数组的阻塞队列实现,在内部维护了一个定长数组,以便缓存队列中的数据对象。
* 内部没有实现读写分离,生产和消费不能完全并行,
* 长度是需要定义的,
* 可以指定先进先出或者先进后出,
* 是一个有界队列。
*/
@Test
public void ITDragonArrayBlockingQueue() throws Exception {
ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5); // 可以尝试 队列长度由3改到5
array.offer("offer 插入数据方法---成功返回true 否则返回false");
array.offer("offer 3秒后插入数据方法", 3, TimeUnit.SECONDS);
array.put("put 插入数据方法---但超出队列长度则阻塞等待,没有返回值");
array.add("add 插入数据方法---但超出队列长度则提示 java.lang.IllegalStateException"); // java.lang.IllegalStateException: Queue full
System.out.println(array);
System.out.println(array.take() + " 还剩元素 : " + array); // 从头部取出元素,并从队列里删除,若队列为null则一直等待
System.out.println(array.poll() + " 还剩元素 : " + array); // 从头部取出元素,并从队列里删除,执行poll 后 元素减少一个
System.out.println(array.peek() + " 还剩元素 : " + array); // 从头部取出元素,执行peek 不移除元素
}
/**
* LinkedBlockingQueue:基于列表的阻塞队列,在内部维护了一个数据缓冲队列(该队列由一个链表构成)。
* 其内部实现采用读写分离锁,能高效的处理并发数据,生产者和消费者操作的完全并行运行
* 可以不指定长度,
* 是一个无界队列。
*/
@Test
public void ITDragonLinkedBlockingQueue() throws Exception {
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
queue.offer("1.无界队列");
queue.add("2.语法和ArrayBlockingQueue差不多");
queue.put("3.实现采用读写分离");
List<String> list = new ArrayList<String>();
System.out.println("返回截取的长度 : " + queue.drainTo(list, 2));
System.out.println("list : " + list);
}
/**
* SynchronousQueue:没有缓冲的队列,生存者生产的数据直接会被消费者获取并消费。
*/
@Test
public void ITDragonSynchronousQueue() throws Exception {
final SynchronousQueue<String> queue = new SynchronousQueue<String>();
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("take , 在没有取到值之前一直处理阻塞 : " + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread1.start();
Thread.sleep(2000);
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
queue.add("进值!!!");
}
});
thread2.start();
}
/**
* ConcurrentLinkedQueue:是一个适合高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,性能好于BlockingQueue。
* 它是一个基于链接节点的无界限线程安全队列。该队列的元素遵循先进先出的原则。头是最先加入的,尾是最后加入的,不允许null元素。
* 无阻塞队列,没有 put 和 take 方法
*/
@Test
public void ITDragonConcurrentLinkedQueue() throws Exception {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
queue.offer("1.高性能无阻塞");
queue.add("2.无界队列");
System.out.println(queue);
System.out.println(queue.poll() + " : " + queue); // 从头部取出元素,并从队列里删除,执行poll 后 元素减少一个
System.out.println(queue.peek() + " : " + queue); // 从头部取出元素,执行peek 不移除元素
}
}
关键字
关键字是为了线程安全服务的,哪什么是线程安全呢?当多个线程访问某一个类(对象或方法)时,这个对象始终都能表现出正确的行为,那么这个类(对象或方法)就是线程安全的。
线程安全的两个特性:原子性和可见性。synchronized 同步,原子性。volatile 可见性。wait,notify 负责多个线程之间的通信。
synchronized
synchronized 可以在任意对象及方法上加锁,而加锁的这段代码称为"互斥区"或"临界区",若一个线程想要执行synchronized修饰的代码块,首先要
step1 尝试获得锁
step2 如果拿到锁,执行synchronized代码体内容
step3 如果拿不到锁,这个线程就会不断的尝试获得这把锁,直到拿到为止,而且是多个线程同时去竞争这把锁。
注*(线程多了也就是会出现锁竞争的问题,多个线程执行的顺序是按照CPU分配的先后顺序而定的,而并非代码执行的先后顺序)
synchronized 可以修饰方法,修饰代码块,这些都是对象锁。若和static一起使用,则升级为类锁。
synchronized 锁是可以重入的,当一个线程得到了一个对象的锁后,再次请求此对象时是可以再次得到该对象的锁。锁重入的机制,也支持在父子类继承的场景。
synchronized 同步异步,一个线程得到了一个对象的锁后,其他线程是可以执行非加锁的方法(异步)。但是不能执行其他加锁的方法(同步)。
synchronized 锁异常,当一个线程执行的代码出现异常时,其所持有的锁会自动释放。
/**
* synchronized 关键字,可以修饰方法,也可以修饰代码块。建议采用后者,通过减小锁的粒度,以提高系统性能。
* synchronized 关键字,如果以字符串作为锁,请注意String常量池的缓存功能和字符串改变后锁是否的情况。
* synchronized 锁重入,当一个线程得到了一个对象的锁后,再次请求此对象时是可以再次得到该对象的锁。
* synchronized 同异步,一个线程获得锁后,另外一个线程可以执行非synchronized修饰的方法,这是异步。若另外一个线程执行任何synchronized修饰的方法则需要等待,这是同步
* synchronized 类锁,用static + synchronized 修饰则表示对整个类进行加锁
*/
public class ITDragonSynchronized {
private void thisLock () { // 对象锁
synchronized (this) {
System.out.println("this 对象锁!");
}
}
private void classLock () { // 类锁
synchronized (ITDragonSynchronized.class) {
System.out.println("class 类锁!");
}
}
private Object lock = new Object();
private void objectLock () { // 任何对象锁
synchronized (lock) {
System.out.println("object 任何对象锁!");
}
}
private void stringLock () { // 字符串锁,注意String常量池的缓存功能
synchronized ("string") { // 用 new String("string") t4 和 t5 同时进入。用string t4完成后,t5在开始
try {
for(int i = 0; i < 3; i++) {
System.out.println("thread : " + Thread.currentThread().getName() + " stringLock !");
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private String strLock = "lock"; // 字符串锁改变
private void changeStrLock () {
synchronized (strLock) {
try {
System.out.println("thread : " + Thread.currentThread().getName() + " changeLock start !");
strLock = "changeLock";
Thread.sleep(500);
System.out.println("thread : " + Thread.currentThread().getName() + " changeLock end !");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private synchronized void method1() { // 锁重入
System.out.println("^^^^^^^^^^^^^^^^^^^^ method1");
method2();
}
private synchronized void method2() {
System.out.println("-------------------- method2");
method3();
}
private synchronized void method3() {
System.out.println("******************** method3");
}
private synchronized void syncMethod() {
try {
System.out.println(Thread.currentThread().getName() + " synchronized method!");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 若次方法也加上了synchronized,就必须等待t1线程执行完后,t2才能调用,两个synchronized块之间具有互斥性,synchronized块获得的是一个对象锁,锁定的是整个对象
private void asyncMethod() {
System.out.println(Thread.currentThread().getName() + " asynchronized method!");
}
// static + synchronized 修饰则表示类锁,打印的结果是thread1线程先执行完,然后在执行thread2线程。若没有被static修饰,则thread1和 thread2几乎同时执行,同时结束
private synchronized void classLock(String args) {
System.out.println(args + "start......");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(args + "end......");
}
public static void main(String[] args) throws Exception {
final ITDragonSynchronized itDragonSynchronized = new ITDragonSynchronized();
System.out.println("------------------------- synchronized 代码块加锁 -------------------------");
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
itDragonSynchronized.thisLock();
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
itDragonSynchronized.classLock();
}
});
Thread thread3 = new Thread(new Runnable() {
@Override
public void run() {
itDragonSynchronized.objectLock();
}
});
thread1.start();
thread2.start();
thread3.start();
Thread.sleep(2000);
System.out.println("------------------------- synchronized 字符串加锁 -------------------------");
// 如果字符串锁,用new String("string") t4,t5线程是可以获取锁的,如果直接使用"string" ,若锁不释放,t5线程一直处理等待中
Thread thread4 = new Thread(new Runnable() {
@Override
public void run() {
itDragonSynchronized.stringLock();
}
}, "t4");
Thread thread5 = new Thread(new Runnable() {
@Override
public void run() {
itDragonSynchronized.stringLock();
}
}, "t5");
thread4.start();
thread5.start();
Thread.sleep(3000);
System.out.println("------------------------- synchronized 字符串变锁 -------------------------");
// 字符串变了,锁也会改变,导致t7线程在t6线程未结束后变开始执行,但一个对象的属性变了,不影响这个对象的锁。
Thread thread6 = new Thread(new Runnable() {
@Override
public void run() {
itDragonSynchronized.changeStrLock();
}
}, "t6");
Thread thread7 = new Thread(new Runnable() {
@Override
public void run() {
itDragonSynchronized.changeStrLock();
}
}, "t7");
thread6.start();
thread7.start();
Thread.sleep(2000);
System.out.println("------------------------- synchronized 锁重入 -------------------------");
Thread thread8 = new Thread(new Runnable() {
@Override
public void run() {
itDragonSynchronized.method1();
}
}, "t8");
thread8.start();
Thread thread9 = new Thread(new Runnable() {
@Override
public void run() {
SunClass sunClass = new SunClass();
sunClass.sunMethod();
}
}, "t9");
thread9.start();
Thread.sleep(2000);
System.out.println("------------------------- synchronized 同步异步 -------------------------");
Thread thread10 = new Thread(new Runnable() {
@Override
public void run() {
itDragonSynchronized.syncMethod();
}
}, "t10");
Thread thread11 = new Thread(new Runnable() {
@Override
public void run() {
itDragonSynchronized.asyncMethod();
}
}, "t11");
thread10.start();
thread11.start();
Thread.sleep(2000);
System.out.println("------------------------- synchronized 同步异步 -------------------------");
ITDragonSynchronized classLock1 = new ITDragonSynchronized();
ITDragonSynchronized classLock2 = new ITDragonSynchronized();
Thread thread12 = new Thread(new Runnable() {
@Override