图来
上述图片表述的线程池的 其中 FutureTask依赖了 callable
public interface Executor { // 一个运行的命令将在某时 执行, // 一个新的线程,或者线程池,或者 calling void execute(Runnable command); }
public abstract class AbstractExecutorService implements ExecutorService { // 生成 FutureTask protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) // 提交任务 public Future<?> submit(Runnable task) public <T> Future<T> submit(Runnable task, T result) public <T> Future<T> submit(Callable<T> task) // 批量提交任务, 不过any(完成一个就好) 和all(都要完成) 有不同 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos); public <T> T invokeAny(Collection<? extends Callable<T>> tasks); public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit); public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks); public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) }
接下来看看 ThreadPoolExecutor
/** * 1. 通常通过Executors {factory methods} 构造 * 2. 如果提交的时候running的线程小于corePoolSize那么 会直接运行 * 3. 如果running的线程>corePoolSize && < maximumPoolSize && queue满了 那么也会运行这个提交 * 4. corePoolSize,maximumPoolSize 可以动态设定 * 5. 核心线程被初始化在第一个任务到达,或者通过[prestartCoreThread, prestartAllCoreThreads],当然你也可以传递一个非空的工作队列,那他也会跑. * 6. 这里面执行的线程都是通过 ThreadFactory (有默认值)创建的(这里面故事挺多的有兴趣的自己看) * 7. 对于当前线程已经超过了corePoolSize的情况下,多余的线程在空闲超过keepAliveTime的情况下才会被释放. * 8. 补充7,allowCoreThreadTimeOut核心线程的回收可设置 * * 9. 队列以及执行 1)小于核心线程数,新任务到达,不管有没有队列,都是直接新建一个核心线程。 2)如果大于等于核心线程数量时在运行,会入队,如不下队列就直接新建线程,如果线程已经达到最大那么就 拒绝 a) SynchronousQueue 直接队列 一个put 一个take,直到 线程MAX b) LinkedBlockingQueue 无界队列 不会有入不了队的情况 c) ArrayBlockingQueue 有界队列 队列有个阀值,入不了对会触发上述的动作. * 10. 拒绝: 1) Executor 停止了 2) 在 有界队列 的情况下队满且线程MAX * 11. 拒绝策略:(RejectedExecutionHandler) 1)AbortPolicy, 抛出运行时异常 2)CallerRunsPolicy 调用者直接运行,不在线程中运行。 3)DiscardPolicy 直接将任务丢弃 4)DiscardOldestPolicy 丢弃队列中头部的任务。 **/
属性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private final BlockingQueue<Runnable> workQueue; private final ReentrantLock mainLock = new ReentrantLock(); private final HashSet<Worker> workers = new HashSet<Worker>(); private final Condition termination = mainLock.newCondition(); private int largestPoolSize; private long completedTaskCount; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; private volatile long keepAliveTime; private volatile boolean allowCoreThreadTimeOut; private volatile int corePoolSize; private volatile int maximumPoolSize; private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
哼 未完待续 不想写了