锁的底层
1,java代码 synchronized
2,java字节码 monitorenter monitorexit monitor监视器监视这把锁
3,执行过程中自动锁升级 从new出来升级到偏向锁,从偏向锁升级到自旋锁,从自旋锁升级到重量级锁
4,lock comxchg
synchronized底层
1,无锁状态
2,偏向锁,markword记录这个线程id,环境中没有其他线程来争抢这块资源
3,自旋锁,当去执行一块资源的时候,发现其他线程已经占用了该资源,此时本线程等待其他线程执行完,等待过程中持续消耗cpu资源,默认自旋10次升级为重量级锁,自旋锁存在于用户态,不经过内核态,因此效率高。应用场景:执行时间短,线程数少。
4,重量级锁,去操作系统申请资源加这把锁synchronized
sychronized属于可重入锁,因为只要锁对象相同sychronized方法可以调用其他sychronized方法
线程通信的方式:
1,wait/notify
2,locksupport.park/unpark
3,semaphore.acquire/release 配合join
4,condition.await/singleAll condition的本质是等待队列个数,生产者和消费者都存在队列
wait释放锁,notify不会释放锁,wait回来继续执行的时候还需要拿到锁才会继续执行
Thread的API
start开启
yied舍弃当前线程cpu执行权,让给其他线程,然后排队,属于从运行状态变为可运行状态
join加入到另一个线程,等待另一个线程执行完之后继续执行当前线程,属于运行状态变为wait状态
park等待唤醒,不会阻塞
interrupt可以打断睡眠状态和阻塞状态,但是需要对异常进行处理
volatile
1,线程可见性 volatile尽量修饰简单的值,不要修饰引用的值,如果修饰的是一个对象,当对象内部属性改变之后volatile是不可见的
2,阻止指令重排序
3,不能保证原子性
Timer
Schedule(task, Date) 在未来时间执行一次任务,也可以多个任务在之前的时间执行
Schedule(TimerTask task, Date firsttime,long period) 此方法用于在指定的时间执行一次之后任务之后,在指定的period的时间间隔后不停的执
行任务
Schedule(TimerTask task, long delay)以当前时间为参考,在延迟指定的秒数后执行一次性任务
Schedule(TimerTask task, long delay,long period)以当前时间为参考,在延迟指定的秒数后执行一次性任务并且在period后重复执行任务,执行时间是从上次任务结束时间开始计算。凡是带period的都会在时间间隔后重复执行。
Schedule和ScheduleAtFixedRate的区别在于,如果指定开始执行的时间在当前系统运行时间之前,scheduleAtFixedRate会把已经过去的时间也作为周期执行,而schedule不会把过去的时间算上
ScheduleAtFixedRate(TimerTask task,long delay,long period) 有延迟,下次任务相对于上次任务的开始时间开始执行:
scheduleAtFixedRate(TimerTask task,Date startTime,long period) 指定开始执行的时间在当前系统运行时间之前,会把已经过去的时间也作为周期执行TimerTask有一个cancel方法用于将当前任务从任务队列删除。(也就是队列中不会执行该任务,如果正在执行中调用该方法会执行完毕)
Timer的cancel方法用于清空所有任务队列(如果有任务正在执行会等任务执行完清空;有时候不一定会停掉,因为cancel方法不一定能抢到queue对象的锁)
并发库
atomic AtomicInteger 保证原子性 所有的Atomic开头的类都是通过UnSafe里面compareAndSwap的CAS操作来实现的
LongAdder保证原子性,底层是分段锁,每个锁分开计算最后求总的数据,在线程较多时,自增比atomicInteger和synchronized更快,由于synchronized需要申请系统资源,因此synchronized比atomicInteger慢
AQS队列同步器
reentantLock可以替代synchronized,必须手动解锁,即在finally执行解锁,
synchronized是系统自带的,属于自动解锁,只要任务执行完就解锁了,reentantLock可以出现各种condition,也就是不同的等待队列,reentantLock底层是CAS的实现,而synchronized默认经过4种状态的锁升级
reentantLock可以tryLock尝试锁定,如果锁不了可以不继续拿锁,做其他事情,也就是reentantLock不会阻塞
reentantLock可以用lockInterruptibly申请打断锁,也就是上锁后能被打断
reentantLock构造方法传true可以申请公平锁,公平锁底层是队列结构,先到的线程排在队列前面,依次往后排,讲求先来后到,而默认的非公平锁是谁抢到算谁的。synchronized默认非公平锁。
CAS操作存在ABA问题,ABA问题即一条线程拿到一条数据做了更改,另一个线程拿过去改了之后又改回来,然后原来的线程发现值并没变,其实值已经被改过,虽然是同一个值,解决办法:加版本号控制
CountDownLatch t.join()方法只会使主线程(或者说调用t.join()的线程)进入等待池并等待t线程执行完毕后才会被唤醒。并不影响同一时刻处在运行状态的其他线程。
CyclicBarrier同步屏障
如何让多线程发挥最大效率?因为单核cpu是线程调度,cpu在同一时刻只能执行一条计算机指令,如何让多核cpu同时对多个线程执行指令?
MarriagePhaser阶段锁
ReadWriteLock 读写锁 读锁加了之后,如果是正在读这条数据,其他线程也能进来读,如果是写线程,需要在外面等待,如果正在写这条数据,那
么其他线程即不能进来读也不能进来写。所以写锁的好处是,大大的提高了读的效率,如果说一边读一边写,写的时候会自动加锁,读的时候无法访问
SELECT ... LOCK IN SHARE MODE 加读锁
SELECT ... FOR UPDATE 加写锁,update语句自动加锁
semaphore信号量,信号量指定个数,如果个数为0,其他线程等待,如果有,其他线程拿取,执行。作用:限流,最多允许多少线程同时运行。
它与FixedThreadPool的区别是,FixedThreadPool线程池内,永远只有固定个数的线程,比如5个。而semaphore可以有无数个线程,但是同时运行的个数只能是信号量的固定个数。场景,分批发短信
exchanger交换者,线程之间交换数据用的,只能两个线程交换,如果3个,可以自定义逻辑
locksupport有park和unpark方法,类似于wait/notify,不过locksupport不需要同步代码块,并且可以先unpark提前告知已解锁,park时不会被阻塞,locksupport基于Unsafe
park-unpark和wait-notify的作用是一样的,当然这两个有一点不一样就是park-unpark是可以顺序反着来的,
可以先unpark(发放许可),然后park,在park的时候不会阻塞;
但是wait-notify就不行,如果先执行notify,再wait的时候,那么当前线程会一直处于阻塞状态
阻塞队列
ArrayBlockingQueue是定长的阻塞队列,静态数组结构,不存在扩容机制,锁是用的ReentrantLock,锁对象就是数组对象,保证先进先出。
LinkedBlockingQueue是链表结构,node节点,存取分别是不同的node节点,所以采用的是锁分离技术,也就是上两把锁,写与写之间是互斥的,读与读之间也是互斥的,所以读写之间互不排斥,性能更高,存putLock,取takeLock,也保证先进先出。
SynchronousQueue 使用cas加自旋达到乐观锁的效果,自旋达到一定次数之后采用LockSupport.unpark()来阻塞,以公平设置为true的队列以先进先出顺序授予线程访问权限
LinkedTransferQueue 与SynchronousQueue差不多,不同点在于存元素LinkedTransferQueue 能控制是否阻塞
PriorityBlockingQueue优先级队列 使用数组加二叉堆,存不会阻塞二叉堆结构无限堆上去,出队会阻塞。锁是用的ReentrantLock。
优先级队列和通常的栈和队列一样,只不过里面的每一个元素都有一个"优先级”,在处理的时候,首先处理优先级最高的。如果两个元素具有相同的优先级,则按照他们插入到队列中的先后顺序处理。优先级队列可以通过链表,数组,堆或者其他数据结构实现
LinkedBlockingDeque和LinkedBlockingQueue差不多,但是LinkedBlockingDeque是双向链表,即可以从头存也可以从尾存,锁是用的ReentrantLock
DelayQueue延时队列,和PriorityBlockingQueue差不多存不会阻塞,取会阻塞同时取的时候需要判断堆顶元素是否过期。
什么是守护线程
在t.start()之前设置t.setDaemon(true)即守护线程,主线程执行完之后守护线程立即销毁,不会继续运行
synchronizedCollection,synchronizedMap,synchronizedList,synchronizedSet
Collections.synchronizedList(new ArrayList())同步容器
public class Demo1 { public static void main(String[] args){ List<String> list = Collections.synchronizedList(new ArrayList<String>()); list.add("1"); list.add("2"); list.add("3"); synchronized (list) { for (String s : list) { System.out.println(s); } } } }
执行add()等方法的时候是加了synchronized关键字的,但是listIterator(),iterator()却没有加.所以在使用的时候需要加上synchronized
ThreadFactory的作用,工厂模式,将创建线程并且控制线程数量的任务交给工厂来完成
public class TestThreadFactory { public static void main(String[] args) { //创建线程(并发)池,自动调节线程池大小 ExecutorService es = Executors.newCachedThreadPool(new WorkThreadFactory()); //同时并发5个工作线程 es.execute(new WorkRunnable()); es.execute(new WorkRunnable()); es.execute(new WorkRunnable()); es.execute(new WorkRunnable()); es.execute(new WorkRunnable()); //指示当所有线程执行完毕后关闭线程池和工作线程,如果不调用此方法,jvm不会自动关闭 es.shutdown(); try { //等待线程执行完毕,不能超过2*60秒,配合shutDown es.awaitTermination(2*60, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } } class WorkThreadFactory implements ThreadFactory { private AtomicInteger atomicInteger = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { int c = atomicInteger.incrementAndGet(); System.out.println("create no " + c + " Threads"); return new WorkThread(r, atomicInteger);//通过计数器,可以更好的管理线程 } } class WorkRunnable implements Runnable{ @Override public void run() { System.out.println("complete a task"); } } class WorkThread extends Thread{ private Runnable target; //线程执行目标 private AtomicInteger counter; public WorkThread(Runnable target, AtomicInteger counter) { this.target = target; this.counter = counter; } @Override public void run() { try { target.run(); } finally { int c = counter.getAndDecrement(); System.out.println("terminate no " + c + " Threads"); } } }
Executor框架
通过Executor框架来启动线程比使用Thread的start方法更好,
除了更易管理,效率更好(用线程池实现,节约开销)外,
还有关键的一点:有助于避免this逃逸问题——如果我们在构造器中启动一个线程,因为另一个任务可能会在构造器结束之前开始执行,此时可能会访问到初始化了一半的对象用Executor在构造器中。
Executor作为灵活且强大的异步执行框架,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务的线程相当于消费者,并用Runnable来表示任务,Executor的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制。
ThreadPoolExecutor是自定义线程池,默认的四种线程池都是基于ThreadPoolExecutor实现的
Executor 和 ExecutorService 这两个接口主要的区别是:ExecutorService 接口继承了 Executor 接口,是 Executor 的子接口
Executor 和 ExecutorService 第二个区别是:Executor 接口定义了 execute()方法用来接收一个Runnable接口的对象,而 ExecutorService 接口中的 submit()方法可以接受Runnable和Callable接口的对象。
Executor 和 ExecutorService 接口第三个区别是 Executor 中的 execute() 方法不返回任何结果,而 ExecutorService 中的 submit()方法可以通
过一个 Future 对象返回运算结果。
Executor 和 ExecutorService 接口第四个区别是除了允许客户端提交一个任务,ExecutorService 还提供用来控制线程池的方法。比如:调用
shutDown() 方法终止线程池。
Executors类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。Executors 类提供工厂方法用来创建不同类型的线程池。
CompletionService接口定义了一组任务管理接口:
submit() - 提交任务
take() - 获取任务结果
poll() - 获取任务结果
ExecutorCompletionService类是CompletionService接口的实现
ExecutorCompletionService内部管理者一个已完成任务的阻塞队列
ExecutorCompletionService引用了一个Executor, 用来执行任务
submit()方法最终会委托给内部的executor去执行任务
take/poll方法的工作都委托给内部的已完成任务阻塞队列
如果阻塞队列中有已完成的任务, take方法就返回任务的结果, 否则阻塞等待任务完成
poll与take方法不同, poll有两个版本:
无参的poll方法 --- 如果完成队列中有数据就返回, 否则返回null
有参数的poll方法 --- 如果完成队列中有数据就直接返回, 否则等待指定的时间, 到时间后如果还是没有数据就返回null
ExecutorCompletionService主要用与管理异步任务 (有结果的任务, 任务完成后要处理结果)
ExecutorCompletionService是如何执行任务, 又是如何将任务的结果存储到完成队列中的呢?
ExecutorCompletionService在submit任务时, 会创建一个QueueingFuture, 然后将创建的QueueingFuture丢给executor, 让executor完成任务的执行工作
QueueingFuture继承与FutureTask类, 而FutureTask实现了两个接口Runnable和Future.
Runnable一般表示要执行的任务的过程, 而Future则表述执行任务的 结果 (或者说是任务的一个句柄, 可获取结果, 取消任务等).
因此FutureTask就是一个有结果可期待的任务. FutureTask实现了run方法, 我们指定此方法一般是在在工作线程(不是submit线程) 执行的。
FutureTask构造的时候需要一个Callable<V>参数, Callable表示一个任务的执行过程, 在run方法中恰好调用了Callable.call(), 也就是任务工作在工作线程中执行.
那么任务执行完了会返回结果, 这个结果是要在submit线程(就是提交任务的线程)中使用的, 那么如何让submit线程可以反问到呢? 答案也是在FutureTask类中, 我们可以看到run方法中执行任务(Callable.call())获取结果后, 会掉用一个set()方法, set() 将获取的结果存储到FuturnTask的一个outcome字段中, 这个过程是同步的, 所以其他线程稍后访问是可以读取到值的
ExecutorCompletionService中的完成队列中正好存储的是FuturnTask的子类, 当然可以调用FutureTask的get方法, FutureTask的get方法就是获取outcome值 (get()方法中调用了report()方法, report中返回了outcome字段).
发令枪 (也叫倒计数锁存器)
倒计数锁存器(CountDown Latch)是异常性障碍,允许一个或多个线程等待一个或者多个其他线程来做某些事情。
再比如:主线程中开4个线程A、B、C、D去执行任务,分别是计算上一年1、2、3、4四个季度的某某数据,主线程一直等待到 A、B、C、D 执行完毕(数据计算完毕)后再进行数据汇总。
使用发令枪完成一个100减到1的实例
public class CountDownLatchTest {
public static int nomal = 100;
// TODO 发令枪数量设置
private static final int N = 10;
public static void main(String rgs[]) throws InterruptedException {
//构造一个用给定计数初始化的 CountDownLatch。
CountDownLatch doneSignal = new CountDownLatch(1);
Executor service = Executors.newCachedThreadPool();
for (int i = 0; i < N; ++i)
//假如实际开的线程小于CountDownLatch构造器中的值,那么主线程会永远休眠
//假如实际开的线程大于CountDownLatch构造器中的值,那么当锁存器计数减到0时,主线程会提交运行
service.execute(new WorkerRunnable(doneSignal, i));
//使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间
doneSignal.await();
System.out.println("主线程继续执行");
((ExecutorService) service).shutdownNow();
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
doWork(i);
//递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
doneSignal.countDown();
}
// TODO 其他业务测试在此方法内修改代码即可
private synchronized void doWork(int i) {
CountDownLatchDemo.print(i);;
}
}
public class CountDownLatchDemo {
public static void print(int i){
while (CountDownLatchTest.nomal>0){
CountDownLatchTest.nomal-=1;
System.out.println("当前线程:"+i+" nomal:"+CountDownLatchTest.nomal);
}
}
}
关于ThreadPoolExecutor
引入线程池的好处:
1、重用线程池中的线程,避免因频繁创建和销毁线程造成的性能消耗。
2、更加有效的控制线程的最大并发数,防止线程过多抢占资源造成的系统阻塞。
3、对线程进行有效的管理。
构造函数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize
核心线程数。在创建线程池之后,默认情况下线程池中并没有任何的线程,而是等待任务到来才创建线程去执行任务,当线程池中的线程数目达到 corePoolSize后,新来的任务将会被添加到缓存队列中,也就是那个workQueue,除非调用ThreadPoolExecutor#prestartAllCoreThreads() 方法或者是 ThreadPoolExecutor # prestartCoreThread() 方法(从这两个方法的名字就可以看出是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或一个线程)。
PS:很多人不知道这个数该填多少合适,其实也不必特别纠结,根据实际情况填写就好,实在不知道,就按照阿里工程师的写法取下列值就好了:
int NUMBER_OF_CORES = Runtime.getRuntime().availableProcessors();
maximumPoolSize
线程池中的最大线程数。表示线程池中最多可以创建多少个线程,很多人以为它的作用是这样的:”当线程池中的任务数超过 corePoolSize 后,线程池会继续创建线程,直到线程池中的线程数小于maximumPoolSize“,其实这种理解是完全错误的。它真正的作用是:当线程池中的线程数等于 corePoolSize 并且 workQueue 已满,这时就要看当前线程数是否大于 maximumPoolSize,如果小于maximumPoolSize 定义的值,则会继续创建线程去执行任务, 否则将会调用去相应的任务拒绝策略来拒绝这个任务。另外超过 corePoolSize的线程被称做"Idle Thread", 这部分线程会有一个最大空闲存活时间(keepAliveTime),如果超过这个空闲存活时间还没有任务被分配,则会将这部分线程进行回收。
keepAliveTime
控制"idle Thread"的空闲存活时间。这个idle Thread就是上面提到的超过 corePoolSize 后新创建的那些线程,默认情况下,只有当线程池中的线程数大于corePoolSize,且这些"idle Thread"并没有被分配任务时,这个参数才会起作用。另外,如果调用了ThreadPoolExecutor#allowCoreThreadTimeOut(boolean) 的方法,在线程池中的线程数不大于corePoolSize,且这些core Thread 也没有被分配任务时,keepAliveTime 参数也会起作用。
unit
参数keepAliveTime的时间单位
workQueue
阻塞队列。如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到该队列当中,注意只要超过了 corePoolSize 就会把任务添加到该缓存队列,添加可能成功也可能不成功,如果成功的话就会等待空闲线程去执行该任务,若添加失败(一般是队列已满),就会根据当前线程池的状态决定如何处理该任务(若线程数 < maximumPoolSize 则新建线程;若线程数 >= maximumPoolSize,则会根据拒绝策略做具体处理)。
常用的阻塞队列有:
1)ArrayBlockingQueue //基于数组的先进先出队列,此队列创建时必须指定大小;
2)LinkedBlockingQueue //基于链表的先进先出队列,如果创建时没有指定此队列大小,
则默认为Integer.MAX_VALUE;
3)synchronousQueue //这个队列比较特殊,它不会保存提交的任务,
而是将直接新建一个线程来执行新来的任务。
threadFactory
线程工厂。用来为线程池创建线程,当我们不指定线程工厂时,线程池内部会调用Executors.defaultThreadFactory()
创建默认的线程工厂,其后续创建的线程优先级都是Thread.NORM_PRIORITY
。如果我们指定线程工厂,我们可以对产生的线程进行一定的操作。
handler
拒绝执行策略。当线程池的缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
ThreadPoolExecutor.AbortPolicy: // 丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy: // 也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy: // 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy: // 由调用线程处理该任务