Java线程池相关类
// 最顶层接口,仅提供execute方法
public interface Executor {
void execute(Runnable command);
}
// ExecutorService,扩展自Executor,提供了对任务的管理能力
public interface ExecutorService extends Executor {
// 停止接受新任务
void shutdown();
// 停止在执行的任务
List<Runnable> showdownNow();
// 等待所有任务结束(完成、中断或超时)
boolean awaitTermination(long timeout, TimeUnit unit);
// 以及各种类型的submit方法,返回Future对象
<T> Future<T> submit(Callable<T> task);
// 以及批量处理任务
<T> T invokeAny(Collection<> tasks);
<T> List<Future<T>> invokeAll(Collection<> tasks);
}
// 其中涉及的Future接口
public interface Future<V> {
// 取消执行任务
boolean cancel(boolean mayInterruptIfRunning);
//是否完成
boolean isDone();
// 等待任务执行结束并返回结果
V get();
V get(long timeout, TimeUnit unit);
}
// ExecutorService的默认抽象实现
public abstract class AbstractExecutorService implements ExecutorService {
// 基础方法,用于生成Future对象
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
// submit仅把task封装为Future,再调用一次Executor.execute()方法
public <T> Future<T> submit(Callable<T> task) {
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
// 还包含invokeAny、invokeAll的实现
}
// 然后说真正的线程池类
public class ThreadPoolExecutor extends AbstractExecutorService {}
// Fork/Join框架是JDK1.7开始提供的用于并行执行任务的框架,它的思想是将一个大任务分割成若干小任务,最终汇总
// 线程池和ForkJoin池都是直接继承自AbstractExecutorService
public class ForkJoinPool extends AbstractExecutorService {}
public class ThreadPoolExecutor {
// ThreadPoolExecutor构造方法
public ThreadPoolExecutor (
int corePoolSize, // 线程池中核心的线程数(即使空闲也保持)
int maximumPoolSize, // 允许最多的线程数
long keepAliveTime, // 多出的线程空闲多久回收
TimeUnit unit, // 回收的时间单位
BlockingQueue<Runnable> workQueue, // 工作队列,用于存放已提交的任务
ThreadFactory threadFactory, // 线程工厂,工厂方法:Thread newThread(Runnable r);
RejectedExecutionHandler handler // 拒绝执行时的处理回调
) {
//
}
// 提交任务,分三步走:
// 1. 如果线程数少于核心线程数,则创建新线程
// 2. 否则尝试加入工作队列(加入之后会再次检查是否需要创建新线程)
// 3. 如果不能加入工作队列,则创建新线程,否则拒绝
public void execute(Runnable command) {
// 线程数少时直接开启新线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) return;
}
// 加入队列
if (isRunning(c) && workQueue.offer(command)) {
// double check
} else if (!addWorker(command, false)) {
reject(command);
}
}
// 线程池的Worker继承自AQS,定义了lock、unlock等方法
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
//
}
}
public class ForkJoinPool {
// ForkJoinPool构造方法
public ForkJoinPool(
int parallelism, // 并行度,默认取CPU的核心数
ForkJoinWorkerThreadFactory factory, // worker线程的工厂类
UncaughtExceptionHandler handler, // 异常处理回调
boolean asyncMode // 控制工作队列的工作模式,如果为true则使用FIFO(从尾取),否则LIFO(从头取)
) {}
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
// externalPush();
}
public <T> ForkJoinTask<T> submit(Callable<T> task) {
// externalPush(new ForkJoinTask.AdaptedCallable<T>(task));
}
public <T> ForkJoinTask<T> submit(Runnable<T> task) {
// externalPush(new ForkJoinTask.AdaptedRunnableAction<T>(task));
}
// ForkJoinTask是一个抽象类
// 递归的RecursiveTask(有返回值)
// 递归的RecursiveAction(无返回值)
public static class XXXTask extends RecursiveTask<Integer> {
// 递归任务需实现compute方法
// 该方法主要实现任务量的拆分(分治思想)
@Override
public Integer compute() {
// this => XXXTask left, right;
return left.join() + right.join();
}
}
}
// 最后看看Executors工具类
public class Executors {
// 创建拥有固定线程数的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads, // corePoolSize = nThreads
nThreads, // maximumPoolSize = nThreads
0L, // 不超时
MILLISECONDS,
new LinkedBlockingQueue<Runnable>() // 无界队列
);
}
// 创建ForkJoin线程池
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool(
parallelism, // 并行度
ForkJoinPool.defaultForkJoinWorkerThreadFactory, // 默认的线程工厂
null, // 不设置异常处理
true // 异步模式
);
}
// 创建只有一个线程的线程池
public static ExecutorService newSingleThreadExecutor() {
//
}
// 创建一个可缓存的线程池(需要时创建,不需要时回收)
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0, // 初始线程数
Integer.MAX_VALUE, // 不限制最大数
60L, // 空闲60秒回收
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>() // 同步队列(长度为0,因为需要时直接创建线程,不需要排队)
);
}
// 创建一个可以调度任务的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(
corePoolSize, //
Integer.MAX_VALUE, // 默认的可调度任务的线程池不限制最大线程数
0, // 而且不会过期
NANOSECONDS,
new DelayedWorkQueue()
);
}
}
// 补充一下ScheduledExecutorService
public interface ScheduledExecutorService extends ExecutorService {
// 延迟执行
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
// 固定频率执行(按开始时间)
ScheduledFuture<?> scheduleAtFixedRate(
Runnable command,
long initialDelay, // 第一次执行的延迟时间
long period, // 每个固定的时间间隔后执行
TimeUnit unit
);
// 固定延迟时间执行(按上一次完成时间)
ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command,
long initialDelay,
long delay,
TimeUnit unit
);
}
}
最后再说一点:
阿里开发规范为什么不允许直接使用Executors创建线程池?
是因为Executors还是通过调用ThreadPoolExecutor实现,但是对于最大线程数和队列提供了默认方式,该方式容易被调用者忽略从而导致OOM问题,所以需要开发者根据实际情况选择合适的参数。