1. 同步器
1.1 CountDownLatch
- 当一个或多个线程调用
await()
时,这些线程会阻塞; - 其它线程调用
countDown()
会将计数器减 1(调用该方法的线程不会阻塞); - 当计数器的值变为 0 时(减少计数),因
await()
阻塞的线程会被唤醒,继续执行。
// 案例:秦灭六国
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// [减少计数] 让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒
CountDownLatch cd = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName() + "被灭了 ...");
cd.countDown();
}, String.valueOf(i)).start();
}
cd.await(); // 等待计数器归零,然后再向下执行。
System.out.println(Thread.currentThread().getName() + "一统天下 ...");
}
}
1.2 CyclicBarrier
CyclicBarrier 的字面意思是可循环(Cyclic) 使用的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫“同步点”)时被阻塞(线程进入屏障是通过 await()
方法),直到最后一个线程到达屏障时,屏障才会开门。只有等屏障开了,所有被屏障拦截的线程才会继续干活。
示例代码:
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("七龙珠集齐!召唤神龙!");
});
for (int i = 1; i <= 7; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"星龙珠被收集");
try {
cyclicBarrier.await(); // 线程阻塞
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"星龙珠合成...");
}, String.valueOf(i)).start();
}
}
}
打印结果:
2星龙珠被收集
5星龙珠被收集
4星龙珠被收集
1星龙珠被收集
3星龙珠被收集
7星龙珠被收集
6星龙珠被收集
七龙珠集齐!召唤神龙!
6星龙珠合成...
4星龙珠合成...
5星龙珠合成...
2星龙珠合成...
7星龙珠合成...
3星龙珠合成...
1星龙珠合成...
1.3 Semaphore
信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。在信号量上我们定义 2 种操作:
- acquire(获取):当一个线程调用 acquire 操作时,它要么通过成功获取信号量(信号量减 1),要么一直等下去,直到有线程释放信号量或超时。
- release(释放):实际上会将信号量的值加 1,然后唤醒等待的线程。
示例代码:
public class SemaphoreDemo {
public static void main(String[] args) {
// 3 个停车位
Semaphore sp = new Semaphore(3);
// 6 辆汽车
for (int i = 1; i <= 6; i++) {
new Thread(()->{
try {
sp.acquire();
System.out.println(Thread.currentThread().getName() + "号车驶入停车位");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "号车驶出停车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
sp.release();
}
}, String.valueOf(i)).start();
}
}
}
打印结果:
1号车驶入停车位
3号车驶入停车位
2号车驶入停车位
1号车驶出停车位
3号车驶出停车位
4号车驶入停车位
2号车驶出停车位
6号车驶入停车位
5号车驶入停车位
6号车驶出停车位
5号车驶出停车位
4号车驶出停车位
2. 读写锁
class MyCache {
private volatile Map<String, Object> map = new HashMap<>();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public void put(String key, Object value) {
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "开始写入 ...");
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写入完毕 ...");
} finally {
lock.writeLock().unlock();
}
}
public Object get(String key) {
lock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "开始读入 ...");
Object value = map.get(key);
System.out.println(Thread.currentThread().getName() + "读入完毕 ...");
return value;
} finally {
lock.readLock().unlock();
}
}
}
3. 阻塞队列
3.1 概念
- 试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素。
- 试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增。
3.2 用处
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起。
为什么需要 BlockingQueue?
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue 都给你一手包办了。
在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
3.3 架构&种类
- ArrayBlockingQueue:由数组结构组成的有界阻塞队列
- LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为 integer.MAX_VALUE)阻塞队列
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列
- SynchronousQueue:同步队列,不存储元素的阻塞队列,也即单个元素的队列
- LinkedTransferQueue:由链表组成的无界阻塞队列
- LinkedBlockingDeque:由链表组成的双向阻塞队列
3.4 示例代码
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class BlockQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 1. 抛出异常
// throwExTest();
// 2. 特殊值
// retBoolTest();
// 3. 阻塞
// blockingTest();
// 4. 超时退出
timeoutTest();
}
private static void timeoutTest() throws InterruptedException {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
System.out.println(queue.offer("a")); // true
System.out.println(queue.offer("b")); // true
System.out.println(queue.offer("c")); // true
System.out.println(queue.offer("d", 3, TimeUnit.SECONDS)); // false
System.out.println(queue.poll()); // a
System.out.println(queue.poll()); // b
System.out.println(queue.poll()); // c
System.out.println(queue.poll(4, TimeUnit.SECONDS)); // null
}
private static void blockingTest() throws InterruptedException {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
queue.put("a");
queue.put("b");
queue.put("c");
// queue.put("d"); -> blocking ...
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
// System.out.println(queue.take()); -> blocking ...
}
private static void retBoolTest() {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
System.out.println(queue.offer("a")); // true
System.out.println(queue.offer("b")); // true
System.out.println(queue.offer("c")); // true
System.out.println(queue.peek()); // a
System.out.println(queue.offer("d")); // false
System.out.println(queue.poll()); // a
System.out.println(queue.poll()); // b
System.out.println(queue.poll()); // c
System.out.println(queue.poll()); // null
}
private static void throwExTest() {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
// 1. 添加
System.out.println(queue.add("a")); // true
System.out.println(queue.add("b")); // true
System.out.println(queue.add("c")); // true
System.out.println(queue.element()); // a
// System.out.println(queue.add("d")); -> IllegalStateException: Queue full
// 2. 移除
System.out.println(queue.remove());
System.out.println(queue.remove());
System.out.println(queue.remove());
// System.out.println(queue.remove()); -> NoSuchElementException
}
}
4. AQS
5. 线程池
5.1 引入
10 年前单核 CPU 电脑,假的多线程,CPU 需要来回切换;现在是多核电脑,多个线程各自跑在独立的 CPU 上,不用切换效率高。
线程池的优势:线程池做的工作主要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果任务数量超过了线程最大数量,超出数量的任务排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
线程池的主要特点:线程复用;控制最大并发数;管理线程
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
- 提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控。
5.2 使用
5.2.1 架构说明
Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor,Executors,ExecutorService,ThreadPoolExecutor 这几个类。
5.2.2 有关 API
- Executors.newSingleThreadExecutor():一个任务一个任务的执行,一池一线程。
- Executors.newFixedThreadPool(N):执行长期任务性能好,创建一个有 N 个固定的线程的线程池。
- Executors.newCachedThreadPool():执行很多短期异步任务,线程池根据需要创建新线程,但在先前构建的线程可用时将重用它们。可扩容,遇强则强。
示例代码:
public class ThreadPoolTest {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(3); // 固定个数
ExecutorService threadPool2 = Executors.newSingleThreadExecutor(); // 单个
ExecutorService threadPool3 = Executors.newCachedThreadPool(); // 可扩展
for (int i = 1; i <= 500; i++) {
threadPool3.execute(()->{
System.out.println(Thread.currentThread().getName() + "处理任务");
});
}
threadPool.shutdown();
threadPool2.shutdown();
threadPool3.shutdown();
}
}
5.2.3 底层源码
ThreadPoolExecutor
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize:线程池中的常驻核心线程数(惰性加载)
- maximumPoolSize:线程池中能够容纳同时执行的最大线程数,此值必须 >= 1。
- keepAliveTime:多余的空闲线程的存活时间。当池中线程数量超过 corePoolSize 并且线程空闲时间达到 keepAliveTime 时,多余线程会被销毁直到只剩下 corePoolSize 个线程为止。
- unit:keepAliveTime 的单位
- workQueue:任务队列,被提交但尚未被执行的任务。
- threadFactory:表示生成线程池中工作线程的线程工厂。用于创建线程,一般默认的即可。
- handler:拒绝策略。表示当队列满了,并且工作线程大于等于线程池的 maximumPoolSize 时如何来拒绝请求执行的 Runnable 的策略。
在工作中单一的/固定数的/可变的 3 种创建线程池的方法哪个用的多?注意,哪个都不用!
5.3 工作原理
1. 在创建了线程池后,线程池中的线程数为 0。
2. 当调用 execute()
方法添加一个请求任务时,线程池会做出如下判断:
- 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
- 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
- 如果这个时候队列满了且正在运行的线程数量还小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个新来的任务;
- 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
3. 当一个线程完成任务时,它会从队列中取下一个任务来执行。
4. 当一个线程无事可做超过一定的时间(keepAliveTime) 时,线程会判断:如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以,当线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
5.4 拒绝策略
等待队列已经排满了,再也塞不下新任务了。同时,线程池中的 max 线程也达到了,无法继续为新任务服务。这个时候我们就需要拒绝策略机制合理的处理这个问题。
示例代码:20 个任务
public class ThreadPoolTest {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
3L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
try {
for (int i = 0; i < 20; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
JDK内置的拒绝策略(以下均实现了 RejectedExecutionHandle<I>
):
- AbortPolicy(默认):直接抛出 RejectedExecutionException 阻止系统正常运行。
pool-1-thread-1 pool-1-thread-4 pool-1-thread-1 pool-1-thread-1 pool-1-thread-3 pool-1-thread-2 pool-1-thread-4 pool-1-thread-5 java.util.concurrent.RejectedExecutionException: Task cn.edu.nuist.threadpool .ThreadPoolTest$$Lambda$1/821270929@85ede7b rejected from java.util.concurrent .ThreadPoolExecutor@5674cd4d [Running, pool size = 5, active threads = 0, queued tasks = 0, completed tasks = 8]
- CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
pool-1-thread-1 pool-1-thread-3 pool-1-thread-2 main <- 将某些任务回退到调用者 pool-1-thread-3 pool-1-thread-1 pool-1-thread-3 pool-1-thread-5 main <- 将某些任务回退到调用者 pool-1-thread-4 pool-1-thread-2 pool-1-thread-3 pool-1-thread-1 pool-1-thread-2 pool-1-thread-4 pool-1-thread-1 pool-1-thread-2 pool-1-thread-3 pool-1-thread-1 pool-1-thread-4
- DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中尝试再次提交当前任务。
pool-1-thread-1 pool-1-thread-4 pool-1-thread-3 pool-1-thread-3 pool-1-thread-2 pool-1-thread-1 pool-1-thread-4 pool-1-thread-5
- DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。
pool-1-thread-1 pool-1-thread-3 pool-1-thread-2 pool-1-thread-1 pool-1-thread-4 pool-1-thread-4 pool-1-thread-4 pool-1-thread-4 pool-1-thread-5 pool-1-thread-4 pool-1-thread-3 pool-1-thread-4 pool-1-thread-5 pool-1-thread-3
5.5 如何设计线程池
5.5.1 回答思路
摘自公众号:yes的练级攻略
这种设计类问题还是一样,先说下理解,表明你是知道这个东西的用处和原理的,然后开始 BB。基本上就是按照现有的设计来说,再添加一些个人见解。
线程池讲白了就是存储线程的一个容器,池内保存之前建立过的线程来重复执行任务,减少创建和销毁线程的开销,提高任务的响应速度,并便于线程的管理。
我个人觉得如果要设计一个线程池的话得考虑池内工作线程的管理、任务编排执行、线程池超负荷处理方案、监控等方面。
要将初始化线程数、核心线程数、最大线程池都暴露出来可配置,包括超过核心线程数的线程空闲消亡相关配置。
然后任务的存储结构也得可配置,可以是无界队列也可以是有界队列,也可以根据配置,分多个队列来分配不同优先级的任务,也可以采用 stealing 的机制来提高线程的利用率。
再提供配置来表明此线程池是 IO 密集型还是 CPU 密集型来改变任务的执行策略。
超负荷的方案可以有多种,包括丢弃任务、拒绝任务并抛出异常、丢弃最旧的任务或自定义等等。
至于监控的话,线程池设计要埋好点,暴露出用于监控的接口,如已处理任务数、待处理任务数、正在运行的线程数、拒绝的任务数等等信息。
我觉得基本上这样答就差不多了,等着面试官的追问就好。注意不需要跟面试官解释什么叫核心线程数之类的,都懂的没必要。
当然这种开放型问题还是仁者见仁智者见智,我这个不是标准答案,仅供参考。建议把线程池相关的关键字都要说出来,表面你对线程池的内部原理的理解是透彻的。
5.5.2 IO/CPU 密集型
- CPU 密集型(CPU-bound)
- CPU 密集型也叫计算密集型,指的是系统的硬盘、内存性能相对 CPU 要好很多,此时,系统运作大部分的状况是 CPU Loading 100%,CPU 要读/写 I/O(硬盘/内存),I/O 在很短的时间就可以完成,而 CPU 还有许多运算要处理,CPU Loading 很高。
- 在多重程序系统中,大部份时间用来做计算、逻辑判断等 CPU 动作的程序称之 CPU bound。例如一个计算圆周率至小数点一千位以下的程序,在执行的过程当中绝大部份时间用在三角函数和开根号的计算,便是属于 CPU bound 的程序。CPU bound 的程序一般而言 CPU 占用率相当高。这可能是因为任务本身不太需要访问 I/O 设备,也可能是因为程序是多线程实现因此屏蔽掉了等待 I/O 的时间。
- IO 密集型(I/O bound)
- IO 密集型指的是系统的 CPU 性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是 CPU 在等 I/O (硬盘/内存) 的读/写操作,此时 CPU Loading 并不高。
- I/O bound 的程序一般在达到性能极限时,CPU 占用率仍然较低。这可能是因为任务本身需要大量 I/O 操作,而 pipeline 做得不是很好,没有充分利用处理器能力。
CPU密集型 vs IO密集型
我们可以把任务分为计算密集型和 IO 密集型。
计算密集型任务的特点是要进行大量的计算,消耗 CPU 资源,比如计算圆周率、对视频进行高清解码等等,全靠 CPU 的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU 执行任务的效率就越低,所以,要最高效地利用 CPU,计算密集型任务同时进行的数量应当等于 CPU 的核心数。计算密集型任务由于主要消耗 CPU 资源,因此,代码运行效率至关重要。Python 这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用 C 语言编写。
涉及到网络、磁盘 IO 的任务都是 IO 密集型任务,这类任务的特点是 CPU 消耗很少,任务的大部分时间都在等待 IO 操作完成(因为 IO 的速度远远低于 CPU 和内存的速度)。对于 IO 密集型任务,任务越多,CPU 效率越高,但也有一个限度。常见的大部分任务都是 IO 密集型任务,比如 Web 应用。IO 密集型任务执行期间,99% 的时间都花在 IO 上,花在 CPU 上的时间很少,因此,用运行速度极快的 C 语言替换用 Python 这样运行速度极低的脚本语言,完全无法提升运行效率。对于 IO 密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C 语言最差。
总之,计算密集型程序适合 C 语言多线程,I/O 密集型适合脚本语言开发的多线程。
6. 分支合并框架
Fork:把一个复杂任务进行分拆,大事化小;Join:把分拆任务的结果进行合并。
- ForkJoinPool:分支合并池
- ForkJoinTask:类比“FutureTask”
- RecursiveTask:递归任务,继承后可以实现递归调用的任务
public class ForkJoinDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask myTask = new MyTask(0, 100);
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
System.out.println(forkJoinTask.get());
}
}
class MyTask extends RecursiveTask<Integer> {
public static final int CALCULATE_RANGE = 10;
private int begin;
private int end;
private int result;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
if ((end - begin) <= CALCULATE_RANGE) {
for (int i = begin; i <= end; i++) {
result += i;
}
} else {
int mid = (begin + end) >> 1;
MyTask task1 = new MyTask(begin, mid);
MyTask task2 = new MyTask(mid+1, end);
task1.fork();
task2.fork();
result = task1.join() + task2.join();
}
return result;
}
}
7. 异步回调
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(()->{
System.out.println(Thread.currentThread().getName()+" runAsync");
});
completableFuture1.get(); // 阻塞直至获取结果
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+" supplyAsync");
int i = 10/0;
return 1024;
});
// 异步回调
completableFuture2.whenComplete((correctRet, exceptionInfo) -> {
System.out.println("correctRet=" + correctRet);
System.out.println("exceptionInfo=" + exceptionInfo);
}).exceptionally(e -> {
System.out.println("exception:" + e.getMessage());
return 1101;
}).get();
}
}